一、方案背景与目标

随着电商业务的复杂化和用户需求的个性化,单一智能体已难以高效覆盖从用户咨询、商品推荐到订单履约全流程服务。多智能体Handoffs(交接)模式通过将业务流程拆解为不同模块,由专用智能体各司其职并实现无缝交接,可大幅提升服务效率与用户体验。

本方案核心目标:构建适配电商全流程的多智能体协作体系,明确各智能体职责边界与交接规则,针对购买、支付、订单等关键操作设计标准化消息打印机制,实现流程可追溯、问题可定位。

二、多智能体架构设计

2.1 智能体分类及核心职责

智能体类型 核心职责 关联业务节点 交接触发条件
用户咨询智能体 承接用户初始咨询、商品信息查询、活动规则解读、常见问题解答 售前咨询 用户表达购买意向、提出购买相关问题时,交接至购买智能体
购买智能体 引导用户选品、确认商品规格/数量、生成待支付订单、处理购买相关异常 选品-下单 用户确认下单后,交接至支付智能体;订单生成异常时,触发异常处理流程
支付智能体 提供支付方式选择、引导完成支付、查询支付状态、处理支付异常 支付环节 支付成功后,交接至订单管理智能体;支付失败/超时,引导重试或取消订单
订单管理智能体 订单信息同步、订单状态更新、物流信息关联、订单修改/取消处理 订单履约全流程 订单完成后,交接至售后智能体;订单出现履约异常,触发异常预警
售后智能体 处理退换货申请、售后咨询、投诉建议、满意度调研 售后环节 售后问题解决后,流程闭环;需跨环节协调时,反向交接至对应智能体
异常处理智能体 承接各环节异常(如库存不足、支付失败、物流延误),协调资源解决 全流程异常节点 异常触发时由对应业务智能体交接,问题解决后返回原业务智能体

2.2 核心协作流程(Handoffs链路)

用户咨询 → 购买意向触发 → 购买智能体交接 → 下单确认 → 支付智能体交接 → 支付完成 → 订单管理智能体交接 → 订单履约 → 售后需求触发 → 售后智能体交接(异常场景同步触发异常处理智能体)

三、关键操作消息打印机制设计

针对购买、支付、订单等核心操作,设计标准化消息打印格式,包含操作主体、时间、内容、状态、关联ID等关键信息,确保流程可追溯。消息打印触发方式为:对应操作执行时自动同步打印,支持本地日志存储与后台日志系统上传。

3.1 购买相关操作消息打印

3.1.1 选品确认消息

【打印时机】用户确认商品规格、数量并提交选品需求时

【打印内容】购买操作-选品确认 | 时间:{当前时间戳} | 用户ID:{user_id} | 商品信息:[{商品ID1}-{商品名称1}-{规格1}-{数量1},{商品ID2}-{商品名称2}-{规格2}-{数量2}] | 操作状态:选品成功 | 处理智能体:购买智能体 | 关联会话ID:{session_id}

3.1.2 待支付订单生成消息

【打印时机】购买智能体生成待支付订单后

【打印内容】购买操作-订单生成 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 订单金额:{amount}元 | 商品明细:[{商品ID1}-{数量1}-{单价1},{商品ID2}-{数量2}-{单价2}] | 操作状态:待支付订单生成成功 | 处理智能体:购买智能体 | 交接状态:待交接至支付智能体

3.1.3 购买异常消息

【打印时机】选品失败、库存不足等购买环节异常发生时

【打印内容】购买操作-异常 | 时间:{当前时间戳} | 用户ID:{user_id} | 异常场景:{具体异常描述,如库存不足/商品下架} | 关联商品:{商品ID}-{商品名称} | 操作状态:购买失败 | 处理智能体:购买智能体 | 交接状态:已交接至异常处理智能体

3.2 支付相关操作消息打印

3.2.1 支付方式选择消息

【打印时机】用户选择支付方式后

【打印内容】支付操作-方式选择 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 选择支付方式:{具体支付方式,如微信支付/支付宝/银行卡支付} | 操作状态:支付方式确认 | 处理智能体:支付智能体 | 关联支付流水号:{pay_flow_id}

3.2.2 支付成功消息

【打印时机】支付渠道返回支付成功结果后

