[大模型架构] LangGraph AI 工作流编排(20)
定义智能体基类,规范所有智能体的核心接口(run执行方法、元数据、输入格式、输出格式),确保兼容性:python运行import json_schema_for_humans # 用于生成输入输出格式校验"""智能体基类(抽象接口)""""""获取智能体元数据(名称、描述、版本等)"""pass"""获取输入数据JSON Schema(用于校验输入合法性)"""pass"""获取输出数据JSON
一、多智能体协作的核心价值与设计原则
视频首先明确 “多智能体协作” 的本质是 “将复杂任务拆解为子任务,分配给具备专业能力的智能体,通过标准化通信实现协同完成目标”,核心价值与设计原则如下:
(一)核心价值
- 任务拆解与专业化分工:复杂任务(如 “PDF 分析→数据提取→报告生成→可视化展示”)拆解为子任务,每个智能体专注单一领域(如 PDF 解析智能体、数据分析智能体、报告生成智能体),提升执行效率与准确性;
- 灵活性与可扩展性:新增任务类型时,无需修改原有工作流,仅需新增对应智能体并配置协作规则;
- 容错性提升:单个智能体执行失败时,可自动切换备用智能体或重试,避免整个任务中断;
- 资源优化:根据任务复杂度动态调度智能体资源(如简单任务用轻量智能体,复杂任务用大模型智能体)。
(二)核心设计原则
- 角色单一职责原则:每个智能体仅负责一类核心任务(如 “数据提取智能体” 仅处理文本 / 表格提取,不涉及报告生成);
- 通信标准化原则:智能体间通过统一的数据格式(如 JSON Schema)传递信息,避免数据歧义;
- 动态调度原则:基于任务状态、智能体负载、执行结果自动分配 / 切换智能体;
- 冲突解决原则:明确智能体间优先级与冲突处理规则(如 “报告生成智能体” 需等待 “数据提取智能体” 完成,若超时则触发告警)。
二、多智能体设计与实现(核心实操)
视频中基于 LangGraph 实现多智能体架构,核心分为 “智能体抽象定义、具体智能体实现、智能体通信机制” 三部分:
(一)智能体抽象定义(统一接口)
定义智能体基类,规范所有智能体的核心接口(run执行方法、get_metadata元数据、get_input_schema输入格式、get_output_schema输出格式),确保兼容性:
python
运行
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import json_schema_for_humans # 用于生成输入输出格式校验
class BaseAgent(ABC):
"""智能体基类(抽象接口)"""
@abstractmethod
def get_metadata(self) -> Dict[str, str]:
"""获取智能体元数据(名称、描述、版本等)"""
pass
@abstractmethod
def get_input_schema(self) -> Dict[str, Any]:
"""获取输入数据JSON Schema(用于校验输入合法性)"""
pass
@abstractmethod
def get_output_schema(self) -> Dict[str, Any]:
"""获取输出数据JSON Schema(用于校验输出合法性)"""
pass
@abstractmethod
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""执行智能体核心逻辑,输入符合schema的数据,返回符合schema的结果"""
pass
def validate_input(self, input_data: Dict[str, Any]) -> bool:
"""校验输入数据是否符合schema(基类实现通用逻辑)"""
from jsonschema import validate
try:
validate(instance=input_data, schema=self.get_input_schema())
return True
except Exception as e:
print(f"输入数据校验失败:{str(e)}")
return False
(二)具体智能体实现(3 个核心示例)
基于基类实现 3 个常用智能体,覆盖 “文档解析→数据提取→报告生成” 全流程:
- PDF 解析智能体(PDFParserAgent)
python
运行
class PDFParserAgent(BaseAgent):
def get_metadata(self) -> Dict[str, str]:
return {
"agent_name": "pdf_parser_agent",
"description": "解析PDF文件,提取文本、表格数据,支持纯文本PDF与扫描件(OCR)",
"version": "1.0.0",
"required_tools": ["pdf_text_extract", "pdf_table_extract", "ocr_tool"]
}
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "PDF文件绝对路径"},
"extract_text": {"type": "boolean", "default": True, "description": "是否提取文本"},
"extract_tables": {"type": "boolean", "default": True, "description": "是否提取表格"},
"use_ocr": {"type": "boolean", "default": False, "description": "扫描件PDF需启用OCR"}
},
"required": ["file_path"]
}
def get_output_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"data": {
"type": "object",
"properties": {
"text_content": {"type": "string", "description": "提取的文本内容"},
"tables": {
"type": "array",
"items": {"type": "array", "description": "表格数据(二维数组)"}
}
}
},
"message": {"type": "string"}
}
}
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
if not self.validate_input(input_data):
return {"success": False, "data": None, "message": "输入数据非法"}
try:
file_path = input_data["file_path"]
extract_text = input_data.get("extract_text", True)
extract_tables = input_data.get("extract_tables", True)
use_ocr = input_data.get("use_ocr", False)
# 调用工具执行解析(复用之前定义的PDF工具)
from tools import pdf_text_extract_tool, pdf_table_extract_tool
result = {"text_content": "", "tables": []}
if extract_text:
text_res = pdf_text_extract_tool.run({"file_path": file_path, "use_ocr": use_ocr})
if text_res["success"]:
result["text_content"] = "\n".join(text_res["data"])
if extract_tables:
table_res = pdf_table_extract_tool.run({"file_path": file_path, "use_ocr": use_ocr})
if table_res["success"]:
result["tables"] = table_res["data"]
return {"success": True, "data": result, "message": "PDF解析成功"}
except Exception as e:
return {"success": False, "data": None, "message": f"PDF解析失败:{str(e)}"}
- 数据分析智能体(DataAnalyzerAgent)
python
运行
class DataAnalyzerAgent(BaseAgent):
def get_metadata(self) -> Dict[str, str]:
return {
"agent_name": "data_analyzer_agent",
"description": "对提取的表格数据进行统计分析(求和、平均值、趋势分析)",
"version": "1.0.0",
"required_tools": ["pandas_analysis_tool"]
}
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"tables": {
"type": "array",
"items": {"type": "array", "description": "输入表格数据(二维数组)"}
},
"analysis_type": {
"type": "string",
"enum": ["sum", "average", "trend", "all"],
"default": "all",
"description": "分析类型:求和/平均值/趋势/全部"
}
},
"required": ["tables"]
}
def get_output_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"data": {
"type": "object",
"properties": {
"sum_result": {"type": "array", "description": "各列求和结果"},
"average_result": {"type": "array", "description": "各列平均值结果"},
"trend_result": {"type": "string", "description": "数据趋势分析描述"}
}
},
"message": {"type": "string"}
}
}
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
if not self.validate_input(input_data):
return {"success": False, "data": None, "message": "输入数据非法"}
try:
import pandas as pd
tables = input_data["tables"]
analysis_type = input_data.get("analysis_type", "all")
result = {"sum_result": [], "average_result": [], "trend_result": ""}
# 转换表格数据为DataFrame
df = pd.DataFrame(tables[1:], columns=tables[0]) # 第一行为表头
df = df.apply(pd.to_numeric, errors="ignore") # 转换数值类型
# 执行分析
if analysis_type in ["sum", "all"]:
sum_result = df.select_dtypes(include="number").sum().tolist()
result["sum_result"] = sum_result
if analysis_type in ["average", "all"]:
avg_result = df.select_dtypes(include="number").mean().tolist()
result["average_result"] = avg_result
if analysis_type in ["trend", "all"]:
# 简单趋势分析(示例)
numeric_cols = df.select_dtypes(include="number").columns
if len(numeric_cols) > 0:
col = numeric_cols[0]
if df[col].iloc[-1] > df[col].iloc[0]:
result["trend_result"] = f"{col}呈现上升趋势"
else:
result["trend_result"] = f"{col}呈现下降趋势"
return {"success": True, "data": result, "message": "数据分析成功"}
except Exception as e:
return {"success": False, "data": None, "message": f"数据分析失败:{str(e)}"}
- 报告生成智能体(ReportGeneratorAgent)
python
运行
class ReportGeneratorAgent(BaseAgent):
def get_metadata(self) -> Dict[str, str]:
return {
"agent_name": "report_generator_agent",
"description": "基于文本内容和分析结果生成结构化报告(Markdown格式)",
"version": "1.0.0",
"required_tools": ["llm_tool"] # 依赖大模型工具生成自然语言报告
}
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"text_content": {"type": "string", "description": "PDF提取的文本内容"},
"analysis_data": {"type": "object", "description": "数据分析结果"},
"report_title": {"type": "string", "default": "PDF分析报告", "description": "报告标题"},
"report_type": {"type": "string", "enum": ["brief", "detail"], "default": "detail", "description": "报告类型:简要/详细"}
},
"required": ["text_content", "analysis_data"]
}
def get_output_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"data": {"type": "string", "description": "Markdown格式的报告内容"},
"message": {"type": "string"}
}
}
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
if not self.validate_input(input_data):
return {"success": False, "data": None, "message": "输入数据非法"}
try:
text_content = input_data["text_content"]
analysis_data = input_data["analysis_data"]
report_title = input_data.get("report_title", "PDF分析报告")
report_type = input_data.get("report_type", "detail")
# 调用大模型生成报告(示例:使用LangChain的LLM工具)
from langchain_openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo-instruct")
prompt = f"""
基于以下内容生成{report_type}报告(Markdown格式):
1. 文本摘要:{text_content[:500]}(仅取前500字)
2. 数据分析结果:{json.dumps(analysis_data, ensure_ascii=False)}
报告要求:结构清晰,包含标题、文本摘要、数据分析结果、结论,语言简洁专业。
"""
report_content = llm.invoke(prompt)
return {"success": True, "data": report_content, "message": "报告生成成功"}
except Exception as e:
return {"success": False, "data": None, "message": f"报告生成失败:{str(e)}"}
(三)智能体通信机制(标准化数据传递)
通过 LangGraph 的 State 统一存储智能体间的通信数据,定义包含 “任务状态、各智能体输入输出、协作上下文” 的 State 类,实现智能体间无缝协作:
python
运行
from langgraph.graph import State
from typing import Dict, Optional, Any
class MultiAgentState(State):
"""多智能体协作State(统一通信载体)"""
# 任务基础信息
task_id: str = ""
task_status: str = "running" # running/completed/failed/paused
task_error: str = ""
# 各智能体输入输出数据(key:智能体名称,value:输入/输出)
agent_inputs: Dict[str, Any] = {}
agent_outputs: Dict[str, Any] = {}
# 协作上下文(传递给后续智能体的中间数据)
context: Dict[str, Any] = {}
# 动态配置(如当前执行智能体、下一步智能体)
current_agent: Optional[str] = None
next_agent: Optional[str] = None
三、复杂工作流编排(LangGraph 动态调度)
视频的核心环节是基于 LangGraph 实现 “多智能体协同的动态工作流”,支持条件分支、循环执行、智能体动态切换,核心分为 “工作流节点设计、流转规则定义、动态调度逻辑” 三部分:
(一)工作流节点设计(智能体执行节点)
每个节点对应一个智能体的执行逻辑,负责 “接收上下文数据→构造智能体输入→调用智能体→存储输出到 State→更新上下文”:
python
运行
# 初始化所有智能体实例
pdf_parser_agent = PDFParserAgent()
data_analyzer_agent = DataAnalyzerAgent()
report_generator_agent = ReportGeneratorAgent()
# 智能体执行节点(通用模板)
def agent_exec_node(agent: BaseAgent) -> callable:
"""生成智能体执行节点的通用函数"""
def node(state: MultiAgentState) -> MultiAgentState:
# 标记当前执行智能体
state.current_agent = agent.get_metadata()["agent_name"]
try:
# 从上下文构造智能体输入数据
input_data = state.context.get(f"{state.current_agent}_input", {})
# 校验输入数据
if not agent.validate_input(input_data):
state.task_status = "failed"
state.task_error = f"智能体{state.current_agent}输入数据非法"
return state
# 存储智能体输入
state.agent_inputs[state.current_agent] = input_data
# 执行智能体
agent_result = agent.run(input_data)
# 存储智能体输出
state.agent_outputs[state.current_agent] = agent_result
# 更新上下文(将当前智能体输出传递给下一个智能体)
if agent_result["success"]:
state.context[f"{state.current_agent}_output"] = agent_result["data"]
# 根据当前智能体动态指定下一个智能体(示例:PDF解析→数据分析→报告生成)
agent_name = state.current_agent
if agent_name == "pdf_parser_agent":
state.next_agent = "data_analyzer_agent"
# 构造数据分析智能体的输入(使用PDF解析结果)
state.context["data_analyzer_agent_input"] = {
"tables": agent_result["data"]["tables"]
}
elif agent_name == "data_analyzer_agent":
state.next_agent = "report_generator_agent"
# 构造报告生成智能体的输入(使用PDF文本+分析结果)
state.context["report_generator_agent_input"] = {
"text_content": state.context.get("pdf_parser_agent_output", {}).get("text_content", ""),
"analysis_data": agent_result["data"]
}
elif agent_name == "report_generator_agent":
state.next_agent = None # 所有智能体执行完成
state.task_status = "completed"
else:
state.task_status = "failed"
state.task_error = f"智能体{state.current_agent}执行失败:{agent_result['message']}"
return state
except Exception as e:
state.task_status = "failed"
state.task_error = f"智能体{state.current_agent}节点执行异常:{str(e)}"
return state
return node
# 生成各智能体的执行节点
pdf_parser_node = agent_exec_node(pdf_parser_agent)
data_analyzer_node = agent_exec_node(data_analyzer_agent)
report_generator_node = agent_exec_node(report_generator_agent)
(二)流转规则定义(条件分支与动态调度)
定义工作流的流转规则,支持 “按智能体执行结果分支、按任务类型动态选择智能体、循环重试失败智能体”:
python
运行
def workflow_router(state: MultiAgentState) -> str:
"""工作流路由函数(定义流转规则)"""
# 任务失败:终止流程
if state.task_status == "failed":
return "end"
# 所有智能体执行完成:终止流程
if state.next_agent is None:
return "end"
# 动态流转到下一个智能体节点
agent_to_node_map = {
"pdf_parser_agent": "pdf_parser",
"data_analyzer_agent": "data_analyzer",
"report_generator_agent": "report_generator"
}
return agent_to_node_map.get(state.next_agent, "end")
# 失败重试节点(可选)
def retry_node(state: MultiAgentState) -> MultiAgentState:
"""智能体执行失败后的重试节点(最多重试2次)"""
retry_count = state.context.get("retry_count", 0)
if retry_count < 2:
state.context["retry_count"] = retry_count + 1
# 重试当前智能体
return state
else:
state.task_status = "failed"
state.task_error = f"智能体{state.current_agent}重试2次仍失败"
return state
(三)工作流编译与执行(完整流程)
将节点与流转规则整合到 LangGraph,编译并执行多智能体协作工作流:
python
运行
from langgraph import Graph
import uuid
# 1. 创建Graph实例
graph = Graph(MultiAgentState)
# 2. 添加节点
graph.add_node("pdf_parser", pdf_parser_node) # PDF解析节点
graph.add_node("data_analyzer", data_analyzer_node) # 数据分析节点
graph.add_node("report_generator", report_generator_node) # 报告生成节点
graph.add_node("retry", retry_node) # 重试节点
graph.add_node("end", lambda s: s) # 结束节点
# 3. 定义流转规则
# 初始节点→PDF解析节点
graph.set_entry_point("pdf_parser")
# 各智能体节点→路由函数(动态流转)
graph.add_edge("pdf_parser", workflow_router)
graph.add_edge("data_analyzer", workflow_router)
graph.add_edge("report_generator", workflow_router)
# 重试节点→当前智能体节点(重新执行)
graph.add_edge("retry", lambda s: s.current_agent)
# 路由函数→对应节点/结束节点
graph.add_conditional_edges(
"workflow_router",
workflow_router,
{
"pdf_parser": "pdf_parser",
"data_analyzer": "data_analyzer",
"report_generator": "report_generator",
"retry": "retry",
"end": "end"
}
)
# 4. 编译工作流
multi_agent_app = graph.compile()
# 5. 执行多智能体协作任务
def run_multi_agent_task(file_path: str) -> MultiAgentState:
"""执行多智能体协作任务(PDF解析→数据分析→报告生成)"""
task_id = str(uuid.uuid4())
# 初始化State(构造PDF解析智能体的输入)
initial_state = MultiAgentState(
task_id=task_id,
context={
"pdf_parser_agent_input": {
"file_path": file_path,
"extract_text": True,
"extract_tables": True,
"use_ocr": False
}
}
)
# 执行工作流
final_state = multi_agent_app.invoke(initial_state)
return final_state
# 调用示例
final_state = run_multi_agent_task("C:/docs/business_report.pdf")
if final_state.task_status == "completed":
# 获取最终报告
report_content = final_state.agent_outputs["report_generator_agent"]["data"]
print("多智能体协作完成,报告内容:", report_content)
else:
print("任务执行失败:", final_state.task_error)
(四)高级特性:动态添加智能体与流转规则
支持运行时动态添加新智能体与流转规则,无需重新编译工作流,适配灵活扩展场景:
python
运行
def add_agent_to_workflow(
graph: Graph,
agent: BaseAgent,
prev_agent_name: str, # 前置智能体名称(在哪个智能体之后执行)
next_agent_name: Optional[str] = None # 后置智能体名称(执行后流转到哪个智能体)
) -> Graph:
"""动态向工作流添加新智能体"""
# 生成新智能体的执行节点
new_agent_node = agent_exec_node(agent)
new_agent_name = agent.get_metadata()["agent_name"]
# 添加节点到工作流
graph.add_node(new_agent_name, new_agent_node)
# 更新流转规则:前置智能体→新智能体→后置智能体/结束
graph.add_conditional_edges(
prev_agent_name,
lambda s: new_agent_name if s.next_agent == new_agent_name else s.next_agent,
{new_agent_name: new_agent_name}
)
if next_agent_name:
# 新智能体→后置智能体
graph.add_edge(new_agent_name, lambda s: next_agent_name)
else:
# 新智能体→结束
graph.add_edge(new_agent_name, "end")
return graph
# 示例:动态添加“可视化生成智能体”(在报告生成后执行)
class VisualizationAgent(BaseAgent):
# 实现智能体基类的抽象方法(略,功能:基于报告数据生成图表)
pass
visualization_agent = VisualizationAgent()
# 动态添加到工作流(报告生成智能体之后执行)
multi_agent_app.graph = add_agent_to_workflow(
multi_agent_app.graph,
visualization_agent,
prev_agent_name="report_generator_agent"
)
四、Electron 前端适配(多智能体可视化管控)
视频最后实现了 Electron 前端对多智能体工作流的 “可视化配置、实时状态监控、结果展示”,核心分为 “智能体配置面板、工作流监控面板、结果展示面板” 三部分:
(一)智能体配置面板(动态配置智能体参数)
前端通过 IPC 获取所有智能体的元数据与输入输出 Schema,动态生成配置表单,支持用户自定义智能体参数:
tsx
import { useState, useEffect } from 'react';
import { Form, Button, Input, Switch, Select, Space, message } from 'antd';
import { JsonSchemaForm } from '@rjsf/antd'; // 基于JSON Schema动态生成表单
import '@rjsf/core/dist/css/index.css';
import '@rjsf/antd/dist/css/index.css';
const AgentConfigPanel = ({ taskId }) => {
const [agents, setAgents] = useState([]);
const [selectedAgent, setSelectedAgent] = useState("");
const [agentSchema, setAgentSchema] = useState({});
const [formData, setFormData] = useState({});
// 加载所有智能体元数据
useEffect(() => {
const fetchAgents = async () => {
const res = await window.electronAPI.getAgentsMetadata();
if (res.success) {
setAgents(res.data);
if (res.data.length > 0) {
setSelectedAgent(res.data[0].agent_name);
}
}
};
fetchAgents();
}, []);
// 加载选中智能体的输入Schema
useEffect(() => {
const fetchAgentSchema = async () => {
if (!selectedAgent) return;
const res = await window.electronAPI.getAgentInputSchema(selectedAgent);
if (res.success) {
setAgentSchema(res.data);
}
};
fetchAgentSchema();
}, [selectedAgent]);
// 提交智能体配置,启动任务
const handleSubmit = async () => {
try {
const res = await window.electronAPI.startMultiAgentTask({
taskId,
agentName: selectedAgent,
config: formData
});
if (res.success) {
message.success("多智能体任务已启动");
} else {
message.error(`启动失败:${res.error}`);
}
} catch (e) {
message.error(`提交异常:${e}`);
}
};
return (
<Space direction="vertical" size="large" style={{ width: '100%', padding: '20px' }}>
<Select
value={selectedAgent}
onChange={setSelectedAgent}
style={{ width: 300 }}
options={agents.map(agent => ({
label: agent.agent_name,
value: agent.agent_name,
description: agent.description
}))}
/>
{agentSchema.type && (
<JsonSchemaForm
schema={agentSchema}
formData={formData}
onChange={({ formData }) => setFormData(formData)}
submitButtonOptions={{ show: false }}
/>
)}
<Button type="primary" onClick={handleSubmit}>启动多智能体任务</Button>
</Space>
);
};
export default AgentConfigPanel;
(二)工作流监控面板(实时查看智能体协作状态)
前端通过 IPC 实时获取工作流 State,展示 “当前执行智能体、各智能体执行状态、任务进度、错误信息”,支持暂停 / 恢复 / 终止任务:
tsx
import { useState, useEffect } from 'react';
import { Table, Button, Tag, Space, message } from 'antd';
import { PauseOutlined, PlayOutlined, StopOutlined } from '@ant-design/icons';
const WorkflowMonitorPanel = ({ taskId }) => {
const [taskState, setTaskState] = useState(null);
const [loading, setLoading] = useState(false);
// 实时刷新任务状态(每3秒一次)
useEffect(() => {
const fetchTaskState = async () => {
setLoading(true);
const res = await window.electronAPI.getMultiAgentTaskState(taskId);
if (res.success) {
setTaskState(res.data);
} else {
message.error(`获取状态失败:${res.error}`);
}
setLoading(false);
};
fetchTaskState();
const timer = setInterval(fetchTaskState, 3000);
return () => clearInterval(timer);
}, [taskId]);
// 任务控制:暂停/恢复/终止
const handleTaskControl = async (action) => {
const res = await window.electronAPI.controlMultiAgentTask({ taskId, action });
if (res.success) {
message.success(`任务${action}成功`);
} else {
message.error(`任务${action}失败:${res.error}`);
}
};
// 智能体执行状态表格列
const agentColumns = [
{ title: '智能体名称', dataIndex: 'agent_name', key: 'agent_name' },
{ title: '执行状态', dataIndex: 'status', key: 'status', render: (status) => (
<Tag color={status === 'success' ? 'green' : status === 'failed' ? 'red' : 'blue'}>
{status === 'success' ? '执行成功' : status === 'failed' ? '执行失败' : '执行中'}
</Tag>
)},
{ title: '执行时间', dataIndex: 'exec_time', key: 'exec_time', default: 'N/A' },
{ title: '错误信息', dataIndex: 'error', key: 'error', ellipsis: true },
];
if (!taskState) return <div>任务未启动</div>;
return (
<Space direction="vertical" size="large" style={{ width: '100%', padding: '20px' }}>
<Space>
<Button icon={<PauseOutlined />} onClick={() => handleTaskControl('pause')} disabled={taskState.task_status !== 'running'}>
暂停任务
</Button>
<Button icon={<PlayOutlined />} onClick={() => handleTaskControl('resume')} disabled={taskState.task_status !== 'paused'}>
恢复任务
</Button>
<Button icon={<StopOutlined />} onClick={() => handleTaskControl('stop')} disabled={['completed', 'failed'].includes(taskState.task_status)} danger>
终止任务
</Button>
</Space>
<Table
columns={agentColumns}
dataSource={Object.entries(taskState.agent_outputs).map(([agentName, output]) => ({
key: agentName,
agent_name: agentName,
status: output.success ? 'success' : 'failed',
error: output.message || ''
}))}
title={() => `任务ID:${taskId} | 任务状态:${taskState.task_status}`}
loading={loading}
/>
</Space>
);
};
export default WorkflowMonitorPanel;
(三)结果展示面板(展示多智能体协作最终成果)
展示报告、图表等最终结果,支持下载、分享、二次编辑:
tsx
import { useState, useEffect } from 'react';
import { Card, Button, Space, message, Divider } from 'antd';
import { DownloadOutlined, ShareOutlined, EditOutlined } from '@ant-design/icons';
import ReactMarkdown from 'react-markdown'; // 展示Markdown报告
const ResultDisplayPanel = ({ taskId }) => {
const [result, setResult] = useState(null);
const [loading, setLoading] = useState(false);
useEffect(() => {
const fetchResult = async () => {
setLoading(true);
const res = await window.electronAPI.getMultiAgentTaskResult(taskId);
if (res.success) {
setResult(res.data);
} else {
message.error(`获取结果失败:${res.error}`);
}
setLoading(false);
};
fetchResult();
}, [taskId]);
// 下载报告
const handleDownload = async () => {
const res = await window.electronAPI.downloadMultiAgentResult({ taskId, format: 'markdown' });
if (res.success) {
message.success("报告下载成功");
} else {
message.error(`下载失败:${res.error}`);
}
};
if (loading) return <div>加载结果中...</div>;
if (!result) return <div>暂无结果</div>;
return (
<Card title="多智能体协作结果" style={{ width: '100%', margin: '20px' }}>
<Space style={{ marginBottom: '20px' }}>
<Button icon={<DownloadOutlined />} onClick={handleDownload}>下载报告</Button>
<Button icon={<ShareOutlined />}>分享结果</Button>
<Button icon={<EditOutlined />}>编辑报告</Button>
</Space>
<Divider />
<ReactMarkdown>{result.report_content}</ReactMarkdown>
</Card>
);
};
export default ResultDisplayPanel;
五、多智能体协作常见问题与优化策略
| 问题现象 | 核心原因 | 解决方案 |
|---|---|---|
| 智能体输入数据校验失败 | 上下文数据格式与智能体输入 Schema 不匹配 | 1. 前端配置表单基于 Schema 生成,强制用户输入合法数据;2. 后端在构造输入时添加格式转换逻辑;3. State 中存储数据时统一序列化 |
| 智能体执行超时导致任务阻塞 | 未设置智能体执行超时时间,或无超时处理机制 | 1. 在智能体run方法中添加超时控制(如timeout=60s);2. 超时后自动切换备用智能体或触发重试;3. 超时超过阈值则终止任务并告警 |
| 多智能体协作上下文数据冗余 | 上下文存储过多无用数据,导致 State 体积过大 | 1. 仅存储各智能体执行必需的中间数据;2. 智能体执行完成后清理无用上下文;3. 大体积数据(如大文本、图表)单独存储,上下文仅存引用 |
| 动态添加智能体后流转规则冲突 | 未更新工作流路由函数,导致新智能体无法流转 | 1. 动态添加智能体时自动更新路由函数的agent_to_node_map;2. 新增智能体后重新编译工作流;3. 流转规则中添加 “未知智能体” 分支,避免流程中断 |
| 智能体执行失败后无法恢复 | 无重试机制或重试逻辑不合理 | 1. 实现基于重试次数的重试机制(最多重试 2-3 次);2. 重试失败后切换备用智能体;3. 记录失败原因,便于后续排查 |
更多推荐

所有评论(0)