Compare commits

..

11 Commits

Author SHA1 Message Date
ishenwei
c517a0bbde sync_sessions.py: add --sync-ssh for remote session sync via SSH
- Add --sync-ssh HOST --sync-source-path PATH to sync sessions from
  a remote host without needing to install the script there.
- Key changes:
  * ssh_list_dir(): enumerate subdirs only (fixes 'sessions' file in agents/)
  * ssh_list_files(): use ls -p to filter directories; strip remote_dir prefix
  * ssh_stat_mtime_batch(): single SSH call via Python stdin, works on
    both Linux and macOS, avoids xargs command-line length limits
  * sync_sessions_via_ssh(): parallel mtime fetch per agent dir, then
    SSH-read & parse only new/modified files
  * parse_jsonl_content(): shared parser for both local files and SSH content
- Session sync now unified: run from macmini, targets macmini/ubuntu1/ubuntu2.
- Cron sync also updated to use ssh_stat_mtime_batch for efficiency.
2026-04-16 10:06:24 +08:00
ishenwei
6f0b3e231a Add: test scripts (sync_sessions, test_all_pages, test_e2e_agent_browser) 2026-04-14 11:04:37 +08:00
ishenwei
74458b4fab feat: add CronJob and CronJobRun models with bulk upsert API and admin
- CronJob: maps jobs.json (schedule, payload, delivery, state fields)
- CronJobRun: stores runs/*.jsonl per-job execution history with usage/tokens
- cron_bulk_upsert service: atomic upsert with GET_OR_CREATE for idempotency
- POST /api/cron/bulk_upsert/ endpoint
- Django Admin: CronJobAdmin with CronJobRunInline, CronJobRunAdmin
- sync_sessions.py --cron mode: SSH read jobs.json + runs/*.jsonl, incremental sync
- 0003_cronjob_cronjobrun migration
2026-04-13 13:34:24 +08:00
ishenwei
1a1985a270 fix: use hardcoded href instead of {% url %} to avoid NoReverseMatch in templates 2026-04-08 20:52:03 +08:00
ishenwei
6bd99d043c fix: add Daily Reports to admin sidebar via ready() monkey-patch
- openclaw_daily/apps.py ready(): monkey-patch admin.site.get_urls() to add
  daily/ and daily-reports/ URLs under admin namespace (admin: prefix)
- admin_custom_site.py: simplified (no custom site needed)
- Revert urls.py to use default admin.site
- URL now correctly resolves as admin:openclaw_daily_report_detail
2026-04-08 20:48:00 +08:00
ishenwei
21c5e895e0 feat: add Daily Reports to admin sidebar via monkey-patch
- urls.py: add daily/ and daily-reports/ URL patterns directly
- admin_new_views.py: use admin.site.each_context for full admin context
- openclaw_daily apps.py: monkey-patch admin.site.get_app_list to inject
  Daily Reports item into sidebar app list (no circular import issues)
2026-04-08 20:44:44 +08:00
ishenwei
9542ebde73 fix: use request.admin_site in daily report views to avoid circular import 2026-04-08 20:36:46 +08:00
ishenwei
9412e7880d fix: add Daily Reports to sidebar via get_app_list override
- Create openclaw_daily app (no models, just app config)
- Override get_app_list() in OpenClawAdminSite to inject Daily Reports item
- Use get_app_list approach (not nav-sidebar template override)
- Inject at top of app list for visibility
2026-04-08 19:42:10 +08:00
ishenwei
721b113c9d feat: add Daily Report to admin sidebar via custom AdminSite
- Create OpenClawAdminSite (openclaw_admin) with Daily Reports menu in sidebar
- Re-register Session/Message/ToolCall with custom site
- Add daily/daily-reports URLs at AdminSite level
- Override admin/index.html template for sidebar item
- Use self.admin_view (not admin.site.admin_view) for site-aware view wrapping
- Clean up admin.py: remove inline URLs from SessionAdmin (now at site level)
2026-04-08 19:38:02 +08:00
ishenwei
49d772139d feat: add daily report list and detail views
- New daily_report_list_view at /admin/openclaw/session/daily-reports/
  - Shows (agent, date) combos with message/session counts
  - Filter by agent name and date range
  - Click date to enter detail page
- New daily_report_detail_view at /admin/openclaw/session/daily-reports/<agent>/<date>/
  - Shows all messages for that agent on that date
  - Full message content, tool calls, arguments, results
  - Session metadata header per session
- Added django.contrib.humanize for intcomma template filter
2026-04-08 19:25:07 +08:00
ishenwei
1a32801e39 Add tool_call_count and status to Session list_display 2026-04-08 13:35:50 +08:00
16 changed files with 2829 additions and 267 deletions

View File

@@ -2,21 +2,36 @@
""" """
OpenClaw Session Sync Script OpenClaw Session Sync Script
Scans local agent sessions directories, parses JSONL files, Scans local or remote (via SSH) agent session directories, parses JSONL files,
and pushes structured JSON to the Django API. and pushes structured JSON to the Django API.
Usage: Usage:
# Local session sync
python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/ python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/
Cron: # Remote session sync via SSH (single source, run from macmini)
0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <url> python sync_sessions.py --sync-ssh ubuntu1 \
--sync-source-path /home/shenwei/.openclaw \
--remote-url http://192.168.3.45:8765/api/sessions/bulk_upsert/
# Remote session sync via SSH (all three nodes)
python sync_sessions.py --sync-ssh macmini \
--sync-source-path /Users/weishen/.openclaw \
--remote-url http://192.168.3.45:8765/api/sessions/bulk_upsert/
# Cron job sync (existing)
python sync_sessions.py --cron \
--remote-url http://macmini:8000/api/cron/bulk_upsert/ \
--cron-ssh macmini \
--cron-jobs-path /Users/weishen/openclaw/cron/jobs.json \
--cron-runs-path /Users/weishen/openclaw/cron/runs/
""" """
import argparse import argparse
import json import json
import os import os
import subprocess
import sys import sys
import time
import urllib.error import urllib.error
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
@@ -28,10 +43,315 @@ from pathlib import Path
SESSIONS_DIR_NAME = "sessions" SESSIONS_DIR_NAME = "sessions"
STATE_FILE = ".sync_state" STATE_FILE = ".sync_state"
DELETED_SUFFIX = ".deleted." DELETED_SUFFIX = ".deleted."
CRON_STATE_FILE = ".sync_cron_state"
SESSION_STATE_FILE = ".sync_session_ssh_state"
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
# File Discovery # SSH Helper
# ─────────────────────────────────────────────────────────────────
def ssh_read_file(host, remote_path):
"""Read a remote file via SSH and return content as string."""
result = subprocess.run(
["ssh", host, f"cat {remote_path}"],
capture_output=True,
text=True,
timeout=60,
)
if result.returncode != 0:
raise RuntimeError(f"SSH read failed for {host}:{remote_path}: {result.stderr}")
return result.stdout
def ssh_list_dir(host, remote_dir):
"""List only directories (basenames) in a remote directory via SSH."""
# Use find to list directories only (type -d), then strip the remote_dir prefix
result = subprocess.run(
["ssh", host, f"find {remote_dir} -maxdepth 1 -type d -name '*' | sort"],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return []
dirs = []
for f in result.stdout.strip().split("\n"):
f = f.strip()
if not f or f == remote_dir:
continue
# Strip remote_dir prefix
basename = f[len(remote_dir):].lstrip("/")
dirs.append(basename)
return dirs
def ssh_list_files(host, remote_dir, pattern="*"):
"""List regular files matching pattern via SSH. Returns basenames only."""
result = subprocess.run(
["ssh", host, f"ls -p {remote_dir}/{pattern}"],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return []
files = []
for f in result.stdout.strip().split("\n"):
f = f.strip()
if not f:
continue
# `ls -p` appends / to directories; filter them out
if f.endswith("/"):
continue
# ls returns full paths; strip the remote_dir prefix
if f.startswith(remote_dir):
f = f[len(remote_dir):].lstrip("/")
files.append(f)
return files
def ssh_stat_mtime_batch(host, file_list, sessions_dir):
"""
Batch-get mtimes for multiple files via a single SSH call.
Passes file list on stdin to avoid command-line length limits.
Returns {remote_full_path: mtime}.
"""
if not file_list:
return {}
# Build a newline-separated list of files to stat
# Escape single quotes in paths by replacing ' -> '\'' (shell quoting)
safe_list = "\n".join(f"{sessions_dir}/{fn}" for fn in file_list)
# Remote Python one-liner reads paths from stdin, stats each, prints "path:mtime\n"
remote_cmd = (
"python3 -c \""
"import sys,os; [print(f'{p}:{os.path.getmtime(p)}') for p in sys.stdin.read().splitlines()]"
"\""
)
result = subprocess.run(
["ssh", host, remote_cmd],
input=safe_list,
capture_output=True,
text=True,
timeout=60,
)
mtimes = {}
for line in result.stdout.strip().split("\n"):
line = line.strip()
if not line or ":" not in line:
continue
path, mtime_str = line.rsplit(":", 1)
try:
mtimes[path] = int(float(mtime_str))
except ValueError:
continue
return mtimes
# ─────────────────────────────────────────────────────────────────
# JSONL Parser (Session mode)
# ─────────────────────────────────────────────────────────────────
def parse_jsonl_content(content):
"""
Parse JSONL content string and return (sessions, messages, tool_calls).
This is the core parsing logic shared by both local and SSH modes.
"""
sessions = []
messages = []
tool_calls = []
current_model_provider = ""
current_model_id = ""
current_thinking_level = ""
tool_results = {}
events = []
for line in content.strip().split("\n"):
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
events.append(event)
except json.JSONDecodeError:
continue
if not events:
return sessions, messages, tool_calls
session_event = None
for event in events:
event_type = event.get("type", "")
if event_type == "session":
session_event = event
break
if not session_event:
return sessions, messages, tool_calls
session_id = session_event.get("id", "")
session_timestamp = session_event.get("timestamp", "")
session_cwd = session_event.get("cwd", "")
session_version = events[-1].get("version", 0) if events else 0
timestamps = []
for event in events:
ts = event.get("timestamp", "")
if ts:
timestamps.append(ts)
message_seq = 0
total_tokens = 0
total_cost = 0.0
message_count = 0
tool_call_count = 0
error_count = 0
for event in events:
event_type = event.get("type", "")
if event_type == "model_change":
current_model_provider = event.get("provider", "")
current_model_id = event.get("modelId", "")
elif event_type == "thinking_level_change":
current_thinking_level = event.get("thinkingLevel", "")
elif event_type == "message":
message_obj = event.get("message", {})
role = message_obj.get("role", "")
msg_id = event.get("id", "")
parent_id = event.get("parentId", "")
msg_timestamp = event.get("timestamp", "")
content_items = message_obj.get("content", [])
text_parts = []
tc_list = []
for item in content_items:
if isinstance(item, dict):
if item.get("type") == "text":
text_parts.append(item.get("text", ""))
elif item.get("type") == "toolCall":
tc_list.append(item)
content_text = "\n".join(text_parts)
msg_data = {
"session_id": session_id,
"message_id": msg_id,
"parent_id": parent_id or "",
"seq": message_seq,
"role": role or "",
"content_text": content_text,
"raw_content": content_items if content_items else [],
"raw_message": message_obj.get("content", []),
"timestamp": msg_timestamp,
}
if role == "assistant":
usage = message_obj.get("usage", {})
msg_data.update({
"model": current_model_id,
"provider": current_model_provider,
"stop_reason": message_obj.get("stopReason", ""),
"tokens_input": usage.get("inputTokens", 0),
"tokens_output": usage.get("outputTokens", 0),
"tokens_cache_read": usage.get("cacheReadInputTokens", 0),
"tokens_cache_write": usage.get("cacheWriteInputTokens", 0),
"tokens_total": usage.get("totalTokens", 0),
})
total_tokens += usage.get("totalTokens", 0)
if message_obj.get("cost"):
cost_val = message_obj["cost"].get("total", 0.0)
msg_data["cost_total"] = cost_val
total_cost += cost_val
message_count += 1
elif role == "toolResult":
msg_data.update({
"tool_call_id": message_obj.get("toolCallId", ""),
"tool_name": message_obj.get("toolName", ""),
"is_error": message_obj.get("isError", False),
"exit_code": message_obj.get("exitCode"),
"duration_ms": message_obj.get("durationMs"),
})
if message_obj.get("isError"):
error_count += 1
if message_obj.get("toolCallId"):
tool_results[message_obj["toolCallId"]] = {
"result_text": content_text,
"is_error": message_obj.get("isError", False),
"exit_code": message_obj.get("exitCode"),
"duration_ms": message_obj.get("durationMs"),
}
messages.append(msg_data)
message_seq += 1
tc_seq = 0
for tc in tc_list:
tool_call_data = {
"session_id": session_id,
"message_id": msg_id,
"tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"),
"tool_name": tc.get("name", "unknown"),
"arguments": tc.get("arguments", {}),
"seq": tc_seq,
}
tr = tool_results.get(tool_call_data["tool_call_id"], {})
tool_call_data["result_text"] = tr.get("result_text", "")
tool_call_data["is_error"] = tr.get("is_error", False)
tool_call_data["exit_code"] = tr.get("exit_code")
tool_call_data["duration_ms"] = tr.get("duration_ms")
tool_calls.append(tool_call_data)
tool_call_count += 1
tc_seq += 1
start_time = timestamps[0] if timestamps else session_timestamp
end_time = timestamps[-1] if timestamps else session_timestamp
session_data = {
"session_id": session_id,
"session_version": session_version,
"model_provider": current_model_provider,
"model_id": current_model_id,
"thinking_level": current_thinking_level,
"start_time": start_time,
"end_time": end_time,
"cwd": session_cwd,
"total_tokens": total_tokens,
"total_cost": total_cost,
"message_count": message_count,
"tool_call_count": tool_call_count,
"error_count": error_count,
"raw_file_path": "",
"status": "active",
"metadata": {},
}
sessions.append(session_data)
return sessions, messages, tool_calls
def parse_jsonl(file_path):
"""Parse a local JSONL file and return structured session data."""
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
sessions, messages, tool_calls = parse_jsonl_content(content)
# Fill raw_file_path for local files
for s in sessions:
s["raw_file_path"] = str(file_path)
return sessions, messages, tool_calls
# ─────────────────────────────────────────────────────────────────
# File Discovery (local)
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
def find_sessions(root_path): def find_sessions(root_path):
@@ -109,201 +429,255 @@ def get_new_files(root_path):
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
# JSONL Parser # Session Sync via SSH
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
def parse_jsonl(file_path): def get_ssh_sync_state(state_file_path):
"""Parse a JSONL file and return structured session data.""" """Read SSH session sync state, return {file_key: mtime}."""
sessions = [] p = Path(state_file_path)
messages = [] if not p.exists():
tool_calls = [] return {}
try:
with open(p) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
# State tracking for model/thinking changes
current_model_provider = ""
current_model_id = ""
current_thinking_level = ""
# Tool results lookup by tool_call_id def save_ssh_sync_state(state_file_path, state):
tool_results = {} """Write SSH session sync state."""
p = Path(state_file_path)
with open(p, "w") as f:
json.dump(state, f)
events = []
with open(file_path, "r", encoding="utf-8") as f: def sync_sessions_via_ssh(args):
for line in f: """Sync sessions from a remote host via SSH."""
line = line.strip() ssh_host = args.sync_ssh
if not line: source_path = args.sync_source_path.rstrip("/")
print(f"[SSH Session Sync] host={ssh_host}, source={source_path}")
# Enumerate all agents on remote
agents_dir = f"{source_path}/agents"
print(f" Scanning {ssh_host}:{agents_dir}/ ...")
agent_folders = ssh_list_dir(ssh_host, agents_dir)
# Load sync state
state_file = Path.home() / SESSION_STATE_FILE
prev_state = get_ssh_sync_state(str(state_file))
new_state = {}
new_files = [] # (agent_name, remote_file_path)
for agent_folder in sorted(agent_folders):
sessions_dir = f"{agents_dir}/{agent_folder}/{SESSIONS_DIR_NAME}"
# List .jsonl files, excluding deleted
raw_files = ssh_list_files(ssh_host, sessions_dir, "*.jsonl")
jsonl_files = [
f for f in raw_files
if DELETED_SUFFIX not in f and f.endswith(".jsonl")
]
if not jsonl_files:
continue
# Batch-fetch mtimes for all files in this agent's sessions dir
all_mtimes = ssh_stat_mtime_batch(ssh_host, sorted(jsonl_files), sessions_dir)
for jsonl_file in sorted(jsonl_files):
remote_full = f"{sessions_dir}/{jsonl_file}"
mtime = all_mtimes.get(remote_full)
if mtime is None:
continue continue
old_mtime = prev_state.get(remote_full, 0)
if mtime > old_mtime:
new_files.append((agent_folder, remote_full))
new_state[remote_full] = mtime
if not new_files:
print(" No new or modified session files found.")
save_ssh_sync_state(str(state_file), new_state)
return
print(f" Found {len(new_files)} new/modified session file(s).")
# Save new state
save_ssh_sync_state(str(state_file), new_state)
# Group by agent_name
agent_batches = {}
for agent_name, remote_path in new_files:
agent_batches.setdefault(agent_name, []).append(remote_path)
total_sessions = 0
total_messages = 0
total_tool_calls = 0
for agent_name, remote_paths in agent_batches.items():
all_sessions = []
all_messages = []
all_tool_calls = []
for remote_path in remote_paths:
print(f" Parsing: {remote_path}")
try: try:
event = json.loads(line) content = ssh_read_file(ssh_host, remote_path)
events.append(event) sessions, messages, tool_calls = parse_jsonl_content(content)
except json.JSONDecodeError: # Fill raw_file_path with remote path
for s in sessions:
s["raw_file_path"] = remote_path
all_sessions.extend(sessions)
all_messages.extend(messages)
all_tool_calls.extend(tool_calls)
except Exception as e:
print(f" ERROR reading {remote_path}: {e}")
continue continue
if not events: if not all_sessions:
return sessions, messages, tool_calls continue
# First pass: extract session metadata payload = {
session_event = None "agent_name": agent_name,
for event in events: "source_node": ssh_host,
event_type = event.get("type", "") "sessions": all_sessions,
if event_type == "session": "messages": all_messages,
session_event = event "tool_calls": all_tool_calls,
break }
if not session_event: print(f" Pushing {len(all_sessions)} session(s), "
return sessions, messages, tool_calls f"{len(all_messages)} message(s), "
f"{len(all_tool_calls)} tool call(s) ...")
session_id = session_event.get("id", "") try:
session_timestamp = session_event.get("timestamp", "") result = push_to_api(args.remote_url, payload)
session_cwd = session_event.get("cwd", "") print(f" OK: sessions_upserted={result.get('sessions_upserted', 0)}, "
session_version = events[-1].get("version", 0) if events else 0 f"messages_upserted={result.get('messages_upserted', 0)}, "
f"tool_calls_upserted={result.get('tool_calls_upserted', 0)}")
total_sessions += result.get("sessions_upserted", 0)
total_messages += result.get("messages_upserted", 0)
total_tool_calls += result.get("tool_calls_upserted", 0)
except Exception as e:
print(f" FAILED to push {agent_name} sessions: {e}")
# Determine start and end time from all events print(f"\nSync complete: {total_sessions} sessions, "
timestamps = [] f"{total_messages} messages, {total_tool_calls} tool calls pushed.")
for event in events:
ts = event.get("timestamp", "")
if ts:
timestamps.append(ts)
# Second pass: process events
message_seq = 0
total_tokens = 0
total_cost = 0.0
message_count = 0
tool_call_count = 0
error_count = 0
for event in events: # ─────────────────────────────────────────────────────────────────
event_type = event.get("type", "") # Cron Sync Mode
# ─────────────────────────────────────────────────────────────────
if event_type == "model_change": def get_cron_state(state_file_path):
current_model_provider = event.get("provider", "") """Read cron sync state, return {run_file: mtime}."""
current_model_id = event.get("modelId", "") p = Path(state_file_path)
if not p.exists():
return {}
try:
with open(p) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
elif event_type == "thinking_level_change":
current_thinking_level = event.get("thinkingLevel", "")
elif event_type == "message": def save_cron_state(state_file_path, state):
# Nested structure: message data is inside "message" object """Write cron sync state."""
message_obj = event.get("message", {}) p = Path(state_file_path)
role = message_obj.get("role", "") with open(p, "w") as f:
msg_id = event.get("id", "") json.dump(state, f)
parent_id = event.get("parentId", "")
msg_timestamp = event.get("timestamp", "")
# Extract text content (skip thinking) from nested content
content_items = message_obj.get("content", [])
text_parts = []
tc_list = []
for item in content_items:
if isinstance(item, dict):
if item.get("type") == "text":
text_parts.append(item.get("text", ""))
elif item.get("type") == "toolCall":
tc_list.append(item)
# Skip thinking types
content_text = "\n".join(text_parts) def sync_cron_jobs(args):
"""Sync cron jobs from openclaw cron data."""
ssh_host = args.cron_ssh
jobs_path = args.cron_jobs_path
runs_path = args.cron_runs_path.rstrip("/")
msg_data = { print(f"[Cron Sync] host={ssh_host}, jobs={jobs_path}")
"session_id": session_id, try:
"message_id": msg_id, jobs_raw = ssh_read_file(ssh_host, jobs_path)
"parent_id": parent_id or "", jobs_data = json.loads(jobs_raw)
"seq": message_seq, except Exception as e:
"role": role or "", print(f"ERROR reading jobs.json: {e}")
"content_text": content_text, return
"raw_content": content_items if content_items else [],
"raw_message": message_obj.get("content", []),
"timestamp": msg_timestamp,
}
if role == "assistant": jobs = jobs_data.get("jobs", [])
usage = message_obj.get("usage", {}) job_ids = {j["id"] for j in jobs}
msg_data.update({ print(f" Found {len(jobs)} jobs")
"model": current_model_id,
"provider": current_model_provider,
"stop_reason": message_obj.get("stopReason", ""),
"tokens_input": usage.get("inputTokens", 0),
"tokens_output": usage.get("outputTokens", 0),
"tokens_cache_read": usage.get("cacheReadInputTokens", 0),
"tokens_cache_write": usage.get("cacheWriteInputTokens", 0),
"tokens_total": usage.get("totalTokens", 0),
})
total_tokens += usage.get("totalTokens", 0)
if message_obj.get("cost"): # Find runs files, filter to only those matching known job IDs
cost_val = message_obj["cost"].get("total", 0.0) print(f"Scanning runs directory {ssh_host}:{runs_path}/ ...")
msg_data["cost_total"] = cost_val all_run_files = ssh_list_files(ssh_host, runs_path, "*.jsonl")
total_cost += cost_val run_files = [f for f in all_run_files if Path(f).stem in job_ids]
print(f" Found {len(run_files)} run files matching known job IDs")
message_count += 1 # Load sync state
state_file = Path.home() / CRON_STATE_FILE
prev_state = get_cron_state(str(state_file))
elif role == "toolResult": new_runs = []
msg_data.update({ new_state = {}
"tool_call_id": message_obj.get("toolCallId", ""),
"tool_name": message_obj.get("toolName", ""),
"is_error": message_obj.get("isError", False),
"exit_code": message_obj.get("exitCode"),
"duration_ms": message_obj.get("durationMs"),
})
if message_obj.get("isError"):
error_count += 1
if message_obj.get("toolCallId"):
tool_results[message_obj["toolCallId"]] = {
"result_text": content_text,
"is_error": message_obj.get("isError", False),
"exit_code": message_obj.get("exitCode"),
"duration_ms": message_obj.get("durationMs"),
}
messages.append(msg_data) # Batch-fetch mtimes for all run files
message_seq += 1 all_run_mtimes = ssh_stat_mtime_batch(ssh_host, [Path(f).name for f in run_files], runs_path)
# Extract tool calls from assistant messages for run_file in run_files:
tc_seq = 0 remote_full = f"{runs_path}/{Path(run_file).name}"
for tc in tc_list: mtime = all_run_mtimes.get(remote_full)
tool_call_data = { if mtime is None:
"session_id": session_id, continue
"message_id": msg_id,
"tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"),
"tool_name": tc.get("name", "unknown"),
"arguments": tc.get("arguments", {}),
"seq": tc_seq,
}
# Enrich with tool result if available
tr = tool_results.get(tool_call_data["tool_call_id"], {})
tool_call_data["result_text"] = tr.get("result_text", "")
tool_call_data["is_error"] = tr.get("is_error", False)
tool_call_data["exit_code"] = tr.get("exit_code")
tool_call_data["duration_ms"] = tr.get("duration_ms")
tool_calls.append(tool_call_data)
tool_call_count += 1
tc_seq += 1
# Build session record old_mtime = prev_state.get(remote_full, 0)
start_time = timestamps[0] if timestamps else session_timestamp if mtime > old_mtime:
end_time = timestamps[-1] if timestamps else session_timestamp new_runs.append(remote_full)
new_state[remote_full] = mtime
session_data = { if not new_runs:
"session_id": session_id, print(" No new or modified run files found.")
"session_version": session_version, save_cron_state(str(state_file), new_state)
"model_provider": current_model_provider, return
"model_id": current_model_id,
"thinking_level": current_thinking_level, print(f" Parsing {len(new_runs)} new/modified run file(s) ...")
"start_time": start_time,
"end_time": end_time, all_runs = []
"cwd": session_cwd, for run_file in new_runs:
"total_tokens": total_tokens, print(f" Parsing: {run_file}")
"total_cost": total_cost, try:
"message_count": message_count, raw = ssh_read_file(ssh_host, run_file)
"tool_call_count": tool_call_count, for line in raw.strip().split("\n"):
"error_count": error_count, line = line.strip()
"raw_file_path": str(file_path), if not line:
"status": "active", continue
"metadata": {}, try:
run_obj = json.loads(line)
all_runs.append(run_obj)
except json.JSONDecodeError:
continue
except Exception as e:
print(f" ERROR reading {run_file}: {e}")
continue
if not all_runs:
print(" No run records parsed.")
save_cron_state(str(state_file), new_state)
return
# Save new state
save_cron_state(str(state_file), new_state)
payload = {
"source_node": os.environ.get("SOURCE_NODE", ssh_host),
"jobs": jobs,
"runs": all_runs,
} }
sessions.append(session_data) print(f" Pushing {len(jobs)} jobs and {len(all_runs)} runs to {args.remote_url} ...")
return sessions, messages, tool_calls try:
result = push_to_api(args.remote_url, payload)
print(f" OK: jobs_upserted={result.get('jobs_upserted', 0)}, "
f"runs_upserted={result.get('runs_upserted', 0)}")
except Exception as e:
print(f" FAILED to push cron data: {e}")
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
@@ -333,13 +707,14 @@ def push_to_api(remote_url, payload):
raise raise
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
# Main # Main
# ───────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────
def main(): def main():
parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API") parser = argparse.ArgumentParser(
description="Sync OpenClaw sessions or cron data to Django API"
)
parser.add_argument( parser.add_argument(
"--remote-url", "--remote-url",
required=True, required=True,
@@ -348,10 +723,51 @@ def main():
parser.add_argument( parser.add_argument(
"--root-path", "--root-path",
default=".", default=".",
help="Root path containing agents/ directory (default: current dir)", help="Local root path containing agents/ directory (default: current dir)",
) )
parser.add_argument(
"--sync-ssh",
metavar="HOST",
help="Sync sessions from a remote host via SSH (e.g. ubuntu1, macmini)",
)
parser.add_argument(
"--sync-source-path",
default="/home/shenwei/.openclaw",
help="Remote source path for --sync-ssh (default: /home/shenwei/.openclaw)",
)
parser.add_argument(
"--cron",
action="store_true",
help="Sync cron jobs and runs instead of session files",
)
parser.add_argument(
"--cron-ssh",
default="macmini",
help="SSH host for cron data (default: macmini)",
)
parser.add_argument(
"--cron-jobs-path",
default="/Users/weishen/openclaw/cron/jobs.json",
help="Remote path to jobs.json",
)
parser.add_argument(
"--cron-runs-path",
default="/Users/weishen/openclaw/cron/runs/",
help="Remote directory containing run JSONL files",
)
args = parser.parse_args() args = parser.parse_args()
# SSH session sync mode takes priority
if args.sync_ssh:
sync_sessions_via_ssh(args)
return
if args.cron:
sync_cron_jobs(args)
return
# Local session sync mode
new_files = get_new_files(args.root_path) new_files = get_new_files(args.root_path)
if not new_files: if not new_files:
print("No new or modified session files found.") print("No new or modified session files found.")
@@ -363,7 +779,6 @@ def main():
total_messages = 0 total_messages = 0
total_tool_calls = 0 total_tool_calls = 0
# Group by agent_name (batch per agent)
agent_batches = {} agent_batches = {}
for agent_name, jsonl_path in new_files: for agent_name, jsonl_path in new_files:
agent_batches.setdefault(agent_name, []).append(jsonl_path) agent_batches.setdefault(agent_name, []).append(jsonl_path)
@@ -397,11 +812,13 @@ def main():
print(f" Pushing {len(all_sessions)} session(s), " print(f" Pushing {len(all_sessions)} session(s), "
f"{len(all_messages)} message(s), " f"{len(all_messages)} message(s), "
f"{len(all_tool_calls)} tool call(s)...") f"{len(all_tool_calls)} tool call(s) ...")
try: try:
result = push_to_api(args.remote_url, payload) result = push_to_api(args.remote_url, payload)
print(f" OK: {result}") print(f" OK: sessions_upserted={result.get('sessions_upserted', 0)}, "
f"messages_upserted={result.get('messages_upserted', 0)}, "
f"tool_calls_upserted={result.get('tool_calls_upserted', 0)}")
total_sessions += result.get("sessions_upserted", 0) total_sessions += result.get("sessions_upserted", 0)
total_messages += result.get("messages_upserted", 0) total_messages += result.get("messages_upserted", 0)
total_tool_calls += result.get("tool_calls_upserted", 0) total_tool_calls += result.get("tool_calls_upserted", 0)

711
scripts/test_all_pages.py Normal file
View File

@@ -0,0 +1,711 @@
#!/usr/bin/env python3
"""
agent-base 全页面遍历测试脚本
使用 agent-browser 进行浏览器自动化测试
功能:
- 登录 Django Admin
- 遍历所有管理页面
- 检查页面是否正常渲染(无 500/模板错误)
- 截图留存
- 保存认证状态,可重复使用
用法:
python3 test_all_pages.py # 运行全部测试
python3 test_all_pages.py --login # 强制重新登录(不使用缓存)
python3 test_all_pages.py --screenshot # 仅截图不测试
python3 test_all_pages.py --page /admin/openclaw/session/ # 测试指定页面
"""
import subprocess
import json
import sys
import os
import re
import time
import argparse
import urllib.request
import urllib.parse
import http.cookiejar
from datetime import datetime
from pathlib import Path
# ========== 配置 ==========
BASE_URL = "http://192.168.3.45:8765"
ADMIN_URL = f"{BASE_URL}/admin/"
USERNAME = "admin"
PASSWORD = "admin123"
SESSION_NAME = "agent-base-full-test"
STATE_FILE = "/tmp/agent-browser-auth.json"
SCREENSHOT_DIR = "/tmp/agent-browser-screenshots"
# agent-browser 路径Mac mini 本地)
AGENT_BROWSER = "/opt/homebrew/bin/agent-browser"
# 项目所有页面列表
PAGES_TO_TEST = [
# 页面路径, 描述, 是否需要登录
("/admin/login/", "登录页", False),
("/admin/", "Admin 首页", True),
("/admin/openclaw/session/", "Session 管理", True),
("/admin/openclaw/message/", "Message 管理", True),
("/admin/openclaw/toolcall/", "ToolCall 管理", True),
("/admin/daily-reports/", "日报列表", True),
("/admin/daily-reports/xingjiang/2026-4-7/", "日报详情-星匠-4月7日", True),
("/admin/daily-reports/xingjiang/2026-4-8/", "日报详情-星匠-4月8日", True),
("/api/sessions/bulk_upsert/", "API Bulk Upsert", True),
]
# ========== 工具函数 ==========
def log(msg: str, emoji: str = ""):
ts = datetime.now().strftime("%H:%M:%S")
prefix = f"[{ts}]"
if emoji:
prefix += f" {emoji}"
print(f"{prefix} {msg}", flush=True)
def run_browser(args: list, timeout: int = 30, check: bool = True) -> tuple:
"""执行 agent-browser 命令,返回 (success, result_dict)"""
cmd = [AGENT_BROWSER, "--session", SESSION_NAME, "--json"] + args
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if result.stdout:
try:
return True, json.loads(result.stdout)
except json.JSONDecodeError:
return True, {"raw": result.stdout}
if result.returncode != 0 and check:
log(f" 命令失败 exit={result.returncode}: {' '.join(args)}", "")
if result.stderr:
log(f" stderr: {result.stderr[:200]}", "")
return False, {}
return result.returncode == 0, {}
except subprocess.TimeoutExpired:
log(f" 命令超时 {timeout}s: {' '.join(args)}", "")
return False, {}
except FileNotFoundError:
log(f" agent-browser 未找到: {AGENT_BROWSER}", "")
return False, {}
except Exception as e:
log(f" 命令异常: {e}", "")
return False, {}
def get_snapshot() -> dict:
"""获取当前页面的交互元素快照"""
success, result = run_browser(["snapshot", "-i", "--json"], timeout=20, check=False)
if success and result.get("data"):
return result["data"].get("refs", {})
return {}
def get_url() -> str:
"""获取当前 URL"""
success, result = run_browser(["get", "url", "--json"], check=False)
if success and result.get("data"):
return result["data"].get("url", "")
return ""
def get_title() -> str:
"""获取当前页面标题"""
success, result = run_browser(["get", "title", "--json"], check=False)
if success and result.get("data"):
return result["data"].get("title", "")
return ""
def find_ref(elements: dict, role_pat: str = None, name_pat: str = None) -> str:
"""根据 role 和 name 模式查找元素 ref"""
for ref, info in elements.items():
role = info.get("role", "")
name = info.get("name", "")
if role_pat and not re.match(role_pat, role, re.I):
continue
if name_pat and not re.search(name_pat, name, re.I):
continue
return ref
return None
def find_refs(elements: dict, role_pat: str = None, name_pat: str = None) -> list:
"""查找所有匹配的元素 refs"""
results = []
for ref, info in elements.items():
role = info.get("role", "")
name = info.get("name", "")
if role_pat and not re.match(role_pat, role, re.I):
continue
if name_pat and not re.search(name_pat, name, re.I):
continue
results.append(ref)
return results
def wait_load(seconds: float = 2):
"""等待页面稳定"""
time.sleep(seconds)
def take_screenshot(name: str) -> str:
"""截图并返回保存路径"""
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
path = f"{SCREENSHOT_DIR}/{name}.png"
success, _ = run_browser(["screenshot", path], timeout=15, check=False)
if success:
return path
return ""
def close_session():
"""关闭浏览器会话"""
run_browser(["close"], check=False)
def cleanup_all():
"""清理所有会话"""
subprocess.run([AGENT_BROWSER, "close", "--all"],
capture_output=True, timeout=10)
def check_page_errors() -> list:
"""检查页面是否有错误(仅检测真正的 Django error alert"""
elements = get_snapshot()
errors = []
# 只检查 role=alert 或 role=alertdialogDjango 错误提示的标准 role
for ref, info in elements.items():
role = info.get("role", "")
name = info.get("name", "")
if role in ("alert", "alertdialog"):
# Django error alerts contain error messages
errors.append(f"[alert] {name[:80]}")
# 也检查 title/heading 中的 Django 典型错误关键字
if role == "heading":
if any(kw in name.lower() for kw in ["500", "internal server error", "template does not exist"]):
errors.append(f"[heading] {name[:80]}")
return errors
def login() -> bool:
"""
使用 agent-browser 直接操作 Django Admin 登录页。
"""
log("正在通过 agent-browser 登录 Django Admin...", "🔐")
login_url = f"{ADMIN_URL}login/"
# Step 1: 打开登录页
success, _ = run_browser(["open", login_url])
if not success:
log("无法打开登录页", "")
return False
wait_load(2)
# Step 2: 获取元素快照
elements = get_snapshot()
if not elements:
log("登录页无交互元素", "")
return False
# 如果已经在 admin 首页,说明已登录
url = get_url()
if "/admin/login" not in url:
log(f"已在登录状态: {url}", "")
run_browser(["state", "save", STATE_FILE], check=False)
return True
# Step 3: 找用户名/密码输入框Django admin 中文界面)
all_textboxes = {r: i for r, i in elements.items() if "textbox" in i.get("role", "").lower()}
username_ref = None
password_ref = None
for ref, info in all_textboxes.items():
name = info.get("name", "") # e.g. "用户名:" 或 "密码:"
if not username_ref and ("用户" in name or "user" in name.lower()):
username_ref = ref
elif not password_ref and ("密码" in name or "pass" in name.lower()):
password_ref = ref
# fallback: 按顺序取前两个 textbox
tb_refs = list(all_textboxes.keys())
if not username_ref and len(tb_refs) >= 1:
username_ref = tb_refs[0]
if not password_ref and len(tb_refs) >= 2:
password_ref = tb_refs[1]
if not username_ref:
log(f"未找到用户名输入框,可用: {all_textboxes}", "")
return False
if not password_ref:
log(f"未找到密码输入框,可用: {all_textboxes}", "")
return False
log(f" 找到用户框: {username_ref} | 密码框: {password_ref}", "🔍")
# Step 4: 找提交按钮(中文 Django admin 按钮文字是"登录"
submit_ref = find_ref(elements, "button", "登录|log|sign|submit")
if not submit_ref:
submit_ref = find_refs(elements, "button")[0] if find_refs(elements, "button") else None
if not submit_ref:
log("未找到提交按钮", "")
return False
log(f" 找到提交按钮: {submit_ref}", "🔍")
# Step 5: 填表
log(f" 填入用户名...", "✏️")
run_browser(["fill", username_ref, USERNAME])
wait_load(0.5)
log(f" 填入密码...", "✏️")
run_browser(["fill", password_ref, PASSWORD])
wait_load(0.5)
log(" 点击登录...", "✏️")
run_browser(["click", submit_ref])
wait_load(3) # Django 重定向需要时间
# Step 6: 验证
url = get_url()
if "/admin/" in url and "login" not in url.lower():
log(f"登录成功,当前页面: {url}", "")
run_browser(["state", "save", STATE_FILE], check=False)
log("认证状态已保存", "💾")
return True
else:
log(f"登录后 URL 不对: {url}", "")
# 打印页面内容帮助调试
elements = get_snapshot()
errors = [info.get("name", "") for ref, info in elements.items()
if any(k in info.get("name", "").lower() for k in ["error", "invalid", "incorrect"])]
if errors:
log(f" 页面错误提示: {errors[:3]}", "")
return False
def ensure_logged_in(force: bool = False) -> bool:
"""确保已登录,可选强制重新登录"""
if force or not os.path.exists(STATE_FILE):
return login()
# 尝试加载缓存状态
success, _ = run_browser(["state", "load", STATE_FILE], check=False)
if success:
log("已加载缓存的认证状态", "📂")
# 强制打开一个新页面验证state load 后需要重新 open
run_browser(["open", ADMIN_URL])
wait_load(2)
url = get_url()
title = get_title()
if "/admin/" in url and "login" not in url.lower():
log(f"会话验证成功: {url} | {title[:30]}", "")
return True
log(f"缓存已过期 (url={url}),将重新登录", "🔄")
return login()
# ========== 单页测试函数 ==========
def build_url(path: str) -> str:
"""正确拼接页面 URL避免双重 /admin/ /api/ 前缀"""
path = path.lstrip("/")
# 已包含完整 URL以 http 开头)
if path.startswith("http"):
return path
# 已经是完整路径(如 admin/login/、api/sessions/
if path.startswith("admin/") or path.startswith("api/"):
return ADMIN_URL.rstrip("/") + "/" + path
# 纯路径片段
return ADMIN_URL + path
def test_page(path: str, desc: str, requires_login: bool) -> dict:
"""测试单个页面,返回测试结果 dict"""
url = build_url(path)
result = {
"path": path,
"desc": desc,
"url": url,
"status": "skip",
"http_code": None,
"errors": [],
"title": "",
"elements_count": 0,
"screenshot": "",
"timestamp": datetime.now().isoformat(),
}
# 如果需要登录但未登录,跳过
if requires_login:
if not ensure_logged_in():
result["status"] = "skip_login_failed"
return result
log(f"测试: {desc} ({url})", "🌐")
# 打开页面
success, _ = run_browser(["open", url])
if not success:
result["status"] = "nav_failed"
return result
wait_load(2)
# 截图
screenshot_name = path.strip("/").replace("/", "_").replace("-", "_")
screenshot_name = f"{datetime.now().strftime('%H%M%S')}_{screenshot_name}"
result["screenshot"] = take_screenshot(screenshot_name)
# 获取页面信息
result["url"] = get_url()
result["title"] = get_title()
elements = get_snapshot()
result["elements_count"] = len(elements)
# 检查 HTTP 错误(通过 URL 重定向判断)
if "login" in result["url"].lower() and path != "/admin/login/":
result["status"] = "redirected_to_login"
result["errors"].append("需要登录但未认证")
return result
if result["url"].startswith(BASE_URL + "/admin/login"):
result["status"] = "login_page"
return result
# 检查页面错误
errors = check_page_errors()
if errors:
result["status"] = "page_error"
result["errors"] = errors
log(f" 页面有错误: {errors[:2]}", "")
return result
# 检查 Django 500 错误
elements_text = " ".join(v.get("name", "") for v in elements.values())
if any(kw in elements_text for kw in ["500", "Internal Server Error", "TemplateSyntaxError", "Invalid filter"]):
result["status"] = "server_error"
result["errors"].append("Django 500 or template error detected")
log(f" 检测到服务器错误", "")
return result
# 基本检查
if result["elements_count"] < 3:
result["status"] = "page_empty"
result["errors"].append("页面元素过少,可能加载失败")
log(f" 页面元素过少: {result['elements_count']}", "⚠️")
return result
result["status"] = "pass"
log(f" ✅ OK | 元素数: {result['elements_count']} | 标题: {result['title'][:40]}", "")
return result
def test_admin_index() -> dict:
"""测试 Admin 首页"""
result = test_page("openclaw/", "Admin 首页概览", True)
# Admin 首页重定向到 auth/user/,特殊处理
elements = get_snapshot()
if result["status"] == "pass":
# 检查是否有模型列表
model_links = find_refs(elements, "link")
result["models_found"] = len(model_links)
if len(model_links) > 0:
log(f" 发现 {len(model_links)} 个模型链接", "📋")
return result
def test_session_list() -> dict:
"""测试 Session 列表页"""
result = test_page("openclaw/session/", "Session 列表", True)
if result["status"] == "pass":
elements = get_snapshot()
# 检查是否有表格
has_table = find_ref(elements, "table") is not None
has_search = find_ref(elements, "searchbox") is not None
result["has_table"] = has_table
result["has_search"] = has_search
log(f" 表格: {has_table} | 搜索: {has_search}")
return result
def test_message_list() -> dict:
"""测试 Message 列表页"""
return test_page("openclaw/message/", "Message 列表", True)
def test_toolcall_list() -> dict:
"""测试 ToolCall 列表页"""
return test_page("openclaw/toolcall/", "ToolCall 列表", True)
def test_daily_reports_list() -> dict:
"""测试日报列表页"""
result = test_page("daily-reports/", "日报列表", True)
if result["status"] == "pass":
elements = get_snapshot()
# 检查是否有数据(链接或表格)
links = find_refs(elements, None, "xingjiang|2026|report|daily")
tables = find_refs(elements, "table")
result["links_found"] = len(links)
result["tables_found"] = len(tables)
log(f" 链接数: {len(links)} | 表格数: {len(tables)}")
return result
def test_daily_report_detail(agent: str, year: int, month: int, day: int) -> dict:
"""测试指定日期的日报详情页"""
path = f"daily-reports/{agent}/{year}-{month}-{day}/"
desc = f"日报详情-{agent}-{year}-{month:02d}-{day:02d}"
result = test_page(path, desc, True)
if result["status"] == "pass":
elements = get_snapshot()
# 检查关键数据
has_heading = find_ref(elements, "heading") is not None
has_session = bool(find_ref(elements, None, "session"))
has_tokens = bool(find_ref(elements, None, "token"))
result["has_heading"] = has_heading
result["has_session"] = has_session
result["has_tokens"] = has_tokens
log(f" 标题: {has_heading} | Session: {has_session} | Tokens: {has_tokens}")
return result
def test_login_page() -> dict:
"""测试登录页 - 先强制刷新(清除已登录状态)再打开"""
# 强制重新打开登录页(清除可能已登录的会话状态)
login_url = build_url("/admin/login/")
result = {
"path": "/admin/login/",
"desc": "登录页",
"url": login_url,
"status": "skip",
"http_code": None,
"errors": [],
"title": "",
"elements_count": 0,
"screenshot": "",
"timestamp": datetime.now().isoformat(),
}
# 直接打开登录页(不清除全局会话,仅此处处理)
# 如果已经登录Django 会重定向到 /admin/,这种情况记录为"已登录"
success, _ = run_browser(["open", login_url])
if not success:
result["status"] = "nav_failed"
return result
wait_load(2)
# 截图
result["screenshot"] = take_screenshot(f"login_page_{datetime.now().strftime('%H%M%S')}")
result["url"] = get_url()
result["title"] = get_title()
elements = get_snapshot()
result["elements_count"] = len(elements)
# 如果已经登录URL 跳转到 admin index记录为 pass 但注明"已登录"
if "/admin/login" not in result["url"] and "/admin/" in result["url"]:
result["status"] = "pass"
result["errors"].append("用户已登录,登录页重定向到 admin index")
log(f" 当前已登录,登录页自动跳转至: {result['url']}", "")
return result
# 正常登录页检查
has_user = find_ref(elements, "textbox") is not None
has_pass = (
find_ref(elements, "textbox", "pass") is not None or
len(find_refs(elements, "textbox")) > 1
)
has_button = find_ref(elements, "button") is not None
result["has_username_field"] = has_user
result["has_password_field"] = has_pass
result["has_submit_button"] = has_button
if has_user and has_pass and has_button:
result["status"] = "pass"
log(f" ✅ 登录页正常 | 用户: {has_user} | 密码: {has_pass} | 按钮: {has_button}")
else:
result["status"] = "page_error"
result["errors"].append(f"缺少表单元素: user={has_user}, pass={has_pass}, btn={has_button}")
log(f" ❌ 登录页表单不完整", "")
return result
if result["status"] in ("pass", "login_page"):
result["status"] = "pass"
elements = get_snapshot()
has_user = find_ref(elements, "textbox") is not None
has_pass = find_ref(elements, "password") is not None
has_button = find_ref(elements, "button") is not None
result["has_username_field"] = has_user
result["has_password_field"] = has_pass
result["has_submit_button"] = has_button
log(f" 用户名框: {has_user} | 密码框: {has_pass} | 按钮: {has_button}")
return result
# ========== 主测试套件 ==========
def run_full_test_suite() -> list:
"""运行完整测试套件"""
results = []
log("=" * 60, "")
log("开始全页面遍历测试", "🚀")
log("=" * 60, "")
# 1. 登录页测试
results.append(test_login_page())
# 2. Admin 首页
results.append(test_admin_index())
# 3. Session 管理
results.append(test_session_list())
# 4. Message 管理
results.append(test_message_list())
# 5. ToolCall 管理
results.append(test_toolcall_list())
# 6. 日报列表
results.append(test_daily_reports_list())
# 7. 日报详情(多个日期)
results.append(test_daily_report_detail("xingjiang", 2026, 4, 7))
results.append(test_daily_report_detail("xingjiang", 2026, 4, 8))
# 8. API 页面(检查是否返回 JSON 或有响应)
results.append(test_page("/api/sessions/bulk_upsert/", "API Bulk Upsert", True))
return results
# ========== 报告输出 ==========
def print_report(results: list):
"""打印测试报告"""
passed = sum(1 for r in results if r["status"] == "pass")
failed = len(results) - passed
print("\n" + "=" * 70)
print("🧪 全页面遍历测试报告 - agent-base")
print("=" * 70)
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"总计: {len(results)} | ✅ 通过: {passed} | ❌ 失败: {failed} | 成功率: {passed/len(results)*100:.0f}%")
print("-" * 70)
for r in results:
status_icon = "" if r["status"] == "pass" else ""
print(f"\n{status_icon} [{r['desc']}]")
print(f" URL: {r['url']}")
if r["status"] != "pass":
print(f" 状态: {r['status']}")
if r["errors"]:
print(f" 错误: {r['errors']}")
else:
print(f" 标题: {r['title'][:50]}")
print(f" 元素数: {r['elements_count']}")
if r["screenshot"]:
print(f" 截图: {r['screenshot']}")
print("\n" + "=" * 70)
# 详细失败列表
if failed > 0:
print("\n❌ 失败页面:")
for r in results:
if r["status"] != "pass":
print(f" - {r['desc']}: {r['status']} | {r['errors']}")
# 截图路径
if os.path.exists(SCREENSHOT_DIR):
print(f"\n📸 截图目录: {SCREENSHOT_DIR}")
for f in sorted(os.listdir(SCREENSHOT_DIR)):
if f.endswith(".png"):
full = os.path.join(SCREENSHOT_DIR, f)
size = os.path.getsize(full) / 1024
print(f" - {f} ({size:.0f} KB)")
print("=" * 70)
return failed == 0
def save_report(results: list, path: str):
"""保存 JSON 报告"""
with open(path, "w", encoding="utf-8") as f:
json.dump({
"generated_at": datetime.now().isoformat(),
"base_url": BASE_URL,
"summary": {
"total": len(results),
"passed": sum(1 for r in results if r["status"] == "pass"),
"failed": sum(1 for r in results if r["status"] != "pass"),
},
"results": results,
}, f, ensure_ascii=False, indent=2)
log(f"报告已保存: {path}", "💾")
# ========== CLI 入口 ==========
def main():
parser = argparse.ArgumentParser(description="agent-base 全页面遍历测试")
parser.add_argument("--login", action="store_true", help="强制重新登录(不使用缓存)")
parser.add_argument("--screenshot", action="store_true", help="仅截图模式")
parser.add_argument("--page", type=str, help="测试指定页面路径")
parser.add_argument("--report", type=str, default="/tmp/agent_base_test_report.json", help="报告保存路径")
parser.add_argument("--session", type=str, default=SESSION_NAME, help="浏览器会话名")
args = parser.parse_args()
# No global mutation needed — keep module-level SESSION_NAME
try:
# 清理旧会话
cleanup_all()
time.sleep(1)
if args.page:
# 单页测试
path = args.page
desc = f"单页测试: {path}"
result = test_page(path, desc, requires_login=True)
print_report([result])
return
if args.screenshot:
# 截图模式
ensure_logged_in(force=args.login)
take_screenshot("manual_screenshot")
log(f"截图已保存: {SCREENSHOT_DIR}")
return
# 完整测试套件
results = run_full_test_suite()
save_report(results, args.report)
all_passed = print_report(results)
# 关闭会话
close_session()
sys.exit(0 if all_passed else 1)
except KeyboardInterrupt:
log("测试被用户中断", "⚠️")
close_session()
sys.exit(130)
except Exception as e:
log(f"测试异常: {e}", "")
import traceback
traceback.print_exc()
close_session()
sys.exit(1)
if __name__ == "__main__":
main()

835
scripts/test_e2e_agent_browser.py Executable file
View File

@@ -0,0 +1,835 @@
#!/usr/bin/env python3
"""
End-to-End Test Script for agent-base Django Project
Uses agent-browser CLI for browser automation
Usage: python3 test_e2e_agent_browser.py
Environment Variables:
AGENT_BROWSER_EXECUTABLE_PATH: Path to Chrome/Chromium binary
(Defaults to /Applications/Google Chrome.app/Contents/MacOS/Google Chrome on macOS)
"""
import subprocess
import json
import sys
import time
import os
import re
import uuid
import platform
from datetime import datetime
# Detect agent-browser path based on OS
def get_agent_browser_path():
"""Get agent-browser executable path based on the system."""
env_path = os.environ.get("AGENT_BROWSER_PATH")
if env_path and os.path.exists(env_path):
return env_path
system = platform.system()
if system == "Darwin":
path = "/opt/homebrew/bin/agent-browser"
if os.path.exists(path):
return path
elif system == "Linux":
for path in [
"/home/shenwei/.npm-global/bin/agent-browser",
"/usr/local/bin/agent-browser",
"/usr/bin/agent-browser",
]:
if os.path.exists(path):
return path
for cmd in ["agent-browser"]:
try:
result = subprocess.run(["which", cmd], capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except:
pass
return "agent-browser"
def get_chrome_path():
"""Get Chrome executable path based on the system."""
if os.environ.get("AGENT_BROWSER_EXECUTABLE_PATH"):
return os.environ["AGENT_BROWSER_EXECUTABLE_PATH"]
system = platform.system()
if system == "Darwin":
mac_chrome = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
if os.path.exists(mac_chrome):
return mac_chrome
elif system == "Linux":
for path in [
"/usr/bin/google-chrome",
"/usr/bin/chromium-browser",
"/usr/bin/chromium",
"/snap/bin/chromium",
os.path.expanduser("~/.local/bin/chromium"),
]:
if os.path.exists(path):
return path
return ""
# Initialize paths
AGENT_BROWSER = get_agent_browser_path()
CHROME_PATH = get_chrome_path()
if CHROME_PATH:
os.environ["AGENT_BROWSER_EXECUTABLE_PATH"] = CHROME_PATH
print(f"Using agent-browser: {AGENT_BROWSER}", file=sys.stderr)
if CHROME_PATH:
print(f"Using Chrome: {CHROME_PATH}", file=sys.stderr)
else:
print("Chrome: auto-detect (ensure agent-browser install completed)", file=sys.stderr)
# Configuration
BASE_URL = "http://192.168.3.45:8765"
ADMIN_URL = f"{BASE_URL}/admin/"
USERNAME = "admin"
PASSWORD = "admin123"
SESSION_NAME = "agent-base-e2e-test"
STATE_FILE = "/tmp/agent-browser-auth-state.json"
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) if os.path.abspath(__file__).startswith('/tmp') else "/home/shenwei/docker/agent-base/scripts"
PROJECT_DIR = "/home/shenwei/docker/agent-base"
# Test results tracking
test_results = {
"started_at": datetime.now().isoformat(),
"tests": [],
"passed": 0,
"failed": 0,
"errors": []
}
def log(msg):
"""Print log message with timestamp"""
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] {msg}", flush=True)
def log_step(step_num, msg):
"""Print step with number"""
log(f"[Step {step_num}] {msg}")
def run_agent_browser(args, timeout=30, check=True):
"""
Run agent-browser command and return JSON result.
Args:
args: List of command arguments (e.g., ['open', 'https://example.com'])
timeout: Command timeout in seconds
check: Whether to raise exception on non-zero exit
Returns:
tuple: (success: bool, result: dict or error message)
"""
cmd = [AGENT_BROWSER, "--session", SESSION_NAME, "--json"]
cmd.extend(args)
log(f" Running: agent-browser {' '.join(args)}")
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
if result.stdout:
try:
return True, json.loads(result.stdout)
except json.JSONDecodeError:
return True, {"raw": result.stdout}
if result.returncode != 0 and check:
log(f" ERROR: exit code {result.returncode}")
log(f" stderr: {result.stderr[:500]}")
return False, result.stderr
return result.returncode == 0, {}
except subprocess.TimeoutExpired:
log(f" TIMEOUT after {timeout}s")
return False, "Command timeout"
except Exception as e:
log(f" EXCEPTION: {e}")
return False, str(e)
def wait_for_page_load(timeout=10):
"""Wait for page to finish loading"""
time.sleep(2) # Give page time to settle
def get_interactive_elements():
"""Get snapshot of interactive elements from current page"""
success, result = run_agent_browser(["snapshot", "-i", "--json"], timeout=15, check=False)
if success and "data" in result:
return result["data"].get("refs", {})
return {}
def find_element_by_role(elements, role_pattern, name_pattern=None):
"""Find element by role pattern, optionally with name pattern"""
for ref, info in elements.items():
role = info.get("role", "")
name = info.get("name", "")
if re.match(role_pattern, role, re.IGNORECASE):
if name_pattern is None or re.search(name_pattern, name, re.IGNORECASE):
return ref
return None
def find_element_by_text(elements, text_pattern, role_pattern=None):
"""Find element containing text"""
for ref, info in elements.items():
role = info.get("role", "")
name = info.get("name", "")
if re.search(text_pattern, name, re.IGNORECASE):
if role_pattern is None or re.match(role_pattern, role, re.IGNORECASE):
return ref
return None
def record_test(name, passed, error=None, details=None):
"""Record test result"""
test_entry = {
"name": name,
"passed": passed,
"timestamp": datetime.now().isoformat()
}
if error:
test_entry["error"] = error
if details:
test_entry["details"] = details
test_results["tests"].append(test_entry)
if passed:
test_results["passed"] += 1
log(f" ✓ PASSED: {name}")
else:
test_results["failed"] += 1
test_results["errors"].append(f"{name}: {error}")
log(f" ✗ FAILED: {name} - {error}")
def test_admin_login():
"""Test 1: Admin Login"""
log_step(1, "Testing Admin Login")
# Navigate to admin login
success, _ = run_agent_browser(["open", f"{ADMIN_URL}login/?next=/admin/"])
if not success:
record_test("Admin Login - Navigate", False, "Failed to open login page")
return False
wait_for_page_load()
# Get interactive elements
elements = get_interactive_elements()
if not elements:
record_test("Admin Login - Get Elements", False, "No interactive elements found")
return False
# Find username field
username_ref = find_element_by_role(elements, "textbox", ".*user.*") or \
find_element_by_text(elements, "user", "textbox") or \
find_element_by_role(elements, "textbox")
if not username_ref:
# Try to find by id
for ref, info in elements.items():
if "username" in info.get("name", "").lower() or "id_username" in str(info):
username_ref = ref
break
if not username_ref:
# List all textboxes for debugging
textboxes = {k: v for k, v in elements.items() if "textbox" in v.get("role", "").lower()}
log(f" Available textboxes: {textboxes}")
record_test("Admin Login - Find Username Field", False, "Could not find username field")
return False
# Fill username
success, _ = run_agent_browser(["fill", username_ref, USERNAME])
if not success:
record_test("Admin Login - Fill Username", False, "Failed to fill username")
return False
# Find password field (role=textbox with 密码 in name on Django admin)
password_ref = find_element_by_role(elements, "password")
if not password_ref:
# Try by name - Django admin uses textbox role with "密码:" label
for ref, info in elements.items():
if "密码" in info.get("name", "") or "password" in info.get("name", "").lower():
password_ref = ref
break
if not password_ref:
record_test("Admin Login - Find Password Field", False, "Could not find password field")
return False
# Fill password
success, _ = run_agent_browser(["fill", password_ref, PASSWORD])
if not success:
record_test("Admin Login - Fill Password", False, "Failed to fill password")
return False
# Find and click submit button
submit_ref = find_element_by_text(elements, "log|signin|submit|登|登录", "button") or \
find_element_by_role(elements, "button", ".*")
if not submit_ref:
record_test("Admin Login - Find Submit Button", False, "Could not find submit button")
return False
success, _ = run_agent_browser(["click", submit_ref])
if not success:
record_test("Admin Login - Click Submit", False, "Failed to click submit")
return False
wait_for_page_load()
# Check if login was successful (should be at /admin/ now)
success, result = run_agent_browser(["get", "url", "--json"], check=False)
current_url = ""
if success and result.get("data"):
current_url = result["data"].get("url", "")
log(f" Current URL after login: {current_url}")
if "/admin/" in current_url and "login" not in current_url.lower():
record_test("Admin Login", True, details=f"Logged in successfully, at {current_url}")
# Save auth state for subsequent tests
success, _ = run_agent_browser(["state", "save", STATE_FILE], check=False)
if success:
log(" Auth state saved for subsequent tests")
return True
else:
# Check for error message
elements = get_interactive_elements()
error_text = find_element_by_text(elements, "error|invalid|错误|无效")
record_test("Admin Login", False, f"Login failed, URL: {current_url}, error_element: {error_text}")
return False
def test_session_management():
"""Test 2: Session Management"""
log_step(2, "Testing Session Management")
# Load saved auth state if available
if os.path.exists(STATE_FILE):
run_agent_browser(["state", "load", STATE_FILE], check=False)
# Navigate to session admin
success, _ = run_agent_browser(["open", f"{ADMIN_URL}openclaw/session/"])
if not success:
record_test("Session Management - Navigate", False, "Failed to navigate to session admin")
return False
wait_for_page_load()
# Get page title
success, result = run_agent_browser(["get", "title", "--json"], check=False)
title = ""
if success and result.get("data"):
title = result["data"].get("title", "")
log(f" Page title: {title}")
# Get elements
elements = get_interactive_elements()
# Check for session list - Django admin uses columnheader/cell roles not <table>
has_table = find_element_by_role(elements, "table") is not None
has_columnheaders = find_element_by_role(elements, "columnheader") is not None
has_cells = find_element_by_role(elements, "cell") is not None
has_rowheaders = find_element_by_role(elements, "rowheader") is not None
if has_table or has_columnheaders or has_cells:
record_test("Session Management - List View", True,
details=f"Table:{has_table}, Headers:{has_columnheaders}, Cells:{has_cells}, RowHeaders:{has_rowheaders}")
else:
record_test("Session Management - List View", False, "No table/list found on session list page")
return False
# Test search functionality
search_ref = find_element_by_role(elements, "searchbox") or \
find_element_by_text(elements, "search", "textbox") or \
find_element_by_role(elements, "textbox")
if search_ref:
success, _ = run_agent_browser(["fill", search_ref, "test"], check=False)
if success:
# Try pressing Enter to submit search
run_agent_browser(["press", "Enter"], check=False)
wait_for_page_load()
record_test("Session Management - Search", True)
else:
record_test("Session Management - Search", False, "Search field found but fill failed")
else:
record_test("Session Management - Search", False, "No search field found")
# Test filter functionality
elements = get_interactive_elements()
filter_ref = find_element_by_role(elements, "combobox") or \
find_element_by_text(elements, "filter|筛选", "button") or \
find_element_by_text(elements, "all|全部", "button")
if filter_ref:
record_test("Session Management - Filter", True, details="Filter controls found")
else:
record_test("Session Management - Filter", False, "No filter controls found")
return True
def test_message_management():
"""Test 3: Message Management"""
log_step(3, "Testing Message Management")
# Load saved auth state
if os.path.exists(STATE_FILE):
run_agent_browser(["state", "load", STATE_FILE], check=False)
# Navigate to message admin
success, _ = run_agent_browser(["open", f"{ADMIN_URL}openclaw/message/"])
if not success:
record_test("Message Management - Navigate", False, "Failed to navigate to message admin")
return False
wait_for_page_load()
# Get page title
success, result = run_agent_browser(["get", "title", "--json"], check=False)
title = ""
if success and result.get("data"):
title = result["data"].get("title", "")
log(f" Page title: {title}")
# Get elements
elements = get_interactive_elements()
# Check for message list - Django admin uses columnheader/cell roles
has_table = find_element_by_role(elements, "table") is not None
has_columnheaders = find_element_by_role(elements, "columnheader") is not None
has_cells = find_element_by_role(elements, "cell") is not None
has_rowheaders = find_element_by_role(elements, "rowheader") is not None
if has_table or has_columnheaders or has_cells:
record_test("Message Management - List View", True,
details=f"Table:{has_table}, Headers:{has_columnheaders}, Cells:{has_cells}, RowHeaders:{has_rowheaders}")
else:
record_test("Message Management - List View", False, "No list found on message list page")
return False
# Test search functionality
search_ref = find_element_by_role(elements, "searchbox") or \
find_element_by_text(elements, "search", "textbox") or \
find_element_by_role(elements, "textbox")
if search_ref:
success, _ = run_agent_browser(["fill", search_ref, "hello"], check=False)
if success:
run_agent_browser(["press", "Enter"], check=False)
wait_for_page_load()
record_test("Message Management - Search", True)
else:
record_test("Message Management - Search", False, "Search fill failed")
else:
record_test("Message Management - Search", False, "No search field found")
# Test filter functionality
elements = get_interactive_elements()
filter_ref = find_element_by_role(elements, "combobox") or \
find_element_by_text(elements, "filter|筛选", "button")
if filter_ref:
record_test("Message Management - Filter", True)
else:
record_test("Message Management - Filter", False, "No filter controls found")
return True
def test_toolcall_management():
"""Test 4: ToolCall Management"""
log_step(4, "Testing ToolCall Management")
# Load saved auth state
if os.path.exists(STATE_FILE):
run_agent_browser(["state", "load", STATE_FILE], check=False)
# Navigate to toolcall admin
success, _ = run_agent_browser(["open", f"{ADMIN_URL}openclaw/toolcall/"])
if not success:
record_test("ToolCall Management - Navigate", False, "Failed to navigate to toolcall admin")
return False
wait_for_page_load()
# Get page title
success, result = run_agent_browser(["get", "title", "--json"], check=False)
title = ""
if success and result.get("data"):
title = result["data"].get("title", "")
log(f" Page title: {title}")
# Get elements
elements = get_interactive_elements()
# Check for toolcall list - Django admin uses columnheader/cell roles
has_table = find_element_by_role(elements, "table") is not None
has_columnheaders = find_element_by_role(elements, "columnheader") is not None
has_cells = find_element_by_role(elements, "cell") is not None
has_rowheaders = find_element_by_role(elements, "rowheader") is not None
if has_table or has_columnheaders or has_cells:
record_test("ToolCall Management - List View", True,
details=f"Table:{has_table}, Headers:{has_columnheaders}, Cells:{has_cells}, RowHeaders:{has_rowheaders}")
else:
record_test("ToolCall Management - List View", False, "No list found on toolcall list page")
return False
# Test filter functionality
filter_ref = find_element_by_role(elements, "combobox") or \
find_element_by_text(elements, "filter|筛选", "button")
if filter_ref:
record_test("ToolCall Management - Filter", True)
else:
record_test("ToolCall Management - Filter", False, "No filter controls found")
return True
def test_daily_reports_list():
"""Test 5: Daily Reports List"""
log_step(5, "Testing Daily Reports List")
# Load saved auth state
if os.path.exists(STATE_FILE):
run_agent_browser(["state", "load", STATE_FILE], check=False)
# Navigate to daily reports
success, _ = run_agent_browser(["open", f"{ADMIN_URL}daily-reports/"])
if not success:
record_test("Daily Reports - Navigate", False, "Failed to navigate to daily reports")
return False
wait_for_page_load()
# Get page title
success, result = run_agent_browser(["get", "title", "--json"], check=False)
title = ""
if success and result.get("data"):
title = result["data"].get("title", "")
log(f" Page title: {title}")
# Get elements
elements = get_interactive_elements()
# Check for reports list
has_table = find_element_by_role(elements, "table") is not None
has_columnheaders = find_element_by_role(elements, "columnheader") is not None
has_cells = find_element_by_role(elements, "cell") is not None
has_rowheaders = find_element_by_role(elements, "rowheader") is not None
has_links = find_element_by_text(elements, "xingjiang|report|daily", "link") is not None
if has_table or has_columnheaders or has_cells or has_rowheaders or has_links:
record_test("Daily Reports - List View", True,
details=f"Table:{has_table}, Headers:{has_columnheaders}, Cells:{has_cells}, Links:{has_links}")
else:
record_test("Daily Reports - List View", False, "No reports found on page")
return False
# Try to find report links (for xingjiang or date-based links)
# The daily reports page may show dates or agent names as links
report_link = find_element_by_text(elements, "xingjiang|2026|report", "link")
if report_link:
record_test("Daily Reports - Report Links Present", True, details=f"Found report link: {report_link}")
else:
# Also check for any date/agent links
any_link = find_element_by_role(elements, "link")
if any_link:
# Found some links, the page is working
record_test("Daily Reports - Report Links Present", True, details=f"Page has links (link ref: {any_link})")
else:
record_test("Daily Reports - Report Links Present", False, "No links found on page")
return True
def test_daily_reports_detail():
"""Test 6: Daily Reports Detail"""
log_step(6, "Testing Daily Reports Detail")
# Load saved auth state
if os.path.exists(STATE_FILE):
run_agent_browser(["state", "load", STATE_FILE], check=False)
# Navigate directly to a specific daily report
detail_url = f"{ADMIN_URL}daily-reports/xingjiang/2026-4-8/"
success, _ = run_agent_browser(["open", detail_url])
if not success:
record_test("Daily Reports Detail - Navigate", False, "Failed to navigate to daily report detail")
return False
wait_for_page_load()
# Get page title
success, result = run_agent_browser(["get", "title", "--json"], check=False)
title = ""
if success and result.get("data"):
title = result["data"].get("title", "")
log(f" Page title: {title}")
# Check URL
success, result = run_agent_browser(["get", "url", "--json"], check=False)
current_url = ""
if success and result.get("data"):
current_url = result["data"].get("url", "")
log(f" Current URL: {current_url}")
if "daily-reports" in current_url and "xingjiang" in current_url:
record_test("Daily Reports Detail - URL Valid", True, details=f"URL: {current_url}")
else:
record_test("Daily Reports Detail - URL Valid", False, f"Unexpected URL: {current_url}")
return False
# Get elements
elements = get_interactive_elements()
# Check for content
has_heading = find_element_by_role(elements, "heading") is not None
has_content = len(elements) > 5 # More than just navigation
if has_heading or has_content:
record_test("Daily Reports Detail - Content Present", True)
else:
record_test("Daily Reports Detail - Content Present", False, "No content found on detail page")
return False
return True
def test_bulk_upsert_api():
"""Test 7: Bulk Upsert API"""
log_step(7, "Testing Bulk Upsert API")
# Load saved auth state for CSRF token
if os.path.exists(STATE_FILE):
run_agent_browser(["state", "load", STATE_FILE], check=False)
# First, get a CSRF token by visiting the API or any admin page
success, _ = run_agent_browser(["open", BASE_URL], check=False)
wait_for_page_load()
# Get cookies for CSRF
success, result = run_agent_browser(["cookies"], check=False)
csrf_token = ""
cookies = ""
if success and result.get("data"):
cookies = result["data"]
for cookie in cookies if isinstance(cookies, list) else []:
if cookie.get("name") == "csrftoken":
csrf_token = cookie.get("value", "")
break
log(f" CSRF token obtained: {'yes' if csrf_token else 'no (will try without)'}")
# Test the API with curl (more reliable for API testing)
# Use unique session_id to avoid unique constraint conflicts
unique_session_id = f"test-e2e-{uuid.uuid4().hex[:12]}"
test_payload = {
"agent_name": "xingjiang",
"source_node": "telegram",
"sessions": [
{
"session_id": unique_session_id,
"start_time": "2026-04-08T12:00:00Z",
"end_time": "2026-04-08T13:00:00Z",
"message_count": 10,
"tool_call_count": 5,
"total_tokens": 1000,
"total_cost": 0.05
}
]
}
# Build curl command
curl_cmd = [
"curl", "-s", "-X", "POST",
f"{BASE_URL}/api/sessions/bulk_upsert/",
"-H", "Content-Type: application/json",
"-d", json.dumps(test_payload)
]
if csrf_token:
curl_cmd.extend(["-H", f"X-CSRFToken: {csrf_token}"])
log(f" Testing API: POST /api/sessions/bulk_upsert/")
try:
result = subprocess.run(
curl_cmd,
capture_output=True,
text=True,
timeout=15
)
response = result.stdout
log(f" API Response: {response[:500]}")
# Try to parse response
try:
resp_data = json.loads(response)
if resp_data.get("success") or "created" in str(resp_data).lower() or "upserted" in str(resp_data).lower():
record_test("Bulk Upsert API - Create Session", True, details=f"Response: {response[:200]}")
elif resp_data.get("error"):
# Some errors are expected (like missing fields)
if "missing" in resp_data.get("error", "").lower():
record_test("Bulk Upsert API - Validation", True, details=f"Expected validation: {resp_data.get('error')}")
else:
record_test("Bulk Upsert API - Response", True, details=f"Response: {response[:200]}")
else:
record_test("Bulk Upsert API - Response", True, details=f"Response: {response[:200]}")
except json.JSONDecodeError:
if result.returncode == 0:
record_test("Bulk Upsert API - Endpoint Accessible", True, details=f"Status: {result.returncode}")
else:
record_test("Bulk Upsert API - Response", False, f"Non-JSON response: {response[:200]}")
except subprocess.TimeoutExpired:
record_test("Bulk Upsert API - Timeout", False, "API request timed out")
except Exception as e:
record_test("Bulk Upsert API - Error", False, str(e))
# Also test the API via browser to ensure it works with session auth
success, _ = run_agent_browser(["open", f"{BASE_URL}/api/sessions/bulk_upsert/"], check=False)
wait_for_page_load()
success, result = run_agent_browser(["get", "url", "--json"], check=False)
api_url = ""
if success and result.get("data"):
api_url = result["data"].get("url", "")
if "bulk_upsert" in api_url:
record_test("Bulk Upsert API - Browser Access", True, details=f"URL: {api_url}")
else:
# It's OK if browser can't access API directly (it might return 403/JSON)
elements = get_interactive_elements()
# Just check we got some response
if elements:
record_test("Bulk Upsert API - Browser Access", True, details="API endpoint reachable")
else:
record_test("Bulk Upsert API - Browser Access", False, "Could not verify API via browser")
return True
def cleanup():
"""Cleanup browser sessions"""
log("Cleaning up browser sessions...")
run_agent_browser(["close"], check=False)
# Also close all sessions
subprocess.run(
[AGENT_BROWSER, "close", "--all"],
capture_output=True,
timeout=10
)
def print_report():
"""Print final test report"""
print("\n" + "="*70)
print("E2E TEST REPORT - agent-base Django Project")
print("="*70)
print(f"Test Started: {test_results['started_at']}")
print(f"Test Finished: {datetime.now().isoformat()}")
print("-"*70)
print(f"\nSUMMARY:")
print(f" Total Tests: {len(test_results['tests'])}")
print(f" Passed: {test_results['passed']}")
print(f" Failed: {test_results['failed']}")
print(f" Success Rate: {test_results['passed']/len(test_results['tests'])*100:.1f}%")
if test_results['errors']:
print(f"\nERRORS:")
for i, err in enumerate(test_results['errors'], 1):
print(f" {i}. {err}")
print("\nDETAILED RESULTS:")
for test in test_results['tests']:
status = "✓ PASS" if test['passed'] else "✗ FAIL"
print(f" [{status}] {test['name']}")
if not test['passed'] and test.get('error'):
print(f" Error: {test['error']}")
if test.get('details'):
print(f" Details: {test['details']}")
print("\n" + "="*70)
# Exit with appropriate code
if test_results['failed'] > 0:
sys.exit(1)
else:
sys.exit(0)
def main():
"""Main test runner"""
print("\n" + "="*70)
print("Starting E2E Tests for agent-base Django Project")
print(f"Target: {BASE_URL}")
print(f"Session: {SESSION_NAME}")
print("="*70 + "\n")
try:
# Clean up any existing sessions first
cleanup()
time.sleep(1)
# Run all tests
test_admin_login()
test_session_management()
test_message_management()
test_toolcall_management()
test_daily_reports_list()
test_daily_reports_detail()
test_bulk_upsert_api()
except KeyboardInterrupt:
log("Tests interrupted by user")
except Exception as e:
log(f"Unexpected error: {e}")
import traceback
traceback.print_exc()
finally:
cleanup()
# Print final report
print_report()
if __name__ == "__main__":
main()

View File

@@ -11,8 +11,10 @@ INSTALLED_APPS = [
"django.contrib.sessions", "django.contrib.sessions",
"django.contrib.messages", "django.contrib.messages",
"django.contrib.staticfiles", "django.contrib.staticfiles",
"django.contrib.humanize",
"rest_framework", "rest_framework",
"openclaw", "openclaw",
"openclaw_daily",
] ]
MIDDLEWARE = [ MIDDLEWARE = [

137
src/openclaw/admin.py Normal file → Executable file
View File

@@ -1,12 +1,8 @@
from datetime import date
from django.contrib import admin from django.contrib import admin
from django.db.models import Prefetch
from django.http import HttpResponse from django.http import HttpResponse
from django.template.response import TemplateResponse
from openclaw.export import export_daily_markdown from openclaw.export import export_daily_markdown
from openclaw.models import Session, Message, ToolCall from openclaw.models import Session, Message, ToolCall, CronJob, CronJobRun
class MessageInline(admin.TabularInline): class MessageInline(admin.TabularInline):
@@ -35,73 +31,6 @@ class ToolCallInline(admin.TabularInline):
return False return False
def daily_conversation_view(request):
"""Admin standalone view for date-range conversation browsing."""
start_str = request.GET.get("start")
end_str = request.GET.get("end")
agent_filter = request.GET.get("agent", "")
start_date = start_str if start_str else date.today().isoformat()
end_date = end_str if end_str else date.today().isoformat()
agents = list(
Session.objects.values_list("agent_name", flat=True)
.distinct()
.order_by("agent_name")
)
sessions_qs = Session.objects.filter(
start_time__date__gte=start_date,
start_time__date__lte=end_date,
).order_by("start_time")
if agent_filter:
sessions_qs = sessions_qs.filter(agent_name=agent_filter)
messages_prefetch = Prefetch(
"messages",
queryset=Message.objects.order_by("seq"),
)
sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
role_labels = {
"user": "User",
"assistant": "Assistant",
"toolResult": "Tool Result",
}
session_list = []
for session in sessions_qs:
messages = []
for msg in session.messages.all():
messages.append({
"timestamp": msg.timestamp,
"role": msg.role,
"content_text": msg.content_text,
"tool_name": msg.tool_name,
"get_role_label": role_labels.get(msg.role, msg.role),
})
session_list.append({
"session_id": session.session_id,
"agent_name": session.agent_name,
"model_id": session.model_id,
"total_tokens": session.total_tokens,
"start_time": session.start_time,
"messages": messages,
})
context = {
**admin.site.each_context(request),
"start_date": start_date,
"end_date": end_date,
"selected_agent": agent_filter,
"agents": agents,
"sessions": session_list,
"title": "Daily Conversation View",
}
return TemplateResponse(request, "admin/openclaw/daily_view.html", context)
@admin.action(description="Export selected sessions to Markdown") @admin.action(description="Export selected sessions to Markdown")
def export_to_markdown(modeladmin, request, queryset): def export_to_markdown(modeladmin, request, queryset):
md, filename = export_daily_markdown(queryset) md, filename = export_daily_markdown(queryset)
@@ -114,34 +43,18 @@ def export_to_markdown(modeladmin, request, queryset):
class SessionAdmin(admin.ModelAdmin): class SessionAdmin(admin.ModelAdmin):
actions = [export_to_markdown] actions = [export_to_markdown]
list_display = ( list_display = (
"session_id", "session_id", "agent_name", "model_id", "total_tokens",
"agent_name", "message_count", "tool_call_count", "status", "start_time",
"model_id",
"total_tokens",
"message_count",
"start_time",
) )
list_filter = ("agent_name", "source_node", "model_id", "start_time") list_filter = ("agent_name", "source_node", "model_id", "start_time")
search_fields = ("session_id", "cwd") search_fields = ("session_id", "cwd")
ordering = ("-start_time",) ordering = ("-start_time",)
inlines = [MessageInline, ToolCallInline] inlines = [MessageInline, ToolCallInline]
readonly_fields = ( readonly_fields = (
"session_id", "session_id", "agent_name", "source_node",
"agent_name", "start_time", "end_time", "pushed_at",
"source_node",
"start_time",
"end_time",
"pushed_at",
) )
def get_urls(self):
from django.urls import path
urls = super().get_urls()
custom_urls = [
path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"),
]
return custom_urls + urls
@admin.register(Message) @admin.register(Message)
class MessageAdmin(admin.ModelAdmin): class MessageAdmin(admin.ModelAdmin):
@@ -160,3 +73,43 @@ class ToolCallAdmin(admin.ModelAdmin):
list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms") list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms")
list_filter = ("tool_name", "is_error", "exit_code") list_filter = ("tool_name", "is_error", "exit_code")
ordering = ("-created_at",) ordering = ("-created_at",)
class CronJobRunInline(admin.TabularInline):
model = CronJobRun
extra = 0
fields = ("run_at", "status", "duration_ms", "delivery_status", "model", "tokens_total")
readonly_fields = fields
ordering = ("-run_at",)
max_num = 10
def has_add_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
@admin.register(CronJob)
class CronJobAdmin(admin.ModelAdmin):
list_display = (
"job_id", "name", "agent_id", "enabled",
"schedule_expr", "state_last_run_status", "state_last_run_at",
)
list_filter = ("enabled", "agent_id", "state_last_status", "state_last_run_status")
search_fields = ("job_id", "name")
ordering = ("-updated_at",)
inlines = [CronJobRunInline]
readonly_fields = ("job_id", "created_at", "updated_at")
@admin.register(CronJobRun)
class CronJobRunAdmin(admin.ModelAdmin):
list_display = (
"job", "run_at", "status", "duration_ms",
"delivery_status", "model", "tokens_total",
)
list_filter = ("status", "delivery_status", "run_at", "model")
search_fields = ("session_id", "session_key", "summary")
ordering = ("-run_at",)
readonly_fields = ("job", "run_at", "session_id", "created_at")

View File

@@ -0,0 +1 @@
# Kept for potential future use — currently not used

View File

@@ -0,0 +1,111 @@
"""
Daily report views for admin.
List: (agent, date) combos with message/session counts.
Detail: all messages for a specific agent on a specific date.
"""
from collections import defaultdict
from datetime import date
from django.contrib import admin
from django.http import Http404
from django.template.response import TemplateResponse
from openclaw.models import Session, Message, ToolCall
def daily_report_list_view(request):
start_str = request.GET.get("start")
end_str = request.GET.get("end")
agent_filter = request.GET.get("agent", "")
today = date.today()
start_date = start_str if start_str else (today.replace(day=1)).isoformat()
end_date = end_str if end_str else today.isoformat()
agents = list(
Session.objects.values_list("agent_name", flat=True)
.distinct()
.order_by("agent_name")
)
messages_qs = Message.objects.filter(
timestamp__date__gte=start_date,
timestamp__date__lte=end_date,
).select_related("session").order_by("timestamp")
if agent_filter:
messages_qs = messages_qs.filter(session__agent_name=agent_filter)
groups = defaultdict(lambda: {"message_count": 0, "session_ids": set()})
for msg in messages_qs:
d = msg.timestamp.date()
key = (msg.session.agent_name, d)
groups[key]["message_count"] += 1
groups[key]["session_ids"].add(msg.session.session_id)
groups_list = sorted(
[
{
"agent_name": key[0],
"date_val": key[1],
"message_count": data["message_count"],
"session_count": len(data["session_ids"]),
}
for key, data in groups.items()
],
key=lambda x: (-x["date_val"].toordinal(), x["agent_name"]),
)
context = {
**admin.site.each_context(request),
"start_date": start_date,
"end_date": end_date,
"selected_agent": agent_filter,
"agents": agents,
"date_groups": groups_list,
"title": "Daily Report",
}
return TemplateResponse(request, "admin/openclaw/daily_report_list.html", context)
def daily_report_detail_view(request, agent_name, year, month, day):
target_date = date(int(year), int(month), int(day))
sessions = (
Session.objects.filter(agent_name=agent_name)
.prefetch_related("messages", "messages__tool_calls")
.order_by("start_time")
)
sessions_with_messages = []
for session in sessions:
day_messages = session.messages.filter(
timestamp__date=target_date
).order_by("seq")
if day_messages.exists():
sessions_with_messages.append({
"session": session,
"messages": list(day_messages),
})
if not sessions_with_messages:
raise Http404(f"No messages found for agent '{agent_name}' on {target_date}")
role_label_map = {
"user": "User",
"assistant": "Assistant",
"tool": "Tool",
"toolResult": "Tool Result",
"system": "System",
}
context = {
**admin.site.each_context(request),
"agent_name": agent_name,
"target_date": target_date,
"sessions": sessions_with_messages,
"role_label_map": role_label_map,
"title": f"Daily Report: {agent_name} - {target_date}",
}
return TemplateResponse(request, "admin/openclaw/daily_report_detail.html", context)

View File

@@ -0,0 +1,105 @@
from datetime import datetime, timezone
from django.db import transaction
from openclaw.models import CronJob, CronJobRun
def _ms_to_dt(ms):
if not ms:
return None
return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
class CronBulkUpsertService:
@staticmethod
@transaction.atomic
def upsert(payload):
source_node = payload.get("source_node", "unknown")
jobs_data = payload.get("jobs", [])
runs_data = payload.get("runs", [])
jobs_upserted = 0
runs_upserted = 0
# Upsert jobs
for job_data in jobs_data:
job_id = job_data["id"]
sched = job_data.get("schedule", {})
pay = job_data.get("payload", {})
deliv = job_data.get("delivery", {})
state = job_data.get("state", {})
defaults = {
"agent_id": job_data.get("agentId", ""),
"name": job_data.get("name", ""),
"session_key": job_data.get("sessionKey", ""),
"enabled": job_data.get("enabled", True),
"schedule_kind": sched.get("kind", ""),
"schedule_expr": sched.get("expr", ""),
"schedule_tz": sched.get("tz", ""),
"payload_kind": pay.get("kind", ""),
"payload_message": pay.get("message", ""),
"delivery_mode": deliv.get("mode", ""),
"delivery_channel": deliv.get("channel", ""),
"delivery_to": deliv.get("to", ""),
"state_next_run_at": _ms_to_dt(state.get("nextRunAtMs")),
"state_last_run_at": _ms_to_dt(state.get("lastRunAtMs")),
"state_last_run_status": state.get("lastRunStatus", ""),
"state_last_duration_ms": state.get("lastDurationMs"),
"state_last_status": state.get("lastStatus", ""),
"state_consecutive_errors": state.get("consecutiveErrors", 0),
"created_at_ms": job_data.get("createdAtMs"),
"updated_at_ms": job_data.get("updatedAtMs"),
"raw_json": job_data,
}
_, created = CronJob.objects.update_or_create(
job_id=job_id,
defaults=defaults,
)
if created:
jobs_upserted += 1
# Build job lookup
job_ids = [j["id"] for j in jobs_data]
job_lookup = {j.job_id: j for j in CronJob.objects.filter(job_id__in=job_ids)}
# Upsert runs
for run_data in runs_data:
job_id = run_data.get("jobId")
job = job_lookup.get(job_id)
if not job:
continue
usage = run_data.get("usage", {})
defaults = {
"ts": run_data.get("ts"),
"run_at": _ms_to_dt(run_data.get("runAtMs")),
"action": run_data.get("action", ""),
"status": run_data.get("status", ""),
"error": run_data.get("error", ""),
"summary": run_data.get("summary", ""),
"delivered": run_data.get("delivered", False),
"delivery_status": run_data.get("deliveryStatus", ""),
"session_id": run_data.get("sessionId", ""),
"session_key": run_data.get("sessionKey", ""),
"duration_ms": run_data.get("durationMs"),
"next_run_at": _ms_to_dt(run_data.get("nextRunAtMs")),
"model": run_data.get("model", ""),
"provider": run_data.get("provider", ""),
"tokens_input": usage.get("input_tokens", 0),
"tokens_output": usage.get("output_tokens", 0),
"tokens_total": usage.get("total_tokens", 0),
"raw_json": run_data,
}
_, created = CronJobRun.objects.update_or_create(
job=job,
run_at=defaults["run_at"],
session_id=run_data.get("sessionId", ""),
defaults=defaults,
)
if created:
runs_upserted += 1
return {
"jobs_upserted": jobs_upserted,
"runs_upserted": runs_upserted,
}

View File

@@ -0,0 +1,19 @@
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from openclaw.cron_service import CronBulkUpsertService
@csrf_exempt
@require_http_methods(["POST"])
def cron_bulk_upsert(request):
try:
payload = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({"error": "Invalid JSON"}, status=400)
result = CronBulkUpsertService.upsert(payload)
return JsonResponse({"status": "ok", **result})

View File

@@ -0,0 +1,79 @@
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('openclaw', '0002_add_hypertables'),
]
operations = [
migrations.CreateModel(
name='CronJob',
fields=[
('job_id', models.CharField(max_length=64, primary_key=True, serialize=False)),
('agent_id', models.CharField(blank=True, default='', max_length=128)),
('name', models.CharField(blank=True, default='', max_length=256)),
('session_key', models.CharField(blank=True, default='', max_length=256)),
('enabled', models.BooleanField(default=True)),
('schedule_kind', models.CharField(blank=True, default='', max_length=32)),
('schedule_expr', models.CharField(blank=True, default='', max_length=64)),
('schedule_tz', models.CharField(blank=True, default='', max_length=64)),
('payload_kind', models.CharField(blank=True, default='', max_length=32)),
('payload_message', models.TextField(blank=True, default='')),
('delivery_mode', models.CharField(blank=True, default='', max_length=32)),
('delivery_channel', models.CharField(blank=True, default='', max_length=32)),
('delivery_to', models.CharField(blank=True, default='', max_length=128)),
('state_next_run_at', models.DateTimeField(blank=True, null=True)),
('state_last_run_at', models.DateTimeField(blank=True, null=True)),
('state_last_run_status', models.CharField(blank=True, default='', max_length=16)),
('state_last_duration_ms', models.IntegerField(blank=True, null=True)),
('state_last_status', models.CharField(blank=True, default='', max_length=16)),
('state_consecutive_errors', models.IntegerField(default=0)),
('created_at_ms', models.BigIntegerField(blank=True, null=True)),
('updated_at_ms', models.BigIntegerField(blank=True, null=True)),
('raw_json', models.JSONField(blank=True, default=dict)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'db_table': 'cron_jobs',
'ordering': ['-updated_at'],
},
),
migrations.CreateModel(
name='CronJobRun',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('ts', models.BigIntegerField(help_text='event timestamp ms')),
('run_at', models.DateTimeField(blank=True, null=True)),
('action', models.CharField(blank=True, default='', max_length=32)),
('status', models.CharField(blank=True, default='', max_length=16)),
('error', models.TextField(blank=True, default='')),
('summary', models.TextField(blank=True, default='')),
('delivered', models.BooleanField(default=False)),
('delivery_status', models.CharField(blank=True, default='', max_length=32)),
('session_id', models.CharField(blank=True, default='', max_length=64)),
('session_key', models.CharField(blank=True, default='', max_length=256)),
('duration_ms', models.IntegerField(blank=True, null=True)),
('next_run_at', models.DateTimeField(blank=True, null=True)),
('model', models.CharField(blank=True, default='', max_length=128)),
('provider', models.CharField(blank=True, default='', max_length=64)),
('tokens_input', models.IntegerField(default=0)),
('tokens_output', models.IntegerField(default=0)),
('tokens_total', models.IntegerField(default=0)),
('raw_json', models.JSONField(blank=True, default=dict)),
('created_at', models.DateTimeField(auto_now_add=True)),
('job', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='runs', to='openclaw.cronjob')),
],
options={
'db_table': 'cron_job_runs',
'ordering': ['-run_at'],
},
),
migrations.AddConstraint(
model_name='cronjobrun',
constraint=models.UniqueConstraint(fields=('job', 'run_at', 'session_id'), name='cron_run_unique'),
),
]

View File

@@ -97,3 +97,77 @@ class ToolCall(models.Model):
def __str__(self): def __str__(self):
return f"ToolCall({self.tool_name} {self.tool_call_id})" return f"ToolCall({self.tool_name} {self.tool_call_id})"
class CronJob(models.Model):
job_id = models.CharField(max_length=64, primary_key=True)
agent_id = models.CharField(max_length=128, blank=True, default="")
name = models.CharField(max_length=256, blank=True, default="")
session_key = models.CharField(max_length=256, blank=True, default="")
enabled = models.BooleanField(default=True)
schedule_kind = models.CharField(max_length=32, blank=True, default="")
schedule_expr = models.CharField(max_length=64, blank=True, default="")
schedule_tz = models.CharField(max_length=64, blank=True, default="")
payload_kind = models.CharField(max_length=32, blank=True, default="")
payload_message = models.TextField(blank=True, default="")
delivery_mode = models.CharField(max_length=32, blank=True, default="")
delivery_channel = models.CharField(max_length=32, blank=True, default="")
delivery_to = models.CharField(max_length=128, blank=True, default="")
state_next_run_at = models.DateTimeField(null=True, blank=True)
state_last_run_at = models.DateTimeField(null=True, blank=True)
state_last_run_status = models.CharField(max_length=16, blank=True, default="")
state_last_duration_ms = models.IntegerField(null=True, blank=True)
state_last_status = models.CharField(max_length=16, blank=True, default="")
state_consecutive_errors = models.IntegerField(default=0)
created_at_ms = models.BigIntegerField(null=True, blank=True)
updated_at_ms = models.BigIntegerField(null=True, blank=True)
raw_json = models.JSONField(default=dict, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = "cron_jobs"
ordering = ["-updated_at"]
def __str__(self):
return f"CronJob({self.job_id} {self.name})"
class CronJobRun(models.Model):
job = models.ForeignKey(
CronJob, on_delete=models.CASCADE, related_name="runs"
)
ts = models.BigIntegerField(help_text="event timestamp ms")
run_at = models.DateTimeField(null=True, blank=True)
action = models.CharField(max_length=32, blank=True, default="")
status = models.CharField(max_length=16, blank=True, default="")
error = models.TextField(blank=True, default="")
summary = models.TextField(blank=True, default="")
delivered = models.BooleanField(default=False)
delivery_status = models.CharField(max_length=32, blank=True, default="")
session_id = models.CharField(max_length=64, blank=True, default="")
session_key = models.CharField(max_length=256, blank=True, default="")
duration_ms = models.IntegerField(null=True, blank=True)
next_run_at = models.DateTimeField(null=True, blank=True)
model = models.CharField(max_length=128, blank=True, default="")
provider = models.CharField(max_length=64, blank=True, default="")
tokens_input = models.IntegerField(default=0)
tokens_output = models.IntegerField(default=0)
tokens_total = models.IntegerField(default=0)
raw_json = models.JSONField(default=dict, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = "cron_job_runs"
ordering = ["-run_at"]
constraints = [
models.UniqueConstraint(
fields=["job", "run_at", "session_id"],
name="cron_run_unique",
)
]
def __str__(self):
return f"CronJobRun({self.job.job_id} {self.run_at})"

View File

@@ -0,0 +1,118 @@
{% extends "admin/base.html" %}
{% load humanize %}
{% block title %}{{ title }} | OpenClaw Archive{% endblock %}
{% block content %}
<div class="p-3">
<div class="d-flex justify-content-between align-items-center mb-3">
<h1>Daily Report: <span class="text-primary">{{ agent_name }}</span><span class="text-success">{{ target_date }}</span></h1>
<a href="/admin/daily-reports/?agent={{ agent_name }}&start={{ target_date }}&end={{ target_date }}" class="btn btn-secondary">← Back to List</a>
</div>
{% for item in sessions %}
<div class="card mb-4">
<div class="card-header bg-dark text-white">
<strong>Session:</strong> {{ item.session.session_id }}
&nbsp;|&nbsp;
<strong>Model:</strong> {{ item.session.model_id|default:"N/A" }}
&nbsp;|&nbsp;
<strong>Tokens:</strong> {{ item.session.total_tokens|default:0|intcomma }}
&nbsp;|&nbsp;
<strong>Cost:</strong> ${{ item.session.total_cost|default:0|floatformat:4 }}
&nbsp;|&nbsp;
<strong>Time:</strong> {{ item.session.start_time|date:"Y-m-d H:i" }} ~ {{ item.session.end_time|date:"H:i"|default:"Ongoing" }}
</div>
<div class="card-body p-0">
<table class="table table-bordered table-sm mb-0" style="font-size:12px; table-layout: fixed;">
<colgroup>
<col style="width:60px">
<col style="width:80px">
<col style="width:100px">
<col style="width:150px">
<col style="width:500px">
</colgroup>
<thead class="table-light">
<tr>
<th>Seq</th>
<th>Time</th>
<th>Role</th>
<th>Tool / Meta</th>
<th>Content</th>
</tr>
</thead>
<tbody>
{% for msg in item.messages %}
<tr style="vertical-align: top;">
<td><span class="badge bg-secondary">{{ msg.seq }}</span></td>
<td><small>{{ msg.timestamp|date:"H:i:s" }}</small></td>
<td>
{% if msg.role == "user" %}
<span class="badge bg-primary">User</span>
{% elif msg.role == "assistant" %}
<span class="badge bg-success">Assistant</span>
{% elif msg.role == "toolResult" %}
<span class="badge bg-warning text-dark">Tool Result</span>
{% elif msg.role == "tool" %}
<span class="badge bg-info text-dark">Tool</span>
{% else %}
<span class="badge bg-light text-dark">{{ msg.role }}</span>
{% endif %}
</td>
<td>
{% if msg.tool_name %}
<div><small><strong>Tool:</strong> <code>{{ msg.tool_name }}</code></small></div>
{% endif %}
{% if msg.exit_code != None %}
<div><small><strong>Exit:</strong> {{ msg.exit_code }}</small></div>
{% endif %}
{% if msg.duration_ms %}
<div><small><strong>Duration:</strong> {{ msg.duration_ms }}ms</small></div>
{% endif %}
{% if msg.tokens_total > 0 %}
<div><small><strong>Tokens:</strong> {{ msg.tokens_total|intcomma }}</small></div>
{% endif %}
{% if msg.is_error %}
<div><span class="badge bg-danger">Error</span></div>
{% endif %}
</td>
<td>
{% if msg.content_text %}
<pre style="font-size:11px; white-space: pre-wrap; word-break: break-word; margin:0; max-height: 300px; overflow-y: auto; background: #f9f9f9; padding: 4px;">{{ msg.content_text|truncatechars:2000 }}</pre>
{% else %}
<span class="text-muted"><em>(empty)</em></span>
{% endif %}
{% with msg.tool_calls.all as tc_list %}
{% if tc_list %}
<div class="mt-2">
{% for tc in tc_list %}
<div class="border rounded p-1 mb-1 bg-light">
<small><strong>TC:</strong> {{ tc.tool_name }} ({{ tc.tool_call_id|truncatechars:20 }})</small>
{% if tc.arguments %}
<details>
<summary><small>Arguments</small></summary>
<pre style="font-size:10px; white-space: pre-wrap; word-break: break-word; margin:4px 0 0; background:#fff; border:1px solid #ddd; padding:4px;">{{ tc.arguments|pprint }}</pre>
</details>
{% endif %}
{% if tc.result_text %}
<details>
<summary><small>Result ({{ tc.result_text|length }} chars)</small></summary>
<pre style="font-size:10px; white-space: pre-wrap; word-break: break-word; margin:4px 0 0; background:#fff; border:1px solid #ddd; padding:4px; max-height:200px; overflow-y:auto;">{{ tc.result_text|truncatechars:2000 }}</pre>
</details>
{% endif %}
</div>
{% endfor %}
</div>
{% endif %}
{% endwith %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
{% endfor %}
</div>
{% endblock %}

View File

@@ -0,0 +1,72 @@
{% extends "admin/base.html" %}
{% load humanize %}
{% load static %}
{% block title %}{{ title }} | OpenClaw Archive{% endblock %}
{% block branding %}
<h1 id="site-name"><a href="{% url 'admin:index' %}">{{ site_header|default:"Django Admin" }}</a></h1>
{% endblock %}
{% block content %}
<div class="p-3">
<h1>📅 Daily Report</h1>
<form method="get" class="mb-3">
<div class="row">
<div class="col-md-3">
<label>Agent</label>
<select name="agent" class="form-control">
<option value="">All Agents</option>
{% for agent in agents %}
<option value="{{ agent }}" {% if agent == selected_agent %}selected{% endif %}>{{ agent }}</option>
{% endfor %}
</select>
</div>
<div class="col-md-3">
<label>Start Date</label>
<input type="date" name="start" value="{{ start_date }}" class="form-control">
</div>
<div class="col-md-3">
<label>End Date</label>
<input type="date" name="end" value="{{ end_date }}" class="form-control">
</div>
<div class="col-md-3 d-flex align-items-end">
<button type="submit" class="btn btn-primary">Filter</button>
</div>
</div>
</form>
{% if date_groups %}
<table class="table table-bordered table-sm" style="font-size:13px">
<thead class="table-dark">
<tr>
<th>Agent</th>
<th>Date</th>
<th>Sessions</th>
<th>Messages</th>
<th>Action</th>
</tr>
</thead>
<tbody>
{% for group in date_groups %}
<tr>
<td><strong>{{ group.agent_name }}</strong></td>
<td>{{ group.date_val }}</td>
<td>{{ group.session_count }}</td>
<td>{{ group.message_count }}</td>
<td>
<a href="/admin/daily-reports/{{ group.agent_name }}/{{ group.date_val.year }}-{{ group.date_val.month }}-{{ group.date_val.day }}/"
class="btn btn-sm btn-info">
View Report
</a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="text-muted">No messages found for the selected date range.</p>
{% endif %}
</div>
{% endblock %}

View File

@@ -1,6 +1,8 @@
from django.urls import path from django.urls import path
from openclaw.views import sessions_bulk_upsert from openclaw.views import sessions_bulk_upsert
from openclaw.cron_views import cron_bulk_upsert
urlpatterns = [ urlpatterns = [
path("sessions/bulk_upsert/", sessions_bulk_upsert, name="sessions_bulk_upsert"), path("sessions/bulk_upsert/", sessions_bulk_upsert, name="sessions_bulk_upsert"),
path("cron/bulk_upsert/", cron_bulk_upsert, name="cron_bulk_upsert"),
] ]

View File

@@ -0,0 +1 @@
default_app_config = "openclaw_daily.apps.OpenClawDailyConfig"

View File

@@ -0,0 +1,62 @@
from django.apps import AppConfig
class OpenClawDailyConfig(AppConfig):
name = "openclaw_daily"
label = "openclaw_daily"
verbose_name = "Daily Reports"
def ready(self):
from django.contrib import admin
from django.urls import path
from openclaw.admin_new_views import (
daily_report_list_view,
daily_report_detail_view,
)
# ── Monkey-patch admin.site.get_urls ───────────────────────────────
_orig_get_urls = admin.site.get_urls
def _new_get_urls():
urls = _orig_get_urls()
# Prepend our custom URLs so they take precedence
custom = [
path(
"daily/",
admin.site.admin_view(daily_report_list_view),
name="openclaw_daily",
),
path(
"daily-reports/",
admin.site.admin_view(daily_report_list_view),
name="openclaw_daily_reports",
),
path(
"daily-reports/<str:agent_name>/<int:year>-<int:month>-<int:day>/",
admin.site.admin_view(daily_report_detail_view),
name="openclaw_daily_report_detail",
),
]
return custom + urls
admin.site.get_urls = _new_get_urls
# ── Monkey-patch admin.site.get_app_list ──────────────────────────
_orig_get_app_list = admin.site.get_app_list
def _new_get_app_list(request, app_label=None):
app_list = _orig_get_app_list(request, app_label)
app_list.insert(0, {
"name": "Daily Reports",
"app_label": "openclaw_daily",
"app_url": "/admin/daily-reports/",
"models": [{
"name": "Daily Reports",
"object_name": "DailyReports",
"admin_url": "/admin/daily-reports/",
"view_only": True,
}],
})
return app_list
admin.site.get_app_list = _new_get_app_list