【打印内容】支付操作-支付成功 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 支付金额:{amount}元 | 支付方式:{具体支付方式} | 支付流水号:{pay_flow_id} | 操作状态:支付成功 | 处理智能体:支付智能体 | 交接状态:待交接至订单管理智能体

3.2.3 支付异常消息

【打印时机】支付失败、支付超时、支付渠道异常时

【打印内容】支付操作-异常 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 异常类型:{具体异常,如支付超时/余额不足/渠道维护} | 支付金额:{amount}元 | 操作状态:支付失败 | 处理智能体:支付智能体 | 处理建议:{重试支付/取消订单} | 交接状态:异常时交接至异常处理智能体,取消订单时返回至购买智能体

3.3 订单相关操作消息打印

3.3.1 订单状态更新消息

【打印时机】订单状态发生变更时(如待发货、已发货、已签收)

【打印内容】订单操作-状态更新 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 原状态:{原订单状态} | 新状态:{新订单状态} | 更新原因:{如支付成功触发待发货/商家发货触发已发货} | 处理智能体:订单管理智能体 | 关联物流单号(如有):{logistics_id}

3.3.2 订单修改/取消消息

【打印时机】用户成功修改订单(如修改收货地址)或取消订单时

订单修改:【打印内容】订单操作-修改成功 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 修改内容:{具体修改项,如收货地址从A修改为B} | 订单状态:{当前订单状态} | 处理智能体:订单管理智能体

订单取消:【打印内容】订单操作-取消成功 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 取消原因:{用户主动取消/支付超时/库存不足} | 原订单状态:{取消前状态} | 退款状态(如有):{未退款/退款中/退款成功} | 处理智能体:订单管理智能体 | 交接状态:退款时交接至支付智能体

3.3.3 订单完成消息

【打印时机】用户确认收货且无售后需求,或售后问题解决后订单闭环时

【打印内容】订单操作-完成 | 时间:{当前时间戳} | 用户ID:{user_id} | 订单ID:{order_id} | 订单金额:{amount}元 | 履约周期:{从下单到完成的时长} | 订单状态:已完成 | 处理智能体:订单管理智能体 | 交接状态:待交接至售后智能体(满意度调研)

四、实战部署与测试

4.1 部署架构

采用微服务架构部署各智能体模块,各智能体通过消息队列(如RabbitMQ、Kafka)实现Handoffs交接通信,日志系统采用ELK(Elasticsearch+Logstash+Kibana)栈存储和分析打印的消息日志,确保日志实时可查、可分析。

4.2 测试场景设计

  1. 正常流程测试:模拟用户从咨询→选品→下单→支付→订单履约→完成全流程,验证各智能体交接顺畅性及关键操作消息打印的完整性。

  2. 异常场景测试:模拟库存不足、支付超时、物流延误等异常场景,验证异常处理智能体的交接机制及异常消息打印的准确性。

  3. 高并发测试:模拟峰值时段多用户同时操作,验证智能体协作的稳定性及消息打印的实时性,避免日志丢失。

五、效果评估与优化方向

5.1 评估指标

  • 流程效率:全流程平均处理时长、各环节交接耗时

  • 消息质量:关键操作消息打印成功率、日志信息完整度

  • 用户体验:用户咨询到订单完成的满意度评分、异常问题解决率

5.2 优化方向

  1. 智能交接优化:基于用户画像和历史数据,优化Handoffs触发条件,实现更精准的智能体匹配交接。

  2. 消息打印优化:根据业务需求扩展消息打印维度,如增加商品类目、用户等级等信息,提升日志分析价值。

  3. 异常预判:引入机器学习模型,对高风险异常场景(如高客单价支付失败、热门商品库存不足)进行预判,提前触发预警机制。

六、实战代码实现

本章节采用Python语言实现多智能体核心逻辑,基于面向对象思想定义各智能体类,通过消息队列模拟智能体间交接通信,集成关键操作的消息打印功能。代码适配中小型电商场景,可根据实际业务需求扩展。

6.1 环境依赖

# 安装依赖包
# pip install pika  # 消息队列依赖,用于智能体间通信
# pip install python-dotenv  # 环境配置依赖

6.2 核心配置定义

