大模型实战:用图式工作流编排多智能体—LangGraph
本文提出使用图式工作流(Graph-based Workflow)来解决多智能体系统中的流程编排问题。通过将有向图概念引入工作流设计,将每个Agent封装为节点(Node),流程控制转化为有向边(Edge),实现以下优势: 核心架构: 节点抽象:封装Agent/工具为可复用单元 有向图引擎:支持条件分支、顺序/并行执行 全局状态管理:通过共享上下文(state)传递数据 典型实现: 构建路由、检索
前面几篇已经有了:
- 基于向量库的 RAG(含多模态);
- 多智能体中台(不同 Agent 各司其职);
- 监控、日志、成本估算;
- 向量数据库选型与性能调优。
实际写代码时,你多半是用「一长串 Python 逻辑」把这些 Agent 串起来的,比如:
# 伪代码
res1 = qa_agent.handle(...)
res2 = tool_agent.handle(res1, ...)
res3 = summary_agent.handle(res2, ...)
return res3
这个方式在 Demo 阶段没问题,但只要业务流程一复杂,就会马上遇到这些痛点:
- if/else 分支越来越多,很难一眼看懂“流程图”;
- 想插入一个新步骤,要改很多地方;
- 无法直观追踪「一次调用走过哪些节点」;
- 很难做「局部重试」「分支并行」。
这些问题,本质上都是工作流编排问题。
这篇就用 LangGraph 的「图式工作流」思路,带你构建一套:
以“有向图”来描述和执行多智能体工作流的框架
让每个 Agent 变成图上的一个节点,流程变成一张可视化的有向图。
一、图式工作流的核心思想
一句话概括:
不再写“线性脚本”,而是写一张「有向图」,
每个节点是一个 Agent 或工具,每条边代表任务流向。
一个典型流程可以画成这样的结构(文字版):
用户请求
│
▼
[路由节点 Router]
├──(问答)────► [QA_Agent] ──► [Answer_Formatter]
├──(数据处理)► [Data_Agent] ─► [Report_Agent]
└──(多步流程)► [Planner] ─────► [执行子图 Subgraph]
优点很直接:
- 流程结构可视化:一眼能看出有哪些节点、怎么连的;
- 方便扩展:新增一个节点,只要接到图上合适的位置;
- 易于调试:可以在节点级别记录输入/输出、耗时、错误;
- 更适合并行:天然可以在图上跑分支并行。
下面用一个「多步骤任务处理」的小例子,从 0 开始搭一套极简图式工作流引擎(思路与 LangGraph 类似,但实现更轻量,更方便你直接改到项目里用)。
二、定义节点(Node)与有向边(Edge)
我们先抽象两个基础概念:
- 节点(Node):可以是一个 Agent、一个工具调用、一个小函数;
- 边(Edge):从节点 A 指向节点 B 的有向边,表示“执行完 A 之后进入 B”。
2.1 Node 抽象
from abc import ABC, abstractmethod
from typing import Dict, Any
class Node(ABC):
"""图上的一个节点:可以是 Agent、工具或简单函数"""
def __init__(self, name: str):
self.name = name
@abstractmethod
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""
:param state: 当前工作流的状态(上下文)
:return: 更新后的状态
"""
pass
这里的 state 是一个「全局上下文」,在整个图中流转,类似:
{
"user_query": "...",
"retrieved_docs": [...],
"answer": "...",
"plan": [...],
...
}
2.2 Edge 与 Graph
from typing import Callable, List, Optional
class Edge:
def __init__(self,
source: str,
target: str,
condition: Optional[Callable[Dictstr, Any](# "Reference Dictstr, Any \|\|\| __GENERATING_DETAILS__"), bool]] = None):
"""
:param source: 源节点名称
:param target: 目标节点名称
:param condition: 条件函数,返回 True 才会走这条边
"""
self.source = source
self.target = target
self.condition = condition
class Graph:
"""极简有向图工作流"""
def __init__(self):
self.nodes: Dict[str, Node] = {}
self.edges: List[Edge] = []
self.start_node: Optional[str] = None
def add_node(self, node: Node, is_start: bool = False):
self.nodes[node.name] = node
if is_start:
self.start_node = node.name
def add_edge(self,
source: str,
target: str,
condition: Optional[Callable[Dictstr, Any](# "Reference Dictstr, Any \|\|\| __GENERATING_DETAILS__"), bool]] = None):
self.edges.append(Edge(source, target, condition))
def _get_next_nodes(self, current: str, state: Dict[str, Any]) -> List[str]:
result = []
for e in self.edges:
if e.source != current:
continue
if e.condition is None or e.condition(state):
result.append(e.target)
return result
def run(self, init_state: Dict[str, Any]) -> Dict[str, Any]:
if not self.start_node:
raise RuntimeError("未设置起始节点")
state = dict(init_state)
current = self.start_node
visited = set()
while current is not None:
node = self.nodes[current]
# 这里可以加日志/监控
state = node.run(state)
visited.add(current)
next_nodes = self._get_next_nodes(current, state)
if not next_nodes:
# 无后继节点,结束
break
elif len(next_nodes) == 1:
current = next_nodes[0]
else:
# 简化:多分支时只取第一个,你可以改成并行执行
current = next_nodes[0]
return state
这个 Graph 就是一个最小可用的「工作流引擎」:
你只需要实现每个节点的 run,再用 add_node + add_edge 把它们接成一张图,调用 graph.run() 即可执行整套流程。
三、用节点封装 Agent / 工具
下面我们以一个「多步骤任务」为例,构建 4 个节点:
RouterNode:根据用户输入,决定走什么流程;RetrieveNode:从向量库/RAG 检索文档;AnswerNode:调用大模型生成回答;SummaryNode:对多轮结果做最终总结。
3.1 RouterNode:路由节点
class RouterNode(Node):
def __init__(self, llm_client):
super().__init__("router")
self.llm = llm_client
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
query = state["user_query"]
# 这里用简单规则/模型判断意图
# 真实项目里可以用分类模型或少样本提示词
if "总结" in query or "概括" in query:
intent = "summary"
elif "怎么" in query or "如何" in query:
intent = "qa"
else:
intent = "qa"
state["intent"] = intent
return state
3.2 RetrieveNode:检索节点
class RetrieveNode(Node):
def __init__(self, retriever):
super().__init__("retrieve")
self.retriever = retriever
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
query = state["user_query"]
docs = self.retriever.retrieve(query, k=5)
state["retrieved_docs"] = docs
return state
3.3 AnswerNode:问答节点
class AnswerNode(Node):
def __init__(self, llm_client):
super().__init__("answer")
self.llm = llm_client
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
docs = state.get("retrieved_docs", [])
context = "\n\n".join(docs) if isinstance(docs, list) else str(docs)
query = state["user_query"]
messages = [
{"role": "system", "content": "你是一个文档问答助手,请基于给定上下文回答。"},
{"role": "user", "content": f"问题:{query}\n\n上下文:\n{context}"}
]
answer = self.llm.chat(messages, scene="qa", agent="answer_agent")
state["answer"] = answer
return state
3.4 SummaryNode:总结节点
class SummaryNode(Node):
def __init__(self, llm_client):
super().__init__("summary")
self.llm = llm_client
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
# 这里假设有一个历史答案列表
answers = state.get("history_answers", [])
if "answer" in state:
answers.append(state["answer"])
state["history_answers"] = answers
summary_input = "\n\n".join(
f"- {a}" for a in answers
)
messages = [
{"role": "system", "content": "你是一个总结助手,请对多条答复做归纳。"},
{"role": "user", "content": f"请对下面内容做整体总结:\n{summary_input}"}
]
summary = self.llm.chat(messages, scene="summary", agent="summary_agent")
state["final_summary"] = summary
return state
四、把节点连成一张图(Graph)
现在我们有了 4 个节点,接下来把它们用有向边连起来:
graph = Graph()
# 注册节点
graph.add_node(RouterNode(llm_client), is_start=True)
graph.add_node(RetrieveNode(retriever))
graph.add_node(AnswerNode(llm_client))
graph.add_node(SummaryNode(llm_client))
# 连边:Router → Retrieve
graph.add_edge("router", "retrieve")
# Retrieve → Answer
graph.add_edge("retrieve", "answer")
# Answer → Summary(仅当意图是 summary 时才走)
def need_summary(state):
return state.get("intent") == "summary"
graph.add_edge("answer", "summary", condition=need_summary)
执行一次完整流程:
init_state = {"user_query": "请根据文档内容帮我总结一下这套系统的核心功能"}
final_state = graph.run(init_state)
print("中间回答:", final_state.get("answer"))
print("最终总结:", final_state.get("final_summary"))
你会得到一个「图式执行」的完整链路,而不是一长串 if/else。
五、与多智能体中台结合:每个 Agent 变成一个节点
在你现有的多智能体中台里,每个 Agent 已经是一个类,比如 QAAgent、ToolAgent、ReportAgent 等。
你可以很自然地定义一个「AgentNode」,把任意 Agent 包装成图节点:
class AgentNode(Node):
def __init__(self, agent):
super().__init__(agent.meta()["name"]) # 假设每个 agent 有 meta
self.agent = agent
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
# 把 state 映射成 agent 需要的 task/context 结构
task = {
"payload": {
"message": state["user_query"],
"state": state
}
}
result = self.agent.handle(task, state)
# 合并结果到全局 state
state.update(result.get("context_updates", {}))
if "answer" in result:
state["answer"] = result["answer"]
return state
这样你可以:
- 用一张 Graph 描述多个 Agent 的协作顺序;
- 按节点级粒度加入监控、日志、超时控制;
- 在图中插入「人类审核节点」、「工具节点」等各种能力。
六、如何扩展:并行执行与子图(Subgraph)
上面的 Graph.run 是顺序执行的,如果想要并行跑分支,可以做一个简单扩展:
-
当
_get_next_nodes返回多个目标节点时,不是取第一个,而是:- 为每个目标节点起一个子协程/线程/任务;
- 收集各子任务的结果合并回 state。
-
对于一个复杂任务,可以定义一个
SubgraphNode,内部再持有一张子图:
class SubgraphNode(Node):
def __init__(self, name: str, subgraph: Graph):
super().__init__(name)
self.subgraph = subgraph
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
# 把当前 state 作为子图的初始 state
sub_state = self.subgraph.run(state)
# 子图执行完后的状态合并回上层
state.update(sub_state)
return state
这样,大型流程可以拆成若干「子图」,每个子图负责一个子任务(比如「数据采集子图」「文档分析子图」「报告生成子图」),整体结构就会非常清晰。
七、监控与调试:按节点记录 Trace
有了图式结构,做监控就简单很多:
- 在
Graph.run中,每次进入节点前后记录:- 节点名称;
- 执行开始/结束时间;
- 输入/输出大小;
- 是否抛异常;
示例(只展示关键部分):
import time
class Graph:
...
def run(self, init_state: Dict[str, Any]) -> Dict[str, Any]:
...
trace = []
while current is not None:
node = self.nodes[current]
start = time.time()
try:
state = node.run(state)
status = "success"
except Exception as e:
status = f"error:{e.__class__.__name__}"
# 也可以选择中断或跳过
raise
finally:
end = time.time()
trace.append({
"node": current,
"status": status,
"duration_ms": (end - start) * 1000
})
...
state["trace"] = trace
return state
这样,每次请求的 state["trace"] 就是一条完整的「执行路径」,你可以:
- 打到日志里排查问题;
- 接入监控系统画成「各节点耗时分布图」。
这和你之前做的大模型调用监控是互补关系:
一个看「LLM 维度」,一个看「工作流节点维度」。
八、如何在现有项目里渐进接入?
给一个尽量务实的迁移路线:
-
选一个最简单的流程先图式化
- 比如「RAG 问答」:Router → Retrieve → Answer
- 用这篇的
Graph+ 几个Node先跑起来。
-
把已有的 1–2 个 Agent 包装成 Node
- 用
AgentNode包装QAAgent/PolicyAgent之类; - 在 Graph 里替换原来的函数调用。
- 用
-
逐步把复杂逻辑从 if/else 拆到图上
- 把分支条件写成 Edge 的
condition; - 把多步流程拆成多个节点,而不是一个超级函数。
- 把分支条件写成 Edge 的
-
最后再考虑并行执行、子图、可视化编辑器
- 并行执行可以先用线程池/协程实现;
- 可视化编辑可以后面用 JSON/YAML 描述图结构,再渲染成前端流程图。
做到这里,你的多智能体系统就从:
「一堆 Agent 互相调用的脚本」
升级成了:
「一张可以清晰画出来、可以动态调整的有向图工作流」,
既容易维护,又更适合做后续的自动化测试、回溯和优化。
更多推荐

所有评论(0)