#!/usr/bin/env python
"""
OpenClaw Session Sync Script

Scans local agent sessions directories, parses JSONL files,
and pushes structured JSON to the Django API.

Usage:
    python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/

Cron:
    0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <url>

Sync state (``.sync_state`` in each sessions directory) maps a session file
path to the mtime that was last synced. ``main`` only records a file as
synced after its batch was pushed successfully, so failed pushes are retried
on the next run.
"""

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path

# ─────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────

SESSIONS_DIR_NAME = "sessions"
STATE_FILE = ".sync_state"
DELETED_SUFFIX = ".deleted."


# ─────────────────────────────────────────────────────────────────
# File Discovery
# ─────────────────────────────────────────────────────────────────

def find_sessions(root_path):
    """Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path).

    Agents and files are visited in sorted order. Files whose name contains
    DELETED_SUFFIX are skipped. Yields nothing if ``agents/`` does not exist.
    """
    agents_dir = Path(root_path) / "agents"
    if not agents_dir.exists():
        return

    for agent_folder in sorted(agents_dir.iterdir()):
        if not agent_folder.is_dir():
            continue
        sessions_dir = agent_folder / SESSIONS_DIR_NAME
        if not sessions_dir.exists():
            continue
        agent_name = agent_folder.name
        for jsonl_file in sorted(sessions_dir.glob("*.jsonl")):
            if DELETED_SUFFIX in jsonl_file.name:
                continue
            yield agent_name, str(jsonl_file)


def get_sync_state(sessions_dir):
    """Read .sync_state from sessions directory, return {path: mtime}.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    state_path = Path(sessions_dir) / STATE_FILE
    if not state_path.exists():
        return {}
    try:
        with open(state_path, encoding="utf-8") as f:
            state = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return {}
    # A state file holding e.g. a list would break callers that .update() it.
    return state if isinstance(state, dict) else {}


def save_sync_state(sessions_dir, state):
    """Write .sync_state file (a JSON object) into sessions_dir."""
    state_path = Path(sessions_dir) / STATE_FILE
    with open(state_path, "w", encoding="utf-8") as f:
        json.dump(state, f)


def _scan_new_files(root_path):
    """Return [(agent_name, jsonl_path, mtime)] for new/modified files.

    Read-only: compares each file's current mtime with the recorded one but
    does not write any state. The returned mtime is the one observed here,
    so a file that changes while it is being synced is picked up again on
    the next run.
    """
    # The state of every sessions directory is keyed by file path, so the
    # per-directory states can be merged into one lookup table.
    merged_state = {}
    agents_dir = Path(root_path) / "agents"
    if agents_dir.exists():
        for agent_folder in agents_dir.iterdir():
            sessions_dir = agent_folder / SESSIONS_DIR_NAME
            if agent_folder.is_dir() and sessions_dir.exists():
                merged_state.update(get_sync_state(str(sessions_dir)))

    pending = []
    for agent_name, jsonl_path in find_sessions(root_path):
        try:
            mtime = os.stat(jsonl_path).st_mtime
        except OSError:
            # File vanished between directory listing and stat.
            continue
        old_mtime = merged_state.get(jsonl_path, 0)
        if not isinstance(old_mtime, (int, float)):
            old_mtime = 0  # corrupt entry: treat the file as never synced
        if mtime > old_mtime:
            pending.append((agent_name, jsonl_path, mtime))
    return pending


def _mark_synced(synced_files):
    """Record (jsonl_path, mtime) pairs in their directory's .sync_state."""
    updates_by_dir = {}
    for jsonl_path, mtime in synced_files:
        sessions_dir = str(Path(jsonl_path).parent)
        updates_by_dir.setdefault(sessions_dir, {})[jsonl_path] = mtime

    for sessions_dir, updates in updates_by_dir.items():
        state = get_sync_state(sessions_dir)
        state.update(updates)
        save_sync_state(sessions_dir, state)


def get_new_files(root_path):
    """Find files that are new or modified since last sync.

    Returns a list of (agent_name, jsonl_path). As a side effect every
    returned file is immediately recorded as synced in .sync_state, whether
    or not the caller later manages to push it. ``main`` therefore uses the
    scan/mark helpers directly so that state is only saved after a
    successful push.
    """
    pending = _scan_new_files(root_path)
    _mark_synced((path, mtime) for _, path, mtime in pending)
    return [(agent_name, path) for agent_name, path, _ in pending]


# ─────────────────────────────────────────────────────────────────
# JSONL Parser
# ─────────────────────────────────────────────────────────────────

def _read_events(file_path):
    """Return the JSON objects found in a JSONL file, one per line.

    Blank lines, lines that are not valid JSON, and JSON values that are
    not objects are skipped.
    """
    events = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(event, dict):
                events.append(event)
    return events