import os
import time
import json
import pika
from dotenv import load_dotenv

# 加载环境配置
load_dotenv()

class Config:
    """系统配置类"""
    # 消息队列配置
    RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
    RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", 5672))
    RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
    RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")
    # 队列名称定义(对应智能体交接链路)
    QUEUE_CONSULT = "consult_agent_queue"  # 咨询智能体队列
    QUEUE_PURCHASE = "purchase_agent_queue"  # 购买智能体队列
    QUEUE_PAY = "pay_agent_queue"  # 支付智能体队列
    QUEUE_ORDER = "order_agent_queue"  # 订单管理智能体队列
    QUEUE_AFTERSALE = "aftersale_agent_queue"  # 售后智能体队列
    QUEUE_EXCEPTION = "exception_agent_queue"  # 异常处理智能体队列

    @staticmethod
    def get_rabbitmq_connection():
        """获取消息队列连接"""
        credentials = pika.PlainCredentials(Config.RABBITMQ_USER, Config.RABBITMQ_PASS)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=Config.RABBITMQ_HOST, port=Config.RABBITMQ_PORT, credentials=credentials)
        )
        return connection

# 工具类:消息打印与日志处理
class LogUtils:
    """日志工具类,实现关键操作消息标准化打印"""
    @staticmethod
    def print_purchase_msg(operation_type, user_id, status, goods_info=None, order_id=None, exception_msg=None):
        """购买相关操作消息打印"""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        base_msg = f"购买操作-{operation_type} | 时间:{timestamp} | 用户ID:{user_id} | 操作状态:{status} | 处理智能体:购买智能体"
        if goods_info:
            base_msg += f" | 商品信息:{goods_info}"
        if order_id:
            base_msg += f" | 订单ID:{order_id}"
        if exception_msg:
            base_msg += f" | 异常场景:{exception_msg} | 交接状态:已交接至异常处理智能体"
        else:
            if operation_type == "订单生成":
                base_msg += " | 交接状态:待交接至支付智能体"
        print(f"[PURCHASE_LOG] {base_msg}")
        # 此处可扩展:将日志写入ELK等日志系统
        # LogUtils.upload_to_log_system(base_msg, "purchase")

    @staticmethod
    def print_pay_msg(operation_type, user_id, order_id, status, pay_method=None, amount=None, exception_msg=None):
        """支付相关操作消息打印"""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        pay_flow_id = f"PAY{timestamp.replace('-','').replace(' ','').replace(':','')}{user_id}"
        base_msg = f"支付操作-{operation_type} | 时间:{timestamp} | 用户ID:{user_id} | 订单ID:{order_id} | 操作状态:{status} | 处理智能体:支付智能体 | 关联支付流水号:{pay_flow_id}"
        if pay_method:
            base_msg += f" | 支付方式:{pay_method}"
        if amount:
            base_msg += f" | 支付金额:{amount}元"
        if exception_msg:
            base_msg += f" | 异常类型:{exception_msg} | 处理建议:重试支付/取消订单 | 交接状态:已交接至异常处理智能体"
        else:
            if operation_type == "支付成功":
                base_msg += " | 交接状态:待交接至订单管理智能体"
        print(f"[PAY_LOG] {base_msg}")
        # 此处可扩展:将日志写入ELK等日志系统
        # LogUtils.upload_to_log_system(base_msg, "pay")

    @staticmethod
    def print_order_msg(operation_type, user_id, order_id, status, old_status=None, new_status=None, logistics_id=None):
        """订单相关操作消息打印"""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        base_msg = f"订单操作-{operation_type} | 时间:{timestamp} | 用户ID:{user_id} | 订单ID:{order_id} | 操作状态:{status} | 处理智能体:订单管理智能体"
        if old_status and new_status:
            base_msg += f" | 原状态:{old_status} | 新状态:{new_status} | 更新原因:{LogUtils._get_update_reason(old_status, new_status)}"
        if logistics_id:
            base_msg += f" | 关联物流单号:{logistics_id}"
        if operation_type == "取消成功":
            base_msg += " | 退款状态:未退款 | 交接状态:退款时交接至支付智能体"
        elif operation_type == "完成":
            base_msg += " | 交接状态:待交接至售后智能体(满意度调研)"
        print(f"[ORDER_LOG] {base_msg}")
        # 此处可扩展:将日志写入ELK等日志系统
        # LogUtils.upload_to_log_system(base_msg, "order")

    @staticmethod
    def _get_update_reason(old_status, new_status):
        """获取订单状态更新原因"""
        reason_map = {
            ("待支付", "待发货"): "支付成功触发待发货",
            ("待发货", "已发货"): "商家发货触发已发货",
            ("已发货", "已签收"): "用户确认收货触发已签收",
            ("已签收", "已完成"): "无售后需求触发订单完成"
        }
        return reason_map.get((old_status, new_status), "系统自动更新")

    @staticmethod
    def upload_to_log_system(msg, log_type):
        """模拟日志上传至ELK系统"""
        # 实际场景中可通过Logstash的HTTP输入插件实现
        import requests
        elk_url = os.getenv("ELK_URL", "http://localhost:5044")
        try:
            requests.post(elk_url, json={"log": msg, "type": log_type, "timestamp": time.time()})
        except Exception as e:
            print(f"日志上传失败:{str(e)}")

