目前国内还是很缺AI人才的,希望更多人能真正加入到AI行业,共同促进行业进步。想要系统学习AI知识的朋友可以看看我的教程 http://blog.csdn.net/jiangjunshow,教程通俗易懂,风趣幽默,从深度学习基础原理到各领域实战应用都有讲解。

前言

各位AI入门的小伙伴们,前面咱们把单个Agent的“感知-决策-执行-记忆-工具调用”五大核心模块都讲完了——单个Agent已经能独立完成不少任务了!但遇到复杂任务,光靠一个Agent就不够了,就像咱们人类做大事需要团队协作一样,多Agent也需要“分工配合”才能搞定复杂场景~

比如:

  • 想运营一家线上奶茶店,需要“客服Agent”(回复用户咨询)、“订单Agent”(处理下单支付)、“配送Agent”(安排送货)、“库存Agent”(管理原料库存),四个Agent各司其职又相互配合;
  • 想做一个智能办公系统,需要“文档Agent”(处理文件)、“会议Agent”(安排会议)、“提醒Agent”(发送通知)、“数据分析Agent”(生成报告),协同完成办公流程。

今天咱们就聊聊多Agent的“团队协作机制”——协作模块到底是怎么让多个Agent“互通有无、分工合作”的?重点讲通信方式和协同逻辑,搭配Python代码实现一个简单的多Agent协作系统,看完你就懂多Agent是怎么“组队干活”的!

一、先搞懂:协作模块的本质——“Agent之间的桥梁”

咱们先举个生活例子:一个创业团队做项目,需要:

  • 产品经理(定需求)、程序员(写代码)、设计师(做UI)、测试员(找bug);
  • 他们之间需要沟通(开会、发消息)、分工(各自负责擅长的部分)、配合(程序员写完代码交给测试员,设计师做完UI交给程序员)。

多Agent的协作模块也是这个逻辑!它的核心作用就是:

  1. 建立通信:让多个Agent能“说话”(传递信息),比如AAgent把任务进度告诉BAgent;
  2. 明确分工:根据每个Agent的“特长”分配任务,比如让擅长处理文件的Agent负责读数据,让擅长计算的Agent负责分析;
  3. 协调流程:确保Agent之间的工作衔接顺畅,比如AAgent完成任务后,自动通知BAgent开始下一步;
  4. 解决冲突:遇到资源竞争(比如同时调用同一个工具)或任务矛盾时,协调处理。

用一句话总结:协作模块就是多Agent团队的“项目经理+通信工具”,既要让大家能沟通,又要让大家能高效配合完成任务

二、多Agent协作的2个核心:通信方式+分工模式

要实现多Agent协作,首先得解决“怎么沟通”和“怎么分工”两个问题,咱们用大白话拆解:

1. 通信方式:Agent之间“怎么说话”

Agent不像人类能直接聊天,需要通过特定的“通信协议”传递信息,就像人类用微信、电话、邮件沟通一样。常见的通信方式有3种,简单易懂:

(1)直接消息传递(一对一聊天)

就像你和同事单独发微信——两个Agent之间直接发送信息,不需要中间环节。

  • 特点:简单、快速,适合两个Agent之间的私密沟通;
  • 例子:订单Agent处理完支付后,直接给配送Agent发消息:“订单123已支付,地址XX路,请安排配送”。
(2)消息队列(群聊/公告栏)

就像公司群聊——多个Agent都能往“消息队列”里发消息,也能从里面读消息,不需要知道消息是谁发的,只要关注自己需要的信息。

  • 特点:解耦(Agent之间不用互相认识)、可扩展(随时加新Agent);
  • 例子:库存Agent把“珍珠原料不足”的消息发到消息队列,客服Agent、订单Agent都会读到,客服Agent会提醒用户“珍珠暂时缺货”,订单Agent会暂停珍珠奶茶的下单。
(3)共享知识库(共享文件夹)

就像公司共享盘——多个Agent都能读写同一个知识库(比如数据库、文件),通过共享数据实现间接沟通。

  • 特点:适合传递结构化数据(比如订单信息、库存数据),多个Agent可以同时访问;
  • 例子:所有Agent都能读写“奶茶店订单数据库”,订单Agent新增订单,配送Agent读取订单,库存Agent更新原料消耗,不用直接发消息,通过共享数据就能协同。

2. 分工模式:Agent之间“怎么干活”

