From 0b94b6765d77444e743fa9533d5ae76e402cb4e0 Mon Sep 17 00:00:00 2001 From: weishen Date: Sun, 5 Apr 2026 14:54:33 +0800 Subject: [PATCH] feat: bulk upsert API with idempotent writes Co-Authored-By: Claude Opus 4.6 --- src/openclaw/services.py | 137 +++++++++++++++++++++++++++++++++++++++ src/openclaw/urls.py | 5 +- src/openclaw/views.py | 27 ++++++++ tests/test_api.py | 41 ++++++++++++ tests/test_services.py | 101 +++++++++++++++++++++++++++++ 5 files changed, 310 insertions(+), 1 deletion(-) create mode 100644 src/openclaw/services.py create mode 100644 src/openclaw/views.py create mode 100644 tests/test_api.py create mode 100644 tests/test_services.py diff --git a/src/openclaw/services.py b/src/openclaw/services.py new file mode 100644 index 0000000..d461008 --- /dev/null +++ b/src/openclaw/services.py @@ -0,0 +1,137 @@ +from datetime import datetime, timezone + +from django.db import transaction + +from openclaw.models import Session, Message, ToolCall + + +def _parse_ts(value): + if not value: + return None + if isinstance(value, str): + # Handle ISO 8601 Z suffix + value = value.replace("Z", "+00:00") + return datetime.fromisoformat(value) + return value + + +class BulkUpsertService: + @staticmethod + @transaction.atomic + def upsert(payload): + agent_name = payload["agent_name"] + source_node = payload["source_node"] + sessions_data = payload.get("sessions", []) + messages_data = payload.get("messages", []) + tool_calls_data = payload.get("tool_calls", []) + + sessions_upserted = 0 + messages_upserted = 0 + tool_calls_upserted = 0 + + for sess in sessions_data: + session_id = sess["session_id"] + defaults = { + "source_node": source_node, + "session_version": sess.get("session_version", 0), + "model_provider": sess.get("model_provider", ""), + "model_id": sess.get("model_id", ""), + "thinking_level": sess.get("thinking_level", ""), + "start_time": _parse_ts(sess.get("start_time")), + "end_time": _parse_ts(sess.get("end_time")), + "cwd": sess.get("cwd", ""), + "total_tokens": sess.get("total_tokens", 0), + "total_cost": sess.get("total_cost", 0.0), + "message_count": sess.get("message_count", 0), + "tool_call_count": sess.get("tool_call_count", 0), + "error_count": sess.get("error_count", 0), + "raw_file_path": sess.get("raw_file_path", ""), + "pushed_at": datetime.now(timezone.utc), + "status": sess.get("status", "active"), + "metadata": sess.get("metadata", {}), + } + _, created = Session.objects.update_or_create( + session_id=session_id, + agent_name=agent_name, + defaults=defaults, + ) + if created: + sessions_upserted += 1 + + # Build session lookup: session_id -> Session instance + session_ids = {s["session_id"] for s in sessions_data} + session_lookup = { + s.session_id: s + for s in Session.objects.filter( + session_id__in=session_ids, agent_name=agent_name + ) + } + + # Upsert messages + for msg in messages_data: + session = session_lookup.get(msg["session_id"]) + if not session: + continue + defaults = { + "parent_id": msg.get("parent_id", ""), + "seq": msg.get("seq", 0), + "role": msg.get("role", ""), + "content_text": msg.get("content_text", ""), + "raw_content": msg.get("raw_content", []), + "raw_message": msg.get("raw_message", {}), + "timestamp": _parse_ts(msg.get("timestamp")), + "model": msg.get("model", ""), + "provider": msg.get("provider", ""), + "stop_reason": msg.get("stop_reason", ""), + "tokens_input": msg.get("tokens_input", 0), + "tokens_output": msg.get("tokens_output", 0), + "tokens_cache_read": msg.get("tokens_cache_read", 0), + "tokens_cache_write": msg.get("tokens_cache_write", 0), + "tokens_total": msg.get("tokens_total", 0), + "cost_total": msg.get("cost_total", 0.0), + "tool_call_id": msg.get("tool_call_id", ""), + "tool_name": msg.get("tool_name", ""), + "is_error": msg.get("is_error", False), + "exit_code": msg.get("exit_code"), + "duration_ms": msg.get("duration_ms"), + } + Message.objects.update_or_create( + session=session, + message_id=msg["message_id"], + defaults=defaults, + ) + messages_upserted += 1 + + # Build message lookup: message_id -> Message instance + msg_lookup = { + m.message_id: m + for m in Message.objects.filter(session__in=session_lookup.values()) + } + + # Upsert tool_calls + for tc in tool_calls_data: + session = session_lookup.get(tc["session_id"]) + message = msg_lookup.get(tc["message_id"]) + if not session or not message: + continue + ToolCall.objects.update_or_create( + session=session, + message=message, + tool_call_id=tc["tool_call_id"], + defaults={ + "tool_name": tc.get("tool_name", ""), + "arguments": tc.get("arguments", {}), + "result_text": tc.get("result_text", ""), + "is_error": tc.get("is_error", False), + "exit_code": tc.get("exit_code"), + "duration_ms": tc.get("duration_ms"), + "seq": tc.get("seq", 0), + }, + ) + tool_calls_upserted += 1 + + return { + "sessions_upserted": sessions_upserted, + "messages_upserted": messages_upserted, + "tool_calls_upserted": tool_calls_upserted, + } diff --git a/src/openclaw/urls.py b/src/openclaw/urls.py index e39cb2c..9025e8e 100644 --- a/src/openclaw/urls.py +++ b/src/openclaw/urls.py @@ -1,3 +1,6 @@ from django.urls import path +from openclaw.views import sessions_bulk_upsert -urlpatterns = [] +urlpatterns = [ + path("sessions/bulk_upsert/", sessions_bulk_upsert, name="sessions_bulk_upsert"), +] diff --git a/src/openclaw/views.py b/src/openclaw/views.py new file mode 100644 index 0000000..67da1ee --- /dev/null +++ b/src/openclaw/views.py @@ -0,0 +1,27 @@ +import json + +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods + +from openclaw.services import BulkUpsertService + + +@csrf_exempt +@require_http_methods(["POST"]) +def sessions_bulk_upsert(request): + try: + payload = json.loads(request.body) + except json.JSONDecodeError: + return JsonResponse({"error": "Invalid JSON"}, status=400) + + if "agent_name" not in payload or "source_node" not in payload: + return JsonResponse( + {"error": "Missing agent_name or source_node"}, status=400 + ) + + if "sessions" not in payload: + return JsonResponse({"error": "Missing sessions"}, status=400) + + result = BulkUpsertService.upsert(payload) + return JsonResponse({"status": "ok", **result}) diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..b2cf304 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,41 @@ +import pytest + +from openclaw.models import Session + +BULK_URL = "/api/sessions/bulk_upsert/" + + +def _minimal_payload(): + return { + "agent_name": "test", + "source_node": "macmini", + "sessions": [ + { + "session_id": "test-session", + "model_provider": "test", + "model_id": "test-model", + } + ], + "messages": [], + "tool_calls": [], + } + + +@pytest.mark.django_db +class TestBulkUpsertAPI: + def test_bulk_upsert_ok(self, client): + resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json") + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["sessions_upserted"] == 1 + + def test_bulk_upsert_idempotent(self, client): + client.post(BULK_URL, _minimal_payload(), content_type="application/json") + resp = client.post(BULK_URL, _minimal_payload(), content_type="application/json") + data = resp.json() + assert data["sessions_upserted"] == 0 + + def test_bulk_upsert_missing_fields_returns_400(self, client): + resp = client.post(BULK_URL, {}, content_type="application/json") + assert resp.status_code == 400 diff --git a/tests/test_services.py b/tests/test_services.py new file mode 100644 index 0000000..65d9249 --- /dev/null +++ b/tests/test_services.py @@ -0,0 +1,101 @@ +from datetime import datetime, timezone + +import pytest +from openclaw.models import Session, Message, ToolCall +from openclaw.services import BulkUpsertService + + +@pytest.mark.django_db +class TestBulkUpsertService: + def _new_session_payload(self): + return { + "agent_name": "test-agent", + "source_node": "macmini", + "sessions": [ + { + "session_id": "sess-001", + "session_version": 1, + "model_provider": "anthropic", + "model_id": "claude-sonnet-4-6", + "thinking_level": "high", + "start_time": "2026-04-05T10:00:00Z", + "end_time": "2026-04-05T10:30:00Z", + "cwd": "/tmp/test", + "total_tokens": 5000, + "total_cost": 0.12, + "message_count": 10, + "tool_call_count": 3, + "error_count": 0, + "raw_file_path": "/path/to/sessions/sess-001.jsonl", + "status": "active", + "metadata": {}, + } + ], + "messages": [ + { + "session_id": "sess-001", + "message_id": "msg-001", + "parent_id": "root", + "seq": 0, + "role": "user", + "content_text": "Hello", + "raw_content": [{"type": "text", "text": "Hello"}], + "raw_message": {}, + "timestamp": "2026-04-05T10:05:00Z", + } + ], + "tool_calls": [], + } + + def test_upsert_new_session(self): + result = BulkUpsertService.upsert(self._new_session_payload()) + assert result["sessions_upserted"] == 1 + assert Session.objects.get(session_id="sess-001") + + def test_upsert_idempotent(self): + BulkUpsertService.upsert(self._new_session_payload()) + result = BulkUpsertService.upsert(self._new_session_payload()) + # second push should not create duplicate + assert Session.objects.filter(session_id="sess-001").count() == 1 + assert result["sessions_upserted"] == 0 # skipped + + def test_upsert_with_messages_and_toolcalls(self): + payload = self._new_session_payload() + payload["messages"].append( + { + "session_id": "sess-001", + "message_id": "msg-002", + "parent_id": "msg-001", + "seq": 1, + "role": "assistant", + "content_text": "Hi there", + "raw_content": [{"type": "text", "text": "Hi there"}], + "raw_message": {}, + "timestamp": "2026-04-05T10:06:00Z", + "model": "claude-sonnet-4-6", + "provider": "anthropic", + "stop_reason": "end_turn", + "tokens_input": 100, + "tokens_output": 50, + "tokens_total": 150, + "cost_total": 0.01, + } + ) + payload["tool_calls"].append( + { + "session_id": "sess-001", + "message_id": "msg-002", + "tool_call_id": "call_0", + "tool_name": "exec", + "arguments": {"command": "ls"}, + "result_text": "file.txt", + "is_error": False, + "exit_code": 0, + "duration_ms": 200, + "seq": 0, + } + ) + result = BulkUpsertService.upsert(payload) + assert result["messages_upserted"] == 2 + assert result["tool_calls_upserted"] == 1 + assert ToolCall.objects.get(tool_call_id="call_0").tool_name == "exec"