6.3 多智能体类定义

class BaseAgent:
    """智能体基类,定义通用方法"""
    def __init__(self, queue_name):
        self.queue_name = queue_name
        self.connection = Config.get_rabbitmq_connection()
        self.channel = self.connection.channel()
        # 声明队列(不存在则创建)
        self.channel.queue_declare(queue=self.queue_name, durable=True)

    def send_msg(self, target_queue, msg_data):
        """发送消息至目标智能体队列(实现交接)"""
        try:
            self.channel.basic_publish(
                exchange='',
                routing_key=target_queue,
                body=json.dumps(msg_data, ensure_ascii=False),
                properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化
            )
            print(f"[交接成功] 从{self.queue_name}发送消息至{target_queue},数据:{msg_data}")
        except Exception as e:
            print(f"[交接失败] 从{self.queue_name}发送消息至{target_queue},错误:{str(e)}")
            # 触发异常处理
            self.send_to_exception(msg_data, f"交接失败:{str(e)}")

    def send_to_exception(self, msg_data, exception_msg):
        """发送异常消息至异常处理智能体"""
        exception_data = {
            "source_queue": self.queue_name,
            "data": msg_data,
            "exception_msg": exception_msg,
            "timestamp": time.time()
        }
        self.send_msg(Config.QUEUE_EXCEPTION, exception_data)

    def start_listen(self):
        """启动监听队列(子类需重写回调方法)"""
        self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback, auto_ack=False)
        print(f"[{self.__class__.__name__}] 开始监听队列:{self.queue_name}")
        self.channel.start_consuming()

    def callback(self, ch, method, properties, body):
        """消息回调处理(子类必须重写)"""
        raise NotImplementedError("子类必须实现callback方法")

    def close(self):
        """关闭连接"""
        if self.connection.is_open:
            self.connection.close()


class ConsultAgent(BaseAgent):
    """用户咨询智能体"""
    def __init__(self):
        super().__init__(Config.QUEUE_CONSULT)

    def callback(self, ch, method, properties, body):
        """处理咨询消息,触发购买意向判断"""
        try:
            msg_data = json.loads(body.decode('utf-8'))
            user_id = msg_data.get("user_id")
            user_msg = msg_data.get("user_msg")
            goods_id = msg_data.get("goods_id")
            print(f"[咨询处理] 用户{user_id}咨询:{user_msg},关联商品ID:{goods_id}")

            # 模拟判断购买意向(实际场景可结合NLP识别)
            purchase_intent_keywords = ["购买", "下单", "买", "怎么买", "多少钱"]
            has_purchase_intent = any(keyword in user_msg for keyword in purchase_intent_keywords)

            if has_purchase_intent:
                # 有购买意向,交接至购买智能体
                purchase_data = {
                    "user_id": user_id,
                    "goods_id": goods_id,
                    "consult_msg": user_msg,
                    "timestamp": time.time()
                }
                self.send_msg(Config.QUEUE_PURCHASE, purchase_data)
            else:
                # 无购买意向,直接回复咨询
                reply_msg = self._get_consult_reply(user_msg, goods_id)
                print(f"[咨询回复] 给用户{user_id}的回复:{reply_msg}")

            ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息处理完成
        except Exception as e:
            print(f"[咨询处理失败] 错误:{str(e)}")
            self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)  # 拒绝消息,不重新入队

    def _get_consult_reply(self, user_msg, goods_id):
        """模拟咨询回复(实际场景可对接知识库)"""
        if "价格" in user_msg:
            return f"商品{goods_id}的当前售价为99元,支持满200减50活动"
        elif "库存" in user_msg:
            return f"商品{goods_id}当前库存充足,全国包邮"
        else:
            return "您好,请问有什么可以帮您?如需购买可直接告知哦~"


