【Agent从入门到实践】14 协作模块:多Agent如何“分工配合”
各位程序员小伙伴,咱们AI Agent系列终于讲到“团队协作”了!前面咱们搞定了单个Agent的“大脑”(决策)、“记忆”(存储)、“手脚”(工具调用),但实际工作中很多任务不是单个Agent能搞定的——比如让Agent完成“市场调研报告”,可能需要一个Agent查数据、一个Agent分析数据、一个Agent写报告,这就需要多Agent协作。
文章目录
目前国内还是很缺AI人才的,希望更多人能真正加入到AI行业,共同促进行业进步。想要系统学习AI知识的朋友可以看看我的教程 http://blog.csdn.net/jiangjunshow,教程通俗易懂,风趣幽默,从深度学习基础原理到各领域实战应用都有讲解。
前言
各位AI入门的小伙伴们,前面咱们把单个Agent的“感知-决策-执行-记忆-工具调用”五大核心模块都讲完了——单个Agent已经能独立完成不少任务了!但遇到复杂任务,光靠一个Agent就不够了,就像咱们人类做大事需要团队协作一样,多Agent也需要“分工配合”才能搞定复杂场景~
比如:
- 想运营一家线上奶茶店,需要“客服Agent”(回复用户咨询)、“订单Agent”(处理下单支付)、“配送Agent”(安排送货)、“库存Agent”(管理原料库存),四个Agent各司其职又相互配合;
- 想做一个智能办公系统,需要“文档Agent”(处理文件)、“会议Agent”(安排会议)、“提醒Agent”(发送通知)、“数据分析Agent”(生成报告),协同完成办公流程。
今天咱们就聊聊多Agent的“团队协作机制”——协作模块到底是怎么让多个Agent“互通有无、分工合作”的?重点讲通信方式和协同逻辑,搭配Python代码实现一个简单的多Agent协作系统,看完你就懂多Agent是怎么“组队干活”的!
一、先搞懂:协作模块的本质——“Agent之间的桥梁”
咱们先举个生活例子:一个创业团队做项目,需要:
- 产品经理(定需求)、程序员(写代码)、设计师(做UI)、测试员(找bug);
- 他们之间需要沟通(开会、发消息)、分工(各自负责擅长的部分)、配合(程序员写完代码交给测试员,设计师做完UI交给程序员)。
多Agent的协作模块也是这个逻辑!它的核心作用就是:
- 建立通信:让多个Agent能“说话”(传递信息),比如AAgent把任务进度告诉BAgent;
- 明确分工:根据每个Agent的“特长”分配任务,比如让擅长处理文件的Agent负责读数据,让擅长计算的Agent负责分析;
- 协调流程:确保Agent之间的工作衔接顺畅,比如AAgent完成任务后,自动通知BAgent开始下一步;
- 解决冲突:遇到资源竞争(比如同时调用同一个工具)或任务矛盾时,协调处理。
用一句话总结:协作模块就是多Agent团队的“项目经理+通信工具”,既要让大家能沟通,又要让大家能高效配合完成任务 。
二、多Agent协作的2个核心:通信方式+分工模式
要实现多Agent协作,首先得解决“怎么沟通”和“怎么分工”两个问题,咱们用大白话拆解:
1. 通信方式:Agent之间“怎么说话”
Agent不像人类能直接聊天,需要通过特定的“通信协议”传递信息,就像人类用微信、电话、邮件沟通一样。常见的通信方式有3种,简单易懂:
(1)直接消息传递(一对一聊天)
就像你和同事单独发微信——两个Agent之间直接发送信息,不需要中间环节。
- 特点:简单、快速,适合两个Agent之间的私密沟通;
- 例子:订单Agent处理完支付后,直接给配送Agent发消息:“订单123已支付,地址XX路,请安排配送”。
(2)消息队列(群聊/公告栏)
就像公司群聊——多个Agent都能往“消息队列”里发消息,也能从里面读消息,不需要知道消息是谁发的,只要关注自己需要的信息。
- 特点:解耦(Agent之间不用互相认识)、可扩展(随时加新Agent);
- 例子:库存Agent把“珍珠原料不足”的消息发到消息队列,客服Agent、订单Agent都会读到,客服Agent会提醒用户“珍珠暂时缺货”,订单Agent会暂停珍珠奶茶的下单。
(3)共享知识库(共享文件夹)
就像公司共享盘——多个Agent都能读写同一个知识库(比如数据库、文件),通过共享数据实现间接沟通。
- 特点:适合传递结构化数据(比如订单信息、库存数据),多个Agent可以同时访问;
- 例子:所有Agent都能读写“奶茶店订单数据库”,订单Agent新增订单,配送Agent读取订单,库存Agent更新原料消耗,不用直接发消息,通过共享数据就能协同。
2. 分工模式:Agent之间“怎么干活”
分工是协作的基础,就像团队分工一样,多Agent也有3种常见的分工模式,根据任务场景选择:
(1)管道式分工(流水线干活)
就像工厂流水线——每个Agent负责一个环节,上一个Agent完成后,把结果交给下一个Agent,直到任务完成。
- 特点:流程固定、效率高,适合步骤明确的线性任务;
- 例子:奶茶店订单处理流程:用户咨询(客服Agent)→ 下单支付(订单Agent)→ 扣减库存(库存Agent)→ 安排配送(配送Agent)。
(2)功能式分工(各司其职)
就像公司部门分工——每个Agent负责一个特定领域的功能,需要时调用对应Agent的能力。
- 特点:灵活、可复用,适合复杂多变的任务;
- 例子:智能办公系统:处理文件(文档Agent)、安排会议(会议Agent)、数据分析(数据Agent),用户需要什么功能,就调用对应的Agent。
(3)角色式分工(临时组队)
就像项目组临时组队——根据具体任务,给Agent分配临时角色,任务完成后角色解散。
- 特点:灵活适配不同任务,适合一次性、复杂的任务;
- 例子:做“奶茶店促销活动”任务,临时分配:客服Agent(负责宣传)、订单Agent(负责优惠核销)、库存Agent(负责备货)、数据Agent(负责统计销量),活动结束后,角色解散,Agent回归各自常规工作。
三、代码实战:实现一个“奶茶店多Agent协作系统”
光说不练假把式,咱们用Python实现一个简单的奶茶店协作系统,包含4个Agent:客服Agent、订单Agent、库存Agent、配送Agent,采用“消息队列通信”和“管道式分工”,模拟完整的订单处理流程。
第一步:搭建通信工具——消息队列(群聊)
首先实现一个简单的消息队列,让4个Agent能互相通信:
# 1. 实现简单的消息队列(通信工具)
class MessageQueue:
def __init__(self):
self.queue = [] # 存储消息的队列
# 发送消息
def send(self, sender, receiver, message_type, content):
"""
sender: 发送者(Agent名称)
receiver: 接收者(Agent名称,None表示广播)
message_type: 消息类型(比如"order"、"stock"、"delivery")
content: 消息内容(字典格式)
"""
message = {
"sender": sender,
"receiver": receiver,
"type": message_type,
"content": content,
"timestamp": pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S")
}
self.queue.append(message)
print(f"\n【消息队列】{sender} 发送给 {receiver or '所有人'}:{message_type} - {content}")
# 接收消息(某个Agent读取自己的消息)
def receive(self, receiver):
"""receiver: 接收者(Agent名称),返回所有发给它的消息"""
received = []
remaining = []
for msg in self.queue:
if msg["receiver"] == receiver or msg["receiver"] is None:
received.append(msg)
else:
remaining.append(msg)
self.queue = remaining # 移除已接收的消息
return received
第二步:实现4个Agent(各自分工)
每个Agent都有自己的核心功能,通过消息队列和其他Agent通信:
import pandas as pd
from datetime import datetime
# 2. 实现各个Agent(继承基础Agent类)
class BaseAgent:
def __init__(self, name, message_queue):
self.name = name # Agent名称
self.message_queue = message_queue # 消息队列(通信工具)
self.memory = {} # 每个Agent自己的记忆
# 客服Agent:处理用户咨询,创建订单请求
class CustomerServiceAgent(BaseAgent):
def handle_user_query(self, user_name, order_info):
"""处理用户下单咨询,发送订单请求给订单Agent"""
print(f"\n【{self.name}】收到用户{user_name}的下单请求:{order_info}")
# 发送消息给订单Agent,请求创建订单
self.message_queue.send(
sender=self.name,
receiver="OrderAgent",
message_type="create_order",
content={"user_name": user_name, "order_info": order_info}
)
# 订单Agent:处理下单支付,通知库存和配送
class OrderAgent(BaseAgent):
def __init__(self, name, message_queue):
super().__init__(name, message_queue)
self.order_id = 1 # 订单编号自增
def process_messages(self):
"""处理收到的消息(循环监听)"""
messages = self.message_queue.receive(self.name)
for msg in messages:
if msg["type"] == "create_order":
# 处理创建订单请求
user_name = msg["content"]["user_name"]
order_info = msg["content"]["order_info"]
print(f"\n【{self.name}】处理订单请求:用户{user_name},订单信息{order_info}")
# 模拟创建订单(生成订单编号、计算金额)
order = {
"order_id": f"ORDER{self.order_id:04d}",
"user_name": user_name,
"flavor": order_info["flavor"],
"quantity": order_info["quantity"],
"price": 18 * order_info["quantity"], # 假设每杯18元
"status": "pending_payment"
}
self.order_id += 1
self.memory[order["order_id"]] = order # 记忆订单信息
print(f"【{self.name}】创建订单成功:{order}")
# 模拟支付成功(实际场景中需要调用支付工具)
order["status"] = "paid"
self.memory[order["order_id"]] = order
print(f"【{self.name}】订单支付成功:{order['order_id']}")
# 1. 通知库存Agent:扣减原料
self.message_queue.send(
sender=self.name,
receiver="StockAgent",
message_type="deduct_stock",
content={"order_id": order["order_id"], "flavor": order["flavor"], "quantity": order["quantity"]}
)
# 2. 通知配送Agent:安排配送
self.message_queue.send(
sender=self.name,
receiver="DeliveryAgent",
message_type="arrange_delivery",
content={"order_id": order["order_id"], "user_name": user_name, "address": order_info["address"]}
)
# 库存Agent:管理原料库存,处理扣减请求
class StockAgent(BaseAgent):
def __init__(self, name, message_queue):
super().__init__(name, message_queue)
# 初始化库存(珍珠、椰果、芋圆)
self.memory["stock"] = {
"珍珠": 50,
"椰果": 30,
"芋圆": 20
}
def process_messages(self):
"""处理收到的消息"""
messages = self.message_queue.receive(self.name)
for msg in messages:
if msg["type"] == "deduct_stock":
order_id = msg["content"]["order_id"]
flavor = msg["content"]["flavor"]
quantity = msg["content"]["quantity"]
topping = flavor.split("+")[-1] # 假设口味格式是"三分糖+珍珠",提取配料
print(f"\n【{self.name}】收到扣减库存请求:订单{order_id},{topping}×{quantity}")
# 检查库存是否充足
if self.memory["stock"][topping] >= quantity:
# 扣减库存
self.memory["stock"][topping] -= quantity
print(f"【{self.name}】库存扣减成功,剩余{topping}:{self.memory['stock'][topping]}份")
# 通知订单Agent:库存充足,可继续处理
self.message_queue.send(
sender=self.name,
receiver="OrderAgent",
message_type="stock_available",
content={"order_id": order_id, "status": "success"}
)
else:
# 库存不足,通知订单Agent
print(f"【{self.name}】库存不足:{topping}仅剩{self.memory['stock'][topping]}份,无法满足订单{order_id}")
self.message_queue.send(
sender=self.name,
receiver="OrderAgent",
message_type="stock_available",
content={"order_id": order_id, "status": "failed", "reason": f"{topping}库存不足"}
)
# 配送Agent:安排配送,反馈配送状态
class DeliveryAgent(BaseAgent):
def __init__(self, name, message_queue):
super().__init__(name, message_queue)
self.delivery_id = 1 # 配送单编号自增
def process_messages(self):
"""处理收到的消息"""
messages = self.message_queue.receive(self.name)
for msg in messages:
if msg["type"] == "arrange_delivery":
order_id = msg["content"]["order_id"]
user_name = msg["content"]["user_name"]
address = msg["content"]["address"]
print(f"\n【{self.name}】收到配送请求:订单{order_id},用户{user_name},地址{address}")
# 模拟创建配送单
delivery_order = {
"delivery_id": f"DEL{self.delivery_id:04d}",
"order_id": order_id,
"user_name": user_name,
"address": address,
"status": "pending_delivery"
}
self.delivery_id += 1
self.memory[delivery_order["delivery_id"]] = delivery_order
print(f"【{self.name}】创建配送单成功:{delivery_order}")
# 模拟配送完成
delivery_order["status"] = "delivered"
delivery_order["delivery_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.memory[delivery_order["delivery_id"]] = delivery_order
print(f"【{self.name}】订单{order_id}配送完成,送达时间:{delivery_order['delivery_time']}")
# 广播配送完成消息(所有Agent都能收到)
self.message_queue.send(
sender=self.name,
receiver=None, # 广播
message_type="delivery_completed",
content={"order_id": order_id, "delivery_id": delivery_order["delivery_id"], "status": "success"}
)
第三步:搭建协作系统,模拟完整流程
将4个Agent和消息队列整合,模拟用户下单→订单处理→库存扣减→配送完成的完整协作流程:
# 3. 主程序:多Agent协作流程
if __name__ == "__main__":
print("=== 奶茶店多Agent协作系统启动 ===")
# 初始化消息队列(通信工具)
message_queue = MessageQueue()
# 初始化各个Agent
cs_agent = CustomerServiceAgent("CustomerServiceAgent", message_queue)
order_agent = OrderAgent("OrderAgent", message_queue)
stock_agent = StockAgent("StockAgent", message_queue)
delivery_agent = DeliveryAgent("DeliveryAgent", message_queue)
# 模拟用户下单(触发协作流程)
user_name = "小明"
order_info = {
"flavor": "三分糖+珍珠",
"quantity": 2,
"address": "XX路123号"
}
print(f"\n=== 模拟用户{user_name}下单 ===")
cs_agent.handle_user_query(user_name, order_info)
# 各个Agent处理消息(模拟协作流程)
print("\n=== 开始处理协作任务 ===")
# 1. 订单Agent处理创建订单请求
order_agent.process_messages()
# 2. 库存Agent处理扣减库存请求
stock_agent.process_messages()
# 3. 配送Agent处理配送请求
delivery_agent.process_messages()
# 4. 订单Agent接收库存和配送反馈(可选,这里简化流程)
order_agent.process_messages()
# 输出最终状态
print("\n=== 协作流程结束 ===")
print(f"订单Agent的订单记录:{list(order_agent.memory.values())}")
print(f"库存Agent的剩余库存:{stock_agent.memory['stock']}")
print(f"配送Agent的配送记录:{list(delivery_agent.memory.values())}")
运行结果(完整协作流程)
=== 奶茶店多Agent协作系统启动 ===
=== 模拟用户小明下单 ===
【CustomerServiceAgent】收到用户小明的下单请求:{'flavor': '三分糖+珍珠', 'quantity': 2, 'address': 'XX路123号'}
【消息队列】CustomerServiceAgent 发送给 OrderAgent:create_order - {'user_name': '小明', 'order_info': {'flavor': '三分糖+珍珠', 'quantity': 2, 'address': 'XX路123号'}}
=== 开始处理协作任务 ===
【OrderAgent】处理订单请求:用户小明,订单信息{'flavor': '三分糖+珍珠', 'quantity': 2, 'address': 'XX路123号'}
【OrderAgent】创建订单成功:{'order_id': 'ORDER0001', 'user_name': '小明', 'flavor': '三分糖+珍珠', 'quantity': 2, 'price': 36, 'status': 'pending_payment'}
【OrderAgent】订单支付成功:ORDER0001
【消息队列】OrderAgent 发送给 StockAgent:deduct_stock - {'order_id': 'ORDER0001', 'flavor': '三分糖+珍珠', 'quantity': 2}
【消息队列】OrderAgent 发送给 DeliveryAgent:arrange_delivery - {'order_id': 'ORDER0001', 'user_name': '小明', 'address': 'XX路123号'}
【StockAgent】收到扣减库存请求:订单ORDER0001,珍珠×2
【StockAgent】库存扣减成功,剩余珍珠:48份
【消息队列】StockAgent 发送给 OrderAgent:stock_available - {'order_id': 'ORDER0001', 'status': 'success'}
【DeliveryAgent】收到配送请求:订单ORDER0001,用户小明,地址XX路123号
【DeliveryAgent】创建配送单成功:{'delivery_id': 'DEL0001', 'order_id': 'ORDER0001', 'user_name': '小明', 'address': 'XX路123号', 'status': 'pending_delivery'}
【DeliveryAgent】订单ORDER0001配送完成,送达时间:2024-05-21 16:45:30
【消息队列】DeliveryAgent 发送给 所有人:delivery_completed - {'order_id': 'ORDER0001', 'delivery_id': 'DEL0001', 'status': 'success'}
=== 协作流程结束 ===
订单Agent的订单记录:[{'order_id': 'ORDER0001', 'user_name': '小明', 'flavor': '三分糖+珍珠', 'quantity': 2, 'price': 36, 'status': 'paid'}]
库存Agent的剩余库存:{'珍珠': 48, '椰果': 30, '芋圆': 20}
配送Agent的配送记录:[{'delivery_id': 'DEL0001', 'order_id': 'ORDER0001', 'user_name': '小明', 'address': 'XX路123号', 'status': 'delivered', 'delivery_time': '2024-05-21 16:45:30'}]
看!4个Agent通过消息队列通信,按流水线分工完成了整个订单流程:客服Agent接收请求→订单Agent创建订单→库存Agent扣减原料→配送Agent完成配送,每个环节自动衔接,不用人工干预——这就是多Agent协作的魅力!
四、多Agent协作的3个关键技巧(避坑指南)
实际开发中,多Agent协作会遇到各种问题,分享3个最实用的技巧:
1. 统一消息格式——避免“鸡同鸭讲”
多个Agent来自不同模块,消息格式不统一会导致无法解析。建议统一消息格式,包含“发送者、接收者、消息类型、内容、时间戳”,就像咱们代码里的消息结构:
message = {
"sender": "AgentA", # 谁发的
"receiver": "AgentB", # 发给谁
"type": "order", # 消息类型(方便Agent判断如何处理)
"content": {}, # 具体内容(字典格式,结构化数据)
"timestamp": "2024-05-21 16:00:00" # 时间戳(用于排序和追溯)
}
2. 处理消息冲突——避免“抢资源”
比如两个Agent同时请求扣减同一原料的库存,会导致库存数据错误。这时候需要加“锁机制”,确保同一时间只有一个Agent能操作资源:
import threading
class StockAgent(BaseAgent):
def __init__(self, name, message_queue):
super().__init__(name, message_queue)
self.memory["stock"] = {"珍珠": 50, "椰果": 30, "芋圆": 20}
self.lock = threading.Lock() # 加锁
def process_messages(self):
for msg in self.message_queue.receive(self.name):
if msg["type"] == "deduct_stock":
with self.lock: # 同一时间只有一个线程能执行
# 扣减库存逻辑...
3. 状态同步——确保“信息一致”
多Agent协作时,某个Agent的状态变化需要及时同步给其他相关Agent,避免信息滞后。比如库存Agent的原料不足时,要广播给所有Agent,让客服Agent停止接受相关订单:
# 库存Agent广播库存不足消息
self.message_queue.send(
sender=self.name,
receiver=None, # 广播
message_type="stock_low",
content={"topping": topping, "remaining": self.memory["stock"][topping]}
)
# 客服Agent接收广播,更新自己的记忆
def process_messages(self):
messages = self.message_queue.receive(self.name)
for msg in messages:
if msg["type"] == "stock_low":
self.memory["out_of_stock"] = msg["content"]["topping"]
print(f"【{self.name}】收到库存不足通知,暂停接受{msg['content']['topping']}相关订单")
五、总结:多Agent协作的核心是什么?
其实多Agent协作的核心很简单:“通信+分工+同步” 。
- 通信:让Agent之间能顺畅传递信息,不用“各自为战”;
- 分工:让每个Agent做自己擅长的事,提高整体效率;
- 同步:确保所有Agent的信息一致,避免冲突和错误。
有了协作模块,Agent就能从“单打独斗”变成“团队作战”,搞定单个Agent搞不定的复杂任务:
- 电商平台:客服Agent、订单Agent、物流Agent、售后Agent协作;
- 智能工厂:生产Agent、质检Agent、仓储Agent、运输Agent协作;
- 医疗系统:诊断Agent、开药Agent、护理Agent、缴费Agent协作。
如果觉得这篇文章好懂、实用,别忘了点赞、转发,让更多人一起入门AI~ 有任何问题,评论区留言交流呀!
更多推荐


所有评论(0)