Files
nexus/openclaw/yunce/星枢-Agent任务解耦方案.md

25 KiB
Raw Blame History

星枢 Agent 任务解耦技术方案

基于 RabbitMQ 的分布式任务队列架构


一、概述

背景

当前星枢(主 Agent与其他 Agent 的通信方式:

方式 命令 局限
本地 openclaw agent --agent xingyao --message "..." --deliver 同步等待
远程 ssh ubuntu2 "openclaw agent --agent yunce --message ..." 串行阻塞

目标

  • 异步执行:任务下发不等待结果
  • 任务持久化:重启不丢失
  • 可监控:实时查看任务状态
  • 可扩展:支持多 Agent 并行

二、技术选型

RabbitMQ vs 其他

特性 RabbitMQ Redis Streams Kafka
消息确认 ACK ACK ACK
优先级队列
延迟队列 (插件)
持久化
集群 有限
生态成熟度
轻量级

推荐RabbitMQ

理由:

  • 消息确认机制完善
  • 支持复杂路由规则
  • 管理界面友好
  • 适合中低并发场景

三、架构设计

3.1 整体架构

┌─────────────────────────────────────────────────────────────────────────┐
│                                用户                                      │
│                           (Telegram/Discord)                             │
└─────────────────────────────────┬───────────────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                              星枢 (主 Agent)                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  意图理解   │  │  任务分解    │  │  队列管理    │  │  结果聚合    │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘  │
└─────────────────────────────────┬───────────────────────────────────────┘
                                  │
                    ┌─────────────┴─────────────┐
                    │      RabbitMQ 集群         │
                    │   (task_exchange)          │
                    └─────────────┬─────────────┘
                                  │
          ┌───────────────────────┼───────────────────────┐
          │                       │                       │
          ▼                       ▼                       ▼
┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐
│  Yunce (Agent)  │   │  Atlas (Agent)  │   │  Prometheus    │
│  队列: tasks   │   │  队列: tasks   │   │  队列: tasks   │
│  状态: running │   │  状态: idle    │   │  状态: idle    │
└────────┬────────┘   └────────┬────────┘   └────────┬────────┘
         │                      │                      │
         │   ┌──────────────────┴──────────────────┐   │
         │   │         结果收集 (result_exchange)   │   │
         │   └──────────────────┬──────────────────┘   │
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                          星枢 (结果处理)                                  │
│  - 任务状态更新                                                           │
│  - 用户反馈                                                              │
│  - 后续任务触发                                                          │
└─────────────────────────────────────────────────────────────────────────┘

3.2 消息流设计

┌──────────────────────────────────────────────────────────────────────────┐
│                           消息生命周期                                    │
└──────────────────────────────────────────────────────────────────────────┘

[1] 任务下发                                                        [5] 结果处理
    │                                                                   ▲
    ▼                                                                   │
┌────────┐    ┌────────────┐    ┌───────────┐    ┌───────────┐      │
│  星枢   │───▶│  RabbitMQ  │───▶│  Agent N  │───▶│  RabbitMQ │──────┘
│创建任务 │    │ (持久化)   │    │  执行任务  │    │ (结果队列) │
└────────┘    └────────────┘    └───────────┘    └───────────┘
                  │                   │
                  │              [4] ACK 确认
                  │                   │
[2] 任务入队                           │
(可选: 延迟队列)                       ▼
                  │              ┌───────────┐
                  └─────────────▶│  状态变更  │
                                 │ (处理中→完成)
                                 └───────────┘

[3] Agent 消费任务

3.3 Exchange & Queue 设计

                    ┌─────────────────┐
                    │  task_exchange  │  (Topic Exchange)
                    │   (星枢下发)     │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│  queue.yunce   │ │  queue.atlas   │ │  queue.prometheus│
│  routing:      │ │  routing:      │ │  routing:       │
│  task.yunce    │ │  task.atlas    │ │  task.prometheus │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
         │                    │                    │
         ▼                    ▼                    ▼
   [Agent: Yunce]      [Agent: Atlas]      [Agent: Prometheus]

─────────────────────────────────────────────────────────────────────────

                    ┌─────────────────┐
                    │result_exchange  │  (Topic Exchange)
                    │   (结果收集)     │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│result.yunce    │ │result.atlas    │ │result.prometheus │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
         │                    │                    │
         └────────────────────┼────────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │ queue.star聚合  │  ← 星枢监听此队列
                    │ routing: result.#│
                    └─────────────────┘

四、消息格式定义

4.1 任务消息 (Task Message)

{
  "taskId": "task_20260317_001",
  "type": "task",
  "source": "xingyao",
  "target": "yunce",
  "priority": "high",
  "content": {
    "action": "code_review",
    "params": {
      "repo": "my-project",
      "branch": "feature/login"
    }
  },
  "metadata": {
    "createdAt": "2026-03-17T10:30:00Z",
    "expireAt": "2026-03-17T11:30:00Z",
    "retryCount": 0,
    "maxRetries": 3
  }
}

4.2 结果消息 (Result Message)

{
  "taskId": "task_20260317_001",
  "type": "result",
  "source": "yunce",
  "target": "xingyao",
  "status": "success",
  "content": {
    "summary": "代码审查完成",
    "findings": [
      {"severity": "warning", "message": "建议添加参数校验"}
    ],
    "output": "/path/to/report.md"
  },
  "metadata": {
    "completedAt": "2026-03-17T10:35:00Z",
    "duration": 300
  }
}

4.3 心跳消息 (Heartbeat Message)

{
  "type": "heartbeat",
  "agent": "yunce",
  "status": "idle",
  "currentTask": null,
  "timestamp": "2026-03-17T10:30:00Z"
}

五、实现步骤

5.1 RabbitMQ 部署

title: 星枢 Agent 任务解耦技术方案
author: shenwei
---
---
title: 星枢 Agent 任务解耦技术方案
source:
author: shenwei
published:
created:
description:
tags: []
---

# Docker 部署
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=your_password \
  rabbitmq:3.12-management

# 访问管理界面
# http://your-server:15672

5.2 创建 Exchange 和 Queue (初始化脚本)

# setup_rabbitmq.py
import pika

def setup_rabbitmq():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672)
    )
    channel = connection.channel()
    
    # 1. 创建 Exchange
    channel.exchange_declare(exchange='task_exchange', exchange_type='topic', durable=True)
    channel.exchange_declare(exchange='result_exchange', exchange_type='topic', durable=True)
    
    # 2. 创建任务队列 (按 Agent)
    agents = ['yunce', 'atlas', 'prometheus', 'oracle']
    for agent in agents:
        channel.queue_declare(queue=f'queue.{agent}', durable=True)
        channel.queue_bind(
            exchange='task_exchange',
            queue=f'queue.{agent}',
            routing_key=f'task.{agent}'
        )
    
    # 3. 创建星枢结果聚合队列
    channel.queue_declare(queue='queue.star', durable=True)
    channel.queue_bind(
        exchange='result_exchange',
        queue='queue.star',
        routing_key='result.#'
    )
    
    connection.close()
    print("✅ RabbitMQ 初始化完成")