分工是协作的基础,就像团队分工一样,多Agent也有3种常见的分工模式,根据任务场景选择:

(1)管道式分工(流水线干活)

就像工厂流水线——每个Agent负责一个环节,上一个Agent完成后,把结果交给下一个Agent,直到任务完成。

  • 特点:流程固定、效率高,适合步骤明确的线性任务;
  • 例子:奶茶店订单处理流程:用户咨询(客服Agent)→ 下单支付(订单Agent)→ 扣减库存(库存Agent)→ 安排配送(配送Agent)。
(2)功能式分工(各司其职)

就像公司部门分工——每个Agent负责一个特定领域的功能,需要时调用对应Agent的能力。

  • 特点:灵活、可复用,适合复杂多变的任务;
  • 例子:智能办公系统:处理文件(文档Agent)、安排会议(会议Agent)、数据分析(数据Agent),用户需要什么功能,就调用对应的Agent。
(3)角色式分工(临时组队)

就像项目组临时组队——根据具体任务,给Agent分配临时角色,任务完成后角色解散。

  • 特点:灵活适配不同任务,适合一次性、复杂的任务;
  • 例子:做“奶茶店促销活动”任务,临时分配:客服Agent(负责宣传)、订单Agent(负责优惠核销)、库存Agent(负责备货)、数据Agent(负责统计销量),活动结束后,角色解散,Agent回归各自常规工作。

三、代码实战:实现一个“奶茶店多Agent协作系统”

光说不练假把式,咱们用Python实现一个简单的奶茶店协作系统,包含4个Agent:客服Agent、订单Agent、库存Agent、配送Agent,采用“消息队列通信”和“管道式分工”,模拟完整的订单处理流程。

第一步:搭建通信工具——消息队列(群聊)

首先实现一个简单的消息队列,让4个Agent能互相通信:

