Files
nexus/openclaw/yunce/星枢-Agent任务解耦方案.md

723 lines
24 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 星枢 Agent 任务解耦技术方案
> 基于 RabbitMQ 的分布式任务队列架构
---
## 一、概述
### 背景
当前星枢(主 Agent与其他 Agent 的通信方式:
| 方式 | 命令 | 局限 |
|------|------|------|
| 本地 | `openclaw agent --agent xingyao --message "..." --deliver` | 同步等待 |
| 远程 | `ssh ubuntu2 "openclaw agent --agent yunce --message ..."` | 串行阻塞 |
### 目标
- **异步执行**:任务下发不等待结果
- **任务持久化**:重启不丢失
- **可监控**:实时查看任务状态
- **可扩展**:支持多 Agent 并行
---
## 二、技术选型
### RabbitMQ vs 其他
| 特性 | RabbitMQ | Redis Streams | Kafka |
|------|----------|---------------|-------|
| 消息确认 | ✅ ACK | ✅ ACK | ✅ ACK |
| 优先级队列 | ✅ | ❌ | ❌ |
| 延迟队列 | ✅ (插件) | ✅ | ❌ |
| 持久化 | ✅ | ✅ | ✅ |
| 集群 | ✅ | 有限 | ✅ |
| 生态成熟度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 轻量级 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ |
**推荐RabbitMQ**
理由:
- 消息确认机制完善
- 支持复杂路由规则
- 管理界面友好
- 适合中低并发场景
---
## 三、架构设计
### 3.1 整体架构
```
┌─────────────────────────────────────────────────────────────────────────┐
│ 用户 │
│ (Telegram/Discord) │
└─────────────────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ 星枢 (主 Agent) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 意图理解 │ │ 任务分解 │ │ 队列管理 │ │ 结果聚合 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────┬───────────────────────────────────────┘
┌─────────────┴─────────────┐
│ RabbitMQ 集群 │
│ (task_exchange) │
└─────────────┬─────────────┘
┌───────────────────────┼───────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Yunce (Agent) │ │ Atlas (Agent) │ │ Prometheus │
│ 队列: tasks │ │ 队列: tasks │ │ 队列: tasks │
│ 状态: running │ │ 状态: idle │ │ 状态: idle │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
│ ┌──────────────────┴──────────────────┐ │
│ │ 结果收集 (result_exchange) │ │
│ └──────────────────┬──────────────────┘ │
│ │ │
└──────────────────────┼──────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ 星枢 (结果处理) │
│ - 任务状态更新 │
│ - 用户反馈 │
│ - 后续任务触发 │
└─────────────────────────────────────────────────────────────────────────┘
```
### 3.2 消息流设计
```
┌──────────────────────────────────────────────────────────────────────────┐
│ 消息生命周期 │
└──────────────────────────────────────────────────────────────────────────┘
[1] 任务下发 [5] 结果处理
│ ▲
▼ │
┌────────┐ ┌────────────┐ ┌───────────┐ ┌───────────┐ │
│ 星枢 │───▶│ RabbitMQ │───▶│ Agent N │───▶│ RabbitMQ │──────┘
│创建任务 │ │ (持久化) │ │ 执行任务 │ │ (结果队列) │
└────────┘ └────────────┘ └───────────┘ └───────────┘
│ │
│ [4] ACK 确认
│ │
[2] 任务入队 │
(可选: 延迟队列) ▼
│ ┌───────────┐
└─────────────▶│ 状态变更 │
│ (处理中→完成)
└───────────┘
[3] Agent 消费任务
```
### 3.3 Exchange & Queue 设计
```
┌─────────────────┐
│ task_exchange │ (Topic Exchange)
│ (星枢下发) │
└────────┬────────┘
┌───────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ queue.yunce │ │ queue.atlas │ │ queue.prometheus│
│ routing: │ │ routing: │ │ routing: │
│ task.yunce │ │ task.atlas │ │ task.prometheus │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
▼ ▼ ▼
[Agent: Yunce] [Agent: Atlas] [Agent: Prometheus]
─────────────────────────────────────────────────────────────────────────
┌─────────────────┐
│result_exchange │ (Topic Exchange)
│ (结果收集) │
└────────┬────────┘
┌───────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│result.yunce │ │result.atlas │ │result.prometheus │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
└────────────────────┼────────────────────┘
┌─────────────────┐
│ queue.star聚合 │ ← 星枢监听此队列
│ routing: result.#│
└─────────────────┘
```
---
## 四、消息格式定义
### 4.1 任务消息 (Task Message)
```json
{
"taskId": "task_20260317_001",
"type": "task",
"source": "xingyao",
"target": "yunce",
"priority": "high",
"content": {
"action": "code_review",
"params": {
"repo": "my-project",
"branch": "feature/login"
}
},
"metadata": {
"createdAt": "2026-03-17T10:30:00Z",
"expireAt": "2026-03-17T11:30:00Z",
"retryCount": 0,
"maxRetries": 3
}
}
```
### 4.2 结果消息 (Result Message)
```json
{
"taskId": "task_20260317_001",
"type": "result",
"source": "yunce",
"target": "xingyao",
"status": "success",
"content": {
"summary": "代码审查完成",
"findings": [
{"severity": "warning", "message": "建议添加参数校验"}
],
"output": "/path/to/report.md"
},
"metadata": {
"completedAt": "2026-03-17T10:35:00Z",
"duration": 300
}
}
```
### 4.3 心跳消息 (Heartbeat Message)
```json
{
"type": "heartbeat",
"agent": "yunce",
"status": "idle",
"currentTask": null,
"timestamp": "2026-03-17T10:30:00Z"
}
```
---
## 五、实现步骤
### 5.1 RabbitMQ 部署
```bash---
title: 星枢 Agent 任务解耦技术方案
author: shenwei
---
---
title: 星枢 Agent 任务解耦技术方案
source:
author: shenwei
published:
created:
description:
tags: []
---
# Docker 部署
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=your_password \
rabbitmq:3.12-management
# 访问管理界面
# http://your-server:15672
```
### 5.2 创建 Exchange 和 Queue (初始化脚本)
```python
# setup_rabbitmq.py
import pika
def setup_rabbitmq():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', port=5672)
)
channel = connection.channel()
# 1. 创建 Exchange
channel.exchange_declare(exchange='task_exchange', exchange_type='topic', durable=True)
channel.exchange_declare(exchange='result_exchange', exchange_type='topic', durable=True)
# 2. 创建任务队列 (按 Agent)
agents = ['yunce', 'atlas', 'prometheus', 'oracle']
for agent in agents:
channel.queue_declare(queue=f'queue.{agent}', durable=True)
channel.queue_bind(
exchange='task_exchange',
queue=f'queue.{agent}',
routing_key=f'task.{agent}'
)
# 3. 创建星枢结果聚合队列
channel.queue_declare(queue='queue.star', durable=True)
channel.queue_bind(
exchange='result_exchange',
queue='queue.star',
routing_key='result.#'
)
connection.close()
print("✅ RabbitMQ 初始化完成")
if __name__ == '__main__':
setup_rabbitmq()
```
### 5.3 星枢任务下发模块
```python
# star_sender.py
import pika
import json
import uuid
from datetime import datetime
class StarTaskSender:
def __init__(self, rabbitmq_host='localhost'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=rabbitmq_host)
)
self.channel = self.connection.channel()
def send_task(self, target_agent, action, params, priority='normal'):
task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
message = {
"taskId": task_id,
"type": "task",
"source": "xingyao",
"target": target_agent,
"priority": priority,
"content": {
"action": action,
"params": params
},
"metadata": {
"createdAt": datetime.now().isoformat() + "Z",
"retryCount": 0,
"maxRetries": 3
}
}
self.channel.basic_publish(
exchange='task_exchange',
routing_key=f'task.{target_agent}',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # 持久化
priority=10 if priority == 'high' else 5
)
)
print(f"✅ 任务已下发: {task_id} -> {target_agent}")
return task_id
def close(self):
self.connection.close()
# 使用示例
if __name__ == '__main__':
sender = StarTaskSender()
# 下发任务给 Yunce
task_id = sender.send_task(
target_agent='yunce',
action='code_review',
params={'repo': 'my-project', 'branch': 'main'},
priority='high'
)
sender.close()
```
### 5.4 Agent 任务监听模块
```python
# agent_listener.py
import pika
import json
import subprocess
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AgentListener:
def __init__(self, agent_name, rabbitmq_host='localhost'):
self.agent_name = agent_name
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=rabbitmq_host)
)
self.channel = self.connection.channel()
def execute_task(self, task_content):
"""执行任务的核心逻辑"""
action = task_content['action']
params = task_content['params']
logger.info(f"执行任务: {action}")
# 根据 action 调用不同的处理函数
handlers = {
'code_review': self.handle_code_review,
'data_analysis': self.handle_data_analysis,
'file_operation': self.handle_file_operation,
}
handler = handlers.get(action, self.handle_default)
return handler(params)
def handle_code_review(self, params):
# 调用 OpenClaw agent
result = subprocess.run(
['openclaw', 'agent', '--agent', 'yunce',
'--message', f"请审查代码仓库 {params.get('repo')}"],
capture_output=True, text=True
)
return {'output': result.stdout, 'status': 'success'}
def handle_default(self, params):
return {'message': f'Unknown action: {params}'}
def on_message(self, ch, method, properties, body):
"""消息处理回调"""
try:
message = json.loads(body)
task_id = message['taskId']
logger.info(f"收到任务: {task_id}")
# 执行任务
result = self.execute_task(message['content'])
# 发送结果
self.send_result(task_id, result)
# ACK 确认
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"任务执行失败: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def send_result(self, task_id, result):
"""发送结果到星枢"""
result_message = {
"taskId": task_id,
"type": "result",
"source": self.agent_name,
"target": "xingyao",
"status": "success",
"content": result,
"metadata": {
"completedAt": datetime.now().isoformat() + "Z"
}
}
self.channel.basic_publish(
exchange='result_exchange',
routing_key=f'result.{self.agent_name}',
body=json.dumps(result_message),
properties=pika.BasicProperties(delivery_mode=2)
)
def start_listening(self):
"""开始监听任务队列"""
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue=f'queue.{self.agent_name}',
on_message_callback=self.on_message
)
logger.info(f"🤖 Agent [{self.agent_name}] 开始监听任务队列...")
self.channel.start_consuming()
# 使用示例
if __name__ == '__main__':
import sys
agent_name = sys.argv[1] if len(sys.argv) > 1 else 'yunce'
listener = AgentListener(agent_name)
listener.start_listening()
```
### 5.5 星枢结果收集模块
```python
# star_receiver.py
import pika
import json
from datetime import datetime
class StarResultReceiver:
def __init__(self, rabbitmq_host='localhost'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=rabbitmq_host)
)
self.channel = self.connection.channel()
self.pending_tasks = {} # 跟踪待处理任务
def on_message(self, ch, method, properties, body):
message = json.loads(body)
if message['type'] == 'result':
task_id = message['taskId']
status = message['status']
result = message['content']
print(f"📋 任务完成: {task_id}")
print(f" 状态: {status}")
print(f" 结果: {result}")
# 更新任务状态
if task_id in self.pending_tasks:
self.pending_tasks[task_id]['status'] = 'completed'
self.pending_tasks[task_id]['result'] = result
# 可以触发后续任务
self.handle_next_action(message)
elif message['type'] == 'heartbeat':
print(f"💓 Agent 心跳: {message['agent']} - {message['status']}")
ch.basic_ack(delivery_tag=method.delivery_tag)
def handle_next_action(self, message):
"""根据结果触发后续动作"""
# 示例:根据结果发送新任务
pass
def start_listening(self):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue='queue.star',
on_message_callback=self.on_message
)
print("🌟 星枢开始监听任务结果...")
self.channel.start_consuming()
# 使用示例
if __name__ == '__main__':
receiver = StarResultReceiver()
receiver.start_listening()
```
---
## 六、监控界面
### 6.1 RabbitMQ 管理界面
```
URL: http://localhost:15672
用户名: admin
密码: your_password
可查看:
- 队列状态 (Messages, Ready, Unacked)
- 连接数
- 消息流速
- 交换机绑定
```
### 6.2 自定义监控面板 (可选)
```python
# 简单的任务状态查询
def get_task_status(task_id):
# 可以通过 REST API 查询
# 或者维护一个 Redis 状态缓存
pass
def list_pending_tasks():
# 列出所有待处理任务
pass
def list_agent_status():
# 列出所有 Agent 状态
pass
```
---
## 七、完整工作流程示例
```
┌─────────────────────────────────────────────────────────────────────────┐
│ 完整示例:代码审查任务 │
└─────────────────────────────────────────────────────────────────────────┘
[用户]
│ "星枢,帮我审查 my-project 的 main 分支"
[星枢 - 意图理解]
│ action: code_review
│ target: yunce
│ params: {repo: "my-project", branch: "main"}
[星枢 - 任务下发]
│ RabbitMQ: task.yunce
│ taskId: task_20260317_001
[RabbitMQ] (持久化消息)
[Yunce Agent - 任务监听]
│ 收到任务 -> 执行 code_review
│ 调用: openclaw agent --agent yunce --message "审查 my-project"
[Yunce Agent - 返回结果]
│ RabbitMQ: result.yunce
│ status: success, findings: [...]
[RabbitMQ]
│ result.# -> queue.star
[星枢 - 结果收集]
│ 接收结果 -> 更新状态
│ 格式化输出 -> 推送给用户
[用户]
│ 收到审查报告
```
---
## 八、部署建议
### 8.1 生产环境配置
```yaml
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.12-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "check_running"]
interval: 30s
volumes:
rabbitmq_data:
```
### 8.2 安全建议
1. **认证**:启用 RabbitMQ 用户认证
2. **SSL/TLS**:生产环境启用 amqps
3. **VHost**:不同项目使用不同 vhost
4. **权限**:最小权限原则
---
## 九、故障处理
| 故障场景 | 解决方案 |
|----------|----------|
| Agent 宕机 | 任务自动重新入队 (requeue) |
| RabbitMQ 宕机 | 消息持久化,重启后恢复 |
| 任务超时 | 设置 TTL自动移到死信队列 |
| 消息积压 | 监控队列长度,扩展消费者 |
---
## 十、进阶功能
### 10.1 延迟任务
```python
# 延迟队列:让任务在指定时间后执行
def send_delayed_task(target, action, delay_seconds):
# 使用 RabbitMQ 延迟插件 或 配合 Redis 实现
pass
```
### 10.2 优先级队列
```python
# 高优先级任务优先处理
channel.queue_declare(queue='queue.yunce', arguments={
'x-max-priority': 10
})
```
### 10.3 任务超时
```python
# 消息 TTL + 死信队列
channel.queue_declare(
queue='queue.yunce',
arguments={
'x-message-ttl': 3600000, # 1小时
'x-dead-letter-exchange': 'dlx_exchange'
}
)
```
---
## 附录:文件清单
| 文件 | 说明 |
|------|------|
| `setup_rabbitmq.py` | RabbitMQ 初始化脚本 |
| `star_sender.py` | 星枢任务下发模块 |
| `agent_listener.py` | Agent 任务监听模块 |
| `star_receiver.py` | 星枢结果收集模块 |
| `docker-compose.yml` | 一键部署配置 |
---
*文档版本: 1.0*
*创建时间: 2026-03-17*
*作者: 云策*