#!/usr/bin/env python
# Reconstructed from patch 414c30f ("feat: client JSONL parse and push script").
"""
OpenClaw Session Sync Script

Scans local agent sessions directories, parses JSONL files,
and pushes structured JSON to the Django API.

Usage:
    python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/

Cron:
    0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <URL>

Exit status is 0 when every changed file was parsed and pushed, 1 otherwise.
"""

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path

# ─────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────

SESSIONS_DIR_NAME = "sessions"
STATE_FILE = ".sync_state"
DELETED_SUFFIX = ".deleted."


# ─────────────────────────────────────────────────────────────────
# File Discovery
# ─────────────────────────────────────────────────────────────────

def find_sessions(root_path):
    """Yield (agent_name, jsonl_path) for every live session file.

    Walks ``root_path/agents/*/sessions/*.jsonl`` in sorted order. Files whose
    name contains ``DELETED_SUFFIX`` are skipped. ``jsonl_path`` is a string.
    Yields nothing when ``root_path/agents`` does not exist.
    """
    agents_dir = Path(root_path) / "agents"
    if not agents_dir.exists():
        return
    for agent_folder in sorted(agents_dir.iterdir()):
        if not agent_folder.is_dir():
            continue
        sessions_dir = agent_folder / SESSIONS_DIR_NAME
        if not sessions_dir.exists():
            continue
        for jsonl_file in sorted(sessions_dir.glob("*.jsonl")):
            if DELETED_SUFFIX in jsonl_file.name:
                continue
            yield agent_folder.name, str(jsonl_file)


def get_sync_state(sessions_dir):
    """Read ``.sync_state`` from a sessions directory and return {path: mtime}.

    Returns an empty dict when the file is missing, unreadable, not valid JSON,
    or valid JSON that is not an object.
    """
    state_path = Path(sessions_dir) / STATE_FILE
    if not state_path.exists():
        return {}
    try:
        with open(state_path, encoding="utf-8") as f:
            state = json.load(f)
    except (json.JSONDecodeError, OSError):
        return {}
    # A list/number/string here would break every caller that treats it as a mapping.
    return state if isinstance(state, dict) else {}


def save_sync_state(sessions_dir, state):
    """Write ``state`` to the ``.sync_state`` file of ``sessions_dir``.

    The data is written to a temporary sibling file and then moved into place,
    so an interrupted run cannot leave a truncated state file behind.

    Raises:
        OSError: if the directory is not writable.
    """
    state_path = Path(sessions_dir) / STATE_FILE
    tmp_path = state_path.with_name(STATE_FILE + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f)
    os.replace(tmp_path, state_path)


def _scan_changed(root_path):
    """Return [(agent_name, jsonl_path, mtime)] for files newer than their recorded mtime."""
    state_by_dir = {}
    changed = []
    for agent_name, jsonl_path in find_sessions(root_path):
        sessions_dir = str(Path(jsonl_path).parent)
        if sessions_dir not in state_by_dir:
            state_by_dir[sessions_dir] = get_sync_state(sessions_dir)
        try:
            mtime = os.stat(jsonl_path).st_mtime
        except OSError:
            # The file disappeared between the directory listing and the stat.
            continue
        previous = state_by_dir[sessions_dir].get(jsonl_path, 0)
        if not isinstance(previous, (int, float)):
            previous = 0  # corrupted entry: treat the file as never synced
        if mtime > previous:
            changed.append((agent_name, jsonl_path, mtime))
    return changed


def _record_synced(synced):
    """Persist (jsonl_path, mtime) pairs into the state file of each file's directory."""
    updates_by_dir = {}
    for jsonl_path, mtime in synced:
        sessions_dir = str(Path(jsonl_path).parent)
        updates_by_dir.setdefault(sessions_dir, {})[jsonl_path] = mtime
    for sessions_dir, updates in updates_by_dir.items():
        state = get_sync_state(sessions_dir)
        state.update(updates)
        save_sync_state(sessions_dir, state)


def get_new_files(root_path, commit=True):
    """Find files that are new or modified since last sync.

    Args:
        root_path: directory containing ``agents/``.
        commit: when True (the default, matching the historical behaviour) the
            new mtimes are written to the state files immediately. Pass False
            to only inspect; nothing is recorded, so the same files are
            reported again on the next call.

    Returns:
        A list of (agent_name, jsonl_path) tuples.
    """
    changed = _scan_changed(root_path)
    if commit:
        _record_synced((path, mtime) for _, path, mtime in changed)
    return [(agent_name, path) for agent_name, path, _ in changed]


# ─────────────────────────────────────────────────────────────────
# JSONL Parser
# ─────────────────────────────────────────────────────────────────

def _number(value, default=0):
    """Return ``value`` if it is a real number, else ``default``."""
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return value
    return default