if __name__ == '__main__':
    setup_rabbitmq()

5.3 星枢任务下发模块

# star_sender.py
import pika
import json
import uuid
from datetime import datetime

class StarTaskSender:
    def __init__(self, rabbitmq_host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=rabbitmq_host)
        )
        self.channel = self.connection.channel()
        
    def send_task(self, target_agent, action, params, priority='normal'):
        task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
        
        message = {
            "taskId": task_id,
            "type": "task",
            "source": "xingyao",
            "target": target_agent,
            "priority": priority,
            "content": {
                "action": action,
                "params": params
            },
            "metadata": {
                "createdAt": datetime.now().isoformat() + "Z",
                "retryCount": 0,
                "maxRetries": 3
            }
        }
        
        self.channel.basic_publish(
            exchange='task_exchange',
            routing_key=f'task.{target_agent}',
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # 持久化
                priority=10 if priority == 'high' else 5
            )
        )
        
        print(f"✅ 任务已下发: {task_id} -> {target_agent}")
        return task_id
    
    def close(self):
        self.connection.close()

# 使用示例
if __name__ == '__main__':
    sender = StarTaskSender()
    
    # 下发任务给 Yunce
    task_id = sender.send_task(
        target_agent='yunce',
        action='code_review',
        params={'repo': 'my-project', 'branch': 'main'},
        priority='high'
    )
    
    sender.close()

