Files
agent-base/scripts/test_all_pages.py

712 lines
24 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
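"""
agent-base full page-traversal test script.

Uses agent-browser for browser automation.

Features:
- Log in to Django Admin
- Visit every admin page
- Check that each page renders normally (no 500 / template errors)
- Save screenshots
- Save the authentication state so it can be reused

Usage:
    python3 test_all_pages.py                 # run all tests
    python3 test_all_pages.py --login         # force a fresh login (do not use the cache)
    python3 test_all_pages.py --screenshot    # screenshot only, no tests
    python3 test_all_pages.py --page /admin/openclaw/session/   # test one page
"""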
import subprocess
import json
import sys
import os
import re
import time
import argparse
import urllib.request
import urllib.parse
import http.cookiejar
from datetime import datetime
from pathlib import Path
# ========== Configuration ==========
# Root address of the deployment under test.
BASE_URL = "http://192.168.3.45:8765"
# Django admin root, derived from BASE_URL.
ADMIN_URL = f"{BASE_URL}/admin/"
# Credentials that login() types into the admin login form.
# NOTE(review): credentials are hard-coded in source; consider reading them from the environment.
USERNAME = "admin"
PASSWORD = "admin123"
# Session name passed to agent-browser via --session.
SESSION_NAME = "agent-base-full-test"
# File used with `state save` / `state load` to cache the authenticated state.
STATE_FILE = "/tmp/agent-browser-auth.json"
# Directory that take_screenshot() writes PNG files into.
SCREENSHOT_DIR = "/tmp/agent-browser-screenshots"
# Path to the agent-browser executable (local install on the Mac mini).
AGENT_BROWSER = "/opt/homebrew/bin/agent-browser"
# List of all project pages.
# NOTE(review): nothing in the visible code reads this list; run_full_test_suite() hard-codes its own sequence.
PAGES_TO_TEST = [
# (page path, description, login required)
("/admin/login/", "登录页", False),
("/admin/", "Admin 首页", True),
("/admin/openclaw/session/", "Session 管理", True),
("/admin/openclaw/message/", "Message 管理", True),
("/admin/openclaw/toolcall/", "ToolCall 管理", True),
("/admin/daily-reports/", "日报列表", True),
("/admin/daily-reports/xingjiang/2026-4-7/", "日报详情-星匠-4月7日", True),
("/admin/daily-reports/xingjiang/2026-4-8/", "日报详情-星匠-4月8日", True),
("/api/sessions/bulk_upsert/", "API Bulk Upsert", True),
]
# ========== Utility functions ==========
def log(msg: str, emoji: str = ""):
    """Print a timestamped line to stdout and flush immediately.

    The line has the form ``[HH:MM:SS] <emoji> <msg>``; the emoji segment
    is omitted when *emoji* is empty.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    head = f"[{stamp}] {emoji}" if emoji else f"[{stamp}]"
    print(head, msg, flush=True)
def run_browser(args: list, timeout: int = 30, check: bool = True) -> tuple:
    """Run one agent-browser command and return ``(success, result_dict)``.

    The command line is ``AGENT_BROWSER --session SESSION_NAME --json`` followed
    by *args*.

    * If the process wrote anything to stdout, the call is reported as
      successful regardless of the exit code; the dict is the parsed JSON, or
      ``{"raw": <stdout>}`` when stdout is not valid JSON.
    * With empty stdout, success mirrors ``returncode == 0`` and the dict is
      empty. A non-zero exit is logged only when *check* is true.
    * A timeout, a missing executable, or any other exception is logged and
      reported as ``(False, {})``.
    """
    base = [AGENT_BROWSER, "--session", SESSION_NAME, "--json"]
    command = base + args
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
        output = proc.stdout
        if output:
            try:
                payload = json.loads(output)
            except json.JSONDecodeError:
                payload = {"raw": output}
            return True, payload
        exited_cleanly = proc.returncode == 0
        if exited_cleanly or not check:
            return exited_cleanly, {}
        log(f" 命令失败 exit={proc.returncode}: {' '.join(args)}", "")
        if proc.stderr:
            # Only the first 200 characters of stderr are shown.
            log(f" stderr: {proc.stderr[:200]}", "")
        return False, {}
    except subprocess.TimeoutExpired:
        log(f" 命令超时 {timeout}s: {' '.join(args)}", "")
        return False, {}
    except FileNotFoundError:
        log(f" agent-browser 未找到: {AGENT_BROWSER}", "")
        return False, {}
    except Exception as e:
        log(f" 命令异常: {e}", "")
        return False, {}
def get_snapshot() -> dict:
    """Return the ``refs`` mapping of an interactive-element snapshot of the current page.

    Runs ``snapshot -i --json`` through run_browser(). Returns an empty dict
    when the command fails or its result has no truthy ``data`` entry.
    """
    ok, payload = run_browser(["snapshot", "-i", "--json"], timeout=20, check=False)
    if not ok:
        return {}
    data = payload.get("data")
    return data.get("refs", {}) if data else {}
def get_url() -> str:
    """Return the current page URL, or ``""`` when it cannot be read.

    Runs ``get url --json`` through run_browser() and reads ``data.url``
    from the result.
    """
    ok, payload = run_browser(["get", "url", "--json"], check=False)
    if not ok:
        return ""
    data = payload.get("data")
    return data.get("url", "") if data else ""
def get_title() -> str:
    """Return the current page title, or ``""`` when it cannot be read.

    Runs ``get title --json`` through run_browser() and reads ``data.title``
    from the result.
    """
    ok, payload = run_browser(["get", "title", "--json"], check=False)
    if not ok:
        return ""
    data = payload.get("data")
    return data.get("title", "") if data else ""
def find_ref(elements: dict, role_pat: str = None, name_pat: str = None) -> str:
    """Return the first element ref whose role and name match the given patterns.

    *role_pat* is matched case-insensitively against the start of the
    element's ``role`` (``re.match``); *name_pat* is searched
    case-insensitively anywhere in its ``name`` (``re.search``). A pattern
    that is ``None`` or empty is not applied. Returns ``None`` when no
    element matches.
    """
    def is_match(info):
        if role_pat and not re.match(role_pat, info.get("role", ""), re.I):
            return False
        if name_pat and not re.search(name_pat, info.get("name", ""), re.I):
            return False
        return True

    return next((ref for ref, info in elements.items() if is_match(info)), None)
def find_refs(elements: dict, role_pat: str = None, name_pat: str = None) -> list:
    """Return every element ref whose role and name match the given patterns.

    *role_pat* is matched case-insensitively against the start of the
    element's ``role``; *name_pat* is searched case-insensitively anywhere in
    its ``name``. A pattern that is ``None`` or empty is not applied. Refs
    are returned in the iteration order of *elements*.
    """
    def is_match(info):
        role_ok = not role_pat or re.match(role_pat, info.get("role", ""), re.I)
        if not role_ok:
            return False
        return not name_pat or bool(re.search(name_pat, info.get("name", ""), re.I))

    return [ref for ref, info in elements.items() if is_match(info)]
def wait_load(seconds: float = 2):
    """Block for *seconds* to give the page time to settle.

    This is a plain fixed-length sleep; nothing about the page is inspected.
    """
    time.sleep(seconds)
def take_screenshot(name: str) -> str:
    """Capture a screenshot to ``SCREENSHOT_DIR/<name>.png``.

    The directory is created if needed. Returns the file path when the
    ``screenshot`` command reports success, otherwise ``""``.
    """
    os.makedirs(SCREENSHOT_DIR, exist_ok=True)
    target = f"{SCREENSHOT_DIR}/{name}.png"
    captured, _ = run_browser(["screenshot", target], timeout=15, check=False)
    return target if captured else ""
def close_session():
    """Send ``close`` to agent-browser for the current session.

    The result is ignored, and a non-zero exit is not logged (``check=False``).
    """
    run_browser(["close"], check=False)
def cleanup_all():
    """Close every agent-browser session (``close --all``), best effort.

    The command's output and exit code are ignored. A timeout (10 s) or a
    failure to start the executable is logged rather than raised, so a stuck
    or missing cleanup step does not abort the run before any test has
    executed; this mirrors how run_browser() reports the same conditions.
    """
    try:
        subprocess.run([AGENT_BROWSER, "close", "--all"],
                       capture_output=True, timeout=10)
    except subprocess.TimeoutExpired:
        log(" cleanup timed out after 10s: close --all", "")
    except OSError as e:
        # Covers FileNotFoundError / PermissionError when launching the binary.
        log(f" cleanup could not run agent-browser: {e}", "")
def check_page_errors() -> list:
    """Scan a fresh snapshot of the current page for error indicators.

    Two kinds of element are reported:

    * any element whose role is ``alert`` or ``alertdialog``;
    * any ``heading`` whose lower-cased name contains ``500``,
      ``internal server error`` or ``template does not exist``.

    Returns a list of strings such as ``"[alert] <name>"``, with the name
    truncated to 80 characters; the list is empty when nothing is found.
    """
    heading_keywords = ["500", "internal server error", "template does not exist"]
    found = []
    for info in get_snapshot().values():
        role = info.get("role", "")
        name = info.get("name", "")
        if role in ("alert", "alertdialog"):
            found.append(f"[alert] {name[:80]}")
        elif role == "heading":
            lowered = name.lower()
            if any(keyword in lowered for keyword in heading_keywords):
                found.append(f"[heading] {name[:80]}")
    return found
def login() -> bool:
    """Log in to Django Admin by driving the login form through agent-browser.

    Steps: open ``ADMIN_URL + "login/"``, snapshot the page, locate the
    username/password textboxes and the submit button, fill in USERNAME and
    PASSWORD, click, then check the resulting URL. On success the browser
    state is saved to STATE_FILE.

    Returns:
        True when the browser ends up on an ``/admin/`` URL that is not the
        login page (either immediately, meaning it was already logged in, or
        after submitting the form); False otherwise.
    """
    log("正在通过 agent-browser 登录 Django Admin...", "🔐")
    login_url = f"{ADMIN_URL}login/"

    # Step 1: open the login page.
    success, _ = run_browser(["open", login_url])
    if not success:
        log("无法打开登录页", "")
        return False
    wait_load(2)

    # Step 2: take an element snapshot.
    elements = get_snapshot()
    if not elements:
        log("登录页无交互元素", "")
        return False

    # Already on an admin page other than the login form: already logged in.
    # Requiring "/admin/" keeps an empty URL (get_url() failed) from being
    # mistaken for a logged-in session.
    url = get_url()
    if "/admin/" in url and "/admin/login" not in url:
        log(f"已在登录状态: {url}", "")
        run_browser(["state", "save", STATE_FILE], check=False)
        return True

    # Step 3: find the username/password inputs (labels may be Chinese or English).
    all_textboxes = {r: i for r, i in elements.items() if "textbox" in i.get("role", "").lower()}
    username_ref = None
    password_ref = None
    for ref, info in all_textboxes.items():
        name = info.get("name", "")  # e.g. "用户名:" or "密码:"
        if not username_ref and ("用户" in name or "user" in name.lower()):
            username_ref = ref
        elif not password_ref and ("密码" in name or "pass" in name.lower()):
            password_ref = ref

    # Fallback: take the remaining textboxes in page order, never reusing a
    # ref that was already assigned to the other field.
    unassigned = [r for r in all_textboxes if r not in (username_ref, password_ref)]
    if not username_ref and unassigned:
        username_ref = unassigned.pop(0)
    if not password_ref and unassigned:
        password_ref = unassigned.pop(0)

    if not username_ref:
        log(f"未找到用户名输入框,可用: {all_textboxes}", "")
        return False
    if not password_ref:
        log(f"未找到密码输入框,可用: {all_textboxes}", "")
        return False
    log(f" 找到用户框: {username_ref} | 密码框: {password_ref}", "🔍")

    # Step 4: find the submit button; fall back to the first button on the page.
    submit_ref = find_ref(elements, "button", "登录|log|sign|submit")
    if not submit_ref:
        buttons = find_refs(elements, "button")
        submit_ref = buttons[0] if buttons else None
    if not submit_ref:
        log("未找到提交按钮", "")
        return False
    log(f" 找到提交按钮: {submit_ref}", "🔍")

    # Step 5: fill in and submit the form.
    log(" 填入用户名...", "✏️")
    run_browser(["fill", username_ref, USERNAME])
    wait_load(0.5)
    log(" 填入密码...", "✏️")
    run_browser(["fill", password_ref, PASSWORD])
    wait_load(0.5)
    log(" 点击登录...", "✏️")
    run_browser(["click", submit_ref])
    wait_load(3)  # give the post-login redirect time to finish

    # Step 6: verify by URL.
    url = get_url()
    if "/admin/" in url and "login" not in url.lower():
        log(f"登录成功,当前页面: {url}", "")
        run_browser(["state", "save", STATE_FILE], check=False)
        log("认证状态已保存", "💾")
        return True

    log(f"登录后 URL 不对: {url}", "")
    # Surface any on-page error text to help debugging.
    elements = get_snapshot()
    errors = [info.get("name", "") for ref, info in elements.items()
              if any(k in info.get("name", "").lower() for k in ["error", "invalid", "incorrect"])]
    if errors:
        log(f" 页面错误提示: {errors[:3]}", "")
    return False
def ensure_logged_in(force: bool = False) -> bool:
    """Make sure the browser session is authenticated; return True on success.

    When *force* is true or STATE_FILE does not exist, login() is called
    directly. Otherwise the cached state is loaded and verified by opening
    ADMIN_URL: the session counts as valid when the resulting URL contains
    ``/admin/`` and does not contain ``login``. If loading or verification
    fails, login() is called.
    """
    if force or not os.path.exists(STATE_FILE):
        return login()

    loaded, _ = run_browser(["state", "load", STATE_FILE], check=False)
    if not loaded:
        return login()

    log("已加载缓存的认证状态", "📂")
    # A page must be opened again after `state load` before the URL can be checked.
    run_browser(["open", ADMIN_URL])
    wait_load(2)
    current_url = get_url()
    page_title = get_title()
    still_valid = "/admin/" in current_url and "login" not in current_url.lower()
    if still_valid:
        log(f"会话验证成功: {current_url} | {page_title[:30]}", "")
        return True

    log(f"缓存已过期 (url={current_url}),将重新登录", "🔄")
    return login()
# ========== Single-page test functions ==========
def build_url(path: str) -> str:
    """Build the absolute URL for *path* without doubling the ``/admin/`` or ``/api/`` prefix.

    * A full ``http://`` / ``https://`` URL is returned unchanged.
    * A site-rooted path (``admin/...`` or ``api/...``, with or without a
      leading slash) is joined onto BASE_URL.
    * Anything else is treated as a fragment relative to the admin root and
      is appended to ADMIN_URL.

    Examples (with ``BASE_URL = "http://host"``):
        ``/admin/login/``      -> ``http://host/admin/login/``
        ``/api/sessions/``     -> ``http://host/api/sessions/``
        ``openclaw/session/``  -> ``http://host/admin/openclaw/session/``
    """
    if path.startswith(("http://", "https://")):
        return path
    relative = path.lstrip("/")
    # Site-rooted paths must be joined onto BASE_URL; joining them onto
    # ADMIN_URL would produce /admin/admin/... or /admin/api/...
    if relative.startswith(("admin/", "api/")) or relative in ("admin", "api"):
        return f"{BASE_URL.rstrip('/')}/{relative}"
    return ADMIN_URL + relative
def test_page(path: str, desc: str, requires_login: bool) -> dict:
    """Open one page, inspect it, and return a result dict.

    The result always carries the keys ``path``, ``desc``, ``url``,
    ``status``, ``http_code`` (never filled in here), ``errors``, ``title``,
    ``elements_count``, ``screenshot`` and ``timestamp``.

    ``status`` ends up as one of:
        ``skip_login_failed``    login was required and could not be established
        ``nav_failed``           the ``open`` command failed
        ``redirected_to_login``  landed on a URL containing "login" for a
                                 path other than ``/admin/login/``
        ``login_page``           the login page itself was shown
        ``page_error``           check_page_errors() reported something
        ``server_error``         element names contain a 500/template-error marker
        ``page_empty``           fewer than 3 interactive elements
        ``pass``                 none of the above
    """
    target = build_url(path)
    report = dict(
        path=path,
        desc=desc,
        url=target,
        status="skip",
        http_code=None,
        errors=[],
        title="",
        elements_count=0,
        screenshot="",
        timestamp=datetime.now().isoformat(),
    )

    # Skip the page when a login is required but cannot be established.
    if requires_login and not ensure_logged_in():
        report["status"] = "skip_login_failed"
        return report

    log(f"测试: {desc} ({target})", "🌐")

    # Navigate.
    opened, _ = run_browser(["open", target])
    if not opened:
        report["status"] = "nav_failed"
        return report
    wait_load(2)

    # Screenshot, named <HHMMSS>_<path with "/" and "-" replaced by "_">.
    stem = path.strip("/").replace("/", "_").replace("-", "_")
    stamp = datetime.now().strftime('%H%M%S')
    report["screenshot"] = take_screenshot(f"{stamp}_{stem}")

    # Collect page facts.
    report["url"] = get_url()
    report["title"] = get_title()
    snapshot = get_snapshot()
    report["elements_count"] = len(snapshot)

    # Authentication problems are inferred from the final URL.
    landed = report["url"]
    if "login" in landed.lower() and path != "/admin/login/":
        report["status"] = "redirected_to_login"
        report["errors"].append("需要登录但未认证")
        return report
    if landed.startswith(BASE_URL + "/admin/login"):
        report["status"] = "login_page"
        return report

    # Alerts and error headings.
    page_errors = check_page_errors()
    if page_errors:
        report["status"] = "page_error"
        report["errors"] = page_errors
        log(f" 页面有错误: {page_errors[:2]}", "")
        return report

    # Django 500 / template-error markers anywhere in the element names.
    all_names = " ".join(info.get("name", "") for info in snapshot.values())
    markers = ["500", "Internal Server Error", "TemplateSyntaxError", "Invalid filter"]
    if any(marker in all_names for marker in markers):
        report["status"] = "server_error"
        report["errors"].append("Django 500 or template error detected")
        log(" 检测到服务器错误", "")
        return report

    # Sanity check: a nearly empty page probably failed to load.
    count = report["elements_count"]
    if count < 3:
        report["status"] = "page_empty"
        report["errors"].append("页面元素过少,可能加载失败")
        log(f" 页面元素过少: {count}", "⚠️")
        return report

    report["status"] = "pass"
    log(f" ✅ OK | 元素数: {count} | 标题: {report['title'][:40]}", "")
    return report
def test_admin_index() -> dict:
    """Test the ``openclaw/`` admin page and count the links found on it.

    On a passing result the number of ``link`` elements is stored under
    ``models_found``. A snapshot is taken whether or not the page passed.
    """
    report = test_page("openclaw/", "Admin 首页概览", True)
    # NOTE(review): the original comment says the admin index redirects to
    # auth/user/ and needs special handling; nothing here depends on that.
    snapshot = get_snapshot()
    if report["status"] != "pass":
        return report

    # Links on this page are taken to be the model list.
    links = find_refs(snapshot, "link")
    report["models_found"] = len(links)
    if links:
        log(f" 发现 {len(links)} 个模型链接", "📋")
    return report
def test_session_list() -> dict:
    """Test the Session list page.

    On a passing result, records whether a ``table`` element and a
    ``searchbox`` element are present (``has_table`` / ``has_search``).
    """
    report = test_page("openclaw/session/", "Session 列表", True)
    if report["status"] != "pass":
        return report

    snapshot = get_snapshot()
    table_present = find_ref(snapshot, "table") is not None
    search_present = find_ref(snapshot, "searchbox") is not None
    report["has_table"] = table_present
    report["has_search"] = search_present
    log(f" 表格: {table_present} | 搜索: {search_present}")
    return report
def test_message_list() -> dict:
    """Test the Message list page; returns the test_page() result unchanged."""
    report = test_page("openclaw/message/", "Message 列表", True)
    return report
def test_toolcall_list() -> dict:
    """Test the ToolCall list page; returns the test_page() result unchanged."""
    report = test_page("openclaw/toolcall/", "ToolCall 列表", True)
    return report
def test_daily_reports_list() -> dict:
    """Test the daily-report list page.

    On a passing result, stores the number of elements whose name matches
    ``xingjiang|2026|report|daily`` as ``links_found`` and the number of
    ``table`` elements as ``tables_found``.
    """
    report = test_page("daily-reports/", "日报列表", True)
    if report["status"] != "pass":
        return report

    snapshot = get_snapshot()
    # Evidence of data on the page: matching element names, or tables.
    matching = find_refs(snapshot, None, "xingjiang|2026|report|daily")
    table_refs = find_refs(snapshot, "table")
    report["links_found"] = len(matching)
    report["tables_found"] = len(table_refs)
    log(f" 链接数: {len(matching)} | 表格数: {len(table_refs)}")
    return report
def test_daily_report_detail(agent: str, year: int, month: int, day: int) -> dict:
    """Test the daily-report detail page for *agent* on the given date.

    The path uses unpadded month/day (``daily-reports/<agent>/<Y>-<M>-<D>/``),
    while the description uses zero-padded ones. On a passing result, records
    whether a heading exists and whether any element name mentions
    ``session`` / ``token`` (``has_heading``, ``has_session``, ``has_tokens``).
    """
    page_path = f"daily-reports/{agent}/{year}-{month}-{day}/"
    label = f"日报详情-{agent}-{year}-{month:02d}-{day:02d}"
    report = test_page(page_path, label, True)
    if report["status"] != "pass":
        return report

    snapshot = get_snapshot()
    heading_present = find_ref(snapshot, "heading") is not None
    session_present = bool(find_ref(snapshot, None, "session"))
    tokens_present = bool(find_ref(snapshot, None, "token"))
    report["has_heading"] = heading_present
    report["has_session"] = session_present
    report["has_tokens"] = tokens_present
    log(f" 标题: {heading_present} | Session: {session_present} | Tokens: {tokens_present}")
    return report
def test_login_page() -> dict:
    """Test the admin login page and return a result dict.

    The page is opened directly in the current session; the session is not
    cleared first. Two outcomes count as ``pass``:

    * the browser is redirected away from the login page to another
      ``/admin/`` URL (already logged in); a note is added to ``errors``;
    * the login form is shown and has at least one textbox, a password field
      (a textbox whose name matches "pass", or a second textbox) and a button.

    Otherwise ``status`` is ``nav_failed`` (open failed) or ``page_error``
    (form incomplete). For a shown form, ``has_username_field``,
    ``has_password_field`` and ``has_submit_button`` are recorded.
    """
    login_url = build_url("/admin/login/")
    result = {
        "path": "/admin/login/",
        "desc": "登录页",
        "url": login_url,
        "status": "skip",
        "http_code": None,
        "errors": [],
        "title": "",
        "elements_count": 0,
        "screenshot": "",
        "timestamp": datetime.now().isoformat(),
    }

    success, _ = run_browser(["open", login_url])
    if not success:
        result["status"] = "nav_failed"
        return result
    wait_load(2)

    # Screenshot and basic page facts.
    result["screenshot"] = take_screenshot(f"login_page_{datetime.now().strftime('%H%M%S')}")
    result["url"] = get_url()
    result["title"] = get_title()
    elements = get_snapshot()
    result["elements_count"] = len(elements)

    # Already logged in: the login URL redirected to another admin page.
    if "/admin/login" not in result["url"] and "/admin/" in result["url"]:
        result["status"] = "pass"
        result["errors"].append("用户已登录,登录页重定向到 admin index")
        log(f" 当前已登录,登录页自动跳转至: {result['url']}", "")
        return result

    # Regular login form checks.
    textboxes = find_refs(elements, "textbox")
    has_user = bool(textboxes)
    has_pass = (
        find_ref(elements, "textbox", "pass") is not None or
        len(textboxes) > 1
    )
    has_button = find_ref(elements, "button") is not None
    result["has_username_field"] = has_user
    result["has_password_field"] = has_pass
    result["has_submit_button"] = has_button

    if has_user and has_pass and has_button:
        result["status"] = "pass"
        log(f" ✅ 登录页正常 | 用户: {has_user} | 密码: {has_pass} | 按钮: {has_button}")
    else:
        result["status"] = "page_error"
        result["errors"].append(f"缺少表单元素: user={has_user}, pass={has_pass}, btn={has_button}")
        log(" ❌ 登录页表单不完整", "")
    return result
# ========== Main test suite ==========
def run_full_test_suite() -> list:
    """Run every page test in a fixed order and return the list of result dicts.

    Order: login page, admin index, Session / Message / ToolCall lists,
    daily-report list, two daily-report detail pages, and the bulk-upsert
    API page.
    """
    banner = "=" * 60
    log(banner, "")
    log("开始全页面遍历测试", "🚀")
    log(banner, "")

    steps = [
        test_login_page,                                                 # 1. login page
        test_admin_index,                                                # 2. admin index
        test_session_list,                                               # 3. Session admin
        test_message_list,                                               # 4. Message admin
        test_toolcall_list,                                              # 5. ToolCall admin
        test_daily_reports_list,                                         # 6. daily-report list
        lambda: test_daily_report_detail("xingjiang", 2026, 4, 7),       # 7. report details
        lambda: test_daily_report_detail("xingjiang", 2026, 4, 8),
        lambda: test_page("/api/sessions/bulk_upsert/", "API Bulk Upsert", True),  # 8. API page
    ]
    return [step() for step in steps]
# ========== Report output ==========
def print_report(results: list):
    """Print a human-readable test report to stdout.

    Args:
        results: result dicts as produced by test_page(); each needs the keys
            ``desc``, ``url``, ``status``, ``errors``, ``title``,
            ``elements_count`` and ``screenshot``.

    Returns:
        True when no result has a status other than ``"pass"`` (which
        includes an empty list), False otherwise.

    The report has a summary line, one entry per result, a list of failed
    pages, and the ``.png`` files currently present in SCREENSHOT_DIR.
    """
    total = len(results)
    passed = sum(1 for r in results if r["status"] == "pass")
    failed = total - passed
    # An empty result list would otherwise divide by zero.
    success_rate = passed / total * 100 if total else 0
    rule = "=" * 70

    print("\n" + rule)
    print("🧪 全页面遍历测试报告 - agent-base")
    print(rule)
    print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"总计: {total} | ✅ 通过: {passed} | ❌ 失败: {failed} | 成功率: {success_rate:.0f}%")
    print("-" * 70)

    for r in results:
        is_pass = r["status"] == "pass"
        # Same pass/fail markers as the summary line above.
        status_icon = "✅" if is_pass else "❌"
        print(f"\n{status_icon} [{r['desc']}]")
        print(f" URL: {r['url']}")
        if not is_pass:
            print(f" 状态: {r['status']}")
            if r["errors"]:
                print(f" 错误: {r['errors']}")
        else:
            print(f" 标题: {r['title'][:50]}")
            print(f" 元素数: {r['elements_count']}")
        if r["screenshot"]:
            print(f" 截图: {r['screenshot']}")
    print("\n" + rule)

    # Detailed list of failures.
    if failed > 0:
        print("\n❌ 失败页面:")
        for r in results:
            if r["status"] != "pass":
                print(f" - {r['desc']}: {r['status']} | {r['errors']}")

    # Screenshot files on disk, with sizes in KB.
    if os.path.exists(SCREENSHOT_DIR):
        print(f"\n📸 截图目录: {SCREENSHOT_DIR}")
        for f in sorted(os.listdir(SCREENSHOT_DIR)):
            if f.endswith(".png"):
                full = os.path.join(SCREENSHOT_DIR, f)
                size = os.path.getsize(full) / 1024
                print(f" - {f} ({size:.0f} KB)")
    print(rule)
    return failed == 0
def save_report(results: list, path: str):
    """Write the results as a UTF-8 JSON report to *path* and log the location.

    The document contains ``generated_at``, ``base_url``, a ``summary`` with
    ``total`` / ``passed`` / ``failed`` counts, and the raw ``results`` list.
    Non-ASCII text is written as-is (``ensure_ascii=False``), indented by 2.
    """
    with open(path, "w", encoding="utf-8") as out:
        statuses = [r["status"] for r in results]
        summary = {
            "total": len(results),
            "passed": sum(1 for s in statuses if s == "pass"),
            "failed": sum(1 for s in statuses if s != "pass"),
        }
        document = {
            "generated_at": datetime.now().isoformat(),
            "base_url": BASE_URL,
            "summary": summary,
            "results": results,
        }
        json.dump(document, out, ensure_ascii=False, indent=2)
    log(f"报告已保存: {path}", "💾")
# ========== CLI entry point ==========
def main():
    """Command-line entry point.

    Modes:
        * ``--page PATH``  test one page, print the report, exit 0/1.
        * ``--screenshot`` log in and take a single screenshot.
        * default          run the full suite, save the JSON report to
                           ``--report``, print the report, exit 0/1.

    Options:
        * ``--login``   discard the cached authentication state (STATE_FILE)
                        so the next login check performs a fresh login.
        * ``--session`` agent-browser session name used by every command.

    Exit codes: 0 all passed, 1 a test failed or an unexpected error
    occurred, 130 interrupted with Ctrl-C.
    """
    # Declared before SESSION_NAME is read below; run_browser() looks the
    # name up at call time, so rebinding it here is what makes --session work.
    global SESSION_NAME

    parser = argparse.ArgumentParser(description="agent-base 全页面遍历测试")
    parser.add_argument("--login", action="store_true", help="强制重新登录(不使用缓存)")
    parser.add_argument("--screenshot", action="store_true", help="仅截图模式")
    parser.add_argument("--page", type=str, help="测试指定页面路径")
    parser.add_argument("--report", type=str, default="/tmp/agent_base_test_report.json", help="报告保存路径")
    parser.add_argument("--session", type=str, default=SESSION_NAME, help="浏览器会话名")
    args = parser.parse_args()
    SESSION_NAME = args.session

    try:
        # Clean up sessions left over from earlier runs.
        cleanup_all()
        time.sleep(1)

        if args.login:
            # ensure_logged_in() logs in from scratch when the state file is
            # missing, so removing it makes --login apply in every mode.
            try:
                os.remove(STATE_FILE)
            except FileNotFoundError:
                pass

        if args.page:
            # Single-page mode.
            path = args.page
            desc = f"单页测试: {path}"
            result = test_page(path, desc, requires_login=True)
            page_passed = print_report([result])
            close_session()
            sys.exit(0 if page_passed else 1)

        if args.screenshot:
            # Screenshot-only mode.
            ensure_logged_in(force=args.login)
            take_screenshot("manual_screenshot")
            log(f"截图已保存: {SCREENSHOT_DIR}")
            return

        # Full test suite.
        results = run_full_test_suite()
        save_report(results, args.report)
        all_passed = print_report(results)
        close_session()
        sys.exit(0 if all_passed else 1)
    except KeyboardInterrupt:
        log("测试被用户中断", "⚠️")
        close_session()
        sys.exit(130)
    except Exception as e:
        # Top-level boundary: report, print the traceback, clean up, fail.
        log(f"测试异常: {e}", "")
        import traceback
        traceback.print_exc()
        close_session()
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()