def _split_content(content):
    """Return (text, tool_call_items) extracted from a message's ``content``."""
    if isinstance(content, str):
        return content, []
    if not isinstance(content, list):
        return "", []
    text_parts = []
    tool_items = []
    for item in content:
        if not isinstance(item, dict):
            continue
        kind = item.get("type")
        if kind == "text":
            text = item.get("text", "")
            if isinstance(text, str):
                text_parts.append(text)
        elif kind == "toolCall":
            tool_items.append(item)
        # Every other item type (e.g. thinking) is intentionally skipped.
    return "\n".join(text_parts), tool_items


def parse_jsonl(file_path):
    """Parse a JSONL file and return structured session data.

    Lines that are blank, not valid JSON, or not JSON objects are ignored.

    Returns:
        A tuple ``(sessions, messages, tool_calls)`` of lists of dicts.
        ``sessions`` holds at most one record; all three lists are empty when
        the file contains no event of type ``"session"``.

    Raises:
        OSError, UnicodeDecodeError: if the file cannot be read as UTF-8.
    """
    sessions = []
    messages = []
    tool_calls = []

    events = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(event, dict):
                events.append(event)

    # First pass: locate the session header.
    session_event = next((e for e in events if e.get("type", "") == "session"), None)
    if not session_event:
        return sessions, messages, tool_calls

    session_id = session_event.get("id", "")
    session_timestamp = session_event.get("timestamp", "")
    # Prefer the header's own version; fall back to the last event's value,
    # which is what this script historically reported.
    session_version = session_event.get("version", events[-1].get("version", 0))

    timestamps = [e.get("timestamp", "") for e in events if e.get("timestamp", "")]

    # Model / thinking level in effect at the current point of the log.
    current_model_provider = ""
    current_model_id = ""
    current_thinking_level = ""

    # toolCallId -> outcome, filled from toolResult messages.
    tool_results = {}

    total_tokens = 0
    total_cost = 0.0
    message_count = 0
    error_count = 0

    # Second pass: process events in file order.
    for event in events:
        event_type = event.get("type", "")

        if event_type == "model_change":
            current_model_provider = event.get("provider", "")
            current_model_id = event.get("modelId", "")
            continue
        if event_type == "thinking_level_change":
            current_thinking_level = event.get("thinkingLevel", "")
            continue
        if event_type != "message":
            continue

        role = event.get("role", "")
        msg_id = event.get("id", "")
        content = event.get("content", [])
        content_text, tool_items = _split_content(content)

        msg_data = {
            "session_id": session_id,
            "message_id": msg_id,
            "parent_id": event.get("parentId", ""),
            "seq": len(messages),
            "role": role,
            "content_text": content_text,
            "raw_content": content if content else [],
            "raw_message": event.get("content", []),
            "timestamp": event.get("timestamp", ""),
        }

        if role == "assistant":
            usage = event.get("usage")
            if not isinstance(usage, dict):
                usage = {}
            msg_data.update({
                "model": current_model_id,
                "provider": current_model_provider,
                "stop_reason": event.get("stopReason", ""),
                "tokens_input": usage.get("inputTokens", 0),
                "tokens_output": usage.get("outputTokens", 0),
                "tokens_cache_read": usage.get("cacheReadInputTokens", 0),
                "tokens_cache_write": usage.get("cacheWriteInputTokens", 0),
                "tokens_total": usage.get("totalTokens", 0),
            })
            total_tokens += _number(usage.get("totalTokens"))

            cost = event.get("cost")
            if isinstance(cost, dict) and cost:
                cost_val = cost.get("total", 0.0)
                msg_data["cost_total"] = cost_val
                total_cost += _number(cost_val, 0.0)

            # NOTE(review): only assistant messages are counted here, so
            # message_count can be smaller than the number of message rows
            # pushed. Preserved as-is; confirm whether that is intended.
            message_count += 1

        elif role == "toolResult":
            outcome = {
                "is_error": event.get("isError", False),
                "exit_code": event.get("exitCode"),
                "duration_ms": event.get("durationMs"),
            }
            msg_data.update({
                "tool_call_id": event.get("toolCallId", ""),
                "tool_name": event.get("toolName", ""),
                **outcome,
            })
            if event.get("isError"):
                error_count += 1
            if event.get("toolCallId"):
                tool_results[event["toolCallId"]] = dict(outcome, result_text=content_text)

        messages.append(msg_data)

        for tc_seq, tc in enumerate(tool_items):
            tool_calls.append({
                "session_id": session_id,
                "message_id": msg_id,
                "tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"),
                "tool_name": tc.get("name", "unknown"),
                "arguments": tc.get("arguments", {}),
                "seq": tc_seq,
            })

    # Attach results only after the whole log has been read: a toolResult
    # message appears after the message that issued the call, so looking the
    # result up while processing the call would never find it.
    for call in tool_calls:
        outcome = tool_results.get(call["tool_call_id"], {})
        call["result_text"] = outcome.get("result_text", "")
        call["is_error"] = outcome.get("is_error", False)
        call["exit_code"] = outcome.get("exit_code")
        call["duration_ms"] = outcome.get("duration_ms")

    sessions.append({
        "session_id": session_id,
        "session_version": session_version,
        "model_provider": current_model_provider,
        "model_id": current_model_id,
        "thinking_level": current_thinking_level,
        # First/last timestamp in file order; falls back to the header's own.
        "start_time": timestamps[0] if timestamps else session_timestamp,
        "end_time": timestamps[-1] if timestamps else session_timestamp,
        "cwd": session_event.get("cwd", ""),
        "total_tokens": total_tokens,
        "total_cost": total_cost,
        "message_count": message_count,
        "tool_call_count": len(tool_calls),
        "error_count": error_count,
        "raw_file_path": str(file_path),
        "status": "active",
        "metadata": {},
    })
    return sessions, messages, tool_calls


