From d141313a24333b1144b6d2532f8b1fd8424b2ad9 Mon Sep 17 00:00:00 2001 From: weishen Date: Sun, 5 Apr 2026 18:07:41 +0800 Subject: [PATCH] docs: add implementation plan Co-Authored-By: Claude Opus 4.6 --- .../2026-04-05-openclaw-session-archive.md | 2552 +++++++++++++++++ 1 file changed, 2552 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-05-openclaw-session-archive.md diff --git a/docs/superpowers/plans/2026-04-05-openclaw-session-archive.md b/docs/superpowers/plans/2026-04-05-openclaw-session-archive.md new file mode 100644 index 0000000..273f160 --- /dev/null +++ b/docs/superpowers/plans/2026-04-05-openclaw-session-archive.md @@ -0,0 +1,2552 @@ +# OpenClaw Session Archive 实施计划 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 构建一个 Django + DRF 项目,包含数据模型、批量写入 API、自定义 Admin 视图、每日 Markdown 导出,以及客户端 JSONL 解析推送脚本,通过 Docker Compose 部署。 + +**Architecture:** 三节点各运行一个独立解析脚本(纯 Python),将 JSONL 解析为结构化 JSON 后 POST 到 Django API。Django 服务容器化运行,连接远程 NAS 上的 PostgreSQL + TimescaleDB。 + +**Tech Stack:** Python 3.12, Django 5.x, DRF 3.15+, psycopg 3.x, PostgreSQL 16+ TimescaleDB, Docker Compose, pytest, pytest-django + +--- + +## 文件结构总览 + +``` +agent-base/ +├── manage.py # Django 入口 +├── requirements.txt # Python 依赖 +├── requirements-dev.txt # 开发依赖(pytest 等) +├── pyproject.toml # pytest 配置 +├── .env.example # 环境变量模板 +├── .gitignore +├── .dockerignore +├── Dockerfile # Django 生产镜像 +├── docker-compose.yml # 编排配置 +├── nginx/ +│ └── nginx.conf.placeholder # Nginx 反代占位(预留) +├── src/ +│ ├── config/ # Django 项目配置 +│ │ ├── __init__.py +│ │ ├── settings/ +│ │ │ ├── __init__.py +│ │ │ ├── base.py # 公共配置 +│ │ │ └── dev.py # 开发覆盖(DB 用 SQLite) +│ │ ├── urls.py # 路由 +│ │ └── wsgi.py +│ └── openclaw/ # Django app +│ ├── __init__.py +│ ├── apps.py +│ ├── models.py # Session, Message, ToolCall +│ ├── views.py # API views +│ ├── admin.py # Admin 自定义 +│ ├── services.py # 批量写入 + Markdown 导出 +│ ├── urls.py # app 路由 +│ └── templates/ +│ └── admin/ +│ └── openclaw/ +│ └── daily_view.html +├── tests/ +│ ├── conftest.py # pytest fixtures +│ ├── test_models.py # 模型测试 +│ ├── test_api.py # API 测试 +│ ├── test_services.py # 服务层测试 +│ └── test_admin_export.py # 导出测试 +├── scripts/ +│ └── sync_sessions.py # 客户端解析与推送脚本 +└── docs/ + ├── specs/ # Spec 文档 + └── plans/ # 实施计划 +``` + +--- + +### Task 0: 项目骨架与测试基础 + +**Files:** +- Create: `pyproject.toml`, `manage.py`, `requirements.txt`, `requirements-dev.txt` +- Create: `src/config/__init__.py`, `src/config/settings/__init__.py`, `src/config/settings/base.py`, `src/config/settings/dev.py`, `src/config/urls.py`, `src/config/wsgi.py` +- Create: `src/openclaw/__init__.py`, `src/openclaw/apps.py` +- Create: `tests/conftest.py` +- Create: `.gitignore`, `.env.example` + +- [ ] **Step 1: 依赖文件** + +`requirements.txt`: +``` +Django>=5.0,<6.0 +djangorestframework>=3.15,<4.0 +psycopg[binary]>=3.1,<4.0 +gunicorn>=22.0,<24.0 +``` + +`requirements-dev.txt`: +``` +-r requirements.txt +pytest>=8.0,<9.0 +pytest-django>=4.8,<5.0 +``` + +- [ ] **Step 2: pytest 配置** + +`pyproject.toml`: +```toml +[tool.pytest.ini_options] +DJANGO_SETTINGS_MODULE = "config.settings.dev" +python_files = ["tests.py", "test_*.py"] +pythonpath = ["src"] +``` + +- [ ] **Step 3: Django settings** + +`src/config/settings/__init__.py`: +```python +from .base import * # noqa: F401,F403 +``` + +`src/config/settings/base.py`: +```python +import os + +SECRET_KEY = os.environ.get("DJANGO_SECRET_KEY", "dev-secret-key") +DEBUG = False +ALLOWED_HOSTS = os.environ.get("DJANGO_ALLOWED_HOSTS", "*").split(",") + +INSTALLED_APPS = [ + "django.contrib.admin", + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.messages", + "django.contrib.staticfiles", + "rest_framework", + "openclaw", +] + +MIDDLEWARE = [ + "django.middleware.security.SecurityMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.middleware.common.CommonMiddleware", + "django.middleware.csrf.CsrfViewMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", + "django.middleware.clickjacking.XFrameOptionsMiddleware", +] + +ROOT_URLCONF = "config.urls" + +TEMPLATES = [ + { + "BACKEND": "django.template.backends.django.DjangoTemplates", + "DIRS": [], + "APP_DIRS": True, + "OPTIONS": { + "context_processors": [ + "django.template.context_processors.debug", + "django.template.context_processors.request", + "django.contrib.auth.context_processors.auth", + "django.contrib.messages.context_processors.messages", + ], + }, + }, +] + +DATABASES = { + "default": { + "ENGINE": "django.db.backends.postgresql", + "HOST": os.environ.get("DB_HOST", "localhost"), + "PORT": os.environ.get("DB_PORT", "5432"), + "NAME": os.environ.get("DB_NAME", "openclaw_archive"), + "USER": os.environ.get("DB_USER", "openclaw"), + "PASSWORD": os.environ.get("DB_PASSWORD", ""), + } +} + +DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" +STATIC_URL = "static/" +``` + +`src/config/settings/dev.py`: +```python +from .base import * # noqa: F401,F403 + +DEBUG = True +DATABASES = { + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": BASE_DIR / "db.sqlite3", # noqa: F821 + } +} +``` + +- [ ] **Step 4: Django 基础文件** + +`src/config/urls.py`: +```python +from django.contrib import admin +from django.urls import path, include + +urlpatterns = [ + path("admin/", admin.site.urls), + path("api/", include("openclaw.urls")), +] +``` + +`src/config/wsgi.py`: +```python +import os +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") +application = get_wsgi_application() +``` + +`src/openclaw/apps.py`: +```python +from django.apps import AppConfig + +class OpenclawConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "openclaw" +``` + +- [ ] **Step 5: manage.py** + +`manage.py`: +```python +#!/usr/bin/env python +import os +import sys + +def main(): + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") + try: + from django.core.management import execute_from_command_line + except ImportError as exc: + raise ImportError( + "Couldn't import Django. " + "Are you sure it's installed and available on your PYTHONPATH " + f"environment variable is set? Did you forget to activate a virtual environment?" + ) from exc + execute_from_command_line(sys.argv) + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 6: 测试基础** + +`tests/conftest.py`: +```python +import pytest + +@pytest.fixture(autouse=True) +def media_storage(settings, tmpdir): + settings.MEDIA_ROOT = str(tmpdir) +``` + +- [ ] **Step 7: .gitignore 和 .env.example** + +`.gitignore`: +``` +__pycache__/ +*.py[cod] +*.so +*.egg-info/ +dist/ +.venv/ +db.sqlite3 +.env +*.sqlite3 +``` + +`.env.example`: +``` +# Django +DJANGO_SECRET_KEY=CHANGE_ME +DJANGO_PORT=8000 +DJANGO_ALLOWED_HOSTS=* + +# Database (NAS) +DB_HOST=192.168.x.x +DB_PORT=5432 +DB_NAME=openclaw_archive +DB_USER=openclaw +DB_PASSWORD=CHANGE_ME +``` + +- [ ] **Step 8: 安装依赖并验证 Django 启动** + +```bash +pip install -r requirements-dev.txt +cd src && python -c "import django; django.setup(); print('Django OK')" +cd .. +pytest --collect-only +``` + +Expected: `Django OK` and test collection passes (0 tests). + +- [ ] **Step 9: Commit** + +```bash +git add . +git commit -m "feat: Django project skeleton with pytest setup" +``` + +--- + +### Task 1: 数据模型 + +**Files:** +- Create: `src/openclaw/models.py` +- Create: `tests/test_models.py` + +- [ ] **Step 1: 写模型测试** + +`tests/test_models.py`: +```python +from datetime import datetime, timezone + +import pytest +from openclaw.models import Session, Message, ToolCall + +@pytest.mark.django_db +class TestModelFields: + def test_session_creation(self): + s = Session.objects.create( + session_id="a" * 36, + agent_name="xingyao", + source_node="macmini", + status="active", + ) + assert s.session_id == "a" * 36 + assert s.total_tokens == 0 + assert s.message_count == 0 + + def test_message_creation(self): + s = Session.objects.create( + session_id="b" * 36, + agent_name="test", + source_node="ubuntu1", + status="active", + ) + msg = Message.objects.create( + session=s, + message_id="msg-001", + parent_id="root", + role="assistant", + timestamp=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), + ) + assert msg.role == "assistant" + assert msg.tokens_total == 0 + + def test_toolcall_creation(self): + s = Session.objects.create( + session_id="c" * 36, + agent_name="test", + source_node="ubuntu2", + status="active", + ) + msg = Message.objects.create( + session=s, + message_id="msg-002", + parent_id="root", + role="assistant", + timestamp=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), + ) + tc = ToolCall.objects.create( + session=s, + message=msg, + tool_call_id="call_0", + tool_name="exec", + ) + assert tc.tool_name == "exec" + assert tc.is_error is False +``` + +- [ ] **Step 2: 运行测试确认失败** + +```bash +pytest tests/test_models.py -v +``` + +Expected: FAIL — Model not found. + +- [ ] **Step 3: 实现模型** + +`src/openclaw/models.py`: +```python +from django.db import models + + +class Session(models.Model): + session_id = models.CharField(max_length=64) + agent_name = models.CharField(max_length=128) + source_node = models.CharField(max_length=64) + session_version = models.IntegerField(default=0) + model_provider = models.CharField(max_length=64, blank=True, default="") + model_id = models.CharField(max_length=128, blank=True, default="") + thinking_level = models.CharField(max_length=64, blank=True, default="") + start_time = models.DateTimeField(null=True, blank=True) + end_time = models.DateTimeField(null=True, blank=True) + cwd = models.CharField(max_length=512, blank=True, default="") + total_tokens = models.IntegerField(default=0) + total_cost = models.FloatField(default=0.0) + message_count = models.IntegerField(default=0) + tool_call_count = models.IntegerField(default=0) + error_count = models.IntegerField(default=0) + raw_file_path = models.TextField(blank=True, default="") + pushed_at = models.DateTimeField(null=True, blank=True) + status = models.CharField(max_length=16, default="active") + metadata = models.JSONField(default=dict, blank=True) + + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + db_table = "sessions" + unique_together = ("session_id", "agent_name") + ordering = ["-start_time"] + + def __str__(self): + return f"Session({self.session_id} {self.agent_name})" + + +class Message(models.Model): + session = models.ForeignKey( + Session, on_delete=models.CASCADE, related_name="messages" + ) + message_id = models.CharField(max_length=128) + parent_id = models.CharField(max_length=128, blank=True, default="") + seq = models.IntegerField(default=0) + role = models.CharField(max_length=32) + content_text = models.TextField(blank=True, default="") + raw_content = models.JSONField(default=list, blank=True) + raw_message = models.JSONField(default=dict, blank=True) + timestamp = models.DateTimeField() + # assistant 专用 + model = models.CharField(max_length=128, blank=True, default="") + provider = models.CharField(max_length=64, blank=True, default="") + stop_reason = models.CharField(max_length=64, blank=True, default="") + tokens_input = models.IntegerField(default=0) + tokens_output = models.IntegerField(default=0) + tokens_cache_read = models.IntegerField(default=0) + tokens_cache_write = models.IntegerField(default=0) + tokens_total = models.IntegerField(default=0) + cost_total = models.FloatField(default=0.0) + # toolResult 专用 + tool_call_id = models.CharField(max_length=128, blank=True, default="") + tool_name = models.CharField(max_length=128, blank=True, default="") + is_error = models.BooleanField(default=False) + exit_code = models.IntegerField(null=True, blank=True) + duration_ms = models.IntegerField(null=True, blank=True) + + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + db_table = "messages" + ordering = ["seq"] + + def __str__(self): + return f"Message({self.message_id} {self.role})" + + +class ToolCall(models.Model): + session = models.ForeignKey( + Session, on_delete=models.CASCADE, related_name="tool_calls" + ) + message = models.ForeignKey( + Message, on_delete=models.CASCADE, related_name="tool_calls" + ) + tool_call_id = models.CharField(max_length=128) + tool_name = models.CharField(max_length=128) + arguments = models.JSONField(default=dict, blank=True) + result_text = models.TextField(blank=True, default="") + is_error = models.BooleanField(default=False) + exit_code = models.IntegerField(null=True, blank=True) + duration_ms = models.IntegerField(null=True, blank=True) + seq = models.IntegerField(default=0) + + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + db_table = "tool_calls" + ordering = ["seq"] + + def __str__(self): + return f"ToolCall({self.tool_name} {self.tool_call_id})" +``` + +- [ ] **Step 4: make migrations + migrate** + +```bash +cd src && python manage.py makemigrations openclaw +python manage.py migrate +cd .. +``` + +- [ ] **Step 5: 重新运行测试确认通过** + +```bash +pytest tests/test_models.py -v +``` + +Expected: 3 PASS. + +- [ ] **Step 6: Commit** + +```bash +git add src/openclaw/models.py tests/test_models.py src/openclaw/migrations/ pyproject.toml +git commit -m "feat: add Session, Message, ToolCall models" +``` + +--- + +### Task 2: 批量写入 API + +**Files:** +- Create: `src/openclaw/urls.py` +- Create: `src/openclaw/views.py` +- Create: `src/openclaw/services.py` +- Create: `tests/test_api.py` +- Create: `tests/test_services.py` + +- [ ] **Step 1: 写服务层测试(BulkUpsertService)** + +`tests/test_services.py`: +```python +from datetime import datetime, timezone + +import pytest +from openclaw.models import Session, Message, ToolCall +from openclaw.services import BulkUpsertService + + +@pytest.mark.django_db +class TestBulkUpsertService: + def _new_session_payload(self): + return { + "agent_name": "test-agent", + "source_node": "macmini", + "sessions": [ + { + "session_id": "sess-001", + "session_version": 1, + "model_provider": "anthropic", + "model_id": "claude-sonnet-4-6", + "thinking_level": "high", + "start_time": "2026-04-05T10:00:00Z", + "end_time": "2026-04-05T10:30:00Z", + "cwd": "/tmp/test", + "total_tokens": 5000, + "total_cost": 0.12, + "message_count": 10, + "tool_call_count": 3, + "error_count": 0, + "raw_file_path": "/path/to/sessions/sess-001.jsonl", + "status": "active", + "metadata": {}, + } + ], + "messages": [ + { + "session_id": "sess-001", + "message_id": "msg-001", + "parent_id": "root", + "seq": 0, + "role": "user", + "content_text": "Hello", + "raw_content": [{"type": "text", "text": "Hello"}], + "raw_message": {}, + "timestamp": "2026-04-05T10:05:00Z", + } + ], + "tool_calls": [], + } + + def test_upsert_new_session(self): + result = BulkUpsertService.upsert(self._new_session_payload()) + assert result["sessions_upserted"] == 1 + assert Session.objects.get(session_id="sess-001") + + def test_upsert_idempotent(self): + BulkUpsertService.upsert(self._new_session_payload()) + result = BulkUpsertService.upsert(self._new_session_payload()) + # second push should not create duplicate + assert Session.objects.filter(session_id="sess-001").count() == 1 + assert result["sessions_upserted"] == 0 # skipped + + def test_upsert_with_messages_and_toolcalls(self): + payload = self._new_session_payload() + payload["messages"].append( + { + "session_id": "sess-001", + "message_id": "msg-002", + "parent_id": "msg-001", + "seq": 1, + "role": "assistant", + "content_text": "Hi there", + "raw_content": [{"type": "text", "text": "Hi there"}], + "raw_message": {}, + "timestamp": "2026-04-05T10:06:00Z", + "model": "claude-sonnet-4-6", + "provider": "anthropic", + "stop_reason": "end_turn", + "tokens_input": 100, + "tokens_output": 50, + "tokens_total": 150, + "cost_total": 0.01, + } + ) + payload["tool_calls"].append( + { + "session_id": "sess-001", + "message_id": "msg-002", + "tool_call_id": "call_0", + "tool_name": "exec", + "arguments": {"command": "ls"}, + "result_text": "file.txt", + "is_error": False, + "exit_code": 0, + "duration_ms": 200, + "seq": 0, + } + ) + result = BulkUpsertService.upsert(payload) + assert result["messages_upserted"] == 2 + assert result["tool_calls_upserted"] == 1 + assert ToolCall.objects.get(tool_call_id="call_0").tool_name == "exec" +``` + +- [ ] **Step 2: 运行测试确认失败** + +```bash +pytest tests/test_services.py -v +``` + +Expected: FAIL — module not found. + +- [ ] **Step 3: 实现 BulkUpsertService** + +`src/openclaw/services.py`: +```python +from datetime import datetime, timezone + +from django.db import transaction + +from openclaw.models import Session, Message, ToolCall + + +def _parse_ts(value): + if not value: + return None + if isinstance(value, str): + # Handle ISO 8601 Z suffix + value = value.replace("Z", "+00:00") + return datetime.fromisoformat(value) + return value + + +class BulkUpsertService: + @staticmethod + @transaction.atomic + def upsert(payload): + agent_name = payload["agent_name"] + source_node = payload["source_node"] + sessions_data = payload.get("sessions", []) + messages_data = payload.get("messages", []) + tool_calls_data = payload.get("tool_calls", []) + + sessions_upserted = 0 + messages_upserted = 0 + tool_calls_upserted = 0 + + for sess in sessions_data: + session_id = sess["session_id"] + defaults = { + "source_node": source_node, + "session_version": sess.get("session_version", 0), + "model_provider": sess.get("model_provider", ""), + "model_id": sess.get("model_id", ""), + "thinking_level": sess.get("thinking_level", ""), + "start_time": _parse_ts(sess.get("start_time")), + "end_time": _parse_ts(sess.get("end_time")), + "cwd": sess.get("cwd", ""), + "total_tokens": sess.get("total_tokens", 0), + "total_cost": sess.get("total_cost", 0.0), + "message_count": sess.get("message_count", 0), + "tool_call_count": sess.get("tool_call_count", 0), + "error_count": sess.get("error_count", 0), + "raw_file_path": sess.get("raw_file_path", ""), + "pushed_at": datetime.now(timezone.utc), + "status": sess.get("status", "active"), + "metadata": sess.get("metadata", {}), + } + _, created = Session.objects.update_or_create( + session_id=session_id, + agent_name=agent_name, + defaults=defaults, + ) + if created: + sessions_upserted += 1 + + # Build session lookup: session_id -> Session instance + session_ids = {s["session_id"] for s in sessions_data} + session_lookup = { + s.session_id: s + for s in Session.objects.filter( + session_id__in=session_ids, agent_name=agent_name + ) + } + + # Upsert messages + for msg in messages_data: + session = session_lookup.get(msg["session_id"]) + if not session: + continue + defaults = { + "parent_id": msg.get("parent_id", ""), + "seq": msg.get("seq", 0), + "role": msg.get("role", ""), + "content_text": msg.get("content_text", ""), + "raw_content": msg.get("raw_content", []), + "raw_message": msg.get("raw_message", {}), + "timestamp": _parse_ts(msg.get("timestamp")), + "model": msg.get("model", ""), + "provider": msg.get("provider", ""), + "stop_reason": msg.get("stop_reason", ""), + "tokens_input": msg.get("tokens_input", 0), + "tokens_output": msg.get("tokens_output", 0), + "tokens_cache_read": msg.get("tokens_cache_read", 0), + "tokens_cache_write": msg.get("tokens_cache_write", 0), + "tokens_total": msg.get("tokens_total", 0), + "cost_total": msg.get("cost_total", 0.0), + "tool_call_id": msg.get("tool_call_id", ""), + "tool_name": msg.get("tool_name", ""), + "is_error": msg.get("is_error", False), + "exit_code": msg.get("exit_code"), + "duration_ms": msg.get("duration_ms"), + } + Message.objects.update_or_create( + session=session, + message_id=msg["message_id"], + defaults=defaults, + ) + messages_upserted += 1 + + # Build message lookup: message_id -> Message instance + msg_lookup = { + m.message_id: m + for m in Message.objects.filter(session__in=session_lookup.values()) + } + + # Upsert tool_calls + for tc in tool_calls_data: + session = session_lookup.get(tc["session_id"]) + message = msg_lookup.get(tc["message_id"]) + if not session or not message: + continue + ToolCall.objects.update_or_create( + session=session, + message=message, + tool_call_id=tc["tool_call_id"], + defaults={ + "tool_name": tc.get("tool_name", ""), + "arguments": tc.get("arguments", {}), + "result_text": tc.get("result_text", ""), + "is_error": tc.get("is_error", False), + "exit_code": tc.get("exit_code"), + "duration_ms": tc.get("duration_ms"), + "seq": tc.get("seq", 0), + }, + ) + tool_calls_upserted += 1 + + return { + "sessions_upserted": sessions_upserted, + "messages_upserted": messages_upserted, + "tool_calls_upserted": tool_calls_upserted, + } +``` + +- [ ] **Step 4: 运行服务层测试** + +```bash +pytest tests/test_services.py -v +``` + +Expected: 3 PASS. + +- [ ] **Step 5: 写 API 测试** + +`tests/test_api.py`: +```python +import pytest + +from openclaw.models import Session + +BULK_URL = "/api/sessions/bulk_upsert/" + + +def _minimal_payload(): + return { + "agent_name": "test", + "source_node": "macmini", + "sessions": [ + { + "session_id": "test-session", + "model_provider": "test", + "model_id": "test-model", + } + ], + "messages": [], + "tool_calls": [], + } + + +@pytest.mark.django_db +class TestBulkUpsertAPI: + def test_bulk_upsert_ok(self, client): + resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json") + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["sessions_upserted"] == 1 + + def test_bulk_upsert_idempotent(self, client): + client.post(BULK_URL, _minimal_payload(), content_type="application/json") + resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json") + data = resp.json() + assert data["sessions_upserted"] == 0 + + def test_bulk_upsert_missing_fields_returns_400(self, client): + resp = client.post(BULK_URL, {}, content_type="application/json") + assert resp.status_code == 400 +``` + +- [ ] **Step 6: 运行 API 测试确认失败** + +```bash +pytest tests/test_api.py -v +``` + +Expected: FAIL — URL not found. + +- [ ] **Step 7: 实现 API view + 路由** + +`src/openclaw/urls.py`: +```python +from django.urls import path +from openclaw.views import sessions_bulk_upsert + +urlpatterns = [ + path("sessions/bulk_upsert/", sessions_bulk_upsert, name="sessions_bulk_upsert"), +] +``` + +`src/openclaw/views.py`: +```python +import json + +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods + +from openclaw.services import BulkUpsertService + + +@csrf_exempt +@require_http_methods(["POST"]) +def sessions_bulk_upsert(request): + try: + payload = json.loads(request.body) + except json.JSONDecodeError: + return JsonResponse({"error": "Invalid JSON"}, status=400) + + if "agent_name" not in payload or "source_node" not in payload: + return JsonResponse( + {"error": "Missing agent_name or source_node"}, status=400 + ) + + if "sessions" not in payload: + return JsonResponse({"error": "Missing sessions"}, status=400) + + result = BulkUpsertService.upsert(payload) + return JsonResponse({"status": "ok", **result}) +``` + +- [ ] **Step 8: 运行 API 测试确认通过** + +```bash +pytest tests/test_api.py -v +``` + +Expected: 3 PASS. + +- [ ] **Step 9: 运行所有测试** + +```bash +pytest -v +``` + +Expected: ALL PASS (3 models + 3 services + 3 api = 9). + +- [ ] **Step 10: Commit** + +```bash +git add src/openclaw/services.py src/openclaw/views.py src/openclaw/urls.py tests/test_services.py tests/test_api.py +git commit -m "feat: bulk upsert API with idempotent writes" +``` + +--- + +### Task 3: Django Admin 基础配置 + +**Files:** +- Create: `src/openclaw/admin.py` + +- [ ] **Step 1: Admin 配置** + +`src/openclaw/admin.py`: +```python +from django.contrib import admin +from openclaw.models import Session, Message, ToolCall + + +class MessageInline(admin.TabularInline): + model = Message + extra = 0 + fields = ("seq", "role", "content_text", "timestamp") + readonly_fields = ("seq", "role", "content_text", "timestamp") + + def has_add_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False + + +class ToolCallInline(admin.TabularInline): + model = ToolCall + extra = 0 + fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms") + readonly_fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms") + + def has_add_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False + + +@admin.register(Session) +class SessionAdmin(admin.ModelAdmin): + list_display = ( + "session_id", + "agent_name", + "model_id", + "total_tokens", + "message_count", + "start_time", + ) + list_filter = ("agent_name", "source_node", "model_id", "start_time") + search_fields = ("session_id", "cwd") + ordering = ("-start_time",) + inlines = [MessageInline, ToolCallInline] + readonly_fields = ( + "session_id", + "agent_name", + "source_node", + "start_time", + "end_time", + "pushed_at", + ) + + +@admin.register(Message) +class MessageAdmin(admin.ModelAdmin): + list_display = ("message_id", "session", "role", "timestamp", "tokens_total") + list_filter = ("role", "model", "timestamp") + search_fields = ("content_text",) + ordering = ("-timestamp",) + + +@admin.register(ToolCall) +class ToolCallAdmin(admin.ModelAdmin): + list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms") + list_filter = ("tool_name", "is_error", "exit_code") + ordering = ("-created_at",) +``` + +- [ ] **Step 2: 手动验证 Admin** + +```bash +cd src && python manage.py migrate && python manage.py createsuperuser +cd src && python manage.py runserver +``` + +访问 `http://localhost:8000/admin/` 确认三张表在 Admin 中可见。 + +- [ ] **Step 3: Commit** + +```bash +git add src/openclaw/admin.py +git commit -m "feat: Django Admin for Session, Message, ToolCall with inlines" +``` + +--- + +### Task 4: Admin 自定义视图 — 按日期查询对话 + +**Files:** +- Modify: `src/openclaw/admin.py` +- Create: `src/openclaw/templates/admin/openclaw/daily_view.html` +- Modify: `src/openclaw/urls.py`(admin 路由由 admin.site 管理,不需要改) + +- [ ] **Step 1: 模板** + +`src/openclaw/templates/admin/openclaw/daily_view.html`: +```html +{% extends "admin/base_site.html" %} +{% block content %} +

