feat: add CronJob and CronJobRun models with bulk upsert API and admin

- CronJob: maps jobs.json (schedule, payload, delivery, state fields)
- CronJobRun: stores runs/*.jsonl per-job execution history with usage/tokens
- cron_bulk_upsert service: atomic upsert with GET_OR_CREATE for idempotency
- POST /api/cron/bulk_upsert/ endpoint
- Django Admin: CronJobAdmin with CronJobRunInline, CronJobRunAdmin
- sync_sessions.py --cron mode: SSH read jobs.json + runs/*.jsonl, incremental sync
- 0003_cronjob_cronjobrun migration
This commit is contained in:
ishenwei
2026-04-13 13:34:24 +08:00
parent 1a1985a270
commit 74458b4fab
8 changed files with 511 additions and 18 deletions

View File

@@ -6,17 +6,26 @@ Scans local agent sessions directories, parses JSONL files,
and pushes structured JSON to the Django API.
Usage:
# Session sync (existing)
python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/
# Cron job sync (new)
python sync_sessions.py --cron \
--remote-url http://macmini:8000/api/cron/bulk_upsert/ \
--cron-ssh macmini \
--cron-jobs-path /Users/weishen/openclaw/cron/jobs.json \
--cron-runs-path /Users/weishen/openclaw/cron/runs/
Cron:
0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <url>
0 3 * * * cd /path/to/scripts && python sync_sessions.py --cron --remote-url <cron-url> ...
"""
import argparse
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
@@ -28,6 +37,37 @@ from pathlib import Path
SESSIONS_DIR_NAME = "sessions"
STATE_FILE = ".sync_state"
DELETED_SUFFIX = ".deleted."
CRON_STATE_FILE = ".sync_cron_state"
# ─────────────────────────────────────────────────────────────────
# SSH Helper
# ─────────────────────────────────────────────────────────────────
def ssh_read_file(host, remote_path):
"""Read a remote file via SSH and return content as string."""
result = subprocess.run(
["ssh", host, f"cat {remote_path}"],
capture_output=True,
text=True,
timeout=60,
)
if result.returncode != 0:
raise RuntimeError(f"SSH read failed for {host}:{remote_path}: {result.stderr}")
return result.stdout
def ssh_list_files(host, remote_dir, pattern="*.jsonl"):
"""List remote files matching pattern via SSH."""
result = subprocess.run(
["ssh", host, f"ls {remote_dir}/{pattern}"],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return []
return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()]
# ─────────────────────────────────────────────────────────────────
@@ -109,7 +149,7 @@ def get_new_files(root_path):
# ─────────────────────────────────────────────────────────────────
# JSONL Parser
# JSONL Parser (Session mode)
# ─────────────────────────────────────────────────────────────────
def parse_jsonl(file_path):
@@ -118,12 +158,9 @@ def parse_jsonl(file_path):
messages = []
tool_calls = []
# State tracking for model/thinking changes
current_model_provider = ""
current_model_id = ""
current_thinking_level = ""
# Tool results lookup by tool_call_id
tool_results = {}
events = []
@@ -141,7 +178,6 @@ def parse_jsonl(file_path):
if not events:
return sessions, messages, tool_calls
# First pass: extract session metadata
session_event = None
for event in events:
event_type = event.get("type", "")
@@ -157,14 +193,12 @@ def parse_jsonl(file_path):
session_cwd = session_event.get("cwd", "")
session_version = events[-1].get("version", 0) if events else 0
# Determine start and end time from all events
timestamps = []
for event in events:
ts = event.get("timestamp", "")
if ts:
timestamps.append(ts)
# Second pass: process events
message_seq = 0
total_tokens = 0
total_cost = 0.0
@@ -183,14 +217,12 @@ def parse_jsonl(file_path):
current_thinking_level = event.get("thinkingLevel", "")
elif event_type == "message":
# Nested structure: message data is inside "message" object
message_obj = event.get("message", {})
role = message_obj.get("role", "")
msg_id = event.get("id", "")
parent_id = event.get("parentId", "")
msg_timestamp = event.get("timestamp", "")
# Extract text content (skip thinking) from nested content
content_items = message_obj.get("content", [])
text_parts = []
tc_list = []
@@ -200,7 +232,6 @@ def parse_jsonl(file_path):
text_parts.append(item.get("text", ""))
elif item.get("type") == "toolCall":
tc_list.append(item)
# Skip thinking types
content_text = "\n".join(text_parts)
@@ -258,7 +289,6 @@ def parse_jsonl(file_path):
messages.append(msg_data)
message_seq += 1
# Extract tool calls from assistant messages
tc_seq = 0
for tc in tc_list:
tool_call_data = {
@@ -269,7 +299,6 @@ def parse_jsonl(file_path):
"arguments": tc.get("arguments", {}),
"seq": tc_seq,
}
# Enrich with tool result if available
tr = tool_results.get(tool_call_data["tool_call_id"], {})
tool_call_data["result_text"] = tr.get("result_text", "")
tool_call_data["is_error"] = tr.get("is_error", False)
@@ -279,7 +308,6 @@ def parse_jsonl(file_path):
tool_call_count += 1
tc_seq += 1
# Build session record
start_time = timestamps[0] if timestamps else session_timestamp
end_time = timestamps[-1] if timestamps else session_timestamp
@@ -306,6 +334,127 @@ def parse_jsonl(file_path):
return sessions, messages, tool_calls
# ─────────────────────────────────────────────────────────────────
# Cron Sync Mode
# ─────────────────────────────────────────────────────────────────
def get_cron_state(state_file_path):
"""Read cron sync state, return {run_file: mtime}."""
p = Path(state_file_path)
if not p.exists():
return {}
try:
with open(p) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
def save_cron_state(state_file_path, state):
"""Write cron sync state."""
p = Path(state_file_path)
with open(p, "w") as f:
json.dump(state, f)
def sync_cron_jobs(args):
"""Sync cron jobs from openclaw cron data."""
ssh_host = args.cron_ssh
jobs_path = args.cron_jobs_path
runs_path = args.cron_runs_path.rstrip("/")
print(f"Fetching jobs.json from {ssh_host}:{jobs_path}...")
try:
jobs_raw = ssh_read_file(ssh_host, jobs_path)
jobs_data = json.loads(jobs_raw)
except Exception as e:
print(f"ERROR reading jobs.json: {e}")
return
jobs = jobs_data.get("jobs", [])
job_ids = {j["id"] for j in jobs}
print(f" Found {len(jobs)} jobs")
# Find runs files, filter to only those matching known job IDs
print(f"Scanning runs directory {ssh_host}:{runs_path}/...")
all_run_files = ssh_list_files(ssh_host, runs_path, "*.jsonl")
run_files = [f for f in all_run_files if Path(f).stem in job_ids]
print(f" Found {len(run_files)} run files matching known job IDs")
# Load sync state
state_file = Path.home() / ".sync_cron_state"
prev_state = get_cron_state(str(state_file))
new_runs = []
new_state = {}
for run_file in run_files:
remote_full = f"{runs_path}/{run_file}"
# Get mtime via SSH
result = subprocess.run(
["ssh", ssh_host, f"stat -c %Y {remote_full}"],
capture_output=True, text=True, timeout=10,
)
if result.returncode != 0:
continue
try:
mtime = int(result.stdout.strip())
except ValueError:
continue
old_mtime = prev_state.get(remote_full, 0)
if mtime > old_mtime:
new_runs.append(remote_full)
new_state[remote_full] = mtime
if not new_runs:
print("No new or modified run files found.")
save_cron_state(str(state_file), new_state)
return
print(f"Parsing {len(new_runs)} new/modified run file(s)...")
all_runs = []
for run_file in new_runs:
print(f" Parsing: {run_file}")
try:
raw = ssh_read_file(ssh_host, run_file)
for line in raw.strip().split("\n"):
line = line.strip()
if not line:
continue
try:
run_obj = json.loads(line)
all_runs.append(run_obj)
except json.JSONDecodeError:
continue
except Exception as e:
print(f" ERROR reading {run_file}: {e}")
continue
if not all_runs:
print("No run records parsed.")
save_cron_state(str(state_file), new_state)
return
# Save new state
save_cron_state(str(state_file), new_state)
payload = {
"source_node": os.environ.get("SOURCE_NODE", ssh_host),
"jobs": jobs,
"runs": all_runs,
}
print(f"Pushing {len(jobs)} jobs and {len(all_runs)} runs to {args.remote_url}...")
try:
result = push_to_api(args.remote_url, payload)
print(f" OK: jobs_upserted={result.get('jobs_upserted', 0)}, "
f"runs_upserted={result.get('runs_upserted', 0)}")
except Exception as e:
print(f" FAILED to push cron data: {e}")
# ─────────────────────────────────────────────────────────────────
# HTTP Client
# ─────────────────────────────────────────────────────────────────
@@ -333,13 +482,12 @@ def push_to_api(remote_url, payload):
raise
# ─────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API")
parser = argparse.ArgumentParser(description="Sync OpenClaw sessions or cron data to Django API")
parser.add_argument(
"--remote-url",
required=True,
@@ -350,8 +498,34 @@ def main():
default=".",
help="Root path containing agents/ directory (default: current dir)",
)
parser.add_argument(
"--cron",
action="store_true",
help="Sync cron jobs and runs instead of session files",
)
parser.add_argument(
"--cron-ssh",
default="macmini",
help="SSH host for cron data (default: macmini)",
)
parser.add_argument(
"--cron-jobs-path",
default="/Users/weishen/openclaw/cron/jobs.json",
help="Remote path to jobs.json",
)
parser.add_argument(
"--cron-runs-path",
default="/Users/weishen/openclaw/cron/runs/",
help="Remote directory containing run JSONL files",
)
args = parser.parse_args()
if args.cron:
sync_cron_jobs(args)
return
# Original session sync mode
new_files = get_new_files(args.root_path)
if not new_files:
print("No new or modified session files found.")
@@ -363,7 +537,6 @@ def main():
total_messages = 0
total_tool_calls = 0
# Group by agent_name (batch per agent)
agent_batches = {}
for agent_name, jsonl_path in new_files:
agent_batches.setdefault(agent_name, []).append(jsonl_path)