一、多专用Agent的设计必要性

在前几个阶段,我们实现了智能问诊和体质评估,但这只是收集信息和静态判定。真正的中医诊疗需要多层次、多角度的综合分析:

  • 根据症状判断证型(辨证)

  • 查阅古籍中的经典治法(古为今用)

  • 参考现代临床研究成果(今为今用)

  • 制定个性化的食疗方案

  • 审核方案的安全性

  • 提供经络穴位、运动作息等调养建议

  • 整合所有结果生成可读报告

如果用一个“万能Agent”同时完成所有任务,会出现三个问题:

问题 说明
上下文膨胀 一个提示词包含所有要求,token消耗巨大,且容易相互干扰
专业性不足 辨证需要严谨的理论依据,食疗需要食材知识,混在一起会互相稀释
难以独立优化 某个环节出错需要重新调整个大模型,调试困难

因此,设计了7个专用Agent,各司其职,通过编排器协同工作。

二、多智能体架构设计

2.1 Agent清单与职责

Agent 职责
diagnosis 中医辨证(八纲、六经、脏腑、病机)
ancient 查询经典古籍条文、医案、经方
modern 现代中医研究、循证证据
dietary_therapy 食疗原则、推荐/禁忌食材、食疗方
safety 配伍禁忌、体质冲突、特殊人群安全
convalescence 针灸、推拿、气功、生活调护
report 整合所有结果生成结构化报告

2.2 依赖关系与调度策略

根据中医诊疗的逻辑顺序,确定了Agent间的依赖关系:

diagnosis(辨证)是核心,所有其他Agent依赖它
    ├── ancient(古籍)   ──┐
    ├── modern(现代文献)──┼── ancient和modern可并行,无相互依赖
    ├── dietary_therapy(食疗)── 依赖辨证
    │       ├── safety(安全审核)── 依赖食疗───────────┐
    │       └── convalescence(调养)── 依赖食疗和辩证──┼──(safety和convalescence可并行)
    └── report(报告)── 依赖所有Agent完成

基于此,编排器采用五阶段流水线,并行执行无依赖的Agent:

阶段 Agent 执行方式 依赖
1 diagnosis 串行
2 ancient + modern 并行 diagnosis
3 dietary_therapy 串行 diagnosis
4 safety + convalescence 并行 dietary_therapy
5 report 串行 上述所有

效果:原本纯粹串行耗时稍久,通过两处并行优化,速度提升,用户体验提升。

2.3 统一基类设计(BaseAgent)

为了避免7个Agent重复实现公共功能,设计了抽象基类BaseAgent。所有专用Agent继承该类,只需关注自身的提示词构建和业务逻辑,而公共能力由基类统一提供。

核心功能模块

功能 说明
统一接口 定义get_prompt()run()抽象方法,强制子类实现
LLM调用封装 _call_llm_with_history()自动注入会话历史,减少重复代码
三级JSON容错提取 _extract_json()处理LLM输出的不稳定格式(直接解析→代码块→花括号)
RAG检索集成 _do_rag_retrieve()自动根据rag_knowledge_base检索知识库;子类只需重写_build_rag_query()
状态管理 使用AgentStatus枚举(PENDING/RUNNING/COMPLETED/FAILED),配合set_status()统一更新
SSE进度回调 通过_send_agent_progress()向编排器发送实时状态,支持前端展示
条件执行 can_run()允许子类根据上下文决定是否跳过当前Agent

简化子类实现的示例(以DiagnosisAgent为例):

class DiagnosisAgent(BaseAgent):
    rag_knowledge_base = "agent_diagnosis"    # 指定RAG知识库
    
    async def _build_rag_query(self, **kwargs):
        symptoms = kwargs.get("symptoms", [])
        return " ".join(symptoms) + " 中医辨证", 5
    
    def get_prompt(self, **kwargs):
        # 构建专属提示词,可用rag_text参数
        return prompts.diagnosis_agent(...)
    
    async def run(self, **kwargs):
        self.set_status(AgentStatus.RUNNING)
        rag_text = await self._do_rag_retrieve(**kwargs)
        prompt = self.get_prompt(rag_text=rag_text, ...)
        response = await self._call_llm_with_history(prompt)
        result = self._extract_json(response)   # 三级容错
        self.set_status(AgentStatus.COMPLETED)
        return {"status": "completed", "data": result}

