Compare commits
10 Commits
435bbf23d0
...
6f0b3e231a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f0b3e231a | ||
|
|
74458b4fab | ||
|
|
1a1985a270 | ||
|
|
6bd99d043c | ||
|
|
21c5e895e0 | ||
|
|
9542ebde73 | ||
|
|
9412e7880d | ||
|
|
721b113c9d | ||
|
|
49d772139d | ||
|
|
1a32801e39 |
@@ -6,17 +6,26 @@ Scans local agent sessions directories, parses JSONL files,
|
||||
and pushes structured JSON to the Django API.
|
||||
|
||||
Usage:
|
||||
# Session sync (existing)
|
||||
python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/
|
||||
|
||||
# Cron job sync (new)
|
||||
python sync_sessions.py --cron \
|
||||
--remote-url http://macmini:8000/api/cron/bulk_upsert/ \
|
||||
--cron-ssh macmini \
|
||||
--cron-jobs-path /Users/weishen/openclaw/cron/jobs.json \
|
||||
--cron-runs-path /Users/weishen/openclaw/cron/runs/
|
||||
|
||||
Cron:
|
||||
0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <url>
|
||||
0 3 * * * cd /path/to/scripts && python sync_sessions.py --cron --remote-url <cron-url> ...
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
@@ -28,6 +37,37 @@ from pathlib import Path
|
||||
SESSIONS_DIR_NAME = "sessions"
|
||||
STATE_FILE = ".sync_state"
|
||||
DELETED_SUFFIX = ".deleted."
|
||||
CRON_STATE_FILE = ".sync_cron_state"
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
# SSH Helper
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def ssh_read_file(host, remote_path):
|
||||
"""Read a remote file via SSH and return content as string."""
|
||||
result = subprocess.run(
|
||||
["ssh", host, f"cat {remote_path}"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"SSH read failed for {host}:{remote_path}: {result.stderr}")
|
||||
return result.stdout
|
||||
|
||||
|
||||
def ssh_list_files(host, remote_dir, pattern="*.jsonl"):
|
||||
"""List remote files matching pattern via SSH."""
|
||||
result = subprocess.run(
|
||||
["ssh", host, f"ls {remote_dir}/{pattern}"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return []
|
||||
return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()]
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
@@ -109,7 +149,7 @@ def get_new_files(root_path):
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
# JSONL Parser
|
||||
# JSONL Parser (Session mode)
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def parse_jsonl(file_path):
|
||||
@@ -118,12 +158,9 @@ def parse_jsonl(file_path):
|
||||
messages = []
|
||||
tool_calls = []
|
||||
|
||||
# State tracking for model/thinking changes
|
||||
current_model_provider = ""
|
||||
current_model_id = ""
|
||||
current_thinking_level = ""
|
||||
|
||||
# Tool results lookup by tool_call_id
|
||||
tool_results = {}
|
||||
|
||||
events = []
|
||||
@@ -141,7 +178,6 @@ def parse_jsonl(file_path):
|
||||
if not events:
|
||||
return sessions, messages, tool_calls
|
||||
|
||||
# First pass: extract session metadata
|
||||
session_event = None
|
||||
for event in events:
|
||||
event_type = event.get("type", "")
|
||||
@@ -157,14 +193,12 @@ def parse_jsonl(file_path):
|
||||
session_cwd = session_event.get("cwd", "")
|
||||
session_version = events[-1].get("version", 0) if events else 0
|
||||
|
||||
# Determine start and end time from all events
|
||||
timestamps = []
|
||||
for event in events:
|
||||
ts = event.get("timestamp", "")
|
||||
if ts:
|
||||
timestamps.append(ts)
|
||||
|
||||
# Second pass: process events
|
||||
message_seq = 0
|
||||
total_tokens = 0
|
||||
total_cost = 0.0
|
||||
@@ -183,14 +217,12 @@ def parse_jsonl(file_path):
|
||||
current_thinking_level = event.get("thinkingLevel", "")
|
||||
|
||||
elif event_type == "message":
|
||||
# Nested structure: message data is inside "message" object
|
||||
message_obj = event.get("message", {})
|
||||
role = message_obj.get("role", "")
|
||||
msg_id = event.get("id", "")
|
||||
parent_id = event.get("parentId", "")
|
||||
msg_timestamp = event.get("timestamp", "")
|
||||
|
||||
# Extract text content (skip thinking) from nested content
|
||||
content_items = message_obj.get("content", [])
|
||||
text_parts = []
|
||||
tc_list = []
|
||||
@@ -200,7 +232,6 @@ def parse_jsonl(file_path):
|
||||
text_parts.append(item.get("text", ""))
|
||||
elif item.get("type") == "toolCall":
|
||||
tc_list.append(item)
|
||||
# Skip thinking types
|
||||
|
||||
content_text = "\n".join(text_parts)
|
||||
|
||||
@@ -258,7 +289,6 @@ def parse_jsonl(file_path):
|
||||
messages.append(msg_data)
|
||||
message_seq += 1
|
||||
|
||||
# Extract tool calls from assistant messages
|
||||
tc_seq = 0
|
||||
for tc in tc_list:
|
||||
tool_call_data = {
|
||||
@@ -269,7 +299,6 @@ def parse_jsonl(file_path):
|
||||
"arguments": tc.get("arguments", {}),
|
||||
"seq": tc_seq,
|
||||
}
|
||||
# Enrich with tool result if available
|
||||
tr = tool_results.get(tool_call_data["tool_call_id"], {})
|
||||
tool_call_data["result_text"] = tr.get("result_text", "")
|
||||
tool_call_data["is_error"] = tr.get("is_error", False)
|
||||
@@ -279,7 +308,6 @@ def parse_jsonl(file_path):
|
||||
tool_call_count += 1
|
||||
tc_seq += 1
|
||||
|
||||
# Build session record
|
||||
start_time = timestamps[0] if timestamps else session_timestamp
|
||||
end_time = timestamps[-1] if timestamps else session_timestamp
|
||||
|
||||
@@ -306,6 +334,135 @@ def parse_jsonl(file_path):
|
||||
return sessions, messages, tool_calls
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
# Cron Sync Mode
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_cron_state(state_file_path):
|
||||
"""Read cron sync state, return {run_file: mtime}."""
|
||||
p = Path(state_file_path)
|
||||
if not p.exists():
|
||||
return {}
|
||||
try:
|
||||
with open(p) as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, IOError):
|
||||
return {}
|
||||
|
||||
|
||||
def save_cron_state(state_file_path, state):
|
||||
"""Write cron sync state."""
|
||||
p = Path(state_file_path)
|
||||
with open(p, "w") as f:
|
||||
json.dump(state, f)
|
||||
|
||||
|
||||
def sync_cron_jobs(args):
|
||||
"""Sync cron jobs from openclaw cron data."""
|
||||
ssh_host = args.cron_ssh
|
||||
jobs_path = args.cron_jobs_path
|
||||
runs_path = args.cron_runs_path.rstrip("/")
|
||||
|
||||
print(f"Fetching jobs.json from {ssh_host}:{jobs_path}...")
|
||||
try:
|
||||
jobs_raw = ssh_read_file(ssh_host, jobs_path)
|
||||
jobs_data = json.loads(jobs_raw)
|
||||
except Exception as e:
|
||||
print(f"ERROR reading jobs.json: {e}")
|
||||
return
|
||||
|
||||
jobs = jobs_data.get("jobs", [])
|
||||
job_ids = {j["id"] for j in jobs}
|
||||
print(f" Found {len(jobs)} jobs")
|
||||
|
||||
# Find runs files, filter to only those matching known job IDs
|
||||
print(f"Scanning runs directory {ssh_host}:{runs_path}/...")
|
||||
all_run_files = ssh_list_files(ssh_host, runs_path, "*.jsonl")
|
||||
run_files = [f for f in all_run_files if Path(f).stem in job_ids]
|
||||
print(f" Found {len(run_files)} run files matching known job IDs")
|
||||
|
||||
# Load sync state
|
||||
state_file = Path.home() / ".sync_cron_state"
|
||||
prev_state = get_cron_state(str(state_file))
|
||||
|
||||
new_runs = []
|
||||
new_state = {}
|
||||
|
||||
# Detect remote platform (Linux vs macOS) for stat syntax
|
||||
uname_result = subprocess.run(
|
||||
["ssh", ssh_host, "uname"],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
is_macos = uname_result.stdout.strip() == "Darwin"
|
||||
stat_cmd = f"stat -f %m" if is_macos else f"stat -c %Y"
|
||||
|
||||
for run_file in run_files:
|
||||
remote_full = f"{runs_path}/{Path(run_file).name}"
|
||||
# Get mtime via SSH
|
||||
result = subprocess.run(
|
||||
["ssh", ssh_host, f"{stat_cmd} {remote_full}"],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
continue
|
||||
try:
|
||||
mtime = int(result.stdout.strip())
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
old_mtime = prev_state.get(remote_full, 0)
|
||||
if mtime > old_mtime:
|
||||
new_runs.append(remote_full)
|
||||
new_state[remote_full] = mtime
|
||||
|
||||
if not new_runs:
|
||||
print("No new or modified run files found.")
|
||||
save_cron_state(str(state_file), new_state)
|
||||
return
|
||||
|
||||
print(f"Parsing {len(new_runs)} new/modified run file(s)...")
|
||||
|
||||
all_runs = []
|
||||
for run_file in new_runs:
|
||||
print(f" Parsing: {run_file}")
|
||||
try:
|
||||
raw = ssh_read_file(ssh_host, run_file)
|
||||
for line in raw.strip().split("\n"):
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
run_obj = json.loads(line)
|
||||
all_runs.append(run_obj)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f" ERROR reading {run_file}: {e}")
|
||||
continue
|
||||
|
||||
if not all_runs:
|
||||
print("No run records parsed.")
|
||||
save_cron_state(str(state_file), new_state)
|
||||
return
|
||||
|
||||
# Save new state
|
||||
save_cron_state(str(state_file), new_state)
|
||||
|
||||
payload = {
|
||||
"source_node": os.environ.get("SOURCE_NODE", ssh_host),
|
||||
"jobs": jobs,
|
||||
"runs": all_runs,
|
||||
}
|
||||
|
||||
print(f"Pushing {len(jobs)} jobs and {len(all_runs)} runs to {args.remote_url}...")
|
||||
try:
|
||||
result = push_to_api(args.remote_url, payload)
|
||||
print(f" OK: jobs_upserted={result.get('jobs_upserted', 0)}, "
|
||||
f"runs_upserted={result.get('runs_upserted', 0)}")
|
||||
except Exception as e:
|
||||
print(f" FAILED to push cron data: {e}")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
# HTTP Client
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
@@ -333,13 +490,12 @@ def push_to_api(remote_url, payload):
|
||||
raise
|
||||
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
# Main
|
||||
# ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API")
|
||||
parser = argparse.ArgumentParser(description="Sync OpenClaw sessions or cron data to Django API")
|
||||
parser.add_argument(
|
||||
"--remote-url",
|
||||
required=True,
|
||||
@@ -350,8 +506,34 @@ def main():
|
||||
default=".",
|
||||
help="Root path containing agents/ directory (default: current dir)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cron",
|
||||
action="store_true",
|
||||
help="Sync cron jobs and runs instead of session files",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cron-ssh",
|
||||
default="macmini",
|
||||
help="SSH host for cron data (default: macmini)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cron-jobs-path",
|
||||
default="/Users/weishen/openclaw/cron/jobs.json",
|
||||
help="Remote path to jobs.json",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cron-runs-path",
|
||||
default="/Users/weishen/openclaw/cron/runs/",
|
||||
help="Remote directory containing run JSONL files",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.cron:
|
||||
sync_cron_jobs(args)
|
||||
return
|
||||
|
||||
# Original session sync mode
|
||||
new_files = get_new_files(args.root_path)
|
||||
if not new_files:
|
||||
print("No new or modified session files found.")
|
||||
@@ -363,7 +545,6 @@ def main():
|
||||
total_messages = 0
|
||||
total_tool_calls = 0
|
||||
|
||||
# Group by agent_name (batch per agent)
|
||||
agent_batches = {}
|
||||
for agent_name, jsonl_path in new_files:
|
||||
agent_batches.setdefault(agent_name, []).append(jsonl_path)
|
||||
|
||||
711
scripts/test_all_pages.py
Normal file
711
scripts/test_all_pages.py
Normal file
@@ -0,0 +1,711 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
agent-base 全页面遍历测试脚本
|
||||
使用 agent-browser 进行浏览器自动化测试
|
||||
|
||||
功能:
|
||||
- 登录 Django Admin
|
||||
- 遍历所有管理页面
|
||||
- 检查页面是否正常渲染(无 500/模板错误)
|
||||
- 截图留存
|
||||
- 保存认证状态,可重复使用
|
||||
|
||||
用法:
|
||||
python3 test_all_pages.py # 运行全部测试
|
||||
python3 test_all_pages.py --login # 强制重新登录(不使用缓存)
|
||||
python3 test_all_pages.py --screenshot # 仅截图不测试
|
||||
python3 test_all_pages.py --page /admin/openclaw/session/ # 测试指定页面
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import argparse
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import http.cookiejar
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# ========== 配置 ==========
|
||||
BASE_URL = "http://192.168.3.45:8765"
|
||||
ADMIN_URL = f"{BASE_URL}/admin/"
|
||||
USERNAME = "admin"
|
||||
PASSWORD = "admin123"
|
||||
SESSION_NAME = "agent-base-full-test"
|
||||
STATE_FILE = "/tmp/agent-browser-auth.json"
|
||||
SCREENSHOT_DIR = "/tmp/agent-browser-screenshots"
|
||||
|
||||
# agent-browser 路径(Mac mini 本地)
|
||||
AGENT_BROWSER = "/opt/homebrew/bin/agent-browser"
|
||||
|
||||
# 项目所有页面列表
|
||||
PAGES_TO_TEST = [
|
||||
# 页面路径, 描述, 是否需要登录
|
||||
("/admin/login/", "登录页", False),
|
||||
("/admin/", "Admin 首页", True),
|
||||
("/admin/openclaw/session/", "Session 管理", True),
|
||||
("/admin/openclaw/message/", "Message 管理", True),
|
||||
("/admin/openclaw/toolcall/", "ToolCall 管理", True),
|
||||
("/admin/daily-reports/", "日报列表", True),
|
||||
("/admin/daily-reports/xingjiang/2026-4-7/", "日报详情-星匠-4月7日", True),
|
||||
("/admin/daily-reports/xingjiang/2026-4-8/", "日报详情-星匠-4月8日", True),
|
||||
("/api/sessions/bulk_upsert/", "API Bulk Upsert", True),
|
||||
]
|
||||
|
||||
# ========== 工具函数 ==========
|
||||
|
||||
def log(msg: str, emoji: str = ""):
|
||||
ts = datetime.now().strftime("%H:%M:%S")
|
||||
prefix = f"[{ts}]"
|
||||
if emoji:
|
||||
prefix += f" {emoji}"
|
||||
print(f"{prefix} {msg}", flush=True)
|
||||
|
||||
|
||||
def run_browser(args: list, timeout: int = 30, check: bool = True) -> tuple:
|
||||
"""执行 agent-browser 命令,返回 (success, result_dict)"""
|
||||
cmd = [AGENT_BROWSER, "--session", SESSION_NAME, "--json"] + args
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
||||
if result.stdout:
|
||||
try:
|
||||
return True, json.loads(result.stdout)
|
||||
except json.JSONDecodeError:
|
||||
return True, {"raw": result.stdout}
|
||||
if result.returncode != 0 and check:
|
||||
log(f" 命令失败 exit={result.returncode}: {' '.join(args)}", "❌")
|
||||
if result.stderr:
|
||||
log(f" stderr: {result.stderr[:200]}", "❌")
|
||||
return False, {}
|
||||
return result.returncode == 0, {}
|
||||
except subprocess.TimeoutExpired:
|
||||
log(f" 命令超时 {timeout}s: {' '.join(args)}", "❌")
|
||||
return False, {}
|
||||
except FileNotFoundError:
|
||||
log(f" agent-browser 未找到: {AGENT_BROWSER}", "❌")
|
||||
return False, {}
|
||||
except Exception as e:
|
||||
log(f" 命令异常: {e}", "❌")
|
||||
return False, {}
|
||||
|
||||
|
||||
def get_snapshot() -> dict:
|
||||
"""获取当前页面的交互元素快照"""
|
||||
success, result = run_browser(["snapshot", "-i", "--json"], timeout=20, check=False)
|
||||
if success and result.get("data"):
|
||||
return result["data"].get("refs", {})
|
||||
return {}
|
||||
|
||||
|
||||
def get_url() -> str:
|
||||
"""获取当前 URL"""
|
||||
success, result = run_browser(["get", "url", "--json"], check=False)
|
||||
if success and result.get("data"):
|
||||
return result["data"].get("url", "")
|
||||
return ""
|
||||
|
||||
|
||||
def get_title() -> str:
|
||||
"""获取当前页面标题"""
|
||||
success, result = run_browser(["get", "title", "--json"], check=False)
|
||||
if success and result.get("data"):
|
||||
return result["data"].get("title", "")
|
||||
return ""
|
||||
|
||||
|
||||
def find_ref(elements: dict, role_pat: str = None, name_pat: str = None) -> str:
|
||||
"""根据 role 和 name 模式查找元素 ref"""
|
||||
for ref, info in elements.items():
|
||||
role = info.get("role", "")
|
||||
name = info.get("name", "")
|
||||
if role_pat and not re.match(role_pat, role, re.I):
|
||||
continue
|
||||
if name_pat and not re.search(name_pat, name, re.I):
|
||||
continue
|
||||
return ref
|
||||
return None
|
||||
|
||||
|
||||
def find_refs(elements: dict, role_pat: str = None, name_pat: str = None) -> list:
|
||||
"""查找所有匹配的元素 refs"""
|
||||
results = []
|
||||
for ref, info in elements.items():
|
||||
role = info.get("role", "")
|
||||
name = info.get("name", "")
|
||||
if role_pat and not re.match(role_pat, role, re.I):
|
||||
continue
|
||||
if name_pat and not re.search(name_pat, name, re.I):
|
||||
continue
|
||||
results.append(ref)
|
||||
return results
|
||||
|
||||
|
||||
def wait_load(seconds: float = 2):
|
||||
"""等待页面稳定"""
|
||||
time.sleep(seconds)
|
||||
|
||||
|
||||
def take_screenshot(name: str) -> str:
|
||||
"""截图并返回保存路径"""
|
||||
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
|
||||
path = f"{SCREENSHOT_DIR}/{name}.png"
|
||||
success, _ = run_browser(["screenshot", path], timeout=15, check=False)
|
||||
if success:
|
||||
return path
|
||||
return ""
|
||||
|
||||
|
||||
def close_session():
|
||||
"""关闭浏览器会话"""
|
||||
run_browser(["close"], check=False)
|
||||
|
||||
|
||||
def cleanup_all():
|
||||
"""清理所有会话"""
|
||||
subprocess.run([AGENT_BROWSER, "close", "--all"],
|
||||
capture_output=True, timeout=10)
|
||||
|
||||
|
||||
def check_page_errors() -> list:
|
||||
"""检查页面是否有错误(仅检测真正的 Django error alert)"""
|
||||
elements = get_snapshot()
|
||||
errors = []
|
||||
|
||||
# 只检查 role=alert 或 role=alertdialog(Django 错误提示的标准 role)
|
||||
for ref, info in elements.items():
|
||||
role = info.get("role", "")
|
||||
name = info.get("name", "")
|
||||
if role in ("alert", "alertdialog"):
|
||||
# Django error alerts contain error messages
|
||||
errors.append(f"[alert] {name[:80]}")
|
||||
# 也检查 title/heading 中的 Django 典型错误关键字
|
||||
if role == "heading":
|
||||
if any(kw in name.lower() for kw in ["500", "internal server error", "template does not exist"]):
|
||||
errors.append(f"[heading] {name[:80]}")
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def login() -> bool:
|
||||
"""
|
||||
使用 agent-browser 直接操作 Django Admin 登录页。
|
||||
"""
|
||||
log("正在通过 agent-browser 登录 Django Admin...", "🔐")
|
||||
|
||||
login_url = f"{ADMIN_URL}login/"
|
||||
|
||||
# Step 1: 打开登录页
|
||||
success, _ = run_browser(["open", login_url])
|
||||
if not success:
|
||||
log("无法打开登录页", "❌")
|
||||
return False
|
||||
wait_load(2)
|
||||
|
||||
# Step 2: 获取元素快照
|
||||
elements = get_snapshot()
|
||||
if not elements:
|
||||
log("登录页无交互元素", "❌")
|
||||
return False
|
||||
|
||||
# 如果已经在 admin 首页,说明已登录
|
||||
url = get_url()
|
||||
if "/admin/login" not in url:
|
||||
log(f"已在登录状态: {url}", "✅")
|
||||
run_browser(["state", "save", STATE_FILE], check=False)
|
||||
return True
|
||||
|
||||
# Step 3: 找用户名/密码输入框(Django admin 中文界面)
|
||||
all_textboxes = {r: i for r, i in elements.items() if "textbox" in i.get("role", "").lower()}
|
||||
username_ref = None
|
||||
password_ref = None
|
||||
|
||||
for ref, info in all_textboxes.items():
|
||||
name = info.get("name", "") # e.g. "用户名:" 或 "密码:"
|
||||
if not username_ref and ("用户" in name or "user" in name.lower()):
|
||||
username_ref = ref
|
||||
elif not password_ref and ("密码" in name or "pass" in name.lower()):
|
||||
password_ref = ref
|
||||
|
||||
# fallback: 按顺序取前两个 textbox
|
||||
tb_refs = list(all_textboxes.keys())
|
||||
if not username_ref and len(tb_refs) >= 1:
|
||||
username_ref = tb_refs[0]
|
||||
if not password_ref and len(tb_refs) >= 2:
|
||||
password_ref = tb_refs[1]
|
||||
|
||||
if not username_ref:
|
||||
log(f"未找到用户名输入框,可用: {all_textboxes}", "❌")
|
||||
return False
|
||||
if not password_ref:
|
||||
log(f"未找到密码输入框,可用: {all_textboxes}", "❌")
|
||||
return False
|
||||
|
||||
log(f" 找到用户框: {username_ref} | 密码框: {password_ref}", "🔍")
|
||||
|
||||
# Step 4: 找提交按钮(中文 Django admin 按钮文字是"登录")
|
||||
submit_ref = find_ref(elements, "button", "登录|log|sign|submit")
|
||||
if not submit_ref:
|
||||
submit_ref = find_refs(elements, "button")[0] if find_refs(elements, "button") else None
|
||||
|
||||
if not submit_ref:
|
||||
log("未找到提交按钮", "❌")
|
||||
return False
|
||||
|
||||
log(f" 找到提交按钮: {submit_ref}", "🔍")
|
||||
|
||||
# Step 5: 填表
|
||||
log(f" 填入用户名...", "✏️")
|
||||
run_browser(["fill", username_ref, USERNAME])
|
||||
wait_load(0.5)
|
||||
|
||||
log(f" 填入密码...", "✏️")
|
||||
run_browser(["fill", password_ref, PASSWORD])
|
||||
wait_load(0.5)
|
||||
|
||||
log(" 点击登录...", "✏️")
|
||||
run_browser(["click", submit_ref])
|
||||
wait_load(3) # Django 重定向需要时间
|
||||
|
||||
# Step 6: 验证
|
||||
url = get_url()
|
||||
if "/admin/" in url and "login" not in url.lower():
|
||||
log(f"登录成功,当前页面: {url}", "✅")
|
||||
run_browser(["state", "save", STATE_FILE], check=False)
|
||||
log("认证状态已保存", "💾")
|
||||
return True
|
||||
else:
|
||||
log(f"登录后 URL 不对: {url}", "❌")
|
||||
# 打印页面内容帮助调试
|
||||
elements = get_snapshot()
|
||||
errors = [info.get("name", "") for ref, info in elements.items()
|
||||
if any(k in info.get("name", "").lower() for k in ["error", "invalid", "incorrect"])]
|
||||
if errors:
|
||||
log(f" 页面错误提示: {errors[:3]}", "❌")
|
||||
return False
|
||||
|
||||
|
||||
def ensure_logged_in(force: bool = False) -> bool:
|
||||
"""确保已登录,可选强制重新登录"""
|
||||
if force or not os.path.exists(STATE_FILE):
|
||||
return login()
|
||||
|
||||
# 尝试加载缓存状态
|
||||
success, _ = run_browser(["state", "load", STATE_FILE], check=False)
|
||||
if success:
|
||||
log("已加载缓存的认证状态", "📂")
|
||||
# 强制打开一个新页面验证(state load 后需要重新 open)
|
||||
run_browser(["open", ADMIN_URL])
|
||||
wait_load(2)
|
||||
url = get_url()
|
||||
title = get_title()
|
||||
if "/admin/" in url and "login" not in url.lower():
|
||||
log(f"会话验证成功: {url} | {title[:30]}", "✅")
|
||||
return True
|
||||
log(f"缓存已过期 (url={url}),将重新登录", "🔄")
|
||||
|
||||
return login()
|
||||
|
||||
|
||||
# ========== 单页测试函数 ==========
|
||||
|
||||
def build_url(path: str) -> str:
|
||||
"""正确拼接页面 URL,避免双重 /admin/ /api/ 前缀"""
|
||||
path = path.lstrip("/")
|
||||
# 已包含完整 URL(以 http 开头)
|
||||
if path.startswith("http"):
|
||||
return path
|
||||
# 已经是完整路径(如 admin/login/、api/sessions/)
|
||||
if path.startswith("admin/") or path.startswith("api/"):
|
||||
return ADMIN_URL.rstrip("/") + "/" + path
|
||||
# 纯路径片段
|
||||
return ADMIN_URL + path
|
||||
|
||||
|
||||
def test_page(path: str, desc: str, requires_login: bool) -> dict:
|
||||
"""测试单个页面,返回测试结果 dict"""
|
||||
url = build_url(path)
|
||||
result = {
|
||||
"path": path,
|
||||
"desc": desc,
|
||||
"url": url,
|
||||
"status": "skip",
|
||||
"http_code": None,
|
||||
"errors": [],
|
||||
"title": "",
|
||||
"elements_count": 0,
|
||||
"screenshot": "",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
|
||||
# 如果需要登录但未登录,跳过
|
||||
if requires_login:
|
||||
if not ensure_logged_in():
|
||||
result["status"] = "skip_login_failed"
|
||||
return result
|
||||
|
||||
log(f"测试: {desc} ({url})", "🌐")
|
||||
|
||||
# 打开页面
|
||||
success, _ = run_browser(["open", url])
|
||||
if not success:
|
||||
result["status"] = "nav_failed"
|
||||
return result
|
||||
wait_load(2)
|
||||
|
||||
# 截图
|
||||
screenshot_name = path.strip("/").replace("/", "_").replace("-", "_")
|
||||
screenshot_name = f"{datetime.now().strftime('%H%M%S')}_{screenshot_name}"
|
||||
result["screenshot"] = take_screenshot(screenshot_name)
|
||||
|
||||
# 获取页面信息
|
||||
result["url"] = get_url()
|
||||
result["title"] = get_title()
|
||||
elements = get_snapshot()
|
||||
result["elements_count"] = len(elements)
|
||||
|
||||
# 检查 HTTP 错误(通过 URL 重定向判断)
|
||||
if "login" in result["url"].lower() and path != "/admin/login/":
|
||||
result["status"] = "redirected_to_login"
|
||||
result["errors"].append("需要登录但未认证")
|
||||
return result
|
||||
|
||||
if result["url"].startswith(BASE_URL + "/admin/login"):
|
||||
result["status"] = "login_page"
|
||||
return result
|
||||
|
||||
# 检查页面错误
|
||||
errors = check_page_errors()
|
||||
if errors:
|
||||
result["status"] = "page_error"
|
||||
result["errors"] = errors
|
||||
log(f" 页面有错误: {errors[:2]}", "❌")
|
||||
return result
|
||||
|
||||
# 检查 Django 500 错误
|
||||
elements_text = " ".join(v.get("name", "") for v in elements.values())
|
||||
if any(kw in elements_text for kw in ["500", "Internal Server Error", "TemplateSyntaxError", "Invalid filter"]):
|
||||
result["status"] = "server_error"
|
||||
result["errors"].append("Django 500 or template error detected")
|
||||
log(f" 检测到服务器错误", "❌")
|
||||
return result
|
||||
|
||||
# 基本检查
|
||||
if result["elements_count"] < 3:
|
||||
result["status"] = "page_empty"
|
||||
result["errors"].append("页面元素过少,可能加载失败")
|
||||
log(f" 页面元素过少: {result['elements_count']}", "⚠️")
|
||||
return result
|
||||
|
||||
result["status"] = "pass"
|
||||
log(f" ✅ OK | 元素数: {result['elements_count']} | 标题: {result['title'][:40]}", "✅")
|
||||
return result
|
||||
|
||||
|
||||
def test_admin_index() -> dict:
|
||||
"""测试 Admin 首页"""
|
||||
result = test_page("openclaw/", "Admin 首页概览", True)
|
||||
# Admin 首页重定向到 auth/user/,特殊处理
|
||||
elements = get_snapshot()
|
||||
if result["status"] == "pass":
|
||||
# 检查是否有模型列表
|
||||
model_links = find_refs(elements, "link")
|
||||
result["models_found"] = len(model_links)
|
||||
if len(model_links) > 0:
|
||||
log(f" 发现 {len(model_links)} 个模型链接", "📋")
|
||||
return result
|
||||
|
||||
|
||||
def test_session_list() -> dict:
|
||||
"""测试 Session 列表页"""
|
||||
result = test_page("openclaw/session/", "Session 列表", True)
|
||||
if result["status"] == "pass":
|
||||
elements = get_snapshot()
|
||||
# 检查是否有表格
|
||||
has_table = find_ref(elements, "table") is not None
|
||||
has_search = find_ref(elements, "searchbox") is not None
|
||||
result["has_table"] = has_table
|
||||
result["has_search"] = has_search
|
||||
log(f" 表格: {has_table} | 搜索: {has_search}")
|
||||
return result
|
||||
|
||||
|
||||
def test_message_list() -> dict:
|
||||
"""测试 Message 列表页"""
|
||||
return test_page("openclaw/message/", "Message 列表", True)
|
||||
|
||||
|
||||
def test_toolcall_list() -> dict:
|
||||
"""测试 ToolCall 列表页"""
|
||||
return test_page("openclaw/toolcall/", "ToolCall 列表", True)
|
||||
|
||||
|
||||
def test_daily_reports_list() -> dict:
|
||||
"""测试日报列表页"""
|
||||
result = test_page("daily-reports/", "日报列表", True)
|
||||
if result["status"] == "pass":
|
||||
elements = get_snapshot()
|
||||
# 检查是否有数据(链接或表格)
|
||||
links = find_refs(elements, None, "xingjiang|2026|report|daily")
|
||||
tables = find_refs(elements, "table")
|
||||
result["links_found"] = len(links)
|
||||
result["tables_found"] = len(tables)
|
||||
log(f" 链接数: {len(links)} | 表格数: {len(tables)}")
|
||||
return result
|
||||
|
||||
|
||||
def test_daily_report_detail(agent: str, year: int, month: int, day: int) -> dict:
|
||||
"""测试指定日期的日报详情页"""
|
||||
path = f"daily-reports/{agent}/{year}-{month}-{day}/"
|
||||
desc = f"日报详情-{agent}-{year}-{month:02d}-{day:02d}"
|
||||
result = test_page(path, desc, True)
|
||||
if result["status"] == "pass":
|
||||
elements = get_snapshot()
|
||||
# 检查关键数据
|
||||
has_heading = find_ref(elements, "heading") is not None
|
||||
has_session = bool(find_ref(elements, None, "session"))
|
||||
has_tokens = bool(find_ref(elements, None, "token"))
|
||||
result["has_heading"] = has_heading
|
||||
result["has_session"] = has_session
|
||||
result["has_tokens"] = has_tokens
|
||||
log(f" 标题: {has_heading} | Session: {has_session} | Tokens: {has_tokens}")
|
||||
return result
|
||||
|
||||
|
||||
def test_login_page() -> dict:
|
||||
"""测试登录页 - 先强制刷新(清除已登录状态)再打开"""
|
||||
# 强制重新打开登录页(清除可能已登录的会话状态)
|
||||
login_url = build_url("/admin/login/")
|
||||
result = {
|
||||
"path": "/admin/login/",
|
||||
"desc": "登录页",
|
||||
"url": login_url,
|
||||
"status": "skip",
|
||||
"http_code": None,
|
||||
"errors": [],
|
||||
"title": "",
|
||||
"elements_count": 0,
|
||||
"screenshot": "",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
|
||||
# 直接打开登录页(不清除全局会话,仅此处处理)
|
||||
# 如果已经登录,Django 会重定向到 /admin/,这种情况记录为"已登录"
|
||||
success, _ = run_browser(["open", login_url])
|
||||
if not success:
|
||||
result["status"] = "nav_failed"
|
||||
return result
|
||||
wait_load(2)
|
||||
|
||||
# 截图
|
||||
result["screenshot"] = take_screenshot(f"login_page_{datetime.now().strftime('%H%M%S')}")
|
||||
result["url"] = get_url()
|
||||
result["title"] = get_title()
|
||||
elements = get_snapshot()
|
||||
result["elements_count"] = len(elements)
|
||||
|
||||
# 如果已经登录(URL 跳转到 admin index),记录为 pass 但注明"已登录"
|
||||
if "/admin/login" not in result["url"] and "/admin/" in result["url"]:
|
||||
result["status"] = "pass"
|
||||
result["errors"].append("用户已登录,登录页重定向到 admin index")
|
||||
log(f" 当前已登录,登录页自动跳转至: {result['url']}", "ℹ️")
|
||||
return result
|
||||
|
||||
# 正常登录页检查
|
||||
has_user = find_ref(elements, "textbox") is not None
|
||||
has_pass = (
|
||||
find_ref(elements, "textbox", "pass") is not None or
|
||||
len(find_refs(elements, "textbox")) > 1
|
||||
)
|
||||
has_button = find_ref(elements, "button") is not None
|
||||
result["has_username_field"] = has_user
|
||||
result["has_password_field"] = has_pass
|
||||
result["has_submit_button"] = has_button
|
||||
|
||||
if has_user and has_pass and has_button:
|
||||
result["status"] = "pass"
|
||||
log(f" ✅ 登录页正常 | 用户: {has_user} | 密码: {has_pass} | 按钮: {has_button}")
|
||||
else:
|
||||
result["status"] = "page_error"
|
||||
result["errors"].append(f"缺少表单元素: user={has_user}, pass={has_pass}, btn={has_button}")
|
||||
log(f" ❌ 登录页表单不完整", "❌")
|
||||
|
||||
return result
|
||||
if result["status"] in ("pass", "login_page"):
|
||||
result["status"] = "pass"
|
||||
elements = get_snapshot()
|
||||
has_user = find_ref(elements, "textbox") is not None
|
||||
has_pass = find_ref(elements, "password") is not None
|
||||
has_button = find_ref(elements, "button") is not None
|
||||
result["has_username_field"] = has_user
|
||||
result["has_password_field"] = has_pass
|
||||
result["has_submit_button"] = has_button
|
||||
log(f" 用户名框: {has_user} | 密码框: {has_pass} | 按钮: {has_button}")
|
||||
return result
|
||||
|
||||
|
||||
# ========== 主测试套件 ==========
|
||||
|
||||
def run_full_test_suite() -> list:
|
||||
"""运行完整测试套件"""
|
||||
results = []
|
||||
|
||||
log("=" * 60, "")
|
||||
log("开始全页面遍历测试", "🚀")
|
||||
log("=" * 60, "")
|
||||
|
||||
# 1. 登录页测试
|
||||
results.append(test_login_page())
|
||||
|
||||
# 2. Admin 首页
|
||||
results.append(test_admin_index())
|
||||
|
||||
# 3. Session 管理
|
||||
results.append(test_session_list())
|
||||
|
||||
# 4. Message 管理
|
||||
results.append(test_message_list())
|
||||
|
||||
# 5. ToolCall 管理
|
||||
results.append(test_toolcall_list())
|
||||
|
||||
# 6. 日报列表
|
||||
results.append(test_daily_reports_list())
|
||||
|
||||
# 7. 日报详情(多个日期)
|
||||
results.append(test_daily_report_detail("xingjiang", 2026, 4, 7))
|
||||
results.append(test_daily_report_detail("xingjiang", 2026, 4, 8))
|
||||
|
||||
# 8. API 页面(检查是否返回 JSON 或有响应)
|
||||
results.append(test_page("/api/sessions/bulk_upsert/", "API Bulk Upsert", True))
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# ========== 报告输出 ==========
|
||||
|
||||
def print_report(results: list):
|
||||
"""打印测试报告"""
|
||||
passed = sum(1 for r in results if r["status"] == "pass")
|
||||
failed = len(results) - passed
|
||||
|
||||
print("\n" + "=" * 70)
|
||||
print("🧪 全页面遍历测试报告 - agent-base")
|
||||
print("=" * 70)
|
||||
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
print(f"总计: {len(results)} | ✅ 通过: {passed} | ❌ 失败: {failed} | 成功率: {passed/len(results)*100:.0f}%")
|
||||
print("-" * 70)
|
||||
|
||||
for r in results:
|
||||
status_icon = "✅" if r["status"] == "pass" else "❌"
|
||||
print(f"\n{status_icon} [{r['desc']}]")
|
||||
print(f" URL: {r['url']}")
|
||||
if r["status"] != "pass":
|
||||
print(f" 状态: {r['status']}")
|
||||
if r["errors"]:
|
||||
print(f" 错误: {r['errors']}")
|
||||
else:
|
||||
print(f" 标题: {r['title'][:50]}")
|
||||
print(f" 元素数: {r['elements_count']}")
|
||||
if r["screenshot"]:
|
||||
print(f" 截图: {r['screenshot']}")
|
||||
|
||||
print("\n" + "=" * 70)
|
||||
|
||||
# 详细失败列表
|
||||
if failed > 0:
|
||||
print("\n❌ 失败页面:")
|
||||
for r in results:
|
||||
if r["status"] != "pass":
|
||||
print(f" - {r['desc']}: {r['status']} | {r['errors']}")
|
||||
|
||||
# 截图路径
|
||||
if os.path.exists(SCREENSHOT_DIR):
|
||||
print(f"\n📸 截图目录: {SCREENSHOT_DIR}")
|
||||
for f in sorted(os.listdir(SCREENSHOT_DIR)):
|
||||
if f.endswith(".png"):
|
||||
full = os.path.join(SCREENSHOT_DIR, f)
|
||||
size = os.path.getsize(full) / 1024
|
||||
print(f" - {f} ({size:.0f} KB)")
|
||||
|
||||
print("=" * 70)
|
||||
|
||||
return failed == 0
|
||||
|
||||
|
||||
def save_report(results: list, path: str):
|
||||
"""保存 JSON 报告"""
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
json.dump({
|
||||
"generated_at": datetime.now().isoformat(),
|
||||
"base_url": BASE_URL,
|
||||
"summary": {
|
||||
"total": len(results),
|
||||
"passed": sum(1 for r in results if r["status"] == "pass"),
|
||||
"failed": sum(1 for r in results if r["status"] != "pass"),
|
||||
},
|
||||
"results": results,
|
||||
}, f, ensure_ascii=False, indent=2)
|
||||
log(f"报告已保存: {path}", "💾")
|
||||
|
||||
|
||||
# ========== CLI 入口 ==========
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="agent-base 全页面遍历测试")
|
||||
parser.add_argument("--login", action="store_true", help="强制重新登录(不使用缓存)")
|
||||
parser.add_argument("--screenshot", action="store_true", help="仅截图模式")
|
||||
parser.add_argument("--page", type=str, help="测试指定页面路径")
|
||||
parser.add_argument("--report", type=str, default="/tmp/agent_base_test_report.json", help="报告保存路径")
|
||||
parser.add_argument("--session", type=str, default=SESSION_NAME, help="浏览器会话名")
|
||||
args = parser.parse_args()
|
||||
|
||||
# No global mutation needed — keep module-level SESSION_NAME
|
||||
|
||||
try:
|
||||
# 清理旧会话
|
||||
cleanup_all()
|
||||
time.sleep(1)
|
||||
|
||||
if args.page:
|
||||
# 单页测试
|
||||
path = args.page
|
||||
desc = f"单页测试: {path}"
|
||||
result = test_page(path, desc, requires_login=True)
|
||||
print_report([result])
|
||||
return
|
||||
|
||||
if args.screenshot:
|
||||
# 截图模式
|
||||
ensure_logged_in(force=args.login)
|
||||
take_screenshot("manual_screenshot")
|
||||
log(f"截图已保存: {SCREENSHOT_DIR}")
|
||||
return
|
||||
|
||||
# 完整测试套件
|
||||
results = run_full_test_suite()
|
||||
save_report(results, args.report)
|
||||
all_passed = print_report(results)
|
||||
|
||||
# 关闭会话
|
||||
close_session()
|
||||
|
||||
sys.exit(0 if all_passed else 1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
log("测试被用户中断", "⚠️")
|
||||
close_session()
|
||||
sys.exit(130)
|
||||
except Exception as e:
|
||||
log(f"测试异常: {e}", "❌")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
close_session()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
835
scripts/test_e2e_agent_browser.py
Executable file
835
scripts/test_e2e_agent_browser.py
Executable file
@@ -0,0 +1,835 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
End-to-End Test Script for agent-base Django Project
|
||||
Uses agent-browser CLI for browser automation
|
||||
|
||||
Usage: python3 test_e2e_agent_browser.py
|
||||
|
||||
Environment Variables:
|
||||
AGENT_BROWSER_EXECUTABLE_PATH: Path to Chrome/Chromium binary
|
||||
(Defaults to /Applications/Google Chrome.app/Contents/MacOS/Google Chrome on macOS)
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
import platform
|
||||
from datetime import datetime
|
||||
|
||||
# Detect agent-browser path based on OS
|
||||
def get_agent_browser_path():
|
||||
"""Get agent-browser executable path based on the system."""
|
||||
env_path = os.environ.get("AGENT_BROWSER_PATH")
|
||||
if env_path and os.path.exists(env_path):
|
||||
return env_path
|
||||
|
||||
system = platform.system()
|
||||
if system == "Darwin":
|
||||
path = "/opt/homebrew/bin/agent-browser"
|
||||
if os.path.exists(path):
|
||||
return path
|
||||
elif system == "Linux":
|
||||
for path in [
|
||||
"/home/shenwei/.npm-global/bin/agent-browser",
|
||||
"/usr/local/bin/agent-browser",
|
||||
"/usr/bin/agent-browser",
|
||||
]:
|
||||
if os.path.exists(path):
|
||||
return path
|
||||
|
||||
for cmd in ["agent-browser"]:
|
||||
try:
|
||||
result = subprocess.run(["which", cmd], capture_output=True, text=True, timeout=5)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return result.stdout.strip()
|
||||
except:
|
||||
pass
|
||||
|
||||
return "agent-browser"
|
||||
|
||||
def get_chrome_path():
|
||||
"""Get Chrome executable path based on the system."""
|
||||
if os.environ.get("AGENT_BROWSER_EXECUTABLE_PATH"):
|
||||
return os.environ["AGENT_BROWSER_EXECUTABLE_PATH"]
|
||||
|
||||
system = platform.system()
|
||||
if system == "Darwin":
|
||||
mac_chrome = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
|
||||
if os.path.exists(mac_chrome):
|
||||
return mac_chrome
|
||||
elif system == "Linux":
|
||||
for path in [
|
||||
"/usr/bin/google-chrome",
|
||||
"/usr/bin/chromium-browser",
|
||||
"/usr/bin/chromium",
|
||||
"/snap/bin/chromium",
|
||||
os.path.expanduser("~/.local/bin/chromium"),
|
||||
]:
|
||||
if os.path.exists(path):
|
||||
return path
|
||||
|
||||
return ""
|
||||
|
||||
# Initialize paths
|
||||
AGENT_BROWSER = get_agent_browser_path()
|
||||
CHROME_PATH = get_chrome_path()
|
||||
|
||||
if CHROME_PATH:
|
||||
os.environ["AGENT_BROWSER_EXECUTABLE_PATH"] = CHROME_PATH
|
||||
|
||||
print(f"Using agent-browser: {AGENT_BROWSER}", file=sys.stderr)
|
||||
if CHROME_PATH:
|
||||
print(f"Using Chrome: {CHROME_PATH}", file=sys.stderr)
|
||||
else:
|
||||
print("Chrome: auto-detect (ensure agent-browser install completed)", file=sys.stderr)
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "http://192.168.3.45:8765"
|
||||
ADMIN_URL = f"{BASE_URL}/admin/"
|
||||
USERNAME = "admin"
|
||||
PASSWORD = "admin123"
|
||||
SESSION_NAME = "agent-base-e2e-test"
|
||||
STATE_FILE = "/tmp/agent-browser-auth-state.json"
|
||||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) if os.path.abspath(__file__).startswith('/tmp') else "/home/shenwei/docker/agent-base/scripts"
|
||||
PROJECT_DIR = "/home/shenwei/docker/agent-base"
|
||||
|
||||
# Test results tracking
|
||||
test_results = {
|
||||
"started_at": datetime.now().isoformat(),
|
||||
"tests": [],
|
||||
"passed": 0,
|
||||
"failed": 0,
|
||||
"errors": []
|
||||
}
|
||||
|
||||
|
||||
def log(msg):
|
||||
"""Print log message with timestamp"""
|
||||
ts = datetime.now().strftime("%H:%M:%S")
|
||||
print(f"[{ts}] {msg}", flush=True)
|
||||
|
||||
|
||||
def log_step(step_num, msg):
|
||||
"""Print step with number"""
|
||||
log(f"[Step {step_num}] {msg}")
|
||||
|
||||
|
||||
def run_agent_browser(args, timeout=30, check=True):
|
||||
"""
|
||||
Run agent-browser command and return JSON result.
|
||||
|
||||
Args:
|
||||
args: List of command arguments (e.g., ['open', 'https://example.com'])
|
||||
timeout: Command timeout in seconds
|
||||
check: Whether to raise exception on non-zero exit
|
||||
|
||||
Returns:
|
||||
tuple: (success: bool, result: dict or error message)
|
||||
"""
|
||||
cmd = [AGENT_BROWSER, "--session", SESSION_NAME, "--json"]
|
||||
cmd.extend(args)
|
||||
|
||||
log(f" Running: agent-browser {' '.join(args)}")
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
if result.stdout:
|
||||
try:
|
||||
return True, json.loads(result.stdout)
|
||||
except json.JSONDecodeError:
|
||||
return True, {"raw": result.stdout}
|
||||
|
||||
if result.returncode != 0 and check:
|
||||
log(f" ERROR: exit code {result.returncode}")
|
||||
log(f" stderr: {result.stderr[:500]}")
|
||||
return False, result.stderr
|
||||
|
||||
return result.returncode == 0, {}
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
log(f" TIMEOUT after {timeout}s")
|
||||
return False, "Command timeout"
|
||||
except Exception as e:
|
||||
log(f" EXCEPTION: {e}")
|
||||
return False, str(e)
|
||||
|
||||
|
||||
def wait_for_page_load(timeout=10):
|
||||
"""Wait for page to finish loading"""
|
||||
time.sleep(2) # Give page time to settle
|
||||
|
||||
|
||||
def get_interactive_elements():
|
||||
"""Get snapshot of interactive elements from current page"""
|
||||
success, result = run_agent_browser(["snapshot", "-i", "--json"], timeout=15, check=False)
|
||||
if success and "data" in result:
|
||||
return result["data"].get("refs", {})
|
||||
return {}
|
||||
|
||||
|
||||
def find_element_by_role(elements, role_pattern, name_pattern=None):
|
||||
"""Find element by role pattern, optionally with name pattern"""
|
||||
for ref, info in elements.items():
|
||||
role = info.get("role", "")
|
||||
name = info.get("name", "")
|
||||
if re.match(role_pattern, role, re.IGNORECASE):
|
||||
if name_pattern is None or re.search(name_pattern, name, re.IGNORECASE):
|
||||
return ref
|
||||
return None
|
||||
|
||||
|
||||
def find_element_by_text(elements, text_pattern, role_pattern=None):
|
||||
"""Find element containing text"""
|
||||
for ref, info in elements.items():
|
||||
role = info.get("role", "")
|
||||
name = info.get("name", "")
|
||||
if re.search(text_pattern, name, re.IGNORECASE):
|
||||
if role_pattern is None or re.match(role_pattern, role, re.IGNORECASE):
|
||||
return ref
|
||||
return None
|
||||
|
||||
|
||||
def record_test(name, passed, error=None, details=None):
|
||||
"""Record test result"""
|
||||
test_entry = {
|
||||
"name": name,
|
||||
"passed": passed,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
if error:
|
||||
test_entry["error"] = error
|
||||
if details:
|
||||
test_entry["details"] = details
|
||||
|
||||
test_results["tests"].append(test_entry)
|
||||
|
||||
if passed:
|
||||
test_results["passed"] += 1
|
||||
log(f" ✓ PASSED: {name}")
|
||||
else:
|
||||
test_results["failed"] += 1
|
||||
test_results["errors"].append(f"{name}: {error}")
|
||||
log(f" ✗ FAILED: {name} - {error}")
|
||||
|
||||
|
||||
def test_admin_login():
|
||||
"""Test 1: Admin Login"""
|
||||
log_step(1, "Testing Admin Login")
|
||||
|
||||
# Navigate to admin login
|
||||
success, _ = run_agent_browser(["open", f"{ADMIN_URL}login/?next=/admin/"])
|
||||
if not success:
|
||||
record_test("Admin Login - Navigate", False, "Failed to open login page")
|
||||
return False
|
||||
|
||||
wait_for_page_load()
|
||||
|
||||
# Get interactive elements
|
||||
elements = get_interactive_elements()
|
||||
if not elements:
|
||||
record_test("Admin Login - Get Elements", False, "No interactive elements found")
|
||||
return False
|
||||
|
||||
# Find username field
|
||||
username_ref = find_element_by_role(elements, "textbox", ".*user.*") or \
|
||||
find_element_by_text(elements, "user", "textbox") or \
|
||||
find_element_by_role(elements, "textbox")
|
||||
|
||||
if not username_ref:
|
||||
# Try to find by id
|
||||
for ref, info in elements.items():
|
||||
if "username" in info.get("name", "").lower() or "id_username" in str(info):
|
||||
username_ref = ref
|
||||
break
|
||||
|
||||
if not username_ref:
|
||||
# List all textboxes for debugging
|
||||
textboxes = {k: v for k, v in elements.items() if "textbox" in v.get("role", "").lower()}
|
||||
log(f" Available textboxes: {textboxes}")
|
||||
record_test("Admin Login - Find Username Field", False, "Could not find username field")
|
||||
return False
|
||||
|
||||
# Fill username
|
||||
success, _ = run_agent_browser(["fill", username_ref, USERNAME])
|
||||
if not success:
|
||||
record_test("Admin Login - Fill Username", False, "Failed to fill username")
|
||||
return False
|
||||
|
||||
# Find password field (role=textbox with 密码 in name on Django admin)
|
||||
password_ref = find_element_by_role(elements, "password")
|
||||
if not password_ref:
|
||||
# Try by name - Django admin uses textbox role with "密码:" label
|
||||
for ref, info in elements.items():
|
||||
if "密码" in info.get("name", "") or "password" in info.get("name", "").lower():
|
||||
password_ref = ref
|
||||
break
|
||||
|
||||
if not password_ref:
|
||||
record_test("Admin Login - Find Password Field", False, "Could not find password field")
|
||||
return False
|
||||
|
||||
# Fill password
|
||||
success, _ = run_agent_browser(["fill", password_ref, PASSWORD])
|
||||
if not success:
|
||||
record_test("Admin Login - Fill Password", False, "Failed to fill password")
|
||||
return False
|
||||
|
||||
# Find and click submit button
|
||||
submit_ref = find_element_by_text(elements, "log|signin|submit|登|登录", "button") or \
|
||||
find_element_by_role(elements, "button", ".*")
|
||||
|
||||
if not submit_ref:
|
||||
record_test("Admin Login - Find Submit Button", False, "Could not find submit button")
|
||||
return False
|
||||
|
||||
success, _ = run_agent_browser(["click", submit_ref])
|
||||
if not success:
|
||||
record_test("Admin Login - Click Submit", False, "Failed to click submit")
|
||||
return False
|
||||
|
||||
wait_for_page_load()
|
||||
|
||||
# Check if login was successful (should be at /admin/ now)
|
||||
success, result = run_agent_browser(["get", "url", "--json"], check=False)
|
||||
current_url = ""
|
||||
if success and result.get("data"):
|
||||
current_url = result["data"].get("url", "")
|
||||
|
||||
log(f" Current URL after login: {current_url}")
|
||||
|
||||
if "/admin/" in current_url and "login" not in current_url.lower():
|
||||
record_test("Admin Login", True, details=f"Logged in successfully, at {current_url}")
|
||||
|
||||
# Save auth state for subsequent tests
|
||||
success, _ = run_agent_browser(["state", "save", STATE_FILE], check=False)
|
||||
if success:
|
||||
log(" Auth state saved for subsequent tests")
|
||||
|
||||
return True
|
||||
else:
|
||||
# Check for error message
|
||||
elements = get_interactive_elements()
|
||||
error_text = find_element_by_text(elements, "error|invalid|错误|无效")
|
||||
|
||||
record_test("Admin Login", False, f"Login failed, URL: {current_url}, error_element: {error_text}")
|
||||
return False
|
||||
|
||||
|
||||
def test_session_management():
|
||||
"""Test 2: Session Management"""
|
||||
log_step(2, "Testing Session Management")
|
||||
|
||||
# Load saved auth state if available
|
||||
if os.path.exists(STATE_FILE):
|
||||
run_agent_browser(["state", "load", STATE_FILE], check=False)
|
||||
|
||||
# Navigate to session admin
|
||||
success, _ = run_agent_browser(["open", f"{ADMIN_URL}openclaw/session/"])
|
||||
if not success:
|
||||
record_test("Session Management - Navigate", False, "Failed to navigate to session admin")
|
||||
return False
|
||||
|
||||
wait_for_page_load()
|
||||
|
||||
# Get page title
|
||||
success, result = run_agent_browser(["get", "title", "--json"], check=False)
|
||||
title = ""
|
||||
if success and result.get("data"):
|
||||
title = result["data"].get("title", "")
|
||||
|
||||
log(f" Page title: {title}")
|
||||
|
||||
# Get elements
|
||||
elements = get_interactive_elements()
|
||||
|
||||
# Check for session list - Django admin uses columnheader/cell roles not <table>
|
||||
has_table = find_element_by_role(elements, "table") is not None
|
||||
has_columnheaders = find_element_by_role(elements, "columnheader") is not None
|
||||
has_cells = find_element_by_role(elements, "cell") is not None
|
||||
has_rowheaders = find_element_by_role(elements, "rowheader") is not None
|
||||
|
||||
if has_table or has_columnheaders or has_cells:
|
||||
record_test("Session Management - List View", True,
|
||||
details=f"Table:{has_table}, Headers:{has_columnheaders}, Cells:{has_cells}, RowHeaders:{has_rowheaders}")
|
||||
else:
|
||||
record_test("Session Management - List View", False, "No table/list found on session list page")
|
||||
return False
|
||||
|
||||
# Test search functionality
|
||||
search_ref = find_element_by_role(elements, "searchbox") or \
|
||||
find_element_by_text(elements, "search", "textbox") or \
|
||||
find_element_by_role(elements, "textbox")
|
||||
|
||||
if search_ref:
|
||||
success, _ = run_agent_browser(["fill", search_ref, "test"], check=False)
|
||||
if success:
|
||||
# Try pressing Enter to submit search
|
||||
run_agent_browser(["press", "Enter"], check=False)
|
||||
wait_for_page_load()
|
||||
record_test("Session Management - Search", True)
|
||||
else:
|
||||
record_test("Session Management - Search", False, "Search field found but fill failed")
|
||||
else:
|
||||
record_test("Session Management - Search", False, "No search field found")
|
||||
|
||||
# Test filter functionality
|
||||
elements = get_interactive_elements()
|
||||
filter_ref = find_element_by_role(elements, "combobox") or \
|
||||
find_element_by_text(elements, "filter|筛选", "button") or \
|
||||
find_element_by_text(elements, "all|全部", "button")
|
||||
|
||||
if filter_ref:
|
||||
record_test("Session Management - Filter", True, details="Filter controls found")
|
||||
else:
|
||||
record_test("Session Management - Filter", False, "No filter controls found")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def test_message_management():
|
||||
"""Test 3: Message Management"""
|
||||
log_step(3, "Testing Message Management")
|
||||
|
||||
# Load saved auth state
|
||||
if os.path.exists(STATE_FILE):
|
||||
run_agent_browser(["state", "load", STATE_FILE], check=False)
|
||||
|
||||
# Navigate to message admin
|
||||
success, _ = run_agent_browser(["open", f"{ADMIN_URL}openclaw/message/"])
|
||||
if not success:
|
||||
record_test("Message Management - Navigate", False, "Failed to navigate to message admin")
|
||||
return False
|
||||
|
||||
wait_for_page_load()
|
||||
|
||||
# Get page title
|
||||
success, result = run_agent_browser(["get", "title", "--json"], check=False)
|
||||
title = ""
|
||||
if success and result.get("data"):
|
||||
title = result["data"].get("title", "")
|
||||
|
||||
log(f" Page title: {title}")
|
||||
|
||||
# Get elements
|
||||
elements = get_interactive_elements()
|
||||
|
||||
# Check for message list - Django admin uses columnheader/cell roles
|
||||
has_table = find_element_by_role(elements, "table") is not None
|
||||
has_columnheaders = find_element_by_role(elements, "columnheader") is not None
|
||||
has_cells = find_element_by_role(elements, "cell") is not None
|
||||
has_rowheaders = find_element_by_role(elements, "rowheader") is not None
|
||||
|
||||
if has_table or has_columnheaders or has_cells:
|
||||
record_test("Message Management - List View", True,
|
||||
details=f"Table:{has_table}, Headers:{has_columnheaders}, Cells:{has_cells}, RowHeaders:{has_rowheaders}")
|
||||
else:
|
||||
record_test("Message Management - List View", False, "No list found on message list page")
|
||||
return False
|
||||
|
||||
# Test search functionality
|
||||
search_ref = find_element_by_role(elements, "searchbox") or \
|
||||
find_element_by_text(elements, "search", "textbox") or \
|
||||
find_element_by_role(elements, "textbox")
|
||||
|
||||
if search_ref:
|
||||
success, _ = run_agent_browser(["fill", search_ref, "hello"], check=False)
|
||||
if success:
|
||||
run_agent_browser(["press", "Enter"], check=False)
|
||||
wait_for_page_load()
|
||||
record_test("Message Management - Search", True)
|
||||
else:
|
||||
record_test("Message Management - Search", False, "Search fill failed")
|
||||
else:
|
||||
record_test("Message Management - Search", False, "No search field found")
|
||||
|
||||
# Test filter functionality
|
||||
elements = get_interactive_elements()
|
||||
filter_ref = find_element_by_role(elements, "combobox") or \
|
||||
find_element_by_text(elements, "filter|筛选", "button")
|
||||
|
||||
if filter_ref:
|
||||
record_test("Message Management - Filter", True)
|
||||
else:
|
||||
record_test("Message Management - Filter", False, "No filter controls found")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def test_toolcall_management():
|
||||
"""Test 4: ToolCall Management"""
|
||||
log_step(4, "Testing ToolCall Management")
|
||||
|
||||
# Load saved auth state
|
||||
if os.path.exists(STATE_FILE):
|
||||
run_agent_browser(["state", "load", STATE_FILE], check=False)
|
||||
|
||||
# Navigate to toolcall admin
|
||||
success, _ = run_agent_browser(["open", f"{ADMIN_URL}openclaw/toolcall/"])
|
||||
if not success:
|
||||
record_test("ToolCall Management - Navigate", False, "Failed to navigate to toolcall admin")
|
||||
return False
|
||||
|
||||
wait_for_page_load()
|
||||
|
||||
# Get page title
|
||||
success, result = run_agent_browser(["get", "title", "--json"], check=False)
|
||||
title = ""
|
||||
if success and result.get("data"):
|
||||
title = result["data"].get("title", "")
|
||||
|
||||
log(f" Page title: {title}")
|
||||
|
||||
# Get elements
|
||||
elements = get_interactive_elements()
|
||||
|
||||
# Check for toolcall list - Django admin uses columnheader/cell roles
|
||||
has_table = find_element_by_role(elements, "table") is not None
|
||||
has_columnheaders = find_element_by_role(elements, "columnheader") is not None
|
||||
has_cells = find_element_by_role(elements, "cell") is not None
|
||||
has_rowheaders = find_element_by_role(elements, "rowheader") is not None
|
||||
|
||||
if has_table or has_columnheaders or has_cells:
|
||||
record_test("ToolCall Management - List View", True,
|
||||
details=f"Table:{has_table}, Headers:{has_columnheaders}, Cells:{has_cells}, RowHeaders:{has_rowheaders}")
|
||||
else:
|
||||
record_test("ToolCall Management - List View", False, "No list found on toolcall list page")
|
||||
return False
|
||||
|
||||
# Test filter functionality
|
||||
filter_ref = find_element_by_role(elements, "combobox") or \
|
||||
find_element_by_text(elements, "filter|筛选", "button")
|
||||
|
||||
if filter_ref:
|
||||
record_test("ToolCall Management - Filter", True)
|
||||
else:
|
||||
record_test("ToolCall Management - Filter", False, "No filter controls found")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def test_daily_reports_list():
|
||||
"""Test 5: Daily Reports List"""
|
||||
log_step(5, "Testing Daily Reports List")
|
||||
|
||||
# Load saved auth state
|
||||
if os.path.exists(STATE_FILE):
|
||||
run_agent_browser(["state", "load", STATE_FILE], check=False)
|
||||
|
||||
# Navigate to daily reports
|
||||
success, _ = run_agent_browser(["open", f"{ADMIN_URL}daily-reports/"])
|
||||
if not success:
|
||||
record_test("Daily Reports - Navigate", False, "Failed to navigate to daily reports")
|
||||
return False
|
||||
|
||||
wait_for_page_load()
|
||||
|
||||
# Get page title
|
||||
success, result = run_agent_browser(["get", "title", "--json"], check=False)
|
||||
title = ""
|
||||
if success and result.get("data"):
|
||||
title = result["data"].get("title", "")
|
||||
|
||||
log(f" Page title: {title}")
|
||||
|
||||
# Get elements
|
||||
elements = get_interactive_elements()
|
||||
|
||||
# Check for reports list
|
||||
has_table = find_element_by_role(elements, "table") is not None
|
||||
has_columnheaders = find_element_by_role(elements, "columnheader") is not None
|
||||
has_cells = find_element_by_role(elements, "cell") is not None
|
||||
has_rowheaders = find_element_by_role(elements, "rowheader") is not None
|
||||
has_links = find_element_by_text(elements, "xingjiang|report|daily", "link") is not None
|
||||
|
||||
if has_table or has_columnheaders or has_cells or has_rowheaders or has_links:
|
||||
record_test("Daily Reports - List View", True,
|
||||
details=f"Table:{has_table}, Headers:{has_columnheaders}, Cells:{has_cells}, Links:{has_links}")
|
||||
else:
|
||||
record_test("Daily Reports - List View", False, "No reports found on page")
|
||||
return False
|
||||
|
||||
# Try to find report links (for xingjiang or date-based links)
|
||||
# The daily reports page may show dates or agent names as links
|
||||
report_link = find_element_by_text(elements, "xingjiang|2026|report", "link")
|
||||
|
||||
if report_link:
|
||||
record_test("Daily Reports - Report Links Present", True, details=f"Found report link: {report_link}")
|
||||
else:
|
||||
# Also check for any date/agent links
|
||||
any_link = find_element_by_role(elements, "link")
|
||||
if any_link:
|
||||
# Found some links, the page is working
|
||||
record_test("Daily Reports - Report Links Present", True, details=f"Page has links (link ref: {any_link})")
|
||||
else:
|
||||
record_test("Daily Reports - Report Links Present", False, "No links found on page")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def test_daily_reports_detail():
|
||||
"""Test 6: Daily Reports Detail"""
|
||||
log_step(6, "Testing Daily Reports Detail")
|
||||
|
||||
# Load saved auth state
|
||||
if os.path.exists(STATE_FILE):
|
||||
run_agent_browser(["state", "load", STATE_FILE], check=False)
|
||||
|
||||
# Navigate directly to a specific daily report
|
||||
detail_url = f"{ADMIN_URL}daily-reports/xingjiang/2026-4-8/"
|
||||
success, _ = run_agent_browser(["open", detail_url])
|
||||
if not success:
|
||||
record_test("Daily Reports Detail - Navigate", False, "Failed to navigate to daily report detail")
|
||||
return False
|
||||
|
||||
wait_for_page_load()
|
||||
|
||||
# Get page title
|
||||
success, result = run_agent_browser(["get", "title", "--json"], check=False)
|
||||
title = ""
|
||||
if success and result.get("data"):
|
||||
title = result["data"].get("title", "")
|
||||
|
||||
log(f" Page title: {title}")
|
||||
|
||||
# Check URL
|
||||
success, result = run_agent_browser(["get", "url", "--json"], check=False)
|
||||
current_url = ""
|
||||
if success and result.get("data"):
|
||||
current_url = result["data"].get("url", "")
|
||||
|
||||
log(f" Current URL: {current_url}")
|
||||
|
||||
if "daily-reports" in current_url and "xingjiang" in current_url:
|
||||
record_test("Daily Reports Detail - URL Valid", True, details=f"URL: {current_url}")
|
||||
else:
|
||||
record_test("Daily Reports Detail - URL Valid", False, f"Unexpected URL: {current_url}")
|
||||
return False
|
||||
|
||||
# Get elements
|
||||
elements = get_interactive_elements()
|
||||
|
||||
# Check for content
|
||||
has_heading = find_element_by_role(elements, "heading") is not None
|
||||
has_content = len(elements) > 5 # More than just navigation
|
||||
|
||||
if has_heading or has_content:
|
||||
record_test("Daily Reports Detail - Content Present", True)
|
||||
else:
|
||||
record_test("Daily Reports Detail - Content Present", False, "No content found on detail page")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def test_bulk_upsert_api():
|
||||
"""Test 7: Bulk Upsert API"""
|
||||
log_step(7, "Testing Bulk Upsert API")
|
||||
|
||||
# Load saved auth state for CSRF token
|
||||
if os.path.exists(STATE_FILE):
|
||||
run_agent_browser(["state", "load", STATE_FILE], check=False)
|
||||
|
||||
# First, get a CSRF token by visiting the API or any admin page
|
||||
success, _ = run_agent_browser(["open", BASE_URL], check=False)
|
||||
wait_for_page_load()
|
||||
|
||||
# Get cookies for CSRF
|
||||
success, result = run_agent_browser(["cookies"], check=False)
|
||||
csrf_token = ""
|
||||
cookies = ""
|
||||
if success and result.get("data"):
|
||||
cookies = result["data"]
|
||||
for cookie in cookies if isinstance(cookies, list) else []:
|
||||
if cookie.get("name") == "csrftoken":
|
||||
csrf_token = cookie.get("value", "")
|
||||
break
|
||||
|
||||
log(f" CSRF token obtained: {'yes' if csrf_token else 'no (will try without)'}")
|
||||
|
||||
# Test the API with curl (more reliable for API testing)
|
||||
# Use unique session_id to avoid unique constraint conflicts
|
||||
unique_session_id = f"test-e2e-{uuid.uuid4().hex[:12]}"
|
||||
test_payload = {
|
||||
"agent_name": "xingjiang",
|
||||
"source_node": "telegram",
|
||||
"sessions": [
|
||||
{
|
||||
"session_id": unique_session_id,
|
||||
"start_time": "2026-04-08T12:00:00Z",
|
||||
"end_time": "2026-04-08T13:00:00Z",
|
||||
"message_count": 10,
|
||||
"tool_call_count": 5,
|
||||
"total_tokens": 1000,
|
||||
"total_cost": 0.05
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# Build curl command
|
||||
curl_cmd = [
|
||||
"curl", "-s", "-X", "POST",
|
||||
f"{BASE_URL}/api/sessions/bulk_upsert/",
|
||||
"-H", "Content-Type: application/json",
|
||||
"-d", json.dumps(test_payload)
|
||||
]
|
||||
|
||||
if csrf_token:
|
||||
curl_cmd.extend(["-H", f"X-CSRFToken: {csrf_token}"])
|
||||
|
||||
log(f" Testing API: POST /api/sessions/bulk_upsert/")
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
curl_cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=15
|
||||
)
|
||||
|
||||
response = result.stdout
|
||||
log(f" API Response: {response[:500]}")
|
||||
|
||||
# Try to parse response
|
||||
try:
|
||||
resp_data = json.loads(response)
|
||||
if resp_data.get("success") or "created" in str(resp_data).lower() or "upserted" in str(resp_data).lower():
|
||||
record_test("Bulk Upsert API - Create Session", True, details=f"Response: {response[:200]}")
|
||||
elif resp_data.get("error"):
|
||||
# Some errors are expected (like missing fields)
|
||||
if "missing" in resp_data.get("error", "").lower():
|
||||
record_test("Bulk Upsert API - Validation", True, details=f"Expected validation: {resp_data.get('error')}")
|
||||
else:
|
||||
record_test("Bulk Upsert API - Response", True, details=f"Response: {response[:200]}")
|
||||
else:
|
||||
record_test("Bulk Upsert API - Response", True, details=f"Response: {response[:200]}")
|
||||
except json.JSONDecodeError:
|
||||
if result.returncode == 0:
|
||||
record_test("Bulk Upsert API - Endpoint Accessible", True, details=f"Status: {result.returncode}")
|
||||
else:
|
||||
record_test("Bulk Upsert API - Response", False, f"Non-JSON response: {response[:200]}")
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
record_test("Bulk Upsert API - Timeout", False, "API request timed out")
|
||||
except Exception as e:
|
||||
record_test("Bulk Upsert API - Error", False, str(e))
|
||||
|
||||
# Also test the API via browser to ensure it works with session auth
|
||||
success, _ = run_agent_browser(["open", f"{BASE_URL}/api/sessions/bulk_upsert/"], check=False)
|
||||
wait_for_page_load()
|
||||
|
||||
success, result = run_agent_browser(["get", "url", "--json"], check=False)
|
||||
api_url = ""
|
||||
if success and result.get("data"):
|
||||
api_url = result["data"].get("url", "")
|
||||
|
||||
if "bulk_upsert" in api_url:
|
||||
record_test("Bulk Upsert API - Browser Access", True, details=f"URL: {api_url}")
|
||||
else:
|
||||
# It's OK if browser can't access API directly (it might return 403/JSON)
|
||||
elements = get_interactive_elements()
|
||||
# Just check we got some response
|
||||
if elements:
|
||||
record_test("Bulk Upsert API - Browser Access", True, details="API endpoint reachable")
|
||||
else:
|
||||
record_test("Bulk Upsert API - Browser Access", False, "Could not verify API via browser")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def cleanup():
|
||||
"""Cleanup browser sessions"""
|
||||
log("Cleaning up browser sessions...")
|
||||
run_agent_browser(["close"], check=False)
|
||||
# Also close all sessions
|
||||
subprocess.run(
|
||||
[AGENT_BROWSER, "close", "--all"],
|
||||
capture_output=True,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
|
||||
def print_report():
|
||||
"""Print final test report"""
|
||||
print("\n" + "="*70)
|
||||
print("E2E TEST REPORT - agent-base Django Project")
|
||||
print("="*70)
|
||||
print(f"Test Started: {test_results['started_at']}")
|
||||
print(f"Test Finished: {datetime.now().isoformat()}")
|
||||
print("-"*70)
|
||||
|
||||
print(f"\nSUMMARY:")
|
||||
print(f" Total Tests: {len(test_results['tests'])}")
|
||||
print(f" Passed: {test_results['passed']}")
|
||||
print(f" Failed: {test_results['failed']}")
|
||||
print(f" Success Rate: {test_results['passed']/len(test_results['tests'])*100:.1f}%")
|
||||
|
||||
if test_results['errors']:
|
||||
print(f"\nERRORS:")
|
||||
for i, err in enumerate(test_results['errors'], 1):
|
||||
print(f" {i}. {err}")
|
||||
|
||||
print("\nDETAILED RESULTS:")
|
||||
for test in test_results['tests']:
|
||||
status = "✓ PASS" if test['passed'] else "✗ FAIL"
|
||||
print(f" [{status}] {test['name']}")
|
||||
if not test['passed'] and test.get('error'):
|
||||
print(f" Error: {test['error']}")
|
||||
if test.get('details'):
|
||||
print(f" Details: {test['details']}")
|
||||
|
||||
print("\n" + "="*70)
|
||||
|
||||
# Exit with appropriate code
|
||||
if test_results['failed'] > 0:
|
||||
sys.exit(1)
|
||||
else:
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main():
|
||||
"""Main test runner"""
|
||||
print("\n" + "="*70)
|
||||
print("Starting E2E Tests for agent-base Django Project")
|
||||
print(f"Target: {BASE_URL}")
|
||||
print(f"Session: {SESSION_NAME}")
|
||||
print("="*70 + "\n")
|
||||
|
||||
try:
|
||||
# Clean up any existing sessions first
|
||||
cleanup()
|
||||
time.sleep(1)
|
||||
|
||||
# Run all tests
|
||||
test_admin_login()
|
||||
test_session_management()
|
||||
test_message_management()
|
||||
test_toolcall_management()
|
||||
test_daily_reports_list()
|
||||
test_daily_reports_detail()
|
||||
test_bulk_upsert_api()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
log("Tests interrupted by user")
|
||||
except Exception as e:
|
||||
log(f"Unexpected error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
cleanup()
|
||||
|
||||
# Print final report
|
||||
print_report()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -11,8 +11,10 @@ INSTALLED_APPS = [
|
||||
"django.contrib.sessions",
|
||||
"django.contrib.messages",
|
||||
"django.contrib.staticfiles",
|
||||
"django.contrib.humanize",
|
||||
"rest_framework",
|
||||
"openclaw",
|
||||
"openclaw_daily",
|
||||
]
|
||||
|
||||
MIDDLEWARE = [
|
||||
|
||||
137
src/openclaw/admin.py
Normal file → Executable file
137
src/openclaw/admin.py
Normal file → Executable file
@@ -1,12 +1,8 @@
|
||||
from datetime import date
|
||||
|
||||
from django.contrib import admin
|
||||
from django.db.models import Prefetch
|
||||
from django.http import HttpResponse
|
||||
from django.template.response import TemplateResponse
|
||||
|
||||
from openclaw.export import export_daily_markdown
|
||||
from openclaw.models import Session, Message, ToolCall
|
||||
from openclaw.models import Session, Message, ToolCall, CronJob, CronJobRun
|
||||
|
||||
|
||||
class MessageInline(admin.TabularInline):
|
||||
@@ -35,73 +31,6 @@ class ToolCallInline(admin.TabularInline):
|
||||
return False
|
||||
|
||||
|
||||
def daily_conversation_view(request):
|
||||
"""Admin standalone view for date-range conversation browsing."""
|
||||
start_str = request.GET.get("start")
|
||||
end_str = request.GET.get("end")
|
||||
agent_filter = request.GET.get("agent", "")
|
||||
|
||||
start_date = start_str if start_str else date.today().isoformat()
|
||||
end_date = end_str if end_str else date.today().isoformat()
|
||||
|
||||
agents = list(
|
||||
Session.objects.values_list("agent_name", flat=True)
|
||||
.distinct()
|
||||
.order_by("agent_name")
|
||||
)
|
||||
|
||||
sessions_qs = Session.objects.filter(
|
||||
start_time__date__gte=start_date,
|
||||
start_time__date__lte=end_date,
|
||||
).order_by("start_time")
|
||||
|
||||
if agent_filter:
|
||||
sessions_qs = sessions_qs.filter(agent_name=agent_filter)
|
||||
|
||||
messages_prefetch = Prefetch(
|
||||
"messages",
|
||||
queryset=Message.objects.order_by("seq"),
|
||||
)
|
||||
sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
|
||||
|
||||
role_labels = {
|
||||
"user": "User",
|
||||
"assistant": "Assistant",
|
||||
"toolResult": "Tool Result",
|
||||
}
|
||||
|
||||
session_list = []
|
||||
for session in sessions_qs:
|
||||
messages = []
|
||||
for msg in session.messages.all():
|
||||
messages.append({
|
||||
"timestamp": msg.timestamp,
|
||||
"role": msg.role,
|
||||
"content_text": msg.content_text,
|
||||
"tool_name": msg.tool_name,
|
||||
"get_role_label": role_labels.get(msg.role, msg.role),
|
||||
})
|
||||
session_list.append({
|
||||
"session_id": session.session_id,
|
||||
"agent_name": session.agent_name,
|
||||
"model_id": session.model_id,
|
||||
"total_tokens": session.total_tokens,
|
||||
"start_time": session.start_time,
|
||||
"messages": messages,
|
||||
})
|
||||
|
||||
context = {
|
||||
**admin.site.each_context(request),
|
||||
"start_date": start_date,
|
||||
"end_date": end_date,
|
||||
"selected_agent": agent_filter,
|
||||
"agents": agents,
|
||||
"sessions": session_list,
|
||||
"title": "Daily Conversation View",
|
||||
}
|
||||
return TemplateResponse(request, "admin/openclaw/daily_view.html", context)
|
||||
|
||||
|
||||
@admin.action(description="Export selected sessions to Markdown")
|
||||
def export_to_markdown(modeladmin, request, queryset):
|
||||
md, filename = export_daily_markdown(queryset)
|
||||
@@ -114,34 +43,18 @@ def export_to_markdown(modeladmin, request, queryset):
|
||||
class SessionAdmin(admin.ModelAdmin):
|
||||
actions = [export_to_markdown]
|
||||
list_display = (
|
||||
"session_id",
|
||||
"agent_name",
|
||||
"model_id",
|
||||
"total_tokens",
|
||||
"message_count",
|
||||
"start_time",
|
||||
"session_id", "agent_name", "model_id", "total_tokens",
|
||||
"message_count", "tool_call_count", "status", "start_time",
|
||||
)
|
||||
list_filter = ("agent_name", "source_node", "model_id", "start_time")
|
||||
search_fields = ("session_id", "cwd")
|
||||
ordering = ("-start_time",)
|
||||
inlines = [MessageInline, ToolCallInline]
|
||||
readonly_fields = (
|
||||
"session_id",
|
||||
"agent_name",
|
||||
"source_node",
|
||||
"start_time",
|
||||
"end_time",
|
||||
"pushed_at",
|
||||
"session_id", "agent_name", "source_node",
|
||||
"start_time", "end_time", "pushed_at",
|
||||
)
|
||||
|
||||
def get_urls(self):
|
||||
from django.urls import path
|
||||
urls = super().get_urls()
|
||||
custom_urls = [
|
||||
path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"),
|
||||
]
|
||||
return custom_urls + urls
|
||||
|
||||
|
||||
@admin.register(Message)
|
||||
class MessageAdmin(admin.ModelAdmin):
|
||||
@@ -160,3 +73,43 @@ class ToolCallAdmin(admin.ModelAdmin):
|
||||
list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms")
|
||||
list_filter = ("tool_name", "is_error", "exit_code")
|
||||
ordering = ("-created_at",)
|
||||
|
||||
|
||||
class CronJobRunInline(admin.TabularInline):
|
||||
model = CronJobRun
|
||||
extra = 0
|
||||
fields = ("run_at", "status", "duration_ms", "delivery_status", "model", "tokens_total")
|
||||
readonly_fields = fields
|
||||
ordering = ("-run_at",)
|
||||
max_num = 10
|
||||
|
||||
def has_add_permission(self, request, obj=None):
|
||||
return False
|
||||
|
||||
def has_delete_permission(self, request, obj=None):
|
||||
return False
|
||||
|
||||
|
||||
@admin.register(CronJob)
|
||||
class CronJobAdmin(admin.ModelAdmin):
|
||||
list_display = (
|
||||
"job_id", "name", "agent_id", "enabled",
|
||||
"schedule_expr", "state_last_run_status", "state_last_run_at",
|
||||
)
|
||||
list_filter = ("enabled", "agent_id", "state_last_status", "state_last_run_status")
|
||||
search_fields = ("job_id", "name")
|
||||
ordering = ("-updated_at",)
|
||||
inlines = [CronJobRunInline]
|
||||
readonly_fields = ("job_id", "created_at", "updated_at")
|
||||
|
||||
|
||||
@admin.register(CronJobRun)
|
||||
class CronJobRunAdmin(admin.ModelAdmin):
|
||||
list_display = (
|
||||
"job", "run_at", "status", "duration_ms",
|
||||
"delivery_status", "model", "tokens_total",
|
||||
)
|
||||
list_filter = ("status", "delivery_status", "run_at", "model")
|
||||
search_fields = ("session_id", "session_key", "summary")
|
||||
ordering = ("-run_at",)
|
||||
readonly_fields = ("job", "run_at", "session_id", "created_at")
|
||||
|
||||
1
src/openclaw/admin_custom_site.py
Normal file
1
src/openclaw/admin_custom_site.py
Normal file
@@ -0,0 +1 @@
|
||||
# Kept for potential future use — currently not used
|
||||
111
src/openclaw/admin_new_views.py
Normal file
111
src/openclaw/admin_new_views.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""
|
||||
Daily report views for admin.
|
||||
List: (agent, date) combos with message/session counts.
|
||||
Detail: all messages for a specific agent on a specific date.
|
||||
"""
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import date
|
||||
|
||||
from django.contrib import admin
|
||||
from django.http import Http404
|
||||
from django.template.response import TemplateResponse
|
||||
|
||||
from openclaw.models import Session, Message, ToolCall
|
||||
|
||||
|
||||
def daily_report_list_view(request):
|
||||
start_str = request.GET.get("start")
|
||||
end_str = request.GET.get("end")
|
||||
agent_filter = request.GET.get("agent", "")
|
||||
|
||||
today = date.today()
|
||||
start_date = start_str if start_str else (today.replace(day=1)).isoformat()
|
||||
end_date = end_str if end_str else today.isoformat()
|
||||
|
||||
agents = list(
|
||||
Session.objects.values_list("agent_name", flat=True)
|
||||
.distinct()
|
||||
.order_by("agent_name")
|
||||
)
|
||||
|
||||
messages_qs = Message.objects.filter(
|
||||
timestamp__date__gte=start_date,
|
||||
timestamp__date__lte=end_date,
|
||||
).select_related("session").order_by("timestamp")
|
||||
|
||||
if agent_filter:
|
||||
messages_qs = messages_qs.filter(session__agent_name=agent_filter)
|
||||
|
||||
groups = defaultdict(lambda: {"message_count": 0, "session_ids": set()})
|
||||
for msg in messages_qs:
|
||||
d = msg.timestamp.date()
|
||||
key = (msg.session.agent_name, d)
|
||||
groups[key]["message_count"] += 1
|
||||
groups[key]["session_ids"].add(msg.session.session_id)
|
||||
|
||||
groups_list = sorted(
|
||||
[
|
||||
{
|
||||
"agent_name": key[0],
|
||||
"date_val": key[1],
|
||||
"message_count": data["message_count"],
|
||||
"session_count": len(data["session_ids"]),
|
||||
}
|
||||
for key, data in groups.items()
|
||||
],
|
||||
key=lambda x: (-x["date_val"].toordinal(), x["agent_name"]),
|
||||
)
|
||||
|
||||
context = {
|
||||
**admin.site.each_context(request),
|
||||
"start_date": start_date,
|
||||
"end_date": end_date,
|
||||
"selected_agent": agent_filter,
|
||||
"agents": agents,
|
||||
"date_groups": groups_list,
|
||||
"title": "Daily Report",
|
||||
}
|
||||
return TemplateResponse(request, "admin/openclaw/daily_report_list.html", context)
|
||||
|
||||
|
||||
def daily_report_detail_view(request, agent_name, year, month, day):
|
||||
target_date = date(int(year), int(month), int(day))
|
||||
|
||||
sessions = (
|
||||
Session.objects.filter(agent_name=agent_name)
|
||||
.prefetch_related("messages", "messages__tool_calls")
|
||||
.order_by("start_time")
|
||||
)
|
||||
|
||||
sessions_with_messages = []
|
||||
for session in sessions:
|
||||
day_messages = session.messages.filter(
|
||||
timestamp__date=target_date
|
||||
).order_by("seq")
|
||||
if day_messages.exists():
|
||||
sessions_with_messages.append({
|
||||
"session": session,
|
||||
"messages": list(day_messages),
|
||||
})
|
||||
|
||||
if not sessions_with_messages:
|
||||
raise Http404(f"No messages found for agent '{agent_name}' on {target_date}")
|
||||
|
||||
role_label_map = {
|
||||
"user": "User",
|
||||
"assistant": "Assistant",
|
||||
"tool": "Tool",
|
||||
"toolResult": "Tool Result",
|
||||
"system": "System",
|
||||
}
|
||||
|
||||
context = {
|
||||
**admin.site.each_context(request),
|
||||
"agent_name": agent_name,
|
||||
"target_date": target_date,
|
||||
"sessions": sessions_with_messages,
|
||||
"role_label_map": role_label_map,
|
||||
"title": f"Daily Report: {agent_name} - {target_date}",
|
||||
}
|
||||
return TemplateResponse(request, "admin/openclaw/daily_report_detail.html", context)
|
||||
105
src/openclaw/cron_service.py
Normal file
105
src/openclaw/cron_service.py
Normal file
@@ -0,0 +1,105 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from django.db import transaction
|
||||
|
||||
from openclaw.models import CronJob, CronJobRun
|
||||
|
||||
|
||||
def _ms_to_dt(ms):
|
||||
if not ms:
|
||||
return None
|
||||
return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
|
||||
|
||||
|
||||
class CronBulkUpsertService:
|
||||
@staticmethod
|
||||
@transaction.atomic
|
||||
def upsert(payload):
|
||||
source_node = payload.get("source_node", "unknown")
|
||||
jobs_data = payload.get("jobs", [])
|
||||
runs_data = payload.get("runs", [])
|
||||
|
||||
jobs_upserted = 0
|
||||
runs_upserted = 0
|
||||
|
||||
# Upsert jobs
|
||||
for job_data in jobs_data:
|
||||
job_id = job_data["id"]
|
||||
sched = job_data.get("schedule", {})
|
||||
pay = job_data.get("payload", {})
|
||||
deliv = job_data.get("delivery", {})
|
||||
state = job_data.get("state", {})
|
||||
defaults = {
|
||||
"agent_id": job_data.get("agentId", ""),
|
||||
"name": job_data.get("name", ""),
|
||||
"session_key": job_data.get("sessionKey", ""),
|
||||
"enabled": job_data.get("enabled", True),
|
||||
"schedule_kind": sched.get("kind", ""),
|
||||
"schedule_expr": sched.get("expr", ""),
|
||||
"schedule_tz": sched.get("tz", ""),
|
||||
"payload_kind": pay.get("kind", ""),
|
||||
"payload_message": pay.get("message", ""),
|
||||
"delivery_mode": deliv.get("mode", ""),
|
||||
"delivery_channel": deliv.get("channel", ""),
|
||||
"delivery_to": deliv.get("to", ""),
|
||||
"state_next_run_at": _ms_to_dt(state.get("nextRunAtMs")),
|
||||
"state_last_run_at": _ms_to_dt(state.get("lastRunAtMs")),
|
||||
"state_last_run_status": state.get("lastRunStatus", ""),
|
||||
"state_last_duration_ms": state.get("lastDurationMs"),
|
||||
"state_last_status": state.get("lastStatus", ""),
|
||||
"state_consecutive_errors": state.get("consecutiveErrors", 0),
|
||||
"created_at_ms": job_data.get("createdAtMs"),
|
||||
"updated_at_ms": job_data.get("updatedAtMs"),
|
||||
"raw_json": job_data,
|
||||
}
|
||||
_, created = CronJob.objects.update_or_create(
|
||||
job_id=job_id,
|
||||
defaults=defaults,
|
||||
)
|
||||
if created:
|
||||
jobs_upserted += 1
|
||||
|
||||
# Build job lookup
|
||||
job_ids = [j["id"] for j in jobs_data]
|
||||
job_lookup = {j.job_id: j for j in CronJob.objects.filter(job_id__in=job_ids)}
|
||||
|
||||
# Upsert runs
|
||||
for run_data in runs_data:
|
||||
job_id = run_data.get("jobId")
|
||||
job = job_lookup.get(job_id)
|
||||
if not job:
|
||||
continue
|
||||
usage = run_data.get("usage", {})
|
||||
defaults = {
|
||||
"ts": run_data.get("ts"),
|
||||
"run_at": _ms_to_dt(run_data.get("runAtMs")),
|
||||
"action": run_data.get("action", ""),
|
||||
"status": run_data.get("status", ""),
|
||||
"error": run_data.get("error", ""),
|
||||
"summary": run_data.get("summary", ""),
|
||||
"delivered": run_data.get("delivered", False),
|
||||
"delivery_status": run_data.get("deliveryStatus", ""),
|
||||
"session_id": run_data.get("sessionId", ""),
|
||||
"session_key": run_data.get("sessionKey", ""),
|
||||
"duration_ms": run_data.get("durationMs"),
|
||||
"next_run_at": _ms_to_dt(run_data.get("nextRunAtMs")),
|
||||
"model": run_data.get("model", ""),
|
||||
"provider": run_data.get("provider", ""),
|
||||
"tokens_input": usage.get("input_tokens", 0),
|
||||
"tokens_output": usage.get("output_tokens", 0),
|
||||
"tokens_total": usage.get("total_tokens", 0),
|
||||
"raw_json": run_data,
|
||||
}
|
||||
_, created = CronJobRun.objects.update_or_create(
|
||||
job=job,
|
||||
run_at=defaults["run_at"],
|
||||
session_id=run_data.get("sessionId", ""),
|
||||
defaults=defaults,
|
||||
)
|
||||
if created:
|
||||
runs_upserted += 1
|
||||
|
||||
return {
|
||||
"jobs_upserted": jobs_upserted,
|
||||
"runs_upserted": runs_upserted,
|
||||
}
|
||||
19
src/openclaw/cron_views.py
Normal file
19
src/openclaw/cron_views.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import json
|
||||
|
||||
from django.http import JsonResponse
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.views.decorators.http import require_http_methods
|
||||
|
||||
from openclaw.cron_service import CronBulkUpsertService
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
@require_http_methods(["POST"])
|
||||
def cron_bulk_upsert(request):
|
||||
try:
|
||||
payload = json.loads(request.body)
|
||||
except json.JSONDecodeError:
|
||||
return JsonResponse({"error": "Invalid JSON"}, status=400)
|
||||
|
||||
result = CronBulkUpsertService.upsert(payload)
|
||||
return JsonResponse({"status": "ok", **result})
|
||||
79
src/openclaw/migrations/0003_cronjob_cronjobrun.py
Normal file
79
src/openclaw/migrations/0003_cronjob_cronjobrun.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('openclaw', '0002_add_hypertables'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='CronJob',
|
||||
fields=[
|
||||
('job_id', models.CharField(max_length=64, primary_key=True, serialize=False)),
|
||||
('agent_id', models.CharField(blank=True, default='', max_length=128)),
|
||||
('name', models.CharField(blank=True, default='', max_length=256)),
|
||||
('session_key', models.CharField(blank=True, default='', max_length=256)),
|
||||
('enabled', models.BooleanField(default=True)),
|
||||
('schedule_kind', models.CharField(blank=True, default='', max_length=32)),
|
||||
('schedule_expr', models.CharField(blank=True, default='', max_length=64)),
|
||||
('schedule_tz', models.CharField(blank=True, default='', max_length=64)),
|
||||
('payload_kind', models.CharField(blank=True, default='', max_length=32)),
|
||||
('payload_message', models.TextField(blank=True, default='')),
|
||||
('delivery_mode', models.CharField(blank=True, default='', max_length=32)),
|
||||
('delivery_channel', models.CharField(blank=True, default='', max_length=32)),
|
||||
('delivery_to', models.CharField(blank=True, default='', max_length=128)),
|
||||
('state_next_run_at', models.DateTimeField(blank=True, null=True)),
|
||||
('state_last_run_at', models.DateTimeField(blank=True, null=True)),
|
||||
('state_last_run_status', models.CharField(blank=True, default='', max_length=16)),
|
||||
('state_last_duration_ms', models.IntegerField(blank=True, null=True)),
|
||||
('state_last_status', models.CharField(blank=True, default='', max_length=16)),
|
||||
('state_consecutive_errors', models.IntegerField(default=0)),
|
||||
('created_at_ms', models.BigIntegerField(blank=True, null=True)),
|
||||
('updated_at_ms', models.BigIntegerField(blank=True, null=True)),
|
||||
('raw_json', models.JSONField(blank=True, default=dict)),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('updated_at', models.DateTimeField(auto_now=True)),
|
||||
],
|
||||
options={
|
||||
'db_table': 'cron_jobs',
|
||||
'ordering': ['-updated_at'],
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='CronJobRun',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('ts', models.BigIntegerField(help_text='event timestamp ms')),
|
||||
('run_at', models.DateTimeField(blank=True, null=True)),
|
||||
('action', models.CharField(blank=True, default='', max_length=32)),
|
||||
('status', models.CharField(blank=True, default='', max_length=16)),
|
||||
('error', models.TextField(blank=True, default='')),
|
||||
('summary', models.TextField(blank=True, default='')),
|
||||
('delivered', models.BooleanField(default=False)),
|
||||
('delivery_status', models.CharField(blank=True, default='', max_length=32)),
|
||||
('session_id', models.CharField(blank=True, default='', max_length=64)),
|
||||
('session_key', models.CharField(blank=True, default='', max_length=256)),
|
||||
('duration_ms', models.IntegerField(blank=True, null=True)),
|
||||
('next_run_at', models.DateTimeField(blank=True, null=True)),
|
||||
('model', models.CharField(blank=True, default='', max_length=128)),
|
||||
('provider', models.CharField(blank=True, default='', max_length=64)),
|
||||
('tokens_input', models.IntegerField(default=0)),
|
||||
('tokens_output', models.IntegerField(default=0)),
|
||||
('tokens_total', models.IntegerField(default=0)),
|
||||
('raw_json', models.JSONField(blank=True, default=dict)),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('job', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='runs', to='openclaw.cronjob')),
|
||||
],
|
||||
options={
|
||||
'db_table': 'cron_job_runs',
|
||||
'ordering': ['-run_at'],
|
||||
},
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='cronjobrun',
|
||||
constraint=models.UniqueConstraint(fields=('job', 'run_at', 'session_id'), name='cron_run_unique'),
|
||||
),
|
||||
]
|
||||
@@ -97,3 +97,77 @@ class ToolCall(models.Model):
|
||||
|
||||
def __str__(self):
|
||||
return f"ToolCall({self.tool_name} {self.tool_call_id})"
|
||||
|
||||
|
||||
class CronJob(models.Model):
|
||||
job_id = models.CharField(max_length=64, primary_key=True)
|
||||
agent_id = models.CharField(max_length=128, blank=True, default="")
|
||||
name = models.CharField(max_length=256, blank=True, default="")
|
||||
session_key = models.CharField(max_length=256, blank=True, default="")
|
||||
enabled = models.BooleanField(default=True)
|
||||
schedule_kind = models.CharField(max_length=32, blank=True, default="")
|
||||
schedule_expr = models.CharField(max_length=64, blank=True, default="")
|
||||
schedule_tz = models.CharField(max_length=64, blank=True, default="")
|
||||
payload_kind = models.CharField(max_length=32, blank=True, default="")
|
||||
payload_message = models.TextField(blank=True, default="")
|
||||
delivery_mode = models.CharField(max_length=32, blank=True, default="")
|
||||
delivery_channel = models.CharField(max_length=32, blank=True, default="")
|
||||
delivery_to = models.CharField(max_length=128, blank=True, default="")
|
||||
state_next_run_at = models.DateTimeField(null=True, blank=True)
|
||||
state_last_run_at = models.DateTimeField(null=True, blank=True)
|
||||
state_last_run_status = models.CharField(max_length=16, blank=True, default="")
|
||||
state_last_duration_ms = models.IntegerField(null=True, blank=True)
|
||||
state_last_status = models.CharField(max_length=16, blank=True, default="")
|
||||
state_consecutive_errors = models.IntegerField(default=0)
|
||||
created_at_ms = models.BigIntegerField(null=True, blank=True)
|
||||
updated_at_ms = models.BigIntegerField(null=True, blank=True)
|
||||
raw_json = models.JSONField(default=dict, blank=True)
|
||||
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
updated_at = models.DateTimeField(auto_now=True)
|
||||
|
||||
class Meta:
|
||||
db_table = "cron_jobs"
|
||||
ordering = ["-updated_at"]
|
||||
|
||||
def __str__(self):
|
||||
return f"CronJob({self.job_id} {self.name})"
|
||||
|
||||
|
||||
class CronJobRun(models.Model):
|
||||
job = models.ForeignKey(
|
||||
CronJob, on_delete=models.CASCADE, related_name="runs"
|
||||
)
|
||||
ts = models.BigIntegerField(help_text="event timestamp ms")
|
||||
run_at = models.DateTimeField(null=True, blank=True)
|
||||
action = models.CharField(max_length=32, blank=True, default="")
|
||||
status = models.CharField(max_length=16, blank=True, default="")
|
||||
error = models.TextField(blank=True, default="")
|
||||
summary = models.TextField(blank=True, default="")
|
||||
delivered = models.BooleanField(default=False)
|
||||
delivery_status = models.CharField(max_length=32, blank=True, default="")
|
||||
session_id = models.CharField(max_length=64, blank=True, default="")
|
||||
session_key = models.CharField(max_length=256, blank=True, default="")
|
||||
duration_ms = models.IntegerField(null=True, blank=True)
|
||||
next_run_at = models.DateTimeField(null=True, blank=True)
|
||||
model = models.CharField(max_length=128, blank=True, default="")
|
||||
provider = models.CharField(max_length=64, blank=True, default="")
|
||||
tokens_input = models.IntegerField(default=0)
|
||||
tokens_output = models.IntegerField(default=0)
|
||||
tokens_total = models.IntegerField(default=0)
|
||||
raw_json = models.JSONField(default=dict, blank=True)
|
||||
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
class Meta:
|
||||
db_table = "cron_job_runs"
|
||||
ordering = ["-run_at"]
|
||||
constraints = [
|
||||
models.UniqueConstraint(
|
||||
fields=["job", "run_at", "session_id"],
|
||||
name="cron_run_unique",
|
||||
)
|
||||
]
|
||||
|
||||
def __str__(self):
|
||||
return f"CronJobRun({self.job.job_id} {self.run_at})"
|
||||
|
||||
118
src/openclaw/templates/admin/openclaw/daily_report_detail.html
Normal file
118
src/openclaw/templates/admin/openclaw/daily_report_detail.html
Normal file
@@ -0,0 +1,118 @@
|
||||
{% extends "admin/base.html" %}
|
||||
{% load humanize %}
|
||||
|
||||
{% block title %}{{ title }} | OpenClaw Archive{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div class="p-3">
|
||||
<div class="d-flex justify-content-between align-items-center mb-3">
|
||||
<h1>Daily Report: <span class="text-primary">{{ agent_name }}</span> — <span class="text-success">{{ target_date }}</span></h1>
|
||||
<a href="/admin/daily-reports/?agent={{ agent_name }}&start={{ target_date }}&end={{ target_date }}" class="btn btn-secondary">← Back to List</a>
|
||||
</div>
|
||||
|
||||
{% for item in sessions %}
|
||||
<div class="card mb-4">
|
||||
<div class="card-header bg-dark text-white">
|
||||
<strong>Session:</strong> {{ item.session.session_id }}
|
||||
|
|
||||
<strong>Model:</strong> {{ item.session.model_id|default:"N/A" }}
|
||||
|
|
||||
<strong>Tokens:</strong> {{ item.session.total_tokens|default:0|intcomma }}
|
||||
|
|
||||
<strong>Cost:</strong> ${{ item.session.total_cost|default:0|floatformat:4 }}
|
||||
|
|
||||
<strong>Time:</strong> {{ item.session.start_time|date:"Y-m-d H:i" }} ~ {{ item.session.end_time|date:"H:i"|default:"Ongoing" }}
|
||||
</div>
|
||||
<div class="card-body p-0">
|
||||
<table class="table table-bordered table-sm mb-0" style="font-size:12px; table-layout: fixed;">
|
||||
<colgroup>
|
||||
<col style="width:60px">
|
||||
<col style="width:80px">
|
||||
<col style="width:100px">
|
||||
<col style="width:150px">
|
||||
<col style="width:500px">
|
||||
</colgroup>
|
||||
<thead class="table-light">
|
||||
<tr>
|
||||
<th>Seq</th>
|
||||
<th>Time</th>
|
||||
<th>Role</th>
|
||||
<th>Tool / Meta</th>
|
||||
<th>Content</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for msg in item.messages %}
|
||||
<tr style="vertical-align: top;">
|
||||
<td><span class="badge bg-secondary">{{ msg.seq }}</span></td>
|
||||
<td><small>{{ msg.timestamp|date:"H:i:s" }}</small></td>
|
||||
<td>
|
||||
{% if msg.role == "user" %}
|
||||
<span class="badge bg-primary">User</span>
|
||||
{% elif msg.role == "assistant" %}
|
||||
<span class="badge bg-success">Assistant</span>
|
||||
{% elif msg.role == "toolResult" %}
|
||||
<span class="badge bg-warning text-dark">Tool Result</span>
|
||||
{% elif msg.role == "tool" %}
|
||||
<span class="badge bg-info text-dark">Tool</span>
|
||||
{% else %}
|
||||
<span class="badge bg-light text-dark">{{ msg.role }}</span>
|
||||
{% endif %}
|
||||
</td>
|
||||
<td>
|
||||
{% if msg.tool_name %}
|
||||
<div><small><strong>Tool:</strong> <code>{{ msg.tool_name }}</code></small></div>
|
||||
{% endif %}
|
||||
{% if msg.exit_code != None %}
|
||||
<div><small><strong>Exit:</strong> {{ msg.exit_code }}</small></div>
|
||||
{% endif %}
|
||||
{% if msg.duration_ms %}
|
||||
<div><small><strong>Duration:</strong> {{ msg.duration_ms }}ms</small></div>
|
||||
{% endif %}
|
||||
{% if msg.tokens_total > 0 %}
|
||||
<div><small><strong>Tokens:</strong> {{ msg.tokens_total|intcomma }}</small></div>
|
||||
{% endif %}
|
||||
{% if msg.is_error %}
|
||||
<div><span class="badge bg-danger">Error</span></div>
|
||||
{% endif %}
|
||||
</td>
|
||||
<td>
|
||||
{% if msg.content_text %}
|
||||
<pre style="font-size:11px; white-space: pre-wrap; word-break: break-word; margin:0; max-height: 300px; overflow-y: auto; background: #f9f9f9; padding: 4px;">{{ msg.content_text|truncatechars:2000 }}</pre>
|
||||
{% else %}
|
||||
<span class="text-muted"><em>(empty)</em></span>
|
||||
{% endif %}
|
||||
|
||||
{% with msg.tool_calls.all as tc_list %}
|
||||
{% if tc_list %}
|
||||
<div class="mt-2">
|
||||
{% for tc in tc_list %}
|
||||
<div class="border rounded p-1 mb-1 bg-light">
|
||||
<small><strong>TC:</strong> {{ tc.tool_name }} ({{ tc.tool_call_id|truncatechars:20 }})</small>
|
||||
{% if tc.arguments %}
|
||||
<details>
|
||||
<summary><small>Arguments</small></summary>
|
||||
<pre style="font-size:10px; white-space: pre-wrap; word-break: break-word; margin:4px 0 0; background:#fff; border:1px solid #ddd; padding:4px;">{{ tc.arguments|pprint }}</pre>
|
||||
</details>
|
||||
{% endif %}
|
||||
{% if tc.result_text %}
|
||||
<details>
|
||||
<summary><small>Result ({{ tc.result_text|length }} chars)</small></summary>
|
||||
<pre style="font-size:10px; white-space: pre-wrap; word-break: break-word; margin:4px 0 0; background:#fff; border:1px solid #ddd; padding:4px; max-height:200px; overflow-y:auto;">{{ tc.result_text|truncatechars:2000 }}</pre>
|
||||
</details>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% endif %}
|
||||
{% endwith %}
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% endblock %}
|
||||
72
src/openclaw/templates/admin/openclaw/daily_report_list.html
Normal file
72
src/openclaw/templates/admin/openclaw/daily_report_list.html
Normal file
@@ -0,0 +1,72 @@
|
||||
{% extends "admin/base.html" %}
|
||||
{% load humanize %}
|
||||
{% load static %}
|
||||
|
||||
{% block title %}{{ title }} | OpenClaw Archive{% endblock %}
|
||||
|
||||
{% block branding %}
|
||||
<h1 id="site-name"><a href="{% url 'admin:index' %}">{{ site_header|default:"Django Admin" }}</a></h1>
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div class="p-3">
|
||||
<h1>📅 Daily Report</h1>
|
||||
|
||||
<form method="get" class="mb-3">
|
||||
<div class="row">
|
||||
<div class="col-md-3">
|
||||
<label>Agent</label>
|
||||
<select name="agent" class="form-control">
|
||||
<option value="">All Agents</option>
|
||||
{% for agent in agents %}
|
||||
<option value="{{ agent }}" {% if agent == selected_agent %}selected{% endif %}>{{ agent }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</div>
|
||||
<div class="col-md-3">
|
||||
<label>Start Date</label>
|
||||
<input type="date" name="start" value="{{ start_date }}" class="form-control">
|
||||
</div>
|
||||
<div class="col-md-3">
|
||||
<label>End Date</label>
|
||||
<input type="date" name="end" value="{{ end_date }}" class="form-control">
|
||||
</div>
|
||||
<div class="col-md-3 d-flex align-items-end">
|
||||
<button type="submit" class="btn btn-primary">Filter</button>
|
||||
</div>
|
||||
</div>
|
||||
</form>
|
||||
|
||||
{% if date_groups %}
|
||||
<table class="table table-bordered table-sm" style="font-size:13px">
|
||||
<thead class="table-dark">
|
||||
<tr>
|
||||
<th>Agent</th>
|
||||
<th>Date</th>
|
||||
<th>Sessions</th>
|
||||
<th>Messages</th>
|
||||
<th>Action</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for group in date_groups %}
|
||||
<tr>
|
||||
<td><strong>{{ group.agent_name }}</strong></td>
|
||||
<td>{{ group.date_val }}</td>
|
||||
<td>{{ group.session_count }}</td>
|
||||
<td>{{ group.message_count }}</td>
|
||||
<td>
|
||||
<a href="/admin/daily-reports/{{ group.agent_name }}/{{ group.date_val.year }}-{{ group.date_val.month }}-{{ group.date_val.day }}/"
|
||||
class="btn btn-sm btn-info">
|
||||
View Report
|
||||
</a>
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
{% else %}
|
||||
<p class="text-muted">No messages found for the selected date range.</p>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endblock %}
|
||||
@@ -1,6 +1,8 @@
|
||||
from django.urls import path
|
||||
from openclaw.views import sessions_bulk_upsert
|
||||
from openclaw.cron_views import cron_bulk_upsert
|
||||
|
||||
urlpatterns = [
|
||||
path("sessions/bulk_upsert/", sessions_bulk_upsert, name="sessions_bulk_upsert"),
|
||||
path("cron/bulk_upsert/", cron_bulk_upsert, name="cron_bulk_upsert"),
|
||||
]
|
||||
|
||||
1
src/openclaw_daily/__init__.py
Normal file
1
src/openclaw_daily/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
default_app_config = "openclaw_daily.apps.OpenClawDailyConfig"
|
||||
62
src/openclaw_daily/apps.py
Normal file
62
src/openclaw_daily/apps.py
Normal file
@@ -0,0 +1,62 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class OpenClawDailyConfig(AppConfig):
|
||||
name = "openclaw_daily"
|
||||
label = "openclaw_daily"
|
||||
verbose_name = "Daily Reports"
|
||||
|
||||
def ready(self):
|
||||
from django.contrib import admin
|
||||
from django.urls import path
|
||||
from openclaw.admin_new_views import (
|
||||
daily_report_list_view,
|
||||
daily_report_detail_view,
|
||||
)
|
||||
|
||||
# ── Monkey-patch admin.site.get_urls ───────────────────────────────
|
||||
_orig_get_urls = admin.site.get_urls
|
||||
|
||||
def _new_get_urls():
|
||||
urls = _orig_get_urls()
|
||||
# Prepend our custom URLs so they take precedence
|
||||
custom = [
|
||||
path(
|
||||
"daily/",
|
||||
admin.site.admin_view(daily_report_list_view),
|
||||
name="openclaw_daily",
|
||||
),
|
||||
path(
|
||||
"daily-reports/",
|
||||
admin.site.admin_view(daily_report_list_view),
|
||||
name="openclaw_daily_reports",
|
||||
),
|
||||
path(
|
||||
"daily-reports/<str:agent_name>/<int:year>-<int:month>-<int:day>/",
|
||||
admin.site.admin_view(daily_report_detail_view),
|
||||
name="openclaw_daily_report_detail",
|
||||
),
|
||||
]
|
||||
return custom + urls
|
||||
|
||||
admin.site.get_urls = _new_get_urls
|
||||
|
||||
# ── Monkey-patch admin.site.get_app_list ──────────────────────────
|
||||
_orig_get_app_list = admin.site.get_app_list
|
||||
|
||||
def _new_get_app_list(request, app_label=None):
|
||||
app_list = _orig_get_app_list(request, app_label)
|
||||
app_list.insert(0, {
|
||||
"name": "Daily Reports",
|
||||
"app_label": "openclaw_daily",
|
||||
"app_url": "/admin/daily-reports/",
|
||||
"models": [{
|
||||
"name": "Daily Reports",
|
||||
"object_name": "DailyReports",
|
||||
"admin_url": "/admin/daily-reports/",
|
||||
"view_only": True,
|
||||
}],
|
||||
})
|
||||
return app_list
|
||||
|
||||
admin.site.get_app_list = _new_get_app_list
|
||||
Reference in New Issue
Block a user