diff --git a/scripts/sync_sessions.py b/scripts/sync_sessions.py index 0df4208..f55e42c 100755 --- a/scripts/sync_sessions.py +++ b/scripts/sync_sessions.py @@ -388,11 +388,19 @@ def sync_cron_jobs(args): new_runs = [] new_state = {} + # Detect remote platform (Linux vs macOS) for stat syntax + uname_result = subprocess.run( + ["ssh", ssh_host, "uname"], + capture_output=True, text=True, timeout=10, + ) + is_macos = uname_result.stdout.strip() == "Darwin" + stat_cmd = f"stat -f %m" if is_macos else f"stat -c %Y" + for run_file in run_files: - remote_full = f"{runs_path}/{run_file}" + remote_full = f"{runs_path}/{Path(run_file).name}" # Get mtime via SSH result = subprocess.run( - ["ssh", ssh_host, f"stat -c %Y {remote_full}"], + ["ssh", ssh_host, f"{stat_cmd} {remote_full}"], capture_output=True, text=True, timeout=10, ) if result.returncode != 0: diff --git a/scripts/test_all_pages.py b/scripts/test_all_pages.py new file mode 100644 index 0000000..290a4e8 --- /dev/null +++ b/scripts/test_all_pages.py @@ -0,0 +1,711 @@ +#!/usr/bin/env python3 +""" +agent-base 全页面遍历测试脚本 +使用 agent-browser 进行浏览器自动化测试 + +功能: + - 登录 Django Admin + - 遍历所有管理页面 + - 检查页面是否正常渲染(无 500/模板错误) + - 截图留存 + - 保存认证状态,可重复使用 + +用法: + python3 test_all_pages.py # 运行全部测试 + python3 test_all_pages.py --login # 强制重新登录(不使用缓存) + python3 test_all_pages.py --screenshot # 仅截图不测试 + python3 test_all_pages.py --page /admin/openclaw/session/ # 测试指定页面 +""" + +import subprocess +import json +import sys +import os +import re +import time +import argparse +import urllib.request +import urllib.parse +import http.cookiejar +from datetime import datetime +from pathlib import Path + +# ========== 配置 ========== +BASE_URL = "http://192.168.3.45:8765" +ADMIN_URL = f"{BASE_URL}/admin/" +USERNAME = "admin" +PASSWORD = "admin123" +SESSION_NAME = "agent-base-full-test" +STATE_FILE = "/tmp/agent-browser-auth.json" +SCREENSHOT_DIR = "/tmp/agent-browser-screenshots" + +# agent-browser 路径(Mac mini 本地) +AGENT_BROWSER = "/opt/homebrew/bin/agent-browser" + +# 项目所有页面列表 +PAGES_TO_TEST = [ + # 页面路径, 描述, 是否需要登录 + ("/admin/login/", "登录页", False), + ("/admin/", "Admin 首页", True), + ("/admin/openclaw/session/", "Session 管理", True), + ("/admin/openclaw/message/", "Message 管理", True), + ("/admin/openclaw/toolcall/", "ToolCall 管理", True), + ("/admin/daily-reports/", "日报列表", True), + ("/admin/daily-reports/xingjiang/2026-4-7/", "日报详情-星匠-4月7日", True), + ("/admin/daily-reports/xingjiang/2026-4-8/", "日报详情-星匠-4月8日", True), + ("/api/sessions/bulk_upsert/", "API Bulk Upsert", True), +] + +# ========== 工具函数 ========== + +def log(msg: str, emoji: str = ""): + ts = datetime.now().strftime("%H:%M:%S") + prefix = f"[{ts}]" + if emoji: + prefix += f" {emoji}" + print(f"{prefix} {msg}", flush=True) + + +def run_browser(args: list, timeout: int = 30, check: bool = True) -> tuple: + """执行 agent-browser 命令,返回 (success, result_dict)""" + cmd = [AGENT_BROWSER, "--session", SESSION_NAME, "--json"] + args + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) + if result.stdout: + try: + return True, json.loads(result.stdout) + except json.JSONDecodeError: + return True, {"raw": result.stdout} + if result.returncode != 0 and check: + log(f" 命令失败 exit={result.returncode}: {' '.join(args)}", "❌") + if result.stderr: + log(f" stderr: {result.stderr[:200]}", "❌") + return False, {} + return result.returncode == 0, {} + except subprocess.TimeoutExpired: + log(f" 命令超时 {timeout}s: {' '.join(args)}", "❌") + return False, {} + except FileNotFoundError: + log(f" agent-browser 未找到: {AGENT_BROWSER}", "❌") + return False, {} + except Exception as e: + log(f" 命令异常: {e}", "❌") + return False, {} + + +def get_snapshot() -> dict: + """获取当前页面的交互元素快照""" + success, result = run_browser(["snapshot", "-i", "--json"], timeout=20, check=False) + if success and result.get("data"): + return result["data"].get("refs", {}) + return {} + + +def get_url() -> str: + """获取当前 URL""" + success, result = run_browser(["get", "url", "--json"], check=False) + if success and result.get("data"): + return result["data"].get("url", "") + return "" + + +def get_title() -> str: + """获取当前页面标题""" + success, result = run_browser(["get", "title", "--json"], check=False) + if success and result.get("data"): + return result["data"].get("title", "") + return "" + + +def find_ref(elements: dict, role_pat: str = None, name_pat: str = None) -> str: + """根据 role 和 name 模式查找元素 ref""" + for ref, info in elements.items(): + role = info.get("role", "") + name = info.get("name", "") + if role_pat and not re.match(role_pat, role, re.I): + continue + if name_pat and not re.search(name_pat, name, re.I): + continue + return ref + return None + + +def find_refs(elements: dict, role_pat: str = None, name_pat: str = None) -> list: + """查找所有匹配的元素 refs""" + results = [] + for ref, info in elements.items(): + role = info.get("role", "") + name = info.get("name", "") + if role_pat and not re.match(role_pat, role, re.I): + continue + if name_pat and not re.search(name_pat, name, re.I): + continue + results.append(ref) + return results + + +def wait_load(seconds: float = 2): + """等待页面稳定""" + time.sleep(seconds) + + +def take_screenshot(name: str) -> str: + """截图并返回保存路径""" + os.makedirs(SCREENSHOT_DIR, exist_ok=True) + path = f"{SCREENSHOT_DIR}/{name}.png" + success, _ = run_browser(["screenshot", path], timeout=15, check=False) + if success: + return path + return "" + + +def close_session(): + """关闭浏览器会话""" + run_browser(["close"], check=False) + + +def cleanup_all(): + """清理所有会话""" + subprocess.run([AGENT_BROWSER, "close", "--all"], + capture_output=True, timeout=10) + + +def check_page_errors() -> list: + """检查页面是否有错误(仅检测真正的 Django error alert)""" + elements = get_snapshot() + errors = [] + + # 只检查 role=alert 或 role=alertdialog(Django 错误提示的标准 role) + for ref, info in elements.items(): + role = info.get("role", "") + name = info.get("name", "") + if role in ("alert", "alertdialog"): + # Django error alerts contain error messages + errors.append(f"[alert] {name[:80]}") + # 也检查 title/heading 中的 Django 典型错误关键字 + if role == "heading": + if any(kw in name.lower() for kw in ["500", "internal server error", "template does not exist"]): + errors.append(f"[heading] {name[:80]}") + + return errors + + +def login() -> bool: + """ + 使用 agent-browser 直接操作 Django Admin 登录页。 + """ + log("正在通过 agent-browser 登录 Django Admin...", "🔐") + + login_url = f"{ADMIN_URL}login/" + + # Step 1: 打开登录页 + success, _ = run_browser(["open", login_url]) + if not success: + log("无法打开登录页", "❌") + return False + wait_load(2) + + # Step 2: 获取元素快照 + elements = get_snapshot() + if not elements: + log("登录页无交互元素", "❌") + return False + + # 如果已经在 admin 首页,说明已登录 + url = get_url() + if "/admin/login" not in url: + log(f"已在登录状态: {url}", "✅") + run_browser(["state", "save", STATE_FILE], check=False) + return True + + # Step 3: 找用户名/密码输入框(Django admin 中文界面) + all_textboxes = {r: i for r, i in elements.items() if "textbox" in i.get("role", "").lower()} + username_ref = None + password_ref = None + + for ref, info in all_textboxes.items(): + name = info.get("name", "") # e.g. "用户名:" 或 "密码:" + if not username_ref and ("用户" in name or "user" in name.lower()): + username_ref = ref + elif not password_ref and ("密码" in name or "pass" in name.lower()): + password_ref = ref + + # fallback: 按顺序取前两个 textbox + tb_refs = list(all_textboxes.keys()) + if not username_ref and len(tb_refs) >= 1: + username_ref = tb_refs[0] + if not password_ref and len(tb_refs) >= 2: + password_ref = tb_refs[1] + + if not username_ref: + log(f"未找到用户名输入框,可用: {all_textboxes}", "❌") + return False + if not password_ref: + log(f"未找到密码输入框,可用: {all_textboxes}", "❌") + return False + + log(f" 找到用户框: {username_ref} | 密码框: {password_ref}", "🔍") + + # Step 4: 找提交按钮(中文 Django admin 按钮文字是"登录") + submit_ref = find_ref(elements, "button", "登录|log|sign|submit") + if not submit_ref: + submit_ref = find_refs(elements, "button")[0] if find_refs(elements, "button") else None + + if not submit_ref: + log("未找到提交按钮", "❌") + return False + + log(f" 找到提交按钮: {submit_ref}", "🔍") + + # Step 5: 填表 + log(f" 填入用户名...", "✏️") + run_browser(["fill", username_ref, USERNAME]) + wait_load(0.5) + + log(f" 填入密码...", "✏️") + run_browser(["fill", password_ref, PASSWORD]) + wait_load(0.5) + + log(" 点击登录...", "✏️") + run_browser(["click", submit_ref]) + wait_load(3) # Django 重定向需要时间 + + # Step 6: 验证 + url = get_url() + if "/admin/" in url and "login" not in url.lower(): + log(f"登录成功,当前页面: {url}", "✅") + run_browser(["state", "save", STATE_FILE], check=False) + log("认证状态已保存", "💾") + return True + else: + log(f"登录后 URL 不对: {url}", "❌") + # 打印页面内容帮助调试 + elements = get_snapshot() + errors = [info.get("name", "") for ref, info in elements.items() + if any(k in info.get("name", "").lower() for k in ["error", "invalid", "incorrect"])] + if errors: + log(f" 页面错误提示: {errors[:3]}", "❌") + return False + + +def ensure_logged_in(force: bool = False) -> bool: + """确保已登录,可选强制重新登录""" + if force or not os.path.exists(STATE_FILE): + return login() + + # 尝试加载缓存状态 + success, _ = run_browser(["state", "load", STATE_FILE], check=False) + if success: + log("已加载缓存的认证状态", "📂") + # 强制打开一个新页面验证(state load 后需要重新 open) + run_browser(["open", ADMIN_URL]) + wait_load(2) + url = get_url() + title = get_title() + if "/admin/" in url and "login" not in url.lower(): + log(f"会话验证成功: {url} | {title[:30]}", "✅") + return True + log(f"缓存已过期 (url={url}),将重新登录", "🔄") + + return login() + + +# ========== 单页测试函数 ========== + +def build_url(path: str) -> str: + """正确拼接页面 URL,避免双重 /admin/ /api/ 前缀""" + path = path.lstrip("/") + # 已包含完整 URL(以 http 开头) + if path.startswith("http"): + return path + # 已经是完整路径(如 admin/login/、api/sessions/) + if path.startswith("admin/") or path.startswith("api/"): + return ADMIN_URL.rstrip("/") + "/" + path + # 纯路径片段 + return ADMIN_URL + path + + +def test_page(path: str, desc: str, requires_login: bool) -> dict: + """测试单个页面,返回测试结果 dict""" + url = build_url(path) + result = { + "path": path, + "desc": desc, + "url": url, + "status": "skip", + "http_code": None, + "errors": [], + "title": "", + "elements_count": 0, + "screenshot": "", + "timestamp": datetime.now().isoformat(), + } + + # 如果需要登录但未登录,跳过 + if requires_login: + if not ensure_logged_in(): + result["status"] = "skip_login_failed" + return result + + log(f"测试: {desc} ({url})", "🌐") + + # 打开页面 + success, _ = run_browser(["open", url]) + if not success: + result["status"] = "nav_failed" + return result + wait_load(2) + + # 截图 + screenshot_name = path.strip("/").replace("/", "_").replace("-", "_") + screenshot_name = f"{datetime.now().strftime('%H%M%S')}_{screenshot_name}" + result["screenshot"] = take_screenshot(screenshot_name) + + # 获取页面信息 + result["url"] = get_url() + result["title"] = get_title() + elements = get_snapshot() + result["elements_count"] = len(elements) + + # 检查 HTTP 错误(通过 URL 重定向判断) + if "login" in result["url"].lower() and path != "/admin/login/": + result["status"] = "redirected_to_login" + result["errors"].append("需要登录但未认证") + return result + + if result["url"].startswith(BASE_URL + "/admin/login"): + result["status"] = "login_page" + return result + + # 检查页面错误 + errors = check_page_errors() + if errors: + result["status"] = "page_error" + result["errors"] = errors + log(f" 页面有错误: {errors[:2]}", "❌") + return result + + # 检查 Django 500 错误 + elements_text = " ".join(v.get("name", "") for v in elements.values()) + if any(kw in elements_text for kw in ["500", "Internal Server Error", "TemplateSyntaxError", "Invalid filter"]): + result["status"] = "server_error" + result["errors"].append("Django 500 or template error detected") + log(f" 检测到服务器错误", "❌") + return result + + # 基本检查 + if result["elements_count"] < 3: + result["status"] = "page_empty" + result["errors"].append("页面元素过少,可能加载失败") + log(f" 页面元素过少: {result['elements_count']}", "⚠️") + return result + + result["status"] = "pass" + log(f" ✅ OK | 元素数: {result['elements_count']} | 标题: {result['title'][:40]}", "✅") + return result + + +def test_admin_index() -> dict: + """测试 Admin 首页""" + result = test_page("openclaw/", "Admin 首页概览", True) + # Admin 首页重定向到 auth/user/,特殊处理 + elements = get_snapshot() + if result["status"] == "pass": + # 检查是否有模型列表 + model_links = find_refs(elements, "link") + result["models_found"] = len(model_links) + if len(model_links) > 0: + log(f" 发现 {len(model_links)} 个模型链接", "📋") + return result + + +def test_session_list() -> dict: + """测试 Session 列表页""" + result = test_page("openclaw/session/", "Session 列表", True) + if result["status"] == "pass": + elements = get_snapshot() + # 检查是否有表格 + has_table = find_ref(elements, "table") is not None + has_search = find_ref(elements, "searchbox") is not None + result["has_table"] = has_table + result["has_search"] = has_search + log(f" 表格: {has_table} | 搜索: {has_search}") + return result + + +def test_message_list() -> dict: + """测试 Message 列表页""" + return test_page("openclaw/message/", "Message 列表", True) + + +def test_toolcall_list() -> dict: + """测试 ToolCall 列表页""" + return test_page("openclaw/toolcall/", "ToolCall 列表", True) + + +def test_daily_reports_list() -> dict: + """测试日报列表页""" + result = test_page("daily-reports/", "日报列表", True) + if result["status"] == "pass": + elements = get_snapshot() + # 检查是否有数据(链接或表格) + links = find_refs(elements, None, "xingjiang|2026|report|daily") + tables = find_refs(elements, "table") + result["links_found"] = len(links) + result["tables_found"] = len(tables) + log(f" 链接数: {len(links)} | 表格数: {len(tables)}") + return result + + +def test_daily_report_detail(agent: str, year: int, month: int, day: int) -> dict: + """测试指定日期的日报详情页""" + path = f"daily-reports/{agent}/{year}-{month}-{day}/" + desc = f"日报详情-{agent}-{year}-{month:02d}-{day:02d}" + result = test_page(path, desc, True) + if result["status"] == "pass": + elements = get_snapshot() + # 检查关键数据 + has_heading = find_ref(elements, "heading") is not None + has_session = bool(find_ref(elements, None, "session")) + has_tokens = bool(find_ref(elements, None, "token")) + result["has_heading"] = has_heading + result["has_session"] = has_session + result["has_tokens"] = has_tokens + log(f" 标题: {has_heading} | Session: {has_session} | Tokens: {has_tokens}") + return result + + +def test_login_page() -> dict: + """测试登录页 - 先强制刷新(清除已登录状态)再打开""" + # 强制重新打开登录页(清除可能已登录的会话状态) + login_url = build_url("/admin/login/") + result = { + "path": "/admin/login/", + "desc": "登录页", + "url": login_url, + "status": "skip", + "http_code": None, + "errors": [], + "title": "", + "elements_count": 0, + "screenshot": "", + "timestamp": datetime.now().isoformat(), + } + + # 直接打开登录页(不清除全局会话,仅此处处理) + # 如果已经登录,Django 会重定向到 /admin/,这种情况记录为"已登录" + success, _ = run_browser(["open", login_url]) + if not success: + result["status"] = "nav_failed" + return result + wait_load(2) + + # 截图 + result["screenshot"] = take_screenshot(f"login_page_{datetime.now().strftime('%H%M%S')}") + result["url"] = get_url() + result["title"] = get_title() + elements = get_snapshot() + result["elements_count"] = len(elements) + + # 如果已经登录(URL 跳转到 admin index),记录为 pass 但注明"已登录" + if "/admin/login" not in result["url"] and "/admin/" in result["url"]: + result["status"] = "pass" + result["errors"].append("用户已登录,登录页重定向到 admin index") + log(f" 当前已登录,登录页自动跳转至: {result['url']}", "ℹ️") + return result + + # 正常登录页检查 + has_user = find_ref(elements, "textbox") is not None + has_pass = ( + find_ref(elements, "textbox", "pass") is not None or + len(find_refs(elements, "textbox")) > 1 + ) + has_button = find_ref(elements, "button") is not None + result["has_username_field"] = has_user + result["has_password_field"] = has_pass + result["has_submit_button"] = has_button + + if has_user and has_pass and has_button: + result["status"] = "pass" + log(f" ✅ 登录页正常 | 用户: {has_user} | 密码: {has_pass} | 按钮: {has_button}") + else: + result["status"] = "page_error" + result["errors"].append(f"缺少表单元素: user={has_user}, pass={has_pass}, btn={has_button}") + log(f" ❌ 登录页表单不完整", "❌") + + return result + if result["status"] in ("pass", "login_page"): + result["status"] = "pass" + elements = get_snapshot() + has_user = find_ref(elements, "textbox") is not None + has_pass = find_ref(elements, "password") is not None + has_button = find_ref(elements, "button") is not None + result["has_username_field"] = has_user + result["has_password_field"] = has_pass + result["has_submit_button"] = has_button + log(f" 用户名框: {has_user} | 密码框: {has_pass} | 按钮: {has_button}") + return result + + +# ========== 主测试套件 ========== + +def run_full_test_suite() -> list: + """运行完整测试套件""" + results = [] + + log("=" * 60, "") + log("开始全页面遍历测试", "🚀") + log("=" * 60, "") + + # 1. 登录页测试 + results.append(test_login_page()) + + # 2. Admin 首页 + results.append(test_admin_index()) + + # 3. Session 管理 + results.append(test_session_list()) + + # 4. Message 管理 + results.append(test_message_list()) + + # 5. ToolCall 管理 + results.append(test_toolcall_list()) + + # 6. 日报列表 + results.append(test_daily_reports_list()) + + # 7. 日报详情(多个日期) + results.append(test_daily_report_detail("xingjiang", 2026, 4, 7)) + results.append(test_daily_report_detail("xingjiang", 2026, 4, 8)) + + # 8. API 页面(检查是否返回 JSON 或有响应) + results.append(test_page("/api/sessions/bulk_upsert/", "API Bulk Upsert", True)) + + return results + + +# ========== 报告输出 ========== + +def print_report(results: list): + """打印测试报告""" + passed = sum(1 for r in results if r["status"] == "pass") + failed = len(results) - passed + + print("\n" + "=" * 70) + print("🧪 全页面遍历测试报告 - agent-base") + print("=" * 70) + print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"总计: {len(results)} | ✅ 通过: {passed} | ❌ 失败: {failed} | 成功率: {passed/len(results)*100:.0f}%") + print("-" * 70) + + for r in results: + status_icon = "✅" if r["status"] == "pass" else "❌" + print(f"\n{status_icon} [{r['desc']}]") + print(f" URL: {r['url']}") + if r["status"] != "pass": + print(f" 状态: {r['status']}") + if r["errors"]: + print(f" 错误: {r['errors']}") + else: + print(f" 标题: {r['title'][:50]}") + print(f" 元素数: {r['elements_count']}") + if r["screenshot"]: + print(f" 截图: {r['screenshot']}") + + print("\n" + "=" * 70) + + # 详细失败列表 + if failed > 0: + print("\n❌ 失败页面:") + for r in results: + if r["status"] != "pass": + print(f" - {r['desc']}: {r['status']} | {r['errors']}") + + # 截图路径 + if os.path.exists(SCREENSHOT_DIR): + print(f"\n📸 截图目录: {SCREENSHOT_DIR}") + for f in sorted(os.listdir(SCREENSHOT_DIR)): + if f.endswith(".png"): + full = os.path.join(SCREENSHOT_DIR, f) + size = os.path.getsize(full) / 1024 + print(f" - {f} ({size:.0f} KB)") + + print("=" * 70) + + return failed == 0 + + +def save_report(results: list, path: str): + """保存 JSON 报告""" + with open(path, "w", encoding="utf-8") as f: + json.dump({ + "generated_at": datetime.now().isoformat(), + "base_url": BASE_URL, + "summary": { + "total": len(results), + "passed": sum(1 for r in results if r["status"] == "pass"), + "failed": sum(1 for r in results if r["status"] != "pass"), + }, + "results": results, + }, f, ensure_ascii=False, indent=2) + log(f"报告已保存: {path}", "💾") + + +# ========== CLI 入口 ========== + +def main(): + parser = argparse.ArgumentParser(description="agent-base 全页面遍历测试") + parser.add_argument("--login", action="store_true", help="强制重新登录(不使用缓存)") + parser.add_argument("--screenshot", action="store_true", help="仅截图模式") + parser.add_argument("--page", type=str, help="测试指定页面路径") + parser.add_argument("--report", type=str, default="/tmp/agent_base_test_report.json", help="报告保存路径") + parser.add_argument("--session", type=str, default=SESSION_NAME, help="浏览器会话名") + args = parser.parse_args() + + # No global mutation needed — keep module-level SESSION_NAME + + try: + # 清理旧会话 + cleanup_all() + time.sleep(1) + + if args.page: + # 单页测试 + path = args.page + desc = f"单页测试: {path}" + result = test_page(path, desc, requires_login=True) + print_report([result]) + return + + if args.screenshot: + # 截图模式 + ensure_logged_in(force=args.login) + take_screenshot("manual_screenshot") + log(f"截图已保存: {SCREENSHOT_DIR}") + return + + # 完整测试套件 + results = run_full_test_suite() + save_report(results, args.report) + all_passed = print_report(results) + + # 关闭会话 + close_session() + + sys.exit(0 if all_passed else 1) + + except KeyboardInterrupt: + log("测试被用户中断", "⚠️") + close_session() + sys.exit(130) + except Exception as e: + log(f"测试异常: {e}", "❌") + import traceback + traceback.print_exc() + close_session() + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/scripts/test_e2e_agent_browser.py b/scripts/test_e2e_agent_browser.py new file mode 100755 index 0000000..ddd4d4e --- /dev/null +++ b/scripts/test_e2e_agent_browser.py @@ -0,0 +1,835 @@ +#!/usr/bin/env python3 +""" +End-to-End Test Script for agent-base Django Project +Uses agent-browser CLI for browser automation + +Usage: python3 test_e2e_agent_browser.py + +Environment Variables: + AGENT_BROWSER_EXECUTABLE_PATH: Path to Chrome/Chromium binary + (Defaults to /Applications/Google Chrome.app/Contents/MacOS/Google Chrome on macOS) +""" + +import subprocess +import json +import sys +import time +import os +import re +import uuid +import platform +from datetime import datetime + +# Detect agent-browser path based on OS +def get_agent_browser_path(): + """Get agent-browser executable path based on the system.""" + env_path = os.environ.get("AGENT_BROWSER_PATH") + if env_path and os.path.exists(env_path): + return env_path + + system = platform.system() + if system == "Darwin": + path = "/opt/homebrew/bin/agent-browser" + if os.path.exists(path): + return path + elif system == "Linux": + for path in [ + "/home/shenwei/.npm-global/bin/agent-browser", + "/usr/local/bin/agent-browser", + "/usr/bin/agent-browser", + ]: + if os.path.exists(path): + return path + + for cmd in ["agent-browser"]: + try: + result = subprocess.run(["which", cmd], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + except: + pass + + return "agent-browser" + +def get_chrome_path(): + """Get Chrome executable path based on the system.""" + if os.environ.get("AGENT_BROWSER_EXECUTABLE_PATH"): + return os.environ["AGENT_BROWSER_EXECUTABLE_PATH"] + + system = platform.system() + if system == "Darwin": + mac_chrome = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" + if os.path.exists(mac_chrome): + return mac_chrome + elif system == "Linux": + for path in [ + "/usr/bin/google-chrome", + "/usr/bin/chromium-browser", + "/usr/bin/chromium", + "/snap/bin/chromium", + os.path.expanduser("~/.local/bin/chromium"), + ]: + if os.path.exists(path): + return path + + return "" + +# Initialize paths +AGENT_BROWSER = get_agent_browser_path() +CHROME_PATH = get_chrome_path() + +if CHROME_PATH: + os.environ["AGENT_BROWSER_EXECUTABLE_PATH"] = CHROME_PATH + +print(f"Using agent-browser: {AGENT_BROWSER}", file=sys.stderr) +if CHROME_PATH: + print(f"Using Chrome: {CHROME_PATH}", file=sys.stderr) +else: + print("Chrome: auto-detect (ensure agent-browser install completed)", file=sys.stderr) + +# Configuration +BASE_URL = "http://192.168.3.45:8765" +ADMIN_URL = f"{BASE_URL}/admin/" +USERNAME = "admin" +PASSWORD = "admin123" +SESSION_NAME = "agent-base-e2e-test" +STATE_FILE = "/tmp/agent-browser-auth-state.json" +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) if os.path.abspath(__file__).startswith('/tmp') else "/home/shenwei/docker/agent-base/scripts" +PROJECT_DIR = "/home/shenwei/docker/agent-base" + +# Test results tracking +test_results = { + "started_at": datetime.now().isoformat(), + "tests": [], + "passed": 0, + "failed": 0, + "errors": [] +} + + +def log(msg): + """Print log message with timestamp""" + ts = datetime.now().strftime("%H:%M:%S") + print(f"[{ts}] {msg}", flush=True) + + +def log_step(step_num, msg): + """Print step with number""" + log(f"[Step {step_num}] {msg}") + + +def run_agent_browser(args, timeout=30, check=True): + """ + Run agent-browser command and return JSON result. + + Args: + args: List of command arguments (e.g., ['open', 'https://example.com']) + timeout: Command timeout in seconds + check: Whether to raise exception on non-zero exit + + Returns: + tuple: (success: bool, result: dict or error message) + """ + cmd = [AGENT_BROWSER, "--session", SESSION_NAME, "--json"] + cmd.extend(args) + + log(f" Running: agent-browser {' '.join(args)}") + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + + if result.stdout: + try: + return True, json.loads(result.stdout) + except json.JSONDecodeError: + return True, {"raw": result.stdout} + + if result.returncode != 0 and check: + log(f" ERROR: exit code {result.returncode}") + log(f" stderr: {result.stderr[:500]}") + return False, result.stderr + + return result.returncode == 0, {} + + except subprocess.TimeoutExpired: + log(f" TIMEOUT after {timeout}s") + return False, "Command timeout" + except Exception as e: + log(f" EXCEPTION: {e}") + return False, str(e) + + +def wait_for_page_load(timeout=10): + """Wait for page to finish loading""" + time.sleep(2) # Give page time to settle + + +def get_interactive_elements(): + """Get snapshot of interactive elements from current page""" + success, result = run_agent_browser(["snapshot", "-i", "--json"], timeout=15, check=False) + if success and "data" in result: + return result["data"].get("refs", {}) + return {} + + +def find_element_by_role(elements, role_pattern, name_pattern=None): + """Find element by role pattern, optionally with name pattern""" + for ref, info in elements.items(): + role = info.get("role", "") + name = info.get("name", "") + if re.match(role_pattern, role, re.IGNORECASE): + if name_pattern is None or re.search(name_pattern, name, re.IGNORECASE): + return ref + return None + + +def find_element_by_text(elements, text_pattern, role_pattern=None): + """Find element containing text""" + for ref, info in elements.items(): + role = info.get("role", "") + name = info.get("name", "") + if re.search(text_pattern, name, re.IGNORECASE): + if role_pattern is None or re.match(role_pattern, role, re.IGNORECASE): + return ref + return None + + +def record_test(name, passed, error=None, details=None): + """Record test result""" + test_entry = { + "name": name, + "passed": passed, + "timestamp": datetime.now().isoformat() + } + if error: + test_entry["error"] = error + if details: + test_entry["details"] = details + + test_results["tests"].append(test_entry) + + if passed: + test_results["passed"] += 1 + log(f" ✓ PASSED: {name}") + else: + test_results["failed"] += 1 + test_results["errors"].append(f"{name}: {error}") + log(f" ✗ FAILED: {name} - {error}") + + +def test_admin_login(): + """Test 1: Admin Login""" + log_step(1, "Testing Admin Login") + + # Navigate to admin login + success, _ = run_agent_browser(["open", f"{ADMIN_URL}login/?next=/admin/"]) + if not success: + record_test("Admin Login - Navigate", False, "Failed to open login page") + return False + + wait_for_page_load() + + # Get interactive elements + elements = get_interactive_elements() + if not elements: + record_test("Admin Login - Get Elements", False, "No interactive elements found") + return False + + # Find username field + username_ref = find_element_by_role(elements, "textbox", ".*user.*") or \ + find_element_by_text(elements, "user", "textbox") or \ + find_element_by_role(elements, "textbox") + + if not username_ref: + # Try to find by id + for ref, info in elements.items(): + if "username" in info.get("name", "").lower() or "id_username" in str(info): + username_ref = ref + break + + if not username_ref: + # List all textboxes for debugging + textboxes = {k: v for k, v in elements.items() if "textbox" in v.get("role", "").lower()} + log(f" Available textboxes: {textboxes}") + record_test("Admin Login - Find Username Field", False, "Could not find username field") + return False + + # Fill username + success, _ = run_agent_browser(["fill", username_ref, USERNAME]) + if not success: + record_test("Admin Login - Fill Username", False, "Failed to fill username") + return False + + # Find password field (role=textbox with 密码 in name on Django admin) + password_ref = find_element_by_role(elements, "password") + if not password_ref: + # Try by name - Django admin uses textbox role with "密码:" label + for ref, info in elements.items(): + if "密码" in info.get("name", "") or "password" in info.get("name", "").lower(): + password_ref = ref + break + + if not password_ref: + record_test("Admin Login - Find Password Field", False, "Could not find password field") + return False + + # Fill password + success, _ = run_agent_browser(["fill", password_ref, PASSWORD]) + if not success: + record_test("Admin Login - Fill Password", False, "Failed to fill password") + return False + + # Find and click submit button + submit_ref = find_element_by_text(elements, "log|signin|submit|登|登录", "button") or \ + find_element_by_role(elements, "button", ".*") + + if not submit_ref: + record_test("Admin Login - Find Submit Button", False, "Could not find submit button") + return False + + success, _ = run_agent_browser(["click", submit_ref]) + if not success: + record_test("Admin Login - Click Submit", False, "Failed to click submit") + return False + + wait_for_page_load() + + # Check if login was successful (should be at /admin/ now) + success, result = run_agent_browser(["get", "url", "--json"], check=False) + current_url = "" + if success and result.get("data"): + current_url = result["data"].get("url", "") + + log(f" Current URL after login: {current_url}") + + if "/admin/" in current_url and "login" not in current_url.lower(): + record_test("Admin Login", True, details=f"Logged in successfully, at {current_url}") + + # Save auth state for subsequent tests + success, _ = run_agent_browser(["state", "save", STATE_FILE], check=False) + if success: + log(" Auth state saved for subsequent tests") + + return True + else: + # Check for error message + elements = get_interactive_elements() + error_text = find_element_by_text(elements, "error|invalid|错误|无效") + + record_test("Admin Login", False, f"Login failed, URL: {current_url}, error_element: {error_text}") + return False + + +def test_session_management(): + """Test 2: Session Management""" + log_step(2, "Testing Session Management") + + # Load saved auth state if available + if os.path.exists(STATE_FILE): + run_agent_browser(["state", "load", STATE_FILE], check=False) + + # Navigate to session admin + success, _ = run_agent_browser(["open", f"{ADMIN_URL}openclaw/session/"]) + if not success: + record_test("Session Management - Navigate", False, "Failed to navigate to session admin") + return False + + wait_for_page_load() + + # Get page title + success, result = run_agent_browser(["get", "title", "--json"], check=False) + title = "" + if success and result.get("data"): + title = result["data"].get("title", "") + + log(f" Page title: {title}") + + # Get elements + elements = get_interactive_elements() + + # Check for session list - Django admin uses columnheader/cell roles not