Files
agent-base/scripts/sync_sessions.py
2026-04-05 16:27:27 +08:00

416 lines
15 KiB
Python
Executable File

#!/usr/bin/env python
"""
OpenClaw Session Sync Script
Scans local agent sessions directories, parses JSONL files,
and pushes structured JSON to the Django API.
Usage:
python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/
Cron:
0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <url>
"""
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
# ─────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────
SESSIONS_DIR_NAME = "sessions"
STATE_FILE = ".sync_state"
DELETED_SUFFIX = ".deleted."
# ─────────────────────────────────────────────────────────────────
# File Discovery
# ─────────────────────────────────────────────────────────────────
def find_sessions(root_path):
"""Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path)."""
agents_dir = Path(root_path) / "agents"
if not agents_dir.exists():
return
for agent_folder in sorted(agents_dir.iterdir()):
if not agent_folder.is_dir():
continue
sessions_dir = agent_folder / SESSIONS_DIR_NAME
if not sessions_dir.exists():
continue
agent_name = agent_folder.name
for jsonl_file in sorted(sessions_dir.glob("*.jsonl")):
if DELETED_SUFFIX in jsonl_file.name:
continue
yield agent_name, str(jsonl_file)
def get_sync_state(sessions_dir):
"""Read .sync_state from sessions directory, return {path: mtime}."""
state_path = Path(sessions_dir) / STATE_FILE
if not state_path.exists():
return {}
try:
with open(state_path) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
def save_sync_state(sessions_dir, state):
"""Write .sync_state file."""
state_path = Path(sessions_dir) / STATE_FILE
with open(state_path, "w") as f:
json.dump(state, f)
def get_new_files(root_path):
"""Find files that are new or modified since last sync."""
state = {}
all_sessions_dirs = set()
agents_dir = Path(root_path) / "agents"
if agents_dir.exists():
for agent_folder in agents_dir.iterdir():
if agent_folder.is_dir():
sessions_dir = agent_folder / SESSIONS_DIR_NAME
if sessions_dir.exists():
all_sessions_dirs.add(str(sessions_dir))
# Load existing state from all session dirs
merged_state = {}
for sd in all_sessions_dirs:
sd_state = get_sync_state(sd)
merged_state.update(sd_state)
new_files = []
for agent_name, jsonl_path in find_sessions(root_path):
stat = os.stat(jsonl_path)
mtime = stat.st_mtime
file_key = jsonl_path
old_mtime = merged_state.get(file_key, 0)
if mtime > old_mtime:
new_files.append((agent_name, jsonl_path))
merged_state[file_key] = mtime
# Save new state
for sd in all_sessions_dirs:
dir_files = {k: v for k, v in merged_state.items() if k.startswith(sd)}
save_sync_state(sd, dir_files)
return new_files
# ─────────────────────────────────────────────────────────────────
# JSONL Parser
# ─────────────────────────────────────────────────────────────────
def parse_jsonl(file_path):
"""Parse a JSONL file and return structured session data."""
sessions = []
messages = []
tool_calls = []
# State tracking for model/thinking changes
current_model_provider = ""
current_model_id = ""
current_thinking_level = ""
# Tool results lookup by tool_call_id
tool_results = {}
events = []
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
events.append(event)
except json.JSONDecodeError:
continue
if not events:
return sessions, messages, tool_calls
# First pass: extract session metadata
session_event = None
for event in events:
event_type = event.get("type", "")
if event_type == "session":
session_event = event
break
if not session_event:
return sessions, messages, tool_calls
session_id = session_event.get("id", "")
session_timestamp = session_event.get("timestamp", "")
session_cwd = session_event.get("cwd", "")
session_version = events[-1].get("version", 0) if events else 0
# Determine start and end time from all events
timestamps = []
for event in events:
ts = event.get("timestamp", "")
if ts:
timestamps.append(ts)
# Second pass: process events
message_seq = 0
total_tokens = 0
total_cost = 0.0
message_count = 0
tool_call_count = 0
error_count = 0
for event in events:
event_type = event.get("type", "")
if event_type == "model_change":
current_model_provider = event.get("provider", "")
current_model_id = event.get("modelId", "")
elif event_type == "thinking_level_change":
current_thinking_level = event.get("thinkingLevel", "")
elif event_type == "message":
role = event.get("role", "")
msg_id = event.get("id", "")
parent_id = event.get("parentId", "")
msg_timestamp = event.get("timestamp", "")
# Extract text content (skip thinking)
content_items = event.get("content", [])
text_parts = []
tc_list = []
for item in content_items:
if isinstance(item, dict):
if item.get("type") == "text":
text_parts.append(item.get("text", ""))
elif item.get("type") == "toolCall":
tc_list.append(item)
# Skip thinking types
content_text = "\n".join(text_parts)
msg_data = {
"session_id": session_id,
"message_id": msg_id,
"parent_id": parent_id,
"seq": message_seq,
"role": role,
"content_text": content_text,
"raw_content": content_items if content_items else [],
"raw_message": event.get("content", []),
"timestamp": msg_timestamp,
}
if role == "assistant":
usage = event.get("usage", {})
msg_data.update({
"model": current_model_id,
"provider": current_model_provider,
"stop_reason": event.get("stopReason", ""),
"tokens_input": usage.get("inputTokens", 0),
"tokens_output": usage.get("outputTokens", 0),
"tokens_cache_read": usage.get("cacheReadInputTokens", 0),
"tokens_cache_write": usage.get("cacheWriteInputTokens", 0),
"tokens_total": usage.get("totalTokens", 0),
})
total_tokens += usage.get("totalTokens", 0)
if event.get("cost"):
cost_val = event["cost"].get("total", 0.0)
msg_data["cost_total"] = cost_val
total_cost += cost_val
message_count += 1
elif role == "toolResult":
msg_data.update({
"tool_call_id": event.get("toolCallId", ""),
"tool_name": event.get("toolName", ""),
"is_error": event.get("isError", False),
"exit_code": event.get("exitCode"),
"duration_ms": event.get("durationMs"),
})
if event.get("isError"):
error_count += 1
# Store for tool call association
if event.get("toolCallId"):
tool_results[event["toolCallId"]] = {
"result_text": content_text,
"is_error": event.get("isError", False),
"exit_code": event.get("exitCode"),
"duration_ms": event.get("durationMs"),
}
messages.append(msg_data)
message_seq += 1
# Extract tool calls from assistant messages
tc_seq = 0
for tc in tc_list:
tool_call_data = {
"session_id": session_id,
"message_id": msg_id,
"tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"),
"tool_name": tc.get("name", "unknown"),
"arguments": tc.get("arguments", {}),
"seq": tc_seq,
}
# Enrich with tool result if available
tr = tool_results.get(tool_call_data["tool_call_id"], {})
tool_call_data["result_text"] = tr.get("result_text", "")
tool_call_data["is_error"] = tr.get("is_error", False)
tool_call_data["exit_code"] = tr.get("exit_code")
tool_call_data["duration_ms"] = tr.get("duration_ms")
tool_calls.append(tool_call_data)
tool_call_count += 1
tc_seq += 1
# Build session record
start_time = timestamps[0] if timestamps else session_timestamp
end_time = timestamps[-1] if timestamps else session_timestamp
session_data = {
"session_id": session_id,
"session_version": session_version,
"model_provider": current_model_provider,
"model_id": current_model_id,
"thinking_level": current_thinking_level,
"start_time": start_time,
"end_time": end_time,
"cwd": session_cwd,
"total_tokens": total_tokens,
"total_cost": total_cost,
"message_count": message_count,
"tool_call_count": tool_call_count,
"error_count": error_count,
"raw_file_path": str(file_path),
"status": "active",
"metadata": {},
}
sessions.append(session_data)
return sessions, messages, tool_calls
# ─────────────────────────────────────────────────────────────────
# HTTP Client
# ─────────────────────────────────────────────────────────────────
def push_to_api(remote_url, payload):
"""POST structured JSON to Django API."""
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
remote_url,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
return json.loads(resp.read())
except urllib.error.HTTPError as e:
print(f"HTTP Error {e.code}: {e.read().decode('utf-8', errors='replace')}")
raise
except urllib.error.URLError as e:
print(f"URL Error: {e.reason}")
raise
except Exception as e:
print(f"Error: {e}")
raise
# ─────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API")
parser.add_argument(
"--remote-url",
required=True,
help="Django API bulk_upsert endpoint URL",
)
parser.add_argument(
"--root-path",
default=".",
help="Root path containing agents/ directory (default: current dir)",
)
args = parser.parse_args()
new_files = get_new_files(args.root_path)
if not new_files:
print("No new or modified session files found.")
return
print(f"Found {len(new_files)} new/modified session(s).")
total_sessions = 0
total_messages = 0
total_tool_calls = 0
# Group by agent_name (batch per agent)
agent_batches = {}
for agent_name, jsonl_path in new_files:
agent_batches.setdefault(agent_name, []).append(jsonl_path)
for agent_name, file_paths in agent_batches.items():
all_sessions = []
all_messages = []
all_tool_calls = []
for fp in file_paths:
print(f" Parsing: {fp}")
try:
sessions, messages, tool_calls = parse_jsonl(fp)
all_sessions.extend(sessions)
all_messages.extend(messages)
all_tool_calls.extend(tool_calls)
except Exception as e:
print(f" ERROR parsing {fp}: {e}")
continue
if not all_sessions:
continue
payload = {
"agent_name": agent_name,
"source_node": os.environ.get("SOURCE_NODE", "unknown"),
"sessions": all_sessions,
"messages": all_messages,
"tool_calls": all_tool_calls,
}
print(f" Pushing {len(all_sessions)} session(s), "
f"{len(all_messages)} message(s), "
f"{len(all_tool_calls)} tool call(s)...")
try:
result = push_to_api(args.remote_url, payload)
print(f" OK: {result}")
total_sessions += result.get("sessions_upserted", 0)
total_messages += result.get("messages_upserted", 0)
total_tool_calls += result.get("tool_calls_upserted", 0)
except Exception:
print(f" FAILED to push {agent_name} sessions.")
print(f"\nSync complete: {total_sessions} sessions, "
f"{total_messages} messages, {total_tool_calls} tool calls pushed.")
if __name__ == "__main__":
main()