Compare commits

...

1 Commits

Author SHA1 Message Date
ishenwei
c517a0bbde sync_sessions.py: add --sync-ssh for remote session sync via SSH
- Add --sync-ssh HOST --sync-source-path PATH to sync sessions from
  a remote host without needing to install the script there.
- Key changes:
  * ssh_list_dir(): enumerate subdirs only (fixes 'sessions' file in agents/)
  * ssh_list_files(): use ls -p to filter directories; strip remote_dir prefix
  * ssh_stat_mtime_batch(): single SSH call via Python stdin, works on
    both Linux and macOS, avoids xargs command-line length limits
  * sync_sessions_via_ssh(): parallel mtime fetch per agent dir, then
    SSH-read & parse only new/modified files
  * parse_jsonl_content(): shared parser for both local files and SSH content
- Session sync now unified: run from macmini, targets macmini/ubuntu1/ubuntu2.
- Cron sync also updated to use ssh_stat_mtime_batch for efficiency.
2026-04-16 10:06:24 +08:00

View File

@@ -2,23 +2,29 @@
""" """
OpenClaw Session Sync Script OpenClaw Session Sync Script
Scans local agent sessions directories, parses JSONL files, Scans local or remote (via SSH) agent session directories, parses JSONL files,
and pushes structured JSON to the Django API. and pushes structured JSON to the Django API.
Usage: Usage:
# Session sync (existing) # Local session sync
python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/ python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/
# Cron job sync (new) # Remote session sync via SSH (single source, run from macmini)
python sync_sessions.py --sync-ssh ubuntu1 \
--sync-source-path /home/shenwei/.openclaw \
--remote-url http://192.168.3.45:8765/api/sessions/bulk_upsert/
# Remote session sync via SSH (run once per node; macmini shown here)
python sync_sessions.py --sync-ssh macmini \
--sync-source-path /Users/weishen/.openclaw \
--remote-url http://192.168.3.45:8765/api/sessions/bulk_upsert/
# Cron job sync (existing)
python sync_sessions.py --cron \ python sync_sessions.py --cron \
--remote-url http://macmini:8000/api/cron/bulk_upsert/ \ --remote-url http://macmini:8000/api/cron/bulk_upsert/ \
--cron-ssh macmini \ --cron-ssh macmini \
--cron-jobs-path /Users/weishen/openclaw/cron/jobs.json \ --cron-jobs-path /Users/weishen/openclaw/cron/jobs.json \
--cron-runs-path /Users/weishen/openclaw/cron/runs/ --cron-runs-path /Users/weishen/openclaw/cron/runs/
Cron:
0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <url>
0 3 * * * cd /path/to/scripts && python sync_sessions.py --cron --remote-url <cron-url> ...
""" """
import argparse import argparse
@@ -38,6 +44,7 @@ SESSIONS_DIR_NAME = "sessions"
STATE_FILE = ".sync_state" STATE_FILE = ".sync_state"
DELETED_SUFFIX = ".deleted." DELETED_SUFFIX = ".deleted."
CRON_STATE_FILE = ".sync_cron_state" CRON_STATE_FILE = ".sync_cron_state"
SESSION_STATE_FILE = ".sync_session_ssh_state"
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
@@ -57,103 +64,102 @@ def ssh_read_file(host, remote_path):
return result.stdout return result.stdout
def ssh_list_files(host, remote_dir, pattern="*.jsonl"): def ssh_list_dir(host, remote_dir):
"""List remote files matching pattern via SSH.""" """List only directories (basenames) in a remote directory via SSH."""
# Use find to list directories only (type -d), then strip the remote_dir prefix
result = subprocess.run( result = subprocess.run(
["ssh", host, f"ls {remote_dir}/{pattern}"], ["ssh", host, f"find {remote_dir} -maxdepth 1 -type d -name '*' | sort"],
capture_output=True, capture_output=True,
text=True, text=True,
timeout=30, timeout=30,
) )
if result.returncode != 0: if result.returncode != 0:
return [] return []
return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()] dirs = []
for f in result.stdout.strip().split("\n"):
f = f.strip()
# ───────────────────────────────────────────────────────────────── if not f or f == remote_dir:
# File Discovery
# ─────────────────────────────────────────────────────────────────
def find_sessions(root_path):
"""Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path)."""
agents_dir = Path(root_path) / "agents"
if not agents_dir.exists():
return
for agent_folder in sorted(agents_dir.iterdir()):
if not agent_folder.is_dir():
continue continue
sessions_dir = agent_folder / SESSIONS_DIR_NAME # Strip remote_dir prefix
if not sessions_dir.exists(): basename = f[len(remote_dir):].lstrip("/")
dirs.append(basename)
return dirs
def ssh_list_files(host, remote_dir, pattern="*"):
"""List regular files matching pattern via SSH. Returns basenames only."""
result = subprocess.run(
["ssh", host, f"ls -p {remote_dir}/{pattern}"],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return []
files = []
for f in result.stdout.strip().split("\n"):
f = f.strip()
if not f:
continue continue
agent_name = agent_folder.name # `ls -p` appends / to directories; filter them out
for jsonl_file in sorted(sessions_dir.glob("*.jsonl")): if f.endswith("/"):
if DELETED_SUFFIX in jsonl_file.name: continue
continue # ls returns full paths; strip the remote_dir prefix
yield agent_name, str(jsonl_file) if f.startswith(remote_dir):
f = f[len(remote_dir):].lstrip("/")
files.append(f)
return files
def get_sync_state(sessions_dir): def ssh_stat_mtime_batch(host, file_list, sessions_dir):
"""Read .sync_state from sessions directory, return {path: mtime}.""" """
state_path = Path(sessions_dir) / STATE_FILE Batch-get mtimes for multiple files via a single SSH call.
if not state_path.exists(): Passes file list on stdin to avoid command-line length limits.
return {} Returns {remote_full_path: mtime}.
try: """
with open(state_path) as f: if not file_list:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {} return {}
# Build a newline-separated list of files to stat
# Escape single quotes in paths by replacing ' -> '\'' (shell quoting)
safe_list = "\n".join(f"{sessions_dir}/{fn}" for fn in file_list)
def save_sync_state(sessions_dir, state): # Remote Python one-liner reads paths from stdin, stats each, prints "path:mtime\n"
"""Write .sync_state file.""" remote_cmd = (
state_path = Path(sessions_dir) / STATE_FILE "python3 -c \""
with open(state_path, "w") as f: "import sys,os; [print(f'{p}:{os.path.getmtime(p)}') for p in sys.stdin.read().splitlines()]"
json.dump(state, f) "\""
)
result = subprocess.run(
def get_new_files(root_path): ["ssh", host, remote_cmd],
"""Find files that are new or modified since last sync.""" input=safe_list,
state = {} capture_output=True,
all_sessions_dirs = set() text=True,
timeout=60,
agents_dir = Path(root_path) / "agents" )
if agents_dir.exists(): mtimes = {}
for agent_folder in agents_dir.iterdir(): for line in result.stdout.strip().split("\n"):
if agent_folder.is_dir(): line = line.strip()
sessions_dir = agent_folder / SESSIONS_DIR_NAME if not line or ":" not in line:
if sessions_dir.exists(): continue
all_sessions_dirs.add(str(sessions_dir)) path, mtime_str = line.rsplit(":", 1)
try:
# Load existing state from all session dirs mtimes[path] = int(float(mtime_str))
merged_state = {} except ValueError:
for sd in all_sessions_dirs: continue
sd_state = get_sync_state(sd) return mtimes
merged_state.update(sd_state)
new_files = []
for agent_name, jsonl_path in find_sessions(root_path):
stat = os.stat(jsonl_path)
mtime = stat.st_mtime
file_key = jsonl_path
old_mtime = merged_state.get(file_key, 0)
if mtime > old_mtime:
new_files.append((agent_name, jsonl_path))
merged_state[file_key] = mtime
# Save new state
for sd in all_sessions_dirs:
dir_files = {k: v for k, v in merged_state.items() if k.startswith(sd)}
save_sync_state(sd, dir_files)
return new_files
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
# JSONL Parser (Session mode) # JSONL Parser (Session mode)
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
def parse_jsonl(file_path): def parse_jsonl_content(content):
"""Parse a JSONL file and return structured session data.""" """
Parse JSONL content string and return (sessions, messages, tool_calls).
This is the core parsing logic shared by both local and SSH modes.
"""
sessions = [] sessions = []
messages = [] messages = []
tool_calls = [] tool_calls = []
@@ -164,16 +170,15 @@ def parse_jsonl(file_path):
tool_results = {} tool_results = {}
events = [] events = []
with open(file_path, "r", encoding="utf-8") as f: for line in content.strip().split("\n"):
for line in f: line = line.strip()
line = line.strip() if not line:
if not line: continue
continue try:
try: event = json.loads(line)
event = json.loads(line) events.append(event)
events.append(event) except json.JSONDecodeError:
except json.JSONDecodeError: continue
continue
if not events: if not events:
return sessions, messages, tool_calls return sessions, messages, tool_calls
@@ -325,7 +330,7 @@ def parse_jsonl(file_path):
"message_count": message_count, "message_count": message_count,
"tool_call_count": tool_call_count, "tool_call_count": tool_call_count,
"error_count": error_count, "error_count": error_count,
"raw_file_path": str(file_path), "raw_file_path": "",
"status": "active", "status": "active",
"metadata": {}, "metadata": {},
} }
@@ -334,6 +339,231 @@ def parse_jsonl(file_path):
return sessions, messages, tool_calls return sessions, messages, tool_calls
def parse_jsonl(file_path):
    """Parse a local JSONL session file.

    Reads the whole file as UTF-8, hands the text to ``parse_jsonl_content``
    and stamps every returned session with the local path it was read from.

    Returns the ``(sessions, messages, tool_calls)`` triple produced by
    ``parse_jsonl_content``.
    """
    with open(file_path, "r", encoding="utf-8") as handle:
        text = handle.read()

    session_rows, message_rows, tool_call_rows = parse_jsonl_content(text)

    # The shared parser leaves raw_file_path blank; only the caller knows
    # where the content came from.
    for row in session_rows:
        row["raw_file_path"] = str(file_path)

    return session_rows, message_rows, tool_call_rows