5.4 Agent 任务监听模块

# agent_listener.py
import pika
import json
import subprocess
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AgentListener:
    def __init__(self, agent_name, rabbitmq_host='localhost'):
        self.agent_name = agent_name
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=rabbitmq_host)
        )
        self.channel = self.connection.channel()
        
    def execute_task(self, task_content):
        """执行任务的核心逻辑"""
        action = task_content['action']
        params = task_content['params']
        
        logger.info(f"执行任务: {action}")
        
        # 根据 action 调用不同的处理函数
        handlers = {
            'code_review': self.handle_code_review,
            'data_analysis': self.handle_data_analysis,
            'file_operation': self.handle_file_operation,
        }
        
        handler = handlers.get(action, self.handle_default)
        return handler(params)
    
    def handle_code_review(self, params):
        # 调用 OpenClaw agent
        result = subprocess.run(
            ['openclaw', 'agent', '--agent', 'yunce', 
             '--message', f"请审查代码仓库 {params.get('repo')}"],
            capture_output=True, text=True
        )
        return {'output': result.stdout, 'status': 'success'}
    
    def handle_default(self, params):
        return {'message': f'Unknown action: {params}'}
    
    def on_message(self, ch, method, properties, body):
        """消息处理回调"""
        try:
            message = json.loads(body)
            task_id = message['taskId']
            
            logger.info(f"收到任务: {task_id}")
            
            # 执行任务
            result = self.execute_task(message['content'])
            
            # 发送结果
            self.send_result(task_id, result)
            
            # ACK 确认
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        except Exception as e:
            logger.error(f"任务执行失败: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    
    def send_result(self, task_id, result):
        """发送结果到星枢"""
        result_message = {
            "taskId": task_id,
            "type": "result",
            "source": self.agent_name,
            "target": "xingyao",
            "status": "success",
            "content": result,
            "metadata": {
                "completedAt": datetime.now().isoformat() + "Z"
            }
        }
        
        self.channel.basic_publish(
            exchange='result_exchange',
            routing_key=f'result.{self.agent_name}',
            body=json.dumps(result_message),
            properties=pika.BasicProperties(delivery_mode=2)
        )
    
    def start_listening(self):
        """开始监听任务队列"""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=f'queue.{self.agent_name}',
            on_message_callback=self.on_message
        )
        
        logger.info(f"🤖 Agent [{self.agent_name}] 开始监听任务队列...")
        self.channel.start_consuming()

# 使用示例
if __name__ == '__main__':
    import sys
    agent_name = sys.argv[1] if len(sys.argv) > 1 else 'yunce'
    listener = AgentListener(agent_name)
    listener.start_listening()

5.5 星枢结果收集模块

# star_receiver.py
import pika
import json
from datetime import datetime

class StarResultReceiver:
    def __init__(self, rabbitmq_host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=rabbitmq_host)
        )
        self.channel = self.connection.channel()
        self.pending_tasks = {}  # 跟踪待处理任务
        
    def on_message(self, ch, method, properties, body):
        message = json.loads(body)
        
        if message['type'] == 'result':
            task_id = message['taskId']
            status = message['status']
            result = message['content']
            
            print(f"📋 任务完成: {task_id}")
            print(f"   状态: {status}")
            print(f"   结果: {result}")
            
            # 更新任务状态
            if task_id in self.pending_tasks:
                self.pending_tasks[task_id]['status'] = 'completed'
                self.pending_tasks[task_id]['result'] = result
                
            # 可以触发后续任务
            self.handle_next_action(message)
        
        elif message['type'] == 'heartbeat':
            print(f"💓 Agent 心跳: {message['agent']} - {message['status']}")
        
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    def handle_next_action(self, message):
        """根据结果触发后续动作"""
        # 示例:根据结果发送新任务
        pass
    
    def start_listening(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='queue.star',
            on_message_callback=self.on_message
        )
        
        print("🌟 星枢开始监听任务结果...")
        self.channel.start_consuming()