def _split_content(content):
    """Split message content into (joined text, list of toolCall items).

    A plain string is treated as the text itself. In a list, "text" items
    contribute text and "toolCall" items are collected; every other item
    type (e.g. thinking) is ignored.
    """
    if isinstance(content, str):
        return content, []
    if not isinstance(content, list):
        return "", []

    text_parts = []
    tool_call_items = []
    for item in content:
        if not isinstance(item, dict):
            continue
        item_type = item.get("type")
        if item_type == "text":
            text_parts.append(item.get("text") or "")
        elif item_type == "toolCall":
            tool_call_items.append(item)
    return "\n".join(text_parts), tool_call_items


def parse_jsonl(file_path):
    """Parse a JSONL file and return structured session data.

    Returns a tuple ``(sessions, messages, tool_calls)`` of lists of dicts.
    ``sessions`` holds at most one record; all three lists are empty when
    the file has no events or no event of type "session".

    Raises OSError / UnicodeDecodeError if the file cannot be read.
    """
    sessions = []
    messages = []
    tool_calls = []

    events = _read_events(file_path)
    if not events:
        return sessions, messages, tool_calls

    # First pass: the first "session" event carries the session metadata.
    session_event = next(
        (event for event in events if event.get("type", "") == "session"),
        None,
    )
    if not session_event:
        return sessions, messages, tool_calls

    session_id = session_event.get("id", "")
    session_timestamp = session_event.get("timestamp", "")
    session_cwd = session_event.get("cwd", "")
    # Prefer the version on the session header; fall back to the last event,
    # which is where this value was read from previously.
    session_version = session_event.get("version", events[-1].get("version", 0))

    # Start and end time come from the first/last timestamped event.
    timestamps = [event.get("timestamp", "") for event in events]
    timestamps = [ts for ts in timestamps if ts]

    # State tracking for model/thinking changes
    current_model_provider = ""
    current_model_id = ""
    current_thinking_level = ""

    # Tool results lookup by tool_call_id
    tool_results = {}

    # Second pass: process events
    message_seq = 0
    total_tokens = 0
    total_cost = 0.0
    message_count = 0  # counts assistant messages only
    error_count = 0

    for event in events:
        event_type = event.get("type", "")

        if event_type == "model_change":
            current_model_provider = event.get("provider", "")
            current_model_id = event.get("modelId", "")
            continue

        if event_type == "thinking_level_change":
            current_thinking_level = event.get("thinkingLevel", "")
            continue

        if event_type != "message":
            continue

        # Nested structure: message data is inside "message" object
        message_obj = event.get("message") or {}
        role = message_obj.get("role", "")
        msg_id = event.get("id", "")

        content_items = message_obj.get("content", [])
        content_text, tc_list = _split_content(content_items)

        msg_data = {
            "session_id": session_id,
            "message_id": msg_id,
            "parent_id": event.get("parentId", "") or "",
            "seq": message_seq,
            "role": role or "",
            "content_text": content_text,
            "raw_content": content_items if content_items else [],
            # NOTE(review): despite its name this stores the content, not
            # the whole message object - confirm what the API expects.
            "raw_message": message_obj.get("content", []),
            "timestamp": event.get("timestamp", ""),
        }

        if role == "assistant":
            usage = message_obj.get("usage") or {}
            msg_data.update({
                "model": current_model_id,
                "provider": current_model_provider,
                "stop_reason": message_obj.get("stopReason", ""),
                "tokens_input": usage.get("inputTokens", 0),
                "tokens_output": usage.get("outputTokens", 0),
                "tokens_cache_read": usage.get("cacheReadInputTokens", 0),
                "tokens_cache_write": usage.get("cacheWriteInputTokens", 0),
                "tokens_total": usage.get("totalTokens", 0),
            })
            total_tokens += usage.get("totalTokens", 0) or 0

            cost = message_obj.get("cost")
            if isinstance(cost, dict) and cost:
                cost_val = cost.get("total", 0.0)
                msg_data["cost_total"] = cost_val
                total_cost += cost_val or 0.0

            message_count += 1

        elif role == "toolResult":
            result_fields = {
                "is_error": message_obj.get("isError", False),
                "exit_code": message_obj.get("exitCode"),
                "duration_ms": message_obj.get("durationMs"),
            }
            msg_data.update({
                "tool_call_id": message_obj.get("toolCallId", ""),
                "tool_name": message_obj.get("toolName", ""),
                **result_fields,
            })
            if message_obj.get("isError"):
                error_count += 1
            if message_obj.get("toolCallId"):
                tool_results[message_obj["toolCallId"]] = {
                    "result_text": content_text,
                    **result_fields,
                }

        messages.append(msg_data)
        message_seq += 1

        # Extract tool calls from assistant messages
        for tc_seq, tc in enumerate(tc_list):
            tool_calls.append({
                "session_id": session_id,
                "message_id": msg_id,
                "tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"),
                "tool_name": tc.get("name", "unknown"),
                "arguments": tc.get("arguments", {}),
                "seq": tc_seq,
            })

    # Enrich tool calls with their results. This has to happen after the
    # whole file was read: a toolResult message follows the assistant
    # message that issued the call, so it is not known yet at that point.
    for tool_call_data in tool_calls:
        tr = tool_results.get(tool_call_data["tool_call_id"], {})
        tool_call_data["result_text"] = tr.get("result_text", "")
        tool_call_data["is_error"] = tr.get("is_error", False)
        tool_call_data["exit_code"] = tr.get("exit_code")
        tool_call_data["duration_ms"] = tr.get("duration_ms")

    # Build session record
    sessions.append({
        "session_id": session_id,
        "session_version": session_version,
        "model_provider": current_model_provider,
        "model_id": current_model_id,
        "thinking_level": current_thinking_level,
        "start_time": timestamps[0] if timestamps else session_timestamp,
        "end_time": timestamps[-1] if timestamps else session_timestamp,
        "cwd": session_cwd,
        "total_tokens": total_tokens,
        "total_cost": total_cost,
        "message_count": message_count,
        "tool_call_count": len(tool_calls),
        "error_count": error_count,
        "raw_file_path": str(file_path),
        "status": "active",
        "metadata": {},
    })

    return sessions, messages, tool_calls