# ─────────────────────────────────────────────────────────────────
# File Discovery (local)
# ─────────────────────────────────────────────────────────────────
def find_sessions(root_path):
    """Yield ``(agent_name, jsonl_path)`` for every live local session file.

    Looks under ``root_path/agents/<agent>/<SESSIONS_DIR_NAME>/`` for
    ``*.jsonl`` files. Agents and files are visited in sorted order, and any
    file whose name contains ``DELETED_SUFFIX`` is skipped. Yields nothing
    when ``root_path/agents`` does not exist.
    """
    agents_root = Path(root_path) / "agents"
    if agents_root.exists():
        for agent_dir in sorted(agents_root.iterdir()):
            session_dir = agent_dir / SESSIONS_DIR_NAME
            # Ignore stray files in agents/ and agents without a sessions dir.
            if not (agent_dir.is_dir() and session_dir.exists()):
                continue
            for candidate in sorted(session_dir.glob("*.jsonl")):
                if DELETED_SUFFIX not in candidate.name:
                    yield agent_dir.name, str(candidate)
def get_sync_state(sessions_dir):
    """Read the sync state stored in a sessions directory.

    Args:
        sessions_dir: Directory that holds the ``STATE_FILE`` state file.

    Returns:
        The ``{path: mtime}`` mapping stored in the file, or ``{}`` when the
        file is missing, unreadable, not valid JSON, or does not contain a
        JSON object.
    """
    state_path = Path(sessions_dir) / STATE_FILE
    try:
        with open(state_path) as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and undecodable bytes.
        return {}
    # Callers merge the result with dict.update(), so a corrupt file holding
    # e.g. `null` or a list must be treated as "no state", not returned.
    return loaded if isinstance(loaded, dict) else {}