# 使用示例
if __name__ == '__main__':
    receiver = StarResultReceiver()
    receiver.start_listening()

六、监控界面

6.1 RabbitMQ 管理界面

URL: http://localhost:15672
用户名: admin
密码: your_password

可查看:
- 队列状态 (Messages, Ready, Unacked)
- 连接数
- 消息流速
- 交换机绑定

6.2 自定义监控面板 (可选)

# 简单的任务状态查询
def get_task_status(task_id):
    # 可以通过 REST API 查询
    # 或者维护一个 Redis 状态缓存
    pass

def list_pending_tasks():
    # 列出所有待处理任务
    pass

def list_agent_status():
    # 列出所有 Agent 状态
    pass

七、完整工作流程示例

┌─────────────────────────────────────────────────────────────────────────┐
│                           完整示例:代码审查任务                          │
└─────────────────────────────────────────────────────────────────────────┘

[用户]
 │
 │ "星枢,帮我审查 my-project 的 main 分支"
 ▼
[星枢 - 意图理解]
 │ action: code_review
 │ target: yunce
 │ params: {repo: "my-project", branch: "main"}
 ▼
[星枢 - 任务下发]
 │ RabbitMQ: task.yunce
 │ taskId: task_20260317_001
 ▼
[RabbitMQ] (持久化消息)
 ▼
[Yunce Agent - 任务监听]
 │ 收到任务 -> 执行 code_review
 │ 调用: openclaw agent --agent yunce --message "审查 my-project"
 ▼
[Yunce Agent - 返回结果]
 │ RabbitMQ: result.yunce
 │ status: success, findings: [...]
 ▼
[RabbitMQ] 
 │ result.# -> queue.star
 ▼
[星枢 - 结果收集]
 │ 接收结果 -> 更新状态
 │ 格式化输出 -> 推送给用户
 ▼
[用户]
 │ 收到审查报告

八、部署建议

8.1 生产环境配置

# docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_running"]
      interval: 30s

volumes:
  rabbitmq_data:

8.2 安全建议

  1. 认证:启用 RabbitMQ 用户认证
  2. SSL/TLS:生产环境启用 amqps
  3. VHost:不同项目使用不同 vhost
  4. 权限:最小权限原则

九、故障处理

故障场景 解决方案
Agent 宕机 任务自动重新入队 (requeue)
RabbitMQ 宕机 消息持久化,重启后恢复
任务超时 设置 TTL自动移到死信队列
消息积压 监控队列长度,扩展消费者

十、进阶功能

10.1 延迟任务

# 延迟队列:让任务在指定时间后执行
def send_delayed_task(target, action, delay_seconds):
    # 使用 RabbitMQ 延迟插件 或 配合 Redis 实现
    pass

10.2 优先级队列

# 高优先级任务优先处理
channel.queue_declare(queue='queue.yunce', arguments={
    'x-max-priority': 10
})

10.3 任务超时

# 消息 TTL + 死信队列
channel.queue_declare(
    queue='queue.yunce',
    arguments={
        'x-message-ttl': 3600000,  # 1小时
        'x-dead-letter-exchange': 'dlx_exchange'
    }
)

附录:文件清单

文件 说明
setup_rabbitmq.py RabbitMQ 初始化脚本
star_sender.py 星枢任务下发模块
agent_listener.py Agent 任务监听模块
star_receiver.py 星枢结果收集模块
docker-compose.yml 一键部署配置

文档版本: 1.0
创建时间: 2026-03-17
作者: 云策