class PurchaseAgent(BaseAgent):
    """购买智能体"""
    def __init__(self):
        super().__init__(Config.QUEUE_PURCHASE)

    def callback(self, ch, method, properties, body):
        """处理购买相关逻辑:选品确认、订单生成"""
        try:
            msg_data = json.loads(body.decode('utf-8'))
            user_id = msg_data.get("user_id")
            goods_id = msg_data.get("goods_id")
            # 模拟选品确认(实际场景需校验商品规格、数量)
            goods_info = self._confirm_goods(goods_id)
            if not goods_info:
                # 选品异常
                LogUtils.print_purchase_msg(
                    operation_type="选品确认",
                    user_id=user_id,
                    status="选品失败",
                    exception_msg=f"商品{goods_id}不存在或已下架"
                )
                ch.basic_ack(delivery_tag=method.delivery_tag)
                return

            # 选品成功,打印消息
            LogUtils.print_purchase_msg(
                operation_type="选品确认",
                user_id=user_id,
                status="选品成功",
                goods_info=goods_info
            )

            # 生成待支付订单
            order_id = self._generate_order(user_id, goods_info)
            # 订单生成成功,打印消息并交接至支付智能体
            LogUtils.print_purchase_msg(
                operation_type="订单生成",
                user_id=user_id,
                status="待支付订单生成成功",
                goods_info=goods_info,
                order_id=order_id
            )

            # 交接至支付智能体
            pay_data = {
                "user_id": user_id,
                "order_id": order_id,
                "goods_info": goods_info,
                "amount": self._calculate_amount(goods_info),
                "timestamp": time.time()
            }
            self.send_msg(Config.QUEUE_PAY, pay_data)

            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"[购买处理失败] 错误:{str(e)}")
            self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    def _confirm_goods(self, goods_id):
        """模拟选品确认,返回商品信息"""
        # 实际场景对接商品数据库
        goods_db = {
            "G1001": "G1001-智能手机A-128G-1",
            "G1002": "G1002-无线耳机B-标准版-1",
            "G1003": "G1003-笔记本电脑C-16G+512G-1"
        }
        return goods_db.get(goods_id)

    def _generate_order(self, user_id, goods_info):
        """生成订单ID"""
        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        return f"ORD{timestamp}{user_id}"

    def _calculate_amount(self, goods_info):
        """计算订单金额(模拟)"""
        goods_price_db = {
            "G1001": 3999,
            "G1002": 499,
            "G1003": 5999
        }
        goods_id = goods_info.split("-")[0]
        return goods_price_db.get(goods_id, 0)


