山东大学-杏林集:智汇中医-项目实训(六)
实现多智能体辩证分析、SSE实时推送进度
一、多专用Agent的设计必要性
在前几个阶段,我们实现了智能问诊和体质评估,但这只是收集信息和静态判定。真正的中医诊疗需要多层次、多角度的综合分析:
-
根据症状判断证型(辨证)
-
查阅古籍中的经典治法(古为今用)
-
参考现代临床研究成果(今为今用)
-
制定个性化的食疗方案
-
审核方案的安全性
-
提供经络穴位、运动作息等调养建议
-
整合所有结果生成可读报告
如果用一个“万能Agent”同时完成所有任务,会出现三个问题:
| 问题 | 说明 |
|---|---|
| 上下文膨胀 | 一个提示词包含所有要求,token消耗巨大,且容易相互干扰 |
| 专业性不足 | 辨证需要严谨的理论依据,食疗需要食材知识,混在一起会互相稀释 |
| 难以独立优化 | 某个环节出错需要重新调整个大模型,调试困难 |
因此,设计了7个专用Agent,各司其职,通过编排器协同工作。
二、多智能体架构设计
2.1 Agent清单与职责
| Agent | 职责 | |
|---|---|---|
| diagnosis | 中医辨证(八纲、六经、脏腑、病机) | |
| ancient | 查询经典古籍条文、医案、经方 | |
| modern | 现代中医研究、循证证据 | |
| dietary_therapy | 食疗原则、推荐/禁忌食材、食疗方 | |
| safety | 配伍禁忌、体质冲突、特殊人群安全 | |
| convalescence | 针灸、推拿、气功、生活调护 | |
| report | 整合所有结果生成结构化报告 |
2.2 依赖关系与调度策略
根据中医诊疗的逻辑顺序,确定了Agent间的依赖关系:
diagnosis(辨证)是核心,所有其他Agent依赖它
├── ancient(古籍) ──┐
├── modern(现代文献)──┼── ancient和modern可并行,无相互依赖
├── dietary_therapy(食疗)── 依赖辨证
│ ├── safety(安全审核)── 依赖食疗───────────┐
│ └── convalescence(调养)── 依赖食疗和辩证──┼──(safety和convalescence可并行)
└── report(报告)── 依赖所有Agent完成
基于此,编排器采用五阶段流水线,并行执行无依赖的Agent:
| 阶段 | Agent | 执行方式 | 依赖 |
|---|---|---|---|
| 1 | diagnosis | 串行 | 无 |
| 2 | ancient + modern | 并行 | diagnosis |
| 3 | dietary_therapy | 串行 | diagnosis |
| 4 | safety + convalescence | 并行 | dietary_therapy |
| 5 | report | 串行 | 上述所有 |
效果:原本纯粹串行耗时稍久,通过两处并行优化,速度提升,用户体验提升。
2.3 统一基类设计(BaseAgent)
为了避免7个Agent重复实现公共功能,设计了抽象基类BaseAgent。所有专用Agent继承该类,只需关注自身的提示词构建和业务逻辑,而公共能力由基类统一提供。
核心功能模块:
| 功能 | 说明 |
|---|---|
| 统一接口 | 定义get_prompt()和run()抽象方法,强制子类实现 |
| LLM调用封装 | _call_llm_with_history()自动注入会话历史,减少重复代码 |
| 三级JSON容错提取 | _extract_json()处理LLM输出的不稳定格式(直接解析→代码块→花括号) |
| RAG检索集成 | _do_rag_retrieve()自动根据rag_knowledge_base检索知识库;子类只需重写_build_rag_query() |
| 状态管理 | 使用AgentStatus枚举(PENDING/RUNNING/COMPLETED/FAILED),配合set_status()统一更新 |
| SSE进度回调 | 通过_send_agent_progress()向编排器发送实时状态,支持前端展示 |
| 条件执行 | can_run()允许子类根据上下文决定是否跳过当前Agent |
简化子类实现的示例(以DiagnosisAgent为例):
class DiagnosisAgent(BaseAgent):
rag_knowledge_base = "agent_diagnosis" # 指定RAG知识库
async def _build_rag_query(self, **kwargs):
symptoms = kwargs.get("symptoms", [])
return " ".join(symptoms) + " 中医辨证", 5
def get_prompt(self, **kwargs):
# 构建专属提示词,可用rag_text参数
return prompts.diagnosis_agent(...)
async def run(self, **kwargs):
self.set_status(AgentStatus.RUNNING)
rag_text = await self._do_rag_retrieve(**kwargs)
prompt = self.get_prompt(rag_text=rag_text, ...)
response = await self._call_llm_with_history(prompt)
result = self._extract_json(response) # 三级容错
self.set_status(AgentStatus.COMPLETED)
return {"status": "completed", "data": result}
设计优势:
-
新增Agent只需继承基类并实现2–3个方法
-
RAG、SSE、JSON解析等能力开箱即用,且可随时优化基类惠及全部Agent
三、SSE实时进度推送:让用户看见思考过程
3.1 选择SSE而非WebSocket
| 特性 | SSE | WebSocket |
|---|---|---|
| 协议 | HTTP(简单) | WS(需升级) |
| 方向 | 服务器→客户端单向 | 双向 |
| 自动重连 | 浏览器原生支持 | 需手动实现 |
| 适用场景 | 进度推送、通知 | 实时聊天、游戏 |
我们的场景只需要服务器推送Agent执行状态,SSE足够且更轻量。
3.2 SSEManager设计
核心数据结构:Dict[session_id, asyncio.Queue],每个会话独立队列。
class SSEManager:
def __init__(self):
self._connections: Dict[str, asyncio.Queue] = {}
self._heartbeat_tasks: Dict[str, asyncio.Task] = {}
async def create_connection(self, session_id):
queue = asyncio.Queue()
self._connections[session_id] = queue
# 启动心跳任务,每15秒发送一次,防止连接断开
self._heartbeat_tasks[session_id] = asyncio.create_task(self._heartbeat_loop(session_id))
return queue
async def send_progress(self, session_id, agent_name, status, progress, message, data=None):
event = {...} # 构造JSON事件
await self._connections[session_id].put(json.dumps(event))
心跳机制:某些网络环境(如Nginx代理)会静默断开长时间无数据的连接。每15秒发送{"type":"heartbeat"}保持活跃。
3.3 平滑进度动画
Agent执行耗时不确定(2~8秒),如果只发送开始和结束,进度条会从0%突然跳到20%,体验生硬。因此在Orchestrator中实现平滑进度更新:
async def _smooth_progress(self, session_id, agent_name, start_progress, end_progress,
steps=4, interval=0.8, messages=None):
step_size = (end_progress - start_progress) / steps
for i in range(1, steps+1):
await asyncio.sleep(interval)
current = start_progress + step_size * i
message = messages[i-1] if messages else f"{agent_name}分析中..."
await self.sse_manager.send_progress(session_id, agent_name, "running",
min(current, end_progress), message)
例如,diagnosis Agent阶段进度范围0.0~0.2,后台任务在0.8秒、1.6秒、2.4秒、3.2秒依次发送“正在进行中医辨证分析...”“正在分析症状信息...”“正在匹配中医理论...”“正在生成辨证结果...”,进度从0.05→0.10→0.15→0.20。
四、编排器核心实现
Orchestrator.py负责Agent注册、依赖解析、并行执行、SSE集成。
4.1 五阶段流水线代码结构
async def run_diagnosis_pipeline(self, session_id, symptoms, user_profile, conversation_history):
results, context = {}, {}
# Stage 1: diagnosis
await self._run_single_agent_with_progress("diagnosis", ..., start=0.0, end=0.2)
# Stage 2: ancient + modern 并行
diagnosis_data = context.get("diagnosis", {})
tasks = [self._run_agent_with_progress("ancient", diagnosis_result=diagnosis_data, ...),
self._run_agent_with_progress("modern", diagnosis_result=diagnosis_data, ...)]
await asyncio.gather(*tasks, return_exceptions=True)
# Stage 3: dietary_therapy
await self._run_single_agent_with_progress("dietary_therapy", ..., start=0.5, end=0.65)
# Stage 4: safety + convalescence 并行
dietary_data = context.get("dietary_therapy", {})
tasks = [self._run_agent_with_progress("safety", dietary_result=dietary_data.get("food_therapy"), ...),
self._run_agent_with_progress("convalescence", dietary_result=dietary_data.get("food_therapy"), ...)]
await asyncio.gather(*tasks, return_exceptions=True)
# Stage 5: report
await self._run_single_agent_with_progress("report", ..., start=0.8, end=1.0)
return results
5.2 异常隔离与降级
asyncio.gather(..., return_exceptions=True)确保一个Agent失败不会导致整个流程崩溃。例如ancient检索超时,modern依然可以执行。失败Agent的结果标记为{"status": "failed"},report Agent会忽略缺失部分,在报告中标注“古籍信息暂时无法获取”。
五、API设计与前端接入
5.1 触发诊疗
POST /api/diagnosis/sessions/{session_id}/analyze
{
"session_id": "xxx",
"user_id": "1",
"profile_id": "2",
"symptoms": []
}
返回立即返回record_id,同时后端开始执行Agent流水线。
5.2 SSE订阅
前端在调用上述接口后,立即订阅SSE流:
const eventSource = new EventSource(`/api/diagnosis/sessions/${sessionId}/stream`);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'agent_progress') {
updateProgress(data.agent, data.progress, data.message);
} else if (data.type === 'complete') {
// 跳转到报告页面
router.push(`/report/${data.record_id}`);
eventSource.close();
}
};
5.3 报告获取
诊疗完成后,前端通过GET /api/diagnosis/records/{record_id}获取完整报告,包含辨证结果、古籍/现代文献、食疗方案、安全警告、调养建议等。
六、技术挑战与解决方案
6.1 并行执行时的上下文传递
问题:ancient和modern并行时,都需要diagnosis的结果。如果通过共享字典传递,会有竞态条件(虽然Python asyncio单线程下字典操作为原子,但读取时仍可能读到不完整写入)。
解决:在Stage1完成后,将diagnosis的完整数据赋值给局部变量diagnosis_data,Stage2的两个任务都收到此变量的副本(字典浅拷贝,但内部数据不变,安全)。
6.2 SSE连接管理的内存泄漏
问题:用户刷新页面或关闭浏览器,服务端的SSE连接队列和心跳任务不会自动清理。
解决:在diagnosis_stream的finally块中调用await sse_manager.close_connection(session_id),确保队列被删除、心跳任务被取消。同时使用asyncio.Lock保护共享字典的并发修改。
七、技术点总结
| 技术点 | 解决的问题 | 实现方式 |
|---|---|---|
| 多Agent分工 | 单一Agent无法同时处理辨证、古籍、食疗等 | 7个专用Agent,单一职责 |
| 并行调度 | 顺序执行耗时长 | 依赖图谱 + asyncio.gather |
| SSE实时推送 | 用户等待焦虑 | 心跳保活 + 平滑进度动画 |
| 三级容错 | LLM输出不稳定、RAG失败 | JSON正则提取 + asyncio异常隔离 |
| 可观测性 | 难以调试Agent输出 | 每个Agent记录RAG来源、执行耗时、输出内容 |
八、后续优化方向
-
引入LangGraph:当前依赖图谱是硬编码的,未来可声明式定义,支持条件分支和循环重试。
-
Agent模型异构:diagnosis 用 GPT 提高准确率,其他 Agent 用 DeepSeek 节省成本。
-
缓存机制:相同证型的古籍查询结果可缓存一天,避免重复调用LLM。
九、总结
本阶段完成了从“信息收集”到“智能分析”的关键跃迁。通过多智能体协同,系统能够:
-
自动辨证并给出多维依据(八纲、六经、脏腑、病机)
-
生成个性化食疗方案,并审核安全性
-
提供经络、运动、起居等综合调养建议
-
实时推送进度,让用户看见AI的思考过程
目前,整个诊疗流程已可完整运行,用户从问诊到获得养生报告的全流程体验顺畅。下一步将聚焦于RAG。
更多推荐

所有评论(0)