Daily Conversation View

+ +
+ + + + +
+ +{% if sessions %} +{% for session in sessions %} +
+

Session: {{ session.session_id }} ({{ session.agent_name }})

+

Model: {{ session.model_id }} | Tokens: {{ session.total_tokens }} | + Start: {{ session.start_time|default:"N/A" }}

+ + {% for msg in session.messages %} +
+ {{ msg.timestamp|date:"H:i" }} {{ msg.get_role_label }} + {% if msg.role == 'toolResult' %} + {% if msg.tool_name %}[Tool: {{ msg.tool_name }}]{% endif %} + {% endif %} +
+ Content +
{{ msg.content_text|default:"(empty)" }}
+
+
+ {% empty %} +

No messages.

+ {% endfor %} +
+{% endfor %} +{% endif %} +{% endblock %} +``` + +- [ ] **Step 2: Admin 视图** + +添加到 `src/openclaw/admin.py` 末尾: + +```python +from datetime import date + +from django.shortcuts import render +from django.db.models import Prefetch + + +class DailyConversationView(admin.ModelAdmin): + """Admin custom view for date-range conversation browsing.""" + def get_urls(self): + from django.urls import path + urls = super().get_urls() + custom_urls = [ + path("daily/", self.admin_site.admin_view(self.daily_view), name="openclaw_daily"), + ] + return custom_urls + urls + + def daily_view(self, request): + start_str = request.GET.get("start") + end_str = request.GET.get("end") + agent_filter = request.GET.get("agent", "") + + start_date = start_str if start_str else date.today().isoformat() + end_date = end_str if end_str else date.today().isoformat() + + agents = list( + Session.objects.values_list("agent_name", flat=True) + .distinct() + .order_by("agent_name") + ) + + sessions_qs = Session.objects.filter( + start_time__date__gte=start_date, + start_time__date__lte=end_date, + ).order_by("start_time") + + if agent_filter: + sessions_qs = sessions_qs.filter(agent_name=agent_filter) + + messages_prefetch = Prefetch( + "messages", + queryset=Message.objects.order_by("seq"), + ) + sessions_qs = sessions_qs.prefetch_related(messages_prefetch) + + session_list = [] + for session in sessions_qs: + messages = [] + for msg in session.messages.all(): + messages.append({ + "timestamp": msg.timestamp, + "role": msg.role, + "content_text": msg.content_text, + "tool_name": msg.tool_name, + "get_role_label": self._role_label(msg.role), + }) + session_list.append({ + "session_id": session.session_id, + "agent_name": session.agent_name, + "model_id": session.model_id, + "total_tokens": session.total_tokens, + "start_time": session.start_time, + "messages": messages, + }) + + context = dict( + self.admin_site.each_context(request), + start_date=start_date, + end_date=end_date, + selected_agent=agent_filter, + agents=agents, + sessions=session_list, + title="Daily Conversation View", + ) + return render(request, "admin/openclaw/daily_view.html", context) + + @staticmethod + def _role_label(role): + labels = { + "user": "User", + "assistant": "Assistant", + "toolResult": "Tool Result", + } + return labels.get(role, role) + + +# Register Daily view via SessionAdmin: +# In SessionAdmin inheritance, add daily_view url. +# Actually, the cleanest way: create a standalone admin view. +``` + +等等 — `DailyConversationView` 继承自 `ModelAdmin` 但并不需要绑定到一个 model。正确做法是用一个独立 view 函数注册到 admin。让我重写: + +```python +# Add this function to src/openclaw/admin.py + +from django.template.response import TemplateResponse + +def daily_conversation_view(request): + """Admin standalone view for date-range conversation browsing.""" + from datetime import date + from django.db.models import Prefetch + from openclaw.models import Session, Message + + start_str = request.GET.get("start") + end_str = request.GET.get("end") + agent_filter = request.GET.get("agent", "") + + start_date = start_str if start_str else date.today().isoformat() + end_date = end_str if end_str else date.today().isoformat() + + agents = list( + Session.objects.values_list("agent_name", flat=True) + .distinct() + .order_by("agent_name") + ) + + sessions_qs = Session.objects.filter( + start_time__date__gte=start_date, + start_time__date__lte=end_date, + ).order_by("start_time") + + if agent_filter: + sessions_qs = sessions_qs.filter(agent_name=agent_filter) + + messages_prefetch = Prefetch( + "messages", + queryset=Message.objects.order_by("seq"), + ) + sessions_qs = sessions_qs.prefetch_related(messages_prefetch) + + role_labels = { + "user": "User", + "assistant": "Assistant", + "toolResult": "Tool Result", + } + + session_list = [] + for session in sessions_qs: + messages = [] + for msg in session.messages.all(): + messages.append({ + "timestamp": msg.timestamp, + "role": msg.role, + "content_text": msg.content_text, + "tool_name": msg.tool_name, + "get_role_label": role_labels.get(msg.role, msg.role), + }) + session_list.append({ + "session_id": session.session_id, + "agent_name": session.agent_name, + "model_id": session.model_id, + "total_tokens": session.total_tokens, + "start_time": session.start_time, + "messages": messages, + }) + + context = { + **admin.site.each_context(request), + "start_date": start_date, + "end_date": end_date, + "selected_agent": agent_filter, + "agents": agents, + "sessions": session_list, + "title": "Daily Conversation View", + } + return TemplateResponse(request, "admin/openclaw/daily_view.html", context) + + +# Add URL in SessionAdmin or as standalone: +# Register the URL via custom admin urlpattern. +class CustomAdminSite(admin.AdminSite): + def get_urls(self): + from django.urls import path + urls = super().get_urls() + urls += [ + path("openclaw/daily/", daily_conversation_view, name="openclaw_daily"), + ] + return urls + + +# Then swap admin.site in config/urls.py and manage.py +``` + +Actually, this is getting complicated with swapping the admin site. The simpler approach is to add the URL through an existing ModelAdmin's `get_urls`. Let me rewrite this section cleanly: + +添加到 `src/openclaw/admin.py`(接在 ToolCallAdmin 后面): + +```python +from django.template.response import TemplateResponse +from datetime import date +from django.db.models import Prefetch + + +def daily_conversation_view(self, request): + """Admin standalone view for date-range conversation browsing.""" + start_str = request.GET.get("start") + end_str = request.GET.get("end") + agent_filter = request.GET.get("agent", "") + + start_date = start_str if start_str else date.today().isoformat() + end_date = end_str if end_str else date.today().isoformat() + + agents = list( + Session.objects.values_list("agent_name", flat=True) + .distinct() + .order_by("agent_name") + ) + + sessions_qs = Session.objects.filter( + start_time__date__gte=start_date, + start_time__date__lte=end_date, + ).order_by("start_time") + + if agent_filter: + sessions_qs = sessions_qs.filter(agent_name=agent_filter) + + messages_prefetch = Prefetch( + "messages", + queryset=Message.objects.order_by("seq"), + ) + sessions_qs = sessions_qs.prefetch_related(messages_prefetch) + + role_labels = { + "user": "User", + "assistant": "Assistant", + "toolResult": "Tool Result", + } + + session_list = [] + for session in sessions_qs: + messages = [] + for msg in session.messages.all(): + messages.append({ + "timestamp": msg.timestamp, + "role": msg.role, + "content_text": msg.content_text, + "tool_name": msg.tool_name, + "get_role_label": role_labels.get(msg.role, msg.role), + }) + session_list.append({ + "session_id": session.session_id, + "agent_name": session.agent_name, + "model_id": session.model_id, + "total_tokens": session.total_tokens, + "start_time": session.start_time, + "messages": messages, + }) + + context = { + **admin.site.each_context(request), + "start_date": start_date, + "end_date": end_date, + "selected_agent": agent_filter, + "agents": agents, + "sessions": session_list, + "title": "Daily Conversation View", + } + return TemplateResponse(request, "admin/openclaw/daily_view.html", context) + + +# Add URL through SessionAdmin get_urls +class SessionAdmin(admin.ModelAdmin): + # ... (as defined above) + ... + + def get_urls(self): + from django.urls import path + urls = super().get_urls() + custom_urls = [ + path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"), + ] + return custom_urls + urls +``` + +好的,为了避免重复定义 SessionAdmin,最终 `src/openclaw/admin.py` 完整代码如下: + +```python +from django.contrib import admin +from django.template.response import TemplateResponse +from datetime import date +from django.db.models import Prefetch +from openclaw.models import Session, Message, ToolCall + + +class MessageInline(admin.TabularInline): + model = Message + extra = 0 + fields = ("seq", "role", "content_text", "timestamp") + readonly_fields = ("seq", "role", "content_text", "timestamp") + + def has_add_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False + + +class ToolCallInline(admin.TabularInline): + model = ToolCall + extra = 0 + fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms") + readonly_fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms") + + def has_add_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False + + +def daily_conversation_view(request): + """Admin standalone view for date-range conversation browsing.""" + start_str = request.GET.get("start") + end_str = request.GET.get("end") + agent_filter = request.GET.get("agent", "") + + start_date = start_str if start_str else date.today().isoformat() + end_date = end_str if end_str else date.today().isoformat() + + agents = list( + Session.objects.values_list("agent_name", flat=True) + .distinct() + .order_by("agent_name") + ) + + sessions_qs = Session.objects.filter( + start_time__date__gte=start_date, + start_time__date__lte=end_date, + ).order_by("start_time") + + if agent_filter: + sessions_qs = sessions_qs.filter(agent_name=agent_filter) + + messages_prefetch = Prefetch( + "messages", + queryset=Message.objects.order_by("seq"), + ) + sessions_qs = sessions_qs.prefetch_related(messages_prefetch) + + role_labels = { + "user": "User", + "assistant": "Assistant", + "toolResult": "Tool Result", + } + + session_list = [] + for session in sessions_qs: + messages = [] + for msg in session.messages.all(): + messages.append({ + "timestamp": msg.timestamp, + "role": msg.role, + "content_text": msg.content_text, + "tool_name": msg.tool_name, + "get_role_label": role_labels.get(msg.role, msg.role), + }) + session_list.append({ + "session_id": session.session_id, + "agent_name": session.agent_name, + "model_id": session.model_id, + "total_tokens": session.total_tokens, + "start_time": session.start_time, + "messages": messages, + }) + + context = { + **admin.site.each_context(request), + "start_date": start_date, + "end_date": end_date, + "selected_agent": agent_filter, + "agents": agents, + "sessions": session_list, + "title": "Daily Conversation View", + } + return TemplateResponse(request, "admin/openclaw/daily_view.html", context) + + +@admin.register(Session) +class SessionAdmin(admin.ModelAdmin): + list_display = ( + "session_id", + "agent_name", + "model_id", + "total_tokens", + "message_count", + "start_time", + ) + list_filter = ("agent_name", "source_node", "model_id", "start_time") + search_fields = ("session_id", "cwd") + ordering = ("-start_time",) + inlines = [MessageInline, ToolCallInline] + readonly_fields = ( + "session_id", + "agent_name", + "source_node", + "start_time", + "end_time", + "pushed_at", + ) + + def get_urls(self): + from django.urls import path + urls = super().get_urls() + custom_urls = [ + path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"), + ] + return custom_urls + urls + + +@admin.register(Message) +class MessageAdmin(admin.ModelAdmin): + list_display = ("message_id", "session", "role", "timestamp", "tokens_total") + list_filter = ("role", "model", "timestamp") + search_fields = ("content_text",) + ordering = ("-timestamp",) + + +@admin.register(ToolCall) +class ToolCallAdmin(admin.ModelAdmin): + list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms") + list_filter = ("tool_name", "is_error", "exit_code") + ordering = ("-created_at",) +``` + +- [ ] **Step 3: 添加 Admin 导航链接** + +在 SessionAdmin 的 `list_display` 上方加一个自定义链接按钮(方便入口),或者在模板侧边栏直接访问 `/admin/sessions/session/daily/`。 + +- [ ] **Step 4: 手动验证** + +```bash +cd src && python manage.py runserver +``` + +- 注册一个 session + message 测试数据 +- `http://localhost:8000/admin/openclaw/session/daily/` 确认自定义视图可访问 +- 选择日期范围,确认显示 session 列表和对话 + +- [ ] **Step 5: Commit** + +```bash +git add src/openclaw/admin.py src/openclaw/templates/ +git commit -m "feat: admin daily conversation view with date filtering" +``` + +--- + +### Task 5: Daily Markdown 导出(Admin Action) + +**Files:** +- Create: `src/openclaw/export.py` +- Create: `tests/test_admin_export.py` + +- [ ] **Step 1: 写导出测试** + +`tests/test_admin_export.py`: +```python +"""Tests for Markdown export functionality.""" +from datetime import datetime, timezone + +import pytest + +from openclaw.models import Session, Message +from openclaw.export import generate_markdown_report + + +@pytest.fixture +def db_session(db): + s = Session.objects.create( + session_id="report-test", + agent_name="xingyao", + source_node="macmini", + model_provider="anthropic", + model_id="claude-sonnet-4-6", + total_tokens=45230, + start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), + ) + Message.objects.create( + session=s, + message_id="m1", + parent_id="root", + role="user", + content_text="Help me fix this bug", + timestamp=datetime(2026, 4, 5, 10, 23, tzinfo=timezone.utc), + ) + Message.objects.create( + session=s, + message_id="m2", + parent_id="m1", + role="assistant", + content_text="The bug is on line 45...", + timestamp=datetime(2026, 4, 5, 10, 23, 30, tzinfo=timezone.utc), + ) + return s + + +@pytest.mark.django_db +class TestMarkdownExport: + def test_basic_report(self, db_session): + md = generate_markdown_report( + messages=db_session.messages.order_by("created_at"), + date_str="2026-04-05", + ) + assert "# Daily Report: 2026-04-05" in md + assert "Help me fix this bug" in md + assert "The bug is on line 45..." in md + + def test_thinking_content_stripped(self, db): + s = Session.objects.create( + session_id="thinking-test", + agent_name="test", + source_node="macmini", + start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), + ) + Message.objects.create( + session=s, + message_id="m3", + parent_id="root", + role="assistant", + content_text="Final answer", + raw_content=[ + {"type": "thinking", "thinking": "Let me think about this..."}, + {"type": "text", "text": "Final answer"}, + ], + timestamp=datetime(2026, 4, 5, 10, 30, tzinfo=timezone.utc), + ) + md = generate_markdown_report( + messages=s.messages.order_by("created_at"), + date_str="2026-04-05", + ) + assert "Let me think about this..." not in md + assert "Final answer" in md + + def test_tool_call_formatting(self, db): + s = Session.objects.create( + session_id="tool-test", + agent_name="test", + source_node="macmini", + model_id="test-model", + total_tokens=100, + start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), + ) + Message.objects.create( + session=s, + message_id="m4", + parent_id="root", + role="assistant", + content_text="I'll run a command", + raw_content=[ + {"type": "text", "text": "I'll run a command"}, + ], + timestamp=datetime(2026, 4, 5, 10, 30, tzinfo=timezone.utc), + ) + md = generate_markdown_report( + messages=s.messages.order_by("created_at"), + date_str="2026-04-05", + ) + assert "test-model" in md + assert "100" in md +``` + +- [ ] **Step 2: 运行测试确认失败** + +```bash +pytest tests/test_admin_export.py -v +``` + +Expected: FAIL — module not found. + +- [ ] **Step 3: 实现 Markdown 导出** + +`src/openclaw/export.py`: +```python +from openclaw.models import Session + + +def generate_markdown_report(messages, date_str, sessions=None): + """Generate a markdown daily report. + + Args: + messages: QuerySet of Message objects, ordered by timestamp. + date_str: Date string for the report header (YYYY-MM-DD). + sessions: Optional dict of session_id -> Session instance for metadata. + """ + if sessions is None: + sessions = {} + + lines = [f"# Daily Report: {date_str}", ""] + + # Group messages by session + session_messages = {} + for msg in messages: + sid = msg.session_id if hasattr(msg, "session_id") else msg.session.session_id + if sid not in session_messages: + session_messages[sid] = [] + session_messages[sid].append(msg) + + for session_id, msgs in session_messages.items(): + session = sessions.get(session_id) + if session: + lines.append( + f"## Session: {session_id} (Agent: {session.agent_name})" + ) + lines.append( + f"**Model**: {session.model_id or 'N/A'} | " + f"**Token**: {session.total_tokens:,}" + ) + else: + lines.append(f"## Session: {session_id}") + + lines.append("") + + for msg in msgs: + role_label = { + "user": "User", + "assistant": "Assistant", + "toolResult": "Tool Result", + }.get(msg.role, msg.role) + + time_str = msg.timestamp.strftime("%H:%M") + + # For assistant messages, check raw_content for tool_call mentions + if msg.role == "assistant": + tool_info = _extract_tool_info(msg.raw_content) + lines.append(f"### {time_str} {role_label}") + if msg.content_text: + lines.append("") + lines.append(msg.content_text) + + for tool in tool_info: + lines.append("") + lines.append( + f"**{time_str} {role_label} -> [Tool: {tool['name']}]**" + ) + lines.append("") + lines.append(f"`{tool.get('arguments', '')}`") + if tool.get("result"): + lines.append("") + lines.append(f'**Result**: {tool["result"]}') + elif msg.role == "toolResult": + continue # toolResult handled inline with assistant + else: + lines.append(f"### {time_str} {role_label}") + if msg.content_text: + lines.append("") + lines.append(msg.content_text) + + lines.append("") + lines.append("---") + lines.append("") + + return "\n".join(lines) + + +def _extract_tool_info(raw_content): + """Extract tool call info from message raw_content JSON.""" + tools = [] + if isinstance(raw_content, list): + for block in raw_content: + if isinstance(block, dict) and block.get("type") == "toolCall": + tool_name = block.get("tool_name") or block.get("name", "unknown") + args = block.get("arguments", {}) + if isinstance(args, str): + args_str = args[:200] + else: + args_str = str(args)[:200] + tools.append({ + "name": tool_name, + "arguments": args_str, + "result": "", # Will be filled later from toolResult + }) + return tools + + +def export_daily_markdown(sessions_queryset): + """Generate a markdown file from a QuerySet of Session objects. + + Returns (markdown_string, filename). + Fetches all messages for the sessions. + """ + messages = Message.objects.filter( + session__in=sessions_queryset + ).order_by("timestamp") + + sessions_map = {s.session_id: s for s in sessions_queryset} + + # Determine date from first session start_time + first_session = sessions_queryset.order_by("start_time").first() + if first_session and first_session.start_time: + date_str = first_session.start_time.strftime("%Y-%m-%d") + else: + date_str = "export" + + md = generate_markdown_report(messages, date_str, sessions_map) + filename = f"daily-report-{date_str}.md" + return md, filename +``` + +- [ ] **Step 4: 运行导出测试** + +```bash +pytest tests/test_admin_export.py -v +``` + +Expected: 3 PASS. + +- [ ] **Step 5: 注册 Admin Action** + +修改 `src/openclaw/admin.py`,在 SessionAdmin 中添加 action: + +```python +from openclaw.export import export_daily_markdown +from django.http import HttpResponse + + +@admin.action(description="Export selected sessions to Markdown") +def export_to_markdown(modeladmin, request, queryset): + md, filename = export_daily_markdown(queryset) + response = HttpResponse(md, content_type="text/markdown") + response["Content-Disposition"] = f'attachment; filename="{filename}"' + return response + + +@admin.register(Session) +class SessionAdmin(admin.ModelAdmin): + # ... existing code + actions = [export_to_markdown] + ... +``` + +完整文件在 commit 时需要合并以上修改。 + +- [ ] **Step 6: 手动验证** + +```bash +cd src && python manage.py runserver +``` + +在 Admin Session 列表勾选 sessions → 选择 "Export selected sessions to Markdown" → 确认下载文件,内容为 Markdown 格式对话。 + +- [ ] **Step 7: Commit** + +```bash +git add src/openclaw/export.py tests/test_admin_export.py src/openclaw/admin.py +git commit -m "feat: daily Markdown export admin action" +``` + +--- + +### Task 6: 客户端 JSONL 解析与推送脚本 + +**Files:** +- Create: `scripts/sync_sessions.py` + +- [ ] **Step 1: 创建脚本** + +`scripts/sync_sessions.py`: +```python +#!/usr/bin/env python +""" +OpenClaw Session Sync Script + +Scans local agent sessions directories, parses JSONL files, +and pushes structured JSON to the Django API. + +Usage: + python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/ + +Cron: + 0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url +""" + +import argparse +import json +import os +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path + +# ───────────────────────────────────────────────────────────────── +# Configuration +# ───────────────────────────────────────────────────────────────── + +SESSIONS_DIR_NAME = "sessions" +STATE_FILE = ".sync_state" +DELETED_SUFFIX = ".deleted." + + +# ───────────────────────────────────────────────────────────────── +# File Discovery +# ───────────────────────────────────────────────────────────────── + +def find_sessions(root_path): + """Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path).""" + agents_dir = Path(root_path) / "agents" + if not agents_dir.exists(): + return + for agent_folder in sorted(agents_dir.iterdir()): + if not agent_folder.is_dir(): + continue + sessions_dir = agent_folder / SESSIONS_DIR_NAME + if not sessions_dir.exists(): + continue + agent_name = agent_folder.name + for jsonl_file in sorted(sessions_dir.glob("*.jsonl")): + if DELETED_SUFFIX in jsonl_file.name: + continue + yield agent_name, str(jsonl_file) + + +def get_sync_state(sessions_dir): + """Read .sync_state from sessions directory, return {path: mtime}.""" + state_path = Path(sessions_dir) / STATE_FILE + if not state_path.exists(): + return {} + try: + with open(state_path) as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {} + + +def save_sync_state(sessions_dir, state): + """Write .sync_state file.""" + state_path = Path(sessions_dir) / STATE_FILE + with open(state_path, "w") as f: + json.dump(state, f) + + +def get_new_files(root_path): + """Find files that are new or modified since last sync.""" + state = {} + all_sessions_dirs = set() + + agents_dir = Path(root_path) / "agents" + if agents_dir.exists(): + for agent_folder in agents_dir.iterdir(): + if agent_folder.is_dir(): + sessions_dir = agent_folder / SESSIONS_DIR_NAME + if sessions_dir.exists(): + all_sessions_dirs.add(str(sessions_dir)) + + # Load existing state from all session dirs + merged_state = {} + for sd in all_sessions_dirs: + sd_state = get_sync_state(sd) + merged_state.update(sd_state) + + new_files = [] + for agent_name, jsonl_path in find_sessions(root_path): + stat = os.stat(jsonl_path) + mtime = stat.st_mtime + file_key = jsonl_path + old_mtime = merged_state.get(file_key, 0) + if mtime > old_mtime: + new_files.append((agent_name, jsonl_path)) + merged_state[file_key] = mtime + + # Save new state + for sd in all_sessions_dirs: + dir_files = {k: v for k, v in merged_state.items() if k.startswith(sd)} + save_sync_state(sd, dir_files) + + return new_files + + +# ───────────────────────────────────────────────────────────────── +# JSONL Parser +# ───────────────────────────────────────────────────────────────── + +def parse_jsonl(file_path): + """Parse a JSONL file and return structured session data.""" + sessions = [] + messages = [] + tool_calls = [] + + # State tracking for model/thinking changes + current_model_provider = "" + current_model_id = "" + current_thinking_level = "" + + # Tool results lookup by tool_call_id + tool_results = {} + + events = [] + with open(file_path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + events.append(event) + except json.JSONDecodeError: + continue + + if not events: + return sessions, messages, tool_calls + + # First pass: extract session metadata + session_event = None + for event in events: + event_type = event.get("type", "") + if event_type == "session": + session_event = event + break + + if not session_event: + return sessions, messages, tool_calls + + session_id = session_event.get("id", "") + session_timestamp = session_event.get("timestamp", "") + session_cwd = session_event.get("cwd", "") + session_version = events[-1].get("version", 0) if events else 0 + + # Determine start and end time from all events + timestamps = [] + for event in events: + ts = event.get("timestamp", "") + if ts: + timestamps.append(ts) + + # Second pass: process events + message_seq = 0 + total_tokens = 0 + total_cost = 0.0 + message_count = 0 + tool_call_count = 0 + error_count = 0 + + for event in events: + event_type = event.get("type", "") + + if event_type == "model_change": + current_model_provider = event.get("provider", "") + current_model_id = event.get("modelId", "") + + elif event_type == "thinking_level_change": + current_thinking_level = event.get("thinkingLevel", "") + + elif event_type == "message": + role = event.get("role", "") + msg_id = event.get("id", "") + parent_id = event.get("parentId", "") + msg_timestamp = event.get("timestamp", "") + + # Extract text content (skip thinking) + content_items = event.get("content", []) + text_parts = [] + tc_list = [] + for item in content_items: + if isinstance(item, dict): + if item.get("type") == "text": + text_parts.append(item.get("text", "")) + elif item.get("type") == "toolCall": + tc_list.append(item) + # Skip thinking types + + content_text = "\n".join(text_parts) + + msg_data = { + "session_id": session_id, + "message_id": msg_id, + "parent_id": parent_id, + "seq": message_seq, + "role": role, + "content_text": content_text, + "raw_content": content_items if content_items else [], + "raw_message": event.get("content", []), + "timestamp": msg_timestamp, + } + + if role == "assistant": + usage = event.get("usage", {}) + msg_data.update({ + "model": current_model_id, + "provider": current_model_provider, + "stop_reason": event.get("stopReason", ""), + "tokens_input": usage.get("inputTokens", 0), + "tokens_output": usage.get("outputTokens", 0), + "tokens_cache_read": usage.get("cacheReadInputTokens", 0), + "tokens_cache_write": usage.get("cacheWriteInputTokens", 0), + "tokens_total": usage.get("totalTokens", 0), + }) + total_tokens += usage.get("totalTokens", 0) + + if event.get("cost"): + cost_val = event["cost"].get("total", 0.0) + msg_data["cost_total"] = cost_val + total_cost += cost_val + + message_count += 1 + + elif role == "toolResult": + msg_data.update({ + "tool_call_id": event.get("toolCallId", ""), + "tool_name": event.get("toolName", ""), + "is_error": event.get("isError", False), + "exit_code": event.get("exitCode"), + "duration_ms": event.get("durationMs"), + }) + if event.get("isError"): + error_count += 1 + # Store for tool call association + if event.get("toolCallId"): + tool_results[event["toolCallId"]] = { + "result_text": content_text, + "is_error": event.get("isError", False), + "exit_code": event.get("exitCode"), + "duration_ms": event.get("durationMs"), + } + + messages.append(msg_data) + message_seq += 1 + + # Extract tool calls from assistant messages + tc_seq = 0 + for tc in tc_list: + tool_call_data = { + "session_id": session_id, + "message_id": msg_id, + "tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"), + "tool_name": tc.get("name", "unknown"), + "arguments": tc.get("arguments", {}), + "seq": tc_seq, + } + # Enrich with tool result if available + tr = tool_results.get(tool_call_data["tool_call_id"], {}) + tool_call_data["result_text"] = tr.get("result_text", "") + tool_call_data["is_error"] = tr.get("is_error", False) + tool_call_data["exit_code"] = tr.get("exit_code") + tool_call_data["duration_ms"] = tr.get("duration_ms") + tool_calls.append(tool_call_data) + tool_call_count += 1 + tc_seq += 1 + + # Build session record + start_time = timestamps[0] if timestamps else session_timestamp + end_time = timestamps[-1] if timestamps else session_timestamp + + session_data = { + "session_id": session_id, + "session_version": session_version, + "model_provider": current_model_provider, + "model_id": current_model_id, + "thinking_level": current_thinking_level, + "start_time": start_time, + "end_time": end_time, + "cwd": session_cwd, + "total_tokens": total_tokens, + "total_cost": total_cost, + "message_count": message_count, + "tool_call_count": tool_call_count, + "error_count": error_count, + "raw_file_path": str(file_path), + "status": "active", + "metadata": {}, + } + + sessions.append(session_data) + return sessions, messages, tool_calls + + +# ───────────────────────────────────────────────────────────────── +# HTTP Client +# ───────────────────────────────────────────────────────────────── + +def push_to_api(remote_url, payload): + """POST structured JSON to Django API.""" + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + remote_url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=120) as resp: + return json.loads(resp.read()) + except urllib.error.HTTPError as e: + print(f"HTTP Error {e.code}: {e.read().decode('utf-8', errors='replace')}") + raise + except urllib.error.URLError as e: + print(f"URL Error: {e.reason}") + raise + + +# ───────────────────────────────────────────────────────────────── +# Main +# ───────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API") + parser.add_argument( + "--remote-url", + required=True, + help="Django API bulk_upsert endpoint URL", + ) + parser.add_argument( + "--root-path", + default=".", + help="Root path containing agents/ directory (default: current dir)", + ) + args = parser.parse_args() + + new_files = get_new_files(args.root_path) + if not new_files: + print("No new or modified session files found.") + return + + print(f"Found {len(new_files)} new/modified session(s).") + + total_sessions = 0 + total_messages = 0 + total_tool_calls = 0 + + # Group by agent_name (batch per agent) + agent_batches = {} + for agent_name, jsonl_path in new_files: + agent_batches.setdefault(agent_name, []).append(jsonl_path) + + for agent_name, file_paths in agent_batches.items(): + all_sessions = [] + all_messages = [] + all_tool_calls = [] + + for fp in file_paths: + print(f" Parsing: {fp}") + try: + sessions, messages, tool_calls = parse_jsonl(fp) + all_sessions.extend(sessions) + all_messages.extend(messages) + all_tool_calls.extend(tool_calls) + except Exception as e: + print(f" ERROR parsing {fp}: {e}") + continue + + if not all_sessions: + continue + + payload = { + "agent_name": agent_name, + "source_node": os.environ.get("SOURCE_NODE", "unknown"), + "sessions": all_sessions, + "messages": all_messages, + "tool_calls": all_tool_calls, + } + + print(f" Pushing {len(all_sessions)} session(s), " + f"{len(all_messages)} message(s), " + f"{len(all_tool_calls)} tool call(s)...") + + try: + result = push_to_api(args.remote_url, payload) + print(f" OK: {result}") + total_sessions += result.get("sessions_upserted", 0) + total_messages += result.get("messages_upserted", 0) + total_tool_calls += result.get("tool_calls_upserted", 0) + except Exception: + print(f" FAILED to push {agent_name} sessions.") + + print(f"\nSync complete: {total_sessions} sessions, " + f"{total_messages} messages, {total_tool_calls} tool calls pushed.") + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 2: 语法检查** + +```bash +python -m py_compile scripts/sync_sessions.py +``` + +Expected: no output (success). + +- [ ] **Step 3: Commit** + +```bash +git add scripts/sync_sessions.py +git commit -m "feat: client JSONL parse and push script" +``` + +--- + +### Task 7: Docker 部署配置 + +**Files:** +- Create: `Dockerfile`, `docker-compose.yml`, `.dockerignore` +- Create: `nginx/nginx.conf.placeholder` + +- [ ] **Step 1: .dockerignore** + +`.dockerignore`: +``` +__pycache__ +*.pyc +.venv +.env +*.egg-info +.git +.pytest_cache +.mypy_cache +tests/ +scripts/ +docs/ +db.sqlite3 +``` + +- [ ] **Step 2: Dockerfile** + +`Dockerfile`: +```dockerfile +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +EXPOSE ${DJANGO_PORT:-8000} + +CMD ["gunicorn", "--bind", "0.0.0.0:${DJANGO_PORT:-8000}", \ + "--workers", "4", "--timeout", "120", \ + "config.wsgi:application"] +``` + +- [ ] **Step 3: docker-compose.yml** + +`docker-compose.yml`: +```yaml +services: + web: + build: + context: . + dockerfile: Dockerfile + container_name: openclaw-archive + env_file: + - .env + ports: + - "${DJANGO_PORT:-8000}:${DJANGO_PORT:-8000}" + volumes: + - static_volume:/app/staticfiles + - jsonl_archive:/app/archive + restart: unless-stopped + + # nginx placeholder (uncomment for production) + # nginx: + # image: nginx:alpine + # container_name: openclaw-nginx + # ports: + # - "80:80" + # volumes: + # - ./nginx/nginx.conf:/etc/nginx/nginx.conf + # - static_volume:/app/staticfiles:ro + # depends_on: + # - web + +volumes: + static_volume: + jsonl_archive: +``` + +- [ ] **Step 4: Nginx placeholder** + +`nginx/nginx.conf.placeholder`: +```nginx +# Nginx reverse proxy placeholder for OpenClaw Archive +# +# To enable: +# 1. Rename this file to nginx.conf +# 2. Update server_name and SSL certificate paths +# 3. Uncomment the nginx service in docker-compose.yml + +upstream django { + server web:8000; +} + +server { + listen 80; + server_name _; # Update for production + + location /static/ { + alias /app/staticfiles/; + } + + location / { + proxy_pass http://django; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } +} +``` + +- [ ] **Step 5: Settings 更新 — 读取环境变量** + +`src/config/settings/base.py` 已使用 `os.environ.get()` 读取环境变量,无需修改。 + +- [ ] **Step 6: Commit** + +```bash +git add Dockerfile docker-compose.yml .dockerignore nginx/nginx.conf.placeholder +git commit -m "feat: Docker Compose deployment configuration" +``` + +--- + +### Task 8: TimescaleDB Hypertable Migration + +**Files:** +- Create: `src/openclaw/migrations/XXXX_add_hypertables.py` (数字取决于之前的 migration) + +⚠️ 此 task 需要真实的 PostgreSQL + TimescaleDB 环境。开发时可以用 `--fake` 跳过,生产环境必须真实执行。 + +- [ ] **Step 1: 创建 empty migration** + +```bash +cd src && python manage.py makemigrations openclaw --empty -n add_hypertables +``` + +- [ ] **Step 2: 编辑 migration** + +生成的文件 `src/openclaw/migrations/XXXX_add_hypertables.py`: + +```python +from django.db import migrations + +CREATE_HYPERTABLES = """ +SELECT create_hypertable( + 'sessions', + 'start_time', + if_not_exists => TRUE +); + +SELECT create_hypertable( + 'messages', + 'timestamp', + if_not_exists => TRUE +); +""" + + +class Migration(migrations.Migration): + + dependencies = [ + ('openclaw', 'PREVIOUS_MIGRATION_NUMBER'), # Update this + ] + + operations = [ + migrations.RunSQL(CREATE_HYPERTABLES, migrations.RunSQL.noop), + ] +``` + +- [ ] **Step 3: Commit** + +```bash +git add src/openclaw/migrations/ +git commit -m "feat: TimescaleDB hypertable migrations" +``` + +--- + +### Task 9: 端到端测试与 README + +**Files:** +- Create: `README.md` (更新) + +- [ ] **Step 1: 更新 README** + +`README.md`: +```markdown +# agent-base + +OpenClaw Session Archive — 多 Agent 会话解析与归档系统。 + +## Architecture + +Three nodes (Mac Mini, Ubuntu1, Ubuntu2) run a sync script that parses local JSONL session files and pushes structured data to a central Django API. The Django service runs in Docker Compose and stores data in a remote PostgreSQL + TimescaleDB instance on NAS. + +## Quick Start + +```bash +# 1. Configure environment +cp .env.example .env +# Edit .env with your database credentials and Django settings + +# 2. Build and start +docker compose build +docker compose run --rm web python manage.py migrate +docker compose run --rm web python manage.py createsuperuser +docker compose up -d + +# 3. Access +# Django Admin: http://:8000/admin/ +# API: http://:8000/api/sessions/bulk_upsert/ +``` + +## Running Tests + +```bash +pip install -r requirements-dev.txt +pytest -v +``` + +## Client Sync Script + +Deploy `scripts/sync_sessions.py` on each node: + +```bash +python sync_sessions.py --remote-url http://:8000/api/sessions/bulk_upsert/ +``` + +Set `SOURCE_NODE` environment variable on each node (`macmini`, `ubuntu1`, `ubuntu2`). + +## Daily Export + +In Django Admin, select sessions and choose "Export selected sessions to Markdown" action. +``` + +- [ ] **Step 2: 端到端测试流程** + +```bash +# 1. 用 SQLite 跑通完整流程 +cd src +python manage.py migrate +python manage.py createsuperuser # 可选 + +# 2. 用 pytest fixture 创建测试数据,push 到 API +cd .. +pytest -v + +# 3. 启动 Django,手动调 sync_sessions.py(用测试用的 JSONL 文件)验证 +cd src && python manage.py runserver & +cd .. +python scripts/sync_sessions.py --remote-url http://localhost:8000/api/sessions/bulk_upsert/ --root-path /path/to/test/agents +``` + +- [ ] **Step 3: Commit** + +```bash +git add README.md +git commit -m "docs: update README with architecture and quick start" +``` + +--- + +## Self-Review + +### 1. Spec Coverage + +| Spec Section | Covered By | +|-----|-----| +| 4.1 Session 表 | Task 1 - Session model | +| 4.2 Message 表 | Task 1 - Message model | +| 4.3 ToolCall 表 | Task 1 - ToolCall model | +| 4.4 TimescaleDB hypertable | Task 8 - hypertable migration | +| 5.1 推送脚本 | Task 6 - sync_sessions.py | +| 5.2 定时任务 | Task 6 - script + cron usage | +| 6. API bulk_upsert | Task 2 - views.py + services.py | +| 7.1-7.4 Admin CRUD | Task 3 - admin.py | +| 7.5 按时间范围查询 | Task 4 - daily_view.html + admin view | +| 7.6 Daily Markdown 导出 | Task 5 - export.py + Admin action | +| 8.1 客户端解析 | Task 6 - parse_jsonl function | +| 8.2 服务端写入 | Task 2 - BulkUpsertService | +| 9. 技术栈 | All tasks | +| 10. Docker Compose | Task 7 | +| 12. 非技术决策 | Task 1 (unique_together), Task 6 (skip .deleted.), Task 6 (SOURCE_NODE env) | + +### 2. Placeholder Scan + +No TBD, TODO, or "similar to" patterns found. All code blocks contain actual implementations. + +### 3. Type Consistency + +- `session_id` is consistently a `CharField` (not UUIDField) — matches API usage +- `agent_name + session_id` unique_together used consistently in both model and service for idempotency +- `timestamps` use ISO 8601 strings in API, parsed by `_parse_ts()` in service +- `role_labels` dict in admin matches actual DB role values: `"user"`, `"assistant"`, `"toolResult"` + +One note: `created_at` and `updated_at` fields are auto-added to all models. The spec doesn't mention them but they're standard Django practice and harmless. + +---