class PayAgent(BaseAgent):
    """支付智能体"""
    def __init__(self):
        super().__init__(Config.QUEUE_PAY)

    def callback(self, ch, method, properties, body):
        """处理支付相关逻辑:支付方式选择、支付状态校验"""
        try:
            msg_data = json.loads(body.decode('utf-8'))
            user_id = msg_data.get("user_id")
            order_id = msg_data.get("order_id")
            amount = msg_data.get("amount")
            goods_info = msg_data.get("goods_info")

            # 模拟用户选择支付方式(实际场景对接支付渠道)
            pay_method = self._select_pay_method(user_id)
            # 打印支付方式选择消息
            LogUtils.print_pay_msg(
                operation_type="方式选择",
                user_id=user_id,
                order_id=order_id,
                status="支付方式确认",
                pay_method=pay_method
            )

            # 模拟支付校验(实际场景调用支付渠道API)
            pay_result = self._verify_payment(user_id, order_id, amount, pay_method)
            if pay_result["success"]:
                # 支付成功
                LogUtils.print_pay_msg(
                    operation_type="支付成功",
                    user_id=user_id,
                    order_id=order_id,
                    status="支付成功",
                    pay_method=pay_method,
                    amount=amount
                )
                # 交接至订单管理智能体
                order_data = {
                    "user_id": user_id,
                    "order_id": order_id,
                    "goods_info": goods_info,
                    "amount": amount,
                    "pay_method": pay_method,
                    "pay_flow_id": pay_result["pay_flow_id"],
                    "timestamp": time.time()
                }
                self.send_msg(Config.QUEUE_ORDER, order_data)
            else:
                # 支付异常
                LogUtils.print_pay_msg(
                    operation_type="支付异常",
                    user_id=user_id,
                    order_id=order_id,
                    status="支付失败",
                    pay_method=pay_method,
                    amount=amount,
                    exception_msg=pay_result["error_msg"]
                )

            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"[支付处理失败] 错误:{str(e)}")
            self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    def _select_pay_method(self, user_id):
        """模拟用户选择支付方式(实际场景由前端传入)"""
        # 简单模拟:奇数用户选微信,偶数用户选支付宝
        return "微信支付" if int(user_id) % 2 == 1 else "支付宝"

    def _verify_payment(self, user_id, order_id, amount, pay_method):
        """模拟支付校验(实际场景对接支付渠道)"""
        # 模拟90%支付成功率
        import random
        if random.random() < 0.9:
            return {
                "success": True,
                "pay_flow_id": f"PAY{time.strftime('%Y%m%d%H%M%S')}{user_id}",
                "error_msg": ""
            }
        else:
            error_msgs = ["余额不足", "支付超时", "渠道维护中"]
            return {
                "success": False,
                "pay_flow_id": "",
                "error_msg": random.choice(error_msgs)
            }


class OrderAgent(BaseAgent):
    """订单管理智能体"""
    def __init__(self):
        super().__init__(Config.QUEUE_ORDER)
        # 模拟订单状态存储
        self.order_status_db = {}

    def callback(self, ch, method, properties, body):
        """处理订单相关逻辑:状态更新、物流关联、订单完成"""
        try:
            msg_data = json.loads(body.decode('utf-8'))
            user_id = msg_data.get("user_id")
            order_id = msg_data.get("order_id")
            amount = msg_data.get("amount")

            # 初始化订单状态为待发货
            self.order_status_db[order_id] = "待发货"
            # 打印订单状态更新消息(待发货)
            LogUtils.print_order_msg(
                operation_type="状态更新",
                user_id=user_id,
                order_id=order_id,
                status="更新成功",
                old_status="待支付",
                new_status="待发货"
            )

            # 模拟商家发货(实际场景由商家系统触发)
            time.sleep(2)  # 模拟处理耗时
            self.order_status_db[order_id] = "已发货"
            logistics_id = self._generate_logistics_id()
            # 打印订单状态更新消息(已发货)
            LogUtils.print_order_msg(
                operation_type="状态更新",
                user_id=user_id,
                order_id=order_id,
                status="更新成功",
                old_status="待发货",
                new_status="已发货",
                logistics_id=logistics_id
            )

            # 模拟用户确认收货(实际场景由用户操作触发)
            time.sleep(3)  # 模拟物流耗时
            self.order_status_db[order_id] = "已签收"
            # 打印订单状态更新消息(已签收)
            LogUtils.print_order_msg(
                operation_type="状态更新",
                user_id=user_id,
                order_id=order_id,
                status="更新成功",
                old_status="已发货",
                new_status="已签收",
                logistics_id=logistics_id
            )

            # 模拟订单完成(无售后)
            time.sleep(1)
            self.order_status_db[order_id] = "已完成"
            # 打印订单完成消息
            LogUtils.print_order_msg(
                operation_type="完成",
                user_id=user_id,
                order_id=order_id,
                status="已完成"
            )

            # 交接至售后智能体(满意度调研)
            aftersale_data = {
                "user_id": user_id,
                "order_id": order_id,
                "amount": amount,
                "logistics_id": logistics_id,
                "timestamp": time.time()
            }
            self.send_msg(Config.QUEUE_AFTERSALE, aftersale_data)

            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"[订单处理失败] 错误:{str(e)}")
            self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    def _generate_logistics_id(self):
        """生成物流单号"""
        return f"LOG{time.strftime('%Y%m%d%H%M%S')}{random.randint(1000, 9999)}"


