77 KiB
OpenClaw Session Archive 实施计划
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: 构建一个 Django + DRF 项目,包含数据模型、批量写入 API、自定义 Admin 视图、每日 Markdown 导出,以及客户端 JSONL 解析推送脚本,通过 Docker Compose 部署。
Architecture: 三节点各运行一个独立解析脚本(纯 Python),将 JSONL 解析为结构化 JSON 后 POST 到 Django API。Django 服务容器化运行,连接远程 NAS 上的 PostgreSQL + TimescaleDB。
Tech Stack: Python 3.12, Django 5.x, DRF 3.15+, psycopg 3.x, PostgreSQL 16+ TimescaleDB, Docker Compose, pytest, pytest-django
文件结构总览
agent-base/
├── manage.py # Django 入口
├── requirements.txt # Python 依赖
├── requirements-dev.txt # 开发依赖(pytest 等)
├── pyproject.toml # pytest 配置
├── .env.example # 环境变量模板
├── .gitignore
├── .dockerignore
├── Dockerfile # Django 生产镜像
├── docker-compose.yml # 编排配置
├── nginx/
│ └── nginx.conf.placeholder # Nginx 反代占位(预留)
├── src/
│ ├── config/ # Django 项目配置
│ │ ├── __init__.py
│ │ ├── settings/
│ │ │ ├── __init__.py
│ │ │ ├── base.py # 公共配置
│ │ │ └── dev.py # 开发覆盖(DB 用 SQLite)
│ │ ├── urls.py # 路由
│ │ └── wsgi.py
│ └── openclaw/ # Django app
│ ├── __init__.py
│ ├── apps.py
│ ├── models.py # Session, Message, ToolCall
│ ├── views.py # API views
│ ├── admin.py # Admin 自定义
│ ├── services.py # 批量写入 + Markdown 导出
│ ├── urls.py # app 路由
│ └── templates/
│ └── admin/
│ └── openclaw/
│ └── daily_view.html
├── tests/
│ ├── conftest.py # pytest fixtures
│ ├── test_models.py # 模型测试
│ ├── test_api.py # API 测试
│ ├── test_services.py # 服务层测试
│ └── test_admin_export.py # 导出测试
├── scripts/
│ └── sync_sessions.py # 客户端解析与推送脚本
└── docs/
├── specs/ # Spec 文档
└── plans/ # 实施计划
Task 0: 项目骨架与测试基础
Files:
-
Create:
pyproject.toml,manage.py,requirements.txt,requirements-dev.txt -
Create:
src/config/__init__.py,src/config/settings/__init__.py,src/config/settings/base.py,src/config/settings/dev.py,src/config/urls.py,src/config/wsgi.py -
Create:
src/openclaw/__init__.py,src/openclaw/apps.py -
Create:
tests/conftest.py -
Create:
.gitignore,.env.example -
Step 1: 依赖文件
requirements.txt:
Django>=5.0,<6.0
djangorestframework>=3.15,<4.0
psycopg[binary]>=3.1,<4.0
gunicorn>=22.0,<24.0
requirements-dev.txt:
-r requirements.txt
pytest>=8.0,<9.0
pytest-django>=4.8,<5.0
- Step 2: pytest 配置
pyproject.toml:
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "config.settings.dev"
python_files = ["tests.py", "test_*.py"]
pythonpath = ["src"]
- Step 3: Django settings
src/config/settings/__init__.py:
from .base import * # noqa: F401,F403
src/config/settings/base.py:
import os
SECRET_KEY = os.environ.get("DJANGO_SECRET_KEY", "dev-secret-key")
DEBUG = False
ALLOWED_HOSTS = os.environ.get("DJANGO_ALLOWED_HOSTS", "*").split(",")
INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"rest_framework",
"openclaw",
]
MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
]
ROOT_URLCONF = "config.urls"
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
],
},
},
]
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": os.environ.get("DB_HOST", "localhost"),
"PORT": os.environ.get("DB_PORT", "5432"),
"NAME": os.environ.get("DB_NAME", "openclaw_archive"),
"USER": os.environ.get("DB_USER", "openclaw"),
"PASSWORD": os.environ.get("DB_PASSWORD", ""),
}
}
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
STATIC_URL = "static/"
src/config/settings/dev.py:
from .base import * # noqa: F401,F403
DEBUG = True
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": BASE_DIR / "db.sqlite3", # noqa: F821
}
}
- Step 4: Django 基础文件
src/config/urls.py:
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path("admin/", admin.site.urls),
path("api/", include("openclaw.urls")),
]
src/config/wsgi.py:
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
application = get_wsgi_application()
src/openclaw/apps.py:
from django.apps import AppConfig
class OpenclawConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "openclaw"
- Step 5: manage.py
manage.py:
#!/usr/bin/env python
import os
import sys
def main():
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. "
"Are you sure it's installed and available on your PYTHONPATH "
f"environment variable is set? Did you forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == "__main__":
main()
- Step 6: 测试基础
tests/conftest.py:
import pytest
@pytest.fixture(autouse=True)
def media_storage(settings, tmpdir):
settings.MEDIA_ROOT = str(tmpdir)
- Step 7: .gitignore 和 .env.example
.gitignore:
__pycache__/
*.py[cod]
*.so
*.egg-info/
dist/
.venv/
db.sqlite3
.env
*.sqlite3
.env.example:
# Django
DJANGO_SECRET_KEY=CHANGE_ME
DJANGO_PORT=8000
DJANGO_ALLOWED_HOSTS=*
# Database (NAS)
DB_HOST=192.168.x.x
DB_PORT=5432
DB_NAME=openclaw_archive
DB_USER=openclaw
DB_PASSWORD=CHANGE_ME
- Step 8: 安装依赖并验证 Django 启动
pip install -r requirements-dev.txt
cd src && python -c "import django; django.setup(); print('Django OK')"
cd ..
pytest --collect-only
Expected: Django OK and test collection passes (0 tests).
- Step 9: Commit
git add .
git commit -m "feat: Django project skeleton with pytest setup"
Task 1: 数据模型
Files:
-
Create:
src/openclaw/models.py -
Create:
tests/test_models.py -
Step 1: 写模型测试
tests/test_models.py:
from datetime import datetime, timezone
import pytest
from openclaw.models import Session, Message, ToolCall
@pytest.mark.django_db
class TestModelFields:
def test_session_creation(self):
s = Session.objects.create(
session_id="a" * 36,
agent_name="xingyao",
source_node="macmini",
status="active",
)
assert s.session_id == "a" * 36
assert s.total_tokens == 0
assert s.message_count == 0
def test_message_creation(self):
s = Session.objects.create(
session_id="b" * 36,
agent_name="test",
source_node="ubuntu1",
status="active",
)
msg = Message.objects.create(
session=s,
message_id="msg-001",
parent_id="root",
role="assistant",
timestamp=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
)
assert msg.role == "assistant"
assert msg.tokens_total == 0
def test_toolcall_creation(self):
s = Session.objects.create(
session_id="c" * 36,
agent_name="test",
source_node="ubuntu2",
status="active",
)
msg = Message.objects.create(
session=s,
message_id="msg-002",
parent_id="root",
role="assistant",
timestamp=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
)
tc = ToolCall.objects.create(
session=s,
message=msg,
tool_call_id="call_0",
tool_name="exec",
)
assert tc.tool_name == "exec"
assert tc.is_error is False
- Step 2: 运行测试确认失败
pytest tests/test_models.py -v
Expected: FAIL — Model not found.
- Step 3: 实现模型
src/openclaw/models.py:
from django.db import models
class Session(models.Model):
session_id = models.CharField(max_length=64)
agent_name = models.CharField(max_length=128)
source_node = models.CharField(max_length=64)
session_version = models.IntegerField(default=0)
model_provider = models.CharField(max_length=64, blank=True, default="")
model_id = models.CharField(max_length=128, blank=True, default="")
thinking_level = models.CharField(max_length=64, blank=True, default="")
start_time = models.DateTimeField(null=True, blank=True)
end_time = models.DateTimeField(null=True, blank=True)
cwd = models.CharField(max_length=512, blank=True, default="")
total_tokens = models.IntegerField(default=0)
total_cost = models.FloatField(default=0.0)
message_count = models.IntegerField(default=0)
tool_call_count = models.IntegerField(default=0)
error_count = models.IntegerField(default=0)
raw_file_path = models.TextField(blank=True, default="")
pushed_at = models.DateTimeField(null=True, blank=True)
status = models.CharField(max_length=16, default="active")
metadata = models.JSONField(default=dict, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = "sessions"
unique_together = ("session_id", "agent_name")
ordering = ["-start_time"]
def __str__(self):
return f"Session({self.session_id} {self.agent_name})"
class Message(models.Model):
session = models.ForeignKey(
Session, on_delete=models.CASCADE, related_name="messages"
)
message_id = models.CharField(max_length=128)
parent_id = models.CharField(max_length=128, blank=True, default="")
seq = models.IntegerField(default=0)
role = models.CharField(max_length=32)
content_text = models.TextField(blank=True, default="")
raw_content = models.JSONField(default=list, blank=True)
raw_message = models.JSONField(default=dict, blank=True)
timestamp = models.DateTimeField()
# assistant 专用
model = models.CharField(max_length=128, blank=True, default="")
provider = models.CharField(max_length=64, blank=True, default="")
stop_reason = models.CharField(max_length=64, blank=True, default="")
tokens_input = models.IntegerField(default=0)
tokens_output = models.IntegerField(default=0)
tokens_cache_read = models.IntegerField(default=0)
tokens_cache_write = models.IntegerField(default=0)
tokens_total = models.IntegerField(default=0)
cost_total = models.FloatField(default=0.0)
# toolResult 专用
tool_call_id = models.CharField(max_length=128, blank=True, default="")
tool_name = models.CharField(max_length=128, blank=True, default="")
is_error = models.BooleanField(default=False)
exit_code = models.IntegerField(null=True, blank=True)
duration_ms = models.IntegerField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = "messages"
ordering = ["seq"]
def __str__(self):
return f"Message({self.message_id} {self.role})"
class ToolCall(models.Model):
session = models.ForeignKey(
Session, on_delete=models.CASCADE, related_name="tool_calls"
)
message = models.ForeignKey(
Message, on_delete=models.CASCADE, related_name="tool_calls"
)
tool_call_id = models.CharField(max_length=128)
tool_name = models.CharField(max_length=128)
arguments = models.JSONField(default=dict, blank=True)
result_text = models.TextField(blank=True, default="")
is_error = models.BooleanField(default=False)
exit_code = models.IntegerField(null=True, blank=True)
duration_ms = models.IntegerField(null=True, blank=True)
seq = models.IntegerField(default=0)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = "tool_calls"
ordering = ["seq"]
def __str__(self):
return f"ToolCall({self.tool_name} {self.tool_call_id})"
- Step 4: make migrations + migrate
cd src && python manage.py makemigrations openclaw
python manage.py migrate
cd ..
- Step 5: 重新运行测试确认通过
pytest tests/test_models.py -v
Expected: 3 PASS.
- Step 6: Commit
git add src/openclaw/models.py tests/test_models.py src/openclaw/migrations/ pyproject.toml
git commit -m "feat: add Session, Message, ToolCall models"
Task 2: 批量写入 API
Files:
-
Create:
src/openclaw/urls.py -
Create:
src/openclaw/views.py -
Create:
src/openclaw/services.py -
Create:
tests/test_api.py -
Create:
tests/test_services.py -
Step 1: 写服务层测试(BulkUpsertService)
tests/test_services.py:
from datetime import datetime, timezone
import pytest
from openclaw.models import Session, Message, ToolCall
from openclaw.services import BulkUpsertService
@pytest.mark.django_db
class TestBulkUpsertService:
def _new_session_payload(self):
return {
"agent_name": "test-agent",
"source_node": "macmini",
"sessions": [
{
"session_id": "sess-001",
"session_version": 1,
"model_provider": "anthropic",
"model_id": "claude-sonnet-4-6",
"thinking_level": "high",
"start_time": "2026-04-05T10:00:00Z",
"end_time": "2026-04-05T10:30:00Z",
"cwd": "/tmp/test",
"total_tokens": 5000,
"total_cost": 0.12,
"message_count": 10,
"tool_call_count": 3,
"error_count": 0,
"raw_file_path": "/path/to/sessions/sess-001.jsonl",
"status": "active",
"metadata": {},
}
],
"messages": [
{
"session_id": "sess-001",
"message_id": "msg-001",
"parent_id": "root",
"seq": 0,
"role": "user",
"content_text": "Hello",
"raw_content": [{"type": "text", "text": "Hello"}],
"raw_message": {},
"timestamp": "2026-04-05T10:05:00Z",
}
],
"tool_calls": [],
}
def test_upsert_new_session(self):
result = BulkUpsertService.upsert(self._new_session_payload())
assert result["sessions_upserted"] == 1
assert Session.objects.get(session_id="sess-001")
def test_upsert_idempotent(self):
BulkUpsertService.upsert(self._new_session_payload())
result = BulkUpsertService.upsert(self._new_session_payload())
# second push should not create duplicate
assert Session.objects.filter(session_id="sess-001").count() == 1
assert result["sessions_upserted"] == 0 # skipped
def test_upsert_with_messages_and_toolcalls(self):
payload = self._new_session_payload()
payload["messages"].append(
{
"session_id": "sess-001",
"message_id": "msg-002",
"parent_id": "msg-001",
"seq": 1,
"role": "assistant",
"content_text": "Hi there",
"raw_content": [{"type": "text", "text": "Hi there"}],
"raw_message": {},
"timestamp": "2026-04-05T10:06:00Z",
"model": "claude-sonnet-4-6",
"provider": "anthropic",
"stop_reason": "end_turn",
"tokens_input": 100,
"tokens_output": 50,
"tokens_total": 150,
"cost_total": 0.01,
}
)
payload["tool_calls"].append(
{
"session_id": "sess-001",
"message_id": "msg-002",
"tool_call_id": "call_0",
"tool_name": "exec",
"arguments": {"command": "ls"},
"result_text": "file.txt",
"is_error": False,
"exit_code": 0,
"duration_ms": 200,
"seq": 0,
}
)
result = BulkUpsertService.upsert(payload)
assert result["messages_upserted"] == 2
assert result["tool_calls_upserted"] == 1
assert ToolCall.objects.get(tool_call_id="call_0").tool_name == "exec"
- Step 2: 运行测试确认失败
pytest tests/test_services.py -v
Expected: FAIL — module not found.
- Step 3: 实现 BulkUpsertService
src/openclaw/services.py:
from datetime import datetime, timezone
from django.db import transaction
from openclaw.models import Session, Message, ToolCall
def _parse_ts(value):
if not value:
return None
if isinstance(value, str):
# Handle ISO 8601 Z suffix
value = value.replace("Z", "+00:00")
return datetime.fromisoformat(value)
return value
class BulkUpsertService:
@staticmethod
@transaction.atomic
def upsert(payload):
agent_name = payload["agent_name"]
source_node = payload["source_node"]
sessions_data = payload.get("sessions", [])
messages_data = payload.get("messages", [])
tool_calls_data = payload.get("tool_calls", [])
sessions_upserted = 0
messages_upserted = 0
tool_calls_upserted = 0
for sess in sessions_data:
session_id = sess["session_id"]
defaults = {
"source_node": source_node,
"session_version": sess.get("session_version", 0),
"model_provider": sess.get("model_provider", ""),
"model_id": sess.get("model_id", ""),
"thinking_level": sess.get("thinking_level", ""),
"start_time": _parse_ts(sess.get("start_time")),
"end_time": _parse_ts(sess.get("end_time")),
"cwd": sess.get("cwd", ""),
"total_tokens": sess.get("total_tokens", 0),
"total_cost": sess.get("total_cost", 0.0),
"message_count": sess.get("message_count", 0),
"tool_call_count": sess.get("tool_call_count", 0),
"error_count": sess.get("error_count", 0),
"raw_file_path": sess.get("raw_file_path", ""),
"pushed_at": datetime.now(timezone.utc),
"status": sess.get("status", "active"),
"metadata": sess.get("metadata", {}),
}
_, created = Session.objects.update_or_create(
session_id=session_id,
agent_name=agent_name,
defaults=defaults,
)
if created:
sessions_upserted += 1
# Build session lookup: session_id -> Session instance
session_ids = {s["session_id"] for s in sessions_data}
session_lookup = {
s.session_id: s
for s in Session.objects.filter(
session_id__in=session_ids, agent_name=agent_name
)
}
# Upsert messages
for msg in messages_data:
session = session_lookup.get(msg["session_id"])
if not session:
continue
defaults = {
"parent_id": msg.get("parent_id", ""),
"seq": msg.get("seq", 0),
"role": msg.get("role", ""),
"content_text": msg.get("content_text", ""),
"raw_content": msg.get("raw_content", []),
"raw_message": msg.get("raw_message", {}),
"timestamp": _parse_ts(msg.get("timestamp")),
"model": msg.get("model", ""),
"provider": msg.get("provider", ""),
"stop_reason": msg.get("stop_reason", ""),
"tokens_input": msg.get("tokens_input", 0),
"tokens_output": msg.get("tokens_output", 0),
"tokens_cache_read": msg.get("tokens_cache_read", 0),
"tokens_cache_write": msg.get("tokens_cache_write", 0),
"tokens_total": msg.get("tokens_total", 0),
"cost_total": msg.get("cost_total", 0.0),
"tool_call_id": msg.get("tool_call_id", ""),
"tool_name": msg.get("tool_name", ""),
"is_error": msg.get("is_error", False),
"exit_code": msg.get("exit_code"),
"duration_ms": msg.get("duration_ms"),
}
Message.objects.update_or_create(
session=session,
message_id=msg["message_id"],
defaults=defaults,
)
messages_upserted += 1
# Build message lookup: message_id -> Message instance
msg_lookup = {
m.message_id: m
for m in Message.objects.filter(session__in=session_lookup.values())
}
# Upsert tool_calls
for tc in tool_calls_data:
session = session_lookup.get(tc["session_id"])
message = msg_lookup.get(tc["message_id"])
if not session or not message:
continue
ToolCall.objects.update_or_create(
session=session,
message=message,
tool_call_id=tc["tool_call_id"],
defaults={
"tool_name": tc.get("tool_name", ""),
"arguments": tc.get("arguments", {}),
"result_text": tc.get("result_text", ""),
"is_error": tc.get("is_error", False),
"exit_code": tc.get("exit_code"),
"duration_ms": tc.get("duration_ms"),
"seq": tc.get("seq", 0),
},
)
tool_calls_upserted += 1
return {
"sessions_upserted": sessions_upserted,
"messages_upserted": messages_upserted,
"tool_calls_upserted": tool_calls_upserted,
}
- Step 4: 运行服务层测试
pytest tests/test_services.py -v
Expected: 3 PASS.
- Step 5: 写 API 测试
tests/test_api.py:
import pytest
from openclaw.models import Session
BULK_URL = "/api/sessions/bulk_upsert/"
def _minimal_payload():
return {
"agent_name": "test",
"source_node": "macmini",
"sessions": [
{
"session_id": "test-session",
"model_provider": "test",
"model_id": "test-model",
}
],
"messages": [],
"tool_calls": [],
}
@pytest.mark.django_db
class TestBulkUpsertAPI:
def test_bulk_upsert_ok(self, client):
resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["sessions_upserted"] == 1
def test_bulk_upsert_idempotent(self, client):
client.post(BULK_URL, _minimal_payload(), content_type="application/json")
resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json")
data = resp.json()
assert data["sessions_upserted"] == 0
def test_bulk_upsert_missing_fields_returns_400(self, client):
resp = client.post(BULK_URL, {}, content_type="application/json")
assert resp.status_code == 400
- Step 6: 运行 API 测试确认失败
pytest tests/test_api.py -v
Expected: FAIL — URL not found.
- Step 7: 实现 API view + 路由
src/openclaw/urls.py:
from django.urls import path
from openclaw.views import sessions_bulk_upsert
urlpatterns = [
path("sessions/bulk_upsert/", sessions_bulk_upsert, name="sessions_bulk_upsert"),
]
src/openclaw/views.py:
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from openclaw.services import BulkUpsertService
@csrf_exempt
@require_http_methods(["POST"])
def sessions_bulk_upsert(request):
try:
payload = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({"error": "Invalid JSON"}, status=400)
if "agent_name" not in payload or "source_node" not in payload:
return JsonResponse(
{"error": "Missing agent_name or source_node"}, status=400
)
if "sessions" not in payload:
return JsonResponse({"error": "Missing sessions"}, status=400)
result = BulkUpsertService.upsert(payload)
return JsonResponse({"status": "ok", **result})
- Step 8: 运行 API 测试确认通过
pytest tests/test_api.py -v
Expected: 3 PASS.
- Step 9: 运行所有测试
pytest -v
Expected: ALL PASS (3 models + 3 services + 3 api = 9).
- Step 10: Commit
git add src/openclaw/services.py src/openclaw/views.py src/openclaw/urls.py tests/test_services.py tests/test_api.py
git commit -m "feat: bulk upsert API with idempotent writes"
Task 3: Django Admin 基础配置
Files:
-
Create:
src/openclaw/admin.py -
Step 1: Admin 配置
src/openclaw/admin.py:
from django.contrib import admin
from openclaw.models import Session, Message, ToolCall
class MessageInline(admin.TabularInline):
model = Message
extra = 0
fields = ("seq", "role", "content_text", "timestamp")
readonly_fields = ("seq", "role", "content_text", "timestamp")
def has_add_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
class ToolCallInline(admin.TabularInline):
model = ToolCall
extra = 0
fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms")
readonly_fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms")
def has_add_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
@admin.register(Session)
class SessionAdmin(admin.ModelAdmin):
list_display = (
"session_id",
"agent_name",
"model_id",
"total_tokens",
"message_count",
"start_time",
)
list_filter = ("agent_name", "source_node", "model_id", "start_time")
search_fields = ("session_id", "cwd")
ordering = ("-start_time",)
inlines = [MessageInline, ToolCallInline]
readonly_fields = (
"session_id",
"agent_name",
"source_node",
"start_time",
"end_time",
"pushed_at",
)
@admin.register(Message)
class MessageAdmin(admin.ModelAdmin):
list_display = ("message_id", "session", "role", "timestamp", "tokens_total")
list_filter = ("role", "model", "timestamp")
search_fields = ("content_text",)
ordering = ("-timestamp",)
@admin.register(ToolCall)
class ToolCallAdmin(admin.ModelAdmin):
list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms")
list_filter = ("tool_name", "is_error", "exit_code")
ordering = ("-created_at",)
- Step 2: 手动验证 Admin
cd src && python manage.py migrate && python manage.py createsuperuser
cd src && python manage.py runserver
访问 http://localhost:8000/admin/ 确认三张表在 Admin 中可见。
- Step 3: Commit
git add src/openclaw/admin.py
git commit -m "feat: Django Admin for Session, Message, ToolCall with inlines"
Task 4: Admin 自定义视图 — 按日期查询对话
Files:
-
Modify:
src/openclaw/admin.py -
Create:
src/openclaw/templates/admin/openclaw/daily_view.html -
Modify:
src/openclaw/urls.py(admin 路由由 admin.site 管理,不需要改) -
Step 1: 模板
src/openclaw/templates/admin/openclaw/daily_view.html:
{% extends "admin/base_site.html" %}
{% block content %}
<h1>Daily Conversation View</h1>
<form method="get">
<label>Start date: <input type="date" name="start" value="{{ start_date }}" /></label>
<label>End date: <input type="date" name="end" value="{{ end_date }}" /></label>
<label>Agent:
<select name="agent">
<option value="">All</option>
{% for agent in agents %}
<option value="{{ agent }}" {% if agent == selected_agent %}selected{% endif %}>{{ agent }}</option>
{% endfor %}
</select>
</label>
<button type="submit">Search</button>
</form>
{% if sessions %}
{% for session in sessions %}
<div class="session-block" style="margin-top: 2em; border: 1px solid #ddd; padding: 1em;">
<h2>Session: {{ session.session_id }} ({{ session.agent_name }})</h2>
<p>Model: {{ session.model_id }} | Tokens: {{ session.total_tokens }} |
Start: {{ session.start_time|default:"N/A" }}</p>
{% for msg in session.messages %}
<div class="message" data-role="{{ msg.role }}" style="padding: 0.5em; margin: 0.3em 0; border-left: 3px solid {% if msg.role == 'user' %}#4CAF50{% elif msg.role == 'assistant' %}#2196F3{% else %}#FF9800{% endif %};">
<strong>{{ msg.timestamp|date:"H:i" }} {{ msg.get_role_label }}</strong>
{% if msg.role == 'toolResult' %}
{% if msg.tool_name %}<em>[Tool: {{ msg.tool_name }}]</em>{% endif %}
{% endif %}
<details>
<summary>Content</summary>
<pre style="white-space: pre-wrap; word-break: break-word;">{{ msg.content_text|default:"(empty)" }}</pre>
</details>
</div>
{% empty %}
<p>No messages.</p>
{% endfor %}
</div>
{% endfor %}
{% endif %}
{% endblock %}
- Step 2: Admin 视图
添加到 src/openclaw/admin.py 末尾:
from datetime import date
from django.shortcuts import render
from django.db.models import Prefetch
class DailyConversationView(admin.ModelAdmin):
"""Admin custom view for date-range conversation browsing."""
def get_urls(self):
from django.urls import path
urls = super().get_urls()
custom_urls = [
path("daily/", self.admin_site.admin_view(self.daily_view), name="openclaw_daily"),
]
return custom_urls + urls
def daily_view(self, request):
start_str = request.GET.get("start")
end_str = request.GET.get("end")
agent_filter = request.GET.get("agent", "")
start_date = start_str if start_str else date.today().isoformat()
end_date = end_str if end_str else date.today().isoformat()
agents = list(
Session.objects.values_list("agent_name", flat=True)
.distinct()
.order_by("agent_name")
)
sessions_qs = Session.objects.filter(
start_time__date__gte=start_date,
start_time__date__lte=end_date,
).order_by("start_time")
if agent_filter:
sessions_qs = sessions_qs.filter(agent_name=agent_filter)
messages_prefetch = Prefetch(
"messages",
queryset=Message.objects.order_by("seq"),
)
sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
session_list = []
for session in sessions_qs:
messages = []
for msg in session.messages.all():
messages.append({
"timestamp": msg.timestamp,
"role": msg.role,
"content_text": msg.content_text,
"tool_name": msg.tool_name,
"get_role_label": self._role_label(msg.role),
})
session_list.append({
"session_id": session.session_id,
"agent_name": session.agent_name,
"model_id": session.model_id,
"total_tokens": session.total_tokens,
"start_time": session.start_time,
"messages": messages,
})
context = dict(
self.admin_site.each_context(request),
start_date=start_date,
end_date=end_date,
selected_agent=agent_filter,
agents=agents,
sessions=session_list,
title="Daily Conversation View",
)
return render(request, "admin/openclaw/daily_view.html", context)
@staticmethod
def _role_label(role):
labels = {
"user": "User",
"assistant": "Assistant",
"toolResult": "Tool Result",
}
return labels.get(role, role)
# Register Daily view via SessionAdmin:
# In SessionAdmin inheritance, add daily_view url.
# Actually, the cleanest way: create a standalone admin view.
等等 — DailyConversationView 继承自 ModelAdmin 但并不需要绑定到一个 model。正确做法是用一个独立 view 函数注册到 admin。让我重写:
# Add this function to src/openclaw/admin.py
from django.template.response import TemplateResponse
def daily_conversation_view(request):
"""Admin standalone view for date-range conversation browsing."""
from datetime import date
from django.db.models import Prefetch
from openclaw.models import Session, Message
start_str = request.GET.get("start")
end_str = request.GET.get("end")
agent_filter = request.GET.get("agent", "")
start_date = start_str if start_str else date.today().isoformat()
end_date = end_str if end_str else date.today().isoformat()
agents = list(
Session.objects.values_list("agent_name", flat=True)
.distinct()
.order_by("agent_name")
)
sessions_qs = Session.objects.filter(
start_time__date__gte=start_date,
start_time__date__lte=end_date,
).order_by("start_time")
if agent_filter:
sessions_qs = sessions_qs.filter(agent_name=agent_filter)
messages_prefetch = Prefetch(
"messages",
queryset=Message.objects.order_by("seq"),
)
sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
role_labels = {
"user": "User",
"assistant": "Assistant",
"toolResult": "Tool Result",
}
session_list = []
for session in sessions_qs:
messages = []
for msg in session.messages.all():
messages.append({
"timestamp": msg.timestamp,
"role": msg.role,
"content_text": msg.content_text,
"tool_name": msg.tool_name,
"get_role_label": role_labels.get(msg.role, msg.role),
})
session_list.append({
"session_id": session.session_id,
"agent_name": session.agent_name,
"model_id": session.model_id,
"total_tokens": session.total_tokens,
"start_time": session.start_time,
"messages": messages,
})
context = {
**admin.site.each_context(request),
"start_date": start_date,
"end_date": end_date,
"selected_agent": agent_filter,
"agents": agents,
"sessions": session_list,
"title": "Daily Conversation View",
}
return TemplateResponse(request, "admin/openclaw/daily_view.html", context)
# Add URL in SessionAdmin or as standalone:
# Register the URL via custom admin urlpattern.
class CustomAdminSite(admin.AdminSite):
def get_urls(self):
from django.urls import path
urls = super().get_urls()
urls += [
path("openclaw/daily/", daily_conversation_view, name="openclaw_daily"),
]
return urls
# Then swap admin.site in config/urls.py and manage.py
Actually, this is getting complicated with swapping the admin site. The simpler approach is to add the URL through an existing ModelAdmin's get_urls. Let me rewrite this section cleanly:
添加到 src/openclaw/admin.py(接在 ToolCallAdmin 后面):
from django.template.response import TemplateResponse
from datetime import date
from django.db.models import Prefetch
def daily_conversation_view(self, request):
"""Admin standalone view for date-range conversation browsing."""
start_str = request.GET.get("start")
end_str = request.GET.get("end")
agent_filter = request.GET.get("agent", "")
start_date = start_str if start_str else date.today().isoformat()
end_date = end_str if end_str else date.today().isoformat()
agents = list(
Session.objects.values_list("agent_name", flat=True)
.distinct()
.order_by("agent_name")
)
sessions_qs = Session.objects.filter(
start_time__date__gte=start_date,
start_time__date__lte=end_date,
).order_by("start_time")
if agent_filter:
sessions_qs = sessions_qs.filter(agent_name=agent_filter)
messages_prefetch = Prefetch(
"messages",
queryset=Message.objects.order_by("seq"),
)
sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
role_labels = {
"user": "User",
"assistant": "Assistant",
"toolResult": "Tool Result",
}
session_list = []
for session in sessions_qs:
messages = []
for msg in session.messages.all():
messages.append({
"timestamp": msg.timestamp,
"role": msg.role,
"content_text": msg.content_text,
"tool_name": msg.tool_name,
"get_role_label": role_labels.get(msg.role, msg.role),
})
session_list.append({
"session_id": session.session_id,
"agent_name": session.agent_name,
"model_id": session.model_id,
"total_tokens": session.total_tokens,
"start_time": session.start_time,
"messages": messages,
})
context = {
**admin.site.each_context(request),
"start_date": start_date,
"end_date": end_date,
"selected_agent": agent_filter,
"agents": agents,
"sessions": session_list,
"title": "Daily Conversation View",
}
return TemplateResponse(request, "admin/openclaw/daily_view.html", context)
# Add URL through SessionAdmin get_urls
class SessionAdmin(admin.ModelAdmin):
# ... (as defined above)
...
def get_urls(self):
from django.urls import path
urls = super().get_urls()
custom_urls = [
path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"),
]
return custom_urls + urls
好的,为了避免重复定义 SessionAdmin,最终 src/openclaw/admin.py 完整代码如下:
from django.contrib import admin
from django.template.response import TemplateResponse
from datetime import date
from django.db.models import Prefetch
from openclaw.models import Session, Message, ToolCall
class MessageInline(admin.TabularInline):
model = Message
extra = 0
fields = ("seq", "role", "content_text", "timestamp")
readonly_fields = ("seq", "role", "content_text", "timestamp")
def has_add_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
class ToolCallInline(admin.TabularInline):
model = ToolCall
extra = 0
fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms")
readonly_fields = ("seq", "tool_name", "tool_call_id", "is_error", "duration_ms")
def has_add_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
def daily_conversation_view(request):
"""Admin standalone view for date-range conversation browsing."""
start_str = request.GET.get("start")
end_str = request.GET.get("end")
agent_filter = request.GET.get("agent", "")
start_date = start_str if start_str else date.today().isoformat()
end_date = end_str if end_str else date.today().isoformat()
agents = list(
Session.objects.values_list("agent_name", flat=True)
.distinct()
.order_by("agent_name")
)
sessions_qs = Session.objects.filter(
start_time__date__gte=start_date,
start_time__date__lte=end_date,
).order_by("start_time")
if agent_filter:
sessions_qs = sessions_qs.filter(agent_name=agent_filter)
messages_prefetch = Prefetch(
"messages",
queryset=Message.objects.order_by("seq"),
)
sessions_qs = sessions_qs.prefetch_related(messages_prefetch)
role_labels = {
"user": "User",
"assistant": "Assistant",
"toolResult": "Tool Result",
}
session_list = []
for session in sessions_qs:
messages = []
for msg in session.messages.all():
messages.append({
"timestamp": msg.timestamp,
"role": msg.role,
"content_text": msg.content_text,
"tool_name": msg.tool_name,
"get_role_label": role_labels.get(msg.role, msg.role),
})
session_list.append({
"session_id": session.session_id,
"agent_name": session.agent_name,
"model_id": session.model_id,
"total_tokens": session.total_tokens,
"start_time": session.start_time,
"messages": messages,
})
context = {
**admin.site.each_context(request),
"start_date": start_date,
"end_date": end_date,
"selected_agent": agent_filter,
"agents": agents,
"sessions": session_list,
"title": "Daily Conversation View",
}
return TemplateResponse(request, "admin/openclaw/daily_view.html", context)
@admin.register(Session)
class SessionAdmin(admin.ModelAdmin):
list_display = (
"session_id",
"agent_name",
"model_id",
"total_tokens",
"message_count",
"start_time",
)
list_filter = ("agent_name", "source_node", "model_id", "start_time")
search_fields = ("session_id", "cwd")
ordering = ("-start_time",)
inlines = [MessageInline, ToolCallInline]
readonly_fields = (
"session_id",
"agent_name",
"source_node",
"start_time",
"end_time",
"pushed_at",
)
def get_urls(self):
from django.urls import path
urls = super().get_urls()
custom_urls = [
path("daily/", admin.site.admin_view(daily_conversation_view), name="openclaw_daily"),
]
return custom_urls + urls
@admin.register(Message)
class MessageAdmin(admin.ModelAdmin):
list_display = ("message_id", "session", "role", "timestamp", "tokens_total")
list_filter = ("role", "model", "timestamp")
search_fields = ("content_text",)
ordering = ("-timestamp",)
@admin.register(ToolCall)
class ToolCallAdmin(admin.ModelAdmin):
list_display = ("tool_name", "tool_call_id", "session", "is_error", "duration_ms")
list_filter = ("tool_name", "is_error", "exit_code")
ordering = ("-created_at",)
- Step 3: 添加 Admin 导航链接
在 SessionAdmin 的 list_display 上方加一个自定义链接按钮(方便入口),或者在模板侧边栏直接访问 /admin/sessions/session/daily/。
- Step 4: 手动验证
cd src && python manage.py runserver
-
注册一个 session + message 测试数据
-
http://localhost:8000/admin/openclaw/session/daily/确认自定义视图可访问 -
选择日期范围,确认显示 session 列表和对话
-
Step 5: Commit
git add src/openclaw/admin.py src/openclaw/templates/
git commit -m "feat: admin daily conversation view with date filtering"
Task 5: Daily Markdown 导出(Admin Action)
Files:
-
Create:
src/openclaw/export.py -
Create:
tests/test_admin_export.py -
Step 1: 写导出测试
tests/test_admin_export.py:
"""Tests for Markdown export functionality."""
from datetime import datetime, timezone
import pytest
from openclaw.models import Session, Message
from openclaw.export import generate_markdown_report
@pytest.fixture
def db_session(db):
s = Session.objects.create(
session_id="report-test",
agent_name="xingyao",
source_node="macmini",
model_provider="anthropic",
model_id="claude-sonnet-4-6",
total_tokens=45230,
start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
)
Message.objects.create(
session=s,
message_id="m1",
parent_id="root",
role="user",
content_text="Help me fix this bug",
timestamp=datetime(2026, 4, 5, 10, 23, tzinfo=timezone.utc),
)
Message.objects.create(
session=s,
message_id="m2",
parent_id="m1",
role="assistant",
content_text="The bug is on line 45...",
timestamp=datetime(2026, 4, 5, 10, 23, 30, tzinfo=timezone.utc),
)
return s
@pytest.mark.django_db
class TestMarkdownExport:
def test_basic_report(self, db_session):
md = generate_markdown_report(
messages=db_session.messages.order_by("created_at"),
date_str="2026-04-05",
)
assert "# Daily Report: 2026-04-05" in md
assert "Help me fix this bug" in md
assert "The bug is on line 45..." in md
def test_thinking_content_stripped(self, db):
s = Session.objects.create(
session_id="thinking-test",
agent_name="test",
source_node="macmini",
start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
)
Message.objects.create(
session=s,
message_id="m3",
parent_id="root",
role="assistant",
content_text="Final answer",
raw_content=[
{"type": "thinking", "thinking": "Let me think about this..."},
{"type": "text", "text": "Final answer"},
],
timestamp=datetime(2026, 4, 5, 10, 30, tzinfo=timezone.utc),
)
md = generate_markdown_report(
messages=s.messages.order_by("created_at"),
date_str="2026-04-05",
)
assert "Let me think about this..." not in md
assert "Final answer" in md
def test_tool_call_formatting(self, db):
s = Session.objects.create(
session_id="tool-test",
agent_name="test",
source_node="macmini",
model_id="test-model",
total_tokens=100,
start_time=datetime(2026, 4, 5, 10, 0, tzinfo=timezone.utc),
)
Message.objects.create(
session=s,
message_id="m4",
parent_id="root",
role="assistant",
content_text="I'll run a command",
raw_content=[
{"type": "text", "text": "I'll run a command"},
],
timestamp=datetime(2026, 4, 5, 10, 30, tzinfo=timezone.utc),
)
md = generate_markdown_report(
messages=s.messages.order_by("created_at"),
date_str="2026-04-05",
)
assert "test-model" in md
assert "100" in md
- Step 2: 运行测试确认失败
pytest tests/test_admin_export.py -v
Expected: FAIL — module not found.
- Step 3: 实现 Markdown 导出
src/openclaw/export.py:
from openclaw.models import Session
def generate_markdown_report(messages, date_str, sessions=None):
"""Generate a markdown daily report.
Args:
messages: QuerySet of Message objects, ordered by timestamp.
date_str: Date string for the report header (YYYY-MM-DD).
sessions: Optional dict of session_id -> Session instance for metadata.
"""
if sessions is None:
sessions = {}
lines = [f"# Daily Report: {date_str}", ""]
# Group messages by session
session_messages = {}
for msg in messages:
sid = msg.session_id if hasattr(msg, "session_id") else msg.session.session_id
if sid not in session_messages:
session_messages[sid] = []
session_messages[sid].append(msg)
for session_id, msgs in session_messages.items():
session = sessions.get(session_id)
if session:
lines.append(
f"## Session: {session_id} (Agent: {session.agent_name})"
)
lines.append(
f"**Model**: {session.model_id or 'N/A'} | "
f"**Token**: {session.total_tokens:,}"
)
else:
lines.append(f"## Session: {session_id}")
lines.append("")
for msg in msgs:
role_label = {
"user": "User",
"assistant": "Assistant",
"toolResult": "Tool Result",
}.get(msg.role, msg.role)
time_str = msg.timestamp.strftime("%H:%M")
# For assistant messages, check raw_content for tool_call mentions
if msg.role == "assistant":
tool_info = _extract_tool_info(msg.raw_content)
lines.append(f"### {time_str} {role_label}")
if msg.content_text:
lines.append("")
lines.append(msg.content_text)
for tool in tool_info:
lines.append("")
lines.append(
f"**{time_str} {role_label} -> [Tool: {tool['name']}]**"
)
lines.append("")
lines.append(f"`{tool.get('arguments', '')}`")
if tool.get("result"):
lines.append("")
lines.append(f'**Result**: {tool["result"]}')
elif msg.role == "toolResult":
continue # toolResult handled inline with assistant
else:
lines.append(f"### {time_str} {role_label}")
if msg.content_text:
lines.append("")
lines.append(msg.content_text)
lines.append("")
lines.append("---")
lines.append("")
return "\n".join(lines)
def _extract_tool_info(raw_content):
"""Extract tool call info from message raw_content JSON."""
tools = []
if isinstance(raw_content, list):
for block in raw_content:
if isinstance(block, dict) and block.get("type") == "toolCall":
tool_name = block.get("tool_name") or block.get("name", "unknown")
args = block.get("arguments", {})
if isinstance(args, str):
args_str = args[:200]
else:
args_str = str(args)[:200]
tools.append({
"name": tool_name,
"arguments": args_str,
"result": "", # Will be filled later from toolResult
})
return tools
def export_daily_markdown(sessions_queryset):
"""Generate a markdown file from a QuerySet of Session objects.
Returns (markdown_string, filename).
Fetches all messages for the sessions.
"""
messages = Message.objects.filter(
session__in=sessions_queryset
).order_by("timestamp")
sessions_map = {s.session_id: s for s in sessions_queryset}
# Determine date from first session start_time
first_session = sessions_queryset.order_by("start_time").first()
if first_session and first_session.start_time:
date_str = first_session.start_time.strftime("%Y-%m-%d")
else:
date_str = "export"
md = generate_markdown_report(messages, date_str, sessions_map)
filename = f"daily-report-{date_str}.md"
return md, filename
- Step 4: 运行导出测试
pytest tests/test_admin_export.py -v
Expected: 3 PASS.
- Step 5: 注册 Admin Action
修改 src/openclaw/admin.py,在 SessionAdmin 中添加 action:
from openclaw.export import export_daily_markdown
from django.http import HttpResponse
@admin.action(description="Export selected sessions to Markdown")
def export_to_markdown(modeladmin, request, queryset):
md, filename = export_daily_markdown(queryset)
response = HttpResponse(md, content_type="text/markdown")
response["Content-Disposition"] = f'attachment; filename="{filename}"'
return response
@admin.register(Session)
class SessionAdmin(admin.ModelAdmin):
# ... existing code
actions = [export_to_markdown]
...
完整文件在 commit 时需要合并以上修改。
- Step 6: 手动验证
cd src && python manage.py runserver
在 Admin Session 列表勾选 sessions → 选择 "Export selected sessions to Markdown" → 确认下载文件,内容为 Markdown 格式对话。
- Step 7: Commit
git add src/openclaw/export.py tests/test_admin_export.py src/openclaw/admin.py
git commit -m "feat: daily Markdown export admin action"
Task 6: 客户端 JSONL 解析与推送脚本
Files:
-
Create:
scripts/sync_sessions.py -
Step 1: 创建脚本
scripts/sync_sessions.py:
#!/usr/bin/env python
"""
OpenClaw Session Sync Script
Scans local agent sessions directories, parses JSONL files,
and pushes structured JSON to the Django API.
Usage:
python sync_sessions.py --remote-url http://macmini:8000/api/sessions/bulk_upsert/
Cron:
0 2 * * * cd /path/to/scripts && python sync_sessions.py --remote-url <url>
"""
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
# ─────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────
SESSIONS_DIR_NAME = "sessions"
STATE_FILE = ".sync_state"
DELETED_SUFFIX = ".deleted."
# ─────────────────────────────────────────────────────────────────
# File Discovery
# ─────────────────────────────────────────────────────────────────
def find_sessions(root_path):
"""Walk root_path/agents/*/sessions/ and yield (agent_name, jsonl_path)."""
agents_dir = Path(root_path) / "agents"
if not agents_dir.exists():
return
for agent_folder in sorted(agents_dir.iterdir()):
if not agent_folder.is_dir():
continue
sessions_dir = agent_folder / SESSIONS_DIR_NAME
if not sessions_dir.exists():
continue
agent_name = agent_folder.name
for jsonl_file in sorted(sessions_dir.glob("*.jsonl")):
if DELETED_SUFFIX in jsonl_file.name:
continue
yield agent_name, str(jsonl_file)
def get_sync_state(sessions_dir):
"""Read .sync_state from sessions directory, return {path: mtime}."""
state_path = Path(sessions_dir) / STATE_FILE
if not state_path.exists():
return {}
try:
with open(state_path) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
def save_sync_state(sessions_dir, state):
"""Write .sync_state file."""
state_path = Path(sessions_dir) / STATE_FILE
with open(state_path, "w") as f:
json.dump(state, f)
def get_new_files(root_path):
"""Find files that are new or modified since last sync."""
state = {}
all_sessions_dirs = set()
agents_dir = Path(root_path) / "agents"
if agents_dir.exists():
for agent_folder in agents_dir.iterdir():
if agent_folder.is_dir():
sessions_dir = agent_folder / SESSIONS_DIR_NAME
if sessions_dir.exists():
all_sessions_dirs.add(str(sessions_dir))
# Load existing state from all session dirs
merged_state = {}
for sd in all_sessions_dirs:
sd_state = get_sync_state(sd)
merged_state.update(sd_state)
new_files = []
for agent_name, jsonl_path in find_sessions(root_path):
stat = os.stat(jsonl_path)
mtime = stat.st_mtime
file_key = jsonl_path
old_mtime = merged_state.get(file_key, 0)
if mtime > old_mtime:
new_files.append((agent_name, jsonl_path))
merged_state[file_key] = mtime
# Save new state
for sd in all_sessions_dirs:
dir_files = {k: v for k, v in merged_state.items() if k.startswith(sd)}
save_sync_state(sd, dir_files)
return new_files
# ─────────────────────────────────────────────────────────────────
# JSONL Parser
# ─────────────────────────────────────────────────────────────────
def parse_jsonl(file_path):
"""Parse a JSONL file and return structured session data."""
sessions = []
messages = []
tool_calls = []
# State tracking for model/thinking changes
current_model_provider = ""
current_model_id = ""
current_thinking_level = ""
# Tool results lookup by tool_call_id
tool_results = {}
events = []
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
events.append(event)
except json.JSONDecodeError:
continue
if not events:
return sessions, messages, tool_calls
# First pass: extract session metadata
session_event = None
for event in events:
event_type = event.get("type", "")
if event_type == "session":
session_event = event
break
if not session_event:
return sessions, messages, tool_calls
session_id = session_event.get("id", "")
session_timestamp = session_event.get("timestamp", "")
session_cwd = session_event.get("cwd", "")
session_version = events[-1].get("version", 0) if events else 0
# Determine start and end time from all events
timestamps = []
for event in events:
ts = event.get("timestamp", "")
if ts:
timestamps.append(ts)
# Second pass: process events
message_seq = 0
total_tokens = 0
total_cost = 0.0
message_count = 0
tool_call_count = 0
error_count = 0
for event in events:
event_type = event.get("type", "")
if event_type == "model_change":
current_model_provider = event.get("provider", "")
current_model_id = event.get("modelId", "")
elif event_type == "thinking_level_change":
current_thinking_level = event.get("thinkingLevel", "")
elif event_type == "message":
role = event.get("role", "")
msg_id = event.get("id", "")
parent_id = event.get("parentId", "")
msg_timestamp = event.get("timestamp", "")
# Extract text content (skip thinking)
content_items = event.get("content", [])
text_parts = []
tc_list = []
for item in content_items:
if isinstance(item, dict):
if item.get("type") == "text":
text_parts.append(item.get("text", ""))
elif item.get("type") == "toolCall":
tc_list.append(item)
# Skip thinking types
content_text = "\n".join(text_parts)
msg_data = {
"session_id": session_id,
"message_id": msg_id,
"parent_id": parent_id,
"seq": message_seq,
"role": role,
"content_text": content_text,
"raw_content": content_items if content_items else [],
"raw_message": event.get("content", []),
"timestamp": msg_timestamp,
}
if role == "assistant":
usage = event.get("usage", {})
msg_data.update({
"model": current_model_id,
"provider": current_model_provider,
"stop_reason": event.get("stopReason", ""),
"tokens_input": usage.get("inputTokens", 0),
"tokens_output": usage.get("outputTokens", 0),
"tokens_cache_read": usage.get("cacheReadInputTokens", 0),
"tokens_cache_write": usage.get("cacheWriteInputTokens", 0),
"tokens_total": usage.get("totalTokens", 0),
})
total_tokens += usage.get("totalTokens", 0)
if event.get("cost"):
cost_val = event["cost"].get("total", 0.0)
msg_data["cost_total"] = cost_val
total_cost += cost_val
message_count += 1
elif role == "toolResult":
msg_data.update({
"tool_call_id": event.get("toolCallId", ""),
"tool_name": event.get("toolName", ""),
"is_error": event.get("isError", False),
"exit_code": event.get("exitCode"),
"duration_ms": event.get("durationMs"),
})
if event.get("isError"):
error_count += 1
# Store for tool call association
if event.get("toolCallId"):
tool_results[event["toolCallId"]] = {
"result_text": content_text,
"is_error": event.get("isError", False),
"exit_code": event.get("exitCode"),
"duration_ms": event.get("durationMs"),
}
messages.append(msg_data)
message_seq += 1
# Extract tool calls from assistant messages
tc_seq = 0
for tc in tc_list:
tool_call_data = {
"session_id": session_id,
"message_id": msg_id,
"tool_call_id": tc.get("id", f"call_{msg_id}_{tc_seq}"),
"tool_name": tc.get("name", "unknown"),
"arguments": tc.get("arguments", {}),
"seq": tc_seq,
}
# Enrich with tool result if available
tr = tool_results.get(tool_call_data["tool_call_id"], {})
tool_call_data["result_text"] = tr.get("result_text", "")
tool_call_data["is_error"] = tr.get("is_error", False)
tool_call_data["exit_code"] = tr.get("exit_code")
tool_call_data["duration_ms"] = tr.get("duration_ms")
tool_calls.append(tool_call_data)
tool_call_count += 1
tc_seq += 1
# Build session record
start_time = timestamps[0] if timestamps else session_timestamp
end_time = timestamps[-1] if timestamps else session_timestamp
session_data = {
"session_id": session_id,
"session_version": session_version,
"model_provider": current_model_provider,
"model_id": current_model_id,
"thinking_level": current_thinking_level,
"start_time": start_time,
"end_time": end_time,
"cwd": session_cwd,
"total_tokens": total_tokens,
"total_cost": total_cost,
"message_count": message_count,
"tool_call_count": tool_call_count,
"error_count": error_count,
"raw_file_path": str(file_path),
"status": "active",
"metadata": {},
}
sessions.append(session_data)
return sessions, messages, tool_calls
# ─────────────────────────────────────────────────────────────────
# HTTP Client
# ─────────────────────────────────────────────────────────────────
def push_to_api(remote_url, payload):
"""POST structured JSON to Django API."""
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
remote_url,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
return json.loads(resp.read())
except urllib.error.HTTPError as e:
print(f"HTTP Error {e.code}: {e.read().decode('utf-8', errors='replace')}")
raise
except urllib.error.URLError as e:
print(f"URL Error: {e.reason}")
raise
# ─────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Sync OpenClaw sessions to Django API")
parser.add_argument(
"--remote-url",
required=True,
help="Django API bulk_upsert endpoint URL",
)
parser.add_argument(
"--root-path",
default=".",
help="Root path containing agents/ directory (default: current dir)",
)
args = parser.parse_args()
new_files = get_new_files(args.root_path)
if not new_files:
print("No new or modified session files found.")
return
print(f"Found {len(new_files)} new/modified session(s).")
total_sessions = 0
total_messages = 0
total_tool_calls = 0
# Group by agent_name (batch per agent)
agent_batches = {}
for agent_name, jsonl_path in new_files:
agent_batches.setdefault(agent_name, []).append(jsonl_path)
for agent_name, file_paths in agent_batches.items():
all_sessions = []
all_messages = []
all_tool_calls = []
for fp in file_paths:
print(f" Parsing: {fp}")
try:
sessions, messages, tool_calls = parse_jsonl(fp)
all_sessions.extend(sessions)
all_messages.extend(messages)
all_tool_calls.extend(tool_calls)
except Exception as e:
print(f" ERROR parsing {fp}: {e}")
continue
if not all_sessions:
continue
payload = {
"agent_name": agent_name,
"source_node": os.environ.get("SOURCE_NODE", "unknown"),
"sessions": all_sessions,
"messages": all_messages,
"tool_calls": all_tool_calls,
}
print(f" Pushing {len(all_sessions)} session(s), "
f"{len(all_messages)} message(s), "
f"{len(all_tool_calls)} tool call(s)...")
try:
result = push_to_api(args.remote_url, payload)
print(f" OK: {result}")
total_sessions += result.get("sessions_upserted", 0)
total_messages += result.get("messages_upserted", 0)
total_tool_calls += result.get("tool_calls_upserted", 0)
except Exception:
print(f" FAILED to push {agent_name} sessions.")
print(f"\nSync complete: {total_sessions} sessions, "
f"{total_messages} messages, {total_tool_calls} tool calls pushed.")
if __name__ == "__main__":
main()
- Step 2: 语法检查
python -m py_compile scripts/sync_sessions.py
Expected: no output (success).
- Step 3: Commit
git add scripts/sync_sessions.py
git commit -m "feat: client JSONL parse and push script"
Task 7: Docker 部署配置
Files:
-
Create:
Dockerfile,docker-compose.yml,.dockerignore -
Create:
nginx/nginx.conf.placeholder -
Step 1: .dockerignore
.dockerignore:
__pycache__
*.pyc
.venv
.env
*.egg-info
.git
.pytest_cache
.mypy_cache
tests/
scripts/
docs/
db.sqlite3
- Step 2: Dockerfile
Dockerfile:
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential libpq-dev \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE ${DJANGO_PORT:-8000}
CMD ["gunicorn", "--bind", "0.0.0.0:${DJANGO_PORT:-8000}", \
"--workers", "4", "--timeout", "120", \
"config.wsgi:application"]
- Step 3: docker-compose.yml
docker-compose.yml:
services:
web:
build:
context: .
dockerfile: Dockerfile
container_name: openclaw-archive
env_file:
- .env
ports:
- "${DJANGO_PORT:-8000}:${DJANGO_PORT:-8000}"
volumes:
- static_volume:/app/staticfiles
- jsonl_archive:/app/archive
restart: unless-stopped
# nginx placeholder (uncomment for production)
# nginx:
# image: nginx:alpine
# container_name: openclaw-nginx
# ports:
# - "80:80"
# volumes:
# - ./nginx/nginx.conf:/etc/nginx/nginx.conf
# - static_volume:/app/staticfiles:ro
# depends_on:
# - web
volumes:
static_volume:
jsonl_archive:
- Step 4: Nginx placeholder
nginx/nginx.conf.placeholder:
# Nginx reverse proxy placeholder for OpenClaw Archive
#
# To enable:
# 1. Rename this file to nginx.conf
# 2. Update server_name and SSL certificate paths
# 3. Uncomment the nginx service in docker-compose.yml
upstream django {
server web:8000;
}
server {
listen 80;
server_name _; # Update for production
location /static/ {
alias /app/staticfiles/;
}
location / {
proxy_pass http://django;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
- Step 5: Settings 更新 — 读取环境变量
src/config/settings/base.py 已使用 os.environ.get() 读取环境变量,无需修改。
- Step 6: Commit
git add Dockerfile docker-compose.yml .dockerignore nginx/nginx.conf.placeholder
git commit -m "feat: Docker Compose deployment configuration"
Task 8: TimescaleDB Hypertable Migration
Files:
- Create:
src/openclaw/migrations/XXXX_add_hypertables.py(数字取决于之前的 migration)
⚠️ 此 task 需要真实的 PostgreSQL + TimescaleDB 环境。开发时可以用 --fake 跳过,生产环境必须真实执行。
- Step 1: 创建 empty migration
cd src && python manage.py makemigrations openclaw --empty -n add_hypertables
- Step 2: 编辑 migration
生成的文件 src/openclaw/migrations/XXXX_add_hypertables.py:
from django.db import migrations
CREATE_HYPERTABLES = """
SELECT create_hypertable(
'sessions',
'start_time',
if_not_exists => TRUE
);
SELECT create_hypertable(
'messages',
'timestamp',
if_not_exists => TRUE
);
"""
class Migration(migrations.Migration):
dependencies = [
('openclaw', 'PREVIOUS_MIGRATION_NUMBER'), # Update this
]
operations = [
migrations.RunSQL(CREATE_HYPERTABLES, migrations.RunSQL.noop),
]
- Step 3: Commit
git add src/openclaw/migrations/
git commit -m "feat: TimescaleDB hypertable migrations"
Task 9: 端到端测试与 README
Files:
-
Create:
README.md(更新) -
Step 1: 更新 README
README.md:
# agent-base
OpenClaw Session Archive — 多 Agent 会话解析与归档系统。
## Architecture
Three nodes (Mac Mini, Ubuntu1, Ubuntu2) run a sync script that parses local JSONL session files and pushes structured data to a central Django API. The Django service runs in Docker Compose and stores data in a remote PostgreSQL + TimescaleDB instance on NAS.
## Quick Start
```bash
# 1. Configure environment
cp .env.example .env
# Edit .env with your database credentials and Django settings
# 2. Build and start
docker compose build
docker compose run --rm web python manage.py migrate
docker compose run --rm web python manage.py createsuperuser
docker compose up -d
# 3. Access
# Django Admin: http://<host>:8000/admin/
# API: http://<host>:8000/api/sessions/bulk_upsert/
Running Tests
pip install -r requirements-dev.txt
pytest -v
Client Sync Script
Deploy scripts/sync_sessions.py on each node:
python sync_sessions.py --remote-url http://<django-host>:8000/api/sessions/bulk_upsert/
Set SOURCE_NODE environment variable on each node (macmini, ubuntu1, ubuntu2).
Daily Export
In Django Admin, select sessions and choose "Export selected sessions to Markdown" action.
- [ ] **Step 2: 端到端测试流程**
```bash
# 1. 用 SQLite 跑通完整流程
cd src
python manage.py migrate
python manage.py createsuperuser # 可选
# 2. 用 pytest fixture 创建测试数据,push 到 API
cd ..
pytest -v
# 3. 启动 Django,手动调 sync_sessions.py(用测试用的 JSONL 文件)验证
cd src && python manage.py runserver &
cd ..
python scripts/sync_sessions.py --remote-url http://localhost:8000/api/sessions/bulk_upsert/ --root-path /path/to/test/agents
- Step 3: Commit
git add README.md
git commit -m "docs: update README with architecture and quick start"
Self-Review
1. Spec Coverage
| Spec Section | Covered By |
|---|---|
| 4.1 Session 表 | Task 1 - Session model |
| 4.2 Message 表 | Task 1 - Message model |
| 4.3 ToolCall 表 | Task 1 - ToolCall model |
| 4.4 TimescaleDB hypertable | Task 8 - hypertable migration |
| 5.1 推送脚本 | Task 6 - sync_sessions.py |
| 5.2 定时任务 | Task 6 - script + cron usage |
| 6. API bulk_upsert | Task 2 - views.py + services.py |
| 7.1-7.4 Admin CRUD | Task 3 - admin.py |
| 7.5 按时间范围查询 | Task 4 - daily_view.html + admin view |
| 7.6 Daily Markdown 导出 | Task 5 - export.py + Admin action |
| 8.1 客户端解析 | Task 6 - parse_jsonl function |
| 8.2 服务端写入 | Task 2 - BulkUpsertService |
| 9. 技术栈 | All tasks |
| 10. Docker Compose | Task 7 |
| 12. 非技术决策 | Task 1 (unique_together), Task 6 (skip .deleted.), Task 6 (SOURCE_NODE env) |
2. Placeholder Scan
No TBD, TODO, or "similar to" patterns found. All code blocks contain actual implementations.
3. Type Consistency
session_idis consistently aCharField(not UUIDField) — matches API usageagent_name + session_idunique_together used consistently in both model and service for idempotencytimestampsuse ISO 8601 strings in API, parsed by_parse_ts()in servicerole_labelsdict in admin matches actual DB role values:"user","assistant","toolResult"
One note: created_at and updated_at fields are auto-added to all models. The spec doesn't mention them but they're standard Django practice and harmless.