# 1. 实现简单的消息队列(通信工具)
class MessageQueue:
    def __init__(self):
        self.queue = []  # 存储消息的队列
    
    # 发送消息
    def send(self, sender, receiver, message_type, content):
        """
        sender: 发送者(Agent名称)
        receiver: 接收者(Agent名称,None表示广播)
        message_type: 消息类型(比如"order"、"stock"、"delivery")
        content: 消息内容(字典格式)
        """
        message = {
            "sender": sender,
            "receiver": receiver,
            "type": message_type,
            "content": content,
            "timestamp": pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        self.queue.append(message)
        print(f"\n【消息队列】{sender} 发送给 {receiver or '所有人'}{message_type} - {content}")
    
    # 接收消息(某个Agent读取自己的消息)
    def receive(self, receiver):
        """receiver: 接收者(Agent名称),返回所有发给它的消息"""
        received = []
        remaining = []
        for msg in self.queue:
            if msg["receiver"] == receiver or msg["receiver"] is None:
                received.append(msg)
            else:
                remaining.append(msg)
        self.queue = remaining  # 移除已接收的消息
        return received

第二步:实现4个Agent(各自分工)

每个Agent都有自己的核心功能,通过消息队列和其他Agent通信:

import pandas as pd
from datetime import datetime

# 2. 实现各个Agent(继承基础Agent类)
class BaseAgent:
    def __init__(self, name, message_queue):
        self.name = name  # Agent名称
        self.message_queue = message_queue  # 消息队列(通信工具)
        self.memory = {}  # 每个Agent自己的记忆

# 客服Agent:处理用户咨询,创建订单请求
class CustomerServiceAgent(BaseAgent):
    def handle_user_query(self, user_name, order_info):
        """处理用户下单咨询,发送订单请求给订单Agent"""
        print(f"\n【{self.name}】收到用户{user_name}的下单请求:{order_info}")
        
        # 发送消息给订单Agent,请求创建订单
        self.message_queue.send(
            sender=self.name,
            receiver="OrderAgent",
            message_type="create_order",
            content={"user_name": user_name, "order_info": order_info}
        )

# 订单Agent:处理下单支付,通知库存和配送
class OrderAgent(BaseAgent):
    def __init__(self, name, message_queue):
        super().__init__(name, message_queue)
        self.order_id = 1  # 订单编号自增
    
    def process_messages(self):
        """处理收到的消息(循环监听)"""
        messages = self.message_queue.receive(self.name)
        for msg in messages:
            if msg["type"] == "create_order":
                # 处理创建订单请求
                user_name = msg["content"]["user_name"]
                order_info = msg["content"]["order_info"]
                print(f"\n【{self.name}】处理订单请求:用户{user_name},订单信息{order_info}")
                
                # 模拟创建订单(生成订单编号、计算金额)
                order = {
                    "order_id": f"ORDER{self.order_id:04d}",
                    "user_name": user_name,
                    "flavor": order_info["flavor"],
                    "quantity": order_info["quantity"],
                    "price": 18 * order_info["quantity"],  # 假设每杯18元
                    "status": "pending_payment"
                }
                self.order_id += 1
                self.memory[order["order_id"]] = order  # 记忆订单信息
                print(f"【{self.name}】创建订单成功:{order}")
                
                # 模拟支付成功(实际场景中需要调用支付工具)
                order["status"] = "paid"
                self.memory[order["order_id"]] = order
                print(f"【{self.name}】订单支付成功:{order['order_id']}")
                
                # 1. 通知库存Agent:扣减原料
                self.message_queue.send(
                    sender=self.name,
                    receiver="StockAgent",
                    message_type="deduct_stock",
                    content={"order_id": order["order_id"], "flavor": order["flavor"], "quantity": order["quantity"]}
                )
                
                # 2. 通知配送Agent:安排配送
                self.message_queue.send(
                    sender=self.name,
                    receiver="DeliveryAgent",
                    message_type="arrange_delivery",
                    content={"order_id": order["order_id"], "user_name": user_name, "address": order_info["address"]}
                )

# 库存Agent:管理原料库存,处理扣减请求
class StockAgent(BaseAgent):
    def __init__(self, name, message_queue):
        super().__init__(name, message_queue)
        # 初始化库存(珍珠、椰果、芋圆)
        self.memory["stock"] = {
            "珍珠": 50,
            "椰果": 30,
            "芋圆": 20
        }
    
    def process_messages(self):
        """处理收到的消息"""
        messages = self.message_queue.receive(self.name)
        for msg in messages:
            if msg["type"] == "deduct_stock":
                order_id = msg["content"]["order_id"]
                flavor = msg["content"]["flavor"]
                quantity = msg["content"]["quantity"]
                topping = flavor.split("+")[-1]  # 假设口味格式是"三分糖+珍珠",提取配料
                print(f"\n【{self.name}】收到扣减库存请求:订单{order_id}{topping}×{quantity}")
                
                # 检查库存是否充足
                if self.memory["stock"][topping] >= quantity:
                    # 扣减库存
                    self.memory["stock"][topping] -= quantity
                    print(f"【{self.name}】库存扣减成功,剩余{topping}{self.memory['stock'][topping]}份")
                    
                    # 通知订单Agent:库存充足,可继续处理
                    self.message_queue.send(
                        sender=self.name,
                        receiver="OrderAgent",
                        message_type="stock_available",
                        content={"order_id": order_id, "status": "success"}
                    )
                else:
                    # 库存不足,通知订单Agent
                    print(f"【{self.name}】库存不足:{topping}仅剩{self.memory['stock'][topping]}份,无法满足订单{order_id}")
                    self.message_queue.send(
                        sender=self.name,
                        receiver="OrderAgent",
                        message_type="stock_available",
                        content={"order_id": order_id, "status": "failed", "reason": f"{topping}库存不足"}
                    )

# 配送Agent:安排配送,反馈配送状态
class DeliveryAgent(BaseAgent):
    def __init__(self, name, message_queue):
        super().__init__(name, message_queue)
        self.delivery_id = 1  # 配送单编号自增
    
    def process_messages(self):
        """处理收到的消息"""
        messages = self.message_queue.receive(self.name)
        for msg in messages:
            if msg["type"] == "arrange_delivery":
                order_id = msg["content"]["order_id"]
                user_name = msg["content"]["user_name"]
                address = msg["content"]["address"]
                print(f"\n【{self.name}】收到配送请求:订单{order_id},用户{user_name},地址{address}")
                
                # 模拟创建配送单
                delivery_order = {
                    "delivery_id": f"DEL{self.delivery_id:04d}",
                    "order_id": order_id,
                    "user_name": user_name,
                    "address": address,
                    "status": "pending_delivery"
                }
                self.delivery_id += 1
                self.memory[delivery_order["delivery_id"]] = delivery_order
                print(f"【{self.name}】创建配送单成功:{delivery_order}")
                
                # 模拟配送完成
                delivery_order["status"] = "delivered"
                delivery_order["delivery_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                self.memory[delivery_order["delivery_id"]] = delivery_order
                print(f"【{self.name}】订单{order_id}配送完成,送达时间:{delivery_order['delivery_time']}")
                
                # 广播配送完成消息(所有Agent都能收到)
                self.message_queue.send(
                    sender=self.name,
                    receiver=None,  # 广播
                    message_type="delivery_completed",
                    content={"order_id": order_id, "delivery_id": delivery_order["delivery_id"], "status": "success"}
                )

第三步:搭建协作系统,模拟完整流程

将4个Agent和消息队列整合,模拟用户下单→订单处理→库存扣减→配送完成的完整协作流程:

# 3. 主程序:多Agent协作流程
if __name__ == "__main__":
    print("=== 奶茶店多Agent协作系统启动 ===")
    
    # 初始化消息队列(通信工具)
    message_queue = MessageQueue()
    
    # 初始化各个Agent
    cs_agent = CustomerServiceAgent("CustomerServiceAgent", message_queue)
    order_agent = OrderAgent("OrderAgent", message_queue)
    stock_agent = StockAgent("StockAgent", message_queue)
    delivery_agent = DeliveryAgent("DeliveryAgent", message_queue)
    
    # 模拟用户下单(触发协作流程)
    user_name = "小明"
    order_info = {
        "flavor": "三分糖+珍珠",
        "quantity": 2,
        "address": "XX路123号"
    }
    print(f"\n=== 模拟用户{user_name}下单 ===")
    cs_agent.handle_user_query(user_name, order_info)
    
    # 各个Agent处理消息(模拟协作流程)
    print("\n=== 开始处理协作任务 ===")
    # 1. 订单Agent处理创建订单请求
    order_agent.process_messages()
    
    # 2. 库存Agent处理扣减库存请求
    stock_agent.process_messages()
    
    # 3. 配送Agent处理配送请求
    delivery_agent.process_messages()
    
    # 4. 订单Agent接收库存和配送反馈(可选,这里简化流程)
    order_agent.process_messages()
    
    # 输出最终状态
    print("\n=== 协作流程结束 ===")
    print(f"订单Agent的订单记录:{list(order_agent.memory.values())}")
    print(f"库存Agent的剩余库存:{stock_agent.memory['stock']}")
    print(f"配送Agent的配送记录:{list(delivery_agent.memory.values())}")

运行结果(完整协作流程)

=== 奶茶店多Agent协作系统启动 ===

=== 模拟用户小明下单 ===

【CustomerServiceAgent】收到用户小明的下单请求:{'flavor': '三分糖+珍珠', 'quantity': 2, 'address': 'XX路123号'}

【消息队列】CustomerServiceAgent 发送给 OrderAgent:create_order - {'user_name': '小明', 'order_info': {'flavor': '三分糖+珍珠', 'quantity': 2, 'address': 'XX路123号'}}

=== 开始处理协作任务 ===

【OrderAgent】处理订单请求:用户小明,订单信息{'flavor': '三分糖+珍珠', 'quantity': 2, 'address': 'XX路123号'}
【OrderAgent】创建订单成功:{'order_id': 'ORDER0001', 'user_name': '小明', 'flavor': '三分糖+珍珠', 'quantity': 2, 'price': 36, 'status': 'pending_payment'}
【OrderAgent】订单支付成功:ORDER0001

【消息队列】OrderAgent 发送给 StockAgent:deduct_stock - {'order_id': 'ORDER0001', 'flavor': '三分糖+珍珠', 'quantity': 2}

【消息队列】OrderAgent 发送给 DeliveryAgent:arrange_delivery - {'order_id': 'ORDER0001', 'user_name': '小明', 'address': 'XX路123号'}

【StockAgent】收到扣减库存请求:订单ORDER0001,珍珠×2
【StockAgent】库存扣减成功,剩余珍珠:48份

【消息队列】StockAgent 发送给 OrderAgent:stock_available - {'order_id': 'ORDER0001', 'status': 'success'}

【DeliveryAgent】收到配送请求:订单ORDER0001,用户小明,地址XX路123号
【DeliveryAgent】创建配送单成功:{'delivery_id': 'DEL0001', 'order_id': 'ORDER0001', 'user_name': '小明', 'address': 'XX路123号', 'status': 'pending_delivery'}
【DeliveryAgent】订单ORDER0001配送完成,送达时间:2024-05-21 16:45:30

【消息队列】DeliveryAgent 发送给 所有人:delivery_completed - {'order_id': 'ORDER0001', 'delivery_id': 'DEL0001', 'status': 'success'}

=== 协作流程结束 ===
订单Agent的订单记录:[{'order_id': 'ORDER0001', 'user_name': '小明', 'flavor': '三分糖+珍珠', 'quantity': 2, 'price': 36, 'status': 'paid'}]
库存Agent的剩余库存:{'珍珠': 48, '椰果': 30, '芋圆': 20}
配送Agent的配送记录:[{'delivery_id': 'DEL0001', 'order_id': 'ORDER0001', 'user_name': '小明', 'address': 'XX路123号', 'status': 'delivered', 'delivery_time': '2024-05-21 16:45:30'}]

看!4个Agent通过消息队列通信,按流水线分工完成了整个订单流程:客服Agent接收请求→订单Agent创建订单→库存Agent扣减原料→配送Agent完成配送,每个环节自动衔接,不用人工干预——这就是多Agent协作的魅力!

四、多Agent协作的3个关键技巧(避坑指南)

实际开发中,多Agent协作会遇到各种问题,分享3个最实用的技巧:

1. 统一消息格式——避免“鸡同鸭讲”

多个Agent来自不同模块,消息格式不统一会导致无法解析。建议统一消息格式,包含“发送者、接收者、消息类型、内容、时间戳”,就像咱们代码里的消息结构:

message = {
    "sender": "AgentA",  # 谁发的
    "receiver": "AgentB",  # 发给谁
    "type": "order",  # 消息类型(方便Agent判断如何处理)
    "content": {},  # 具体内容(字典格式,结构化数据)
    "timestamp": "2024-05-21 16:00:00"  # 时间戳(用于排序和追溯)
}

2. 处理消息冲突——避免“抢资源”

比如两个Agent同时请求扣减同一原料的库存,会导致库存数据错误。这时候需要加“锁机制”,确保同一时间只有一个Agent能操作资源:

import threading

class StockAgent(BaseAgent):
    def __init__(self, name, message_queue):
        super().__init__(name, message_queue)
        self.memory["stock"] = {"珍珠": 50, "椰果": 30, "芋圆": 20}
        self.lock = threading.Lock()  # 加锁
    
    def process_messages(self):
        for msg in self.message_queue.receive(self.name):
            if msg["type"] == "deduct_stock":
                with self.lock:  # 同一时间只有一个线程能执行
                    # 扣减库存逻辑...

3. 状态同步——确保“信息一致”

多Agent协作时,某个Agent的状态变化需要及时同步给其他相关Agent,避免信息滞后。比如库存Agent的原料不足时,要广播给所有Agent,让客服Agent停止接受相关订单:

# 库存Agent广播库存不足消息
self.message_queue.send(
    sender=self.name,
    receiver=None,  # 广播
    message_type="stock_low",
    content={"topping": topping, "remaining": self.memory["stock"][topping]}
)

# 客服Agent接收广播,更新自己的记忆
def process_messages(self):
    messages = self.message_queue.receive(self.name)
    for msg in messages:
        if msg["type"] == "stock_low":
            self.memory["out_of_stock"] = msg["content"]["topping"]
            print(f"【{self.name}】收到库存不足通知,暂停接受{msg['content']['topping']}相关订单")

五、总结:多Agent协作的核心是什么?

其实多Agent协作的核心很简单:“通信+分工+同步”

  • 通信:让Agent之间能顺畅传递信息,不用“各自为战”;
  • 分工:让每个Agent做自己擅长的事,提高整体效率;
  • 同步:确保所有Agent的信息一致,避免冲突和错误。

有了协作模块,Agent就能从“单打独斗”变成“团队作战”,搞定单个Agent搞不定的复杂任务:

  • 电商平台:客服Agent、订单Agent、物流Agent、售后Agent协作;
  • 智能工厂:生产Agent、质检Agent、仓储Agent、运输Agent协作;
  • 医疗系统:诊断Agent、开药Agent、护理Agent、缴费Agent协作。

如果觉得这篇文章好懂、实用,别忘了点赞、转发,让更多人一起入门AI~ 有任何问题,评论区留言交流呀!
在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