def save_sync_state(sessions_dir, state):
    """Serialize ``state`` as JSON into ``STATE_FILE`` inside ``sessions_dir``.

    Any existing state file is overwritten.
    """
    target = Path(sessions_dir) / STATE_FILE
    with target.open("w") as out:
        json.dump(state, out)
def get_new_files(root_path):
    """Find local session files that are new or modified since the last sync.

    Loads the per-directory state files under ``root_path/agents/*/sessions``,
    compares each session file's current mtime with the recorded one, writes
    the refreshed state back to every sessions directory, and returns the
    files that need syncing.

    Args:
        root_path: Local root containing the ``agents/`` directory.

    Returns:
        List of ``(agent_name, jsonl_path)`` tuples whose mtime is newer than
        the recorded value (or that have no recorded value).

    NOTE(review): the state is persisted here, before the caller has pushed
    anything, so a failed push is not retried on the next run - confirm
    whether that is intended.
    """
    agents_dir = Path(root_path) / "agents"

    all_sessions_dirs = set()
    if agents_dir.exists():
        for agent_folder in agents_dir.iterdir():
            sessions_dir = agent_folder / SESSIONS_DIR_NAME
            if agent_folder.is_dir() and sessions_dir.exists():
                all_sessions_dirs.add(str(sessions_dir))

    # Merge the state of every sessions directory into one lookup table.
    merged_state = {}
    for sd in all_sessions_dirs:
        merged_state.update(get_sync_state(sd))

    new_files = []
    for agent_name, jsonl_path in find_sessions(root_path):
        try:
            mtime = os.stat(jsonl_path).st_mtime
        except OSError:
            # The file was removed or renamed between listing and stat;
            # skip it instead of aborting the whole sync.
            continue
        if mtime > merged_state.get(jsonl_path, 0):
            new_files.append((agent_name, jsonl_path))
        merged_state[jsonl_path] = mtime

    # Write each directory only the entries that live underneath it.
    for sd in all_sessions_dirs:
        dir_files = {k: v for k, v in merged_state.items() if k.startswith(sd)}
        save_sync_state(sd, dir_files)

    return new_files
