LangChain 工作流(Workflows)
将 LLM 应用的核心逻辑(模型调用、工具执行、分支判断、记忆管理、检索增强)抽象为节点(Node) 和边(Edge),通过可视化的图结构定义执行流程,支持循环、条件分支、多智能体协作等复杂逻辑。# 定义工作流状态user_input: str # 用户输入(如“查询2025Q4销售额并分析”)quarter: str # 提取的季度参数tool_results: dict # 工具执行结果ana
一、简介
LangChain 1.0 的工作流(Workflows)是为了实现复杂 LLM 应用编排的核心能力 —— 通过可视化、可复用、可调试的方式,将模型、工具、记忆、检索等组件串联成生产级应用。
1.1 核心定义
LangChain 1.0 的工作流是基于 LangGraph 构建的有状态、可中断、可重试的应用编排体系,核心特征是:
- 将 LLM 应用的核心逻辑(模型调用、工具执行、分支判断、记忆管理、检索增强)抽象为节点(Node) 和边(Edge),通过可视化的图结构定义执行流程,支持循环、条件分支、多智能体协作等复杂逻辑。
1.2 核心定位
从线性链到智能流程图,用 LangGraph 构建可控AI流程,让你从“写代码”升级为“画流程图”,用节点和边来编排复杂的AI逻辑。旧版 Chain 仅用于极简线性场景(且逐步被 LCEL 链式调用替代)。
| 维度 | 旧版 Chain(0.x) | 1.0 工作流(LangGraph) |
|---|---|---|
| 核心形态 | 线性链式调用 | 有向图(支持分支 / 循环 / 并行) |
| 状态管理 | 无原生状态,需手动维护 | 原生 Checkpoint 状态管理(可中断 / 恢复) |
| 可调试性 | 弱(线性日志,难定位问题) | 强(可视化图、节点级日志、断点调试) |
| 复杂逻辑支持 | 弱(仅简单串联) | 强(多智能体、条件分支、工具重试) |
| 生产级特性 | 无(无重试 / 兜底 / 并发) | 全支持(重试策略、异常兜底、异步并发) |
| 可视化 | 无 | 原生支持(LangGraph UI) |
1.3 核心价值
- 可视化编排:用图结构替代硬编码,降低复杂逻辑开发成本;
- 状态持久化:Checkpoint 支持中断 / 恢复,适配长时任务;
- 可复用性:节点可独立封装、跨工作流复用;
- 可调试性:节点级日志、可视化执行轨迹,快速定位问题;
- 生产级鲁棒性:内置重试、兜底、并发控制,适配高可用场景。
二、工作流核心架构(LangGraph)
LangChain 1.0 工作流的核心是 LangGraph,其核心优势是支持循环、分支、状态管理,且原生支持可视化 —— 能把复杂的智能体 / 多步骤流程转化为直观的图形,方便调试和理解。
2.1 核心概念
| 组件 | 作用 | 示例 |
|---|---|---|
| State | 工作流全局状态(数据载体) | 包含用户输入、工具结果、模型输出、上下文 |
| Node | 工作流节点(执行单元) | 模型调用节点、工具执行节点、判断节点 |
| Edge | 节点间的边(流程走向) | 线性边(直接跳转)、条件边(分支判断) |
| Checkpoint | 状态快照(可中断 / 恢复) | 保存当前节点、状态数据,支持重启 |
| Agent | 智能体节点(决策 + 执行) | 内置思考 - 行动循环的智能体节点 |
| ToolNode | 工具执行节点 | 封装工具调用逻辑,简化工具执行 |
2.2 状态(State)设计
State 是工作流的「数据总线」,所有节点的输入 / 输出均通过 State 传递,LangChain 1.0 提供 TypedDict 风格的强类型 State 定义:
from langgraph.graph import StateGraph, START, END
from langgraph.graph.state import StateGraph
from typing import TypedDict, List
# 定义工作流状态(强类型)
class WorkflowState(TypedDict):
user_input: str # 用户输入
intermediate_steps: List[str] # 中间步骤
tool_results: dict # 工具执行结果
final_answer: str # 最终答案
need_retry: bool # 是否需要重试
2.3 节点(Node)分类与实现
节点是工作流的「执行单元」,分为三类核心节点:
-
功能节点(Function Node)
最基础的节点,封装任意自定义逻辑(模型调用、工具执行、数据处理):# 1. 模型调用节点 def llm_node(state: WorkflowState): from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-4o", temperature=0) # 调用模型 response = llm.invoke([("human", state["user_input"])]) # 更新状态 return { "intermediate_steps": state["intermediate_steps"] + [f"模型调用结果:{response.content}"], "final_answer": response.content } # 2. 工具执行节点 def tool_node(state: WorkflowState): from langchain_core.tools import tool @tool def query_sales(quarter: str) -> str: return "2025Q4销售额:1280万元" # 执行工具 result = query_sales.invoke({"quarter": "2025Q4"}) # 更新状态 return { "tool_results": {"sales": result}, "intermediate_steps": state["intermediate_steps"] + [f"工具执行结果:{result}"] } # 3. 条件判断节点(分支逻辑) def judge_node(state: WorkflowState): # 判断是否需要重试 if "失败" in state["tool_results"].get("sales", ""): return "retry" # 跳转到重试节点 else: return "end" # 跳转到结束节点 -
工具节点(ToolNode)
LangGraph 内置的工具执行节点,简化工具调用逻辑:from langgraph.prebuilt import ToolNode # 定义工具列表 tools = [query_sales] # 创建工具节点 tool_node = ToolNode(tools) -
智能体节点(AgentNode)
内置思考 - 行动循环的智能体节点,适配复杂决策场景:from langgraph.prebuilt import create_react_agent # 创建 ReAct 智能体节点 agent_node = create_react_agent(llm, tools)
2.4 边(Edge)设计
边定义节点间的执行流向,分为两类:
-
线性边(Direct Edge)
无条件跳转,适用于线性流程:# 从 START 到 tool_node graph.add_edge(START, "tool_node") # 从 tool_node 到 llm_node graph.add_edge("tool_node", "llm_node") # 从 llm_node 到 END graph.add_edge("llm_node", END) -
条件边(Conditional Edge)
根据节点输出动态跳转,适用于分支逻辑:# 从 judge_node 跳转到不同节点 graph.add_conditional_edges( "judge_node", # 源节点 judge_node, # 判断函数(返回目标节点名) { "retry": "tool_node", # 返回 "retry" 跳转到 tool_node "end": END # 返回 "end" 跳转到 END } )
三、工作流核心开发流程
以「销售数据查询 + 分析」工作流为例,完整展示 1.0 工作流的开发步骤:
步骤 1:定义状态(State)
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END
# 定义工作流状态
class SalesWorkflowState(TypedDict):
user_input: str # 用户输入(如“查询2025Q4销售额并分析”)
quarter: str # 提取的季度参数
tool_results: dict # 工具执行结果
analysis_result: str # 分析结果
need_retry: bool = False # 是否需要重试
步骤 2:定义核心节点
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
# 初始化模型
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 节点1:提取季度参数
def extract_quarter_node(state: SalesWorkflowState):
# 调用模型提取参数
prompt = f"从用户输入中提取季度参数(格式:2025Q4):{state['user_input']}"
quarter = llm.invoke([("human", prompt)]).content.strip()
return {"quarter": quarter}
# 节点2:查询销售数据(工具节点)
@tool
def query_sales_tool(quarter: str) -> str:
"""查询指定季度销售额"""
return f"{quarter}销售额:1280万元,同比增长15.3%"
# 节点3:分析销售数据
def analyze_sales_node(state: SalesWorkflowState):
prompt = f"分析销售数据:{state['tool_results']['query_sales_tool']}"
analysis = llm.invoke([("human", prompt)]).content
return {"analysis_result": analysis}
# 节点4:判断是否需要重试
def judge_retry_node(state: SalesWorkflowState):
if not state["quarter"]:
return "retry_extract" # 季度提取失败,重试
elif not state["tool_results"]:
return "retry_query" # 工具执行失败,重试
else:
return "end" # 成功,结束
步骤 3:构建工作流图
from langgraph.prebuilt import ToolNode
# 1. 创建状态图
graph = StateGraph(SalesWorkflowState)
# 2. 添加节点
graph.add_node("extract_quarter", extract_quarter_node)
graph.add_node("query_sales", ToolNode([query_sales_tool]))
graph.add_node("analyze_sales", analyze_sales_node)
graph.add_node("judge_retry", judge_retry_node)
# 3. 添加边(定义流程)
# 初始流程:START → 提取季度 → 判断重试
graph.add_edge(START, "extract_quarter")
graph.add_edge("extract_quarter", "judge_retry")
# 条件边:根据判断结果跳转
graph.add_conditional_edges(
"judge_retry",
judge_retry_node,
{
"retry_extract": "extract_quarter", # 重试提取季度
"retry_query": "query_sales", # 重试查询数据
"end": "analyze_sales" # 成功则分析数据
}
)
# 最终流程:分析数据 → END
graph.add_edge("query_sales", "analyze_sales")
graph.add_edge("analyze_sales", END)
# 4. 编译工作流(开启Checkpoint支持状态持久化)
app = graph.compile(checkpointer=None) # 生产环境需指定checkpointer(如Redis)
步骤 4:运行工作流
# 输入初始状态
initial_state = {
"user_input": "查询2025Q4的销售额并做简单分析"
}
# 运行工作流
result = app.invoke(initial_state)
# 输出结果
print(f"季度:{result['quarter']}")
print(f"工具结果:{result['tool_results']}")
print(f"分析结果:{result['analysis_result']}")
步骤 5:可视化工作流
LangGraph 提供 3 种可视化方式:
- Mermaid 文本:生成可复制到在线工具的 Mermaid 代码(最通用);
- HTML 可视化:生成交互式 HTML 页面(可本地打开,支持缩放 / 拖拽);
- Jupyter 可视化:在 Notebook 中直接渲染(适合数据分析场景)。
-
生成 Mermaid 代码
Mermaid 是通用的流程图语法,可复制到 Mermaid Live Editor 在线渲染:# 生成 Mermaid 代码 mermaid_code = app.get_graph().draw_mermaid() print("=== Mermaid 可视化代码 ===") print(mermaid_code) -
生成交互式 HTML 页面
LangGraph 支持生成 HTML 文件,本地打开即可交互(缩放、拖拽、查看节点详情):# 生成 HTML 可视化文件 html_content = app.get_graph().draw_html() # 保存为 HTML 文件 with open("langgraph_workflow.html", "w", encoding="utf-8") as f: f.write(html_content) print("HTML 可视化文件已保存:langgraph_workflow.html") -
Jupyter Notebook 中直接渲染
如果用 Jupyter 环境,可直接渲染流程图(无需保存文件):# 在 Jupyter 中渲染(需安装 ipywidgets) from IPython.display import display display(app.get_graph().draw_widget())
四、工作流高级特性
4.1 状态管理高级特性
4.1.1 可中断、可恢复(Checkpoint / Checkpointer)
- 特性说明:工作流执行中可暂停,重启后从断点继续,支持长任务 / 多轮对话 / 人工介入。
- 核心价值:解决服务重启、崩溃导致的流程中断问题,是生产级应用必备。
from langgraph.graph import StateGraph, START, END
from langchain_community.checkpoint.sqlite import SqliteCheckpointSaver
from typing import TypedDict
# 1. 定义状态
class WorkflowState(TypedDict):
user_input: str
step: str # 记录当前执行步骤
result: str
# 2. 初始化Checkpoint(持久化到SQLite)
checkpointer = SqliteCheckpointSaver.from_conn_string("checkpoint.db")
# 3. 定义节点
def step1(state: WorkflowState):
print("执行步骤1:处理用户输入")
return {"step": "step1", "result": f"处理完成:{state['user_input']}"}
def step2(state: WorkflowState):
print("执行步骤2:生成结果")
return {"step": "step2", "result": f"最终结果:{state['user_input']} - 已完成"}
# 4. 构建工作流
graph = StateGraph(WorkflowState)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", END)
# 5. 编译时绑定Checkpoint(核心)
app = graph.compile(checkpointer=checkpointer)
# 6. 第一次运行(执行到step1后中断)
config = {"configurable": {"thread_id": "user_123"}} # 会话唯一标识
initial_state = {"user_input": "测试可中断工作流"}
# 执行step1(手动中断演示)
app.invoke(initial_state, config=config, stop_after=["step1"])
# 7. 恢复执行(从step1继续到结束)
resumed_result = app.invoke({}, config=config)
print("恢复执行结果:", resumed_result)
关键说明:
- checkpointer 支持 Redis/MongoDB/SQLite 等,生产推荐 Redis;
- thread_id 区分不同用户 / 会话,保证状态隔离;
- stop_after 手动指定中断节点,模拟异常 / 人工暂停场景。
4.1.2 状态增量更新
- 特性说明:节点仅返回需要更新的字段,LangGraph 自动合并到全局状态,无需全量赋值。
- 核心价值:简化节点逻辑,避免状态覆盖丢失。
# 基于上面的State定义
def step1_incremental(state: WorkflowState):
# 仅返回需要更新的字段(无需包含user_input)
return {"step": "step1"}
def step2_incremental(state: WorkflowState):
# 仅更新result字段,保留step和user_input
return {"result": f"增量更新结果:{state['user_input']}"}
# 构建并运行
graph = StateGraph(WorkflowState)
graph.add_node("step1", step1_incremental)
graph.add_node("step2", step2_incremental)
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", END)
app = graph.compile()
result = app.invoke({"user_input": "测试增量更新"})
print("最终状态:", result) # 包含user_input/step/result所有字段
4.1.3 消息自动合并(Message Graph)
- 特性说明:对 messages 列表字段自动追加 / 去重,无需手动拼接多轮对话上下文。
- 核心价值:简化多轮对话 / 工具调用的上下文管理。
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import HumanMessage, AIMessage
from typing import Annotated, List
from langgraph.graph import add_messages
# 1. 定义带消息合并的状态
class ChatState(TypedDict):
# add_messages 标记自动合并消息列表
messages: Annotated[List, add_messages]
# 2. 定义聊天节点
def chat_node(state: ChatState):
last_msg = state["messages"][-1].content
return {"messages": [AIMessage(content=f"回复:{last_msg}")]}
# 3. 构建工作流
graph = StateGraph(ChatState)
graph.add_node("chat", chat_node)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)
app = graph.compile()
# 4. 多轮调用(消息自动追加)
result1 = app.invoke({"messages": [HumanMessage(content="你好")]})
result2 = app.invoke({"messages": [HumanMessage(content="再聊下工作流")]},
config={"configurable": {"thread_id": "chat_123"}})
print("所有消息:", [msg.content for msg in result2["messages"]])
4.2 流程控制高级特性
4.2.1 条件分支(Conditional Edge)
- 特性说明:根据节点输出动态决定下一步执行路径,支持分支逻辑(核心的 Agent 思考 - 行动循环依赖此特性)。
- 核心价值:替代硬编码的 if-else,实现灵活的流程分支。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class BranchState(TypedDict):
user_input: str
need_tool: bool # 是否需要调用工具
result: str
# 1. 定义节点
def judge_node(state: BranchState):
# 判断是否需要调用工具
if "查询" in state["user_input"]:
return "call_tool" # 分支1:调用工具
else:
return "direct_answer" # 分支2:直接回答
def tool_node(state: BranchState):
return {"result": "工具调用结果:2025Q4销售额1280万", "need_tool": False}
def answer_node(state: BranchState):
return {"result": f"直接回答:{state['user_input']}"}
# 2. 构建工作流
graph = StateGraph(BranchState)
graph.add_node("judge", judge_node)
graph.add_node("tool", tool_node)
graph.add_node("answer", answer_node)
# 3. 添加条件边(核心)
graph.add_edge(START, "judge")
graph.add_conditional_edges(
"judge", # 源节点
judge_node, # 分支判断函数
{
"call_tool": "tool", # 返回"call_tool"跳转到tool节点
"direct_answer": "answer" # 返回"direct_answer"跳转到answer节点
}
)
graph.add_edge("tool", END)
graph.add_edge("answer", END)
app = graph.compile()
# 测试分支1:调用工具
result1 = app.invoke({"user_input": "查询2025Q4销售额"})
print("分支1结果:", result1["result"])
# 测试分支2:直接回答
result2 = app.invoke({"user_input": "介绍下LangChain工作流"})
print("分支2结果:", result2["result"])
4.2.1 循环执行(Loop)
- 特性说明:支持流程循环执行(如工具调用重试、多轮思考),是 ReAct Agent 的核心。
- 核心价值:实现 “思考→行动→观察→再思考” 的循环逻辑。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class LoopState(TypedDict):
count: int
result: str
# 1. 定义节点
def loop_node(state: LoopState):
new_count = state["count"] + 1
return {"count": new_count}
def judge_loop(state: LoopState):
# 循环3次后结束
if state["count"] < 3:
return "loop" # 继续循环
else:
return "end" # 结束循环
# 2. 构建工作流
graph = StateGraph(LoopState)
graph.add_node("loop", loop_node)
graph.add_node("judge", judge_loop)
# 3. 添加循环边
graph.add_edge(START, "loop")
graph.add_edge("loop", "judge")
graph.add_conditional_edges(
"judge",
judge_loop,
{
"loop": "loop", # 循环回到loop节点
"end": END
}
)
app = graph.compile()
# 运行循环
result = app.invoke({"count": 0})
print("循环结果:", result) # count=3,循环3次后结束
4.2.3 节点中断(NodeInterrupt)
- 特性说明:在节点中主动抛出中断异常,暂停工作流,支持人工审核 / 外部回调场景。
- 核心价值:实现 “机器执行 + 人工介入” 的混合流程。
from langgraph.graph import StateGraph, START, END
from langgraph.errors import NodeInterrupt
from typing import TypedDict
class InterruptState(TypedDict):
user_input: str
need_audit: bool
audit_result: str
# 1. 定义节点
def check_audit(state: InterruptState):
# 敏感内容需要人工审核
if "敏感信息" in state["user_input"]:
# 抛出中断异常,暂停工作流
raise NodeInterrupt("需要人工审核敏感内容")
return {"need_audit": False}
def generate_result(state: InterruptState):
return {"result": f"生成结果:{state['user_input']}"}
# 2. 构建工作流
graph = StateGraph(InterruptState)
graph.add_node("check", check_audit)
graph.add_node("generate", generate_result)
graph.add_edge(START, "check")
graph.add_edge("check", "generate")
graph.add_edge("generate", END)
app = graph.compile()
# 测试中断
try:
app.invoke({"user_input": "包含敏感信息的请求"})
except NodeInterrupt as e:
print("工作流中断:", e)
# 人工审核后恢复执行
resumed_result = app.invoke(
{"user_input": "包含敏感信息的请求", "audit_result": "审核通过"},
config={"configurable": {"thread_id": "audit_123"}}
)
print("恢复执行结果:", resumed_result)
4.2.4 并行执行(Parallel Node)
- 特性说明:同时执行多个节点,提升流程效率(如并行调用多个工具、并行检索)。
- 核心价值:减少流程总耗时,适配高并发场景。
from langgraph.graph import StateGraph, START, END, create_parallel_node
from typing import TypedDict
class ParallelState(TypedDict):
tool1_result: str
tool2_result: str
merged_result: str
# 1. 定义并行节点
def tool1_node(state: ParallelState):
return {"tool1_result": "工具1结果"}
def tool2_node(state: ParallelState):
return {"tool2_result": "工具2结果"}
# 2. 合并并行结果
def merge_node(state: ParallelState):
return {"merged_result": f"{state['tool1_result']} + {state['tool2_result']}"}
# 3. 构建并行工作流
graph = StateGraph(ParallelState)
# 创建并行节点(同时执行tool1和tool2)
parallel_node = create_parallel_node(
graph,
{"tool1": tool1_node, "tool2": tool2_node}
)
graph.add_node("parallel", parallel_node)
graph.add_node("merge", merge_node)
# 4. 定义流程
graph.add_edge(START, "parallel")
graph.add_edge("parallel", "merge")
graph.add_edge("merge", END)
app = graph.compile()
# 运行并行流程
result = app.invoke({})
print("并行执行结果:", result["merged_result"])
4.3 调试 / 观测高级特性
4.3.1 工作流可视化
- 特性说明:将工作流导出为 Mermaid 图形(可可视化),支持 PNG 导出。
- 核心价值:直观展示流程结构,便于调试和团队协作。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class VisState(TypedDict):
input: str
def node1(state: VisState):
return {"input": state["input"] + "_node1"}
def node2(state: VisState):
return {"input": state["input"] + "_node2"}
# 构建工作流
graph = StateGraph(VisState)
graph.add_node("node1", node1)
graph.add_node("node2", node2)
graph.add_edge(START, "node1")
graph.add_edge("node1", "node2")
graph.add_edge("node2", END)
app = graph.compile()
# 1. 导出Mermaid格式(可在Mermaid编辑器中可视化)
mermaid_code = app.get_graph().draw_mermaid()
print("Mermaid可视化代码:")
print(mermaid_code)
# 2. 导出PNG图片(需安装pygraphviz)
# app.get_graph().draw_png("workflow.png")
4.3.2 流式输出整个工作流
- 特性说明:实时流式输出工作流的节点执行、状态变化、模型结果,支持前端打字机效果。
- 核心价值:提升用户体验,实时展示 AI 思考 / 执行过程。
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict
class StreamState(TypedDict):
question: str
answer: str
# 1. 初始化模型
llm = ChatOpenAI(model="gpt-4o", streaming=True)
# 2. 定义流式节点
def stream_answer_node(state: StreamState):
# 流式生成回答
response = ""
for chunk in llm.stream([("human", state["question"])]):
response += chunk.content
# 实时返回流式结果
yield {"answer": response}
# 3. 构建工作流
graph = StateGraph(StreamState)
graph.add_node("stream_answer", stream_answer_node)
graph.add_edge(START, "stream_answer")
graph.add_edge("stream_answer", END)
app = graph.compile()
# 4. 流式运行工作流
print("流式输出:", end="")
for chunk in app.stream({"question": "详细介绍LangChain 1.0工作流"}):
print(chunk["answer"][-10:], end="", flush=True) # 打印最后10个字符
4.4 生产级稳定性特性
4.4.1 重试机制
- 特性说明:为节点添加重试逻辑,处理工具调用 / 模型调用失败场景。
- 核心价值:提升工作流鲁棒性,减少临时故障导致的失败。
from langgraph.graph import StateGraph, START, END
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import TypedDict
class RetryState(TypedDict):
input: str
result: str
# 1. 定义带重试的节点
@retry(
stop=stop_after_attempt(3), # 最多重试3次
wait=wait_exponential(multiplier=1, min=1, max=5) # 指数退避等待
)
def unreliable_node(state: RetryState):
# 模拟随机失败
import random
if random.random() < 0.7:
raise Exception("节点执行失败")
return {"result": f"成功:{state['input']}"}
# 2. 构建工作流
graph = StateGraph(RetryState)
graph.add_node("unreliable", unreliable_node)
graph.add_edge(START, "unreliable")
graph.add_edge("unreliable", END)
app = graph.compile()
# 运行带重试的节点
result = app.invoke({"input": "测试重试机制"})
print("重试结果:", result["result"])
4.4.2 异步执行
- 特性说明:支持异步调用工作流,适配 FastAPI / 异步框架,提升并发能力。
- 核心价值:解决同步调用的性能瓶颈,支持高并发场景。
import asyncio
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class AsyncState(TypedDict):
input: str
result: str
# 1. 定义异步节点
async def async_node(state: AsyncState):
# 模拟异步操作(如异步调用模型/工具)
await asyncio.sleep(1)
return {"result": f"异步结果:{state['input']}"}
# 2. 构建工作流
graph = StateGraph(AsyncState)
graph.add_node("async_node", async_node)
graph.add_edge(START, "async_node")
graph.add_edge("async_node", END)
app = graph.compile()
# 3. 异步调用工作流
async def run_async_workflow():
result = await app.ainvoke({"input": "测试异步执行"})
print("异步执行结果:", result["result"])
# 批量异步调用
batch_results = await app.abatch([
{"input": "批量1"},
{"input": "批量2"}
])
print("批量异步结果:", [r["result"] for r in batch_results])
asyncio.run(run_async_workflow())
4.4.3 中间件(Middleware)
- 特性说明:通过中间件实现日志、监控、脱敏等横切功能,不侵入业务逻辑。
- 核心价值:解耦业务与非业务逻辑,提升代码可维护性。
from langgraph.graph import StateGraph, START, END
from langchain_core.middleware import BaseMiddleware
from typing import TypedDict
class MiddlewareState(TypedDict):
input: str
result: str
# 1. 定义日志中间件
class LogMiddleware(BaseMiddleware):
def invoke(self, input, config, next):
# 前置处理:记录输入
print(f"节点 {config['node']} 输入:{input}")
# 执行原节点逻辑
output = next(input, config)
# 后置处理:记录输出
print(f"节点 {config['node']} 输出:{output}")
return output
# 2. 定义业务节点
def business_node(state: MiddlewareState):
return {"result": f"业务结果:{state['input']}"}
# 3. 构建工作流并绑定中间件
graph = StateGraph(MiddlewareState)
graph.add_node("business", business_node)
graph.add_edge(START, "business")
graph.add_edge("business", END)
# 绑定中间件
app = graph.compile(middlewares=[LogMiddleware()])
# 运行工作流(中间件自动记录日志)
app.invoke({"input": "测试中间件"})
4.5 架构扩展高级特性
4.5.1 子图模块化(Graph as Node)
- 特性说明:将一个完整工作流作为子图嵌入到更大的工作流中,实现模块化开发。
- 核心价值:支持大型应用的分层架构(如 RAG 子图、工具调用子图)。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
# 1. 定义子图状态
class SubGraphState(TypedDict):
sub_input: str
sub_result: str
# 2. 构建子图(独立的小工作流)
sub_graph = StateGraph(SubGraphState)
def sub_node(state: SubGraphState):
return {"sub_result": f"子图处理:{state['sub_input']}"}
sub_graph.add_node("sub_node", sub_node)
sub_graph.add_edge(START, "sub_node")
sub_graph.add_edge("sub_node", END)
sub_app = sub_graph.compile()
# 3. 定义主工作流状态
class MainGraphState(TypedDict):
main_input: str
main_result: str
sub_result: str
# 4. 主工作流中调用子图
def call_subgraph_node(state: MainGraphState):
# 调用子图
sub_result = sub_app.invoke({"sub_input": state["main_input"]})
return {"sub_result": sub_result["sub_result"]}
def main_node(state: MainGraphState):
return {"main_result": f"主图合并:{state['sub_result']}"}
# 5. 构建主工作流
main_graph = StateGraph(MainGraphState)
main_graph.add_node("call_subgraph", call_subgraph_node)
main_graph.add_node("main_node", main_node)
main_graph.add_edge(START, "call_subgraph")
main_graph.add_edge("call_subgraph", "main_node")
main_graph.add_edge("main_node", END)
main_app = main_graph.compile()
# 运行主工作流
result = main_app.invoke({"main_input": "测试子图模块化"})
print("主工作流结果:", result["main_result"])
4.5.2 多智能体协作
- 特性说明:支持多智能体分工协作(如 Supervisor + Worker 架构),实现复杂任务拆解。
- 核心价值:解决单一智能体无法处理的复杂任务,提升任务处理能力。
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from typing import TypedDict
# 1. 初始化模型
llm = ChatOpenAI(model="gpt-4o")
# 2. 定义工具
@tool
def sales_tool(quarter: str) -> str:
"""查询销售额"""
return f"{quarter}销售额:1280万"
@tool
def analysis_tool(data: str) -> str:
"""分析销售数据"""
return f"分析结果:{data},同比增长15.3%"
# 3. 定义多智能体状态
class MultiAgentState(TypedDict):
user_input: str
sales_data: str
analysis_result: str
# 4. 创建Worker智能体
sales_agent = create_react_agent(llm, [sales_tool], name="sales_agent")
analysis_agent = create_react_agent(llm, [analysis_tool], name="analysis_agent")
# 5. 定义智能体节点
def sales_agent_node(state: MultiAgentState):
result = sales_agent.invoke({"input": state["user_input"]})
return {"sales_data": result["output"]}
def analysis_agent_node(state: MultiAgentState):
result = analysis_agent.invoke({"input": state["sales_data"]})
return {"analysis_result": result["output"]}
# 6. 构建多智能体工作流
graph = StateGraph(MultiAgentState)
graph.add_node("sales_agent", sales_agent_node)
graph.add_node("analysis_agent", analysis_agent_node)
graph.add_edge(START, "sales_agent")
graph.add_edge("sales_agent", "analysis_agent")
graph.add_edge("analysis_agent", END)
app = graph.compile()
# 运行多智能体工作流
result = app.invoke({"user_input": "查询2025Q4销售额并分析"})
print("多智能体结果:", result["analysis_result"])
五、完整实战——多智能体协作工作流
构建一个复杂的例子:研究助手团队,包含研究员、分析师、写作者三个智能体协同工作。
- 多智能体分工:研究员→分析师→写作者→编辑
- 迭代优化:编辑可以退回给写作者修改
- 条件判断:直到质量达标才结束
from typing import TypedDict, List
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
# 定义状态
class ResearchState(TypedDict):
topic: str
research_notes: str
analysis: str
draft: str
final_report: str
# 初始化模型
model = ChatOpenAI(model="gpt-4o")
# 节点1:研究员
def researcher(state: ResearchState):
prompt = f"""你是一个研究员,请对以下主题进行深入研究:
主题:{state['topic']}
请提供:
1. 关键事实和数据
2. 主要观点和争议
3. 相关引用来源
"""
response = model.invoke([HumanMessage(content=prompt)])
return {"research_notes": response.content}
# 节点2:分析师
def analyst(state: ResearchState):
prompt = f"""你是一个分析师,请基于以下研究笔记进行分析:
{state['research_notes']}
请提供:
1. 核心洞察
2. 趋势分析
3. 优缺点评估
"""
response = model.invoke([HumanMessage(content=prompt)])
return {"analysis": response.content}
# 节点3:写作者
def writer(state: ResearchState):
prompt = f"""你是一个专业写作者,请基于研究和分析撰写最终报告:
研究笔记:{state['research_notes']}
分析:{state['analysis']}
请用正式、清晰的报告格式输出,包含:
- 执行摘要
- 主要发现
- 结论和建议
"""
response = model.invoke([HumanMessage(content=prompt)])
return {"draft": response.content}
# 节点4:编辑(质检)
def editor(state: ResearchState):
prompt = f"""你是一个编辑,请审阅以下报告草稿并提出修改建议:
{state['draft']}
如果质量合格,输出"QUALITY_PASS"加上最终版本;
如果需要改进,输出具体修改意见。
"""
response = model.invoke([HumanMessage(content=prompt)])
if "QUALITY_PASS" in response.content:
final = response.content.replace("QUALITY_PASS", "").strip()
return {"final_report": final}
else:
# 如果不合格,把修改意见返回给写作者(通过状态)
return {"draft": f"修改意见:{response.content}\n\n请根据以上意见修改。"}
# 条件判断函数
def check_quality(state: ResearchState):
"""判断是否需要继续迭代"""
if "final_report" in state and state["final_report"]:
return "end"
else:
return "revise"
# 构建图
builder = StateGraph(ResearchState)
builder.add_node("researcher", researcher)
builder.add_node("analyst", analyst)
builder.add_node("writer", writer)
builder.add_node("editor", editor)
builder.set_entry_point("researcher")
builder.add_edge("researcher", "analyst")
builder.add_edge("analyst", "writer")
builder.add_edge("writer", "editor")
# 条件边:如果质检通过就结束,否则回写作者重写
builder.add_conditional_edges(
"editor",
check_quality,
{
"end": END,
"revise": "writer" # 回到writer节点继续修改
}
)
research_app = builder.compile()
# 执行
result = research_app.invoke({"topic": "人工智能在医疗领域的应用"})
print(result["final_report"])
六、总结
6.1 核心关键点
- LangChain 1.0 工作流核心是 LangGraph:基于有向图的编排体系,替代旧版线性 Chain,支持分支、循环、多智能体等复杂逻辑;
- 核心组件:State(状态)、Node(节点)、Edge(边)、Checkpoint(状态持久化);
- 开发流程:定义状态 → 实现节点 → 编排图结构 → 编译运行 → 可视化调试;
- 核心特性:状态持久化、异步执行、异常重试、多智能体协作、可视化;
- 生产级关键:开启 Checkpoint、节点单一职责、流程简化、添加重试 / 兜底。
6.2 核心应用场景
| 场景 | 工作流设计要点 |
|---|---|
| RAG 检索增强生成 | 拆分「文档加载→切分→嵌入→检索→生成」节点,支持检索重试 |
| 智能体工具调用 | 「参数提取→工具执行→结果处理→模型生成」分支流程,支持工具重试 |
| 多智能体协作 | 按职责拆分智能体节点,定义节点间数据传递规则 |
| 长文本生成 | 「分段生成→拼接→校验」循环流程,支持断点续生成 |
| 数据分析 | 「数据查询→清洗→分析→可视化」线性流程,支持异常兜底 |
6.3 生产级最佳实践
- 状态设计原则:
- 仅存储核心数据(避免状态过大);
- 使用强类型 TypedDict 定义状态(提升可维护性);
- 状态字段命名统一(如 user_input/tool_results)。
- 节点设计原则:
- 单一职责(一个节点只做一件事);
- 节点逻辑可复用(封装为独立函数);
- 节点输出仅更新状态(不做全局操作)。
- 流程设计原则:
- 避免复杂嵌套分支(降低调试成本);
- 关键节点添加重试 / 兜底;
- 开启 Checkpoint 支持状态持久化。
- 性能优化:
- 异步执行高耗时节点(如工具调用、模型生成);
- 批量处理重复节点调用;
- 缓存高频节点结果(如嵌入生成)。
6.4 常见问题与解决方案
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 工作流死循环 | 条件边逻辑错误 | 增加最大循环次数限制、完善判断逻辑 |
| 状态数据丢失 | 未开启 Checkpoint | 生产环境必须指定 checkpointer(Redis/MongoDB) |
| 节点执行超时 | 模型 / 工具响应慢 | 增加节点超时控制、开启重试 |
| 可视化失败 | 依赖缺失(pygraphviz) | 安装依赖或使用 Mermaid 格式可视化 |
| 多智能体数据冲突 | 状态字段未隔离 | 为不同智能体设置独立状态字段 |
更多推荐



所有评论(0)