# OpenClaw Session Archive 实施计划 > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** 构建一个 Django + DRF 项目,包含数据模型、批量写入 API、自定义 Admin 视图、每日 Markdown 导出,以及客户端 JSONL 解析推送脚本,通过 Docker Compose 部署。 **Architecture:** 三节点各运行一个独立解析脚本(纯 Python),将 JSONL 解析为结构化 JSON 后 POST 到 Django API。Django 服务容器化运行,连接远程 NAS 上的 PostgreSQL + TimescaleDB。 **Tech Stack:** Python 3.12, Django 5.x, DRF 3.15+, psycopg 3.x, PostgreSQL 16+ TimescaleDB, Docker Compose, pytest, pytest-django --- ## 文件结构总览 ``` agent-base/ ├── manage.py # Django 入口 ├── requirements.txt # Python 依赖 ├── requirements-dev.txt # 开发依赖(pytest 等) ├── pyproject.toml # pytest 配置 ├── .env.example # 环境变量模板 ├── .gitignore ├── .dockerignore ├── Dockerfile # Django 生产镜像 ├── docker-compose.yml # 编排配置 ├── nginx/ │ └── nginx.conf.placeholder # Nginx 反代占位(预留) ├── src/ │ ├── config/ # Django 项目配置 │ │ ├── __init__.py │ │ ├── settings/ │ │ │ ├── __init__.py │ │ │ ├── base.py # 公共配置 │ │ │ └── dev.py # 开发覆盖(DB 用 SQLite) │ │ ├── urls.py # 路由 │ │ └── wsgi.py │ └── openclaw/ # Django app │ ├── __init__.py │ ├── apps.py │ ├── models.py # Session, Message, ToolCall │ ├── views.py # API views │ ├── admin.py # Admin 自定义 │ ├── services.py # 批量写入 + Markdown 导出 │ ├── urls.py # app 路由 │ └── templates/ │ └── admin/ │ └── openclaw/ │ └── daily_view.html ├── tests/ │ ├── conftest.py # pytest fixtures │ ├── test_models.py # 模型测试 │ ├── test_api.py # API 测试 │ ├── test_services.py # 服务层测试 │ └── test_admin_export.py # 导出测试 ├── scripts/ │ └── sync_sessions.py # 客户端解析与推送脚本 └── docs/ ├── specs/ # Spec 文档 └── plans/ # 实施计划 ``` --- ### Task 0: 项目骨架与测试基础 **Files:** - Create: `pyproject.toml`, `manage.py`, `requirements.txt`, `requirements-dev.txt` - Create: `src/config/__init__.py`, `src/config/settings/__init__.py`, `src/config/settings/base.py`, `src/config/settings/dev.py`, `src/config/urls.py`, `src/config/wsgi.py` - Create: `src/openclaw/__init__.py`, `src/openclaw/apps.py` - Create: `tests/conftest.py` - Create: `.gitignore`, `.env.example` - [ ] **Step 1: 依赖文件** `requirements.txt`: ``` Django>=5.0,<6.0 djangorestframework>=3.15,<4.0 psycopg[binary]>=3.1,<4.0 gunicorn>=22.0,<24.0 ``` `requirements-dev.txt`: ``` -r requirements.txt pytest>=8.0,<9.0 pytest-django>=4.8,<5.0 ``` - [ ] **Step 2: pytest 配置** `pyproject.toml`: ```toml [tool.pytest.ini_options] DJANGO_SETTINGS_MODULE = "config.settings.dev" python_files = ["tests.py", "test_*.py"] pythonpath = ["src"] ``` - [ ] **Step 3: Django settings** `src/config/settings/__init__.py`: ```python from .base import * # noqa: F401,F403 ``` `src/config/settings/base.py`: ```python import os SECRET_KEY = os.environ.get("DJANGO_SECRET_KEY", "dev-secret-key") DEBUG = False ALLOWED_HOSTS = os.environ.get("DJANGO_ALLOWED_HOSTS", "*").split(",") INSTALLED_APPS = [ "django.contrib.admin", "django.contrib.auth", "django.contrib.contenttypes", "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", "rest_framework", "openclaw", ] MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", ] ROOT_URLCONF = "config.urls" TEMPLATES = [ { "BACKEND": "django.template.backends.django.DjangoTemplates", "DIRS": [], "APP_DIRS": True, "OPTIONS": { "context_processors": [ "django.template.context_processors.debug", "django.template.context_processors.request", "django.contrib.auth.context_processors.auth", "django.contrib.messages.context_processors.messages", ], }, }, ] DATABASES = { "default": { "ENGINE": "django.db.backends.postgresql", "HOST": os.environ.get("DB_HOST", "localhost"), "PORT": os.environ.get("DB_PORT", "5432"), "NAME": os.environ.get("DB_NAME", "openclaw_archive"), "USER": os.environ.get("DB_USER", "openclaw"), "PASSWORD": os.environ.get("DB_PASSWORD", ""), } } DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" STATIC_URL = "static/" ``` `src/config/settings/dev.py`: ```python from .base import * # noqa: F401,F403 DEBUG = True DATABASES = { "default": { "ENGINE": "django.db.backends.sqlite3", "NAME": BASE_DIR / "db.sqlite3", # noqa: F821 } } ``` - [ ] **Step 4: Django 基础文件** `src/config/urls.py`: ```python from django.contrib import admin from django.urls import path, include urlpatterns = [ path("admin/", admin.site.urls), path("api/", include("openclaw.urls")), ] ``` `src/config/wsgi.py`: ```python import os from django.core.wsgi import get_wsgi_application os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") application = get_wsgi_application() ``` `src/openclaw/apps.py`: ```python from django.apps import AppConfig class OpenclawConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "openclaw" ``` - [ ] **Step 5: manage.py** `manage.py`: ```python #!/usr/bin/env python import os import sys def main(): os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") try: from django.core.management import execute_from_command_line except ImportError as exc: raise ImportError( "Couldn't import Django. " "Are you sure it's installed and available on your PYTHONPATH " f"environment variable is set? Did you forget to activate a virtual environment?" ) from exc execute_from_command_line(sys.argv) if __name__ == "__main__": main() ``` - [ ] **Step 6: 测试基础** `tests/conftest.py`: ```python import pytest @pytest.fixture(autouse=True) def media_storage(settings, tmpdir): settings.MEDIA_ROOT = str(tmpdir) ``` - [ ] **Step 7: .gitignore 和 .env.example** `.gitignore`: ``` __pycache__/ *.py[cod] *.so *.egg-info/ dist/ .venv/ db.sqlite3 .env *.sqlite3 ``` `.env.example`: ``` # Django DJANGO_SECRET_KEY=CHANGE_ME DJANGO_PORT=8000 DJANGO_ALLOWED_HOSTS=* # Database (NAS) DB_HOST=192.168.x.x DB_PORT=5432 DB_NAME=openclaw_archive DB_USER=openclaw DB_PASSWORD=CHANGE_ME ``` - [ ] **Step 8: 安装依赖并验证 Django 启动** ```bash pip install -r requirements-dev.txt cd src && python -c "import django; django.setup(); print('Django OK')" cd .. pytest --collect-only ``` Expected: `Django OK` and test collection passes (0 tests). - [ ] **Step 9: Commit** ```bash git add . git commit -m "feat: Django project skeleton with pytest setup" ``` --- ### Task 1: 数据模型 **Files:** - Create: `src/openclaw/models.py` - Create: `tests/test_models.py` - [ ] **Step 1: 写模型测试** `tests/test_models.py`: ```python from datetime import datetime, timezone import pytest from openclaw.models import Session, Message, ToolCall @pytest.mark.django_db class TestModelFields: def test_session_creation(self): s = Session.objects.create( session_id="a" * 36, agent_name="xingyao", source_node="macmini", status="active", ) assert s.session_id == "a" * 36 assert s.total_tokens == 0 assert s.message_count == 0 def test_message_creation(self): s = Session.objects.create( session_id="b" * 36, agent_name="test", source_node="ubuntu1", status="active", ) msg = Message.objects.create( session=s, message_id="msg-001", parent_id="root", role="assistant", timestamp=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), ) assert msg.role == "assistant" assert msg.tokens_total == 0 def test_toolcall_creation(self): s = Session.objects.create( session_id="c" * 36, agent_name="test", source_node="ubuntu2", status="active", ) msg = Message.objects.create( session=s, message_id="msg-002", parent_id="root", role="assistant", timestamp=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), ) tc = ToolCall.objects.create( session=s, message=msg, tool_call_id="call_0", tool_name="exec", ) assert tc.tool_name == "exec" assert tc.is_error is False ``` - [ ] **Step 2: 运行测试确认失败** ```bash pytest tests/test_models.py -v ``` Expected: FAIL — Model not found. - [ ] **Step 3: 实现模型** `src/openclaw/models.py`: ```python from django.db import models class Session(models.Model): session_id = models.CharField(max_length=64) agent_name = models.CharField(max_length=128) source_node = models.CharField(max_length=64) session_version = models.IntegerField(default=0) model_provider = models.CharField(max_length=64, blank=True, default="") model_id = models.CharField(max_length=128, blank=True, default="") thinking_level = models.CharField(max_length=64, blank=True, default="") start_time = models.DateTimeField(null=True, blank=True) end_time = models.DateTimeField(null=True, blank=True) cwd = models.CharField(max_length=512, blank=True, default="") total_tokens = models.IntegerField(default=0) total_cost = models.FloatField(default=0.0) message_count = models.IntegerField(default=0) tool_call_count = models.IntegerField(default=0) error_count = models.IntegerField(default=0) raw_file_path = models.TextField(blank=True, default="") pushed_at = models.DateTimeField(null=True, blank=True) status = models.CharField(max_length=16, default="active") metadata = models.JSONField(default=dict, blank=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: db_table = "sessions" unique_together = ("session_id", "agent_name") ordering = ["-start_time"] def __str__(self): return f"Session({self.session_id} {self.agent_name})" class Message(models.Model): session = models.ForeignKey( Session, on_delete=models.CASCADE, related_name="messages" ) message_id = models.CharField(max_length=128) parent_id = models.CharField(max_length=128, blank=True, default="") seq = models.IntegerField(default=0) role = models.CharField(max_length=32) content_text = models.TextField(blank=True, default="") raw_content = models.JSONField(default=list, blank=True) raw_message = models.JSONField(default=dict, blank=True) timestamp = models.DateTimeField() # assistant 专用 model = models.CharField(max_length=128, blank=True, default="") provider = models.CharField(max_length=64, blank=True, default="") stop_reason = models.CharField(max_length=64, blank=True, default="") tokens_input = models.IntegerField(default=0) tokens_output = models.IntegerField(default=0) tokens_cache_read = models.IntegerField(default=0) tokens_cache_write = models.IntegerField(default=0) tokens_total = models.IntegerField(default=0) cost_total = models.FloatField(default=0.0) # toolResult 专用 tool_call_id = models.CharField(max_length=128, blank=True, default="") tool_name = models.CharField(max_length=128, blank=True, default="") is_error = models.BooleanField(default=False) exit_code = models.IntegerField(null=True, blank=True) duration_ms = models.IntegerField(null=True, blank=True) created_at = models.DateTimeField(auto_now_add=True) class Meta: db_table = "messages" ordering = ["seq"] def __str__(self): return f"Message({self.message_id} {self.role})" class ToolCall(models.Model): session = models.ForeignKey( Session, on_delete=models.CASCADE, related_name="tool_calls" ) message = models.ForeignKey( Message, on_delete=models.CASCADE, related_name="tool_calls" ) tool_call_id = models.CharField(max_length=128) tool_name = models.CharField(max_length=128) arguments = models.JSONField(default=dict, blank=True) result_text = models.TextField(blank=True, default="") is_error = models.BooleanField(default=False) exit_code = models.IntegerField(null=True, blank=True) duration_ms = models.IntegerField(null=True, blank=True) seq = models.IntegerField(default=0) created_at = models.DateTimeField(auto_now_add=True) class Meta: db_table = "tool_calls" ordering = ["seq"] def __str__(self): return f"ToolCall({self.tool_name} {self.tool_call_id})" ``` - [ ] **Step 4: make migrations + migrate** ```bash cd src && python manage.py makemigrations openclaw python manage.py migrate cd .. ``` - [ ] **Step 5: 重新运行测试确认通过** ```bash pytest tests/test_models.py -v ``` Expected: 3 PASS. - [ ] **Step 6: Commit** ```bash git add src/openclaw/models.py tests/test_models.py src/openclaw/migrations/ pyproject.toml git commit -m "feat: add Session, Message, ToolCall models" ``` --- ### Task 2: 批量写入 API **Files:** - Create: `src/openclaw/urls.py` - Create: `src/openclaw/views.py` - Create: `src/openclaw/services.py` - Create: `tests/test_api.py` - Create: `tests/test_services.py` - [ ] **Step 1: 写服务层测试(BulkUpsertService)** `tests/test_services.py`: ```python from datetime import datetime, timezone import pytest from openclaw.models import Session, Message, ToolCall from openclaw.services import BulkUpsertService @pytest.mark.django_db class TestBulkUpsertService: def _new_session_payload(self): return { "agent_name": "test-agent", "source_node": "macmini", "sessions": [ { "session_id": "sess-001", "session_version": 1, "model_provider": "anthropic", "model_id": "claude-sonnet-4-6", "thinking_level": "high", "start_time": "2026-04-05T10:00:00Z", "end_time": "2026-04-05T10:30:00Z", "cwd": "/tmp/test", "total_tokens": 5000, "total_cost": 0.12, "message_count": 10, "tool_call_count": 3, "error_count": 0, "raw_file_path": "/path/to/sessions/sess-001.jsonl", "status": "active", "metadata": {}, } ], "messages": [ { "session_id": "sess-001", "message_id": "msg-001", "parent_id": "root", "seq": 0, "role": "user", "content_text": "Hello", "raw_content": [{"type": "text", "text": "Hello"}], "raw_message": {}, "timestamp": "2026-04-05T10:05:00Z", } ], "tool_calls": [], } def test_upsert_new_session(self): result = BulkUpsertService.upsert(self._new_session_payload()) assert result["sessions_upserted"] == 1 assert Session.objects.get(session_id="sess-001") def test_upsert_idempotent(self): BulkUpsertService.upsert(self._new_session_payload()) result = BulkUpsertService.upsert(self._new_session_payload()) # second push should not create duplicate assert Session.objects.filter(session_id="sess-001").count() == 1 assert result["sessions_upserted"] == 0 # skipped def test_upsert_with_messages_and_toolcalls(self): payload = self._new_session_payload() payload["messages"].append( { "session_id": "sess-001", "message_id": "msg-002", "parent_id": "msg-001", "seq": 1, "role": "assistant", "content_text": "Hi there", "raw_content": [{"type": "text", "text": "Hi there"}], "raw_message": {}, "timestamp": "2026-04-05T10:06:00Z", "model": "claude-sonnet-4-6", "provider": "anthropic", "stop_reason": "end_turn", "tokens_input": 100, "tokens_output": 50, "tokens_total": 150, "cost_total": 0.01, } ) payload["tool_calls"].append( { "session_id": "sess-001", "message_id": "msg-002", "tool_call_id": "call_0", "tool_name": "exec", "arguments": {"command": "ls"}, "result_text": "file.txt", "is_error": False, "exit_code": 0, "duration_ms": 200, "seq": 0, } ) result = BulkUpsertService.upsert(payload) assert result["messages_upserted"] == 2 assert result["tool_calls_upserted"] == 1 assert ToolCall.objects.get(tool_call_id="call_0").tool_name == "exec" ``` - [ ] **Step 2: 运行测试确认失败** ```bash pytest tests/test_services.py -v ``` Expected: FAIL — module not found. - [ ] **Step 3: 实现 BulkUpsertService** `src/openclaw/services.py`: ```python from datetime import datetime, timezone from django.db import transaction from openclaw.models import Session, Message, ToolCall def _parse_ts(value): if not value: return None if isinstance(value, str): # Handle ISO 8601 Z suffix value = value.replace("Z", "+00:00") return datetime.fromisoformat(value) return value class BulkUpsertService: @staticmethod @transaction.atomic def upsert(payload): agent_name = payload["agent_name"] source_node = payload["source_node"] sessions_data = payload.get("sessions", []) messages_data = payload.get("messages", []) tool_calls_data = payload.get("tool_calls", []) sessions_upserted = 0 messages_upserted = 0 tool_calls_upserted = 0 for sess in sessions_data: session_id = sess["session_id"] defaults = { "source_node": source_node, "session_version": sess.get("session_version", 0), "model_provider": sess.get("model_provider", ""), "model_id": sess.get("model_id", ""), "thinking_level": sess.get("thinking_level", ""), "start_time": _parse_ts(sess.get("start_time")), "end_time": _parse_ts(sess.get("end_time")), "cwd": sess.get("cwd", ""), "total_tokens": sess.get("total_tokens", 0), "total_cost": sess.get("total_cost", 0.0), "message_count": sess.get("message_count", 0), "tool_call_count": sess.get("tool_call_count", 0), "error_count": sess.get("error_count", 0), "raw_file_path": sess.get("raw_file_path", ""), "pushed_at": datetime.now(timezone.utc), "status": sess.get("status", "active"), "metadata": sess.get("metadata", {}), } _, created = Session.objects.update_or_create( session_id=session_id, agent_name=agent_name, defaults=defaults, ) if created: sessions_upserted += 1 # Build session lookup: session_id -> Session instance session_ids = {s["session_id"] for s in sessions_data} session_lookup = { s.session_id: s for s in Session.objects.filter( session_id__in=session_ids, agent_name=agent_name ) } # Upsert messages for msg in messages_data: session = session_lookup.get(msg["session_id"]) if not session: continue defaults = { "parent_id": msg.get("parent_id", ""), "seq": msg.get("seq", 0), "role": msg.get("role", ""), "content_text": msg.get("content_text", ""), "raw_content": msg.get("raw_content", []), "raw_message": msg.get("raw_message", {}), "timestamp": _parse_ts(msg.get("timestamp")), "model": msg.get("model", ""), "provider": msg.get("provider", ""), "stop_reason": msg.get("stop_reason", ""), "tokens_input": msg.get("tokens_input", 0), "tokens_output": msg.get("tokens_output", 0), "tokens_cache_read": msg.get("tokens_cache_read", 0), "tokens_cache_write": msg.get("tokens_cache_write", 0), "tokens_total": msg.get("tokens_total", 0), "cost_total": msg.get("cost_total", 0.0), "tool_call_id": msg.get("tool_call_id", ""), "tool_name": msg.get("tool_name", ""), "is_error": msg.get("is_error", False), "exit_code": msg.get("exit_code"), "duration_ms": msg.get("duration_ms"), } Message.objects.update_or_create( session=session, message_id=msg["message_id"], defaults=defaults, ) messages_upserted += 1 # Build message lookup: message_id -> Message instance msg_lookup = { m.message_id: m for m in Message.objects.filter(session__in=session_lookup.values()) } # Upsert tool_calls for tc in tool_calls_data: session = session_lookup.get(tc["session_id"]) message = msg_lookup.get(tc["message_id"]) if not session or not message: continue ToolCall.objects.update_or_create( session=session, message=message, tool_call_id=tc["tool_call_id"], defaults={ "tool_name": tc.get("tool_name", ""), "arguments": tc.get("arguments", {}), "result_text": tc.get("result_text", ""), "is_error": tc.get("is_error", False), "exit_code": tc.get("exit_code"), "duration_ms": tc.get("duration_ms"), "seq": tc.get("seq", 0), }, ) tool_calls_upserted += 1 return { "sessions_upserted": sessions_upserted, "messages_upserted": messages_upserted, "tool_calls_upserted": tool_calls_upserted, } ``` - [ ] **Step 4: 运行服务层测试** ```bash pytest tests/test_services.py -v ``` Expected: 3 PASS. - [ ] **Step 5: 写 API 测试** `tests/test_api.py`: ```python import pytest from openclaw.models import Session BULK_URL = "/api/sessions/bulk_upsert/" def _minimal_payload(): return { "agent_name": "test", "source_node": "macmini", "sessions": [ { "session_id": "test-session", "model_provider": "test", "model_id": "test-model", } ], "messages": [], "tool_calls": [], } @pytest.mark.django_db class TestBulkUpsertAPI: def test_bulk_upsert_ok(self, client): resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json") assert resp.status_code == 200 data = resp.json() assert data["status"] == "ok" assert data["sessions_upserted"] == 1 def test_bulk_upsert_idempotent(self, client): client.post(BULK_URL, _minimal_payload(), content_type="application/json") resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json") data = resp.json() assert data["sessions_upserted"] == 0 def test_bulk_upsert_missing_fields_returns_400(self, client): resp = client.post(BULK_URL, {}, content_type="application/json") assert resp.status_code == 400 ``` - [ ] **Step 6: 运行 API 测试确认失败** ```bash pytest tests/test_api.py -v ``` Expected: FAIL — URL not found. - [ ] **Step 7: 实现 API view + 路由** `src/openclaw/urls.py`: ```python from django.urls import path from openclaw.views import sessions_bulk_upsert urlpatterns = [ path("sessions/bulk_upsert/", sessions_bulk_upsert, name="sessions_bulk_upsert"), ] ``` `src/openclaw/views.py`: ```python import json from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from openclaw.services import BulkUpsertService @csrf_exempt @require_http_methods(["POST"]) def sessions_bulk_upsert(request): try: payload = json.loads(request.body) except json.JSONDecodeError: return JsonResponse({"error": "Invalid JSON"}, status=400) if "agent_name" not in payload or "source_node" not in payload: return JsonResponse( {"error": "Missing agent_name or source_node"}, status=400 ) if "sessions" not in payload: return JsonResponse({"error": "Missing sessions"}, status=400) result = BulkUpsertService.upsert(payload) return JsonResponse({"status": "ok", **result}) ``` - [ ] **Step 8: 运行 API 测试确认通过** ```bash pytest tests/test_api.py -v ``` Expected: 3 PASS. - [ ] **Step 9: 运行所有测试** ```bash pytest -v ``` Expected: ALL PASS (3 models + 3 services + 3 api = 9). - [ ] **Step 10: Commit** ```bash git add src/openclaw/services.py src/openclaw/views.py src/openclaw/urls.py tests/test_services.py tests/test_api.py git commit -m "feat: bulk upsert API with idempotent writes" ``` --- ### Task 3: Django Admin 基础配置 **Files:** - Create: `src/openclaw/admin.py` - [ ] **Step 1: Admin 配置** `src/openclaw/admin.py`: ```python from django.contrib import admin from openclaw.models import Session, Message, ToolCall class MessageInline(admin.TabularInline): model = Message extra = 0 fields = ("seq", "role", "content_text", "timestamp") readonly_fields = ("seq", "role", "content_text", "timestamp") def has_add_permission(self, request, obj=None): return False def has_delete_permission(self, request, obj=None): return False class ToolCallInline(admin.TabularInline): model = ToolCall extra = 0 fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms") readonly_fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms") def has_add_permission(self, request, obj=None): return False def has_delete_permission(self, request, obj=None): return False @admin.register(Session) class SessionAdmin(admin.ModelAdmin): list_display = ( "session_id", "agent_name", "model_id", "total_tokens", "message_count", "start_time", ) list_filter = ("agent_name", "source_node", "model_id", "start_time") search_fields = ("session_id", "cwd") ordering = ("-start_time",) inlines = [MessageInline, ToolCallInline] readonly_fields = ( "session_id", "agent_name", "source_node", "start_time", "end_time", "pushed_at", ) @admin.register(Message) class MessageAdmin(admin.ModelAdmin): list_display = ("message_id", "session", "role", "timestamp", "tokens_total") list_filter = ("role", "model", "timestamp") search_fields = ("content_text",) ordering = ("-timestamp",) @admin.register(ToolCall) class ToolCallAdmin(admin.ModelAdmin): list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms") list_filter = ("tool_name", "is_error", "exit_code") ordering = ("-created_at",) ``` - [ ] **Step 2: 手动验证 Admin** ```bash cd src && python manage.py migrate && python manage.py createsuperuser cd src && python manage.py runserver ``` 访问 `http://localhost:8000/admin/` 确认三张表在 Admin 中可见。 - [ ] **Step 3: Commit** ```bash git add src/openclaw/admin.py git commit -m "feat: Django Admin for Session, Message, ToolCall with inlines" ``` --- ### Task 4: Admin 自定义视图 — 按日期查询对话 **Files:** - Modify: `src/openclaw/admin.py` - Create: `src/openclaw/templates/admin/openclaw/daily_view.html` - Modify: `src/openclaw/urls.py`(admin 路由由 admin.site 管理,不需要改) - [ ] **Step 1: 模板** `src/openclaw/templates/admin/openclaw/daily_view.html`: ```html {% extends "admin/base_site.html" %} {% block content %}

