# 星枢 Agent 任务解耦技术方案 > 基于 RabbitMQ 的分布式任务队列架构 > 版本: 2.1 (已更新) > 最后更新: 2026-03-17 --- ## 一、概述 ### 背景 当前星枢(主 Agent)与其他 Agent 的通信方式: | 方式 | 命令 | 局限 | | --- | ---------------------------------------------------------- | ---- | | 本地 | `openclaw agent --agent xingyao --message "..." --deliver` | 同步等待 | | 远程 | `ssh ubuntu2 "openclaw agent --agent yunce --message ..."` | 串行阻塞 | ### 目标 - **异步执行**:任务下发不等待结果 - **任务持久化**:重启不丢失 - **可监控**:实时查看任务状态 - **可扩展**:支持多 Agent 并行 ### 讨论结论 **技术选型**: RabbitMQ **Exchange 命名**: `task_exchange` / `result_exchange` --- ## 二、星枢调度 Agent 列表 根据 `星枢调度Agent列表.md`,星枢需要调度的 Agent 包括: ### Mac Mini(中央控制节点) | Agent ID | 角色 | 职责 | |----------|------|------| | xingyao | 星曜 | IT 管家 / 运维管理 | | xinghui | 星辉 | 个人助理 / 日程管理 | ### Ubuntu2(开发服务器) | Agent ID | 角色 | 职责 | |----------|------|------| | yunhan | 云瀚 | 监控官 / 系统监控 | | yunce | 云策 | 架构师 / 技术方案 | | yunjiang | 云匠 | 工匠 / 代码开发 | | yunzhi | 云织 | 自动化师 / CI/CD | ### Ubuntu1(准生产服务器) | Agent ID | 角色 | 职责 | |----------|------|------| | fengheng | 风衡 | 质检官 / QA测试 | | fengchi | 风驰 | 执行者 / 业务流程 | | fengji | 风纪 | 审计官 / 规则审计 | **总计**: 9 个子 Agent --- ## 三、架构设计 ### 3.1 整体架构 ``` ┌─────────────────────────────────────────────────────────────────────────┐ │ 用户 │ │ (Telegram/Discord) │ └─────────────────────────────────┬───────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 星枢 (xingshu) │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 意图理解 │ │ 任务分解 │ │ 队列管理 │ │ 结果聚合 │ │ │ │ intent_ │ │ message_ │ │ rabbitmq_ │ │ result_ │ │ │ │ parser.py │ │ builder.py │ │ sender.py │ │ receiver.py│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────┬───────────────────────────────────────┘ │ ┌─────────────┴─────────────┐ │ RabbitMQ 集群 │ │ (task_exchange) │ └─────────────┬─────────────┘ │ ┌───────────┬───────────┼───────────┬───────────┐ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ┌──────────┐┌──────────┐┌──────────┐┌──────────┐┌──────────┐ │ tasks ││ tasks ││ tasks ││ tasks ││ tasks │ │ .xingyao ││ .xinghui ││ .yunhan ││ .yunce ││.yunjiang │ └────┬─────┘└────┬─────┘└────┬─────┘└────┬─────┘└────┬─────┘ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ [星曜] [星辉] [云瀚] [云策] [云匠] │ │ │ │ │ └───────────┴───────────┼───────────┴───────────┘ │ ┌────────────┴────────────┐ │ result_exchange │ │ (result.#) │ └────────────┬────────────┘ │ ▼ ┌─────────────────────┐ │ results.xingshu │ ← 星枢监听 └─────────────────────┘ ``` ### 3.2 消息流设计 ``` ┌──────────────────────────────────────────────────────────────────────────┐ │ 消息生命周期 │ └──────────────────────────────────────────────────────────────────────────┘ [1] 任务下发 [5] 结果处理 │ ▲ ▼ │ ┌────────┐ ┌────────────┐ ┌───────────┐ ┌───────────┐ │ │ 星枢 │───▶│ RabbitMQ │───▶│ Agent N │───▶│ RabbitMQ │──────┐ │创建任务 │ │ (持久化) │ │ 执行任务 │ │ (结果队列) │ │ └────────┘ └────────────┘ └───────────┘ └───────────┘ │ │ │ │ │ [4] ACK 确认 │ │ │ │ [2] 任务入队 │ │ (可选: 延迟队列) ▼ │ │ ┌───────────┐ │ └─────────────▶│ 状态变更 │ │ │ (处理中→完成) │ └───────────┘ │ [3] Agent 消费任务 ``` --- ## 四、Exchange & Queue 设计 ### 4.1 Exchanges | Exchange | 类型 | 用途 | Routing Key | |----------|------|------|-------------| | `task_exchange` | topic | 任务下发 | `task.{agent}` | | `result_exchange` | topic | 结果收集 | `result.{agent}` | ### 4.2 Queues | Queue | 消费者 | Agent 角色 | |-------|--------|------------| | `tasks.xingyao` | xingyao | 星曜 - IT管家 | | `tasks.xinghui` | xinghui | 星辉 - 个人助理 | | `tasks.yunhan` | yunhan | 云瀚 - 监控官 | | `tasks.yunce` | yunce | 云策 - 架构师 | | `tasks.yunjiang` | yunjiang | 云匠 - 工匠 | | `tasks.yunzhi` | yunzhi | 云织 - 自动化师 | | `tasks.fengheng` | fengheng | 风衡 - 质检官 | | `tasks.fengchi` | fengchi | 风驰 - 执行者 | | `tasks.fengji` | fengji | 风纪 - 审计官 | | `results.xingshu` | xingshu | 星枢 - 结果聚合 | ### 4.3 Routing 规则 ``` 任务下发: task_exchange │ ├─→ task.xingyao → tasks.xingyao ├─→ task.xinghui → tasks.xinghui ├─→ task.yunhan → tasks.yunhan ├─→ task.yunce → tasks.yunce ├─→ task.yunjiang → tasks.yunjiang ├─→ task.yunzhi → tasks.yunzhi ├─→ task.fengheng → tasks.fengheng ├─→ task.fengchi → tasks.fengchi └─→ task.fengji → tasks.fengji 结果收集: result_exchange │ └─→ result.# → results.xingshu (星枢) ``` --- ## 五、消息格式定义 ### 5.1 任务消息 (Task Message) ```json { "taskId": "task_20260317_001", "type": "task", "source": "xingshu", "target": "yunce", "priority": "high", "content": { "action": "code_review", "params": { "repo": "my-project", "branch": "feature/login" } }, "metadata": { "createdAt": "2026-03-17T10:30:00Z", "expireAt": "2026-03-17T11:30:00Z", "retryCount": 0, "maxRetries": 3 } } ``` ### 5.2 结果消息 (Result Message) ```json { "taskId": "task_20260317_001", "type": "result", "source": "yunce", "target": "xingshu", "status": "success", "content": { "summary": "代码审查完成", "findings": [ {"severity": "warning", "message": "建议添加参数校验"} ], "output": "/path/to/report.md" }, "metadata": { "completedAt": "2026-03-17T10:35:00Z", "duration": 300 } } ``` ### 5.3 心跳消息 (Heartbeat Message) ```json { "type": "heartbeat", "agent": "yunce", "status": "idle", "currentTask": null, "timestamp": "2026-03-17T10:30:00Z" } ``` --- ## 六、星枢指令解析 ### 6.1 意图到 Agent 的映射 | 用户指令 | action | target | 执行 Agent | |----------|--------|--------|-----------| | 运维/服务器管理 | ops | xingyao | 星曜 | | 日程/个人事务 | personal | xinghui | 星辉 | | 监控巡检 | monitor | yunhan | 云瀚 | | 架构/方案 | architecture | yunce | 云策 | | 代码开发 | coding | yunjiang | 云匠 | | CI/CD/自动化 | automation | yunzhi | 云织 | | QA测试 | qa_test | fengheng | 风衡 | | 业务执行 | execute | fengchi | 风驰 | | 审计/合规 | audit | fengji | 风纪 | ### 6.2 async-task-scheduling 支持的 action | action | target | 说明 | |--------|--------|------| | ops | xingyao | IT 运维任务 | | personal | xinghui | 个人事务 | | monitor | yunhan | 系统监控 | | architecture | yunce | 技术方案 | | coding | yunjiang | 代码开发 | | automation | yunzhi | 自动化 | | qa_test | fengheng | QA 测试 | | execute | fengchi | 业务执行 | | audit | fengji | 审计检查 | | code_review | yunce | 代码审查 (快捷) | | deploy | yunzhi | 部署服务 | --- ## 七、RabbitMQ 初始化步骤 ### 7.1 环境信息 | 项目 | 值 | |------|-----| | **IP 地址** | 192.168.3.189 | | **AMQP 端口** | 5672 | | **管理界面** | http://192.168.3.189:15672/ | | **用户名** | guest | | **密码** | guest | ### 7.2 一键初始化命令 ```bash python3 << 'EOF' import pika connection = pika.BlockingConnection( pika.ConnectionParameters( host='192.168.3.189', port=5672, credentials=pika.PlainCredentials('guest', 'guest') ) ) channel = connection.channel() # ========== 1. 创建 Exchange ========== channel.exchange_declare(exchange='task_exchange', exchange_type='topic', durable=True) channel.exchange_declare(exchange='result_exchange', exchange_type='topic', durable=True) print("✅ Exchanges: task_exchange, result_exchange") # ========== 2. 创建任务队列 (9个子Agent) ========== agents = [ 'xingyao', # 星曜 - IT管家 'xinghui', # 星辉 - 个人助理 'yunhan', # 云瀚 - 监控官 'yunce', # 云策 - 架构师 'yunjiang', # 云匠 - 工匠 'yunzhi', # 云织 - 自动化师 'fengheng', # 风衡 - 质检官 'fengchi', # 风驰 - 执行者 'fengji', # 风纪 - 审计官 ] for agent in agents: queue_name = f'tasks.{agent}' channel.queue_declare(queue=queue_name, durable=True) channel.queue_bind( exchange='task_exchange', queue=queue_name, routing_key=f'task.{agent}' ) print(f" 📬 {queue_name} -> task.{agent}") # ========== 3. 创建结果聚合队列 ========== channel.queue_declare(queue='results.xingshu', durable=True) channel.queue_bind( exchange='result_exchange', queue='results.xingshu', routing_key='result.#' ) print(" 📬 results.xingshu -> result.#") connection.close() print("\n🎉 RabbitMQ 初始化完成!") EOF ``` --- ## 八、async-task-scheduling 技能 ### 8.1 技能概述 **技能名称**: async-task-scheduling (异步任务调度) **用途**: 星枢专用技能,用于解析用户指令并通过 RabbitMQ 异步调度子 Agent **技能位置**: ``` ~/Obsidian/shenwei/openclaw/yunce/async-task-scheduling/ ``` ### 8.2 文件结构 ``` async-task-scheduling/ ├── SKILL.md # 技能说明 └── scripts/ ├── __init__.py ├── intent_parser.py # 意图解析 (action → target 映射) ├── message_builder.py # 消息构建 (JSON 格式) ├── rabbitmq_sender.py # RabbitMQ 发送 └── agent_listener.py # 子 Agent 监听 (可选) ``` ### 8.3 使用方法 ```python from scripts import IntentParser, MessageBuilder, RabbitMQSender # 1. 解析用户指令 parser = IntentParser() intent = parser.parse_intent("帮我审查 my-project 仓库") # 2. 构建消息 builder = MessageBuilder(source="xingshu") message = builder.build_task_message(intent) # 3. 发送到 RabbitMQ sender = RabbitMQSender() task_id = sender.send_task(message) sender.close() return f"✅ 任务已下发: {task_id} → {intent['target']}" ``` ### 8.4 配置 RabbitMQ 连接配置 (rabbitmq_sender.py): ```python DEFAULT_CONFIG = { "host": "192.168.3.189", "port": 5672, "username": "guest", "password": "guest", "exchange": "task_exchange", "result_exchange": "result_exchange" } ``` --- ## 九、待执行事项 - [ ] 确认设计后,初始化 RabbitMQ - [ ] 为星枢 (xingshu) 加载 async-task-scheduling 技能 - [ ] 子 Agent 监听队列并消费任务 - [ ] 端到端测试 --- ## 十、相关文件 | 文件 | 位置 | |------|------| | 星枢调度 Agent 列表 | `~/Obsidian/shenwei/openclaw/xingshu/星枢调度Agent列表.md` | | 异步任务调度技能 | `~/Obsidian/shenwei/openclaw/yunce/async-task-scheduling/` | --- *文档版本: 2.1* *更新日期: 2026-03-17* *作者: 云策*