基于电商的多智能体Handoffs模式实战方案
本文提出了一种电商多智能体协作系统方案,通过将业务流程拆解为咨询、购买、支付、订单管理等模块化智能体,实现全流程高效协同服务。方案设计了标准化的智能体交接机制和消息打印格式,确保各环节操作可追溯。系统采用微服务架构部署,基于RabbitMQ实现智能体间通信,并通过ELK日志系统存储分析操作日志。测试验证了正常流程和异常场景下的智能体协作能力,后续可结合用户画像优化交接策略,并引入机器学习实现异常预
一、方案背景与目标
随着电商业务的复杂化和用户需求的个性化,单一智能体已难以高效覆盖从用户咨询、商品推荐到订单履约全流程服务。多智能体Handoffs(交接)模式通过将业务流程拆解为不同模块,由专用智能体各司其职并实现无缝交接,可大幅提升服务效率与用户体验。
本方案核心目标:构建适配电商全流程的多智能体协作体系,明确各智能体职责边界与交接规则,针对购买、支付、订单等关键操作设计标准化消息打印机制,实现流程可追溯、问题可定位。
二、多智能体架构设计
2.1 智能体分类及核心职责
| 智能体类型 | 核心职责 | 关联业务节点 | 交接触发条件 |
|---|---|---|---|
| 用户咨询智能体 | 承接用户初始咨询、商品信息查询、活动规则解读、常见问题解答 | 售前咨询 | 用户表达购买意向、提出购买相关问题时,交接至购买智能体 |
| 购买智能体 | 引导用户选品、确认商品规格/数量、生成待支付订单、处理购买相关异常 | 选品-下单 | 用户确认下单后,交接至支付智能体;订单生成异常时,触发异常处理流程 |
| 支付智能体 | 提供支付方式选择、引导完成支付、查询支付状态、处理支付异常 | 支付环节 | 支付成功后,交接至订单管理智能体;支付失败/超时,引导重试或取消订单 |
| 订单管理智能体 | 订单信息同步、订单状态更新、物流信息关联、订单修改/取消处理 | 订单履约全流程 | 订单完成后,交接至售后智能体;订单出现履约异常,触发异常预警 |
| 售后智能体 | 处理退换货申请、售后咨询、投诉建议、满意度调研 | 售后环节 | 售后问题解决后,流程闭环;需跨环节协调时,反向交接至对应智能体 |
| 异常处理智能体 | 承接各环节异常(如库存不足、支付失败、物流延误),协调资源解决 | 全流程异常节点 | 异常触发时由对应业务智能体交接,问题解决后返回原业务智能体 |
2.2 核心协作流程(Handoffs链路)
用户咨询 → 购买意向触发 → 购买智能体交接 → 下单确认 → 支付智能体交接 → 支付完成 → 订单管理智能体交接 → 订单履约 → 售后需求触发 → 售后智能体交接(异常场景同步触发异常处理智能体)
三、关键操作消息打印机制设计
针对购买、支付、订单等核心操作,设计标准化消息打印格式,包含操作主体、时间、内容、状态、关联ID等关键信息,确保流程可追溯。消息打印触发方式为:对应操作执行时自动同步打印,支持本地日志存储与后台日志系统上传。
3.1 购买相关操作消息打印
3.1.1 选品确认消息
【打印时机】用户确认商品规格、数量并提交选品需求时
【打印内容】购买操作-选品确认 | 时间:{当前时间戳} | 用户ID:{user_id} | 商品信息:[{商品ID1}-{商品名称1}-{规格1}-{数量1},{商品ID2}-{商品名称2}-{规格2}-{数量2}] | 操作状态:选品成功 | 处理智能体:购买智能体 | 关联会话ID:{session_id}
3.1.2 待支付订单生成消息
【打印时机】购买智能体生成待支付订单后
【打印内容】购买操作-订单生成 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 订单金额:{amount}元 | 商品明细:[{商品ID1}-{数量1}-{单价1},{商品ID2}-{数量2}-{单价2}] | 操作状态:待支付订单生成成功 | 处理智能体:购买智能体 | 交接状态:待交接至支付智能体
3.1.3 购买异常消息
【打印时机】选品失败、库存不足等购买环节异常发生时
【打印内容】购买操作-异常 | 时间:{当前时间戳} | 用户ID:{user_id} | 异常场景:{具体异常描述,如库存不足/商品下架} | 关联商品:{商品ID}-{商品名称} | 操作状态:购买失败 | 处理智能体:购买智能体 | 交接状态:已交接至异常处理智能体
3.2 支付相关操作消息打印
3.2.1 支付方式选择消息
【打印时机】用户选择支付方式后
【打印内容】支付操作-方式选择 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 选择支付方式:{具体支付方式,如微信支付/支付宝/银行卡支付} | 操作状态:支付方式确认 | 处理智能体:支付智能体 | 关联支付流水号:{pay_flow_id}
3.2.2 支付成功消息
【打印时机】支付渠道返回支付成功结果后
【打印内容】支付操作-支付成功 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 支付金额:{amount}元 | 支付方式:{具体支付方式} | 支付流水号:{pay_flow_id} | 操作状态:支付成功 | 处理智能体:支付智能体 | 交接状态:待交接至订单管理智能体
3.2.3 支付异常消息
【打印时机】支付失败、支付超时、支付渠道异常时
【打印内容】支付操作-异常 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 异常类型:{具体异常,如支付超时/余额不足/渠道维护} | 支付金额:{amount}元 | 操作状态:支付失败 | 处理智能体:支付智能体 | 处理建议:{重试支付/取消订单} | 交接状态:异常时交接至异常处理智能体,取消订单时返回至购买智能体
3.3 订单相关操作消息打印
3.3.1 订单状态更新消息
【打印时机】订单状态发生变更时(如待发货、已发货、已签收)
【打印内容】订单操作-状态更新 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 原状态:{原订单状态} | 新状态:{新订单状态} | 更新原因:{如支付成功触发待发货/商家发货触发已发货} | 处理智能体:订单管理智能体 | 关联物流单号(如有):{logistics_id}
3.3.2 订单修改/取消消息
【打印时机】用户成功修改订单(如修改收货地址)或取消订单时
订单修改:【打印内容】订单操作-修改成功 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 修改内容:{具体修改项,如收货地址从A修改为B} | 订单状态:{当前订单状态} | 处理智能体:订单管理智能体
订单取消:【打印内容】订单操作-取消成功 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 取消原因:{用户主动取消/支付超时/库存不足} | 原订单状态:{取消前状态} | 退款状态(如有):{未退款/退款中/退款成功} | 处理智能体:订单管理智能体 | 交接状态:退款时交接至支付智能体
3.3.3 订单完成消息
【打印时机】用户确认收货且无售后需求,或售后问题解决后订单闭环时
【打印内容】订单操作-完成 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 订单金额:{amount}元 | 履约周期:{从下单到完成的时长} | 订单状态:已完成 | 处理智能体:订单管理智能体 | 交接状态:待交接至售后智能体(满意度调研)
四、实战部署与测试
4.1 部署架构
采用微服务架构部署各智能体模块,各智能体通过消息队列(如RabbitMQ、Kafka)实现Handoffs交接通信,日志系统采用ELK(Elasticsearch+Logstash+Kibana)栈存储和分析打印的消息日志,确保日志实时可查、可分析。
4.2 测试场景设计
-
正常流程测试:模拟用户从咨询→选品→下单→支付→订单履约→完成全流程,验证各智能体交接顺畅性及关键操作消息打印的完整性。
-
异常场景测试:模拟库存不足、支付超时、物流延误等异常场景,验证异常处理智能体的交接机制及异常消息打印的准确性。
-
高并发测试:模拟峰值时段多用户同时操作,验证智能体协作的稳定性及消息打印的实时性,避免日志丢失。
五、效果评估与优化方向
5.1 评估指标
-
流程效率:全流程平均处理时长、各环节交接耗时
-
消息质量:关键操作消息打印成功率、日志信息完整度
-
用户体验:用户咨询到订单完成的满意度评分、异常问题解决率
5.2 优化方向
-
智能交接优化:基于用户画像和历史数据,优化Handoffs触发条件,实现更精准的智能体匹配交接。
-
消息打印优化:根据业务需求扩展消息打印维度,如增加商品类目、用户等级等信息,提升日志分析价值。
-
异常预判:引入机器学习模型,对高风险异常场景(如高客单价支付失败、热门商品库存不足)进行预判,提前触发预警机制。
六、实战代码实现
本章节采用Python语言实现多智能体核心逻辑,基于面向对象思想定义各智能体类,通过消息队列模拟智能体间交接通信,集成关键操作的消息打印功能。代码适配中小型电商场景,可根据实际业务需求扩展。
6.1 环境依赖
# 安装依赖包
# pip install pika # 消息队列依赖,用于智能体间通信
# pip install python-dotenv # 环境配置依赖
6.2 核心配置定义
import os
import time
import json
import pika
from dotenv import load_dotenv
# 加载环境配置
load_dotenv()
class Config:
"""系统配置类"""
# 消息队列配置
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", 5672))
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")
# 队列名称定义(对应智能体交接链路)
QUEUE_CONSULT = "consult_agent_queue" # 咨询智能体队列
QUEUE_PURCHASE = "purchase_agent_queue" # 购买智能体队列
QUEUE_PAY = "pay_agent_queue" # 支付智能体队列
QUEUE_ORDER = "order_agent_queue" # 订单管理智能体队列
QUEUE_AFTERSALE = "aftersale_agent_queue" # 售后智能体队列
QUEUE_EXCEPTION = "exception_agent_queue" # 异常处理智能体队列
@staticmethod
def get_rabbitmq_connection():
"""获取消息队列连接"""
credentials = pika.PlainCredentials(Config.RABBITMQ_USER, Config.RABBITMQ_PASS)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=Config.RABBITMQ_HOST, port=Config.RABBITMQ_PORT, credentials=credentials)
)
return connection
# 工具类:消息打印与日志处理
class LogUtils:
"""日志工具类,实现关键操作消息标准化打印"""
@staticmethod
def print_purchase_msg(operation_type, user_id, status, goods_info=None, order_id=None, exception_msg=None):
"""购买相关操作消息打印"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
base_msg = f"购买操作-{operation_type} | 时间:{timestamp} | 用户ID:{user_id} | 操作状态:{status} | 处理智能体:购买智能体"
if goods_info:
base_msg += f" | 商品信息:{goods_info}"
if order_id:
base_msg += f" | 订单ID:{order_id}"
if exception_msg:
base_msg += f" | 异常场景:{exception_msg} | 交接状态:已交接至异常处理智能体"
else:
if operation_type == "订单生成":
base_msg += " | 交接状态:待交接至支付智能体"
print(f"[PURCHASE_LOG] {base_msg}")
# 此处可扩展:将日志写入ELK等日志系统
# LogUtils.upload_to_log_system(base_msg, "purchase")
@staticmethod
def print_pay_msg(operation_type, user_id, order_id, status, pay_method=None, amount=None, exception_msg=None):
"""支付相关操作消息打印"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
pay_flow_id = f"PAY{timestamp.replace('-','').replace(' ','').replace(':','')}{user_id}"
base_msg = f"支付操作-{operation_type} | 时间:{timestamp} | 用户ID:{user_id} | 订单ID:{order_id} | 操作状态:{status} | 处理智能体:支付智能体 | 关联支付流水号:{pay_flow_id}"
if pay_method:
base_msg += f" | 支付方式:{pay_method}"
if amount:
base_msg += f" | 支付金额:{amount}元"
if exception_msg:
base_msg += f" | 异常类型:{exception_msg} | 处理建议:重试支付/取消订单 | 交接状态:已交接至异常处理智能体"
else:
if operation_type == "支付成功":
base_msg += " | 交接状态:待交接至订单管理智能体"
print(f"[PAY_LOG] {base_msg}")
# 此处可扩展:将日志写入ELK等日志系统
# LogUtils.upload_to_log_system(base_msg, "pay")
@staticmethod
def print_order_msg(operation_type, user_id, order_id, status, old_status=None, new_status=None, logistics_id=None):
"""订单相关操作消息打印"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
base_msg = f"订单操作-{operation_type} | 时间:{timestamp} | 用户ID:{user_id} | 订单ID:{order_id} | 操作状态:{status} | 处理智能体:订单管理智能体"
if old_status and new_status:
base_msg += f" | 原状态:{old_status} | 新状态:{new_status} | 更新原因:{LogUtils._get_update_reason(old_status, new_status)}"
if logistics_id:
base_msg += f" | 关联物流单号:{logistics_id}"
if operation_type == "取消成功":
base_msg += " | 退款状态:未退款 | 交接状态:退款时交接至支付智能体"
elif operation_type == "完成":
base_msg += " | 交接状态:待交接至售后智能体(满意度调研)"
print(f"[ORDER_LOG] {base_msg}")
# 此处可扩展:将日志写入ELK等日志系统
# LogUtils.upload_to_log_system(base_msg, "order")
@staticmethod
def _get_update_reason(old_status, new_status):
"""获取订单状态更新原因"""
reason_map = {
("待支付", "待发货"): "支付成功触发待发货",
("待发货", "已发货"): "商家发货触发已发货",
("已发货", "已签收"): "用户确认收货触发已签收",
("已签收", "已完成"): "无售后需求触发订单完成"
}
return reason_map.get((old_status, new_status), "系统自动更新")
@staticmethod
def upload_to_log_system(msg, log_type):
"""模拟日志上传至ELK系统"""
# 实际场景中可通过Logstash的HTTP输入插件实现
import requests
elk_url = os.getenv("ELK_URL", "http://localhost:5044")
try:
requests.post(elk_url, json={"log": msg, "type": log_type, "timestamp": time.time()})
except Exception as e:
print(f"日志上传失败:{str(e)}")
6.3 多智能体类定义
class BaseAgent:
"""智能体基类,定义通用方法"""
def __init__(self, queue_name):
self.queue_name = queue_name
self.connection = Config.get_rabbitmq_connection()
self.channel = self.connection.channel()
# 声明队列(不存在则创建)
self.channel.queue_declare(queue=self.queue_name, durable=True)
def send_msg(self, target_queue, msg_data):
"""发送消息至目标智能体队列(实现交接)"""
try:
self.channel.basic_publish(
exchange='',
routing_key=target_queue,
body=json.dumps(msg_data, ensure_ascii=False),
properties=pika.BasicProperties(delivery_mode=2) # 消息持久化
)
print(f"[交接成功] 从{self.queue_name}发送消息至{target_queue},数据:{msg_data}")
except Exception as e:
print(f"[交接失败] 从{self.queue_name}发送消息至{target_queue},错误:{str(e)}")
# 触发异常处理
self.send_to_exception(msg_data, f"交接失败:{str(e)}")
def send_to_exception(self, msg_data, exception_msg):
"""发送异常消息至异常处理智能体"""
exception_data = {
"source_queue": self.queue_name,
"data": msg_data,
"exception_msg": exception_msg,
"timestamp": time.time()
}
self.send_msg(Config.QUEUE_EXCEPTION, exception_data)
def start_listen(self):
"""启动监听队列(子类需重写回调方法)"""
self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback, auto_ack=False)
print(f"[{self.__class__.__name__}] 开始监听队列:{self.queue_name}")
self.channel.start_consuming()
def callback(self, ch, method, properties, body):
"""消息回调处理(子类必须重写)"""
raise NotImplementedError("子类必须实现callback方法")
def close(self):
"""关闭连接"""
if self.connection.is_open:
self.connection.close()
class ConsultAgent(BaseAgent):
"""用户咨询智能体"""
def __init__(self):
super().__init__(Config.QUEUE_CONSULT)
def callback(self, ch, method, properties, body):
"""处理咨询消息,触发购买意向判断"""
try:
msg_data = json.loads(body.decode('utf-8'))
user_id = msg_data.get("user_id")
user_msg = msg_data.get("user_msg")
goods_id = msg_data.get("goods_id")
print(f"[咨询处理] 用户{user_id}咨询:{user_msg},关联商品ID:{goods_id}")
# 模拟判断购买意向(实际场景可结合NLP识别)
purchase_intent_keywords = ["购买", "下单", "买", "怎么买", "多少钱"]
has_purchase_intent = any(keyword in user_msg for keyword in purchase_intent_keywords)
if has_purchase_intent:
# 有购买意向,交接至购买智能体
purchase_data = {
"user_id": user_id,
"goods_id": goods_id,
"consult_msg": user_msg,
"timestamp": time.time()
}
self.send_msg(Config.QUEUE_PURCHASE, purchase_data)
else:
# 无购买意向,直接回复咨询
reply_msg = self._get_consult_reply(user_msg, goods_id)
print(f"[咨询回复] 给用户{user_id}的回复:{reply_msg}")
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息处理完成
except Exception as e:
print(f"[咨询处理失败] 错误:{str(e)}")
self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) # 拒绝消息,不重新入队
def _get_consult_reply(self, user_msg, goods_id):
"""模拟咨询回复(实际场景可对接知识库)"""
if "价格" in user_msg:
return f"商品{goods_id}的当前售价为99元,支持满200减50活动"
elif "库存" in user_msg:
return f"商品{goods_id}当前库存充足,全国包邮"
else:
return "您好,请问有什么可以帮您?如需购买可直接告知哦~"
class PurchaseAgent(BaseAgent):
"""购买智能体"""
def __init__(self):
super().__init__(Config.QUEUE_PURCHASE)
def callback(self, ch, method, properties, body):
"""处理购买相关逻辑:选品确认、订单生成"""
try:
msg_data = json.loads(body.decode('utf-8'))
user_id = msg_data.get("user_id")
goods_id = msg_data.get("goods_id")
# 模拟选品确认(实际场景需校验商品规格、数量)
goods_info = self._confirm_goods(goods_id)
if not goods_info:
# 选品异常
LogUtils.print_purchase_msg(
operation_type="选品确认",
user_id=user_id,
status="选品失败",
exception_msg=f"商品{goods_id}不存在或已下架"
)
ch.basic_ack(delivery_tag=method.delivery_tag)
return
# 选品成功,打印消息
LogUtils.print_purchase_msg(
operation_type="选品确认",
user_id=user_id,
status="选品成功",
goods_info=goods_info
)
# 生成待支付订单
order_id = self._generate_order(user_id, goods_info)
# 订单生成成功,打印消息并交接至支付智能体
LogUtils.print_purchase_msg(
operation_type="订单生成",
user_id=user_id,
status="待支付订单生成成功",
goods_info=goods_info,
order_id=order_id
)
# 交接至支付智能体
pay_data = {
"user_id": user_id,
"order_id": order_id,
"goods_info": goods_info,
"amount": self._calculate_amount(goods_info),
"timestamp": time.time()
}
self.send_msg(Config.QUEUE_PAY, pay_data)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[购买处理失败] 错误:{str(e)}")
self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
def _confirm_goods(self, goods_id):
"""模拟选品确认,返回商品信息"""
# 实际场景对接商品数据库
goods_db = {
"G1001": "G1001-智能手机A-128G-1",
"G1002": "G1002-无线耳机B-标准版-1",
"G1003": "G1003-笔记本电脑C-16G+512G-1"
}
return goods_db.get(goods_id)
def _generate_order(self, user_id, goods_info):
"""生成订单ID"""
timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
return f"ORD{timestamp}{user_id}"
def _calculate_amount(self, goods_info):
"""计算订单金额(模拟)"""
goods_price_db = {
"G1001": 3999,
"G1002": 499,
"G1003": 5999
}
goods_id = goods_info.split("-")[0]
return goods_price_db.get(goods_id, 0)
class PayAgent(BaseAgent):
"""支付智能体"""
def __init__(self):
super().__init__(Config.QUEUE_PAY)
def callback(self, ch, method, properties, body):
"""处理支付相关逻辑:支付方式选择、支付状态校验"""
try:
msg_data = json.loads(body.decode('utf-8'))
user_id = msg_data.get("user_id")
order_id = msg_data.get("order_id")
amount = msg_data.get("amount")
goods_info = msg_data.get("goods_info")
# 模拟用户选择支付方式(实际场景对接支付渠道)
pay_method = self._select_pay_method(user_id)
# 打印支付方式选择消息
LogUtils.print_pay_msg(
operation_type="方式选择",
user_id=user_id,
order_id=order_id,
status="支付方式确认",
pay_method=pay_method
)
# 模拟支付校验(实际场景调用支付渠道API)
pay_result = self._verify_payment(user_id, order_id, amount, pay_method)
if pay_result["success"]:
# 支付成功
LogUtils.print_pay_msg(
operation_type="支付成功",
user_id=user_id,
order_id=order_id,
status="支付成功",
pay_method=pay_method,
amount=amount
)
# 交接至订单管理智能体
order_data = {
"user_id": user_id,
"order_id": order_id,
"goods_info": goods_info,
"amount": amount,
"pay_method": pay_method,
"pay_flow_id": pay_result["pay_flow_id"],
"timestamp": time.time()
}
self.send_msg(Config.QUEUE_ORDER, order_data)
else:
# 支付异常
LogUtils.print_pay_msg(
operation_type="支付异常",
user_id=user_id,
order_id=order_id,
status="支付失败",
pay_method=pay_method,
amount=amount,
exception_msg=pay_result["error_msg"]
)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[支付处理失败] 错误:{str(e)}")
self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
def _select_pay_method(self, user_id):
"""模拟用户选择支付方式(实际场景由前端传入)"""
# 简单模拟:奇数用户选微信,偶数用户选支付宝
return "微信支付" if int(user_id) % 2 == 1 else "支付宝"
def _verify_payment(self, user_id, order_id, amount, pay_method):
"""模拟支付校验(实际场景对接支付渠道)"""
# 模拟90%支付成功率
import random
if random.random() < 0.9:
return {
"success": True,
"pay_flow_id": f"PAY{time.strftime('%Y%m%d%H%M%S')}{user_id}",
"error_msg": ""
}
else:
error_msgs = ["余额不足", "支付超时", "渠道维护中"]
return {
"success": False,
"pay_flow_id": "",
"error_msg": random.choice(error_msgs)
}
class OrderAgent(BaseAgent):
"""订单管理智能体"""
def __init__(self):
super().__init__(Config.QUEUE_ORDER)
# 模拟订单状态存储
self.order_status_db = {}
def callback(self, ch, method, properties, body):
"""处理订单相关逻辑:状态更新、物流关联、订单完成"""
try:
msg_data = json.loads(body.decode('utf-8'))
user_id = msg_data.get("user_id")
order_id = msg_data.get("order_id")
amount = msg_data.get("amount")
# 初始化订单状态为待发货
self.order_status_db[order_id] = "待发货"
# 打印订单状态更新消息(待发货)
LogUtils.print_order_msg(
operation_type="状态更新",
user_id=user_id,
order_id=order_id,
status="更新成功",
old_status="待支付",
new_status="待发货"
)
# 模拟商家发货(实际场景由商家系统触发)
time.sleep(2) # 模拟处理耗时
self.order_status_db[order_id] = "已发货"
logistics_id = self._generate_logistics_id()
# 打印订单状态更新消息(已发货)
LogUtils.print_order_msg(
operation_type="状态更新",
user_id=user_id,
order_id=order_id,
status="更新成功",
old_status="待发货",
new_status="已发货",
logistics_id=logistics_id
)
# 模拟用户确认收货(实际场景由用户操作触发)
time.sleep(3) # 模拟物流耗时
self.order_status_db[order_id] = "已签收"
# 打印订单状态更新消息(已签收)
LogUtils.print_order_msg(
operation_type="状态更新",
user_id=user_id,
order_id=order_id,
status="更新成功",
old_status="已发货",
new_status="已签收",
logistics_id=logistics_id
)
# 模拟订单完成(无售后)
time.sleep(1)
self.order_status_db[order_id] = "已完成"
# 打印订单完成消息
LogUtils.print_order_msg(
operation_type="完成",
user_id=user_id,
order_id=order_id,
status="已完成"
)
# 交接至售后智能体(满意度调研)
aftersale_data = {
"user_id": user_id,
"order_id": order_id,
"amount": amount,
"logistics_id": logistics_id,
"timestamp": time.time()
}
self.send_msg(Config.QUEUE_AFTERSALE, aftersale_data)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[订单处理失败] 错误:{str(e)}")
self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
def _generate_logistics_id(self):
"""生成物流单号"""
return f"LOG{time.strftime('%Y%m%d%H%M%S')}{random.randint(1000, 9999)}"
class AftersaleAgent(BaseAgent):
"""售后智能体"""
def __init__(self):
super().__init__(Config.QUEUE_AFTERSALE)
def callback(self, ch, method, properties, body):
"""处理售后逻辑:满意度调研"""
try:
msg_data = json.loads(body.decode('utf-8'))
user_id = msg_data.get("user_id")
order_id = msg_data.get("order_id")
print(f"[售后处理] 向用户{user_id}发送订单{order_id}的满意度调研:您好,您的订单已完成,请问对本次服务满意吗?")
# 实际场景可对接调研问卷系统
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[售后处理失败] 错误:{str(e)}")
self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
class ExceptionAgent(BaseAgent):
"""异常处理智能体"""
def __init__(self):
super().__init__(Config.QUEUE_EXCEPTION)
def callback(self, ch, method, properties, body):
"""处理各环节异常"""
try:
msg_data = json.loads(body.decode('utf-8'))
source_queue = msg_data.get("source_queue")
exception_msg = msg_data.get("exception_msg")
data = msg_data.get("data")
print(f"[异常处理] 来源队列:{source_queue} | 异常信息:{exception_msg} | 关联数据:{data}")
# 实际场景可触发工单系统、通知运营人员等
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"[异常处理失败] 错误:{str(e)}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
6.4 启动与测试脚本
import threading
import random
def start_agent(agent_class):
"""启动单个智能体(线程方式)"""
agent = agent_class()
thread = threading.Thread(target=agent.start_listen, daemon=True)
thread.start()
return agent, thread
def send_test_consult_msg():
"""发送测试咨询消息"""
# 启动咨询智能体的生产者端
consult_agent = ConsultAgent()
# 模拟3个用户咨询
test_users = [
{"user_id": "U1001", "user_msg": "我要购买商品G1001", "goods_id": "G1001"},
{"user_id": "U1002", "user_msg": "商品G1002多少钱?", "goods_id": "G1002"},
{"user_id": "U1003", "user_msg": "商品G1003有库存吗?我想买", "goods_id": "G1003"}
]
for user in test_users:
consult_agent.send_msg(Config.QUEUE_CONSULT, user)
time.sleep(1) # 间隔发送,避免消息拥堵
consult_agent.close()
if __name__ == "__main__":
try:
# 1. 启动所有智能体
agents = []
agent_classes = [ConsultAgent, PurchaseAgent, PayAgent, OrderAgent, AftersaleAgent, ExceptionAgent]
for cls in agent_classes:
agent, thread = start_agent(cls)
agents.append((agent, thread))
time.sleep(0.5) # 间隔启动,避免资源竞争
# 2. 发送测试消息
print("\n=== 开始发送测试咨询消息 ===")
send_test_consult_msg()
# 3. 保持主线程运行
while True:
time.sleep(10)
except KeyboardInterrupt:
print("\n=== 系统正在关闭 ===")
# 关闭所有智能体连接
for agent, _ in agents:
agent.close()
print("=== 系统已关闭 ===")
6.5 代码说明与扩展建议
-
代码结构:采用分层设计,BaseAgent封装通用消息队列操作,各业务智能体继承并实现具体业务逻辑,LogUtils统一管理消息打印,确保格式标准化。
-
核心功能:实现智能体间基于RabbitMQ的交接通信,覆盖咨询→购买→支付→订单→售后全流程,包含选品、下单、支付、订单状态更新等关键操作的消息打印。
-
扩展建议:
-
生产环境优化:增加服务注册发现(如Consul)、配置中心(如Nacos)、熔断降级(如Sentinel)等组件。
-
业务扩展:对接真实商品数据库、支付渠道API、物流系统,实现退款、退换货等完整售后流程。
-
监控增强:基于Prometheus+Grafana实现智能体运行状态、交接成功率、消息打印量等指标监控。
更多推荐



所有评论(0)