# ─────────────────────────────────────────────────────────────────
# HTTP Client
# ─────────────────────────────────────────────────────────────────

def push_to_api(remote_url, payload):
    """POST ``payload`` as JSON to ``remote_url`` and return the decoded JSON reply.

    Any failure is printed and then re-raised, so the caller decides how to
    proceed.

    Raises:
        urllib.error.HTTPError: the server answered with an error status.
        urllib.error.URLError: the server could not be reached.
        json.JSONDecodeError: the response body is not valid JSON.
    """
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        remote_url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # Must precede URLError: HTTPError is a subclass of it.
        print(f"HTTP Error {e.code}: {e.read().decode('utf-8', errors='replace')}")
        raise
    except urllib.error.URLError as e:
        print(f"URL Error: {e.reason}")
        raise
    except Exception as e:
        print(f"Error: {e}")
        raise


# ─────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────

def main():
    """Sync every new or modified session file, one API request per agent.

    A file's mtime is recorded in the sync state only after it was parsed and
    (when it produced a session) pushed successfully, so failed files are
    retried on the next run.

    Returns:
        0 when everything succeeded, 1 when any parse, push or state write failed.
    """
    parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API")
    parser.add_argument(
        "--remote-url",
        required=True,
        help="Django API bulk_upsert endpoint URL",
    )
    parser.add_argument(
        "--root-path",
        default=".",
        help="Root path containing agents/ directory (default: current dir)",
    )
    args = parser.parse_args()

    changed = _scan_changed(args.root_path)
    if not changed:
        print("No new or modified session files found.")
        return 0

    print(f"Found {len(changed)} new/modified session(s).")

    total_sessions = 0
    total_messages = 0
    total_tool_calls = 0
    failures = 0

    # Group by agent_name (batch per agent)
    agent_batches = {}
    for agent_name, jsonl_path, mtime in changed:
        agent_batches.setdefault(agent_name, []).append((jsonl_path, mtime))

    for agent_name, entries in agent_batches.items():
        all_sessions = []
        all_messages = []
        all_tool_calls = []
        parsed = []  # (path, mtime) of files that parsed cleanly

        for fp, mtime in entries:
            print(f" Parsing: {fp}")
            try:
                sessions, messages, tool_calls = parse_jsonl(fp)
            except Exception as e:
                print(f" ERROR parsing {fp}: {e}")
                failures += 1
                continue
            all_sessions.extend(sessions)
            all_messages.extend(messages)
            all_tool_calls.extend(tool_calls)
            parsed.append((fp, mtime))

        if all_sessions:
            payload = {
                "agent_name": agent_name,
                "source_node": os.environ.get("SOURCE_NODE", "unknown"),
                "sessions": all_sessions,
                "messages": all_messages,
                "tool_calls": all_tool_calls,
            }

            print(f" Pushing {len(all_sessions)} session(s), "
                  f"{len(all_messages)} message(s), "
                  f"{len(all_tool_calls)} tool call(s)...")

            try:
                result = push_to_api(args.remote_url, payload)
            except Exception:
                print(f" FAILED to push {agent_name} sessions.")
                failures += 1
                continue  # leave the state untouched so these files are retried

            print(f" OK: {result}")
            if isinstance(result, dict):
                total_sessions += result.get("sessions_upserted", 0)
                total_messages += result.get("messages_upserted", 0)
                total_tool_calls += result.get("tool_calls_upserted", 0)

        # Reached after a successful push, or when the batch had nothing to
        # push; either way the parsed files need no further work until they
        # change again.
        try:
            _record_synced(parsed)
        except OSError as e:
            print(f" WARNING: could not update sync state for {agent_name}: {e}")
            failures += 1

    print(f"\nSync complete: {total_sessions} sessions, "
          f"{total_messages} messages, {total_tool_calls} tool calls pushed.")
    return 1 if failures else 0


if __name__ == "__main__":
    sys.exit(main())