class AftersaleAgent(BaseAgent):
    """售后智能体"""
    def __init__(self):
        super().__init__(Config.QUEUE_AFTERSALE)

    def callback(self, ch, method, properties, body):
        """处理售后逻辑:满意度调研"""
        try:
            msg_data = json.loads(body.decode('utf-8'))
            user_id = msg_data.get("user_id")
            order_id = msg_data.get("order_id")
            print(f"[售后处理] 向用户{user_id}发送订单{order_id}的满意度调研:您好,您的订单已完成,请问对本次服务满意吗?")
            # 实际场景可对接调研问卷系统
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"[售后处理失败] 错误:{str(e)}")
            self.send_to_exception(json.loads(body.decode('utf-8')), str(e))
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)


class ExceptionAgent(BaseAgent):
    """异常处理智能体"""
    def __init__(self):
        super().__init__(Config.QUEUE_EXCEPTION)

    def callback(self, ch, method, properties, body):
        """处理各环节异常"""
        try:
            msg_data = json.loads(body.decode('utf-8'))
            source_queue = msg_data.get("source_queue")
            exception_msg = msg_data.get("exception_msg")
            data = msg_data.get("data")
            print(f"[异常处理] 来源队列:{source_queue} | 异常信息:{exception_msg} | 关联数据:{data}")
            # 实际场景可触发工单系统、通知运营人员等
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"[异常处理失败] 错误:{str(e)}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

6.4 启动与测试脚本

import threading
import random

def start_agent(agent_class):
    """启动单个智能体(线程方式)"""
    agent = agent_class()
    thread = threading.Thread(target=agent.start_listen, daemon=True)
    thread.start()
    return agent, thread

def send_test_consult_msg():
    """发送测试咨询消息"""
    # 启动咨询智能体的生产者端
    consult_agent = ConsultAgent()
    # 模拟3个用户咨询
    test_users = [
        {"user_id": "U1001", "user_msg": "我要购买商品G1001", "goods_id": "G1001"},
        {"user_id": "U1002", "user_msg": "商品G1002多少钱?", "goods_id": "G1002"},
        {"user_id": "U1003", "user_msg": "商品G1003有库存吗?我想买", "goods_id": "G1003"}
    ]
    for user in test_users:
        consult_agent.send_msg(Config.QUEUE_CONSULT, user)
        time.sleep(1)  # 间隔发送,避免消息拥堵
    consult_agent.close()

if __name__ == "__main__":
    try:
        # 1. 启动所有智能体
        agents = []
        agent_classes = [ConsultAgent, PurchaseAgent, PayAgent, OrderAgent, AftersaleAgent, ExceptionAgent]
        for cls in agent_classes:
            agent, thread = start_agent(cls)
            agents.append((agent, thread))
            time.sleep(0.5)  # 间隔启动,避免资源竞争

        # 2. 发送测试消息
        print("\n=== 开始发送测试咨询消息 ===")
        send_test_consult_msg()

        # 3. 保持主线程运行
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        print("\n=== 系统正在关闭 ===")
        # 关闭所有智能体连接
        for agent, _ in agents:
            agent.close()
        print("=== 系统已关闭 ===")

6.5 代码说明与扩展建议

  1. 代码结构:采用分层设计,BaseAgent封装通用消息队列操作,各业务智能体继承并实现具体业务逻辑,LogUtils统一管理消息打印,确保格式标准化。

  2. 核心功能:实现智能体间基于RabbitMQ的交接通信,覆盖咨询→购买→支付→订单→售后全流程,包含选品、下单、支付、订单状态更新等关键操作的消息打印。

  3. 扩展建议:

  • 生产环境优化:增加服务注册发现(如Consul)、配置中心(如Nacos)、熔断降级(如Sentinel)等组件。

  • 业务扩展:对接真实商品数据库、支付渠道API、物流系统,实现退款、退换货等完整售后流程。

  • 监控增强:基于Prometheus+Grafana实现智能体运行状态、交接成功率、消息打印量等指标监控。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