# ─────────────────────────────────────────────────────────────────
# HTTP Client
# ─────────────────────────────────────────────────────────────────

def push_to_api(remote_url, payload):
    """POST structured JSON to Django API.

    Returns the decoded JSON response, or an empty dict when the response
    body is empty. Any failure is printed and re-raised, so callers must
    handle urllib.error.HTTPError / URLError and decoding errors.
    """
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        remote_url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            body = resp.read()
        # A successful response without a body is not an error.
        return json.loads(body) if body.strip() else {}
    except urllib.error.HTTPError as e:
        print(f"HTTP Error {e.code}: {e.read().decode('utf-8', errors='replace')}")
        raise
    except urllib.error.URLError as e:
        print(f"URL Error: {e.reason}")
        raise
    except Exception as e:
        print(f"Error: {e}")
        raise


# ─────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────

def main():
    """Parse CLI arguments, sync new/modified sessions, print a summary.

    Files are pushed in one batch per agent. A file is recorded as synced
    only after it was parsed and its batch was pushed successfully (or it
    contained nothing to push); everything else is retried on the next run.
    Exits with status 1 if any file failed to parse or any push failed.
    """
    parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API")
    parser.add_argument(
        "--remote-url",
        required=True,
        help="Django API bulk_upsert endpoint URL",
    )
    parser.add_argument(
        "--root-path",
        default=".",
        help="Root path containing agents/ directory (default: current dir)",
    )
    args = parser.parse_args()

    pending = _scan_new_files(args.root_path)
    if not pending:
        print("No new or modified session files found.")
        return

    print(f"Found {len(pending)} new/modified session(s).")

    total_sessions = 0
    total_messages = 0
    total_tool_calls = 0
    failures = 0

    # Group by agent_name (batch per agent)
    agent_batches = {}
    for agent_name, jsonl_path, mtime in pending:
        agent_batches.setdefault(agent_name, []).append((jsonl_path, mtime))

    for agent_name, batch_files in agent_batches.items():
        all_sessions = []
        all_messages = []
        all_tool_calls = []
        parsed_files = []

        for fp, mtime in batch_files:
            print(f" Parsing: {fp}")
            try:
                sessions, messages, tool_calls = parse_jsonl(fp)
            except Exception as e:
                print(f" ERROR parsing {fp}: {e}")
                failures += 1
                continue
            all_sessions.extend(sessions)
            all_messages.extend(messages)
            all_tool_calls.extend(tool_calls)
            parsed_files.append((fp, mtime))

        if not all_sessions:
            # Nothing to push; remember the files so they are not re-parsed
            # until they change again.
            _mark_synced(parsed_files)
            continue

        payload = {
            "agent_name": agent_name,
            "source_node": os.environ.get("SOURCE_NODE", "unknown"),
            "sessions": all_sessions,
            "messages": all_messages,
            "tool_calls": all_tool_calls,
        }

        print(f" Pushing {len(all_sessions)} session(s), "
              f"{len(all_messages)} message(s), "
              f"{len(all_tool_calls)} tool call(s)...")

        try:
            result = push_to_api(args.remote_url, payload)
        except Exception:
            print(f" FAILED to push {agent_name} sessions.")
            failures += 1
            continue

        print(f" OK: {result}")
        # Only now is it safe to record the files as synced.
        _mark_synced(parsed_files)
        if isinstance(result, dict):
            total_sessions += result.get("sessions_upserted", 0)
            total_messages += result.get("messages_upserted", 0)
            total_tool_calls += result.get("tool_calls_upserted", 0)

    print(f"\nSync complete: {total_sessions} sessions, "
          f"{total_messages} messages, {total_tool_calls} tool calls pushed.")

    if failures:
        # Non-zero exit status lets cron / callers notice partial failures.
        sys.exit(1)


if __name__ == "__main__":
    main()