多 Agent 协作系统设计与实现

为什么需要多 Agent?

单个 Agent 能力有限,就像一个人很难同时是程序员、设计师、产品经理。多 Agent 协作让专业的人做专业的事。

多 Agent 系统 = 一个团队,每个成员有自己的专长,通过协作完成复杂任务。

多 Agent 架构模式

1. 层级式(Hierarchical)

        ┌─────────────┐
        │   主管 Agent  │  ← 分配任务、协调资源
        └──────┬──────┘
               │
    ┌─────────┼─────────┐
    │         │         │
┌───▼───┐ ┌──▼────┐ ┌──▼────┐
│研究Agent│ │代码Agent│ │测试Agent│
└───────┘ └───────┘ └───────┘

2. 对等式(Peer-to-Peer)

    ┌─────────┐
    │ Agent A │←────→┌─────────┐
    │ (前端)   │      │ Agent B │
    └────┬────┘      │ (后端)   │
         │           └────┬────┘
         │                │
         └────────┬───────┘
                  │
            ┌─────▼─────┐
            │  Agent C  │
            │ (数据库)   │
            └───────────┘

实战:构建开发团队 Agent

from typing import List, Dict
from dataclasses import dataclass

@dataclass
class Task:
    id: str
    description: str
    type: str  # "research", "code", "test", "review"
    status: str = "pending"

class BaseAgent:
    """基础 Agent 类"""
    def __init__(self, name: str, role: str):
        self.name = name
        self.role = role
        self.memory = []
    
    def execute(self, task: Task) -> str:
        raise NotImplementedError
    
    def communicate(self, message: str) -> str:
        """与其他 Agent 通信"""
        self.memory.append(message)
        return f"{self.name} 收到: {message}"

class ResearchAgent(BaseAgent):
    """研究型 Agent"""
    def __init__(self):
        super().__init__("研究员", "research")
    
    def execute(self, task: Task) -> str:
        # 搜索资料、分析需求
        return f"研究完成:{task.description} 的技术方案"

class CodeAgent(BaseAgent):
    """编码 Agent"""
    def __init__(self):
        super().__init__("程序员", "code")
    
    def execute(self, task: Task) -> str:
        # 编写代码
        return f"代码实现:\n```python\n# {task.description} 的实现代码\n```"

class TestAgent(BaseAgent):
    """测试 Agent"""
    def __init__(self):
        super().__init__("测试员", "test")
    
    def execute(self, task: Task) -> str:
        # 编写测试用例
        return f"测试完成:覆盖 95%,发现 2 个 bug"

任务分配与协调

class Coordinator:
    """协调器:负责任务分配"""
    
    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {
            "research": ResearchAgent(),
            "code": CodeAgent(),
            "test": TestAgent()
        }
        self.task_queue: List[Task] = []
    
    def add_task(self, task: Task):
        self.task_queue.append(task)
    
    def dispatch(self) -> List[str]:
        """分配并执行任务"""
        results = []
        
        for task in self.task_queue:
            agent = self.agents.get(task.type)
            if agent:
                result = agent.execute(task)
                results.append(f"{agent.name}: {result}")
            else:
                results.append(f"错误:没有处理 {task.type} 的 Agent")
        
        return results

# 使用示例
coordinator = Coordinator()

tasks = [
    Task("1", "分析用户登录需求", "research"),
    Task("2", "实现 JWT 认证模块", "code"),
    Task("3", "编写单元测试", "test")
]

for task in tasks:
    coordinator.add_task(task)

results = coordinator.dispatch()
for r in results:
    print(r)

Agent 间通信协议

class MessageBus:
    """消息总线:Agent 间通信的中介"""
    
    def __init__(self):
        self.subscribers: Dict[str, List[BaseAgent]] = {}
        self.messages: List[Dict] = []
    
    def subscribe(self, topic: str, agent: BaseAgent):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(agent)
    
    def publish(self, topic: str, message: str):
        self.messages.append({"topic": topic, "content": message})
        
        for agent in self.subscribers.get(topic, []):
            agent.communicate(message)

# 使用消息总线
bus = MessageBus()
bus.subscribe("code_review", ReviewAgent())
bus.subscribe("code_review", TestAgent())

# 发布代码审查消息
bus.publish("code_review", "新的 PR 需要审查:#123")

冲突解决机制

class ConflictResolver:
    """冲突解决器"""
    
    def resolve(self, agent1: BaseAgent, agent2: BaseAgent, conflict: str) -> str:
        """
        当多个 Agent 给出矛盾建议时,投票决定
        """
        # 方案 1:主管裁决
        # 方案 2:投票机制
        # 方案 3:优先级排序
        
        votes = {
            agent1.name: self._evaluate_solution(agent1, conflict),
            agent2.name: self._evaluate_solution(agent2, conflict)
        }
        
        winner = max(votes, key=votes.get)
        return f"采用 {winner} 的方案"
    
    def _evaluate_solution(self, agent: BaseAgent, conflict: str) -> float:
        # 根据 Agent 的专业度评分
        expertise_score = {
            "研究员": 0.9,
            "程序员": 0.95,
            "测试员": 0.85
        }
        return expertise_score.get(agent.name, 0.5)

实际应用场景

场景 Agent 分工 协作方式
智能客服 意图识别、知识检索、工单处理 流水线
代码审查 语法检查、安全扫描、风格检查 并行执行
数据分析 数据清洗、特征工程、模型训练 层级式
内容创作 选题、写作、编辑、排版 流水线

总结

多 Agent 系统让 AI 应用从"单兵作战"升级为"团队协作":

  • 专业化:每个 Agent 专注一个领域
  • 并行化:多个任务同时处理
  • 容错性:单个 Agent 失败不影响整体

下一步:学习 AI Agent 的工具调用,让 Agent 能力无限扩展。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