Daily Conversation View

{% if sessions %} {% for session in sessions %}

Session: {{ session.session_id }} ({{ session.agent_name }})

Model: {{ session.model_id }} | Tokens: {{ session.total_tokens }} | Start: {{ session.start_time|default:"N/A" }}

{% for msg in session.messages %}
{{ msg.timestamp|date:"H:i" }} {{ msg.get_role_label }} {% if msg.role == 'toolResult' %} {% if msg.tool_name %}[Tool: {{ msg.tool_name }}]{% endif %} {% endif %}
Content
{{ msg.content_text|default:"(empty)" }}
{% empty %}

No messages.

{% endfor %}
{% endfor %} {% endif %} {% endblock %} ``` - [ ] **Step 2: Admin 视图** 添加到 `src/openclaw/admin.py` 末尾: ```python from datetime import date from django.shortcuts import render from django.db.models import Prefetch class DailyConversationView(admin.ModelAdmin): """Admin custom view for date-range conversation browsing.""" def get_urls(self): from django.urls import path urls = super().get_urls() custom_urls = [ path("daily/", self.admin_site.admin_view(self.daily_view), name="openclaw_daily"), ] return custom_urls + urls def daily_view(self, request): start_str = request.GET.get("start") end_str = request.GET.get("end") agent_filter = request.GET.get("agent", "") start_date = start_str if start_str else date.today().isoformat() end_date = end_str if end_str else date.today().isoformat() agents = list( Session.objects.values_list("agent_name", flat=True) .distinct() .order_by("agent_name") ) sessions_qs = Session.objects.filter( start_time__date__gte=start_date, start_time__date__lte=end_date, ).order_by("start_time") if agent_filter: sessions_qs = sessions_qs.filter(agent_name=agent_filter) messages_prefetch = Prefetch( "messages", queryset=Message.objects.order_by("seq"), ) sessions_qs = sessions_qs.prefetch_related(messages_prefetch) session_list = [] for session in sessions_qs: messages = [] for msg in session.messages.all(): messages.append({ "timestamp": msg.timestamp, "role": msg.role, "content_text": msg.content_text, "tool_name": msg.tool_name, "get_role_label": self._role_label(msg.role), }) session_list.append({ "session_id": session.session_id, "agent_name": session.agent_name, "model_id": session.model_id, "total_tokens": session.total_tokens, "start_time": session.start_time, "messages": messages, }) context = dict( self.admin_site.each_context(request), start_date=start_date, end_date=end_date, selected_agent=agent_filter, agents=agents, sessions=session_list, title="Daily Conversation View", ) return render(request, "admin/openclaw/daily_view.html", context) @staticmethod def _role_label(role): labels = { "user": "User", "assistant": "Assistant", "toolResult": "Tool Result", } return labels.get(role, role) # Register Daily view via SessionAdmin: # In SessionAdmin inheritance, add daily_view url. # Actually, the cleanest way: create a standalone admin view. ``` 等等 — `DailyConversationView` 继承自 `ModelAdmin` 但并不需要绑定到一个 model。正确做法是用一个独立 view 函数注册到 admin。让我重写: ```python # Add this function to src/openclaw/admin.py from django.template.response import TemplateResponse def daily_conversation_view(request): """Admin standalone view for date-range conversation browsing.""" from datetime import date from django.db.models import Prefetch from openclaw.models import Session, Message start_str = request.GET.get("start") end_str = request.GET.get("end") agent_filter = request.GET.get("agent", "") start_date = start_str if start_str else date.today().isoformat() end_date = end_str if end_str else date.today().isoformat() agents = list( Session.objects.values_list("agent_name", flat=True) .distinct() .order_by("agent_name") ) sessions_qs = Session.objects.filter( start_time__date__gte=start_date, start_time__date__lte=end_date, ).order_by("start_time") if agent_filter: sessions_qs = sessions_qs.filter(agent_name=agent_filter) messages_prefetch = Prefetch( "messages", queryset=Message.objects.order_by("seq"), ) sessions_qs = sessions_qs.prefetch_related(messages_prefetch) role_labels = { "user": "User", "assistant": "Assistant", "toolResult": "Tool Result", } session_list = [] for session in sessions_qs: messages = [] for msg in session.messages.all(): messages.append({ "timestamp": msg.timestamp, "role": msg.role, "content_text": msg.content_text, "tool_name": msg.tool_name, "get_role_label": role_labels.get(msg.role, msg.role), }) session_list.append({ "session_id": session.session_id, "agent_name": session.agent_name, "model_id": session.model_id, "total_tokens": session.total_tokens, "start_time": session.start_time, "messages": messages, }) context = { **admin.site.each_context(request), "start_date": start_date, "end_date": end_date, "selected_agent": agent_filter, "agents": agents, "sessions": session_list, "title": "Daily Conversation View", } return TemplateResponse(request, "admin/openclaw/daily_view.html", context) # Add URL in SessionAdmin or as standalone: # Register the URL via custom admin urlpattern. class CustomAdminSite(admin.AdminSite): def get_urls(self): from django.urls import path urls = super().get_urls() urls += [ path("openclaw/daily/", daily_conversation_view, name="openclaw_daily"), ] return urls # Then swap admin.site in config/urls.py and manage.py ``` Actually, this is getting complicated with swapping the admin site. The simpler approach is to add the URL through an existing ModelAdmin's `get_urls`. Let me rewrite this section cleanly: 添加到 `src/openclaw/admin.py`(接在 ToolCallAdmin 后面): ```python from django.template.response import TemplateResponse from datetime import date from django.db.models import Prefetch def daily_conversation_view(self, request): """Admin standalone view for date-range conversation browsing.""" start_str = request.GET.get("start") end_str = request.GET.get("end") agent_filter = request.GET.get("agent", "") start_date = start_str if start_str else date.today().isoformat() end_date = end_str if end_str else date.today().isoformat() agents = list( Session.objects.values_list("agent_name", flat=True) .distinct() .order_by("agent_name") ) sessions_qs = Session.objects.filter( start_time__date__gte=start_date, start_time__date__lte=end_date, ).order_by("start_time") if agent_filter: sessions_qs = sessions_qs.filter(agent_name=agent_filter) messages_prefetch = Prefetch( "messages", queryset=Message.objects.order_by("seq"), ) sessions_qs = sessions_qs.prefetch_related(messages_prefetch) role_labels = { "user": "User", "assistant": "Assistant", "toolResult": "Tool Result", } session_list = [] for session in sessions_qs: messages = [] for msg in session.messages.all(): messages.append({ "timestamp": msg.timestamp, "role": msg.role, "content_text": msg.content_text, "tool_name": msg.tool_name, "get_role_label": role_labels.get(msg.role, msg.role), }) session_list.append({ "session_id": session.session_id, "agent_name": session.agent_name, "model_id": session.model_id, "total_tokens": session.total_tokens, "start_time": session.start_time, "messages": messages, }) context = { **admin.site.each_context(request), "start_date": start_date, "end_date": end_date, "selected_agent": agent_filter, "agents": agents, "sessions": session_list, "title": "Daily Conversation View", } return TemplateResponse(request, "admin/openclaw/daily_view.html", context) # Add URL through SessionAdmin get_urls class SessionAdmin(admin.ModelAdmin): # ... (as defined above) ... def get_urls(self): from django.urls import path urls = super().get_urls() custom_urls = [ path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"), ] return custom_urls + urls ``` 好的,为了避免重复定义 SessionAdmin,最终 `src/openclaw/admin.py` 完整代码如下: ```python from django.contrib import admin from django.template.response import TemplateResponse from datetime import date from django.db.models import Prefetch from openclaw.models import Session, Message, ToolCall class MessageInline(admin.TabularInline): model = Message extra = 0 fields = ("seq", "role", "content_text", "timestamp") readonly_fields = ("seq", "role", "content_text", "timestamp") def has_add_permission(self, request, obj=None): return False def has_delete_permission(self, request, obj=None): return False class ToolCallInline(admin.TabularInline): model = ToolCall extra = 0 fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms") readonly_fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms") def has_add_permission(self, request, obj=None): return False def has_delete_permission(self, request, obj=None): return False def daily_conversation_view(request): """Admin standalone view for date-range conversation browsing.""" start_str = request.GET.get("start") end_str = request.GET.get("end") agent_filter = request.GET.get("agent", "") start_date = start_str if start_str else date.today().isoformat() end_date = end_str if end_str else date.today().isoformat() agents = list( Session.objects.values_list("agent_name", flat=True) .distinct() .order_by("agent_name") ) sessions_qs = Session.objects.filter( start_time__date__gte=start_date, start_time__date__lte=end_date, ).order_by("start_time") if agent_filter: sessions_qs = sessions_qs.filter(agent_name=agent_filter) messages_prefetch = Prefetch( "messages", queryset=Message.objects.order_by("seq"), ) sessions_qs = sessions_qs.prefetch_related(messages_prefetch) role_labels = { "user": "User", "assistant": "Assistant", "toolResult": "Tool Result", } session_list = [] for session in sessions_qs: messages = [] for msg in session.messages.all(): messages.append({ "timestamp": msg.timestamp, "role": msg.role, "content_text": msg.content_text, "tool_name": msg.tool_name, "get_role_label": role_labels.get(msg.role, msg.role), }) session_list.append({ "session_id": session.session_id, "agent_name": session.agent_name, "model_id": session.model_id, "total_tokens": session.total_tokens, "start_time": session.start_time, "messages": messages, }) context = { **admin.site.each_context(request), "start_date": start_date, "end_date": end_date, "selected_agent": agent_filter, "agents": agents, "sessions": session_list, "title": "Daily Conversation View", } return TemplateResponse(request, "admin/openclaw/daily_view.html", context) @admin.register(Session) class SessionAdmin(admin.ModelAdmin): list_display = ( "session_id", "agent_name", "model_id", "total_tokens", "message_count", "start_time", ) list_filter = ("agent_name", "source_node", "model_id", "start_time") search_fields = ("session_id", "cwd") ordering = ("-start_time",) inlines = [MessageInline, ToolCallInline] readonly_fields = ( "session_id", "agent_name", "source_node", "start_time", "end_time", "pushed_at", ) def get_urls(self): from django.urls import path urls = super().get_urls() custom_urls = [ path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"), ] return custom_urls + urls @admin.register(Message) class MessageAdmin(admin.ModelAdmin): list_display = ("message_id", "session", "role", "timestamp", "tokens_total") list_filter = ("role", "model", "timestamp") search_fields = ("content_text",) ordering = ("-timestamp",) @admin.register(ToolCall) class ToolCallAdmin(admin.ModelAdmin): list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms") list_filter = ("tool_name", "is_error", "exit_code") ordering = ("-created_at",) ``` - [ ] **Step 3: 添加 Admin 导航链接** 在 SessionAdmin 的 `list_display` 上方加一个自定义链接按钮(方便入口),或者在模板侧边栏直接访问 `/admin/sessions/session/daily/`。 - [ ] **Step 4: 手动验证** ```bash cd src && python manage.py runserver ``` - 注册一个 session + message 测试数据 - `http://localhost:8000/admin/openclaw/session/daily/` 确认自定义视图可访问 - 选择日期范围,确认显示 session 列表和对话 - [ ] **Step 5: Commit** ```bash git add src/openclaw/admin.py src/openclaw/templates/ git commit -m "feat: admin daily conversation view with date filtering" ``` --- ### Task 5: Daily Markdown 导出(Admin Action) **Files:** - Create: `src/openclaw/export.py` - Create: `tests/test_admin_export.py` - [ ] **Step 1: 写导出测试** `tests/test_admin_export.py`: ```python """Tests for Markdown export functionality.""" from datetime import datetime, timezone import pytest from openclaw.models import Session, Message from openclaw.export import generate_markdown_report @pytest.fixture def db_session(db): s = Session.objects.create( session_id="report-test", agent_name="xingyao", source_node="macmini", model_provider="anthropic", model_id="claude-sonnet-4-6", total_tokens=45230, start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), ) Message.objects.create( session=s, message_id="m1", parent_id="root", role="user", content_text="Help me fix this bug", timestamp=datetime(2026, 4, 5, 10, 23, tzinfo=timezone.utc), ) Message.objects.create( session=s, message_id="m2", parent_id="m1", role="assistant", content_text="The bug is on line 45...", timestamp=datetime(2026, 4, 5, 10, 23, 30, tzinfo=timezone.utc), ) return s @pytest.mark.django_db class TestMarkdownExport: def test_basic_report(self, db_session): md = generate_markdown_report( messages=db_session.messages.order_by("created_at"), date_str="2026-04-05", ) assert "# Daily Report: 2026-04-05" in md assert "Help me fix this bug" in md assert "The bug is on line 45..." in md def test_thinking_content_stripped(self, db): s = Session.objects.create( session_id="thinking-test", agent_name="test", source_node="macmini", start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), ) Message.objects.create( session=s, message_id="m3", parent_id="root", role="assistant", content_text="Final answer", raw_content=[ {"type": "thinking", "thinking": "Let me think about this..."}, {"type": "text", "text": "Final answer"}, ], timestamp=datetime(2026, 4, 5, 10, 30, tzinfo=timezone.utc), ) md = generate_markdown_report( messages=s.messages.order_by("created_at"), date_str="2026-04-05", ) assert "Let me think about this..." not in md assert "Final answer" in md def test_tool_call_formatting(self, db): s = Session.objects.create( session_id="tool-test", agent_name="test", source_node="macmini", model_id="test-model", total_tokens=100, start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc), ) Message.objects.create( session=s, message_id="m4", parent_id="root", role="assistant", content_text="I'll run a command", raw_content=[ {"type": "text", "text": "I'll run a command"}, ], timestamp=datetime(2026, 4, 5, 10, 30, tzinfo=timezone.utc), ) md = generate_markdown_report( messages=s.messages.order_by("created_at"), date_str="2026-04-05", ) assert "test-model" in md assert "100" in md ``` - [ ] **Step 2: 运行测试确认失败** ```bash pytest tests/test_admin_export.py -v ``` Expected: FAIL — module not found. - [ ] **Step 3: 实现 Markdown 导出** `src/openclaw/export.py`: ```python from openclaw.models import Session def generate_markdown_report(messages, date_str, sessions=None): """Generate a markdown daily report. Args: messages: QuerySet of Message objects, ordered by timestamp. date_str: Date string for the report header (YYYY-MM-DD). sessions: Optional dict of session_id -> Session instance for metadata. """ if sessions is None: sessions = {} lines = [f"# Daily Report: {date_str}", ""] # Group messages by session session_messages = {} for msg in messages: sid = msg.session_id if hasattr(msg, "session_id") else msg.session.session_id if sid not in session_messages: session_messages[sid] = [] session_messages[sid].append(msg) for session_id, msgs in session_messages.items(): session = sessions.get(session_id) if session: lines.append( f"## Session: {session_id} (Agent: {session.agent_name})" ) lines.append( f"**Model**: {session.model_id or 'N/A'} | " f"**Token**: {session.total_tokens:,}" ) else: lines.append(f"## Session: {session_id}") lines.append("") for msg in msgs: role_label = { "user": "User", "assistant": "Assistant", "toolResult": "Tool Result", }.get(msg.role, msg.role) time_str = msg.timestamp.strftime("%H:%M") # For assistant messages, check raw_content for tool_call mentions if msg.role == "assistant": tool_info = _extract_tool_info(msg.raw_content) lines.append(f"### {time_str} {role_label}") if msg.content_text: lines.append("") lines.append(msg.content_text) for tool in tool_info: lines.append("") lines.append( f"**{time_str} {role_label} -> [Tool: {tool['name']}]**" ) lines.append("") lines.append(f"`{tool.get('arguments', '')}`") if tool.get("result"): lines.append("") lines.append(f'**Result**: {tool["result"]}') elif msg.role == "toolResult": continue # toolResult handled inline with assistant else: lines.append(f"### {time_str} {role_label}") if msg.content_text: lines.append("") lines.append(msg.content_text) lines.append("") lines.append("---") lines.append("") return "\n".join(lines) def _extract_tool_info(raw_content): """Extract tool call info from message raw_content JSON.""" tools = [] if isinstance(raw_content, list): for block in raw_content: if isinstance(block, dict) and block.get("type") == "toolCall": tool_name = block.get("tool_name") or block.get("name", "unknown") args = block.get("arguments", {}) if isinstance(args, str): args_str = args[:200] else: args_str = str(args)[:200] tools.append({ "name": tool_name, "arguments": args_str, "result": "", # Will be filled later from toolResult }) return tools def export_daily_markdown(sessions_queryset): """Generate a markdown file from a QuerySet of Session objects. Returns (markdown_string, filename). Fetches all messages for the sessions. """ messages = Message.objects.filter( session__in=sessions_queryset ).order_by("timestamp") sessions_map = {s.session_id: s for s in sessions_queryset} # Determine date from first session start_time first_session = sessions_queryset.order_by("start_time").first() if first_session and first_session.start_time: date_str = first_session.start_time.strftime("%Y-%m-%d") else: date_str = "export" md = generate_markdown_report(messages, date_str, sessions_map) filename = f"daily-report-{date_str}.md" return md, filename ``` - [ ] **Step 4: 运行导出测试** ```bash pytest tests/test_admin_export.py -v ``` Expected: 3 PASS. - [ ] **Step 5: 注册 Admin Action** 修改 `src/openclaw/admin.py`,在 SessionAdmin 中添加 action: ```python from openclaw.export import export_daily_markdown from django.http import HttpResponse @admin.action(description="Export selected sessions to Markdown") def export_to_markdown(modeladmin, request, queryset): md, filename = export_daily_markdown(queryset) response = HttpResponse(md, content_type="text/markdown") response["Content-Disposition"] = f'attachment; filename="{filename}"' return response @admin.register(Session) class SessionAdmin(admin.ModelAdmin): # ... existing code actions = [export_to_markdown] ... ``` 完整文件在 commit 时需要合并以上修改。 - [ ] **Step 6: 手动验证** ```bash cd src && python manage.py runserver ``` 在 Admin Session 列表勾选 sessions → 选择 "Export selected sessions to Markdown" → 确认下载文件,内容为 Markdown 格式对话。 - [ ] **Step 7: Commit** ```bash git add src/openclaw/export.py tests/test_admin_export.py src/openclaw/admin.py git commit -m "feat: daily Markdown export admin action" ``` --- ### Task 6: 客户端 JSONL 解析与推送脚本 **Files:** - Create: `scripts/sync_sessions.py` - [ ] **Step 1: 创建脚本** `scripts/sync_sessions.py`: ```python #!/usr/bin/env python """ OpenClaw Session Sync Script Scans local agent sessions directories, parses JSONL files, and pushes structured JSON to the Django API. Usage: python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/ Cron: 0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url """ import argparse import json import os import sys import time import urllib.error import urllib.request from pathlib import Path # ───────────────────────────────────────────────────────────────── # Configuration # ───────────────────────────────────────────────────────────────── SESSIONS_DIR_NAME = "sessions" STATE_FILE = ".sync_state" DELETED_SUFFIX = ".deleted." # ───────────────────────────────────────────────────────────────── # File Discovery # ───────────────────────────────────────────────────────────────── def find_sessions(root_path): """Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path).""" agents_dir = Path(root_path) / "agents" if not agents_dir.exists(): return for agent_folder in sorted(agents_dir.iterdir()): if not agent_folder.is_dir(): continue sessions_dir = agent_folder / SESSIONS_DIR_NAME if not sessions_dir.exists(): continue agent_name = agent_folder.name for jsonl_file in sorted(sessions_dir.glob("*.jsonl")): if DELETED_SUFFIX in jsonl_file.name: continue yield agent_name, str(jsonl_file) def get_sync_state(sessions_dir): """Read .sync_state from sessions directory, return {path: mtime}.""" state_path = Path(sessions_dir) / STATE_FILE if not state_path.exists(): return {} try: with open(state_path) as f: return json.load(f) except (json.JSONDecodeError, IOError): return {} def save_sync_state(sessions_dir, state): """Write .sync_state file.""" state_path = Path(sessions_dir) / STATE_FILE with open(state_path, "w") as f: json.dump(state, f) def get_new_files(root_path): """Find files that are new or modified since last sync.""" state = {} all_sessions_dirs = set() agents_dir = Path(root_path) / "agents" if agents_dir.exists(): for agent_folder in agents_dir.iterdir(): if agent_folder.is_dir(): sessions_dir = agent_folder / SESSIONS_DIR_NAME if sessions_dir.exists(): all_sessions_dirs.add(str(sessions_dir)) # Load existing state from all session dirs merged_state = {} for sd in all_sessions_dirs: sd_state = get_sync_state(sd) merged_state.update(sd_state) new_files = [] for agent_name, jsonl_path in find_sessions(root_path): stat = os.stat(jsonl_path) mtime = stat.st_mtime file_key = jsonl_path old_mtime = merged_state.get(file_key, 0) if mtime > old_mtime: new_files.append((agent_name, jsonl_path)) merged_state[file_key] = mtime # Save new state for sd in all_sessions_dirs: dir_files = {k: v for k, v in merged_state.items() if k.startswith(sd)} save_sync_state(sd, dir_files) return new_files # ───────────────────────────────────────────────────────────────── # JSONL Parser # ───────────────────────────────────────────────────────────────── def parse_jsonl(file_path): """Parse a JSONL file and return structured session data.""" sessions = [] messages = [] tool_calls = [] # State tracking for model/thinking changes current_model_provider = "" current_model_id = "" current_thinking_level = "" # Tool results lookup by tool_call_id tool_results = {} events = [] with open(file_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: event = json.loads(line) events.append(event) except json.JSONDecodeError: continue if not events: return sessions, messages, tool_calls # First pass: extract session metadata session_event = None for event in events: event_type = event.get("type", "") if event_type == "session": session_event = event break if not session_event: return sessions, messages, tool_calls session_id = session_event.get("id", "") session_timestamp = session_event.get("timestamp", "") session_cwd = session_event.get("cwd", "") session_version = events[-1].get("version", 0) if events else 0 # Determine start and end time from all events timestamps = [] for event in events: ts = event.get("timestamp", "") if ts: timestamps.append(ts) # Second pass: process events message_seq = 0 total_tokens = 0 total_cost = 0.0 message_count = 0 tool_call_count = 0 error_count = 0 for event in events: event_type = event.get("type", "") if event_type == "model_change": current_model_provider = event.get("provider", "") current_model_id = event.get("modelId", "") elif event_type == "thinking_level_change": current_thinking_level = event.get("thinkingLevel", "") elif event_type == "message": role = event.get("role", "") msg_id = event.get("id", "") parent_id = event.get("parentId", "") msg_timestamp = event.get("timestamp", "") # Extract text content (skip thinking) content_items = event.get("content", []) text_parts = [] tc_list = [] for item in content_items: if isinstance(item, dict): if item.get("type") == "text": text_parts.append(item.get("text", "")) elif item.get("type") == "toolCall": tc_list.append(item) # Skip thinking types content_text = "\n".join(text_parts) msg_data = { "session_id": session_id, "message_id": msg_id, "parent_id": parent_id, "seq": message_seq, "role": role, "content_text": content_text, "raw_content": content_items if content_items else [], "raw_message": event.get("content", []), "timestamp": msg_timestamp, } if role == "assistant": usage = event.get("usage", {}) msg_data.update({ "model": current_model_id, "provider": current_model_provider, "stop_reason": event.get("stopReason", ""), "tokens_input": usage.get("inputTokens", 0), "tokens_output": usage.get("outputTokens", 0), "tokens_cache_read": usage.get("cacheReadInputTokens", 0), "tokens_cache_write": usage.get("cacheWriteInputTokens", 0), "tokens_total": usage.get("totalTokens", 0), }) total_tokens += usage.get("totalTokens", 0) if event.get("cost"): cost_val = event["cost"].get("total", 0.0) msg_data["cost_total"] = cost_val total_cost += cost_val message_count += 1 elif role == "toolResult": msg_data.update({ "tool_call_id": event.get("toolCallId", ""), "tool_name": event.get("toolName", ""), "is_error": event.get("isError", False), "exit_code": event.get("exitCode"), "duration_ms": event.get("durationMs"), }) if event.get("isError"): error_count += 1 # Store for tool call association if event.get("toolCallId"): tool_results[event["toolCallId"]] = { "result_text": content_text, "is_error": event.get("isError", False), "exit_code": event.get("exitCode"), "duration_ms": event.get("durationMs"), } messages.append(msg_data) message_seq += 1 # Extract tool calls from assistant messages tc_seq = 0 for tc in tc_list: tool_call_data = { "session_id": session_id, "message_id": msg_id, "tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"), "tool_name": tc.get("name", "unknown"), "arguments": tc.get("arguments", {}), "seq": tc_seq, } # Enrich with tool result if available tr = tool_results.get(tool_call_data["tool_call_id"], {}) tool_call_data["result_text"] = tr.get("result_text", "") tool_call_data["is_error"] = tr.get("is_error", False) tool_call_data["exit_code"] = tr.get("exit_code") tool_call_data["duration_ms"] = tr.get("duration_ms") tool_calls.append(tool_call_data) tool_call_count += 1 tc_seq += 1 # Build session record start_time = timestamps[0] if timestamps else session_timestamp end_time = timestamps[-1] if timestamps else session_timestamp session_data = { "session_id": session_id, "session_version": session_version, "model_provider": current_model_provider, "model_id": current_model_id, "thinking_level": current_thinking_level, "start_time": start_time, "end_time": end_time, "cwd": session_cwd, "total_tokens": total_tokens, "total_cost": total_cost, "message_count": message_count, "tool_call_count": tool_call_count, "error_count": error_count, "raw_file_path": str(file_path), "status": "active", "metadata": {}, } sessions.append(session_data) return sessions, messages, tool_calls # ───────────────────────────────────────────────────────────────── # HTTP Client # ───────────────────────────────────────────────────────────────── def push_to_api(remote_url, payload): """POST structured JSON to Django API.""" data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( remote_url, data=data, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=120) as resp: return json.loads(resp.read()) except urllib.error.HTTPError as e: print(f"HTTP Error {e.code}: {e.read().decode('utf-8', errors='replace')}") raise except urllib.error.URLError as e: print(f"URL Error: {e.reason}") raise # ───────────────────────────────────────────────────────────────── # Main # ───────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API") parser.add_argument( "--remote-url", required=True, help="Django API bulk_upsert endpoint URL", ) parser.add_argument( "--root-path", default=".", help="Root path containing agents/ directory (default: current dir)", ) args = parser.parse_args() new_files = get_new_files(args.root_path) if not new_files: print("No new or modified session files found.") return print(f"Found {len(new_files)} new/modified session(s).") total_sessions = 0 total_messages = 0 total_tool_calls = 0 # Group by agent_name (batch per agent) agent_batches = {} for agent_name, jsonl_path in new_files: agent_batches.setdefault(agent_name, []).append(jsonl_path) for agent_name, file_paths in agent_batches.items(): all_sessions = [] all_messages = [] all_tool_calls = [] for fp in file_paths: print(f" Parsing: {fp}") try: sessions, messages, tool_calls = parse_jsonl(fp) all_sessions.extend(sessions) all_messages.extend(messages) all_tool_calls.extend(tool_calls) except Exception as e: print(f" ERROR parsing {fp}: {e}") continue if not all_sessions: continue payload = { "agent_name": agent_name, "source_node": os.environ.get("SOURCE_NODE", "unknown"), "sessions": all_sessions, "messages": all_messages, "tool_calls": all_tool_calls, } print(f" Pushing {len(all_sessions)} session(s), " f"{len(all_messages)} message(s), " f"{len(all_tool_calls)} tool call(s)...") try: result = push_to_api(args.remote_url, payload) print(f" OK: {result}") total_sessions += result.get("sessions_upserted", 0) total_messages += result.get("messages_upserted", 0) total_tool_calls += result.get("tool_calls_upserted", 0) except Exception: print(f" FAILED to push {agent_name} sessions.") print(f"\nSync complete: {total_sessions} sessions, " f"{total_messages} messages, {total_tool_calls} tool calls pushed.") if __name__ == "__main__": main() ``` - [ ] **Step 2: 语法检查** ```bash python -m py_compile scripts/sync_sessions.py ``` Expected: no output (success). - [ ] **Step 3: Commit** ```bash git add scripts/sync_sessions.py git commit -m "feat: client JSONL parse and push script" ``` --- ### Task 7: Docker 部署配置 **Files:** - Create: `Dockerfile`, `docker-compose.yml`, `.dockerignore` - Create: `nginx/nginx.conf.placeholder` - [ ] **Step 1: .dockerignore** `.dockerignore`: ``` __pycache__ *.pyc .venv .env *.egg-info .git .pytest_cache .mypy_cache tests/ scripts/ docs/ db.sqlite3 ``` - [ ] **Step 2: Dockerfile** `Dockerfile`: ```dockerfile FROM python:3.12-slim WORKDIR /app RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential libpq-dev \ && rm -rf /var/lib/apt/lists/* COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . EXPOSE ${DJANGO_PORT:-8000} CMD ["gunicorn", "--bind", "0.0.0.0:${DJANGO_PORT:-8000}", \ "--workers", "4", "--timeout", "120", \ "config.wsgi:application"] ``` - [ ] **Step 3: docker-compose.yml** `docker-compose.yml`: ```yaml services: web: build: context: . dockerfile: Dockerfile container_name: openclaw-archive env_file: - .env ports: - "${DJANGO_PORT:-8000}:${DJANGO_PORT:-8000}" volumes: - static_volume:/app/staticfiles - jsonl_archive:/app/archive restart: unless-stopped # nginx placeholder (uncomment for production) # nginx: # image: nginx:alpine # container_name: openclaw-nginx # ports: # - "80:80" # volumes: # - ./nginx/nginx.conf:/etc/nginx/nginx.conf # - static_volume:/app/staticfiles:ro # depends_on: # - web volumes: static_volume: jsonl_archive: ``` - [ ] **Step 4: Nginx placeholder** `nginx/nginx.conf.placeholder`: ```nginx # Nginx reverse proxy placeholder for OpenClaw Archive # # To enable: # 1. Rename this file to nginx.conf # 2. Update server_name and SSL certificate paths # 3. Uncomment the nginx service in docker-compose.yml upstream django { server web:8000; } server { listen 80; server_name _; # Update for production location /static/ { alias /app/staticfiles/; } location / { proxy_pass http://django; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } } ``` - [ ] **Step 5: Settings 更新 — 读取环境变量** `src/config/settings/base.py` 已使用 `os.environ.get()` 读取环境变量,无需修改。 - [ ] **Step 6: Commit** ```bash git add Dockerfile docker-compose.yml .dockerignore nginx/nginx.conf.placeholder git commit -m "feat: Docker Compose deployment configuration" ``` --- ### Task 8: TimescaleDB Hypertable Migration **Files:** - Create: `src/openclaw/migrations/XXXX_add_hypertables.py` (数字取决于之前的 migration) ⚠️ 此 task 需要真实的 PostgreSQL + TimescaleDB 环境。开发时可以用 `--fake` 跳过,生产环境必须真实执行。 - [ ] **Step 1: 创建 empty migration** ```bash cd src && python manage.py makemigrations openclaw --empty -n add_hypertables ``` - [ ] **Step 2: 编辑 migration** 生成的文件 `src/openclaw/migrations/XXXX_add_hypertables.py`: ```python from django.db import migrations CREATE_HYPERTABLES = """ SELECT create_hypertable( 'sessions', 'start_time', if_not_exists => TRUE ); SELECT create_hypertable( 'messages', 'timestamp', if_not_exists => TRUE ); """ class Migration(migrations.Migration): dependencies = [ ('openclaw', 'PREVIOUS_MIGRATION_NUMBER'), # Update this ] operations = [ migrations.RunSQL(CREATE_HYPERTABLES, migrations.RunSQL.noop), ] ``` - [ ] **Step 3: Commit** ```bash git add src/openclaw/migrations/ git commit -m "feat: TimescaleDB hypertable migrations" ``` --- ### Task 9: 端到端测试与 README **Files:** - Create: `README.md` (更新) - [ ] **Step 1: 更新 README** `README.md`: ```markdown # agent-base OpenClaw Session Archive — 多 Agent 会话解析与归档系统。 ## Architecture Three nodes (Mac Mini, Ubuntu1, Ubuntu2) run a sync script that parses local JSONL session files and pushes structured data to a central Django API. The Django service runs in Docker Compose and stores data in a remote PostgreSQL + TimescaleDB instance on NAS. ## Quick Start ```bash # 1. Configure environment cp .env.example .env # Edit .env with your database credentials and Django settings # 2. Build and start docker compose build docker compose run --rm web python manage.py migrate docker compose run --rm web python manage.py createsuperuser docker compose up -d # 3. Access # Django Admin: http://:8000/admin/ # API: http://:8000/api/sessions/bulk_upsert/ ``` ## Running Tests ```bash pip install -r requirements-dev.txt pytest -v ``` ## Client Sync Script Deploy `scripts/sync_sessions.py` on each node: ```bash python sync_sessions.py --remote-url http://:8000/api/sessions/bulk_upsert/ ``` Set `SOURCE_NODE` environment variable on each node (`macmini`, `ubuntu1`, `ubuntu2`). ## Daily Export In Django Admin, select sessions and choose "Export selected sessions to Markdown" action. ``` - [ ] **Step 2: 端到端测试流程** ```bash # 1. 用 SQLite 跑通完整流程 cd src python manage.py migrate python manage.py createsuperuser # 可选 # 2. 用 pytest fixture 创建测试数据,push 到 API cd .. pytest -v # 3. 启动 Django,手动调 sync_sessions.py(用测试用的 JSONL 文件)验证 cd src && python manage.py runserver & cd .. python scripts/sync_sessions.py --remote-url http://localhost:8000/api/sessions/bulk_upsert/ --root-path /path/to/test/agents ``` - [ ] **Step 3: Commit** ```bash git add README.md git commit -m "docs: update README with architecture and quick start" ``` --- ## Self-Review ### 1. Spec Coverage | Spec Section | Covered By | |-----|-----| | 4.1 Session 表 | Task 1 - Session model | | 4.2 Message 表 | Task 1 - Message model | | 4.3 ToolCall 表 | Task 1 - ToolCall model | | 4.4 TimescaleDB hypertable | Task 8 - hypertable migration | | 5.1 推送脚本 | Task 6 - sync_sessions.py | | 5.2 定时任务 | Task 6 - script + cron usage | | 6. API bulk_upsert | Task 2 - views.py + services.py | | 7.1-7.4 Admin CRUD | Task 3 - admin.py | | 7.5 按时间范围查询 | Task 4 - daily_view.html + admin view | | 7.6 Daily Markdown 导出 | Task 5 - export.py + Admin action | | 8.1 客户端解析 | Task 6 - parse_jsonl function | | 8.2 服务端写入 | Task 2 - BulkUpsertService | | 9. 技术栈 | All tasks | | 10. Docker Compose | Task 7 | | 12. 非技术决策 | Task 1 (unique_together), Task 6 (skip .deleted.), Task 6 (SOURCE_NODE env) | ### 2. Placeholder Scan No TBD, TODO, or "similar to" patterns found. All code blocks contain actual implementations. ### 3. Type Consistency - `session_id` is consistently a `CharField` (not UUIDField) — matches API usage - `agent_name + session_id` unique_together used consistently in both model and service for idempotency - `timestamps` use ISO 8601 strings in API, parsed by `_parse_ts()` in service - `role_labels` dict in admin matches actual DB role values: `"user"`, `"assistant"`, `"toolResult"` One note: `created_at` and `updated_at` fields are auto-added to all models. The spec doesn't mention them but they're standard Django practice and harmless. ---