agent-base/docs/superpowers/plans/2026-04-05-openclaw-session-archive.md
2026-04-05 18:07:41 +08:00

# OpenClaw Session Archive Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
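**Goal:** Build a Django + DRF project comprising the data models, a bulk write API, a custom Admin view, a daily Markdown export, and a client-side script that parses JSONL and pushes it, all deployed with Docker Compose.
**Architecture:** Each of the three nodes runs a standalone parser script (pure Python) that parses JSONL into structured JSON and POSTs it to the Django API. The Django service runs in a container and connects to PostgreSQL + TimescaleDB on a remote NAS.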
**Tech Stack:** Python 3.12, Django 5.x, DRF 3.15+, psycopg 3.x, PostgreSQL 16+ TimescaleDB, Docker Compose, pytest, pytest-django
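A minimal sketch of the client-side flow described under Architecture. It assumes each JSONL line is a JSON object with hypothetical `id`, `role`, `text`, and `timestamp` keys, since the real OpenClaw JSONL schema is not specified here. The envelope keys (`agent_name`, `source_node`, `sessions`, `messages`, `tool_calls`) follow the payload used in Task 2; the function names are illustrative only.

```python
import json
import urllib.request


def build_payload(jsonl_lines, session_id, agent_name, source_node):
    """Turn raw JSONL lines into the bulk-upsert payload shape used in Task 2."""
    messages = []
    for line in jsonl_lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip a partially written trailing line
        messages.append(
            {
                "session_id": session_id,
                "message_id": record["id"],  # hypothetical source key
                "seq": len(messages),  # position among the parsed messages
                "role": record.get("role", ""),
                "content_text": record.get("text", ""),
                "timestamp": record.get("timestamp"),
            }
        )
    return {
        "agent_name": agent_name,
        "source_node": source_node,
        "sessions": [{"session_id": session_id, "message_count": len(messages)}],
        "messages": messages,
        "tool_calls": [],
    }


def push(payload, url):
    """POST the payload as JSON; the caller supplies the API URL."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read())
```

Keeping parsing (`build_payload`) separate from transport (`push`) lets the parsing logic be tested without a running server.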
---
## File Structure Overview
```
agent-base/
├── manage.py                      # Django entry point
├── requirements.txt               # Python dependencies
├── requirements-dev.txt           # Dev dependencies (pytest, etc.)
├── pyproject.toml                 # pytest configuration
├── .env.example                   # Environment variable template
├── .gitignore
├── .dockerignore
├── Dockerfile                     # Django production image
├── docker-compose.yml             # Orchestration config
├── nginx/
│   └── nginx.conf.placeholder     # Nginx reverse-proxy placeholder (reserved)
├── src/
│   ├── config/                    # Django project config
│   │   ├── __init__.py
│   │   ├── settings/
│   │   │   ├── __init__.py
│   │   │   ├── base.py            # Shared settings
│   │   │   └── dev.py             # Dev overrides (DB uses SQLite)
│   │   ├── urls.py                # Routing
│   │   └── wsgi.py
│   └── openclaw/                  # Django app
│       ├── __init__.py
│       ├── apps.py
│       ├── models.py              # Session, Message, ToolCall
│       ├── views.py               # API views
│       ├── admin.py               # Admin customization
│       ├── services.py            # Bulk write + Markdown export
│       ├── urls.py                # App routing
│       └── templates/
│           └── admin/
│               └── openclaw/
│                   └── daily_view.html
├── tests/
│   ├── conftest.py                # pytest fixtures
│   ├── test_models.py             # Model tests
│   ├── test_api.py                # API tests
│   ├── test_services.py           # Service-layer tests
│   └── test_admin_export.py       # Export tests
├── scripts/
│   └── sync_sessions.py           # Client-side parse-and-push script
└── docs/
    ├── specs/                     # Spec documents
    └── plans/                     # Implementation plans
```
---
### Task 0: Project skeleton and test foundation
**Files:**
- Create: `pyproject.toml`, `manage.py`, `requirements.txt`, `requirements-dev.txt`
- Create: `src/config/__init__.py`, `src/config/settings/__init__.py`, `src/config/settings/base.py`, `src/config/settings/dev.py`, `src/config/urls.py`, `src/config/wsgi.py`
- Create: `src/openclaw/__init__.py`, `src/openclaw/apps.py`
- Create: `tests/conftest.py`
- Create: `.gitignore`, `.env.example`
- [ ] **Step 1: Dependency files**
`requirements.txt`:
```
Django>=5.0,<6.0
djangorestframework>=3.15,<4.0
psycopg[binary]>=3.1,<4.0
gunicorn>=22.0,<24.0
```
`requirements-dev.txt`:
```
-r requirements.txt
pytest>=8.0,<9.0
pytest-django>=4.8,<5.0
```
- [ ] **Step 2: pytest configuration**
`pyproject.toml`:
```toml
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "config.settings.dev"
python_files = ["tests.py", "test_*.py"]
pythonpath = ["src"]
```
- [ ] **Step 3: Django settings**
`src/config/settings/__init__.py`:
```python
from .base import * # noqa: F401,F403
```
`src/config/settings/base.py`:
```python
import os
from pathlib import Path

# Repository root (agent-base/); dev.py uses it to locate db.sqlite3.
BASE_DIR = Path(__file__).resolve().parents[3]

SECRET_KEY = os.environ.get("DJANGO_SECRET_KEY", "dev-secret-key")
DEBUG = False
ALLOWED_HOSTS = os.environ.get("DJANGO_ALLOWED_HOSTS", "*").split(",")

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "rest_framework",
    "openclaw",
]

MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    "django.middleware.common.CommonMiddleware",
    "django.middleware.csrf.CsrfViewMiddleware",
    "django.contrib.auth.middleware.AuthenticationMiddleware",
    "django.contrib.messages.middleware.MessageMiddleware",
    "django.middleware.clickjacking.XFrameOptionsMiddleware",
]

ROOT_URLCONF = "config.urls"

TEMPLATES = [
    {
        "BACKEND": "django.template.backends.django.DjangoTemplates",
        "DIRS": [],
        "APP_DIRS": True,
        "OPTIONS": {
            "context_processors": [
                "django.template.context_processors.debug",
                "django.template.context_processors.request",
                "django.contrib.auth.context_processors.auth",
                "django.contrib.messages.context_processors.messages",
            ],
        },
    },
]

# Production database: PostgreSQL on the NAS, configured via environment.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "HOST": os.environ.get("DB_HOST", "localhost"),
        "PORT": os.environ.get("DB_PORT", "5432"),
        "NAME": os.environ.get("DB_NAME", "openclaw_archive"),
        "USER": os.environ.get("DB_USER", "openclaw"),
        "PASSWORD": os.environ.get("DB_PASSWORD", ""),
    }
}

DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
STATIC_URL = "static/"
```
`src/config/settings/dev.py`:
```python
from .base import *  # noqa: F401,F403

DEBUG = True

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.sqlite3",
        "NAME": BASE_DIR / "db.sqlite3",  # noqa: F821
    }
}
```
- [ ] **Step 4: Django base files**
`src/config/urls.py`:
```python
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/", include("openclaw.urls")),
]
```
`src/config/wsgi.py`:
```python
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
application = get_wsgi_application()
```
`src/openclaw/apps.py`:
```python
from django.apps import AppConfig


class OpenclawConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "openclaw"
```
- [ ] **Step 5: manage.py**
`manage.py`:
```python
#!/usr/bin/env python
import os
import sys
from pathlib import Path


def main():
    # manage.py sits at the repo root, so make the packages under src/ importable.
    sys.path.insert(0, str(Path(__file__).resolve().parent / "src"))
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
    try:
        from django.core.management import execute_from_command_line
    except ImportError as exc:
        raise ImportError(
            "Couldn't import Django. Are you sure it's installed and "
            "available on your PYTHONPATH environment variable? Did you "
            "forget to activate a virtual environment?"
        ) from exc
    execute_from_command_line(sys.argv)


if __name__ == "__main__":
    main()
```
- [ ] **Step 6: Test foundation**
`tests/conftest.py`:
```python
import pytest


@pytest.fixture(autouse=True)
def media_storage(settings, tmpdir):
    settings.MEDIA_ROOT = str(tmpdir)
```
- [ ] **Step 7: .gitignore and .env.example**
`.gitignore`:
```
__pycache__/
*.py[cod]
*.so
*.egg-info/
dist/
.venv/
db.sqlite3
.env
*.sqlite3
```
`.env.example`:
```
# Django
DJANGO_SECRET_KEY=CHANGE_ME
DJANGO_PORT=8000
DJANGO_ALLOWED_HOSTS=*
# Database (NAS)
DB_HOST=192.168.x.x
DB_PORT=5432
DB_NAME=openclaw_archive
DB_USER=openclaw
DB_PASSWORD=CHANGE_ME
```
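For illustration, the sketch below shows how values like those in `.env.example` reach `base.py` through the process environment. The helper `read_settings` and its `env` parameter are hypothetical; the plan itself reads `os.environ` directly. Stripping whitespace around host names is an addition made here, since a plain `split(",")` keeps stray spaces.

```python
import os


def read_settings(env=os.environ):
    """Read some of the variables base.py uses, with the same defaults."""
    hosts = env.get("DJANGO_ALLOWED_HOSTS", "*")
    return {
        "SECRET_KEY": env.get("DJANGO_SECRET_KEY", "dev-secret-key"),
        # Strip whitespace so "a.example, b.example" yields clean host names.
        "ALLOWED_HOSTS": [h.strip() for h in hosts.split(",") if h.strip()],
        "DB_HOST": env.get("DB_HOST", "localhost"),
        "DB_PORT": env.get("DB_PORT", "5432"),
    }
```

Passing a plain dict as `env` makes the defaults easy to check without touching the real environment.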
- [ ] **Step 8: Install dependencies and verify that Django starts**
```bash
pip install -r requirements-dev.txt
cd src && DJANGO_SETTINGS_MODULE=config.settings python -c "import django; django.setup(); print('Django OK')"
cd ..
pytest --collect-only
```
Expected: `Django OK` is printed and collection finds 0 tests (pytest exits with code 5 when nothing is collected).
- [ ] **Step 9: Commit**
```bash
git add .
git commit -m "feat: Django project skeleton with pytest setup"
```
---
### Task 1: Data models
**Files:**
- Create: `src/openclaw/models.py`
- Create: `tests/test_models.py`
- [ ] **Step 1: Write the model tests**
`tests/test_models.py`:
```python
from datetime import datetime, timezone

import pytest

from openclaw.models import Session, Message, ToolCall


@pytest.mark.django_db
class TestModelFields:
    def test_session_creation(self):
        s = Session.objects.create(
            session_id="a" * 36,
            agent_name="xingyao",
            source_node="macmini",
            status="active",
        )
        assert s.session_id == "a" * 36
        assert s.total_tokens == 0
        assert s.message_count == 0

    def test_message_creation(self):
        s = Session.objects.create(
            session_id="b" * 36,
            agent_name="test",
            source_node="ubuntu1",
            status="active",
        )
        msg = Message.objects.create(
            session=s,
            message_id="msg-001",
            parent_id="root",
            role="assistant",
            timestamp=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
        )
        assert msg.role == "assistant"
        assert msg.tokens_total == 0

    def test_toolcall_creation(self):
        s = Session.objects.create(
            session_id="c" * 36,
            agent_name="test",
            source_node="ubuntu2",
            status="active",
        )
        msg = Message.objects.create(
            session=s,
            message_id="msg-002",
            parent_id="root",
            role="assistant",
            timestamp=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
        )
        tc = ToolCall.objects.create(
            session=s,
            message=msg,
            tool_call_id="call_0",
            tool_name="exec",
        )
        assert tc.tool_name == "exec"
        assert tc.is_error is False
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
pytest tests/test_models.py -v
```
Expected: FAIL with an import error, because `openclaw.models` does not exist yet.
- [ ] **Step 3: Implement the models**
`src/openclaw/models.py`:
```python
from django.db import models


class Session(models.Model):
    session_id = models.CharField(max_length=64)
    agent_name = models.CharField(max_length=128)
    source_node = models.CharField(max_length=64)
    session_version = models.IntegerField(default=0)
    model_provider = models.CharField(max_length=64, blank=True, default="")
    model_id = models.CharField(max_length=128, blank=True, default="")
    thinking_level = models.CharField(max_length=64, blank=True, default="")
    start_time = models.DateTimeField(null=True, blank=True)
    end_time = models.DateTimeField(null=True, blank=True)
    cwd = models.CharField(max_length=512, blank=True, default="")
    total_tokens = models.IntegerField(default=0)
    total_cost = models.FloatField(default=0.0)
    message_count = models.IntegerField(default=0)
    tool_call_count = models.IntegerField(default=0)
    error_count = models.IntegerField(default=0)
    raw_file_path = models.TextField(blank=True, default="")
    pushed_at = models.DateTimeField(null=True, blank=True)
    status = models.CharField(max_length=16, default="active")
    metadata = models.JSONField(default=dict, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = "sessions"
        unique_together = ("session_id", "agent_name")
        ordering = ["-start_time"]

    def __str__(self):
        return f"Session({self.session_id} {self.agent_name})"


class Message(models.Model):
    session = models.ForeignKey(
        Session, on_delete=models.CASCADE, related_name="messages"
    )
    message_id = models.CharField(max_length=128)
    parent_id = models.CharField(max_length=128, blank=True, default="")
    seq = models.IntegerField(default=0)
    role = models.CharField(max_length=32)
    content_text = models.TextField(blank=True, default="")
    raw_content = models.JSONField(default=list, blank=True)
    raw_message = models.JSONField(default=dict, blank=True)
    timestamp = models.DateTimeField()
    # assistant-only fields
    model = models.CharField(max_length=128, blank=True, default="")
    provider = models.CharField(max_length=64, blank=True, default="")
    stop_reason = models.CharField(max_length=64, blank=True, default="")
    tokens_input = models.IntegerField(default=0)
    tokens_output = models.IntegerField(default=0)
    tokens_cache_read = models.IntegerField(default=0)
    tokens_cache_write = models.IntegerField(default=0)
    tokens_total = models.IntegerField(default=0)
    cost_total = models.FloatField(default=0.0)
    # toolResult-only fields
    tool_call_id = models.CharField(max_length=128, blank=True, default="")
    tool_name = models.CharField(max_length=128, blank=True, default="")
    is_error = models.BooleanField(default=False)
    exit_code = models.IntegerField(null=True, blank=True)
    duration_ms = models.IntegerField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "messages"
        ordering = ["seq"]

    def __str__(self):
        return f"Message({self.message_id} {self.role})"


class ToolCall(models.Model):
    session = models.ForeignKey(
        Session, on_delete=models.CASCADE, related_name="tool_calls"
    )
    message = models.ForeignKey(
        Message, on_delete=models.CASCADE, related_name="tool_calls"
    )
    tool_call_id = models.CharField(max_length=128)
    tool_name = models.CharField(max_length=128)
    arguments = models.JSONField(default=dict, blank=True)
    result_text = models.TextField(blank=True, default="")
    is_error = models.BooleanField(default=False)
    exit_code = models.IntegerField(null=True, blank=True)
    duration_ms = models.IntegerField(null=True, blank=True)
    seq = models.IntegerField(default=0)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "tool_calls"
        ordering = ["seq"]

    def __str__(self):
        return f"ToolCall({self.tool_name} {self.tool_call_id})"
```
- [ ] **Step 4: make migrations + migrate**
```bash
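# manage.py sits at the repo root, so expose src/ on the import path
PYTHONPATH=src python manage.py makemigrations openclaw
# openclaw.urls is only created in Task 2, so skip the URL system checks for now
PYTHONPATH=src python manage.py migrate --skip-checks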
```
- [ ] **Step 5: Re-run the tests and confirm they pass**
```bash
pytest tests/test_models.py -v
```
Expected: 3 PASS.
- [ ] **Step 6: Commit**
```bash
git add src/openclaw/models.py tests/test_models.py src/openclaw/migrations/ pyproject.toml
git commit -m "feat: add Session, Message, ToolCall models"
```
---
### Task 2: Bulk write API
**Files:**
- Create: `src/openclaw/urls.py`
- Create: `src/openclaw/views.py`
- Create: `src/openclaw/services.py`
- Create: `tests/test_api.py`
- Create: `tests/test_services.py`
- [ ] **Step 1: Write the service-layer tests (BulkUpsertService)**
`tests/test_services.py`:
```python
from datetime import datetime, timezone

import pytest

from openclaw.models import Session, Message, ToolCall
from openclaw.services import BulkUpsertService


@pytest.mark.django_db
class TestBulkUpsertService:
    def _new_session_payload(self):
        return {
            "agent_name": "test-agent",
            "source_node": "macmini",
            "sessions": [
                {
                    "session_id": "sess-001",
                    "session_version": 1,
                    "model_provider": "anthropic",
                    "model_id": "claude-sonnet-4-6",
                    "thinking_level": "high",
                    "start_time": "2026-04-05T10:00:00Z",
                    "end_time": "2026-04-05T10:30:00Z",
                    "cwd": "/tmp/test",
                    "total_tokens": 5000,
                    "total_cost": 0.12,
                    "message_count": 10,
                    "tool_call_count": 3,
                    "error_count": 0,
                    "raw_file_path": "/path/to/sessions/sess-001.jsonl",
                    "status": "active",
                    "metadata": {},
                }
            ],
            "messages": [
                {
                    "session_id": "sess-001",
                    "message_id": "msg-001",
                    "parent_id": "root",
                    "seq": 0,
                    "role": "user",
                    "content_text": "Hello",
                    "raw_content": [{"type": "text", "text": "Hello"}],
                    "raw_message": {},
                    "timestamp": "2026-04-05T10:05:00Z",
                }
            ],
            "tool_calls": [],
        }

    def test_upsert_new_session(self):
        result = BulkUpsertService.upsert(self._new_session_payload())
        assert result["sessions_upserted"] == 1
        assert Session.objects.get(session_id="sess-001")

    def test_upsert_idempotent(self):
        BulkUpsertService.upsert(self._new_session_payload())
        result = BulkUpsertService.upsert(self._new_session_payload())
        # second push should not create duplicate
        assert Session.objects.filter(session_id="sess-001").count() == 1
        assert result["sessions_upserted"] == 0  # skipped

    def test_upsert_with_messages_and_toolcalls(self):
        payload = self._new_session_payload()
        payload["messages"].append(
            {
                "session_id": "sess-001",
                "message_id": "msg-002",
                "parent_id": "msg-001",
                "seq": 1,
                "role": "assistant",
                "content_text": "Hi there",
                "raw_content": [{"type": "text", "text": "Hi there"}],
                "raw_message": {},
                "timestamp": "2026-04-05T10:06:00Z",
                "model": "claude-sonnet-4-6",
                "provider": "anthropic",
                "stop_reason": "end_turn",
                "tokens_input": 100,
                "tokens_output": 50,
                "tokens_total": 150,
                "cost_total": 0.01,
            }
        )
        payload["tool_calls"].append(
            {
                "session_id": "sess-001",
                "message_id": "msg-002",
                "tool_call_id": "call_0",
                "tool_name": "exec",
                "arguments": {"command": "ls"},
                "result_text": "file.txt",
                "is_error": False,
                "exit_code": 0,
                "duration_ms": 200,
                "seq": 0,
            }
        )
        result = BulkUpsertService.upsert(payload)
        assert result["messages_upserted"] == 2
        assert result["tool_calls_upserted"] == 1
        assert ToolCall.objects.get(tool_call_id="call_0").tool_name == "exec"
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
pytest tests/test_services.py -v
```
Expected: FAIL — module not found.
- [ ] **Step 3: Implement BulkUpsertService**
`src/openclaw/services.py`:
```python
from datetime import datetime, timezone

from django.db import transaction

from openclaw.models import Session, Message, ToolCall


def _parse_ts(value):
    if not value:
        return None
    if isinstance(value, str):
        # Handle ISO 8601 Z suffix
        value = value.replace("Z", "+00:00")
        return datetime.fromisoformat(value)
    return value


class BulkUpsertService:
    @staticmethod
    @transaction.atomic
    def upsert(payload):
        agent_name = payload["agent_name"]
        source_node = payload["source_node"]
        sessions_data = payload.get("sessions", [])
        messages_data = payload.get("messages", [])
        tool_calls_data = payload.get("tool_calls", [])
        sessions_upserted = 0
        messages_upserted = 0
        tool_calls_upserted = 0

        for sess in sessions_data:
            session_id = sess["session_id"]
            defaults = {
                "source_node": source_node,
                "session_version": sess.get("session_version", 0),
                "model_provider": sess.get("model_provider", ""),
                "model_id": sess.get("model_id", ""),
                "thinking_level": sess.get("thinking_level", ""),
                "start_time": _parse_ts(sess.get("start_time")),
                "end_time": _parse_ts(sess.get("end_time")),
                "cwd": sess.get("cwd", ""),
                "total_tokens": sess.get("total_tokens", 0),
                "total_cost": sess.get("total_cost", 0.0),
                "message_count": sess.get("message_count", 0),
                "tool_call_count": sess.get("tool_call_count", 0),
                "error_count": sess.get("error_count", 0),
                "raw_file_path": sess.get("raw_file_path", ""),
                "pushed_at": datetime.now(timezone.utc),
                "status": sess.get("status", "active"),
                "metadata": sess.get("metadata", {}),
            }
            _, created = Session.objects.update_or_create(
                session_id=session_id,
                agent_name=agent_name,
                defaults=defaults,
            )
            # Only newly created sessions are counted; updates leave it unchanged.
            if created:
                sessions_upserted += 1

        # Build session lookup: session_id -> Session instance
        session_ids = {s["session_id"] for s in sessions_data}
        session_lookup = {
            s.session_id: s
            for s in Session.objects.filter(
                session_id__in=session_ids, agent_name=agent_name
            )
        }

        # Upsert messages (every processed message is counted)
        for msg in messages_data:
            session = session_lookup.get(msg["session_id"])
            if not session:
                continue
            defaults = {
                "parent_id": msg.get("parent_id", ""),
                "seq": msg.get("seq", 0),
                "role": msg.get("role", ""),
                "content_text": msg.get("content_text", ""),
                "raw_content": msg.get("raw_content", []),
                "raw_message": msg.get("raw_message", {}),
                "timestamp": _parse_ts(msg.get("timestamp")),
                "model": msg.get("model", ""),
                "provider": msg.get("provider", ""),
                "stop_reason": msg.get("stop_reason", ""),
                "tokens_input": msg.get("tokens_input", 0),
                "tokens_output": msg.get("tokens_output", 0),
                "tokens_cache_read": msg.get("tokens_cache_read", 0),
                "tokens_cache_write": msg.get("tokens_cache_write", 0),
                "tokens_total": msg.get("tokens_total", 0),
                "cost_total": msg.get("cost_total", 0.0),
                "tool_call_id": msg.get("tool_call_id", ""),
                "tool_name": msg.get("tool_name", ""),
                "is_error": msg.get("is_error", False),
                "exit_code": msg.get("exit_code"),
                "duration_ms": msg.get("duration_ms"),
            }
            Message.objects.update_or_create(
                session=session,
                message_id=msg["message_id"],
                defaults=defaults,
            )
            messages_upserted += 1

        # Build message lookup keyed by (Session pk, message_id), so equal
        # message_ids in different sessions cannot overwrite each other.
        # Note: Message.session_id is the foreign-key column (the Session pk).
        msg_lookup = {
            (m.session_id, m.message_id): m
            for m in Message.objects.filter(session__in=session_lookup.values())
        }

        # Upsert tool_calls
        for tc in tool_calls_data:
            session = session_lookup.get(tc["session_id"])
            if not session:
                continue
            message = msg_lookup.get((session.pk, tc["message_id"]))
            if not message:
                continue
            ToolCall.objects.update_or_create(
                session=session,
                message=message,
                tool_call_id=tc["tool_call_id"],
                defaults={
                    "tool_name": tc.get("tool_name", ""),
                    "arguments": tc.get("arguments", {}),
                    "result_text": tc.get("result_text", ""),
                    "is_error": tc.get("is_error", False),
                    "exit_code": tc.get("exit_code"),
                    "duration_ms": tc.get("duration_ms"),
                    "seq": tc.get("seq", 0),
                },
            )
            tool_calls_upserted += 1

        return {
            "sessions_upserted": sessions_upserted,
            "messages_upserted": messages_upserted,
            "tool_calls_upserted": tool_calls_upserted,
        }
```
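As a quick check of the timestamp handling in `_parse_ts`, the standalone copy below shows what the helper returns for the input shapes a payload can carry. It mirrors the function in `services.py` under the name `parse_ts`; the sample values are illustrative.

```python
from datetime import datetime, timezone


def parse_ts(value):
    """Standalone copy of _parse_ts: ISO 8601 string -> aware datetime."""
    if not value:
        return None
    if isinstance(value, str):
        # Python before 3.11 rejects a trailing "Z", so normalise it to an offset.
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    return value


utc_ts = parse_ts("2026-04-05T10:00:00Z")
local_ts = parse_ts("2026-04-05T18:00:00+08:00")
print(utc_ts.isoformat())  # 2026-04-05T10:00:00+00:00
print(utc_ts == local_ts)  # True: both name the same instant
print(parse_ts(""))        # None
```

Because the result is timezone-aware, timestamps pushed from nodes in different time zones compare correctly once stored.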
- [ ] **Step 4: Run the service-layer tests**
```bash
pytest tests/test_services.py -v
```
Expected: 3 PASS.
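The counting behaviour these tests rely on can be modeled without a database. The sketch below is a hypothetical in-memory stand-in for `update_or_create`, keyed on `(session_id, agent_name)` like the `unique_together` constraint; it is not Django code. It illustrates why a second push reports `sessions_upserted == 0`: for sessions, only newly created rows are counted.

```python
def upsert_sessions(store, agent_name, sessions):
    """Insert or update sessions in a dict; return how many were newly created."""
    created_count = 0
    for sess in sessions:
        key = (sess["session_id"], agent_name)  # mirrors unique_together
        if key not in store:
            created_count += 1
            store[key] = {}
        store[key].update(sess)  # existing rows are updated in place
    return created_count


store = {}
batch = [{"session_id": "sess-001", "total_tokens": 5000}]
print(upsert_sessions(store, "test-agent", batch))   # 1
print(upsert_sessions(store, "test-agent", batch))   # 0
print(upsert_sessions(store, "other-agent", batch))  # 1
print(len(store))                                    # 2
```

The same `session_id` pushed under a different `agent_name` is a distinct row, which matches the composite key on the `Session` model.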
- [ ] **Step 5: Write the API tests**
`tests/test_api.py`:
```python
import pytest

from openclaw.models import Session

BULK_URL = "/api/sessions/bulk_upsert/"


def _minimal_payload():
    return {
        "agent_name": "test",
        "source_node": "macmini",
        "sessions": [
            {
                "session_id": "test-session",
                "model_provider": "test",
                "model_id": "test-model",
            }
        ],
        "messages": [],
        "tool_calls": [],
    }


@pytest.mark.django_db
class TestBulkUpsertAPI:
    def test_bulk_upsert_ok(self, client):
        resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json")
        assert resp.status_code == 200
        data = resp.json()
        assert data["status"] == "ok"
        assert data["sessions_upserted"] == 1

    def test_bulk_upsert_idempotent(self, client):
        client.post(BULK_URL, _minimal_payload(), content_type="application/json")
        resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json")
        data = resp.json()
        assert data["sessions_upserted"] == 0

    def test_bulk_upsert_missing_fields_returns_400(self, client):
        resp = client.post(BULK_URL, {}, content_type="application/json")
        assert resp.status_code == 400
```
- [ ] **Step 6: Run the API tests and confirm they fail**
```bash
pytest tests/test_api.py -v
```
Expected: FAIL, because `openclaw.urls` does not exist yet.
- [ ] **Step 7: Implement the API view + routing**
`src/openclaw/urls.py`:
```python
from django.urls import path

from openclaw.views import sessions_bulk_upsert

urlpatterns = [
    path("sessions/bulk_upsert/", sessions_bulk_upsert, name="sessions_bulk_upsert"),
]
```
`src/openclaw/views.py`:
```python
import json

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods

from openclaw.services import BulkUpsertService


@csrf_exempt
@require_http_methods(["POST"])
def sessions_bulk_upsert(request):
    try:
        payload = json.loads(request.body)
    except json.JSONDecodeError:
        return JsonResponse({"error": "Invalid JSON"}, status=400)
    # Valid JSON may still be a list or scalar; the service expects an object.
    if not isinstance(payload, dict):
        return JsonResponse({"error": "Payload must be a JSON object"}, status=400)
    if "agent_name" not in payload or "source_node" not in payload:
        return JsonResponse(
            {"error": "Missing agent_name or source_node"}, status=400
        )
    if "sessions" not in payload:
        return JsonResponse({"error": "Missing sessions"}, status=400)
    result = BulkUpsertService.upsert(payload)
    return JsonResponse({"status": "ok", **result})
```
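The checks in the view could also be factored into a pure function so they can be tested without an HTTP request. This is a sketch, not part of the plan: `validate_payload` and `REQUIRED_KEYS` are hypothetical names, and the checks on the contents of `sessions` go beyond what the view enforces.

```python
REQUIRED_KEYS = ("agent_name", "source_node", "sessions")


def validate_payload(payload):
    """Return an error message for a bad payload, or None if it looks usable."""
    if not isinstance(payload, dict):
        return "Payload must be a JSON object"
    missing = [key for key in REQUIRED_KEYS if key not in payload]
    if missing:
        return "Missing " + ", ".join(missing)
    if not isinstance(payload["sessions"], list):
        return "sessions must be a list"
    for index, sess in enumerate(payload["sessions"]):
        # Each session needs an id, since the upsert is keyed on it.
        if not isinstance(sess, dict) or "session_id" not in sess:
            return f"sessions[{index}] is missing session_id"
    return None
```

In a view, a non-`None` result would map to a 400 response carrying the message.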
- [ ] **Step 8: Run the API tests and confirm they pass**
```bash
pytest tests/test_api.py -v
```
Expected: 3 PASS.
- [ ] **Step 9: Run all tests**
```bash
pytest -v
```
Expected: ALL PASS (3 models + 3 services + 3 api = 9).
- [ ] **Step 10: Commit**
```bash
git add src/openclaw/services.py src/openclaw/views.py src/openclaw/urls.py tests/test_services.py tests/test_api.py
git commit -m "feat: bulk upsert API with idempotent writes"
```
---
### Task 3: Basic Django Admin configuration
**Files:**
- Create: `src/openclaw/admin.py`
- [ ] **Step 1: Admin configuration**
`src/openclaw/admin.py`:
```python
from django.contrib import admin

from openclaw.models import Session, Message, ToolCall


class MessageInline(admin.TabularInline):
    model = Message
    extra = 0
    fields = ("seq", "role", "content_text", "timestamp")
    readonly_fields = ("seq", "role", "content_text", "timestamp")

    def has_add_permission(self, request, obj=None):
        return False

    def has_delete_permission(self, request, obj=None):
        return False


class ToolCallInline(admin.TabularInline):
    model = ToolCall
    extra = 0
    fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms")
    readonly_fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms")

    def has_add_permission(self, request, obj=None):
        return False

    def has_delete_permission(self, request, obj=None):
        return False


@admin.register(Session)
class SessionAdmin(admin.ModelAdmin):
    list_display = (
        "session_id",
        "agent_name",
        "model_id",
        "total_tokens",
        "message_count",
        "start_time",
    )
    list_filter = ("agent_name", "source_node", "model_id", "start_time")
    search_fields = ("session_id", "cwd")
    ordering = ("-start_time",)
    inlines = [MessageInline, ToolCallInline]
    readonly_fields = (
        "session_id",
        "agent_name",
        "source_node",
        "start_time",
        "end_time",
        "pushed_at",
    )


@admin.register(Message)
class MessageAdmin(admin.ModelAdmin):
    list_display = ("message_id", "session", "role", "timestamp", "tokens_total")
    list_filter = ("role", "model", "timestamp")
    search_fields = ("content_text",)
    ordering = ("-timestamp",)


@admin.register(ToolCall)
class ToolCallAdmin(admin.ModelAdmin):
    list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms")
    list_filter = ("tool_name", "is_error", "exit_code")
    ordering = ("-created_at",)
```
- [ ] **Step 2: Manually verify the Admin**
```bash
# manage.py sits at the repo root, so expose src/ on the import path
PYTHONPATH=src python manage.py migrate
PYTHONPATH=src python manage.py createsuperuser
PYTHONPATH=src python manage.py runserver
```
Visit `http://localhost:8000/admin/` and confirm that all three tables are visible in the Admin.
- [ ] **Step 3: Commit**
```bash
git add src/openclaw/admin.py
git commit -m "feat: Django Admin for Session, Message, ToolCall with inlines"
```
---
### Task 4: Custom Admin view: browse conversations by date
**Files:**
- Modify: `src/openclaw/admin.py`
- Create: `src/openclaw/templates/admin/openclaw/daily_view.html`
- Modify: `src/openclaw/urls.py` (admin routes are managed by `admin.site`, so no change is needed)
- [ ] **Step 1: Template**
`src/openclaw/templates/admin/openclaw/daily_view.html`:
```html
{% extends "admin/base_site.html" %}
{% block content %}
<h1>Daily Conversation View</h1>
<form method="get">
  <label>Start date: <input type="date" name="start" value="{{ start_date }}" /></label>
  <label>End date: <input type="date" name="end" value="{{ end_date }}" /></label>
  <label>Agent:
    <select name="agent">
      <option value="">All</option>
      {% for agent in agents %}
      <option value="{{ agent }}" {% if agent == selected_agent %}selected{% endif %}>{{ agent }}</option>
      {% endfor %}
    </select>
  </label>
  <button type="submit">Search</button>
</form>
{% if sessions %}
{% for session in sessions %}
<div class="session-block" style="margin-top: 2em; border: 1px solid #ddd; padding: 1em;">
  <h2>Session: {{ session.session_id }} ({{ session.agent_name }})</h2>
  <p>Model: {{ session.model_id }} | Tokens: {{ session.total_tokens }} |
  Start: {{ session.start_time|default:"N/A" }}</p>
  {% for msg in session.messages %}
  <div class="message" data-role="{{ msg.role }}" style="padding: 0.5em; margin: 0.3em 0; border-left: 3px solid {% if msg.role == 'user' %}#4CAF50{% elif msg.role == 'assistant' %}#2196F3{% else %}#FF9800{% endif %};">
    <strong>{{ msg.timestamp|date:"H:i" }} {{ msg.get_role_label }}</strong>
    {% if msg.role == 'toolResult' %}
    {% if msg.tool_name %}<em>[Tool: {{ msg.tool_name }}]</em>{% endif %}
    {% endif %}
    <details>
      <summary>Content</summary>
      <pre style="white-space: pre-wrap; word-break: break-word;">{{ msg.content_text|default:"(empty)" }}</pre>
    </details>
  </div>
  {% empty %}
  <p>No messages.</p>
  {% endfor %}
</div>
{% endfor %}
{% endif %}
{% endblock %}
```
- [ ] **Step 2: Admin view**
Append to the end of `src/openclaw/admin.py`:
```python
from datetime import date

from django.shortcuts import render
from django.db.models import Prefetch


class DailyConversationView(admin.ModelAdmin):
    """Admin custom view for date-range conversation browsing."""

    def get_urls(self):
        from django.urls import path

        urls = super().get_urls()
        custom_urls = [
            path("daily/", self.admin_site.admin_view(self.daily_view), name="openclaw_daily"),
        ]
        return custom_urls + urls

    def daily_view(self, request):
        start_str = request.GET.get("start")
        end_str = request.GET.get("end")
        agent_filter = request.GET.get("agent", "")
        start_date = start_str if start_str else date.today().isoformat()
        end_date = end_str if end_str else date.today().isoformat()
        agents = list(
            Session.objects.values_list("agent_name", flat=True)
            .distinct()
            .order_by("agent_name")
        )
        sessions_qs = Session.objects.filter(
            start_time__date__gte=start_date,
            start_time__date__lte=end_date,
        ).order_by("start_time")
        if agent_filter:
            sessions_qs = sessions_qs.filter(agent_name=agent_filter)
        messages_prefetch = Prefetch(
            "messages",
            queryset=Message.objects.order_by("seq"),
        )
        sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
        session_list = []
        for session in sessions_qs:
            messages = []
            for msg in session.messages.all():
                messages.append({
                    "timestamp": msg.timestamp,
                    "role": msg.role,
                    "content_text": msg.content_text,
                    "tool_name": msg.tool_name,
                    "get_role_label": self._role_label(msg.role),
                })
            session_list.append({
                "session_id": session.session_id,
                "agent_name": session.agent_name,
                "model_id": session.model_id,
                "total_tokens": session.total_tokens,
                "start_time": session.start_time,
                "messages": messages,
            })
        context = dict(
            self.admin_site.each_context(request),
            start_date=start_date,
            end_date=end_date,
            selected_agent=agent_filter,
            agents=agents,
            sessions=session_list,
            title="Daily Conversation View",
        )
        return render(request, "admin/openclaw/daily_view.html", context)

    @staticmethod
    def _role_label(role):
        labels = {
            "user": "User",
            "assistant": "Assistant",
            "toolResult": "Tool Result",
        }
        return labels.get(role, role)


# Register Daily view via SessionAdmin:
# In SessionAdmin inheritance, add daily_view url.
# Actually, the cleanest way: create a standalone admin view.
```
Note that `DailyConversationView` inherits from `ModelAdmin` but does not need to be bound to a model. A better fit is a standalone view function registered with the admin:
```python
# Add this function to src/openclaw/admin.py
from django.template.response import TemplateResponse


def daily_conversation_view(request):
    """Admin standalone view for date-range conversation browsing."""
    from datetime import date
    from django.db.models import Prefetch
    from openclaw.models import Session, Message

    start_str = request.GET.get("start")
    end_str = request.GET.get("end")
    agent_filter = request.GET.get("agent", "")
    start_date = start_str if start_str else date.today().isoformat()
    end_date = end_str if end_str else date.today().isoformat()
    agents = list(
        Session.objects.values_list("agent_name", flat=True)
        .distinct()
        .order_by("agent_name")
    )
    sessions_qs = Session.objects.filter(
        start_time__date__gte=start_date,
        start_time__date__lte=end_date,
    ).order_by("start_time")
    if agent_filter:
        sessions_qs = sessions_qs.filter(agent_name=agent_filter)
    messages_prefetch = Prefetch(
        "messages",
        queryset=Message.objects.order_by("seq"),
    )
    sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
    role_labels = {
        "user": "User",
        "assistant": "Assistant",
        "toolResult": "Tool Result",
    }
    session_list = []
    for session in sessions_qs:
        messages = []
        for msg in session.messages.all():
            messages.append({
                "timestamp": msg.timestamp,
                "role": msg.role,
                "content_text": msg.content_text,
                "tool_name": msg.tool_name,
                "get_role_label": role_labels.get(msg.role, msg.role),
            })
        session_list.append({
            "session_id": session.session_id,
            "agent_name": session.agent_name,
            "model_id": session.model_id,
            "total_tokens": session.total_tokens,
            "start_time": session.start_time,
            "messages": messages,
        })
    context = {
        **admin.site.each_context(request),
        "start_date": start_date,
        "end_date": end_date,
        "selected_agent": agent_filter,
        "agents": agents,
        "sessions": session_list,
        "title": "Daily Conversation View",
    }
    return TemplateResponse(request, "admin/openclaw/daily_view.html", context)


# Add URL in SessionAdmin or as standalone:
# Register the URL via custom admin urlpattern.
class CustomAdminSite(admin.AdminSite):
    def get_urls(self):
        from django.urls import path

        urls = super().get_urls()
        custom_urls = [
            # admin_view() enforces the admin login/permission check.
            path(
                "openclaw/daily/",
                self.admin_view(daily_conversation_view),
                name="openclaw_daily",
            ),
        ]
        # Custom URLs go first: the default admin URLs end with a catch-all
        # pattern that would otherwise shadow them.
        return custom_urls + urls


# Then swap admin.site in config/urls.py and manage.py
```
Swapping the admin site adds complexity. A simpler approach is to add the URL through an existing ModelAdmin's `get_urls`.
Add to `src/openclaw/admin.py` (after `ToolCallAdmin`):
```python
from django.template.response import TemplateResponse
from datetime import date
from django.db.models import Prefetch


# Plain view function: admin_view() calls it with the request only, so it
# must not take a `self` parameter.
def daily_conversation_view(request):
    """Admin standalone view for date-range conversation browsing."""
    start_str = request.GET.get("start")
    end_str = request.GET.get("end")
    agent_filter = request.GET.get("agent", "")
    start_date = start_str if start_str else date.today().isoformat()
    end_date = end_str if end_str else date.today().isoformat()
    agents = list(
        Session.objects.values_list("agent_name", flat=True)
        .distinct()
        .order_by("agent_name")
    )
    sessions_qs = Session.objects.filter(
        start_time__date__gte=start_date,
        start_time__date__lte=end_date,
    ).order_by("start_time")
    if agent_filter:
        sessions_qs = sessions_qs.filter(agent_name=agent_filter)
    messages_prefetch = Prefetch(
        "messages",
        queryset=Message.objects.order_by("seq"),
    )
    sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
    role_labels = {
        "user": "User",
        "assistant": "Assistant",
        "toolResult": "Tool Result",
    }
    session_list = []
    for session in sessions_qs:
        messages = []
        for msg in session.messages.all():
            messages.append({
                "timestamp": msg.timestamp,
                "role": msg.role,
                "content_text": msg.content_text,
                "tool_name": msg.tool_name,
                "get_role_label": role_labels.get(msg.role, msg.role),
            })
        session_list.append({
            "session_id": session.session_id,
            "agent_name": session.agent_name,
            "model_id": session.model_id,
            "total_tokens": session.total_tokens,
            "start_time": session.start_time,
            "messages": messages,
        })
    context = {
        **admin.site.each_context(request),
        "start_date": start_date,
        "end_date": end_date,
        "selected_agent": agent_filter,
        "agents": agents,
        "sessions": session_list,
        "title": "Daily Conversation View",
    }
    return TemplateResponse(request, "admin/openclaw/daily_view.html", context)


# Add URL through SessionAdmin get_urls
class SessionAdmin(admin.ModelAdmin):
    # ... (as defined above)
    ...

    def get_urls(self):
        from django.urls import path

        urls = super().get_urls()
        custom_urls = [
            path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"),
        ]
        return custom_urls + urls
```
To avoid defining SessionAdmin twice, the final, complete `src/openclaw/admin.py` is as follows:
```python
from django.contrib import admin
from django.template.response import TemplateResponse
from datetime import date
from django.db.models import Prefetch

from openclaw.models import Session, Message, ToolCall


class MessageInline(admin.TabularInline):
    model = Message
    extra = 0
    fields = ("seq", "role", "content_text", "timestamp")
    readonly_fields = ("seq", "role", "content_text", "timestamp")

    def has_add_permission(self, request, obj=None):
        return False

    def has_delete_permission(self, request, obj=None):
        return False


class ToolCallInline(admin.TabularInline):
    model = ToolCall
    extra = 0
    fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms")
    readonly_fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms")

    def has_add_permission(self, request, obj=None):
        return False

    def has_delete_permission(self, request, obj=None):
        return False


def daily_conversation_view(request):
    """Admin standalone view for date-range conversation browsing."""
    start_str = request.GET.get("start")
    end_str = request.GET.get("end")
    agent_filter = request.GET.get("agent", "")

    start_date = start_str if start_str else date.today().isoformat()
    end_date = end_str if end_str else date.today().isoformat()

    agents = list(
        Session.objects.values_list("agent_name", flat=True)
        .distinct()
        .order_by("agent_name")
    )

    sessions_qs = Session.objects.filter(
        start_time__date__gte=start_date,
        start_time__date__lte=end_date,
    ).order_by("start_time")
    if agent_filter:
        sessions_qs = sessions_qs.filter(agent_name=agent_filter)

    messages_prefetch = Prefetch(
        "messages",
        queryset=Message.objects.order_by("seq"),
    )
    sessions_qs = sessions_qs.prefetch_related(messages_prefetch)

    role_labels = {
        "user": "User",
        "assistant": "Assistant",
        "toolResult": "Tool Result",
    }

    session_list = []
    for session in sessions_qs:
        messages = []
        for msg in session.messages.all():
            messages.append({
                "timestamp": msg.timestamp,
                "role": msg.role,
                "content_text": msg.content_text,
                "tool_name": msg.tool_name,
                "get_role_label": role_labels.get(msg.role, msg.role),
            })
        session_list.append({
            "session_id": session.session_id,
            "agent_name": session.agent_name,
            "model_id": session.model_id,
            "total_tokens": session.total_tokens,
            "start_time": session.start_time,
            "messages": messages,
        })

    context = {
        **admin.site.each_context(request),
        "start_date": start_date,
        "end_date": end_date,
        "selected_agent": agent_filter,
        "agents": agents,
        "sessions": session_list,
        "title": "Daily Conversation View",
    }
    return TemplateResponse(request, "admin/openclaw/daily_view.html", context)


@admin.register(Session)
class SessionAdmin(admin.ModelAdmin):
    list_display = (
        "session_id",
        "agent_name",
        "model_id",
        "total_tokens",
        "message_count",
        "start_time",
    )
    list_filter = ("agent_name", "source_node", "model_id", "start_time")
    search_fields = ("session_id", "cwd")
    ordering = ("-start_time",)
    inlines = [MessageInline, ToolCallInline]
    readonly_fields = (
        "session_id",
        "agent_name",
        "source_node",
        "start_time",
        "end_time",
        "pushed_at",
    )

    def get_urls(self):
        from django.urls import path

        urls = super().get_urls()
        custom_urls = [
            path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"),
        ]
        # Custom URLs go first so "daily/" is not captured by the object-id pattern.
        return custom_urls + urls


@admin.register(Message)
class MessageAdmin(admin.ModelAdmin):
    list_display = ("message_id", "session", "role", "timestamp", "tokens_total")
    list_filter = ("role", "model", "timestamp")
    search_fields = ("content_text",)
    ordering = ("-timestamp",)


@admin.register(ToolCall)
class ToolCallAdmin(admin.ModelAdmin):
    list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms")
    list_filter = ("tool_name", "is_error", "exit_code")
    ordering = ("-created_at",)
```
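The view above hands the raw `start` and `end` query strings to the ORM filter, so a malformed value would likely surface as a server error rather than a friendly fallback. A minimal sketch of a stricter parser follows; the helper names `parse_date_param` and `normalize_range` are hypothetical and not part of this plan.

```python
from datetime import date


def parse_date_param(value, default=None):
    """Parse a YYYY-MM-DD query parameter, falling back to a default date.

    Hypothetical helper: returns `default` (today if omitted) when the
    value is missing or is not a valid ISO calendar date.
    """
    fallback = default if default is not None else date.today()
    if not value:
        return fallback
    try:
        return date.fromisoformat(value)
    except ValueError:
        return fallback


def normalize_range(start_str, end_str, default=None):
    """Return (start, end) as dates, swapping them if given in reverse order."""
    start = parse_date_param(start_str, default)
    end = parse_date_param(end_str, default)
    return (start, end) if start <= end else (end, start)
```

If adopted, the view could call `normalize_range(request.GET.get("start"), request.GET.get("end"))` and pass the resulting `date` objects to the `start_time__date__gte` / `start_time__date__lte` filters.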
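- [ ] **Step 3: Add an Admin navigation link**
Add a custom link button above the SessionAdmin `list_display` table as a convenient entry point, or link to `/admin/openclaw/session/daily/` directly from the template sidebar.
- [ ] **Step 4: Manual verification**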
```bash
cd src && python manage.py runserver
```
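- Create one session with a message as test data
- Open `http://localhost:8000/admin/openclaw/session/daily/` and confirm the custom view is reachable
- Select a date range and confirm that the session list and conversations are displayed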
- [ ] **Step 5: Commit**
```bash
git add src/openclaw/admin.py src/openclaw/templates/
git commit -m "feat: admin daily conversation view with date filtering"
```
---
### Task 5: Daily Markdown Export (Admin Action)
**Files:**
- Create: `src/openclaw/export.py`
- Create: `tests/test_admin_export.py`
- [ ] **Step 1: Write the export tests**
`tests/test_admin_export.py`:
```python
"""Tests for Markdown export functionality."""
from datetime import datetime, timezone

import pytest

from openclaw.models import Session, Message
from openclaw.export import generate_markdown_report


@pytest.fixture
def db_session(db):
    s = Session.objects.create(
        session_id="report-test",
        agent_name="xingyao",
        source_node="macmini",
        model_provider="anthropic",
        model_id="claude-sonnet-4-6",
        total_tokens=45230,
        start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
    )
    Message.objects.create(
        session=s,
        message_id="m1",
        parent_id="root",
        role="user",
        content_text="Help me fix this bug",
        timestamp=datetime(2026, 4, 5, 10, 23, tzinfo=timezone.utc),
    )
    Message.objects.create(
        session=s,
        message_id="m2",
        parent_id="m1",
        role="assistant",
        content_text="The bug is on line 45...",
        timestamp=datetime(2026, 4, 5, 10, 23, 30, tzinfo=timezone.utc),
    )
    return s


@pytest.mark.django_db
class TestMarkdownExport:
    def test_basic_report(self, db_session):
        md = generate_markdown_report(
            messages=db_session.messages.order_by("created_at"),
            date_str="2026-04-05",
        )
        assert "# Daily Report: 2026-04-05" in md
        assert "Help me fix this bug" in md
        assert "The bug is on line 45..." in md

    def test_thinking_content_stripped(self, db):
        s = Session.objects.create(
            session_id="thinking-test",
            agent_name="test",
            source_node="macmini",
            start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
        )
        Message.objects.create(
            session=s,
            message_id="m3",
            parent_id="root",
            role="assistant",
            content_text="Final answer",
            raw_content=[
                {"type": "thinking", "thinking": "Let me think about this..."},
                {"type": "text", "text": "Final answer"},
            ],
            timestamp=datetime(2026, 4, 5, 10, 30, tzinfo=timezone.utc),
        )
        md = generate_markdown_report(
            messages=s.messages.order_by("created_at"),
            date_str="2026-04-05",
        )
        assert "Let me think about this..." not in md
        assert "Final answer" in md

    def test_tool_call_formatting(self, db):
        s = Session.objects.create(
            session_id="tool-test",
            agent_name="test",
            source_node="macmini",
            model_id="test-model",
            total_tokens=100,
            start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
        )
        Message.objects.create(
            session=s,
            message_id="m4",
            parent_id="root",
            role="assistant",
            content_text="I'll run a command",
            raw_content=[
                {"type": "text", "text": "I'll run a command"},
            ],
            timestamp=datetime(2026, 4, 5, 10, 30, tzinfo=timezone.utc),
        )
        # Session metadata (model, tokens) is only rendered when the
        # sessions mapping is supplied, so pass it explicitly.
        md = generate_markdown_report(
            messages=s.messages.order_by("created_at"),
            date_str="2026-04-05",
            sessions={s.session_id: s},
        )
        assert "test-model" in md
        assert "100" in md
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
pytest tests/test_admin_export.py -v
```
Expected: FAIL — module not found.
- [ ] **Step 3: Implement the Markdown export**
`src/openclaw/export.py`:
```python
from openclaw.models import Message

# Tool arguments and results are truncated to this many characters.
MAX_TOOL_TEXT = 200

ROLE_LABELS = {
    "user": "User",
    "assistant": "Assistant",
    "toolResult": "Tool Result",
}


def generate_markdown_report(messages, date_str, sessions=None):
    """Generate a markdown daily report.

    Args:
        messages: QuerySet of Message objects, ordered by timestamp.
        date_str: Date string for the report header (YYYY-MM-DD).
        sessions: Optional dict of session_id -> Session instance for metadata.
    """
    if sessions is None:
        sessions = {}

    lines = [f"# Daily Report: {date_str}", ""]

    # Group messages by the session's own session_id. msg.session_id would be
    # the foreign-key value, which need not match the keys of `sessions`.
    session_messages = {}
    for msg in messages:
        session_messages.setdefault(msg.session.session_id, []).append(msg)

    for session_id, msgs in session_messages.items():
        session = sessions.get(session_id)
        if session:
            lines.append(
                f"## Session: {session_id} (Agent: {session.agent_name})"
            )
            lines.append(
                f"**Model**: {session.model_id or 'N/A'} | "
                f"**Token**: {session.total_tokens:,}"
            )
        else:
            lines.append(f"## Session: {session_id}")
        lines.append("")

        # toolResult messages are shown inline under the assistant tool call,
        # looked up by tool_call_id (assumes Message stores that field).
        results = {
            m.tool_call_id: m.content_text or ""
            for m in msgs
            if m.role == "toolResult" and getattr(m, "tool_call_id", "")
        }

        for msg in msgs:
            if msg.role == "toolResult":
                continue  # rendered inline with the assistant tool call

            role_label = ROLE_LABELS.get(msg.role, msg.role)
            time_str = msg.timestamp.strftime("%H:%M")

            lines.append(f"### {time_str} {role_label}")
            if msg.content_text:
                lines.append("")
                lines.append(msg.content_text)

            if msg.role == "assistant":
                for tool in _extract_tool_info(msg.raw_content, results):
                    lines.append("")
                    lines.append(
                        f"**{time_str} {role_label} -> [Tool: {tool['name']}]**"
                    )
                    lines.append("")
                    lines.append(f"`{tool.get('arguments', '')}`")
                    if tool.get("result"):
                        lines.append("")
                        lines.append(f'**Result**: {tool["result"]}')
            lines.append("")

        lines.append("---")
        lines.append("")

    return "\n".join(lines)


def _extract_tool_info(raw_content, results=None):
    """Extract tool call info (name, arguments, result) from raw_content JSON."""
    results = results or {}
    tools = []
    if isinstance(raw_content, list):
        for block in raw_content:
            if isinstance(block, dict) and block.get("type") == "toolCall":
                tool_name = block.get("tool_name") or block.get("name", "unknown")
                args = block.get("arguments", {})
                args_str = (args if isinstance(args, str) else str(args))[:MAX_TOOL_TEXT]
                tools.append({
                    "name": tool_name,
                    "arguments": args_str,
                    "result": results.get(block.get("id"), "")[:MAX_TOOL_TEXT],
                })
    return tools


def export_daily_markdown(sessions_queryset):
    """Generate a markdown file from a QuerySet of Session objects.

    Returns (markdown_string, filename).
    Fetches all messages for the sessions.
    """
    messages = (
        Message.objects.filter(session__in=sessions_queryset)
        .select_related("session")  # avoid one query per message when grouping
        .order_by("timestamp")
    )
    sessions_map = {s.session_id: s for s in sessions_queryset}

    # Determine date from first session start_time
    first_session = sessions_queryset.order_by("start_time").first()
    if first_session and first_session.start_time:
        date_str = first_session.start_time.strftime("%Y-%m-%d")
    else:
        date_str = "export"

    md = generate_markdown_report(messages, date_str, sessions_map)
    filename = f"daily-report-{date_str}.md"
    return md, filename
```
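The report is meant to show each tool call together with its result. A minimal sketch of that pairing on plain dictionaries is shown below. It assumes each `toolCall` block carries an `id` that matches the `tool_call_id` of a later `toolResult` message (inferred from the sync script's payload, not confirmed against the models), and `pair_tool_calls` is a hypothetical name.

```python
def pair_tool_calls(messages, max_len=200):
    """Pair toolCall blocks with toolResult messages by id.

    `messages` is a list of plain dicts standing in for Message rows
    (hypothetical shape: role, raw_content, tool_call_id, content_text).
    """
    # Index results first, because a result always follows its call.
    results = {
        m["tool_call_id"]: m.get("content_text", "")
        for m in messages
        if m.get("role") == "toolResult" and m.get("tool_call_id")
    }
    paired = []
    for m in messages:
        if m.get("role") != "assistant":
            continue
        for block in m.get("raw_content") or []:
            if isinstance(block, dict) and block.get("type") == "toolCall":
                paired.append({
                    "name": block.get("name", "unknown"),
                    "arguments": str(block.get("arguments", {}))[:max_len],
                    "result": results.get(block.get("id"), "")[:max_len],
                })
    return paired


sample = [
    {"role": "assistant", "raw_content": [
        {"type": "text", "text": "I'll run a command"},
        {"type": "toolCall", "id": "call_1", "name": "bash",
         "arguments": {"command": "ls"}},
    ]},
    {"role": "toolResult", "tool_call_id": "call_1", "content_text": "README.md"},
]
print(pair_tool_calls(sample))
```

Building the lookup before walking the assistant messages matters: a single forward pass would reach each call before its result has been seen.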
- [ ] **Step 4: Run the export tests**
```bash
pytest tests/test_admin_export.py -v
```
Expected: 3 PASS.
- [ ] **Step 5: Register the Admin action**
Modify `src/openclaw/admin.py` to add the action to SessionAdmin:
```python
from openclaw.export import export_daily_markdown
from django.http import HttpResponse


@admin.action(description="Export selected sessions to Markdown")
def export_to_markdown(modeladmin, request, queryset):
    md, filename = export_daily_markdown(queryset)
    # Declare the charset explicitly so non-ASCII conversation text downloads intact.
    response = HttpResponse(md, content_type="text/markdown; charset=utf-8")
    response["Content-Disposition"] = f'attachment; filename="{filename}"'
    return response


@admin.register(Session)
class SessionAdmin(admin.ModelAdmin):
    # ... existing code
    actions = [export_to_markdown]
    ...
```
Merge the changes above into the full file before committing.
- [ ] **Step 6: Manual verification**
```bash
cd src && python manage.py runserver
```
In the Admin Session list, tick some sessions, choose "Export selected sessions to Markdown", and confirm that a file downloads containing the conversations in Markdown format.
- [ ] **Step 7: Commit**
```bash
git add src/openclaw/export.py tests/test_admin_export.py src/openclaw/admin.py
git commit -m "feat: daily Markdown export admin action"
```
---
### Task 6: Client-Side JSONL Parsing and Push Script
**Files:**
- Create: `scripts/sync_sessions.py`
- [ ] **Step 1: Create the script**
`scripts/sync_sessions.py`:
```python
#!/usr/bin/env python
"""
OpenClaw Session Sync Script

Scans local agent sessions directories, parses JSONL files,
and pushes structured JSON to the Django API.

Usage:
    python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/

Cron:
    0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <url>
"""
import argparse
import json
import os
import sys
import urllib.error
import urllib.request
from pathlib import Path

# ─────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────
SESSIONS_DIR_NAME = "sessions"
STATE_FILE = ".sync_state"
DELETED_SUFFIX = ".deleted."


# ─────────────────────────────────────────────────────────────────
# File Discovery
# ─────────────────────────────────────────────────────────────────
def find_sessions(root_path):
    """Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path)."""
    agents_dir = Path(root_path) / "agents"
    if not agents_dir.exists():
        return
    for agent_folder in sorted(agents_dir.iterdir()):
        if not agent_folder.is_dir():
            continue
        sessions_dir = agent_folder / SESSIONS_DIR_NAME
        if not sessions_dir.exists():
            continue
        agent_name = agent_folder.name
        for jsonl_file in sorted(sessions_dir.glob("*.jsonl")):
            if DELETED_SUFFIX in jsonl_file.name:
                continue
            yield agent_name, str(jsonl_file)


def get_sync_state(sessions_dir):
    """Read .sync_state from sessions directory, return {path: mtime}."""
    state_path = Path(sessions_dir) / STATE_FILE
    if not state_path.exists():
        return {}
    try:
        with open(state_path) as f:
            return json.load(f)
    except (json.JSONDecodeError, OSError):
        return {}


def save_sync_state(sessions_dir, state):
    """Write .sync_state file."""
    state_path = Path(sessions_dir) / STATE_FILE
    with open(state_path, "w") as f:
        json.dump(state, f)


def get_new_files(root_path):
    """Return (agent_name, jsonl_path, mtime) for new or modified files.

    The sync state is NOT updated here. Call mark_synced() after a
    successful push so that a failed push is retried on the next run.
    """
    state_cache = {}
    new_files = []
    for agent_name, jsonl_path in find_sessions(root_path):
        sessions_dir = str(Path(jsonl_path).parent)
        if sessions_dir not in state_cache:
            state_cache[sessions_dir] = get_sync_state(sessions_dir)
        mtime = os.stat(jsonl_path).st_mtime
        if mtime > state_cache[sessions_dir].get(jsonl_path, 0):
            new_files.append((agent_name, jsonl_path, mtime))
    return new_files


def mark_synced(synced_files):
    """Record (jsonl_path, mtime) pairs in the .sync_state of their directory."""
    by_dir = {}
    for jsonl_path, mtime in synced_files:
        by_dir.setdefault(str(Path(jsonl_path).parent), {})[jsonl_path] = mtime
    for sessions_dir, updates in by_dir.items():
        state = get_sync_state(sessions_dir)
        state.update(updates)
        save_sync_state(sessions_dir, state)


# ─────────────────────────────────────────────────────────────────
# JSONL Parser
# ─────────────────────────────────────────────────────────────────
def parse_jsonl(file_path):
    """Parse a JSONL file and return (sessions, messages, tool_calls)."""
    sessions = []
    messages = []
    tool_calls = []

    # State tracking for model/thinking changes
    current_model_provider = ""
    current_model_id = ""
    current_thinking_level = ""

    # Tool results lookup by tool_call_id
    tool_results = {}

    events = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip malformed lines
            if isinstance(event, dict):
                events.append(event)

    if not events:
        return sessions, messages, tool_calls

    # First pass: extract session metadata
    session_event = None
    for event in events:
        if event.get("type", "") == "session":
            session_event = event
            break
    if not session_event:
        return sessions, messages, tool_calls

    session_id = session_event.get("id", "")
    session_timestamp = session_event.get("timestamp", "")
    session_cwd = session_event.get("cwd", "")
    # The version is read from the session event itself, not from the last event.
    session_version = session_event.get("version", 0)

    # Determine start and end time from all events
    timestamps = [e["timestamp"] for e in events if e.get("timestamp")]

    # Second pass: process events
    message_seq = 0
    total_tokens = 0
    total_cost = 0.0
    message_count = 0
    tool_call_count = 0
    error_count = 0

    for event in events:
        event_type = event.get("type", "")
        if event_type == "model_change":
            current_model_provider = event.get("provider", "")
            current_model_id = event.get("modelId", "")
        elif event_type == "thinking_level_change":
            current_thinking_level = event.get("thinkingLevel", "")
        elif event_type == "message":
            role = event.get("role", "")
            msg_id = event.get("id", "")
            parent_id = event.get("parentId", "")
            msg_timestamp = event.get("timestamp", "")

            # Extract text content and tool calls; thinking blocks are skipped
            content_items = event.get("content", [])
            text_parts = []
            tc_list = []
            for item in content_items:
                if isinstance(item, dict):
                    if item.get("type") == "text":
                        text_parts.append(item.get("text", ""))
                    elif item.get("type") == "toolCall":
                        tc_list.append(item)
            content_text = "\n".join(text_parts)

            msg_data = {
                "session_id": session_id,
                "message_id": msg_id,
                "parent_id": parent_id,
                "seq": message_seq,
                "role": role,
                "content_text": content_text,
                "raw_content": content_items if content_items else [],
                "raw_message": event.get("content", []),
                "timestamp": msg_timestamp,
            }

            if role == "assistant":
                usage = event.get("usage", {})
                msg_data.update({
                    "model": current_model_id,
                    "provider": current_model_provider,
                    "stop_reason": event.get("stopReason", ""),
                    "tokens_input": usage.get("inputTokens", 0),
                    "tokens_output": usage.get("outputTokens", 0),
                    "tokens_cache_read": usage.get("cacheReadInputTokens", 0),
                    "tokens_cache_write": usage.get("cacheWriteInputTokens", 0),
                    "tokens_total": usage.get("totalTokens", 0),
                })
                total_tokens += usage.get("totalTokens", 0)
                if event.get("cost"):
                    cost_val = event["cost"].get("total", 0.0)
                    msg_data["cost_total"] = cost_val
                    total_cost += cost_val
            elif role == "toolResult":
                msg_data.update({
                    "tool_call_id": event.get("toolCallId", ""),
                    "tool_name": event.get("toolName", ""),
                    "is_error": event.get("isError", False),
                    "exit_code": event.get("exitCode"),
                    "duration_ms": event.get("durationMs"),
                })
                if event.get("isError"):
                    error_count += 1
                # Store for tool call association
                if event.get("toolCallId"):
                    tool_results[event["toolCallId"]] = {
                        "result_text": content_text,
                        "is_error": event.get("isError", False),
                        "exit_code": event.get("exitCode"),
                        "duration_ms": event.get("durationMs"),
                    }

            messages.append(msg_data)
            message_seq += 1
            message_count += 1  # counts every message, whatever its role

            # Extract tool calls from assistant messages
            for tc_seq, tc in enumerate(tc_list):
                tool_calls.append({
                    "session_id": session_id,
                    "message_id": msg_id,
                    "tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"),
                    "tool_name": tc.get("name", "unknown"),
                    "arguments": tc.get("arguments", {}),
                    "seq": tc_seq,
                })
                tool_call_count += 1

    # Enrich tool calls once every event has been read: a toolResult arrives
    # after the assistant message that issued the call, so it cannot be
    # looked up while that message is being processed.
    for tool_call_data in tool_calls:
        tr = tool_results.get(tool_call_data["tool_call_id"], {})
        tool_call_data["result_text"] = tr.get("result_text", "")
        tool_call_data["is_error"] = tr.get("is_error", False)
        tool_call_data["exit_code"] = tr.get("exit_code")
        tool_call_data["duration_ms"] = tr.get("duration_ms")

    # Build session record
    start_time = timestamps[0] if timestamps else session_timestamp
    end_time = timestamps[-1] if timestamps else session_timestamp
    sessions.append({
        "session_id": session_id,
        "session_version": session_version,
        "model_provider": current_model_provider,
        "model_id": current_model_id,
        "thinking_level": current_thinking_level,
        "start_time": start_time,
        "end_time": end_time,
        "cwd": session_cwd,
        "total_tokens": total_tokens,
        "total_cost": total_cost,
        "message_count": message_count,
        "tool_call_count": tool_call_count,
        "error_count": error_count,
        "raw_file_path": str(file_path),
        "status": "active",
        "metadata": {},
    })
    return sessions, messages, tool_calls


# ─────────────────────────────────────────────────────────────────
# HTTP Client
# ─────────────────────────────────────────────────────────────────
def push_to_api(remote_url, payload):
    """POST structured JSON to Django API."""
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        remote_url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        print(f"HTTP Error {e.code}: {e.read().decode('utf-8', errors='replace')}")
        raise
    except urllib.error.URLError as e:
        print(f"URL Error: {e.reason}")
        raise


# ─────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────
def main():
    parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API")
    parser.add_argument(
        "--remote-url",
        required=True,
        help="Django API bulk_upsert endpoint URL",
    )
    parser.add_argument(
        "--root-path",
        default=".",
        help="Root path containing agents/ directory (default: current dir)",
    )
    args = parser.parse_args()

    new_files = get_new_files(args.root_path)
    if not new_files:
        print("No new or modified session files found.")
        return
    print(f"Found {len(new_files)} new/modified session(s).")

    total_sessions = 0
    total_messages = 0
    total_tool_calls = 0
    failed = False

    # Group by agent_name (batch per agent)
    agent_batches = {}
    for agent_name, jsonl_path, mtime in new_files:
        agent_batches.setdefault(agent_name, []).append((jsonl_path, mtime))

    for agent_name, files in agent_batches.items():
        all_sessions = []
        all_messages = []
        all_tool_calls = []
        parsed_files = []
        for fp, mtime in files:
            print(f"  Parsing: {fp}")
            try:
                sessions, messages, tool_calls = parse_jsonl(fp)
            except Exception as e:
                print(f"  ERROR parsing {fp}: {e}")
                failed = True
                continue
            all_sessions.extend(sessions)
            all_messages.extend(messages)
            all_tool_calls.extend(tool_calls)
            parsed_files.append((fp, mtime))

        if not all_sessions:
            # Nothing to push; remember the files so they are skipped until they change.
            mark_synced(parsed_files)
            continue

        payload = {
            "agent_name": agent_name,
            "source_node": os.environ.get("SOURCE_NODE", "unknown"),
            "sessions": all_sessions,
            "messages": all_messages,
            "tool_calls": all_tool_calls,
        }
        print(f"  Pushing {len(all_sessions)} session(s), "
              f"{len(all_messages)} message(s), "
              f"{len(all_tool_calls)} tool call(s)...")
        try:
            result = push_to_api(args.remote_url, payload)
        except Exception:
            print(f"  FAILED to push {agent_name} sessions.")
            failed = True
            continue
        print(f"  OK: {result}")
        mark_synced(parsed_files)  # only after the API accepted the batch
        total_sessions += result.get("sessions_upserted", 0)
        total_messages += result.get("messages_upserted", 0)
        total_tool_calls += result.get("tool_calls_upserted", 0)

    print(f"\nSync complete: {total_sessions} sessions, "
          f"{total_messages} messages, {total_tool_calls} tool calls pushed.")
    if failed:
        sys.exit(1)  # non-zero exit so cron can flag the run


if __name__ == "__main__":
    main()
```
- [ ] **Step 2: Syntax check**
```bash
python -m py_compile scripts/sync_sessions.py
```
Expected: no output (success).
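To exercise the script beyond a syntax check, a small fixture helps. The sketch below writes one JSONL session file in the directory layout that `find_sessions` walks. The event fields mirror what `parse_jsonl` reads (`type`, `id`, `role`, `content`, `usage`), which is an assumption about the OpenClaw format rather than a confirmed schema, and `write_fixture` is a hypothetical helper name.

```python
import json
import tempfile
from pathlib import Path


def write_fixture(root, agent_name="demo-agent", session_id="fixture-1"):
    """Create root/agents/<agent>/sessions/<session>.jsonl and return its path."""
    sessions_dir = Path(root) / "agents" / agent_name / "sessions"
    sessions_dir.mkdir(parents=True, exist_ok=True)
    # Assumed event shapes, taken from the keys parse_jsonl looks up.
    events = [
        {"type": "session", "id": session_id, "version": 1,
         "timestamp": "2026-04-05T10:00:00Z", "cwd": "/tmp/demo"},
        {"type": "model_change", "provider": "anthropic",
         "modelId": "claude-sonnet-4-6", "timestamp": "2026-04-05T10:00:01Z"},
        {"type": "message", "id": "m1", "parentId": "root", "role": "user",
         "timestamp": "2026-04-05T10:23:00Z",
         "content": [{"type": "text", "text": "Help me fix this bug"}]},
        {"type": "message", "id": "m2", "parentId": "m1", "role": "assistant",
         "timestamp": "2026-04-05T10:23:30Z",
         "content": [{"type": "text", "text": "The bug is on line 45..."}],
         "usage": {"inputTokens": 10, "outputTokens": 5, "totalTokens": 15}},
    ]
    path = sessions_dir / f"{session_id}.jsonl"
    with open(path, "w", encoding="utf-8") as f:
        for event in events:
            f.write(json.dumps(event) + "\n")
    return path


if __name__ == "__main__":
    root = tempfile.mkdtemp(prefix="openclaw-fixture-")
    print(write_fixture(root))
```

The directory passed as `root` is what `--root-path` should point at, since the script appends `agents/` itself.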
- [ ] **Step 3: Commit**
```bash
git add scripts/sync_sessions.py
git commit -m "feat: client JSONL parse and push script"
```
---
### Task 7: Docker Deployment Configuration
**Files:**
- Create: `Dockerfile`, `docker-compose.yml`, `.dockerignore`
- Create: `nginx/nginx.conf.placeholder`
- [ ] **Step 1: .dockerignore**
`.dockerignore`:
```
__pycache__
*.pyc
.venv
.env
*.egg-info
.git
.pytest_cache
.mypy_cache
tests/
scripts/
docs/
db.sqlite3
```
- [ ] **Step 2: Dockerfile**
`Dockerfile`:
```dockerfile
FROM python:3.12-slim

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
        build-essential libpq-dev \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE ${DJANGO_PORT:-8000}

# Run through a shell so ${DJANGO_PORT:-8000} is expanded at container start.
# The exec (JSON array) form does no variable expansion and would pass the
# literal string to gunicorn.
CMD ["sh", "-c", "exec gunicorn --bind 0.0.0.0:${DJANGO_PORT:-8000} --workers 4 --timeout 120 config.wsgi:application"]
```
- [ ] **Step 3: docker-compose.yml**
`docker-compose.yml`:
```yaml
services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: openclaw-archive
    env_file:
      - .env
    ports:
      - "${DJANGO_PORT:-8000}:${DJANGO_PORT:-8000}"
    volumes:
      - static_volume:/app/staticfiles
      - jsonl_archive:/app/archive
    restart: unless-stopped

  # nginx placeholder (uncomment for production)
  # nginx:
  #   image: nginx:alpine
  #   container_name: openclaw-nginx
  #   ports:
  #     - "80:80"
  #   volumes:
  #     # Mounted under conf.d: the file only contains upstream/server blocks,
  #     # which are not valid at the top level of /etc/nginx/nginx.conf.
  #     - ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf:ro
  #     - static_volume:/app/staticfiles:ro
  #   depends_on:
  #     - web

volumes:
  static_volume:
  jsonl_archive:
```
- [ ] **Step 4: Nginx placeholder**
`nginx/nginx.conf.placeholder`:
```nginx
# Nginx reverse proxy placeholder for OpenClaw Archive
#
# To enable:
# 1. Rename this file to nginx.conf
# 2. Update server_name and SSL certificate paths
# 3. Uncomment the nginx service in docker-compose.yml

upstream django {
    server web:8000;
}

server {
    listen 80;
    server_name _;  # Update for production

    location /static/ {
        alias /app/staticfiles/;
    }

    location / {
        proxy_pass http://django;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
```
- [ ] **Step 5: Settings update: read environment variables**
`src/config/settings/base.py` already reads environment variables via `os.environ.get()`, so no change is needed.
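For reference, environment-driven database settings might be assembled along these lines. This is only a sketch: the variable names (`DB_NAME`, `DB_USER`, `DB_PASSWORD`, `DB_HOST`, `DB_PORT`), their defaults, and the helper `database_from_env` are assumptions for illustration. The real names are whatever `base.py` and `.env.example` define.

```python
import os


def database_from_env(env=None):
    """Build a Django-style DATABASES["default"] dict from environment variables.

    All variable names and defaults here are hypothetical placeholders.
    """
    env = os.environ if env is None else env
    return {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": env.get("DB_NAME", "openclaw"),
        "USER": env.get("DB_USER", "openclaw"),
        "PASSWORD": env.get("DB_PASSWORD", ""),
        "HOST": env.get("DB_HOST", "localhost"),
        "PORT": env.get("DB_PORT", "5432"),
    }


# Example with an explicit mapping instead of the real environment:
print(database_from_env({"DB_HOST": "nas.local", "DB_NAME": "archive"}))
```

Accepting the mapping as a parameter keeps the function easy to test without touching the process environment.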
- [ ] **Step 6: Commit**
```bash
git add Dockerfile docker-compose.yml .dockerignore nginx/nginx.conf.placeholder
git commit -m "feat: Docker Compose deployment configuration"
```
---
### Task 8: TimescaleDB Hypertable Migration
**Files:**
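- Create: `src/openclaw/migrations/XXXX_add_hypertables.py` (the number depends on the preceding migration)
⚠️ This task needs a real PostgreSQL + TimescaleDB environment. In development it can be skipped with `--fake`; in production it must actually run. Note that TimescaleDB requires every unique index and primary key on a hypertable to include the partitioning column, so `create_hypertable` fails on a table that only has Django's default single-column `id` primary key. Verify the Task 1 schema against this constraint before running the migration.
- [ ] **Step 1: Create an empty migration**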
```bash
cd src && python manage.py makemigrations openclaw --empty -n add_hypertables
```
- [ ] **Step 2: Edit the migration**
The generated file `src/openclaw/migrations/XXXX_add_hypertables.py`:
```python
from django.db import migrations

CREATE_HYPERTABLES = """
SELECT create_hypertable(
    'sessions',
    'start_time',
    if_not_exists => TRUE
);
SELECT create_hypertable(
    'messages',
    'timestamp',
    if_not_exists => TRUE
);
"""


class Migration(migrations.Migration):
    dependencies = [
        ('openclaw', 'PREVIOUS_MIGRATION_NUMBER'),  # Update this
    ]
    operations = [
        # The reverse is a no-op: rolling back leaves the hypertables in place.
        migrations.RunSQL(CREATE_HYPERTABLES, migrations.RunSQL.noop),
    ]
```
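Because `create_hypertable` only exists on PostgreSQL with TimescaleDB, one alternative to faking the migration in development is to skip the SQL on other backends. The sketch below assumes the same table and column names as the migration above and uses a hypothetical function name, `create_hypertables`. It would be wired in with `migrations.RunPython(create_hypertables, migrations.RunPython.noop)` in place of the `RunSQL` operation.

```python
# (table, time column) pairs; assumed to match the models' db_table names.
HYPERTABLES = [
    ("sessions", "start_time"),
    ("messages", "timestamp"),
]


def create_hypertables(apps, schema_editor):
    """Create hypertables on PostgreSQL; do nothing on other backends (e.g. SQLite)."""
    if schema_editor.connection.vendor != "postgresql":
        return
    for table, time_column in HYPERTABLES:
        schema_editor.execute(
            f"SELECT create_hypertable('{table}', '{time_column}', "
            "if_not_exists => TRUE);"
        )
```

The function only relies on `schema_editor.connection.vendor` and `schema_editor.execute`, so it can be checked with a simple stand-in object before it is run against a real database.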
- [ ] **Step 3: Commit**
```bash
git add src/openclaw/migrations/
git commit -m "feat: TimescaleDB hypertable migrations"
```
---
### Task 9: End-to-End Testing and README
**Files:**
- Update: `README.md`
- [ ] **Step 1: Update the README**
`README.md`:
````markdown
# agent-base
OpenClaw Session Archive: a multi-agent session parsing and archiving system.
## Architecture
Three nodes (Mac Mini, Ubuntu1, Ubuntu2) run a sync script that parses local JSONL session files and pushes structured data to a central Django API. The Django service runs in Docker Compose and stores data in a remote PostgreSQL + TimescaleDB instance on NAS.
## Quick Start
```bash
# 1. Configure environment
cp .env.example .env
# Edit .env with your database credentials and Django settings
# 2. Build and start
docker compose build
docker compose run --rm web python manage.py migrate
docker compose run --rm web python manage.py createsuperuser
docker compose up -d
# 3. Access
# Django Admin: http://<host>:8000/admin/
# API: http://<host>:8000/api/sessions/bulk_upsert/
```
## Running Tests
```bash
pip install -r requirements-dev.txt
pytest -v
```
## Client Sync Script
Deploy `scripts/sync_sessions.py` on each node:
```bash
python sync_sessions.py --remote-url http://<django-host>:8000/api/sessions/bulk_upsert/
```
Set `SOURCE_NODE` environment variable on each node (`macmini`, `ubuntu1`, `ubuntu2`).
## Daily Export
In Django Admin, select sessions and choose "Export selected sessions to Markdown" action.
````
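- [ ] **Step 2: End-to-end test flow**
```bash
# 1. Run the full flow on SQLite
cd src
python manage.py migrate
python manage.py createsuperuser  # optional
# 2. Run the pytest suite (fixtures create test data and push it to the API)
cd ..
pytest -v
# 3. Start Django, then run sync_sessions.py against test JSONL files.
#    The subshell keeps the current directory at the repo root; without it,
#    the backgrounded "cd src" would not affect this shell.
(cd src && python manage.py runserver) &
# --root-path must be the directory that contains agents/
python scripts/sync_sessions.py --remote-url http://localhost:8000/api/sessions/bulk_upsert/ --root-path /path/to/test-root
```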
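Before the Django service is reachable, the client side of this flow can be dry-run against a throwaway local stub. The sketch below is an illustration only: `StubBulkUpsertHandler` and `dry_run_push` are hypothetical names, and the reply keys (`sessions_upserted`, `messages_upserted`, `tool_calls_upserted`) simply mirror what the sync script reads from the response, not a confirmed API contract.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class StubBulkUpsertHandler(BaseHTTPRequestHandler):
    """Accepts one JSON POST and replies with per-collection counts."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        body = json.dumps({
            "sessions_upserted": len(payload.get("sessions", [])),
            "messages_upserted": len(payload.get("messages", [])),
            "tool_calls_upserted": len(payload.get("tool_calls", [])),
        }).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the dry run quiet


def dry_run_push(payload):
    """POST payload to a local stub on a free port and return its JSON reply."""
    server = HTTPServer(("127.0.0.1", 0), StubBulkUpsertHandler)
    worker = threading.Thread(target=server.handle_request, daemon=True)
    worker.start()
    url = f"http://127.0.0.1:{server.server_port}/api/sessions/bulk_upsert/"
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # Bypass any proxy configured in the environment for this local call.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(request, timeout=10) as resp:
            return json.loads(resp.read())
    finally:
        worker.join(timeout=5)
        server.server_close()
```

This checks only that the payload serializes and travels over HTTP; it says nothing about how the real endpoint validates or stores the data.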
- [ ] **Step 3: Commit**
```bash
git add README.md
git commit -m "docs: update README with architecture and quick start"
```
---
## Self-Review
### 1. Spec Coverage
| Spec Section | Covered By |
|-----|-----|
| 4.1 Session table | Task 1 - Session model |
| 4.2 Message table | Task 1 - Message model |
| 4.3 ToolCall table | Task 1 - ToolCall model |
| 4.4 TimescaleDB hypertable | Task 8 - hypertable migration |
| 5.1 Push script | Task 6 - sync_sessions.py |
| 5.2 Scheduled job | Task 6 - script + cron usage |
| 6. API bulk_upsert | Task 2 - views.py + services.py |
| 7.1-7.4 Admin CRUD | Task 3 - admin.py |
| 7.5 Query by time range | Task 4 - daily_view.html + admin view |
| 7.6 Daily Markdown export | Task 5 - export.py + Admin action |
| 8.1 Client-side parsing | Task 6 - parse_jsonl function |
| 8.2 Server-side write | Task 2 - BulkUpsertService |
| 9. Tech stack | All tasks |
| 10. Docker Compose | Task 7 |
| 12. Non-technical decisions | Task 1 (unique_together), Task 6 (skip .deleted.), Task 6 (SOURCE_NODE env) |
### 2. Placeholder Scan
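No TBD, TODO, or "similar to" patterns found. All code blocks contain actual implementations; the only values left to fill in are the migration filename (`XXXX`) and its dependency (`PREVIOUS_MIGRATION_NUMBER`) in Task 8, which depend on the generated migration number.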
### 3. Type Consistency
- `session_id` is consistently a `CharField` (not UUIDField) — matches API usage
- `agent_name + session_id` unique_together used consistently in both model and service for idempotency
- `timestamps` use ISO 8601 strings in API, parsed by `_parse_ts()` in service
- `role_labels` dict in admin matches actual DB role values: `"user"`, `"assistant"`, `"toolResult"`
One note: `created_at` and `updated_at` fields are auto-added to all models. The spec doesn't mention them but they're standard Django practice and harmless.
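As an illustration of the timestamp handling mentioned above, an ISO 8601 parser that tolerates a trailing `Z` can be sketched as follows. The function name `parse_ts` is hypothetical and is not the service's `_parse_ts()`; it only shows one way such parsing can work with the standard library, and treating naive values as UTC is an assumption.

```python
from datetime import datetime, timezone


def parse_ts(value):
    """Parse an ISO 8601 string into an aware datetime, or return None if empty.

    A trailing "Z" is rewritten to "+00:00" so that the call also works on
    Python versions older than 3.11; naive values are assumed to be UTC.
    """
    if not value:
        return None
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
```

Returning timezone-aware values avoids Django's naive-datetime warnings when `USE_TZ` is enabled.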
---