#!/usr/bin/env python3 """ agent-base 全页面遍历测试脚本 使用 agent-browser 进行浏览器自动化测试 功能: - 登录 Django Admin - 遍历所有管理页面 - 检查页面是否正常渲染(无 500/模板错误) - 截图留存 - 保存认证状态,可重复使用 用法: python3 test_all_pages.py # 运行全部测试 python3 test_all_pages.py --login # 强制重新登录(不使用缓存) python3 test_all_pages.py --screenshot # 仅截图不测试 python3 test_all_pages.py --page /admin/openclaw/session/ # 测试指定页面 """ import subprocess import json import sys import os import re import time import argparse import urllib.request import urllib.parse import http.cookiejar from datetime import datetime from pathlib import Path # ========== 配置 ========== BASE_URL = "http://192.168.3.45:8765" ADMIN_URL = f"{BASE_URL}/admin/" USERNAME = "admin" PASSWORD = "admin123" SESSION_NAME = "agent-base-full-test" STATE_FILE = "/tmp/agent-browser-auth.json" SCREENSHOT_DIR = "/tmp/agent-browser-screenshots" # agent-browser 路径(Mac mini 本地) AGENT_BROWSER = "/opt/homebrew/bin/agent-browser" # 项目所有页面列表 PAGES_TO_TEST = [ # 页面路径, 描述, 是否需要登录 ("/admin/login/", "登录页", False), ("/admin/", "Admin 首页", True), ("/admin/openclaw/session/", "Session 管理", True), ("/admin/openclaw/message/", "Message 管理", True), ("/admin/openclaw/toolcall/", "ToolCall 管理", True), ("/admin/daily-reports/", "日报列表", True), ("/admin/daily-reports/xingjiang/2026-4-7/", "日报详情-星匠-4月7日", True), ("/admin/daily-reports/xingjiang/2026-4-8/", "日报详情-星匠-4月8日", True), ("/api/sessions/bulk_upsert/", "API Bulk Upsert", True), ] # ========== 工具函数 ========== def log(msg: str, emoji: str = ""): ts = datetime.now().strftime("%H:%M:%S") prefix = f"[{ts}]" if emoji: prefix += f" {emoji}" print(f"{prefix} {msg}", flush=True) def run_browser(args: list, timeout: int = 30, check: bool = True) -> tuple: """执行 agent-browser 命令,返回 (success, result_dict)""" cmd = [AGENT_BROWSER, "--session", SESSION_NAME, "--json"] + args try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) if result.stdout: try: return True, json.loads(result.stdout) except json.JSONDecodeError: return True, {"raw": result.stdout} if result.returncode != 0 and check: log(f" 命令失败 exit={result.returncode}: {' '.join(args)}", "❌") if result.stderr: log(f" stderr: {result.stderr[:200]}", "❌") return False, {} return result.returncode == 0, {} except subprocess.TimeoutExpired: log(f" 命令超时 {timeout}s: {' '.join(args)}", "❌") return False, {} except FileNotFoundError: log(f" agent-browser 未找到: {AGENT_BROWSER}", "❌") return False, {} except Exception as e: log(f" 命令异常: {e}", "❌") return False, {} def get_snapshot() -> dict: """获取当前页面的交互元素快照""" success, result = run_browser(["snapshot", "-i", "--json"], timeout=20, check=False) if success and result.get("data"): return result["data"].get("refs", {}) return {} def get_url() -> str: """获取当前 URL""" success, result = run_browser(["get", "url", "--json"], check=False) if success and result.get("data"): return result["data"].get("url", "") return "" def get_title() -> str: """获取当前页面标题""" success, result = run_browser(["get", "title", "--json"], check=False) if success and result.get("data"): return result["data"].get("title", "") return "" def find_ref(elements: dict, role_pat: str = None, name_pat: str = None) -> str: """根据 role 和 name 模式查找元素 ref""" for ref, info in elements.items(): role = info.get("role", "") name = info.get("name", "") if role_pat and not re.match(role_pat, role, re.I): continue if name_pat and not re.search(name_pat, name, re.I): continue return ref return None def find_refs(elements: dict, role_pat: str = None, name_pat: str = None) -> list: """查找所有匹配的元素 refs""" results = [] for ref, info in elements.items(): role = info.get("role", "") name = info.get("name", "") if role_pat and not re.match(role_pat, role, re.I): continue if name_pat and not re.search(name_pat, name, re.I): continue results.append(ref) return results def wait_load(seconds: float = 2): """等待页面稳定""" time.sleep(seconds) def take_screenshot(name: str) -> str: """截图并返回保存路径""" os.makedirs(SCREENSHOT_DIR, exist_ok=True) path = f"{SCREENSHOT_DIR}/{name}.png" success, _ = run_browser(["screenshot", path], timeout=15, check=False) if success: return path return "" def close_session(): """关闭浏览器会话""" run_browser(["close"], check=False) def cleanup_all(): """清理所有会话""" subprocess.run([AGENT_BROWSER, "close", "--all"], capture_output=True, timeout=10) def check_page_errors() -> list: """检查页面是否有错误(仅检测真正的 Django error alert)""" elements = get_snapshot() errors = [] # 只检查 role=alert 或 role=alertdialog(Django 错误提示的标准 role) for ref, info in elements.items(): role = info.get("role", "") name = info.get("name", "") if role in ("alert", "alertdialog"): # Django error alerts contain error messages errors.append(f"[alert] {name[:80]}") # 也检查 title/heading 中的 Django 典型错误关键字 if role == "heading": if any(kw in name.lower() for kw in ["500", "internal server error", "template does not exist"]): errors.append(f"[heading] {name[:80]}") return errors def login() -> bool: """ 使用 agent-browser 直接操作 Django Admin 登录页。 """ log("正在通过 agent-browser 登录 Django Admin...", "🔐") login_url = f"{ADMIN_URL}login/" # Step 1: 打开登录页 success, _ = run_browser(["open", login_url]) if not success: log("无法打开登录页", "❌") return False wait_load(2) # Step 2: 获取元素快照 elements = get_snapshot() if not elements: log("登录页无交互元素", "❌") return False # 如果已经在 admin 首页,说明已登录 url = get_url() if "/admin/login" not in url: log(f"已在登录状态: {url}", "✅") run_browser(["state", "save", STATE_FILE], check=False) return True # Step 3: 找用户名/密码输入框(Django admin 中文界面) all_textboxes = {r: i for r, i in elements.items() if "textbox" in i.get("role", "").lower()} username_ref = None password_ref = None for ref, info in all_textboxes.items(): name = info.get("name", "") # e.g. "用户名:" 或 "密码:" if not username_ref and ("用户" in name or "user" in name.lower()): username_ref = ref elif not password_ref and ("密码" in name or "pass" in name.lower()): password_ref = ref # fallback: 按顺序取前两个 textbox tb_refs = list(all_textboxes.keys()) if not username_ref and len(tb_refs) >= 1: username_ref = tb_refs[0] if not password_ref and len(tb_refs) >= 2: password_ref = tb_refs[1] if not username_ref: log(f"未找到用户名输入框,可用: {all_textboxes}", "❌") return False if not password_ref: log(f"未找到密码输入框,可用: {all_textboxes}", "❌") return False log(f" 找到用户框: {username_ref} | 密码框: {password_ref}", "🔍") # Step 4: 找提交按钮(中文 Django admin 按钮文字是"登录") submit_ref = find_ref(elements, "button", "登录|log|sign|submit") if not submit_ref: submit_ref = find_refs(elements, "button")[0] if find_refs(elements, "button") else None if not submit_ref: log("未找到提交按钮", "❌") return False log(f" 找到提交按钮: {submit_ref}", "🔍") # Step 5: 填表 log(f" 填入用户名...", "✏️") run_browser(["fill", username_ref, USERNAME]) wait_load(0.5) log(f" 填入密码...", "✏️") run_browser(["fill", password_ref, PASSWORD]) wait_load(0.5) log(" 点击登录...", "✏️") run_browser(["click", submit_ref]) wait_load(3) # Django 重定向需要时间 # Step 6: 验证 url = get_url() if "/admin/" in url and "login" not in url.lower(): log(f"登录成功,当前页面: {url}", "✅") run_browser(["state", "save", STATE_FILE], check=False) log("认证状态已保存", "💾") return True else: log(f"登录后 URL 不对: {url}", "❌") # 打印页面内容帮助调试 elements = get_snapshot() errors = [info.get("name", "") for ref, info in elements.items() if any(k in info.get("name", "").lower() for k in ["error", "invalid", "incorrect"])] if errors: log(f" 页面错误提示: {errors[:3]}", "❌") return False def ensure_logged_in(force: bool = False) -> bool: """确保已登录,可选强制重新登录""" if force or not os.path.exists(STATE_FILE): return login() # 尝试加载缓存状态 success, _ = run_browser(["state", "load", STATE_FILE], check=False) if success: log("已加载缓存的认证状态", "📂") # 强制打开一个新页面验证(state load 后需要重新 open) run_browser(["open", ADMIN_URL]) wait_load(2) url = get_url() title = get_title() if "/admin/" in url and "login" not in url.lower(): log(f"会话验证成功: {url} | {title[:30]}", "✅") return True log(f"缓存已过期 (url={url}),将重新登录", "🔄") return login() # ========== 单页测试函数 ========== def build_url(path: str) -> str: """正确拼接页面 URL,避免双重 /admin/ /api/ 前缀""" path = path.lstrip("/") # 已包含完整 URL(以 http 开头) if path.startswith("http"): return path # 已经是完整路径(如 admin/login/、api/sessions/) if path.startswith("admin/") or path.startswith("api/"): return ADMIN_URL.rstrip("/") + "/" + path # 纯路径片段 return ADMIN_URL + path def test_page(path: str, desc: str, requires_login: bool) -> dict: """测试单个页面,返回测试结果 dict""" url = build_url(path) result = { "path": path, "desc": desc, "url": url, "status": "skip", "http_code": None, "errors": [], "title": "", "elements_count": 0, "screenshot": "", "timestamp": datetime.now().isoformat(), } # 如果需要登录但未登录,跳过 if requires_login: if not ensure_logged_in(): result["status"] = "skip_login_failed" return result log(f"测试: {desc} ({url})", "🌐") # 打开页面 success, _ = run_browser(["open", url]) if not success: result["status"] = "nav_failed" return result wait_load(2) # 截图 screenshot_name = path.strip("/").replace("/", "_").replace("-", "_") screenshot_name = f"{datetime.now().strftime('%H%M%S')}_{screenshot_name}" result["screenshot"] = take_screenshot(screenshot_name) # 获取页面信息 result["url"] = get_url() result["title"] = get_title() elements = get_snapshot() result["elements_count"] = len(elements) # 检查 HTTP 错误(通过 URL 重定向判断) if "login" in result["url"].lower() and path != "/admin/login/": result["status"] = "redirected_to_login" result["errors"].append("需要登录但未认证") return result if result["url"].startswith(BASE_URL + "/admin/login"): result["status"] = "login_page" return result # 检查页面错误 errors = check_page_errors() if errors: result["status"] = "page_error" result["errors"] = errors log(f" 页面有错误: {errors[:2]}", "❌") return result # 检查 Django 500 错误 elements_text = " ".join(v.get("name", "") for v in elements.values()) if any(kw in elements_text for kw in ["500", "Internal Server Error", "TemplateSyntaxError", "Invalid filter"]): result["status"] = "server_error" result["errors"].append("Django 500 or template error detected") log(f" 检测到服务器错误", "❌") return result # 基本检查 if result["elements_count"] < 3: result["status"] = "page_empty" result["errors"].append("页面元素过少,可能加载失败") log(f" 页面元素过少: {result['elements_count']}", "⚠️") return result result["status"] = "pass" log(f" ✅ OK | 元素数: {result['elements_count']} | 标题: {result['title'][:40]}", "✅") return result def test_admin_index() -> dict: """测试 Admin 首页""" result = test_page("openclaw/", "Admin 首页概览", True) # Admin 首页重定向到 auth/user/,特殊处理 elements = get_snapshot() if result["status"] == "pass": # 检查是否有模型列表 model_links = find_refs(elements, "link") result["models_found"] = len(model_links) if len(model_links) > 0: log(f" 发现 {len(model_links)} 个模型链接", "📋") return result def test_session_list() -> dict: """测试 Session 列表页""" result = test_page("openclaw/session/", "Session 列表", True) if result["status"] == "pass": elements = get_snapshot() # 检查是否有表格 has_table = find_ref(elements, "table") is not None has_search = find_ref(elements, "searchbox") is not None result["has_table"] = has_table result["has_search"] = has_search log(f" 表格: {has_table} | 搜索: {has_search}") return result def test_message_list() -> dict: """测试 Message 列表页""" return test_page("openclaw/message/", "Message 列表", True) def test_toolcall_list() -> dict: """测试 ToolCall 列表页""" return test_page("openclaw/toolcall/", "ToolCall 列表", True) def test_daily_reports_list() -> dict: """测试日报列表页""" result = test_page("daily-reports/", "日报列表", True) if result["status"] == "pass": elements = get_snapshot() # 检查是否有数据(链接或表格) links = find_refs(elements, None, "xingjiang|2026|report|daily") tables = find_refs(elements, "table") result["links_found"] = len(links) result["tables_found"] = len(tables) log(f" 链接数: {len(links)} | 表格数: {len(tables)}") return result def test_daily_report_detail(agent: str, year: int, month: int, day: int) -> dict: """测试指定日期的日报详情页""" path = f"daily-reports/{agent}/{year}-{month}-{day}/" desc = f"日报详情-{agent}-{year}-{month:02d}-{day:02d}" result = test_page(path, desc, True) if result["status"] == "pass": elements = get_snapshot() # 检查关键数据 has_heading = find_ref(elements, "heading") is not None has_session = bool(find_ref(elements, None, "session")) has_tokens = bool(find_ref(elements, None, "token")) result["has_heading"] = has_heading result["has_session"] = has_session result["has_tokens"] = has_tokens log(f" 标题: {has_heading} | Session: {has_session} | Tokens: {has_tokens}") return result def test_login_page() -> dict: """测试登录页 - 先强制刷新(清除已登录状态)再打开""" # 强制重新打开登录页(清除可能已登录的会话状态) login_url = build_url("/admin/login/") result = { "path": "/admin/login/", "desc": "登录页", "url": login_url, "status": "skip", "http_code": None, "errors": [], "title": "", "elements_count": 0, "screenshot": "", "timestamp": datetime.now().isoformat(), } # 直接打开登录页(不清除全局会话,仅此处处理) # 如果已经登录,Django 会重定向到 /admin/,这种情况记录为"已登录" success, _ = run_browser(["open", login_url]) if not success: result["status"] = "nav_failed" return result wait_load(2) # 截图 result["screenshot"] = take_screenshot(f"login_page_{datetime.now().strftime('%H%M%S')}") result["url"] = get_url() result["title"] = get_title() elements = get_snapshot() result["elements_count"] = len(elements) # 如果已经登录(URL 跳转到 admin index),记录为 pass 但注明"已登录" if "/admin/login" not in result["url"] and "/admin/" in result["url"]: result["status"] = "pass" result["errors"].append("用户已登录,登录页重定向到 admin index") log(f" 当前已登录,登录页自动跳转至: {result['url']}", "ℹ️") return result # 正常登录页检查 has_user = find_ref(elements, "textbox") is not None has_pass = ( find_ref(elements, "textbox", "pass") is not None or len(find_refs(elements, "textbox")) > 1 ) has_button = find_ref(elements, "button") is not None result["has_username_field"] = has_user result["has_password_field"] = has_pass result["has_submit_button"] = has_button if has_user and has_pass and has_button: result["status"] = "pass" log(f" ✅ 登录页正常 | 用户: {has_user} | 密码: {has_pass} | 按钮: {has_button}") else: result["status"] = "page_error" result["errors"].append(f"缺少表单元素: user={has_user}, pass={has_pass}, btn={has_button}") log(f" ❌ 登录页表单不完整", "❌") return result if result["status"] in ("pass", "login_page"): result["status"] = "pass" elements = get_snapshot() has_user = find_ref(elements, "textbox") is not None has_pass = find_ref(elements, "password") is not None has_button = find_ref(elements, "button") is not None result["has_username_field"] = has_user result["has_password_field"] = has_pass result["has_submit_button"] = has_button log(f" 用户名框: {has_user} | 密码框: {has_pass} | 按钮: {has_button}") return result # ========== 主测试套件 ========== def run_full_test_suite() -> list: """运行完整测试套件""" results = [] log("=" * 60, "") log("开始全页面遍历测试", "🚀") log("=" * 60, "") # 1. 登录页测试 results.append(test_login_page()) # 2. Admin 首页 results.append(test_admin_index()) # 3. Session 管理 results.append(test_session_list()) # 4. Message 管理 results.append(test_message_list()) # 5. ToolCall 管理 results.append(test_toolcall_list()) # 6. 日报列表 results.append(test_daily_reports_list()) # 7. 日报详情(多个日期) results.append(test_daily_report_detail("xingjiang", 2026, 4, 7)) results.append(test_daily_report_detail("xingjiang", 2026, 4, 8)) # 8. API 页面(检查是否返回 JSON 或有响应) results.append(test_page("/api/sessions/bulk_upsert/", "API Bulk Upsert", True)) return results # ========== 报告输出 ========== def print_report(results: list): """打印测试报告""" passed = sum(1 for r in results if r["status"] == "pass") failed = len(results) - passed print("\n" + "=" * 70) print("🧪 全页面遍历测试报告 - agent-base") print("=" * 70) print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"总计: {len(results)} | ✅ 通过: {passed} | ❌ 失败: {failed} | 成功率: {passed/len(results)*100:.0f}%") print("-" * 70) for r in results: status_icon = "✅" if r["status"] == "pass" else "❌" print(f"\n{status_icon} [{r['desc']}]") print(f" URL: {r['url']}") if r["status"] != "pass": print(f" 状态: {r['status']}") if r["errors"]: print(f" 错误: {r['errors']}") else: print(f" 标题: {r['title'][:50]}") print(f" 元素数: {r['elements_count']}") if r["screenshot"]: print(f" 截图: {r['screenshot']}") print("\n" + "=" * 70) # 详细失败列表 if failed > 0: print("\n❌ 失败页面:") for r in results: if r["status"] != "pass": print(f" - {r['desc']}: {r['status']} | {r['errors']}") # 截图路径 if os.path.exists(SCREENSHOT_DIR): print(f"\n📸 截图目录: {SCREENSHOT_DIR}") for f in sorted(os.listdir(SCREENSHOT_DIR)): if f.endswith(".png"): full = os.path.join(SCREENSHOT_DIR, f) size = os.path.getsize(full) / 1024 print(f" - {f} ({size:.0f} KB)") print("=" * 70) return failed == 0 def save_report(results: list, path: str): """保存 JSON 报告""" with open(path, "w", encoding="utf-8") as f: json.dump({ "generated_at": datetime.now().isoformat(), "base_url": BASE_URL, "summary": { "total": len(results), "passed": sum(1 for r in results if r["status"] == "pass"), "failed": sum(1 for r in results if r["status"] != "pass"), }, "results": results, }, f, ensure_ascii=False, indent=2) log(f"报告已保存: {path}", "💾") # ========== CLI 入口 ========== def main(): parser = argparse.ArgumentParser(description="agent-base 全页面遍历测试") parser.add_argument("--login", action="store_true", help="强制重新登录(不使用缓存)") parser.add_argument("--screenshot", action="store_true", help="仅截图模式") parser.add_argument("--page", type=str, help="测试指定页面路径") parser.add_argument("--report", type=str, default="/tmp/agent_base_test_report.json", help="报告保存路径") parser.add_argument("--session", type=str, default=SESSION_NAME, help="浏览器会话名") args = parser.parse_args() # No global mutation needed — keep module-level SESSION_NAME try: # 清理旧会话 cleanup_all() time.sleep(1) if args.page: # 单页测试 path = args.page desc = f"单页测试: {path}" result = test_page(path, desc, requires_login=True) print_report([result]) return if args.screenshot: # 截图模式 ensure_logged_in(force=args.login) take_screenshot("manual_screenshot") log(f"截图已保存: {SCREENSHOT_DIR}") return # 完整测试套件 results = run_full_test_suite() save_report(results, args.report) all_passed = print_report(results) # 关闭会话 close_session() sys.exit(0 if all_passed else 1) except KeyboardInterrupt: log("测试被用户中断", "⚠️") close_session() sys.exit(130) except Exception as e: log(f"测试异常: {e}", "❌") import traceback traceback.print_exc() close_session() sys.exit(1) if __name__ == "__main__": main()