设计优势

  • 新增Agent只需继承基类并实现2–3个方法

  • RAG、SSE、JSON解析等能力开箱即用,且可随时优化基类惠及全部Agent

三、SSE实时进度推送:让用户看见思考过程

3.1 选择SSE而非WebSocket

特性 SSE WebSocket
协议 HTTP(简单) WS(需升级)
方向 服务器→客户端单向 双向
自动重连 浏览器原生支持 需手动实现
适用场景 进度推送、通知 实时聊天、游戏

我们的场景只需要服务器推送Agent执行状态,SSE足够且更轻量。

3.2 SSEManager设计

核心数据结构:Dict[session_id, asyncio.Queue],每个会话独立队列。

class SSEManager:
    def __init__(self):
        self._connections: Dict[str, asyncio.Queue] = {}
        self._heartbeat_tasks: Dict[str, asyncio.Task] = {}
    
    async def create_connection(self, session_id):
        queue = asyncio.Queue()
        self._connections[session_id] = queue
        # 启动心跳任务,每15秒发送一次,防止连接断开
        self._heartbeat_tasks[session_id] = asyncio.create_task(self._heartbeat_loop(session_id))
        return queue
    
    async def send_progress(self, session_id, agent_name, status, progress, message, data=None):
        event = {...}  # 构造JSON事件
        await self._connections[session_id].put(json.dumps(event))

心跳机制:某些网络环境(如Nginx代理)会静默断开长时间无数据的连接。每15秒发送{"type":"heartbeat"}保持活跃。

3.3 平滑进度动画

Agent执行耗时不确定(2~8秒),如果只发送开始和结束,进度条会从0%突然跳到20%,体验生硬。因此在Orchestrator中实现平滑进度更新:

async def _smooth_progress(self, session_id, agent_name, start_progress, end_progress, 
                           steps=4, interval=0.8, messages=None):
    step_size = (end_progress - start_progress) / steps
    for i in range(1, steps+1):
        await asyncio.sleep(interval)
        current = start_progress + step_size * i
        message = messages[i-1] if messages else f"{agent_name}分析中..."
        await self.sse_manager.send_progress(session_id, agent_name, "running", 
                                              min(current, end_progress), message)

例如,diagnosis Agent阶段进度范围0.0~0.2,后台任务在0.8秒、1.6秒、2.4秒、3.2秒依次发送“正在进行中医辨证分析...”“正在分析症状信息...”“正在匹配中医理论...”“正在生成辨证结果...”,进度从0.05→0.10→0.15→0.20。

四、编排器核心实现

Orchestrator.py负责Agent注册、依赖解析、并行执行、SSE集成。

4.1 五阶段流水线代码结构

async def run_diagnosis_pipeline(self, session_id, symptoms, user_profile, conversation_history):
    results, context = {}, {}
    
    # Stage 1: diagnosis
    await self._run_single_agent_with_progress("diagnosis", ..., start=0.0, end=0.2)
    
    # Stage 2: ancient + modern 并行
    diagnosis_data = context.get("diagnosis", {})
    tasks = [self._run_agent_with_progress("ancient", diagnosis_result=diagnosis_data, ...),
             self._run_agent_with_progress("modern", diagnosis_result=diagnosis_data, ...)]
    await asyncio.gather(*tasks, return_exceptions=True)
    
    # Stage 3: dietary_therapy
    await self._run_single_agent_with_progress("dietary_therapy", ..., start=0.5, end=0.65)
    
    # Stage 4: safety + convalescence 并行
    dietary_data = context.get("dietary_therapy", {})
    tasks = [self._run_agent_with_progress("safety", dietary_result=dietary_data.get("food_therapy"), ...),
             self._run_agent_with_progress("convalescence", dietary_result=dietary_data.get("food_therapy"), ...)]
    await asyncio.gather(*tasks, return_exceptions=True)
    
    # Stage 5: report
    await self._run_single_agent_with_progress("report", ..., start=0.8, end=1.0)
    
    return results

