From c517a0bbde45798fed59cb9da33b745ae77f051b Mon Sep 17 00:00:00 2001 From: ishenwei Date: Thu, 16 Apr 2026 10:06:24 +0800 Subject: [PATCH] sync_sessions.py: add --sync-ssh for remote session sync via SSH - Add --sync-ssh HOST --sync-source-path PATH to sync sessions from a remote host without needing to install the script there. - Key changes: * ssh_list_dir(): enumerate subdirs only (fixes 'sessions' file in agents/) * ssh_list_files(): use ls -p to filter directories; strip remote_dir prefix * ssh_stat_mtime_batch(): single SSH call via Python stdin, works on both Linux and macOS, avoids xargs command-line length limits * sync_sessions_via_ssh(): batched mtime fetch (one SSH call per agent dir), then SSH-read & parse only new/modified files * parse_jsonl_content(): shared parser for both local files and SSH content - Session sync now unified: run from macmini, targets macmini/ubuntu1/ubuntu2. - Cron sync also updated to use ssh_stat_mtime_batch for efficiency. --- scripts/sync_sessions.py | 482 +++++++++++++++++++++++++++++---------- 1 file changed, 359 insertions(+), 123 deletions(-) diff --git a/scripts/sync_sessions.py b/scripts/sync_sessions.py index f55e42c..3c44ee6 100755 --- a/scripts/sync_sessions.py +++ b/scripts/sync_sessions.py @@ -2,23 +2,29 @@ """ OpenClaw Session Sync Script -Scans local agent sessions directories, parses JSONL files, +Scans local or remote (via SSH) agent session directories, parses JSONL files, and pushes structured JSON to the Django API. 
Usage: - # Session sync (existing) + # Local session sync python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/ - # Cron job sync (new) + # Remote session sync via SSH (single source, run from macmini) + python sync_sessions.py --sync-ssh ubuntu1 \ + --sync-source-path /home/shenwei/.openclaw \ + --remote-url http://192.168.3.45:8765/api/sessions/bulk_upsert/ + + # Remote session sync via SSH (macmini as source; run once per node to cover all three) + python sync_sessions.py --sync-ssh macmini \ + --sync-source-path /Users/weishen/.openclaw \ + --remote-url http://192.168.3.45:8765/api/sessions/bulk_upsert/ + + # Cron job sync (existing) python sync_sessions.py --cron \ --remote-url http://macmini:8000/api/cron/bulk_upsert/ \ --cron-ssh macmini \ --cron-jobs-path /Users/weishen/openclaw/cron/jobs.json \ --cron-runs-path /Users/weishen/openclaw/cron/runs/ - -Cron: - 0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url - 0 3 * * * cd /path/to/scripts && python sync_sessions.py --cron --remote-url ... """ import argparse @@ -38,6 +44,7 @@ SESSIONS_DIR_NAME = "sessions" STATE_FILE = ".sync_state" DELETED_SUFFIX = ".deleted." 
CRON_STATE_FILE = ".sync_cron_state" +SESSION_STATE_FILE = ".sync_session_ssh_state" # ───────────────────────────────────────────────────────────────── @@ -57,103 +64,102 @@ def ssh_read_file(host, remote_path): return result.stdout -def ssh_list_files(host, remote_dir, pattern="*.jsonl"): - """List remote files matching pattern via SSH.""" +def ssh_list_dir(host, remote_dir): + """List only directories (basenames) in a remote directory via SSH.""" + # Use find to list directories only (type -d), then strip the remote_dir prefix result = subprocess.run( - ["ssh", host, f"ls {remote_dir}/{pattern}"], + ["ssh", host, f"find {remote_dir} -maxdepth 1 -type d -name '*' | sort"], capture_output=True, text=True, timeout=30, ) if result.returncode != 0: return [] - return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()] - - -# ───────────────────────────────────────────────────────────────── -# File Discovery -# ───────────────────────────────────────────────────────────────── - -def find_sessions(root_path): - """Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path).""" - agents_dir = Path(root_path) / "agents" - if not agents_dir.exists(): - return - for agent_folder in sorted(agents_dir.iterdir()): - if not agent_folder.is_dir(): + dirs = [] + for f in result.stdout.strip().split("\n"): + f = f.strip() + if not f or f == remote_dir: continue - sessions_dir = agent_folder / SESSIONS_DIR_NAME - if not sessions_dir.exists(): + # Strip remote_dir prefix + basename = f[len(remote_dir):].lstrip("/") + dirs.append(basename) + return dirs + + +def ssh_list_files(host, remote_dir, pattern="*"): + """List regular files matching pattern via SSH. 
Returns basenames only.""" + result = subprocess.run( + ["ssh", host, f"ls -p {remote_dir}/{pattern}"], + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + return [] + files = [] + for f in result.stdout.strip().split("\n"): + f = f.strip() + if not f: continue - agent_name = agent_folder.name - for jsonl_file in sorted(sessions_dir.glob("*.jsonl")): - if DELETED_SUFFIX in jsonl_file.name: - continue - yield agent_name, str(jsonl_file) + # `ls -p` appends / to directories; filter them out + if f.endswith("/"): + continue + # ls returns full paths; strip the remote_dir prefix + if f.startswith(remote_dir): + f = f[len(remote_dir):].lstrip("/") + files.append(f) + return files -def get_sync_state(sessions_dir): - """Read .sync_state from sessions directory, return {path: mtime}.""" - state_path = Path(sessions_dir) / STATE_FILE - if not state_path.exists(): - return {} - try: - with open(state_path) as f: - return json.load(f) - except (json.JSONDecodeError, IOError): +def ssh_stat_mtime_batch(host, file_list, sessions_dir): + """ + Batch-get mtimes for multiple files via a single SSH call. + Passes file list on stdin to avoid command-line length limits. + Returns {remote_full_path: mtime}. 
+ """ + if not file_list: return {} + # Build a newline-separated list of files to stat + # Escape single quotes in paths by replacing ' -> '\'' (shell quoting) + safe_list = "\n".join(f"{sessions_dir}/{fn}" for fn in file_list) -def save_sync_state(sessions_dir, state): - """Write .sync_state file.""" - state_path = Path(sessions_dir) / STATE_FILE - with open(state_path, "w") as f: - json.dump(state, f) + # Remote Python one-liner reads paths from stdin, stats each, prints "path:mtime\n" + remote_cmd = ( + "python3 -c \"" + "import sys,os; [print(f'{p}:{os.path.getmtime(p)}') for p in sys.stdin.read().splitlines()]" + "\"" + ) - -def get_new_files(root_path): - """Find files that are new or modified since last sync.""" - state = {} - all_sessions_dirs = set() - - agents_dir = Path(root_path) / "agents" - if agents_dir.exists(): - for agent_folder in agents_dir.iterdir(): - if agent_folder.is_dir(): - sessions_dir = agent_folder / SESSIONS_DIR_NAME - if sessions_dir.exists(): - all_sessions_dirs.add(str(sessions_dir)) - - # Load existing state from all session dirs - merged_state = {} - for sd in all_sessions_dirs: - sd_state = get_sync_state(sd) - merged_state.update(sd_state) - - new_files = [] - for agent_name, jsonl_path in find_sessions(root_path): - stat = os.stat(jsonl_path) - mtime = stat.st_mtime - file_key = jsonl_path - old_mtime = merged_state.get(file_key, 0) - if mtime > old_mtime: - new_files.append((agent_name, jsonl_path)) - merged_state[file_key] = mtime - - # Save new state - for sd in all_sessions_dirs: - dir_files = {k: v for k, v in merged_state.items() if k.startswith(sd)} - save_sync_state(sd, dir_files) - - return new_files + result = subprocess.run( + ["ssh", host, remote_cmd], + input=safe_list, + capture_output=True, + text=True, + timeout=60, + ) + mtimes = {} + for line in result.stdout.strip().split("\n"): + line = line.strip() + if not line or ":" not in line: + continue + path, mtime_str = line.rsplit(":", 1) + try: + mtimes[path] = 
int(float(mtime_str)) + except ValueError: + continue + return mtimes # ───────────────────────────────────────────────────────────────── # JSONL Parser (Session mode) # ───────────────────────────────────────────────────────────────── -def parse_jsonl(file_path): - """Parse a JSONL file and return structured session data.""" +def parse_jsonl_content(content): + """ + Parse JSONL content string and return (sessions, messages, tool_calls). + This is the core parsing logic shared by both local and SSH modes. + """ sessions = [] messages = [] tool_calls = [] @@ -164,16 +170,15 @@ def parse_jsonl(file_path): tool_results = {} events = [] - with open(file_path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if not line: - continue - try: - event = json.loads(line) - events.append(event) - except json.JSONDecodeError: - continue + for line in content.strip().split("\n"): + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + events.append(event) + except json.JSONDecodeError: + continue if not events: return sessions, messages, tool_calls @@ -325,7 +330,7 @@ def parse_jsonl(file_path): "message_count": message_count, "tool_call_count": tool_call_count, "error_count": error_count, - "raw_file_path": str(file_path), + "raw_file_path": "", "status": "active", "metadata": {}, } @@ -334,6 +339,231 @@ def parse_jsonl(file_path): return sessions, messages, tool_calls +def parse_jsonl(file_path): + """Parse a local JSONL file and return structured session data.""" + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + sessions, messages, tool_calls = parse_jsonl_content(content) + # Fill raw_file_path for local files + for s in sessions: + s["raw_file_path"] = str(file_path) + return sessions, messages, tool_calls + + +# ───────────────────────────────────────────────────────────────── +# File Discovery (local) +# ───────────────────────────────────────────────────────────────── + +def 
find_sessions(root_path): + """Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path).""" + agents_dir = Path(root_path) / "agents" + if not agents_dir.exists(): + return + for agent_folder in sorted(agents_dir.iterdir()): + if not agent_folder.is_dir(): + continue + sessions_dir = agent_folder / SESSIONS_DIR_NAME + if not sessions_dir.exists(): + continue + agent_name = agent_folder.name + for jsonl_file in sorted(sessions_dir.glob("*.jsonl")): + if DELETED_SUFFIX in jsonl_file.name: + continue + yield agent_name, str(jsonl_file) + + +def get_sync_state(sessions_dir): + """Read .sync_state from sessions directory, return {path: mtime}.""" + state_path = Path(sessions_dir) / STATE_FILE + if not state_path.exists(): + return {} + try: + with open(state_path) as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {} + + +def save_sync_state(sessions_dir, state): + """Write .sync_state file.""" + state_path = Path(sessions_dir) / STATE_FILE + with open(state_path, "w") as f: + json.dump(state, f) + + +def get_new_files(root_path): + """Find files that are new or modified since last sync.""" + state = {} + all_sessions_dirs = set() + + agents_dir = Path(root_path) / "agents" + if agents_dir.exists(): + for agent_folder in agents_dir.iterdir(): + if agent_folder.is_dir(): + sessions_dir = agent_folder / SESSIONS_DIR_NAME + if sessions_dir.exists(): + all_sessions_dirs.add(str(sessions_dir)) + + # Load existing state from all session dirs + merged_state = {} + for sd in all_sessions_dirs: + sd_state = get_sync_state(sd) + merged_state.update(sd_state) + + new_files = [] + for agent_name, jsonl_path in find_sessions(root_path): + stat = os.stat(jsonl_path) + mtime = stat.st_mtime + file_key = jsonl_path + old_mtime = merged_state.get(file_key, 0) + if mtime > old_mtime: + new_files.append((agent_name, jsonl_path)) + merged_state[file_key] = mtime + + # Save new state + for sd in all_sessions_dirs: + dir_files = {k: v for k, v in 
merged_state.items() if k.startswith(sd)} + save_sync_state(sd, dir_files) + + return new_files + + +# ───────────────────────────────────────────────────────────────── +# Session Sync via SSH +# ───────────────────────────────────────────────────────────────── + +def get_ssh_sync_state(state_file_path): + """Read SSH session sync state, return {file_key: mtime}.""" + p = Path(state_file_path) + if not p.exists(): + return {} + try: + with open(p) as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {} + + +def save_ssh_sync_state(state_file_path, state): + """Write SSH session sync state.""" + p = Path(state_file_path) + with open(p, "w") as f: + json.dump(state, f) + + +def sync_sessions_via_ssh(args): + """Sync sessions from a remote host via SSH.""" + ssh_host = args.sync_ssh + source_path = args.sync_source_path.rstrip("/") + + print(f"[SSH Session Sync] host={ssh_host}, source={source_path}") + + # Enumerate all agents on remote + agents_dir = f"{source_path}/agents" + print(f" Scanning {ssh_host}:{agents_dir}/ ...") + agent_folders = ssh_list_dir(ssh_host, agents_dir) + + # Load sync state + state_file = Path.home() / SESSION_STATE_FILE + prev_state = get_ssh_sync_state(str(state_file)) + + new_state = {} + new_files = [] # (agent_name, remote_file_path) + + for agent_folder in sorted(agent_folders): + sessions_dir = f"{agents_dir}/{agent_folder}/{SESSIONS_DIR_NAME}" + # List .jsonl files, excluding deleted + raw_files = ssh_list_files(ssh_host, sessions_dir, "*.jsonl") + jsonl_files = [ + f for f in raw_files + if DELETED_SUFFIX not in f and f.endswith(".jsonl") + ] + if not jsonl_files: + continue + + # Batch-fetch mtimes for all files in this agent's sessions dir + all_mtimes = ssh_stat_mtime_batch(ssh_host, sorted(jsonl_files), sessions_dir) + + for jsonl_file in sorted(jsonl_files): + remote_full = f"{sessions_dir}/{jsonl_file}" + mtime = all_mtimes.get(remote_full) + if mtime is None: + continue + + old_mtime = 
prev_state.get(remote_full, 0) + if mtime > old_mtime: + new_files.append((agent_folder, remote_full)) + new_state[remote_full] = mtime + + if not new_files: + print(" No new or modified session files found.") + save_ssh_sync_state(str(state_file), new_state) + return + + print(f" Found {len(new_files)} new/modified session file(s).") + + # Save new state + save_ssh_sync_state(str(state_file), new_state) + + # Group by agent_name + agent_batches = {} + for agent_name, remote_path in new_files: + agent_batches.setdefault(agent_name, []).append(remote_path) + + total_sessions = 0 + total_messages = 0 + total_tool_calls = 0 + + for agent_name, remote_paths in agent_batches.items(): + all_sessions = [] + all_messages = [] + all_tool_calls = [] + + for remote_path in remote_paths: + print(f" Parsing: {remote_path}") + try: + content = ssh_read_file(ssh_host, remote_path) + sessions, messages, tool_calls = parse_jsonl_content(content) + # Fill raw_file_path with remote path + for s in sessions: + s["raw_file_path"] = remote_path + all_sessions.extend(sessions) + all_messages.extend(messages) + all_tool_calls.extend(tool_calls) + except Exception as e: + print(f" ERROR reading {remote_path}: {e}") + continue + + if not all_sessions: + continue + + payload = { + "agent_name": agent_name, + "source_node": ssh_host, + "sessions": all_sessions, + "messages": all_messages, + "tool_calls": all_tool_calls, + } + + print(f" Pushing {len(all_sessions)} session(s), " + f"{len(all_messages)} message(s), " + f"{len(all_tool_calls)} tool call(s) ...") + + try: + result = push_to_api(args.remote_url, payload) + print(f" OK: sessions_upserted={result.get('sessions_upserted', 0)}, " + f"messages_upserted={result.get('messages_upserted', 0)}, " + f"tool_calls_upserted={result.get('tool_calls_upserted', 0)}") + total_sessions += result.get("sessions_upserted", 0) + total_messages += result.get("messages_upserted", 0) + total_tool_calls += result.get("tool_calls_upserted", 0) + except 
Exception as e: + print(f" FAILED to push {agent_name} sessions: {e}") + + print(f"\nSync complete: {total_sessions} sessions, " + f"{total_messages} messages, {total_tool_calls} tool calls pushed.") + + # ───────────────────────────────────────────────────────────────── # Cron Sync Mode # ───────────────────────────────────────────────────────────────── @@ -363,7 +593,7 @@ def sync_cron_jobs(args): jobs_path = args.cron_jobs_path runs_path = args.cron_runs_path.rstrip("/") - print(f"Fetching jobs.json from {ssh_host}:{jobs_path}...") + print(f"[Cron Sync] host={ssh_host}, jobs={jobs_path}") try: jobs_raw = ssh_read_file(ssh_host, jobs_path) jobs_data = json.loads(jobs_raw) @@ -376,38 +606,25 @@ def sync_cron_jobs(args): print(f" Found {len(jobs)} jobs") # Find runs files, filter to only those matching known job IDs - print(f"Scanning runs directory {ssh_host}:{runs_path}/...") + print(f"Scanning runs directory {ssh_host}:{runs_path}/ ...") all_run_files = ssh_list_files(ssh_host, runs_path, "*.jsonl") run_files = [f for f in all_run_files if Path(f).stem in job_ids] print(f" Found {len(run_files)} run files matching known job IDs") # Load sync state - state_file = Path.home() / ".sync_cron_state" + state_file = Path.home() / CRON_STATE_FILE prev_state = get_cron_state(str(state_file)) new_runs = [] new_state = {} - # Detect remote platform (Linux vs macOS) for stat syntax - uname_result = subprocess.run( - ["ssh", ssh_host, "uname"], - capture_output=True, text=True, timeout=10, - ) - is_macos = uname_result.stdout.strip() == "Darwin" - stat_cmd = f"stat -f %m" if is_macos else f"stat -c %Y" + # Batch-fetch mtimes for all run files + all_run_mtimes = ssh_stat_mtime_batch(ssh_host, [Path(f).name for f in run_files], runs_path) for run_file in run_files: remote_full = f"{runs_path}/{Path(run_file).name}" - # Get mtime via SSH - result = subprocess.run( - ["ssh", ssh_host, f"{stat_cmd} {remote_full}"], - capture_output=True, text=True, timeout=10, - ) - if 
result.returncode != 0: - continue - try: - mtime = int(result.stdout.strip()) - except ValueError: + mtime = all_run_mtimes.get(remote_full) + if mtime is None: continue old_mtime = prev_state.get(remote_full, 0) @@ -416,11 +633,11 @@ def sync_cron_jobs(args): new_state[remote_full] = mtime if not new_runs: - print("No new or modified run files found.") + print(" No new or modified run files found.") save_cron_state(str(state_file), new_state) return - print(f"Parsing {len(new_runs)} new/modified run file(s)...") + print(f" Parsing {len(new_runs)} new/modified run file(s) ...") all_runs = [] for run_file in new_runs: @@ -441,7 +658,7 @@ def sync_cron_jobs(args): continue if not all_runs: - print("No run records parsed.") + print(" No run records parsed.") save_cron_state(str(state_file), new_state) return @@ -454,7 +671,7 @@ def sync_cron_jobs(args): "runs": all_runs, } - print(f"Pushing {len(jobs)} jobs and {len(all_runs)} runs to {args.remote_url}...") + print(f" Pushing {len(jobs)} jobs and {len(all_runs)} runs to {args.remote_url} ...") try: result = push_to_api(args.remote_url, payload) print(f" OK: jobs_upserted={result.get('jobs_upserted', 0)}, " @@ -495,7 +712,9 @@ def push_to_api(remote_url, payload): # ───────────────────────────────────────────────────────────────── def main(): - parser = argparse.ArgumentParser(description="Sync OpenClaw sessions or cron data to Django API") + parser = argparse.ArgumentParser( + description="Sync OpenClaw sessions or cron data to Django API" + ) parser.add_argument( "--remote-url", required=True, @@ -504,7 +723,17 @@ def main(): parser.add_argument( "--root-path", default=".", - help="Root path containing agents/ directory (default: current dir)", + help="Local root path containing agents/ directory (default: current dir)", + ) + parser.add_argument( + "--sync-ssh", + metavar="HOST", + help="Sync sessions from a remote host via SSH (e.g. 
ubuntu1, macmini)", + ) + parser.add_argument( + "--sync-source-path", + default="/home/shenwei/.openclaw", + help="Remote source path for --sync-ssh (default: /home/shenwei/.openclaw)", ) parser.add_argument( "--cron", @@ -529,11 +758,16 @@ def main(): args = parser.parse_args() + # SSH session sync mode takes priority + if args.sync_ssh: + sync_sessions_via_ssh(args) + return + if args.cron: sync_cron_jobs(args) return - # Original session sync mode + # Local session sync mode new_files = get_new_files(args.root_path) if not new_files: print("No new or modified session files found.") @@ -578,11 +812,13 @@ def main(): print(f" Pushing {len(all_sessions)} session(s), " f"{len(all_messages)} message(s), " - f"{len(all_tool_calls)} tool call(s)...") + f"{len(all_tool_calls)} tool call(s) ...") try: result = push_to_api(args.remote_url, payload) - print(f" OK: {result}") + print(f" OK: sessions_upserted={result.get('sessions_upserted', 0)}, " + f"messages_upserted={result.get('messages_upserted', 0)}, " + f"tool_calls_upserted={result.get('tool_calls_upserted', 0)}") total_sessions += result.get("sessions_upserted", 0) total_messages += result.get("messages_upserted", 0) total_tool_calls += result.get("tool_calls_upserted", 0)