from datetime import datetime, timezone

import pytest

from openclaw.models import Session, Message, ToolCall
from openclaw.services import BulkUpsertService


@pytest.mark.django_db
class TestBulkUpsertService:
    """Database-backed tests exercising ``BulkUpsertService.upsert``."""

    def _new_session_payload(self):
        """Build a fresh upsert payload: one session, one user message, no tool calls.

        A new dict is constructed on every call, so tests may mutate the
        returned payload without affecting each other.
        """
        session_entry = {
            "session_id": "sess-001",
            "session_version": 1,
            "model_provider": "anthropic",
            "model_id": "claude-sonnet-4-6",
            "thinking_level": "high",
            "start_time": "2026-04-05T10:00:00Z",
            "end_time": "2026-04-05T10:30:00Z",
            "cwd": "/tmp/test",
            "total_tokens": 5000,
            "total_cost": 0.12,
            "message_count": 10,
            "tool_call_count": 3,
            "error_count": 0,
            "raw_file_path": "/path/to/sessions/sess-001.jsonl",
            "status": "active",
            "metadata": {},
        }
        user_message = {
            "session_id": "sess-001",
            "message_id": "msg-001",
            "parent_id": "root",
            "seq": 0,
            "role": "user",
            "content_text": "Hello",
            "raw_content": [{"type": "text", "text": "Hello"}],
            "raw_message": {},
            "timestamp": "2026-04-05T10:05:00Z",
        }
        return {
            "agent_name": "test-agent",
            "source_node": "macmini",
            "sessions": [session_entry],
            "messages": [user_message],
            "tool_calls": [],
        }

    def test_upsert_new_session(self):
        """A first push reports one upserted session and the row can be fetched."""
        outcome = BulkUpsertService.upsert(self._new_session_payload())

        assert outcome["sessions_upserted"] == 1
        # .get() raises if the row is missing, so reaching the assert means it exists.
        stored_session = Session.objects.get(session_id="sess-001")
        assert stored_session

    def test_upsert_idempotent(self):
        """Pushing the same payload twice leaves one session row and reports zero upserts."""
        BulkUpsertService.upsert(self._new_session_payload())
        second_outcome = BulkUpsertService.upsert(self._new_session_payload())

        # The repeated push must not create a duplicate row.
        matching_rows = Session.objects.filter(session_id="sess-001").count()
        assert matching_rows == 1
        # The already-present session is expected to be skipped on the second push.
        assert second_outcome["sessions_upserted"] == 0

    def test_upsert_with_messages_and_toolcalls(self):
        """A payload with two messages and one tool call reports matching upsert counts."""
        assistant_message = {
            "session_id": "sess-001",
            "message_id": "msg-002",
            "parent_id": "msg-001",
            "seq": 1,
            "role": "assistant",
            "content_text": "Hi there",
            "raw_content": [{"type": "text", "text": "Hi there"}],
            "raw_message": {},
            "timestamp": "2026-04-05T10:06:00Z",
            "model": "claude-sonnet-4-6",
            "provider": "anthropic",
            "stop_reason": "end_turn",
            "tokens_input": 100,
            "tokens_output": 50,
            "tokens_total": 150,
            "cost_total": 0.01,
        }
        exec_call = {
            "session_id": "sess-001",
            "message_id": "msg-002",
            "tool_call_id": "call_0",
            "tool_name": "exec",
            "arguments": {"command": "ls"},
            "result_text": "file.txt",
            "is_error": False,
            "exit_code": 0,
            "duration_ms": 200,
            "seq": 0,
        }

        # Extend the base payload with the assistant reply and its tool call.
        payload = self._new_session_payload()
        payload["messages"].append(assistant_message)
        payload["tool_calls"].append(exec_call)

        outcome = BulkUpsertService.upsert(payload)

        assert outcome["messages_upserted"] == 2
        assert outcome["tool_calls_upserted"] == 1
        stored_call = ToolCall.objects.get(tool_call_id="call_0")
        assert stored_call.tool_name == "exec"