# ─────────────────────────────────────────────────────────────────
# Session Sync via SSH
# ─────────────────────────────────────────────────────────────────
def get_ssh_sync_state(state_file_path):
    """Read the SSH session sync state file.

    Args:
        state_file_path: Path of the JSON state file.

    Returns:
        The ``{file_key: mtime}`` mapping stored in the file, or ``{}`` when
        the file is missing, unreadable, not valid JSON, or does not contain
        a JSON object.
    """
    try:
        with open(Path(state_file_path)) as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and undecodable bytes.
        return {}
    # The caller uses dict methods on the result, so a corrupt file holding
    # e.g. `null` or a list must be treated as "no state", not returned.
    return loaded if isinstance(loaded, dict) else {}
def save_ssh_sync_state(state_file_path, state):
    """Serialize ``state`` as JSON into ``state_file_path``.

    Any existing file at that path is overwritten.
    """
    target = Path(state_file_path)
    with target.open("w") as out:
        json.dump(state, out)
def sync_sessions_via_ssh(args):
    """Sync sessions from a remote host via SSH and push them to the API.

    Enumerates ``<sync_source_path>/agents/*/<SESSIONS_DIR_NAME>/*.jsonl`` on
    the remote host, batch-fetches their mtimes, and for every file that is
    new or newer than the recorded state reads it over SSH, parses it and
    pushes one payload per agent to ``args.remote_url``.

    Args:
        args: Parsed CLI arguments; reads ``sync_ssh`` (SSH host),
            ``sync_source_path`` (remote root) and ``remote_url``.

    State handling:
        * The state file lives in the local home directory and is shared by
          every host synced from this machine, so keys are namespaced as
          ``"<host>:<remote path>"`` and entries of other hosts are carried
          over untouched. Without this, hosts with identical paths would
          compare against each other's mtimes and each run would wipe the
          other hosts' entries.
        * A file's new mtime is recorded only after it was read successfully
          and its agent batch was pushed (or produced nothing to push), so a
          failed read or push is retried on the next run.
        * State written by earlier versions used bare paths as keys. Those
          entries are ignored, so the first run re-syncs everything once.
    """
    ssh_host = args.sync_ssh
    source_path = args.sync_source_path.rstrip("/")

    print(f"[SSH Session Sync] host={ssh_host}, source={source_path}")

    # Enumerate all agents on remote
    agents_dir = f"{source_path}/agents"
    print(f" Scanning {ssh_host}:{agents_dir}/ ...")
    agent_folders = ssh_list_dir(ssh_host, agents_dir)

    # Load sync state; keep entries that belong to other hosts as-is.
    state_file = Path.home() / SESSION_STATE_FILE
    prev_state = get_ssh_sync_state(str(state_file))
    host_prefix = f"{ssh_host}:"
    new_state = {
        key: value
        for key, value in prev_state.items()
        if not key.startswith(host_prefix)
    }

    # agent_name -> [(remote_file_path, mtime)], in sorted agent/file order
    pending = {}

    for agent_folder in sorted(agent_folders):
        sessions_dir = f"{agents_dir}/{agent_folder}/{SESSIONS_DIR_NAME}"

        # List .jsonl files, excluding deleted
        jsonl_files = sorted(
            f for f in ssh_list_files(ssh_host, sessions_dir, "*.jsonl")
            if DELETED_SUFFIX not in f and f.endswith(".jsonl")
        )
        if not jsonl_files:
            continue

        # Batch-fetch mtimes for all files in this agent's sessions dir
        all_mtimes = ssh_stat_mtime_batch(ssh_host, jsonl_files, sessions_dir)

        for jsonl_file in jsonl_files:
            remote_full = f"{sessions_dir}/{jsonl_file}"
            mtime = all_mtimes.get(remote_full)
            if mtime is None:
                continue

            state_key = host_prefix + remote_full
            if mtime > prev_state.get(state_key, 0):
                pending.setdefault(agent_folder, []).append((remote_full, mtime))
                # Until the file is actually synced keep the old mtime (if
                # any) so a failure leaves it eligible for the next run.
                if state_key in prev_state:
                    new_state[state_key] = prev_state[state_key]
            else:
                new_state[state_key] = mtime

    pending_count = sum(len(entries) for entries in pending.values())
    if not pending_count:
        print(" No new or modified session files found.")
        save_ssh_sync_state(str(state_file), new_state)
        return

    print(f" Found {pending_count} new/modified session file(s).")

    total_sessions = 0
    total_messages = 0
    total_tool_calls = 0

    for agent_name, entries in pending.items():
        all_sessions = []
        all_messages = []
        all_tool_calls = []
        read_ok = []  # (remote_path, mtime) of files read and parsed

        for remote_path, mtime in entries:
            print(f" Parsing: {remote_path}")
            try:
                content = ssh_read_file(ssh_host, remote_path)
                sessions, messages, tool_calls = parse_jsonl_content(content)
            except Exception as e:
                # Per-file boundary: report and move on; the file stays
                # unrecorded so it is retried next run.
                print(f" ERROR reading {remote_path}: {e}")
                continue

            # Fill raw_file_path with remote path
            for s in sessions:
                s["raw_file_path"] = remote_path
            all_sessions.extend(sessions)
            all_messages.extend(messages)
            all_tool_calls.extend(tool_calls)
            read_ok.append((remote_path, mtime))

        pushed = True
        if all_sessions:
            payload = {
                "agent_name": agent_name,
                "source_node": ssh_host,
                "sessions": all_sessions,
                "messages": all_messages,
                "tool_calls": all_tool_calls,
            }

            print(f" Pushing {len(all_sessions)} session(s), "
                  f"{len(all_messages)} message(s), "
                  f"{len(all_tool_calls)} tool call(s) ...")

            try:
                result = push_to_api(args.remote_url, payload)
                print(f" OK: sessions_upserted={result.get('sessions_upserted', 0)}, "
                      f"messages_upserted={result.get('messages_upserted', 0)}, "
                      f"tool_calls_upserted={result.get('tool_calls_upserted', 0)}")
                total_sessions += result.get("sessions_upserted", 0)
                total_messages += result.get("messages_upserted", 0)
                total_tool_calls += result.get("tool_calls_upserted", 0)
            except Exception as e:
                print(f" FAILED to push {agent_name} sessions: {e}")
                pushed = False

        if pushed:
            for remote_path, mtime in read_ok:
                new_state[host_prefix + remote_path] = mtime

        # Checkpoint after every agent so an interruption keeps the progress
        # made so far.
        save_ssh_sync_state(str(state_file), new_state)

    print(f"\nSync complete: {total_sessions} sessions, "
          f"{total_messages} messages, {total_tool_calls} tool calls pushed.")
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
# Cron Sync Mode # Cron Sync Mode
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
@@ -363,7 +593,7 @@ def sync_cron_jobs(args):
jobs_path = args.cron_jobs_path jobs_path = args.cron_jobs_path
runs_path = args.cron_runs_path.rstrip("/") runs_path = args.cron_runs_path.rstrip("/")
print(f"Fetching jobs.json from {ssh_host}:{jobs_path}...") print(f"[Cron Sync] host={ssh_host}, jobs={jobs_path}")
try: try:
jobs_raw = ssh_read_file(ssh_host, jobs_path) jobs_raw = ssh_read_file(ssh_host, jobs_path)
jobs_data = json.loads(jobs_raw) jobs_data = json.loads(jobs_raw)
@@ -376,38 +606,25 @@ def sync_cron_jobs(args):
print(f" Found {len(jobs)} jobs") print(f" Found {len(jobs)} jobs")
# Find runs files, filter to only those matching known job IDs # Find runs files, filter to only those matching known job IDs
print(f"Scanning runs directory {ssh_host}:{runs_path}/...") print(f"Scanning runs directory {ssh_host}:{runs_path}/ ...")
all_run_files = ssh_list_files(ssh_host, runs_path, "*.jsonl") all_run_files = ssh_list_files(ssh_host, runs_path, "*.jsonl")
run_files = [f for f in all_run_files if Path(f).stem in job_ids] run_files = [f for f in all_run_files if Path(f).stem in job_ids]
print(f" Found {len(run_files)} run files matching known job IDs") print(f" Found {len(run_files)} run files matching known job IDs")
# Load sync state # Load sync state
state_file = Path.home() / ".sync_cron_state" state_file = Path.home() / CRON_STATE_FILE
prev_state = get_cron_state(str(state_file)) prev_state = get_cron_state(str(state_file))
new_runs = [] new_runs = []
new_state = {} new_state = {}
# Detect remote platform (Linux vs macOS) for stat syntax # Batch-fetch mtimes for all run files
uname_result = subprocess.run( all_run_mtimes = ssh_stat_mtime_batch(ssh_host, [Path(f).name for f in run_files], runs_path)
["ssh", ssh_host, "uname"],
capture_output=True, text=True, timeout=10,
)
is_macos = uname_result.stdout.strip() == "Darwin"
stat_cmd = f"stat -f %m" if is_macos else f"stat -c %Y"
for run_file in run_files: for run_file in run_files:
remote_full = f"{runs_path}/{Path(run_file).name}" remote_full = f"{runs_path}/{Path(run_file).name}"
# Get mtime via SSH mtime = all_run_mtimes.get(remote_full)
result = subprocess.run( if mtime is None:
["ssh", ssh_host, f"{stat_cmd} {remote_full}"],
capture_output=True, text=True, timeout=10,
)
if result.returncode != 0:
continue
try:
mtime = int(result.stdout.strip())
except ValueError:
continue continue
old_mtime = prev_state.get(remote_full, 0) old_mtime = prev_state.get(remote_full, 0)
@@ -416,11 +633,11 @@ def sync_cron_jobs(args):
new_state[remote_full] = mtime new_state[remote_full] = mtime
if not new_runs: if not new_runs:
print("No new or modified run files found.") print(" No new or modified run files found.")
save_cron_state(str(state_file), new_state) save_cron_state(str(state_file), new_state)
return return
print(f"Parsing {len(new_runs)} new/modified run file(s)...") print(f" Parsing {len(new_runs)} new/modified run file(s) ...")
all_runs = [] all_runs = []
for run_file in new_runs: for run_file in new_runs:
@@ -441,7 +658,7 @@ def sync_cron_jobs(args):
continue continue
if not all_runs: if not all_runs:
print("No run records parsed.") print(" No run records parsed.")
save_cron_state(str(state_file), new_state) save_cron_state(str(state_file), new_state)
return return
@@ -454,7 +671,7 @@ def sync_cron_jobs(args):
"runs": all_runs, "runs": all_runs,
} }
print(f"Pushing {len(jobs)} jobs and {len(all_runs)} runs to {args.remote_url}...") print(f" Pushing {len(jobs)} jobs and {len(all_runs)} runs to {args.remote_url} ...")
try: try:
result = push_to_api(args.remote_url, payload) result = push_to_api(args.remote_url, payload)
print(f" OK: jobs_upserted={result.get('jobs_upserted', 0)}, " print(f" OK: jobs_upserted={result.get('jobs_upserted', 0)}, "
@@ -495,7 +712,9 @@ def push_to_api(remote_url, payload):
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
def main(): def main():
parser = argparse.ArgumentParser(description="Sync OpenClaw sessions or cron data to Django API") parser = argparse.ArgumentParser(
description="Sync OpenClaw sessions or cron data to Django API"
)
parser.add_argument( parser.add_argument(
"--remote-url", "--remote-url",
required=True, required=True,
@@ -504,7 +723,17 @@ def main():
parser.add_argument( parser.add_argument(
"--root-path", "--root-path",
default=".", default=".",
help="Root path containing agents/ directory (default: current dir)", help="Local root path containing agents/ directory (default: current dir)",
)
parser.add_argument(
"--sync-ssh",
metavar="HOST",
help="Sync sessions from a remote host via SSH (e.g. ubuntu1, macmini)",
)
parser.add_argument(
"--sync-source-path",
default="/home/shenwei/.openclaw",
help="Remote source path for --sync-ssh (default: /home/shenwei/.openclaw)",
) )
parser.add_argument( parser.add_argument(
"--cron", "--cron",
@@ -529,11 +758,16 @@ def main():
args = parser.parse_args() args = parser.parse_args()
# SSH session sync mode takes priority
if args.sync_ssh:
sync_sessions_via_ssh(args)
return
if args.cron: if args.cron:
sync_cron_jobs(args) sync_cron_jobs(args)
return return
# Original session sync mode # Local session sync mode
new_files = get_new_files(args.root_path) new_files = get_new_files(args.root_path)
if not new_files: if not new_files:
print("No new or modified session files found.") print("No new or modified session files found.")
@@ -578,11 +812,13 @@ def main():
print(f" Pushing {len(all_sessions)} session(s), " print(f" Pushing {len(all_sessions)} session(s), "
f"{len(all_messages)} message(s), " f"{len(all_messages)} message(s), "
f"{len(all_tool_calls)} tool call(s)...") f"{len(all_tool_calls)} tool call(s) ...")
try: try:
result = push_to_api(args.remote_url, payload) result = push_to_api(args.remote_url, payload)
print(f" OK: {result}") print(f" OK: sessions_upserted={result.get('sessions_upserted', 0)}, "
f"messages_upserted={result.get('messages_upserted', 0)}, "
f"tool_calls_upserted={result.get('tool_calls_upserted', 0)}")
total_sessions += result.get("sessions_upserted", 0) total_sessions += result.get("sessions_upserted", 0)
total_messages += result.get("messages_upserted", 0) total_messages += result.get("messages_upserted", 0)
total_tool_calls += result.get("tool_calls_upserted", 0) total_tool_calls += result.get("tool_calls_upserted", 0)