5.2 异常隔离与降级

asyncio.gather(..., return_exceptions=True)确保一个Agent失败不会导致整个流程崩溃。例如ancient检索超时,modern依然可以执行。失败Agent的结果标记为{"status": "failed"},report Agent会忽略缺失部分,在报告中标注“古籍信息暂时无法获取”。

五、API设计与前端接入

5.1 触发诊疗

POST /api/diagnosis/sessions/{session_id}/analyze
{
    "session_id": "xxx",
    "user_id": "1",
    "profile_id": "2",
    "symptoms": []   
}

返回立即返回record_id,同时后端开始执行Agent流水线。

5.2 SSE订阅

前端在调用上述接口后,立即订阅SSE流:

const eventSource = new EventSource(`/api/diagnosis/sessions/${sessionId}/stream`);
eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === 'agent_progress') {
        updateProgress(data.agent, data.progress, data.message);
    } else if (data.type === 'complete') {
        // 跳转到报告页面
        router.push(`/report/${data.record_id}`);
        eventSource.close();
    }
};

5.3 报告获取

诊疗完成后,前端通过GET /api/diagnosis/records/{record_id}获取完整报告,包含辨证结果、古籍/现代文献、食疗方案、安全警告、调养建议等。

六、技术挑战与解决方案

6.1 并行执行时的上下文传递

问题:ancient和modern并行时,都需要diagnosis的结果。如果通过共享字典传递,会有竞态条件(虽然Python asyncio单线程下字典操作为原子,但读取时仍可能读到不完整写入)。

解决:在Stage1完成后,将diagnosis的完整数据赋值给局部变量diagnosis_data,Stage2的两个任务都收到此变量的副本(字典浅拷贝,但内部数据不变,安全)。

6.2 SSE连接管理的内存泄漏

问题:用户刷新页面或关闭浏览器,服务端的SSE连接队列和心跳任务不会自动清理。

解决:在diagnosis_streamfinally块中调用await sse_manager.close_connection(session_id),确保队列被删除、心跳任务被取消。同时使用asyncio.Lock保护共享字典的并发修改。

七、技术点总结

技术点 解决的问题 实现方式
多Agent分工 单一Agent无法同时处理辨证、古籍、食疗等 7个专用Agent,单一职责
并行调度 顺序执行耗时长 依赖图谱 + asyncio.gather
SSE实时推送 用户等待焦虑 心跳保活 + 平滑进度动画
三级容错 LLM输出不稳定、RAG失败 JSON正则提取 + asyncio异常隔离
可观测性 难以调试Agent输出 每个Agent记录RAG来源、执行耗时、输出内容

八、后续优化方向

  1. 引入LangGraph:当前依赖图谱是硬编码的,未来可声明式定义,支持条件分支和循环重试。

  2. Agent模型异构:diagnosis 用 GPT 提高准确率,其他 Agent 用 DeepSeek 节省成本。

  3. 缓存机制:相同证型的古籍查询结果可缓存一天,避免重复调用LLM。

九、总结

本阶段完成了从“信息收集”到“智能分析”的关键跃迁。通过多智能体协同,系统能够:

  • 自动辨证并给出多维依据(八纲、六经、脏腑、病机)

  • 生成个性化食疗方案,并审核安全性

  • 提供经络、运动、起居等综合调养建议

  • 实时推送进度,让用户看见AI的思考过程

目前,整个诊疗流程已可完整运行,用户从问诊到获得养生报告的全流程体验顺畅。下一步将聚焦于RAG。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