first build nexus
This commit is contained in:
161
#recycle/openclaw/yunce/star-tasksender/SKILL.md
Normal file
161
#recycle/openclaw/yunce/star-tasksender/SKILL.md
Normal file
@@ -0,0 +1,161 @@
|
||||
---
|
||||
name: star-tasksender
|
||||
description: 星枢任务分发器 - 解析用户指令并发送到 RabbitMQ 队列
|
||||
metadata:
|
||||
version: 1.0.0
|
||||
author: 云策
|
||||
created: 2026-03-17
|
||||
---
|
||||
|
||||
# Star Task Sender - 星枢任务分发器
|
||||
|
||||
将星枢收到的用户指令解析为标准消息格式,并发送到 RabbitMQ 队列。
|
||||
|
||||
## 功能
|
||||
|
||||
- **意图解析**: 将自然语言指令转换为结构化任务
|
||||
- **消息构建**: 生成标准 JSON 消息格式
|
||||
- **队列发送**: 通过 RabbitMQ 分发给子 Agent
|
||||
|
||||
## 文件结构
|
||||
|
||||
```
|
||||
star-tasksender/
|
||||
├── SKILL.md
|
||||
└── scripts/
|
||||
├── intent_parser.py # 意图解析模块
|
||||
├── message_builder.py # 消息构建模块
|
||||
├── rabbitmq_sender.py # RabbitMQ 发送模块
|
||||
└── __init__.py
|
||||
```
|
||||
|
||||
## 使用方法
|
||||
|
||||
### 1. 意图解析
|
||||
|
||||
```python
|
||||
from scripts.intent_parser import IntentParser
|
||||
|
||||
parser = IntentParser()
|
||||
|
||||
# 解析用户指令
|
||||
intent = parser.parse_intent("帮我审查 my-project 仓库")
|
||||
# 结果: {
|
||||
# "action": "code_review",
|
||||
# "target": "yunce",
|
||||
# "params": {"repo": "my-project", "branch": "main"}
|
||||
# }
|
||||
```
|
||||
|
||||
### 2. 构建消息
|
||||
|
||||
```python
|
||||
from scripts.message_builder import MessageBuilder
|
||||
from scripts.intent_parser import IntentParser
|
||||
|
||||
parser = IntentParser()
|
||||
intent = parser.parse_intent("帮我审查 my-project")
|
||||
|
||||
builder = MessageBuilder()
|
||||
message = builder.build_task_message(intent)
|
||||
# 结果: 标准任务消息 JSON
|
||||
```
|
||||
|
||||
### 3. 发送到 RabbitMQ
|
||||
|
||||
```python
|
||||
from scripts.rabbitmq_sender import RabbitMQSender
|
||||
from scripts.message_builder import MessageBuilder
|
||||
|
||||
# 构建消息
|
||||
builder = MessageBuilder()
|
||||
message = builder.build_task_message(intent)
|
||||
|
||||
# 发送
|
||||
sender = RabbitMQSender(host='192.168.1.100', username='admin', password='password')
|
||||
task_id = sender.send_task(message)
|
||||
sender.close()
|
||||
```
|
||||
|
||||
### 4. 完整流程
|
||||
|
||||
```python
|
||||
from scripts.intent_parser import IntentParser
|
||||
from scripts.message_builder import MessageBuilder
|
||||
from scripts.rabbitmq_sender import RabbitMQSender
|
||||
|
||||
def process_user_command(user_input):
|
||||
# 1. 解析
|
||||
parser = IntentParser()
|
||||
intent = parser.parse_intent(user_input)
|
||||
|
||||
# 2. 构建
|
||||
builder = MessageBuilder()
|
||||
message = builder.build_task_message(intent)
|
||||
|
||||
# 3. 发送
|
||||
sender = RabbitMQSender()
|
||||
task_id = sender.send_task(message)
|
||||
sender.close()
|
||||
|
||||
return f"✅ 任务已下发: {task_id}"
|
||||
```
|
||||
|
||||
## 支持的指令模式
|
||||
|
||||
| 用户指令 | action | target | params |
|
||||
|----------|--------|--------|--------|
|
||||
| 审查 xxx 仓库 | code_review | yunce | {repo: "xxx", branch: "main"} |
|
||||
| 部署 xxx 到生产 | deploy | prometheus | {service: "xxx", env: "prod"} |
|
||||
| 检查 xxx 状态 | status_check | atlas | {target: "xxx"} |
|
||||
| 分析 xxx 数据 | data_analysis | atlas | {data: "xxx"} |
|
||||
|
||||
## RabbitMQ 配置
|
||||
|
||||
默认配置 (可在 rabbitmq_sender.py 中修改):
|
||||
|
||||
```python
|
||||
{
|
||||
"host": "localhost",
|
||||
"port": 5672,
|
||||
"username": "admin",
|
||||
"password": "password",
|
||||
"exchange": "task_exchange"
|
||||
}
|
||||
```
|
||||
|
||||
## 消息格式
|
||||
|
||||
### 输入: 用户指令
|
||||
```
|
||||
"帮我审查 my-project 仓库"
|
||||
```
|
||||
|
||||
### 输出: RabbitMQ 消息
|
||||
```json
|
||||
{
|
||||
"taskId": "task_20260317_135600_a1b2c3",
|
||||
"type": "task",
|
||||
"source": "xingyao",
|
||||
"target": "yunce",
|
||||
"priority": "high",
|
||||
"content": {
|
||||
"action": "code_review",
|
||||
"params": {
|
||||
"repo": "my-project",
|
||||
"branch": "main"
|
||||
}
|
||||
},
|
||||
"metadata": {
|
||||
"createdAt": "2026-03-17T13:56:00Z",
|
||||
"retryCount": 0,
|
||||
"maxRetries": 3
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 注意事项
|
||||
|
||||
1. **首次使用**: 需要在 `rabbitmq_sender.py` 中配置 RabbitMQ 连接信息
|
||||
2. **权限**: 确保 RabbitMQ 用户有写权限到 task_exchange
|
||||
3. **错误处理**: 发送失败时会抛出异常,需要调用方处理
|
||||
191
#recycle/openclaw/yunce/star-tasksender/scripts/intent_parser.py
Normal file
191
#recycle/openclaw/yunce/star-tasksender/scripts/intent_parser.py
Normal file
@@ -0,0 +1,191 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
意图解析模块 - Intent Parser
|
||||
将自然语言指令转换为结构化任务
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Dict, Optional
|
||||
|
||||
|
||||
class IntentParser:
|
||||
"""意图解析器"""
|
||||
|
||||
# 指令模式定义
|
||||
PATTERNS = {
|
||||
"code_review": {
|
||||
"keywords": ["审查", "review", "代码审查", "review code"],
|
||||
"target": "yunce",
|
||||
"action": "code_review",
|
||||
"param_extractor": "extract_repo_info"
|
||||
},
|
||||
"deploy": {
|
||||
"keywords": ["部署", "deploy", "发布"],
|
||||
"target": "prometheus",
|
||||
"action": "deploy",
|
||||
"param_extractor": "extract_deploy_info"
|
||||
},
|
||||
"status_check": {
|
||||
"keywords": ["检查状态", "status", "查看状态", "状态"],
|
||||
"target": "atlas",
|
||||
"action": "status_check",
|
||||
"param_extractor": "extract_target"
|
||||
},
|
||||
"data_analysis": {
|
||||
"keywords": ["分析", "analysis", "数据分析"],
|
||||
"target": "atlas",
|
||||
"action": "data_analysis",
|
||||
"param_extractor": "extract_data_info"
|
||||
},
|
||||
"file_operation": {
|
||||
"keywords": ["复制", "移动", "删除", "copy", "move", "delete"],
|
||||
"target": "oracle",
|
||||
"action": "file_operation",
|
||||
"param_extractor": "extract_file_info"
|
||||
}
|
||||
}
|
||||
|
||||
def parse_intent(self, user_input: str) -> Dict:
|
||||
"""
|
||||
解析用户输入,返回结构化意图
|
||||
|
||||
Args:
|
||||
user_input: 用户输入的自然语言
|
||||
|
||||
Returns:
|
||||
{
|
||||
"action": "code_review",
|
||||
"target": "yunce",
|
||||
"params": {...}
|
||||
}
|
||||
"""
|
||||
user_input = user_input.strip()
|
||||
|
||||
# 遍历所有模式,匹配关键词
|
||||
for pattern_name, pattern_config in self.PATTERNS.items():
|
||||
for keyword in pattern_config["keywords"]:
|
||||
if keyword in user_input:
|
||||
# 提取参数
|
||||
extractor_name = pattern_config["param_extractor"]
|
||||
extractor = getattr(self, extractor_name)
|
||||
params = extractor(user_input)
|
||||
|
||||
return {
|
||||
"action": pattern_config["action"],
|
||||
"target": pattern_config["target"],
|
||||
"params": params
|
||||
}
|
||||
|
||||
# 无法识别
|
||||
return {
|
||||
"error": "无法理解指令",
|
||||
"original_input": user_input
|
||||
}
|
||||
|
||||
def extract_repo_info(self, text: str) -> Dict:
|
||||
"""从文本中提取仓库信息"""
|
||||
result = {}
|
||||
|
||||
# 提取仓库名
|
||||
repo_match = re.search(r'(?:仓库|repo|项目)[:\s]*(\S+)', text)
|
||||
if not repo_match:
|
||||
repo_match = re.search(r'(?:审查|review)\s+(\S+)', text)
|
||||
|
||||
if repo_match:
|
||||
result["repo"] = repo_match.group(1).strip(',。')
|
||||
|
||||
# 提取分支
|
||||
branch_match = re.search(r'(?:分支|branch)[:\s]*(\S+)', text)
|
||||
if branch_match:
|
||||
result["branch"] = branch_match.group(1).strip(',。')
|
||||
else:
|
||||
result["branch"] = "main"
|
||||
|
||||
return result
|
||||
|
||||
def extract_deploy_info(self, text: str) -> Dict:
|
||||
"""从文本中提取部署信息"""
|
||||
result = {}
|
||||
|
||||
# 提取服务名
|
||||
service_match = re.search(r'(?:服务|service|部署)[:\s]*(\S+)', text)
|
||||
if not service_match:
|
||||
service_match = re.search(r'部署\s+(\S+)', text)
|
||||
|
||||
if service_match:
|
||||
result["service"] = service_match.group(1).strip(',。')
|
||||
|
||||
# 提取环境
|
||||
if "生产" in text or "prod" in text.lower():
|
||||
result["env"] = "prod"
|
||||
elif "测试" in text or "test" in text.lower():
|
||||
result["env"] = "test"
|
||||
else:
|
||||
result["env"] = "dev"
|
||||
|
||||
return result
|
||||
|
||||
def extract_target(self, text: str) -> Dict:
|
||||
"""提取目标信息"""
|
||||
result = {}
|
||||
|
||||
# 尝试提取目标名称
|
||||
target_match = re.search(r'(?:服务器|server|目标)[:\s]*(\S+)', text)
|
||||
if not target_match:
|
||||
target_match = re.search(r'检查\s+(\S+)', text)
|
||||
|
||||
if target_match:
|
||||
result["target"] = target_match.group(1).strip(',。')
|
||||
|
||||
return result
|
||||
|
||||
def extract_data_info(self, text: str) -> Dict:
|
||||
"""提取数据分析信息"""
|
||||
result = {}
|
||||
|
||||
# 提取数据源
|
||||
data_match = re.search(r'(?:数据|data|分析)[:\s]*(\S+)', text)
|
||||
if not data_match:
|
||||
data_match = re.search(r'分析\s+(\S+)', text)
|
||||
|
||||
if data_match:
|
||||
result["data"] = data_match.group(1).strip(',。')
|
||||
|
||||
return result
|
||||
|
||||
def extract_file_info(self, text: str) -> Dict:
|
||||
"""提取文件操作信息"""
|
||||
result = {}
|
||||
|
||||
# 提取文件路径
|
||||
path_match = re.search(r'[:\s](\S+\.\S+)', text)
|
||||
if path_match:
|
||||
result["path"] = path_match.group(1)
|
||||
|
||||
# 提取操作类型
|
||||
if "复制" in text or "copy" in text.lower():
|
||||
result["operation"] = "copy"
|
||||
elif "移动" in text or "move" in text.lower():
|
||||
result["operation"] = "move"
|
||||
elif "删除" in text or "delete" in text.lower():
|
||||
result["operation"] = "delete"
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# 测试
|
||||
if __name__ == "__main__":
|
||||
parser = IntentParser()
|
||||
|
||||
test_cases = [
|
||||
"帮我审查 my-project 仓库",
|
||||
"部署 test-server 到生产",
|
||||
"检查服务器状态",
|
||||
"分析销售数据"
|
||||
]
|
||||
|
||||
for test in test_cases:
|
||||
result = parser.parse_intent(test)
|
||||
print(f"输入: {test}")
|
||||
print(f"输出: {result}")
|
||||
print("-" * 40)
|
||||
@@ -0,0 +1,204 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
消息构建模块 - Message Builder
|
||||
将意图解析结果转换为标准的 RabbitMQ 消息格式
|
||||
"""
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import Dict, Optional
|
||||
|
||||
|
||||
class MessageBuilder:
|
||||
"""消息构建器"""
|
||||
|
||||
# 高优先级动作
|
||||
HIGH_PRIORITY_ACTIONS = ["code_review", "deploy", "security_check", "data_analysis"]
|
||||
|
||||
# 默认超时时间 (毫秒)
|
||||
DEFAULT_TIMEOUT = 3600000 # 1小时
|
||||
|
||||
def __init__(self, source: str = "xingyao", default_timeout: int = None):
|
||||
"""
|
||||
初始化消息构建器
|
||||
|
||||
Args:
|
||||
source: 消息来源 Agent 名称
|
||||
default_timeout: 默认超时时间 (毫秒)
|
||||
"""
|
||||
self.source = source
|
||||
self.default_timeout = default_timeout or self.DEFAULT_TIMEOUT
|
||||
|
||||
def build_task_message(self, intent: Dict, priority: str = None) -> Dict:
|
||||
"""
|
||||
构建任务消息
|
||||
|
||||
Args:
|
||||
intent: 意图解析结果
|
||||
priority: 优先级 (high/normal/low),如果为 None 则自动判断
|
||||
|
||||
Returns:
|
||||
标准任务消息 JSON
|
||||
"""
|
||||
if "error" in intent:
|
||||
return {"error": intent["error"]}
|
||||
|
||||
# 自动判断优先级
|
||||
if priority is None:
|
||||
priority = self._detect_priority(intent.get("action", ""))
|
||||
|
||||
# 生成任务ID
|
||||
task_id = self._generate_task_id()
|
||||
|
||||
message = {
|
||||
"taskId": task_id,
|
||||
"type": "task",
|
||||
"source": self.source,
|
||||
"target": intent["target"],
|
||||
"priority": priority,
|
||||
"content": {
|
||||
"action": intent["action"],
|
||||
"params": intent.get("params", {})
|
||||
},
|
||||
"metadata": {
|
||||
"createdAt": datetime.now().isoformat() + "Z",
|
||||
"expireAt": None,
|
||||
"retryCount": 0,
|
||||
"maxRetries": 3,
|
||||
"timeout": self.default_timeout
|
||||
}
|
||||
}
|
||||
|
||||
return message
|
||||
|
||||
def build_result_message(self, task_id: str, target: str,
|
||||
status: str, content: Dict) -> Dict:
|
||||
"""
|
||||
构建结果消息 (由子 Agent 使用)
|
||||
|
||||
Args:
|
||||
task_id: 原始任务ID
|
||||
target: 目标 Agent (通常是星枢)
|
||||
status: 执行状态 (success/error/partial)
|
||||
content: 结果内容
|
||||
|
||||
Returns:
|
||||
标准结果消息 JSON
|
||||
"""
|
||||
message = {
|
||||
"taskId": task_id,
|
||||
"type": "result",
|
||||
"source": self.source,
|
||||
"target": target,
|
||||
"status": status,
|
||||
"content": content,
|
||||
"metadata": {
|
||||
"completedAt": datetime.now().isoformat() + "Z"
|
||||
}
|
||||
}
|
||||
|
||||
return message
|
||||
|
||||
def build_heartbeat_message(self, agent_name: str,
|
||||
status: str = "idle",
|
||||
current_task: str = None) -> Dict:
|
||||
"""
|
||||
构建心跳消息
|
||||
|
||||
Args:
|
||||
agent_name: Agent 名称
|
||||
status: 当前状态 (idle/busy/error)
|
||||
current_task: 当前执行的任务ID
|
||||
|
||||
Returns:
|
||||
心跳消息 JSON
|
||||
"""
|
||||
message = {
|
||||
"type": "heartbeat",
|
||||
"agent": agent_name,
|
||||
"status": status,
|
||||
"currentTask": current_task,
|
||||
"timestamp": datetime.now().isoformat() + "Z"
|
||||
}
|
||||
|
||||
return message
|
||||
|
||||
def build_error_message(self, task_id: str, target: str,
|
||||
error: str, details: Dict = None) -> Dict:
|
||||
"""
|
||||
构建错误消息
|
||||
|
||||
Args:
|
||||
task_id: 任务ID
|
||||
target: 目标 Agent
|
||||
error: 错误描述
|
||||
details: 错误详情
|
||||
|
||||
Returns:
|
||||
错误消息 JSON
|
||||
"""
|
||||
message = {
|
||||
"taskId": task_id,
|
||||
"type": "error",
|
||||
"source": self.source,
|
||||
"target": target,
|
||||
"error": error,
|
||||
"content": details or {},
|
||||
"metadata": {
|
||||
"occurredAt": datetime.now().isoformat() + "Z"
|
||||
}
|
||||
}
|
||||
|
||||
return message
|
||||
|
||||
def _detect_priority(self, action: str) -> str:
|
||||
"""根据动作自动判断优先级"""
|
||||
return "high" if action in self.HIGH_PRIORITY_ACTIONS else "normal"
|
||||
|
||||
def _generate_task_id(self) -> str:
|
||||
"""生成唯一任务ID"""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
unique_id = uuid.uuid4().hex[:6]
|
||||
return f"task_{timestamp}_{unique_id}"
|
||||
|
||||
def to_json(self, message: Dict) -> str:
|
||||
"""转换为 JSON 字符串"""
|
||||
return json.dumps(message, ensure_ascii=False, indent=2)
|
||||
|
||||
def from_json(self, json_str: str) -> Dict:
|
||||
"""从 JSON 字符串解析"""
|
||||
return json.loads(json_str)
|
||||
|
||||
|
||||
# 测试
|
||||
if __name__ == "__main__":
|
||||
builder = MessageBuilder()
|
||||
|
||||
# 测试任务消息
|
||||
intent = {
|
||||
"action": "code_review",
|
||||
"target": "yunce",
|
||||
"params": {"repo": "my-project", "branch": "main"}
|
||||
}
|
||||
|
||||
task_msg = builder.build_task_message(intent)
|
||||
print("任务消息:")
|
||||
print(builder.to_json(task_msg))
|
||||
print("-" * 40)
|
||||
|
||||
# 测试心跳消息
|
||||
heartbeat = builder.build_heartbeat_message("yunce", "idle")
|
||||
print("心跳消息:")
|
||||
print(builder.to_json(heartbeat))
|
||||
print("-" * 40)
|
||||
|
||||
# 测试结果消息
|
||||
result_msg = builder.build_result_message(
|
||||
task_id=task_msg["taskId"],
|
||||
target="xingyao",
|
||||
status="success",
|
||||
content={"summary": "审查完成", "findings": []}
|
||||
)
|
||||
print("结果消息:")
|
||||
print(builder.to_json(result_msg))
|
||||
@@ -0,0 +1,248 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
RabbitMQ 发送模块 - RabbitMQ Sender
|
||||
连接到 RabbitMQ 并发送消息
|
||||
"""
|
||||
|
||||
import pika
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Optional
|
||||
from datetime import datetime
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RabbitMQSender:
|
||||
"""RabbitMQ 消息发送器"""
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"host": "192.168.3.189",
|
||||
"port": 5672,
|
||||
"username": "guest",
|
||||
"password": "guest",
|
||||
"exchange": "task_exchange",
|
||||
"result_exchange": "result_exchange",
|
||||
"heartbeat": 600,
|
||||
"blocked_connection_timeout": 300
|
||||
}
|
||||
|
||||
def __init__(self, config: Dict = None):
|
||||
"""
|
||||
初始化 RabbitMQ 发送器
|
||||
|
||||
Args:
|
||||
config: RabbitMQ 配置,包含 host, port, username, password 等
|
||||
"""
|
||||
self.config = {**self.DEFAULT_CONFIG, **(config or {})}
|
||||
self.connection = None
|
||||
self.channel = None
|
||||
self._connect()
|
||||
|
||||
def _connect(self):
|
||||
"""建立 RabbitMQ 连接"""
|
||||
credentials = pika.PlainCredentials(
|
||||
self.config["username"],
|
||||
self.config["password"]
|
||||
)
|
||||
|
||||
parameters = pika.ConnectionParameters(
|
||||
host=self.config["host"],
|
||||
port=self.config["port"],
|
||||
credentials=credentials,
|
||||
heartbeat=self.config["heartbeat"],
|
||||
blocked_connection_timeout=self.config["blocked_connection_timeout"]
|
||||
)
|
||||
|
||||
try:
|
||||
self.connection = pika.BlockingConnection(parameters)
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
# 声明交换机
|
||||
self.channel.exchange_declare(
|
||||
exchange=self.config["exchange"],
|
||||
exchange_type="topic",
|
||||
durable=True
|
||||
)
|
||||
self.channel.exchange_declare(
|
||||
exchange=self.config["result_exchange"],
|
||||
exchange_type="topic",
|
||||
durable=True
|
||||
)
|
||||
|
||||
logger.info(f"✅ RabbitMQ 连接成功: {self.config['host']}:{self.config['port']}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ RabbitMQ 连接失败: {e}")
|
||||
raise
|
||||
|
||||
def send_task(self, message: Dict, priority: str = None) -> str:
|
||||
"""
|
||||
发送任务消息
|
||||
|
||||
Args:
|
||||
message: 任务消息 (由 MessageBuilder 构建)
|
||||
priority: 优先级覆盖 (high/normal/low)
|
||||
|
||||
Returns:
|
||||
task_id
|
||||
"""
|
||||
if "error" in message:
|
||||
raise ValueError(f"无法发送错误消息: {message['error']}")
|
||||
|
||||
target = message.get("target", "unknown")
|
||||
task_id = message.get("taskId", "unknown")
|
||||
routing_key = f"task.{target}"
|
||||
|
||||
# 确定优先级
|
||||
priority_value = self._get_priority_value(
|
||||
priority or message.get("priority", "normal")
|
||||
)
|
||||
|
||||
try:
|
||||
self.channel.basic_publish(
|
||||
exchange=self.config["exchange"],
|
||||
routing_key=routing_key,
|
||||
body=json.dumps(message, ensure_ascii=False),
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=2, # 消息持久化
|
||||
content_type="application/json",
|
||||
priority=priority_value,
|
||||
timestamp=int(datetime.now().timestamp())
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"✅ 任务已发送: {task_id} -> {target} (routing: {routing_key})")
|
||||
return task_id
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 发送任务失败: {e}")
|
||||
raise
|
||||
|
||||
def send_result(self, message: Dict) -> str:
|
||||
"""
|
||||
发送结果消息
|
||||
|
||||
Args:
|
||||
message: 结果消息 (由 MessageBuilder 构建)
|
||||
|
||||
Returns:
|
||||
task_id
|
||||
"""
|
||||
source = message.get("source", "unknown")
|
||||
task_id = message.get("taskId", "unknown")
|
||||
routing_key = f"result.{source}"
|
||||
|
||||
try:
|
||||
self.channel.basic_publish(
|
||||
exchange=self.config["result_exchange"],
|
||||
routing_key=routing_key,
|
||||
body=json.dumps(message, ensure_ascii=False),
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=2,
|
||||
content_type="application/json"
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"✅ 结果已发送: {task_id} from {source}")
|
||||
return task_id
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 发送结果失败: {e}")
|
||||
raise
|
||||
|
||||
def send_heartbeat(self, message: Dict) -> bool:
|
||||
"""
|
||||
发送心跳消息
|
||||
|
||||
Args:
|
||||
message: 心跳消息
|
||||
|
||||
Returns:
|
||||
是否发送成功
|
||||
"""
|
||||
agent = message.get("agent", "unknown")
|
||||
routing_key = f"heartbeat.{agent}"
|
||||
|
||||
try:
|
||||
self.channel.basic_publish(
|
||||
exchange=self.config["exchange"],
|
||||
routing_key=routing_key,
|
||||
body=json.dumps(message, ensure_ascii=False),
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=2,
|
||||
content_type="application/json"
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(f"💓 心跳已发送: {agent}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 发送心跳失败: {e}")
|
||||
return False
|
||||
|
||||
def _get_priority_value(self, priority: str) -> int:
|
||||
"""将优先级字符串转换为 RabbitMQ 数值"""
|
||||
priority_map = {
|
||||
"high": 10,
|
||||
"normal": 5,
|
||||
"low": 1
|
||||
}
|
||||
return priority_map.get(priority.lower(), 5)
|
||||
|
||||
def is_connected(self) -> bool:
|
||||
"""检查连接状态"""
|
||||
return self.connection is not None and self.connection.is_open
|
||||
|
||||
def reconnect(self):
|
||||
"""重新连接"""
|
||||
self.close()
|
||||
self._connect()
|
||||
|
||||
def close(self):
|
||||
"""关闭连接"""
|
||||
if self.connection and self.connection.is_open:
|
||||
self.connection.close()
|
||||
logger.info("🔌 RabbitMQ 连接已关闭")
|
||||
|
||||
|
||||
# 便捷函数
|
||||
def send_task_quick(message: Dict, config: Dict = None) -> str:
|
||||
"""
|
||||
快速发送任务 (自动创建和关闭连接)
|
||||
|
||||
Args:
|
||||
message: 任务消息
|
||||
config: RabbitMQ 配置
|
||||
|
||||
Returns:
|
||||
task_id
|
||||
"""
|
||||
sender = RabbitMQSender(config)
|
||||
task_id = sender.send_task(message)
|
||||
sender.close()
|
||||
return task_id
|
||||
|
||||
|
||||
# 测试
|
||||
if __name__ == "__main__":
|
||||
from message_builder import MessageBuilder
|
||||
from intent_parser import IntentParser
|
||||
|
||||
# 解析意图
|
||||
parser = IntentParser()
|
||||
intent = parser.parse_intent("帮我审查 my-project 仓库")
|
||||
|
||||
# 构建消息
|
||||
builder = MessageBuilder()
|
||||
message = builder.build_task_message(intent)
|
||||
|
||||
# 发送 (需要配置实际的 RabbitMQ 地址)
|
||||
# sender = RabbitMQSender({"host": "192.168.1.100", "username": "admin", "password": "password"})
|
||||
# task_id = sender.send_task(message)
|
||||
# sender.close()
|
||||
|
||||
print("消息构建成功:")
|
||||
print(builder.to_json(message))
|
||||
709
#recycle/openclaw/yunce/星枢-Agent任务解耦方案.md
Normal file
709
#recycle/openclaw/yunce/星枢-Agent任务解耦方案.md
Normal file
@@ -0,0 +1,709 @@
|
||||
# 星枢 Agent 任务解耦技术方案
|
||||
|
||||
> 基于 RabbitMQ 的分布式任务队列架构
|
||||
|
||||
---
|
||||
|
||||
## 一、概述
|
||||
|
||||
### 背景
|
||||
|
||||
当前星枢(主 Agent)与其他 Agent 的通信方式:
|
||||
|
||||
| 方式 | 命令 | 局限 |
|
||||
|------|------|------|
|
||||
| 本地 | `openclaw agent --agent xingyao --message "..." --deliver` | 同步等待 |
|
||||
| 远程 | `ssh ubuntu2 "openclaw agent --agent yunce --message ..."` | 串行阻塞 |
|
||||
|
||||
### 目标
|
||||
|
||||
- **异步执行**:任务下发不等待结果
|
||||
- **任务持久化**:重启不丢失
|
||||
- **可监控**:实时查看任务状态
|
||||
- **可扩展**:支持多 Agent 并行
|
||||
|
||||
---
|
||||
|
||||
## 二、技术选型
|
||||
|
||||
### RabbitMQ vs 其他
|
||||
|
||||
| 特性 | RabbitMQ | Redis Streams | Kafka |
|
||||
|------|----------|---------------|-------|
|
||||
| 消息确认 | ✅ ACK | ✅ ACK | ✅ ACK |
|
||||
| 优先级队列 | ✅ | ❌ | ❌ |
|
||||
| 延迟队列 | ✅ (插件) | ✅ | ❌ |
|
||||
| 持久化 | ✅ | ✅ | ✅ |
|
||||
| 集群 | ✅ | 有限 | ✅ |
|
||||
| 生态成熟度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
|
||||
| 轻量级 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ |
|
||||
|
||||
**推荐:RabbitMQ**
|
||||
|
||||
理由:
|
||||
- 消息确认机制完善
|
||||
- 支持复杂路由规则
|
||||
- 管理界面友好
|
||||
- 适合中低并发场景
|
||||
|
||||
---
|
||||
|
||||
## 三、架构设计
|
||||
|
||||
### 3.1 整体架构
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ 用户 │
|
||||
│ (Telegram/Discord) │
|
||||
└─────────────────────────────────┬───────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ 星枢 (主 Agent) │
|
||||
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
|
||||
│ │ 意图理解 │ │ 任务分解 │ │ 队列管理 │ │ 结果聚合 │ │
|
||||
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
|
||||
└─────────────────────────────────┬───────────────────────────────────────┘
|
||||
│
|
||||
┌─────────────┴─────────────┐
|
||||
│ RabbitMQ 集群 │
|
||||
│ (task_exchange) │
|
||||
└─────────────┬─────────────┘
|
||||
│
|
||||
┌───────────────────────┼───────────────────────┐
|
||||
│ │ │
|
||||
▼ ▼ ▼
|
||||
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
||||
│ Yunce (Agent) │ │ Atlas (Agent) │ │ Prometheus │
|
||||
│ 队列: tasks │ │ 队列: tasks │ │ 队列: tasks │
|
||||
│ 状态: running │ │ 状态: idle │ │ 状态: idle │
|
||||
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
|
||||
│ │ │
|
||||
│ ┌──────────────────┴──────────────────┐ │
|
||||
│ │ 结果收集 (result_exchange) │ │
|
||||
│ └──────────────────┬──────────────────┘ │
|
||||
│ │ │
|
||||
└──────────────────────┼──────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ 星枢 (结果处理) │
|
||||
│ - 任务状态更新 │
|
||||
│ - 用户反馈 │
|
||||
│ - 后续任务触发 │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### 3.2 消息流设计
|
||||
|
||||
```
|
||||
┌──────────────────────────────────────────────────────────────────────────┐
|
||||
│ 消息生命周期 │
|
||||
└──────────────────────────────────────────────────────────────────────────┘
|
||||
|
||||
[1] 任务下发 [5] 结果处理
|
||||
│ ▲
|
||||
▼ │
|
||||
┌────────┐ ┌────────────┐ ┌───────────┐ ┌───────────┐ │
|
||||
│ 星枢 │───▶│ RabbitMQ │───▶│ Agent N │───▶│ RabbitMQ │──────┘
|
||||
│创建任务 │ │ (持久化) │ │ 执行任务 │ │ (结果队列) │
|
||||
└────────┘ └────────────┘ └───────────┘ └───────────┘
|
||||
│ │
|
||||
│ [4] ACK 确认
|
||||
│ │
|
||||
[2] 任务入队 │
|
||||
(可选: 延迟队列) ▼
|
||||
│ ┌───────────┐
|
||||
└─────────────▶│ 状态变更 │
|
||||
│ (处理中→完成)
|
||||
└───────────┘
|
||||
|
||||
[3] Agent 消费任务
|
||||
```
|
||||
|
||||
### 3.3 Exchange & Queue 设计
|
||||
|
||||
```
|
||||
┌─────────────────┐
|
||||
│ task_exchange │ (Topic Exchange)
|
||||
│ (星枢下发) │
|
||||
└────────┬────────┘
|
||||
│
|
||||
┌───────────────────┼───────────────────┐
|
||||
│ │ │
|
||||
▼ ▼ ▼
|
||||
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
||||
│ queue.yunce │ │ queue.atlas │ │ queue.prometheus│
|
||||
│ routing: │ │ routing: │ │ routing: │
|
||||
│ task.yunce │ │ task.atlas │ │ task.prometheus │
|
||||
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
|
||||
│ │ │
|
||||
▼ ▼ ▼
|
||||
[Agent: Yunce] [Agent: Atlas] [Agent: Prometheus]
|
||||
|
||||
─────────────────────────────────────────────────────────────────────────
|
||||
|
||||
┌─────────────────┐
|
||||
│result_exchange │ (Topic Exchange)
|
||||
│ (结果收集) │
|
||||
└────────┬────────┘
|
||||
│
|
||||
┌───────────────────┼───────────────────┐
|
||||
│ │ │
|
||||
▼ ▼ ▼
|
||||
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
||||
│result.yunce │ │result.atlas │ │result.prometheus │
|
||||
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
|
||||
│ │ │
|
||||
└────────────────────┼────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────┐
|
||||
│ queue.star聚合 │ ← 星枢监听此队列
|
||||
│ routing: result.#│
|
||||
└─────────────────┘
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 四、消息格式定义
|
||||
|
||||
### 4.1 任务消息 (Task Message)
|
||||
|
||||
```json
|
||||
{
|
||||
"taskId": "task_20260317_001",
|
||||
"type": "task",
|
||||
"source": "xingyao",
|
||||
"target": "yunce",
|
||||
"priority": "high",
|
||||
"content": {
|
||||
"action": "code_review",
|
||||
"params": {
|
||||
"repo": "my-project",
|
||||
"branch": "feature/login"
|
||||
}
|
||||
},
|
||||
"metadata": {
|
||||
"createdAt": "2026-03-17T10:30:00Z",
|
||||
"expireAt": "2026-03-17T11:30:00Z",
|
||||
"retryCount": 0,
|
||||
"maxRetries": 3
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 4.2 结果消息 (Result Message)
|
||||
|
||||
```json
|
||||
{
|
||||
"taskId": "task_20260317_001",
|
||||
"type": "result",
|
||||
"source": "yunce",
|
||||
"target": "xingyao",
|
||||
"status": "success",
|
||||
"content": {
|
||||
"summary": "代码审查完成",
|
||||
"findings": [
|
||||
{"severity": "warning", "message": "建议添加参数校验"}
|
||||
],
|
||||
"output": "/path/to/report.md"
|
||||
},
|
||||
"metadata": {
|
||||
"completedAt": "2026-03-17T10:35:00Z",
|
||||
"duration": 300
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 4.3 心跳消息 (Heartbeat Message)
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "heartbeat",
|
||||
"agent": "yunce",
|
||||
"status": "idle",
|
||||
"currentTask": null,
|
||||
"timestamp": "2026-03-17T10:30:00Z"
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 五、实现步骤
|
||||
|
||||
### 5.1 RabbitMQ 部署
|
||||
|
||||
```bash
|
||||
# Docker 部署
|
||||
docker run -d \
|
||||
--name rabbitmq \
|
||||
-p 5672:5672 \
|
||||
-p 15672:15672 \
|
||||
-e RABBITMQ_DEFAULT_USER=admin \
|
||||
-e RABBITMQ_DEFAULT_PASS=your_password \
|
||||
rabbitmq:3.12-management
|
||||
|
||||
# 访问管理界面
|
||||
# http://your-server:15672
|
||||
```
|
||||
|
||||
### 5.2 创建 Exchange 和 Queue (初始化脚本)
|
||||
|
||||
```python
|
||||
# setup_rabbitmq.py
|
||||
import pika
|
||||
|
||||
def setup_rabbitmq():
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost', port=5672)
|
||||
)
|
||||
channel = connection.channel()
|
||||
|
||||
# 1. 创建 Exchange
|
||||
channel.exchange_declare(exchange='task_exchange', exchange_type='topic', durable=True)
|
||||
channel.exchange_declare(exchange='result_exchange', exchange_type='topic', durable=True)
|
||||
|
||||
# 2. 创建任务队列 (按 Agent)
|
||||
agents = ['yunce', 'atlas', 'prometheus', 'oracle']
|
||||
for agent in agents:
|
||||
channel.queue_declare(queue=f'queue.{agent}', durable=True)
|
||||
channel.queue_bind(
|
||||
exchange='task_exchange',
|
||||
queue=f'queue.{agent}',
|
||||
routing_key=f'task.{agent}'
|
||||
)
|
||||
|
||||
# 3. 创建星枢结果聚合队列
|
||||
channel.queue_declare(queue='queue.star', durable=True)
|
||||
channel.queue_bind(
|
||||
exchange='result_exchange',
|
||||
queue='queue.star',
|
||||
routing_key='result.#'
|
||||
)
|
||||
|
||||
connection.close()
|
||||
print("✅ RabbitMQ 初始化完成")
|
||||
|
||||
if __name__ == '__main__':
|
||||
setup_rabbitmq()
|
||||
```
|
||||
|
||||
### 5.3 星枢任务下发模块
|
||||
|
||||
```python
|
||||
# star_sender.py
|
||||
import pika
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
class StarTaskSender:
|
||||
def __init__(self, rabbitmq_host='localhost'):
|
||||
self.connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host=rabbitmq_host)
|
||||
)
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
def send_task(self, target_agent, action, params, priority='normal'):
|
||||
task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
|
||||
|
||||
message = {
|
||||
"taskId": task_id,
|
||||
"type": "task",
|
||||
"source": "xingyao",
|
||||
"target": target_agent,
|
||||
"priority": priority,
|
||||
"content": {
|
||||
"action": action,
|
||||
"params": params
|
||||
},
|
||||
"metadata": {
|
||||
"createdAt": datetime.now().isoformat() + "Z",
|
||||
"retryCount": 0,
|
||||
"maxRetries": 3
|
||||
}
|
||||
}
|
||||
|
||||
self.channel.basic_publish(
|
||||
exchange='task_exchange',
|
||||
routing_key=f'task.{target_agent}',
|
||||
body=json.dumps(message),
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=2, # 持久化
|
||||
priority=10 if priority == 'high' else 5
|
||||
)
|
||||
)
|
||||
|
||||
print(f"✅ 任务已下发: {task_id} -> {target_agent}")
|
||||
return task_id
|
||||
|
||||
def close(self):
|
||||
self.connection.close()
|
||||
|
||||
# 使用示例
|
||||
if __name__ == '__main__':
|
||||
sender = StarTaskSender()
|
||||
|
||||
# 下发任务给 Yunce
|
||||
task_id = sender.send_task(
|
||||
target_agent='yunce',
|
||||
action='code_review',
|
||||
params={'repo': 'my-project', 'branch': 'main'},
|
||||
priority='high'
|
||||
)
|
||||
|
||||
sender.close()
|
||||
```
|
||||
|
||||
### 5.4 Agent 任务监听模块
|
||||
|
||||
```python
|
||||
# agent_listener.py
|
||||
import pika
|
||||
import json
|
||||
import subprocess
|
||||
import logging
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class AgentListener:
|
||||
def __init__(self, agent_name, rabbitmq_host='localhost'):
|
||||
self.agent_name = agent_name
|
||||
self.connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host=rabbitmq_host)
|
||||
)
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
def execute_task(self, task_content):
|
||||
"""执行任务的核心逻辑"""
|
||||
action = task_content['action']
|
||||
params = task_content['params']
|
||||
|
||||
logger.info(f"执行任务: {action}")
|
||||
|
||||
# 根据 action 调用不同的处理函数
|
||||
handlers = {
|
||||
'code_review': self.handle_code_review,
|
||||
'data_analysis': self.handle_data_analysis,
|
||||
'file_operation': self.handle_file_operation,
|
||||
}
|
||||
|
||||
handler = handlers.get(action, self.handle_default)
|
||||
return handler(params)
|
||||
|
||||
def handle_code_review(self, params):
|
||||
# 调用 OpenClaw agent
|
||||
result = subprocess.run(
|
||||
['openclaw', 'agent', '--agent', 'yunce',
|
||||
'--message', f"请审查代码仓库 {params.get('repo')}"],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
return {'output': result.stdout, 'status': 'success'}
|
||||
|
||||
def handle_default(self, params):
|
||||
return {'message': f'Unknown action: {params}'}
|
||||
|
||||
def on_message(self, ch, method, properties, body):
|
||||
"""消息处理回调"""
|
||||
try:
|
||||
message = json.loads(body)
|
||||
task_id = message['taskId']
|
||||
|
||||
logger.info(f"收到任务: {task_id}")
|
||||
|
||||
# 执行任务
|
||||
result = self.execute_task(message['content'])
|
||||
|
||||
# 发送结果
|
||||
self.send_result(task_id, result)
|
||||
|
||||
# ACK 确认
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"任务执行失败: {e}")
|
||||
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
||||
|
||||
def send_result(self, task_id, result):
|
||||
"""发送结果到星枢"""
|
||||
result_message = {
|
||||
"taskId": task_id,
|
||||
"type": "result",
|
||||
"source": self.agent_name,
|
||||
"target": "xingyao",
|
||||
"status": "success",
|
||||
"content": result,
|
||||
"metadata": {
|
||||
"completedAt": datetime.now().isoformat() + "Z"
|
||||
}
|
||||
}
|
||||
|
||||
self.channel.basic_publish(
|
||||
exchange='result_exchange',
|
||||
routing_key=f'result.{self.agent_name}',
|
||||
body=json.dumps(result_message),
|
||||
properties=pika.BasicProperties(delivery_mode=2)
|
||||
)
|
||||
|
||||
def start_listening(self):
|
||||
"""开始监听任务队列"""
|
||||
self.channel.basic_qos(prefetch_count=1)
|
||||
self.channel.basic_consume(
|
||||
queue=f'queue.{self.agent_name}',
|
||||
on_message_callback=self.on_message
|
||||
)
|
||||
|
||||
logger.info(f"🤖 Agent [{self.agent_name}] 开始监听任务队列...")
|
||||
self.channel.start_consuming()
|
||||
|
||||
# 使用示例
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
agent_name = sys.argv[1] if len(sys.argv) > 1 else 'yunce'
|
||||
listener = AgentListener(agent_name)
|
||||
listener.start_listening()
|
||||
```
|
||||
|
||||
### 5.5 星枢结果收集模块
|
||||
|
||||
```python
|
||||
# star_receiver.py
|
||||
import pika
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
class StarResultReceiver:
|
||||
def __init__(self, rabbitmq_host='localhost'):
|
||||
self.connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host=rabbitmq_host)
|
||||
)
|
||||
self.channel = self.connection.channel()
|
||||
self.pending_tasks = {} # 跟踪待处理任务
|
||||
|
||||
def on_message(self, ch, method, properties, body):
|
||||
message = json.loads(body)
|
||||
|
||||
if message['type'] == 'result':
|
||||
task_id = message['taskId']
|
||||
status = message['status']
|
||||
result = message['content']
|
||||
|
||||
print(f"📋 任务完成: {task_id}")
|
||||
print(f" 状态: {status}")
|
||||
print(f" 结果: {result}")
|
||||
|
||||
# 更新任务状态
|
||||
if task_id in self.pending_tasks:
|
||||
self.pending_tasks[task_id]['status'] = 'completed'
|
||||
self.pending_tasks[task_id]['result'] = result
|
||||
|
||||
# 可以触发后续任务
|
||||
self.handle_next_action(message)
|
||||
|
||||
elif message['type'] == 'heartbeat':
|
||||
print(f"💓 Agent 心跳: {message['agent']} - {message['status']}")
|
||||
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
def handle_next_action(self, message):
|
||||
"""根据结果触发后续动作"""
|
||||
# 示例:根据结果发送新任务
|
||||
pass
|
||||
|
||||
def start_listening(self):
|
||||
self.channel.basic_qos(prefetch_count=1)
|
||||
self.channel.basic_consume(
|
||||
queue='queue.star',
|
||||
on_message_callback=self.on_message
|
||||
)
|
||||
|
||||
print("🌟 星枢开始监听任务结果...")
|
||||
self.channel.start_consuming()
|
||||
|
||||
# 使用示例
|
||||
if __name__ == '__main__':
|
||||
receiver = StarResultReceiver()
|
||||
receiver.start_listening()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 六、监控界面
|
||||
|
||||
### 6.1 RabbitMQ 管理界面
|
||||
|
||||
```
|
||||
URL: http://localhost:15672
|
||||
用户名: admin
|
||||
密码: your_password
|
||||
|
||||
可查看:
|
||||
- 队列状态 (Messages, Ready, Unacked)
|
||||
- 连接数
|
||||
- 消息流速
|
||||
- 交换机绑定
|
||||
```
|
||||
|
||||
### 6.2 自定义监控面板 (可选)
|
||||
|
||||
```python
|
||||
# 简单的任务状态查询
|
||||
def get_task_status(task_id):
|
||||
# 可以通过 REST API 查询
|
||||
# 或者维护一个 Redis 状态缓存
|
||||
pass
|
||||
|
||||
def list_pending_tasks():
|
||||
# 列出所有待处理任务
|
||||
pass
|
||||
|
||||
def list_agent_status():
|
||||
# 列出所有 Agent 状态
|
||||
pass
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 七、完整工作流程示例
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ 完整示例:代码审查任务 │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
|
||||
[用户]
|
||||
│
|
||||
│ "星枢,帮我审查 my-project 的 main 分支"
|
||||
▼
|
||||
[星枢 - 意图理解]
|
||||
│ action: code_review
|
||||
│ target: yunce
|
||||
│ params: {repo: "my-project", branch: "main"}
|
||||
▼
|
||||
[星枢 - 任务下发]
|
||||
│ RabbitMQ: task.yunce
|
||||
│ taskId: task_20260317_001
|
||||
▼
|
||||
[RabbitMQ] (持久化消息)
|
||||
▼
|
||||
[Yunce Agent - 任务监听]
|
||||
│ 收到任务 -> 执行 code_review
|
||||
│ 调用: openclaw agent --agent yunce --message "审查 my-project"
|
||||
▼
|
||||
[Yunce Agent - 返回结果]
|
||||
│ RabbitMQ: result.yunce
|
||||
│ status: success, findings: [...]
|
||||
▼
|
||||
[RabbitMQ]
|
||||
│ result.# -> queue.star
|
||||
▼
|
||||
[星枢 - 结果收集]
|
||||
│ 接收结果 -> 更新状态
|
||||
│ 格式化输出 -> 推送给用户
|
||||
▼
|
||||
[用户]
|
||||
│ 收到审查报告
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 八、部署建议
|
||||
|
||||
### 8.1 生产环境配置
|
||||
|
||||
```yaml
|
||||
# docker-compose.yml
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
rabbitmq:
|
||||
image: rabbitmq:3.12-management
|
||||
ports:
|
||||
- "5672:5672"
|
||||
- "15672:15672"
|
||||
environment:
|
||||
RABBITMQ_DEFAULT_USER: admin
|
||||
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
|
||||
volumes:
|
||||
- rabbitmq_data:/var/lib/rabbitmq
|
||||
healthcheck:
|
||||
test: ["CMD", "rabbitmq-diagnostics", "check_running"]
|
||||
interval: 30s
|
||||
|
||||
volumes:
|
||||
rabbitmq_data:
|
||||
```
|
||||
|
||||
### 8.2 安全建议
|
||||
|
||||
1. **认证**:启用 RabbitMQ 用户认证
|
||||
2. **SSL/TLS**:生产环境启用 amqps
|
||||
3. **VHost**:不同项目使用不同 vhost
|
||||
4. **权限**:最小权限原则
|
||||
|
||||
---
|
||||
|
||||
## 九、故障处理
|
||||
|
||||
| 故障场景 | 解决方案 |
|
||||
|----------|----------|
|
||||
| Agent 宕机 | 任务自动重新入队 (requeue) |
|
||||
| RabbitMQ 宕机 | 消息持久化,重启后恢复 |
|
||||
| 任务超时 | 设置 TTL,自动移到死信队列 |
|
||||
| 消息积压 | 监控队列长度,扩展消费者 |
|
||||
|
||||
---
|
||||
|
||||
## 十、进阶功能
|
||||
|
||||
### 10.1 延迟任务
|
||||
|
||||
```python
|
||||
# 延迟队列:让任务在指定时间后执行
|
||||
def send_delayed_task(target, action, delay_seconds):
|
||||
# 使用 RabbitMQ 延迟插件 或 配合 Redis 实现
|
||||
pass
|
||||
```
|
||||
|
||||
### 10.2 优先级队列
|
||||
|
||||
```python
|
||||
# 高优先级任务优先处理
|
||||
channel.queue_declare(queue='queue.yunce', arguments={
|
||||
'x-max-priority': 10
|
||||
})
|
||||
```
|
||||
|
||||
### 10.3 任务超时
|
||||
|
||||
```python
|
||||
# 消息 TTL + 死信队列
|
||||
channel.queue_declare(
|
||||
queue='queue.yunce',
|
||||
arguments={
|
||||
'x-message-ttl': 3600000, # 1小时
|
||||
'x-dead-letter-exchange': 'dlx_exchange'
|
||||
}
|
||||
)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 附录:文件清单
|
||||
|
||||
| 文件 | 说明 |
|
||||
|------|------|
|
||||
| `setup_rabbitmq.py` | RabbitMQ 初始化脚本 |
|
||||
| `star_sender.py` | 星枢任务下发模块 |
|
||||
| `agent_listener.py` | Agent 任务监听模块 |
|
||||
| `star_receiver.py` | 星枢结果收集模块 |
|
||||
| `docker-compose.yml` | 一键部署配置 |
|
||||
|
||||
---
|
||||
|
||||
*文档版本: 1.0*
|
||||
*创建时间: 2026-03-17*
|
||||
*作者: 云策*
|
||||
Reference in New Issue
Block a user