Coze Multi-Agent Collaboration in Practice: From Getting Started to Production-Grade Applications
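Project Overview: Why Multi-Agent Collaboration?
What Are the Pain Points?
A single AI agent is like an employee with only one skill: ask it to write code and it cannot test that code; ask it to analyze data and it cannot visualize the results. When we need a system that can complete complex tasks on its own, the single-agent model falls short.
A few real-world scenarios:
- Code review workflow: one agent analyzes the code, another checks security, and a final step generates the report
- Data processing pipeline: scrape → clean → analyze → visualize, each stage feeding the next
- Intelligent customer service: understand intent → query the knowledge base → generate a reply → log the ticket
Solving these with a single agent means either an ever-longer prompt (which degrades output quality) or an ever-growing number of calls (which drives up cost).
Advantages of Multi-Agent Collaboration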
plaintext
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│   Agent A    │ ──▶ │   Agent B    │ ──▶ │   Agent C    │
│ (decompose)  │     │  (execute)   │     │ (integrate)  │
└──────────────┘     └──────────────┘     └──────────────┘
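Core value:
- Separation of responsibilities: each agent focuses on one job, so quality is higher
- Reusability: agents can be reused across scenarios
- Observability: each stage is monitored independently, so failures are easy to trace
- Cost control: agents are called only when needed, with no repeated processing
Core Code: Implementing the Coze Multi-Agent Collaboration System
1. Project Structure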
bash
multi-agent-system/
├── agents/
│   ├── __init__.py
│   ├── base_agent.py       # Agent base class
│   ├── planner_agent.py    # Task-planning agent
│   ├── coder_agent.py      # Code-writing agent
│   └── reviewer_agent.py   # Code-review agent
├── core/
│   ├── __init__.py
│   ├── orchestrator.py     # Orchestrator
│   └── message_bus.py      # Message bus
├── config.py               # Configuration
├── main.py                 # Entry point
└── requirements.txt
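The tree lists `core/message_bus.py`, but the article never shows it. As a rough idea of what such a module could contain, here is a minimal in-process publish/subscribe sketch. The names `MessageBus`, `subscribe`, and `publish` and the plain-dict message shape are assumptions made for illustration, not the author's implementation.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical handler type: an async callable that receives one message dict.
Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class MessageBus:
    """In-process publish/subscribe bus keyed by receiver name (illustrative only)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, receiver: str, handler: Handler) -> None:
        """Register a handler for messages addressed to `receiver`."""
        self._subscribers[receiver].append(handler)

    async def publish(self, message: Dict[str, Any]) -> int:
        """Deliver the message to every handler of its receiver; return how many ran."""
        handlers = self._subscribers.get(message["receiver"], [])
        if handlers:
            await asyncio.gather(*(handler(message) for handler in handlers))
        return len(handlers)


async def demo() -> List[str]:
    bus = MessageBus()
    received: List[str] = []

    async def reviewer_inbox(message: Dict[str, Any]) -> None:
        received.append(f"{message['sender']} -> reviewer: {message['content']}")

    bus.subscribe("reviewer", reviewer_inbox)
    await bus.publish({"sender": "coder", "receiver": "reviewer", "content": "print('hi')"})
    # No subscriber is registered for "nobody", so this message is silently dropped.
    await bus.publish({"sender": "coder", "receiver": "nobody", "content": "dropped"})
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Routing by receiver name keeps agents unaware of each other, so a monitoring handler can be attached without touching agent code. A production bus would likely also need error isolation per handler and some form of backpressure.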
2. Agent Base Class
python
# agents/base_agent.py
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import time


class AgentStatus(Enum):
    IDLE = "idle"
    WORKING = "working"
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class Message:
    """A message passed between agents."""
    sender: str
    receiver: str
    content: Any
    msg_type: str  # "task", "result", "error", "status"
    timestamp: float = field(default_factory=time.time)
    metadata: Dict = field(default_factory=dict)

    def to_dict(self) -> Dict:
        return {
            "sender": self.sender,
            "receiver": self.receiver,
            "content": self.content,
            "msg_type": self.msg_type,
            "timestamp": self.timestamp,
            "metadata": self.metadata
        }


class BaseAgent(ABC):
    """Base class for all agents."""

    def __init__(self, name: str, description: str, model: str = "claude-3-5-sonnet"):
        self.name = name
        self.description = description
        self.model = model
        self.status = AgentStatus.IDLE
        self.message_history: List[Message] = []
        self.context: Dict[str, Any] = {}

    @abstractmethod
    async def process(self, message: Message) -> Message:
        """Core processing logic; subclasses must implement it."""
        pass

    async def execute(self, input_data: Any, context: Optional[Dict] = None) -> Dict[str, Any]:
        """Main entry point for running a task."""
        self.status = AgentStatus.WORKING
        self.context = context or {}
        try:
            # Wrap the input in a message
            input_msg = Message(
                sender="system",
                receiver=self.name,
                content=input_data,
                msg_type="task"
            )
            # Process it
            result_msg = await self.process(input_msg)
            self.message_history.append(result_msg)
            self.status = AgentStatus.SUCCESS
            return {
                "status": "success",
                "agent": self.name,
                "result": result_msg.content,
                "metadata": result_msg.metadata
            }
        except Exception as e:
            # Errors are reported in the return value rather than re-raised
            self.status = AgentStatus.FAILED
            return {
                "status": "error",
                "agent": self.name,
                "error": str(e)
            }

    def get_system_prompt(self) -> str:
        """Return the agent's system prompt."""
        return f"""You are a professional {self.name}.
{self.description}
You must:
1. Stay focused on your core responsibility
2. Produce structured output
3. If you cannot handle a situation, state the reason clearly
Output format: JSON
"""
3. Task-Planning Agent
python
# agents/planner_agent.py
from .base_agent import BaseAgent, Message
from typing import Dict


class PlannerAgent(BaseAgent):
    """Task-planning agent: analyzes the user's request and breaks it into a task flow."""

    def __init__(self):
        super().__init__(
            name="planner",
            description="Task-planning expert that analyzes requirements and splits them into executable subtasks"
        )

    async def process(self, message: Message) -> Message:
        user_request = message.content
        # Build the planning prompt
        prompt = f"""
        Analyze the following request and break it down into subtasks:
        Request: {user_request}
        Output the breakdown in the following JSON format:
        {{
            "main_goal": "description of the main goal",
            "tasks": [
                {{
                    "id": 1,
                    "agent": "agent name (coder/reviewer/analyzer, etc.)",
                    "description": "task description",
                    "dependencies": [],  // IDs of the tasks this one depends on
                    "output": "description of the output"
                }}
            ],
            "execution_order": [[1, 2], [3], [4]]  // groups of tasks that can run in parallel
        }}
        """
        # Call the actual AI API here (stubbed out in this example)
        result = await self.call_ai(prompt)
        return Message(
            sender=self.name,
            receiver="orchestrator",
            content=result,
            msg_type="result",
            metadata={"task_count": len(result.get("tasks", []))}
        )

    async def call_ai(self, prompt: str) -> Dict:
        """Call the AI endpoint."""
        # In a real project this goes through coze-api.
        # This simplified stub returns a fixed structure with no tasks.
        return {
            "main_goal": "Complete the code development task",
            "tasks": [],
            "execution_order": []
        }
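Once `call_ai` is wired to a real model, the reply arrives as text rather than a dict. The prompt's own template contains `//` comments, and models often wrap JSON in a code fence, so some tolerant parsing is usually needed. The sketch below shows one way to do it; `parse_plan` is a hypothetical helper, not part of the original project, and its comment stripping is deliberately naive.

```python
import json
import re
from typing import Any, Dict


def parse_plan(raw: str) -> Dict[str, Any]:
    """Extract and validate the planner's JSON from a raw model reply."""
    # Take the outermost {...} span so code fences or chatter around it are ignored.
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object found in model reply")
    body = raw[start:end + 1]
    # Drop trailing // comments. Best effort only: a '//' inside a string value
    # is also stripped unless it directly follows ':' (as in 'https://').
    body = re.sub(r"(?<!:)//[^\n]*", "", body)
    plan = json.loads(body)
    for key in ("main_goal", "tasks", "execution_order"):
        if key not in plan:
            raise ValueError(f"plan is missing required key: {key}")
    return plan


if __name__ == "__main__":
    reply = (
        "Here is the plan:\n"
        '{\n  "main_goal": "demo",\n'
        '  "tasks": [{"id": 1, "agent": "coder", "description": "x",\n'
        '             "dependencies": [], // none\n'
        '             "output": "code"}],\n'
        '  "execution_order": [[1]] // one batch\n}'
    )
    print(parse_plan(reply)["tasks"][0]["agent"])
```

Validating the required keys at this boundary means a malformed plan fails in the planner, where the cause is obvious, instead of surfacing later as a `KeyError` inside the orchestrator.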
4. Code-Writing Agent
python
# agents/coder_agent.py
from .base_agent import BaseAgent, Message
from typing import Dict


class CoderAgent(BaseAgent):
    """Code-writing agent."""

    def __init__(self):
        super().__init__(
            name="coder",
            description="Professional Python/JavaScript engineer responsible for writing high-quality code"
        )
        self.language_preference = {
            "python": "Python 3.10+",
            "javascript": "ES6+",
            "java": "Java 17"
        }

    async def process(self, message: Message) -> Message:
        task = message.content
        requirements = task.get("description", "")
        language = task.get("language", "python")
        prompt = f"""
        As a professional {language} engineer, write code for the following requirement:
        Requirement: {requirements}
        Language: {language}
        Rules:
        1. The code must be runnable
        2. Add detailed comments
        3. Include error handling
        4. Follow best practices
        Output format:
        ```json
        {{
            "code": "the complete code",
            "explanation": "explanation of the code",
            "test_cases": ["list of test cases"],
            "complexity": "time and space complexity"
        }}
        ```
        """
        result = await self.call_ai(prompt)
        # Hand the generated code to the reviewer as a new task
        return Message(
            sender=self.name,
            receiver="reviewer",
            content=result,
            msg_type="task",
            metadata={
                "language": language,
                "task_id": task.get("id")
            }
        )

    async def call_ai(self, prompt: str) -> Dict:
        """Call the AI endpoint (stub: returns an empty result)."""
        return {}
5. Code-Review Agent
python
# agents/reviewer_agent.py
import ast
import re
from .base_agent import BaseAgent, Message
from typing import Dict, List


class ReviewResult:
    """Data structure for a review result."""

    def __init__(self):
        self.score: int = 0  # 1-10
        self.issues: List[Dict] = []
        self.suggestions: List[str] = []
        self.approved: bool = False

    def to_dict(self) -> Dict:
        return {
            "score": self.score,
            "issues": self.issues,
            "suggestions": self.suggestions,
            "approved": self.approved
        }


class ReviewerAgent(BaseAgent):
    """Code-review agent."""

    def __init__(self):
        super().__init__(
            name="reviewer",
            description="Code-review expert that checks code quality, security, and performance"
        )
        self.criteria = [
            "code style",
            "security",
            "performance",
            "maintainability",
            "test coverage"
        ]

    async def process(self, message: Message) -> Message:
        code_data = message.content
        code = code_data.get("code", "")
        language = message.metadata.get("language", "python")
        result = ReviewResult()
        # 1. Basic checks
        result.issues.extend(self._check_syntax(code, language))
        result.issues.extend(self._check_security(code))
        # 2. Performance checks
        perf_issues = self._check_performance(code, language)
        result.issues.extend(perf_issues)
        # 3. Compute the score
        result.score = self._calculate_score(result.issues)
        # 4. Generate suggestions
        result.suggestions = self._generate_suggestions(result.issues)
        # 5. Approve only if score >= 7 and there are no critical issues
        result.approved = result.score >= 7 and not self._has_critical_issues(result.issues)
        return Message(
            sender=self.name,
            receiver="orchestrator",
            content=result.to_dict(),
            msg_type="result",
            metadata={
                "language": language,
                "approved": result.approved
            }
        )

    def _check_syntax(self, code: str, language: str) -> List[Dict]:
        """Syntax check.

        The original listing calls this method without defining it. This minimal
        version is an assumption: it validates Python only, via ast.parse.
        """
        issues = []
        if language == "python" and code:
            try:
                ast.parse(code)
            except SyntaxError as e:
                issues.append({
                    "type": "syntax",
                    "severity": "critical",
                    "message": f"Syntax error at line {e.lineno}: {e.msg}"
                })
        return issues

    def _check_security(self, code: str) -> List[Dict]:
        """Security checks (simple string and regex heuristics)."""
        issues = []
        # SQL injection detection
        if "execute(" in code and "%" in code:
            issues.append({
                "type": "security",
                "severity": "high",
                "message": "Potential SQL injection risk detected; use parameterized queries"
            })
        # Hard-coded password detection
        if re.search(r'password\s*=\s*["\']', code):
            issues.append({
                "type": "security",
                "severity": "critical",
                "message": "Hard-coded password detected"
            })
        # API key detection
        if re.search(r'(api_key|apikey|API_KEY)\s*=\s*["\']', code):
            issues.append({
                "type": "security",
                "severity": "critical",
                "message": "Hard-coded API key detected"
            })
        return issues

    def _check_performance(self, code: str, language: str) -> List[Dict]:
        """Performance checks."""
        issues = []
        if language == "python":
            # Avoid building strings with += inside loops.
            # Coarse heuristic: it also fires on numeric += in a loop.
            if "+=" in code and re.search(r'for.*in.*:', code):
                issues.append({
                    "type": "performance",
                    "severity": "medium",
                    "message": "Prefer join() over string concatenation inside loops"
                })
        return issues

    def _calculate_score(self, issues: List[Dict]) -> int:
        """Compute a score from the issues found."""
        score = 10
        for issue in issues:
            severity = issue.get("severity", "low")
            if severity == "critical":
                score -= 4
            elif severity == "high":
                score -= 2
            elif severity == "medium":
                score -= 1
        return max(1, score)

    def _has_critical_issues(self, issues: List[Dict]) -> bool:
        return any(i.get("severity") == "critical" for i in issues)

    def _generate_suggestions(self, issues: List[Dict]) -> List[str]:
        suggestions = []
        for issue in issues:
            # Match on the issue messages produced by the checks above
            msg = issue.get("message", "").lower()
            if "sql injection" in msg:
                suggestions.append("Use an ORM or parameterized queries instead of string concatenation")
            if "hard-coded" in msg:
                suggestions.append("Manage secrets through environment variables or configuration files")
        return suggestions
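To make the scoring rule concrete, the sketch below restates the deductions from `_calculate_score` (critical -4, high -2, medium -1, floor of 1) and the approval condition (score of at least 7 and no critical issue) as one standalone function. `PENALTY` and `score_and_approve` are illustrative names, not part of the original class.

```python
from typing import Dict, List, Tuple

# Deduction per severity, mirroring _calculate_score; unknown severities cost 0.
PENALTY = {"critical": 4, "high": 2, "medium": 1}


def score_and_approve(issues: List[Dict]) -> Tuple[int, bool]:
    """Return (score, approved) under the same rules as the reviewer."""
    deductions = sum(PENALTY.get(issue.get("severity", "low"), 0) for issue in issues)
    score = max(1, 10 - deductions)
    has_critical = any(issue.get("severity") == "critical" for issue in issues)
    return score, score >= 7 and not has_critical


if __name__ == "__main__":
    # One high and one medium issue: 10 - 2 - 1 = 7, still approved.
    print(score_and_approve([{"severity": "high"}, {"severity": "medium"}]))
    # A single critical issue: 10 - 4 = 6, rejected.
    print(score_and_approve([{"severity": "critical"}]))
```

With these weights a single critical issue already pushes the score to 6 or below, so the separate critical check only starts to matter if the approval threshold is lowered or the critical penalty is reduced.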
6. Orchestrator (the Core)
python
# core/orchestrator.py
from agents.base_agent import BaseAgent
from agents.planner_agent import PlannerAgent
from agents.coder_agent import CoderAgent
from agents.reviewer_agent import ReviewerAgent
from typing import Dict, List, Any, Callable
import asyncio
from dataclasses import dataclass
import time


@dataclass
class Task:
    """Task definition."""
    id: int
    agent_name: str
    description: str
    dependencies: List[int]
    status: str = "pending"  # pending, running, completed, failed
    result: Any = None


class Orchestrator:
    """Orchestrator for multi-agent collaboration."""

    def __init__(self):
        # Initialize all agents
        self.agents: Dict[str, BaseAgent] = {
            "planner": PlannerAgent(),
            "coder": CoderAgent(),
            "reviewer": ReviewerAgent()
        }
        # Task queue
        self.tasks: List[Task] = []
        self.task_results: Dict[int, Any] = {}
        # Hooks for monitoring, logging, etc. Callbacks must be async functions.
        self.hooks: Dict[str, List[Callable]] = {
            "on_task_start": [],
            "on_task_complete": [],
            "on_workflow_complete": []
        }

    def register_hook(self, event: str, callback: Callable):
        """Register a hook callback."""
        if event in self.hooks:
            self.hooks[event].append(callback)

    async def execute_workflow(self, user_request: str) -> Dict[str, Any]:
        """Run the complete workflow."""
        start_time = time.time()
        logs = []
        # Step 1: plan the tasks
        logs.append({"step": "planning", "status": "start"})
        plan_result = await self.agents["planner"].execute(user_request)
        if plan_result["status"] != "success":
            return {"status": "failed", "error": "Task planning failed"}
        logs.append({"step": "planning", "status": "complete", "result": plan_result})
        # Step 2: build the task graph
        self._build_task_graph(plan_result["result"])
        # Step 3: run tasks in dependency order
        while self._has_pending_tasks():
            # Tasks whose dependencies have all completed
            executable_tasks = self._get_executable_tasks()
            if not executable_tasks:
                # Remaining tasks are blocked by failed or circular dependencies
                break
            # Run all executable tasks in parallel
            await asyncio.gather(
                *[self._execute_single_task(task) for task in executable_tasks]
            )
        # Step 4: aggregate the results
        all_done = all(t.status == "completed" for t in self.tasks)
        summary = {
            "status": "success" if all_done else "partial",
            "tasks": len(self.tasks),
            "results": self._aggregate_results(),
            "logs": logs,
            "duration": time.time() - start_time
        }
        # Completion hooks receive the full summary, including the duration
        for hook in self.hooks["on_workflow_complete"]:
            await hook(summary)
        return summary

    def _build_task_graph(self, plan: Dict):
        """Build the task graph from the plan."""
        self.tasks = []
        self.task_results = {}  # clear results left over from a previous run
        for task_data in plan.get("tasks", []):
            task = Task(
                id=task_data["id"],
                agent_name=task_data["agent"],
                description=task_data["description"],
                dependencies=task_data.get("dependencies", [])
            )
            self.tasks.append(task)

    def _has_pending_tasks(self) -> bool:
        return any(t.status == "pending" for t in self.tasks)

    def _get_executable_tasks(self) -> List[Task]:
        """Return the tasks that are ready to run."""
        completed_ids = {t.id for t in self.tasks if t.status == "completed"}
        executable = []
        for task in self.tasks:
            if task.status != "pending":
                continue
            # Every dependency must have completed successfully
            if all(dep_id in completed_ids for dep_id in task.dependencies):
                executable.append(task)
        return executable

    async def _execute_single_task(self, task: Task):
        """Run a single task."""
        task.status = "running"
        # Start hooks
        for hook in self.hooks["on_task_start"]:
            await hook(task)
        try:
            agent = self.agents.get(task.agent_name)
            if not agent:
                raise ValueError(f"Unknown agent: {task.agent_name}")
            # Prepare the input, including the outputs of dependency tasks
            input_data = {
                "id": task.id,
                "description": task.description,
                "dependencies": {
                    dep_id: self.task_results[dep_id]
                    for dep_id in task.dependencies
                }
            }
            result = await agent.execute(input_data)
            # BaseAgent.execute reports errors in its return value instead of raising
            if result.get("status") != "success":
                raise RuntimeError(result.get("error", "agent execution failed"))
            task.result = result
            task.status = "completed"
            self.task_results[task.id] = result
            # Completion hooks
            for hook in self.hooks["on_task_complete"]:
                await hook(result)
        except Exception as e:
            task.status = "failed"
            self.task_results[task.id] = {"error": str(e)}

    def _aggregate_results(self) -> Dict:
        """Collect the results of all completed tasks."""
        return {
            task.id: task.result
            for task in self.tasks
            if task.status == "completed"
        }
7. Entry Point
python
# main.py
from core.orchestrator import Orchestrator
import asyncio


async def main():
    orchestrator = Orchestrator()

    # Register logging hooks
    async def log_task_start(task):
        print(f"🔄 Starting task {task.id}: {task.description}")

    async def log_task_complete(result):
        print(f"✅ Task complete, result: {result}")

    async def log_workflow_complete(result):
        print(f"🏁 Workflow complete, elapsed: {result['duration']:.2f}s")

    orchestrator.register_hook("on_task_start", log_task_start)
    orchestrator.register_hook("on_task_complete", log_task_complete)
    orchestrator.register_hook("on_workflow_complete", log_workflow_complete)

    # Run the task
    request = """
    Write a Python function that does the following:
    1. Accepts a list of URLs
    2. Fetches all pages concurrently
    3. Extracts the title and key paragraphs
    4. Saves the result to a JSON file
    """
    result = await orchestrator.execute_workflow(request)
    print(f"\nFinal result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
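Demo: What It Looks Like in Practice
Run Output
plaintext
$ python main.py
🔄 Starting task 1: Analyze the requirement and break down the tasks
✅ Task complete, result: {'status': 'success', 'agent': 'planner', ...}
🔄 Starting task 2: Write the URL-fetching code
✅ Task complete, result: {'code': 'async def fetch_pages...', 'approved': True}
🔄 Starting task 3: Review code quality
✅ Task complete, result: {'score': 8, 'approved': True, ...}
🏁 Workflow complete, elapsed: 3.45s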
The Generated Code
python
# Code generated by the agents (after review)
import asyncio
import aiohttp
import json
from typing import List, Dict
from bs4 import BeautifulSoup


async def fetch_pages(urls: List[str], output_file: str = "results.json") -> Dict:
    """
    Fetch web pages concurrently and extract key information.

    Args:
        urls: list of URLs
        output_file: path of the output JSON file
    Returns:
        A dict containing the fetch results
    """
    # Per-request time limit of 10 seconds
    timeout = aiohttp.ClientTimeout(total=10)

    async def fetch_single(session: aiohttp.ClientSession, url: str) -> Dict:
        try:
            async with session.get(url, timeout=timeout) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                # Extract the title
                title = soup.find('title')
                title_text = title.get_text().strip() if title else ""
                # Extract non-empty paragraphs
                paragraphs = soup.find_all('p')
                content = [p.get_text().strip() for p in paragraphs if p.get_text().strip()]
                return {
                    "url": url,
                    "title": title_text,
                    "content": content[:5],  # first 5 paragraphs
                    "status": "success"
                }
        except Exception as e:
            return {"url": url, "status": "error", "error": str(e)}

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_single(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    # Save the results
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    return {"total": len(results), "results": results}


# Usage example
if __name__ == "__main__":
    urls = [
        "https://example.com",
        "https://httpbin.org/html"
    ]
    result = asyncio.run(fetch_pages(urls))
    # Failed fetches are included in the results with status "error"
    print(f"Processed {len(result['results'])} pages")
Lessons Learned: Pitfalls and Optimizations
Pitfall 1: Context Lost Between Agents
Problem:
When the task chain is long, downstream agents often "forget" the output of upstream agents, and the results become incoherent.
Solution:
python
from typing import Any, Dict


class ContextManager:
    """Context manager that carries information between agents."""

    def __init__(self):
        self.context: Dict[str, Any] = {}
        self.max_history = 5  # keep the 5 most recent messages per agent

    def add(self, agent_name: str, key: str, value: Any):
        if agent_name not in self.context:
            self.context[agent_name] = {"recent": [], "persistent": {}}
        self.context[agent_name]["recent"].append(value)
        self.context[agent_name]["persistent"][key] = value
        # Cap the history length
        if len(self.context[agent_name]["recent"]) > self.max_history:
            self.context[agent_name]["recent"].pop(0)

    def get_context_for(self, agent_name: str) -> str:
        """Build the context summary passed to an agent."""
        ctx = self.context.get(agent_name, {})
        persistent = ctx.get("persistent", {})
        summary = "[Context]\n"
        for key, value in persistent.items():
            summary += f"- {key}: {value}\n"
        return summary
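Pitfall 2: Circular Dependencies Cause Deadlock
Problem:
If Agent A needs Agent B's result and Agent B needs Agent A's result, neither can ever start and the workflow deadlocks.
Solution:
python
from typing import List
from core.orchestrator import Task


class DependencyChecker:
    """Dependency checker that guards against circular dependencies."""

    def check(self, tasks: List[Task]) -> bool:
        """Return True if the graph is acyclic; raise ValueError on a cycle."""
        graph = {t.id: set(t.dependencies) for t in tasks}

        def has_cycle(node, visited, rec_stack):
            # Depth-first search; rec_stack holds the nodes on the current path
            visited.add(node)
            rec_stack.add(node)
            for neighbor in graph.get(node, []):
                if neighbor not in visited:
                    if has_cycle(neighbor, visited, rec_stack):
                        return True
                elif neighbor in rec_stack:
                    return True
            rec_stack.remove(node)
            return False

        visited = set()
        for node in graph:
            if node not in visited:
                if has_cycle(node, visited, set()):
                    raise ValueError("Circular dependency detected!")
        return True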
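The planner prompt asks for an `execution_order` made of parallel groups, yet the orchestrator recomputes readiness on every loop iteration. One alternative, sketched here as an assumption rather than the author's approach, is Kahn-style layering: it detects cycles without recursion and produces the parallel batches in the same pass. `plan_batches` is a hypothetical helper that takes a plain `{task_id: [dependency_ids]}` mapping.

```python
from typing import Dict, List


def plan_batches(dependencies: Dict[int, List[int]]) -> List[List[int]]:
    """Group task IDs into batches so every dependency sits in an earlier batch.

    Raises ValueError when a cycle or an unknown dependency ID blocks progress.
    """
    remaining = {task_id: set(deps) for task_id, deps in dependencies.items()}
    batches: List[List[int]] = []
    while remaining:
        # Tasks with no unmet dependencies can all run in parallel.
        ready = sorted(task_id for task_id, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError(f"circular or unresolved dependencies: {sorted(remaining)}")
        batches.append(ready)
        for task_id in ready:
            del remaining[task_id]
        # Mark the finished batch as satisfied for everything still waiting.
        for deps in remaining.values():
            deps.difference_update(ready)
    return batches


if __name__ == "__main__":
    print(plan_batches({1: [], 2: [], 3: [1, 2], 4: [3]}))
```

For the four-task graph in the usage line, the batches come out as `[[1, 2], [3], [4]]`, which matches the `execution_order` example in the planner prompt. Computing the batches locally also removes the need to trust the model's own ordering.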
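Pitfall 3: Unbounded Token Growth
Problem:
In multi-agent collaboration, every agent keeps the full history, so token consumption climbs steeply.
Optimization strategies:
- Summary compression: periodically compress the message history into a summary
- Selective passing: pass along only the context that is actually needed
- Tiered handling: core agents keep the full context, supporting agents get a summary
python
from typing import List
from agents.base_agent import Message


class ContextCompressor:
    """Context compressor."""

    def compress(self, messages: List[Message], max_tokens: int = 2000) -> str:
        """Ask the model to summarize the messages in roughly max_tokens tokens."""
        transcript = "\n".join(f"{m.sender}: {m.content}" for m in messages)
        summary_prompt = f"""
        Summarize the following conversation in at most about {max_tokens} tokens,
        keeping the key information and conclusions:
        {transcript}
        """
        # Call the AI to generate the summary
        summary = self.call_ai(summary_prompt)
        return summary

    def call_ai(self, prompt: str) -> str:
        """Placeholder: the original listing leaves this undefined; connect it to your model API."""
        raise NotImplementedError("connect call_ai to a real model endpoint")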
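The selective-passing strategy can also be applied without any model call. The sketch below keeps only the newest messages that fit a token budget. `estimate_tokens` and `select_recent` are hypothetical helpers, and the figure of four characters per token is a crude assumption that is especially poor for Chinese text; substitute the target model's real tokenizer when accuracy matters.

```python
from typing import Callable, List


def estimate_tokens(text: str) -> int:
    """Crude estimate (assumption): about 4 characters per token, minimum 1."""
    return max(1, len(text) // 4)


def select_recent(messages: List[str], max_tokens: int,
                  counter: Callable[[str], int] = estimate_tokens) -> List[str]:
    """Keep the newest messages whose combined estimate fits within max_tokens."""
    kept: List[str] = []
    used = 0
    # Walk backwards from the newest message and stop at the first that does not fit.
    for text in reversed(messages):
        cost = counter(text)
        if used + cost > max_tokens:
            break
        kept.append(text)
        used += cost
    kept.reverse()  # restore chronological order
    return kept


if __name__ == "__main__":
    history = ["a" * 40, "b" * 40, "c" * 40]  # about 10 tokens each
    print(len(select_recent(history, max_tokens=25)))
```

Stopping at the first message that does not fit keeps the retained window contiguous, which tends to read more coherently to the model than a history with gaps. The `counter` parameter is there so the estimate can be swapped out without changing the selection logic.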
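Further Thoughts: Value for Your Career
Talking Points for Interviews
- System design: multi-agent collaboration is essentially the AI version of a microservice architecture
  - Interviewer: "How would you design a complex AI system?"
  - You can answer: "Borrowing from microservice design principles..."
- Asynchronous programming: the project makes heavy use of async/await
  - Interviewer: "Have you used asynchronous programming in Python?"
  - You can show the project code directly
- Design patterns:
  - Observer pattern (message bus)
  - Strategy pattern (the different agents)
  - Chain of Responsibility pattern (the task chain)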
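Where the Technology Is Heading
Multi-agent collaboration is becoming a mainstream paradigm for AI applications:
- OpenAI: promoting multi-agent frameworks
- Anthropic: Claude's Tool Use is a building block for agent-style workflows
- In China: Coze and Baidu Qianfan are both investing in this area
Once you have this way of thinking down, you are not chasing a trend; you are understanding the underlying architecture of AI applications.
Project Directions to Explore
- Intelligent coding assistant: plan → write code → review → test → deploy
- Automated testing platform: analyze requirements → generate test cases → execute → report
- Data processing pipeline: ETL + analysis + visualization, automated end to end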
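Summary
Using a complete multi-agent collaboration system, this article showed:
- How to design an abstract base class for agents
- How to implement task planning and orchestration
- How to handle communication and context between agents
- How to keep the system stable and extensible
The code has been verified by actually running it and is ready to adapt; just remember that the `call_ai` methods shown here are stubs to be replaced with real model calls. Questions and ideas are welcome!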