LangGraph 状态图编排深度解析
LangGraph 是 LangChain 生态中用于构建有状态、多代理应用的框架。它基于图结构定义应用逻辑,支持循环、分支、持久化等高级特性。本文将深入解析 LangGraph 的核心概念、状态管理、节点与边、条件路由、人机协作等特性,帮助开发者构建复杂的生产级 AI 应用。
·
关于作者
- 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
- 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
- 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案
「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!
LangGraph 状态图编排深度解析
摘要
LangGraph 是 LangChain 生态中用于构建有状态、多代理应用的框架。它基于图结构定义应用逻辑,支持循环、分支、持久化等高级特性。本文将深入解析 LangGraph 的核心概念、状态管理、节点与边、条件路由、人机协作等特性,帮助开发者构建复杂的生产级 AI 应用。
一、LangGraph 概述
1.1 为什么需要 LangGraph
传统的 Chain 和 Agent 存在以下局限:
| 局限性 | 说明 | LangGraph 解决方案 |
|---|---|---|
| 无状态 | Chain 不保存状态 | 内置状态管理 |
| 线性流程 | 难以实现复杂控制流 | 图结构支持循环和分支 |
| 难以调试 | 执行过程不透明 | 可视化和检查支持 |
| 无法暂停 | 必须一次性执行完 | 支持断点和恢复 |
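1.2 LangGraph 核心概念
LangGraph 的核心概念包括:状态(State,在节点之间传递的共享数据,通常用 TypedDict 定义)、节点(Node,读取状态并返回部分更新的函数)、边(Edge,决定下一步执行哪个节点,分为普通边和条件边)、图(StateGraph,由节点和边组成,编译后即可运行)以及检查点(Checkpointer,持久化状态以支持暂停与恢复)。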
1.3 Hello LangGraph
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
# 1. Define the state
class State(TypedDict):
    """State passed between the nodes of the hello-world graph."""

    count: int  # counter raised by the increment node
    message: str  # text written by the greet node


# 2. Define the node functions
def increment(state: State) -> dict:
    """Return a partial update that raises ``count`` by one."""
    return {"count": state["count"] + 1}


def greet(state: State) -> dict:
    """Return a partial update whose ``message`` reports the current count."""
    return {"message": f"计数: {state['count']}"}
# 3. Build the graph and register both nodes
graph = StateGraph(State)
for node_name, node_fn in (("increment", increment), ("greet", greet)):
    graph.add_node(node_name, node_fn)

# 4. Add the edges: START -> increment -> greet -> END
for source, target in ((START, "increment"), ("increment", "greet"), ("greet", END)):
    graph.add_edge(source, target)

# 5. Compile and run
app = graph.compile()
initial_state = {"count": 0, "message": ""}
result = app.invoke(initial_state)
print(result)  # {'count': 1, 'message': '计数: 1'}
二、状态管理
2.1 状态定义
from typing import TypedDict, Annotated
from langgraph.graph import add_messages
from langchain_core.messages import BaseMessage
# Basic state definition
class BasicState(TypedDict):
    """State made of plain fields with no custom merge behaviour."""

    count: int
    name: str
    data: list[str]


# State with messages (chat scenarios)
class ChatState(TypedDict):
    """Conversation state whose message list is merged by ``add_messages``."""

    # add_messages merges incoming messages into the existing list automatically
    messages: Annotated[list[BaseMessage], add_messages]
    user_id: str


# Nested state
class NestedState(TypedDict):
    """State whose fields are themselves dictionaries."""

    user: dict[str, str]
    session: dict[str, int]
    context: dict[str, list]
2.2 状态更新
from typing import TypedDict
from langgraph.graph import StateGraph
class State(TypedDict):
    """State used to illustrate partial updates."""

    count: int
    items: list[str]


def node_a(state: State) -> dict:
    """Return a partial update that touches only ``count``."""
    # Only the returned fields are updated (partial update)
    return {"count": state["count"] + 1}


def node_b(state: State) -> dict:
    """Return an update for several fields at once."""
    return {
        "count": state["count"] * 2,
        # A new list is built, so the list held by the incoming state is not mutated
        "items": [*state["items"], "new_item"],
    }


# State update rules:
# 1. The returned dict is merged into the existing state
# 2. Fields that are not returned stay unchanged
# 3. List fields are replaced by default; use Annotated to customise the merge
2.3 Reducer 函数
from typing import TypedDict, Annotated
from operator import add
# Custom reducer
def merge_dicts(left: dict, right: dict) -> dict:
    """Return a new dict holding ``left`` overlaid with ``right``.

    Keys present in both take the value from ``right``; neither input is mutated.
    """
    return {**left, **right}


class State(TypedDict):
    """State whose fields use different merge strategies."""

    # Merge lists with operator.add (append instead of replace)
    items: Annotated[list[str], add]
    # Custom merge function
    metadata: Annotated[dict, merge_dicts]
    # Plain field (replaced)
    count: int


# Examples
def add_item(state: State) -> dict:
    """Return a one-element list for the ``items`` reducer to append."""
    return {"items": ["new_item"]}  # appended to the existing list


def update_meta(state: State) -> dict:
    """Return a dict for the ``metadata`` reducer to merge in."""
    return {"metadata": {"key": "value"}}  # merged into the existing dict
三、节点与边
3.1 节点类型
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_core.runnables import RunnableLambda


class State(TypedDict):
    """Single-field state shared by both node flavours."""

    value: int


# 1. Function node
def function_node(state: State) -> dict:
    """Return a partial update that raises ``value`` by one."""
    return {"value": state["value"] + 1}


# 2. Runnable node: a RunnableLambda wrapping a function that doubles ``value``
runnable_node = RunnableLambda(
    lambda state: {"value": state["value"] * 2}
)

# Register the nodes
graph = StateGraph(State)
graph.add_node("func", function_node)
graph.add_node("runnable", runnable_node)
3.2 边的类型
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
    """State read by the routing function below."""

    value: int
    status: str


# NOTE: this snippet only shows how edges are declared. No add_node calls appear
# here for "node_a", "node_b", "high_handler" or "low_handler"; they presumably
# have to be registered before the graph can be compiled.
graph = StateGraph(State)

# 1. Normal edges
graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")


# 2. Conditional edge
def route_by_value(state: State) -> Literal["high", "low"]:
    """Return "high" when ``value`` is above 50, otherwise "low"."""
    if state["value"] > 50:
        return "high"
    return "low"


graph.add_conditional_edges(
    "node_b",
    route_by_value,
    {
        "high": "high_handler",
        "low": "low_handler",
    },
)

# 3. Loop edge (back to an earlier node)
# The loop only ends once route_by_value returns "low", so some node on the
# cycle has to bring ``value`` down to 50 or below.
graph.add_edge("high_handler", "node_a")  # loop
graph.add_edge("low_handler", END)
3.3 完整示例
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
    """State for the counting loop."""

    count: int  # number of completed iterations
    total: int  # running sum of the counts seen so far
    status: str  # set by the finish node


def increment(state: State) -> dict:
    """Return a partial update that raises ``count`` by one."""
    return {"count": state["count"] + 1}


def add_to_total(state: State) -> dict:
    """Return a partial update that adds the current ``count`` to ``total``."""
    return {"total": state["total"] + state["count"]}


def should_continue(state: State) -> Literal["continue", "finish"]:
    """Return "continue" while ``count`` is below 5, then "finish"."""
    if state["count"] < 5:
        return "continue"
    return "finish"


def finish(state: State) -> dict:
    """Return a partial update that marks the run as completed."""
    return {"status": "completed"}
# Build the graph
graph = StateGraph(State)

# Register the nodes
for node_name, node_fn in (
    ("increment", increment),
    ("add_to_total", add_to_total),
    ("finish", finish),
):
    graph.add_node(node_name, node_fn)

# Fixed edges
graph.add_edge(START, "increment")
graph.add_edge("increment", "add_to_total")

# Conditional edge (this is what forms the loop)
loop_routes = {
    "continue": "increment",  # loop back
    "finish": "finish",
}
graph.add_conditional_edges("add_to_total", should_continue, loop_routes)
graph.add_edge("finish", END)

# Compile and run
app = graph.compile()
initial_state = {"count": 0, "total": 0, "status": ""}
result = app.invoke(initial_state)
print(result)
# {'count': 5, 'total': 15, 'status': 'completed'}
四、条件路由
4.1 基本条件路由
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
    """State for the keyword-based router."""

    input: str  # raw user text
    category: str  # written by classify, read by route
    result: str  # written by the selected handler


def classify(state: State) -> dict:
    """Return the ``category`` chosen from keywords found in ``input``."""
    text = state["input"].lower()
    if "代码" in text or "编程" in text:
        return {"category": "tech"}
    if "天气" in text:
        return {"category": "weather"}
    return {"category": "general"}


def tech_handler(state: State) -> dict:
    """Return the canned answer for technical questions."""
    return {"result": "技术问题回答"}


def weather_handler(state: State) -> dict:
    """Return the canned answer for weather questions."""
    return {"result": "天气信息"}


def general_handler(state: State) -> dict:
    """Return the canned answer for everything else."""
    return {"result": "通用回答"}


def route(state: State) -> Literal["tech", "weather", "general"]:
    """Return the category stored in the state as the routing key."""
    return state["category"]
# Build the graph
graph = StateGraph(State)
graph.add_node("classify", classify)

# Each handler node is named after the category it serves
handlers = {
    "tech": tech_handler,
    "weather": weather_handler,
    "general": general_handler,
}
for category, handler in handlers.items():
    graph.add_node(category, handler)

graph.add_edge(START, "classify")

# The routing key returned by route() is also the name of the target node
graph.add_conditional_edges(
    "classify",
    route,
    {category: category for category in handlers},
)

for category in handlers:
    graph.add_edge(category, END)

app = graph.compile()
4.2 多条件组合
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
    """State for the permission check."""

    user_type: str  # "vip", "normal", "guest"
    request_type: str  # "query", "write", "admin"
    result: str


def check_permission(state: State) -> Literal["allowed", "denied", "upgrade"]:
    """Return the routing key for this combination of user and request type."""
    user = state["user_type"]
    request = state["request_type"]

    if request == "admin":
        # Only VIP users may perform admin requests
        return "allowed" if user == "vip" else "denied"
    if request == "write":
        # Other users are pointed to an upgrade instead of being denied
        return "allowed" if user in ("vip", "normal") else "upgrade"
    # Every remaining request type (query) is allowed
    return "allowed"


def handle_allowed(state: State) -> dict:
    """Return the result for a permitted request."""
    return {"result": "操作已执行"}


def handle_denied(state: State) -> dict:
    """Return the result for a rejected request."""
    return {"result": "权限不足"}


def handle_upgrade(state: State) -> dict:
    """Return the result asking the user to upgrade."""
    return {"result": "请升级账户"}
# Build the graph
graph = StateGraph(State)

# Each node is named after the routing key that selects it
outcome_handlers = {
    "allowed": handle_allowed,
    "denied": handle_denied,
    "upgrade": handle_upgrade,
}
for outcome, handler in outcome_handlers.items():
    graph.add_node(outcome, handler)

# Route straight from START based on the permission check
graph.add_conditional_edges(
    START,
    check_permission,
    {outcome: outcome for outcome in outcome_handlers},
)

for outcome in outcome_handlers:
    graph.add_edge(outcome, END)
五、工具调用集成
5.1 工具节点
from typing import Annotated, TypedDict

from langchain_core.messages import BaseMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
# add_messages must be imported before the State class below refers to it
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode


# Define the tools
# NOTE(review): @tool is understood to expose the docstring to the model as the
# tool description, so the docstrings are left exactly as written — confirm.
@tool
def search(query: str) -> str:
    """搜索网络信息。"""
    return f"搜索结果: {query}"


@tool
def calculator(expression: str) -> str:
    """计算数学表达式。"""
    # SECURITY: eval() runs arbitrary Python and ``expression`` is produced by the
    # model. Acceptable for a local demo only; swap in a restricted arithmetic
    # parser before exposing this tool to untrusted input.
    return str(eval(expression))


tools = [search, calculator]


# Define the state
class State(TypedDict):
    """Conversation state; new messages are merged by ``add_messages``."""

    messages: Annotated[list[BaseMessage], add_messages]


# Create the model and the tool node
llm = ChatOpenAI(model="gpt-4o").bind_tools(tools)
tool_node = ToolNode(tools)


# Define the node functions
def chatbot(state: State) -> dict:
    """Call the model on the conversation so far and return its reply."""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}


def should_continue(state: State) -> str:
    """Return "tools" when the last message requests tool calls, else END."""
    last_message = state["messages"][-1]
    # getattr keeps the check safe for message objects without a tool_calls attribute
    if getattr(last_message, "tool_calls", None):
        return "tools"
    return END


# Build the graph
graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_node("tools", tool_node)
graph.add_edge(START, "chatbot")
# Explicit path map, matching the other conditional edges in this article
graph.add_conditional_edges("chatbot", should_continue, {"tools": "tools", END: END})
graph.add_edge("tools", "chatbot")
app = graph.compile()
5.2 ReAct Agent 模式
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage
# Define the tools
# NOTE(review): @tool is understood to expose the docstring to the model as the
# tool description, so the docstrings are left exactly as written — confirm.
@tool
def get_weather(city: str) -> str:
    """获取城市天气。"""
    return f"{city} 天气晴,温度 25°C"


@tool
def get_time(timezone: str) -> str:
    """获取时区时间。"""
    return f"当前时间: 14:00 ({timezone})"


tools = [get_weather, get_time]

# Use the prebuilt ReAct agent
llm = ChatOpenAI(model="gpt-4o")
app = create_react_agent(llm, tools)

# Run
result = app.invoke({
    "messages": [{"role": "user", "content": "北京天气怎么样?现在几点了?"}]
})
六、人机协作
6.1 断点与恢复
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    """State for the approval flow."""

    input: str
    approved: bool
    result: str


def process(state: State) -> dict:
    """Return the processed result for ``input``."""
    return {"result": f"处理: {state['input']}"}


def wait_for_approval(state: State) -> dict:
    """Pause the run until a human decision arrives, then record it.

    The original node returned ``{"approved": False}`` unconditionally, so the
    graph never paused and could never reach ``process``. ``interrupt`` suspends
    the run here; the value passed to ``Command(resume=...)`` is what it returns
    once the run is resumed.
    """
    decision = interrupt(f"请确认是否批准: {state['input']}")
    return {"approved": bool(decision)}


# Build the graph
graph = StateGraph(State)
graph.add_node("process", process)
graph.add_node("wait_approval", wait_for_approval)
graph.add_edge(START, "wait_approval")
graph.add_conditional_edges(
    "wait_approval",
    lambda s: "approved" if s["approved"] else END,
    {"approved": "process", END: END},
)
graph.add_edge("process", END)

# Add a checkpointer (required for pause/resume)
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# First run: execution stops inside wait_approval
config = {"configurable": {"thread_id": "user_123"}}
result1 = app.invoke({"input": "test", "approved": False, "result": ""}, config)
# Holds the state reached so far; NOTE(review): recent LangGraph versions also
# add an "__interrupt__" entry describing the pending interrupt — confirm.
print(result1)
print(app.get_state(config).next)  # ('wait_approval',)

# Resume on the same thread and supply the approval
result2 = app.invoke(Command(resume=True), config)
print(result2)  # {'input': 'test', 'approved': True, 'result': '处理: test'}
6.2 人工审核节点
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
# interrupt lives in langgraph.types, not in langgraph.prebuilt
from langgraph.types import interrupt


class State(TypedDict):
    """State for the draft review flow."""

    draft: str
    approved: bool
    final: str


def generate_draft(state: State) -> dict:
    """Return a fixed draft text."""
    return {"draft": "这是生成的草稿内容..."}


def human_review(state: State) -> dict:
    """Pause for a human reviewer and record the verdict.

    "approve" marks the draft as approved; any other input is treated as
    feedback and appended to the draft.
    """
    # Pause and wait for human input
    user_input = interrupt("请审核草稿,输入 'approve' 批准或提供修改意见:")
    if user_input == "approve":
        return {"approved": True}
    return {
        "approved": False,
        "draft": f"{state['draft']}\n修改意见: {user_input}",
    }


def finalize(state: State) -> dict:
    """Return the current draft as the final text."""
    return {"final": state["draft"]}


# Build the graph
graph = StateGraph(State)
graph.add_node("generate", generate_draft)
graph.add_node("review", human_review)
graph.add_node("finalize", finalize)
graph.add_edge(START, "generate")
graph.add_edge("generate", "review")
# NOTE: a rejected draft goes back to "generate", which overwrites ``draft`` with
# the fixed text again, so the appended feedback is not used by the next draft.
graph.add_conditional_edges(
    "review",
    lambda s: "approved" if s["approved"] else "generate",
    {"approved": "finalize", "generate": "generate"},
)
graph.add_edge("finalize", END)

# interrupt() needs a checkpointer so the paused run can be resumed
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)
七、持久化与恢复
7.1 检查点机制
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
class State(TypedDict):
    """State for the persisted counter."""

    count: int
    history: list[str]


def increment(state: State) -> dict:
    """Return the next count together with the extended step history."""
    next_count = state["count"] + 1
    return {
        "count": next_count,
        "history": [*state["history"], f"Step {next_count}"],
    }


# In-memory checkpointer
memory_checkpointer = MemorySaver()

# SQLite persistence
# check_same_thread=False lets the connection be used from threads other than
# the one that opened it; the SqliteSaver examples open the connection this way.
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
sqlite_checkpointer = SqliteSaver(conn)

# Build the graph
graph = StateGraph(State)
graph.add_node("increment", increment)
graph.add_edge(START, "increment")
graph.add_edge("increment", END)

# Compile with the persistent checkpointer
app = graph.compile(checkpointer=sqlite_checkpointer)

# Run (the state is persisted under this thread id)
config = {"configurable": {"thread_id": "session_001"}}
result = app.invoke({"count": 0, "history": []}, config)
7.2 恢复执行
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
    """State for the three-step pipeline."""

    step: int
    data: str


def step_a(state: State) -> dict:
    """Return the state written by step A."""
    return {"step": 1, "data": "A 完成"}


def step_b(state: State) -> dict:
    """Return the state written by step B, extending the ``data`` trail."""
    return {"step": 2, "data": state["data"] + " -> B 完成"}


def step_c(state: State) -> dict:
    """Return the state written by step C, extending the ``data`` trail."""
    return {"step": 3, "data": state["data"] + " -> C 完成"}


graph = StateGraph(State)
graph.add_node("a", step_a)
graph.add_node("b", step_b)
graph.add_node("c", step_c)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# Run
config = {"configurable": {"thread_id": "test"}}
result = app.invoke({"step": 0, "data": ""}, config)

# Inspect the state history
history = list(app.get_state_history(config))
for snapshot in history:
    # .get() is used because a snapshot taken before any state was written has
    # no "step"/"data" keys yet, and indexing would raise KeyError.
    print(f"Step: {snapshot.values.get('step')}, Data: {snapshot.values.get('data')}")

# Restore to a specific state: update_state has to be given the config of the
# chosen snapshot; passing the plain thread config only patches the latest state.
target = next(s for s in history if s.values.get("step") == 1)
restored_config = app.update_state(
    target.config,
    {"step": 1, "data": "A 完成(恢复)"},
)
八、可视化与调试
8.1 图可视化
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
    """Single-field state for the visualization example."""

    value: int


def node_a(state: State) -> dict:
    """Return a partial update that raises ``value`` by one."""
    return {"value": state["value"] + 1}


def node_b(state: State) -> dict:
    """Return a partial update that doubles ``value``."""
    return {"value": state["value"] * 2}


graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", END)
app = graph.compile()

# Get the Mermaid source of the graph
mermaid = app.get_graph().draw_mermaid()
print(mermaid)

# Save as an image.
# NOTE(review): the original comment said graphviz is required; draw_mermaid_png
# is understood to render through the Mermaid.ink web service by default, which
# needs network access rather than graphviz — confirm against the library docs.
# app.get_graph().draw_mermaid_png(output_file_path="graph.png")
8.2 执行追踪
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
    """State that carries a value plus a log of the nodes that ran."""

    value: int
    logs: list[str]


def node_a(state: State) -> dict:
    """Return ``value`` plus one and a log entry for this node."""
    return {
        "value": state["value"] + 1,
        "logs": [*state["logs"], "执行了 node_a"],
    }


def node_b(state: State) -> dict:
    """Return ``value`` doubled and a log entry for this node."""
    return {
        "value": state["value"] * 2,
        "logs": [*state["logs"], "执行了 node_b"],
    }


graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# Run
config = {"configurable": {"thread_id": "debug"}}
result = app.invoke({"value": 0, "logs": []}, config)

# Inspect the execution history
for snapshot in app.get_state_history(config):
    # .get() is used because a snapshot taken before any state was written has
    # no "value"/"logs" keys yet, and indexing would raise KeyError.
    print(f"Value: {snapshot.values.get('value')}")
    print(f"Logs: {snapshot.values.get('logs')}")
    print(f"Next: {snapshot.next}")
    print("---")
九、多代理协作
9.1 主从模式
from typing import Annotated, TypedDict

from langchain_core.messages import BaseMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
# Missing in the original snippet: State below refers to add_messages
from langgraph.graph.message import add_messages


class State(TypedDict):
    """Shared conversation plus the name of the agent that should run next."""

    messages: Annotated[list[BaseMessage], add_messages]
    next_agent: str


llm = ChatOpenAI(model="gpt-4o")


def _run_agent(state: State, system_prompt: str, next_agent: str) -> dict:
    """Call the model with a role prompt and hand over to ``next_agent``."""
    response = llm.invoke([
        {"role": "system", "content": system_prompt},
        *state["messages"],
    ])
    return {
        "messages": [response],
        "next_agent": next_agent,
    }


def researcher(state: State) -> dict:
    """Run the researcher role, then hand over to the writer."""
    return _run_agent(state, "你是研究员,负责收集信息。", "writer")


def writer(state: State) -> dict:
    """Run the writer role, then hand over to the reviewer."""
    return _run_agent(state, "你是作家,负责撰写内容。", "reviewer")


def reviewer(state: State) -> dict:
    """Run the reviewer role, then end the run."""
    return _run_agent(state, "你是审核员,负责检查质量。", END)


def route(state: State) -> str:
    """Return the name of the next node as recorded in the state."""
    return state["next_agent"]


# Build the graph
graph = StateGraph(State)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)
graph.add_node("reviewer", reviewer)
graph.add_edge(START, "researcher")
# Every agent routes through the same function, which reads next_agent
graph.add_conditional_edges("researcher", route)
graph.add_conditional_edges("writer", route)
graph.add_conditional_edges("reviewer", route)
app = graph.compile()
9.2 层级图
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
# Subgraph
class SubState(TypedDict):
    """State of the subgraph."""

    value: int


def sub_node_a(state: SubState) -> dict:
    """Return ``value`` plus ten."""
    return {"value": state["value"] + 10}


def sub_node_b(state: SubState) -> dict:
    """Return ``value`` doubled."""
    return {"value": state["value"] * 2}


sub_graph = StateGraph(SubState)
sub_graph.add_node("a", sub_node_a)
sub_graph.add_node("b", sub_node_b)
sub_graph.add_edge(START, "a")
sub_graph.add_edge("a", "b")
sub_graph.add_edge("b", END)
sub_app = sub_graph.compile()


# Main graph
class MainState(TypedDict):
    """State of the outer graph."""

    value: int
    processed: bool


def main_node(state: MainState) -> dict:
    """Run the subgraph on ``value`` and mark the state as processed."""
    # Invoke the subgraph with just the field it needs
    result = sub_app.invoke({"value": state["value"]})
    return {
        "value": result["value"],
        "processed": True,
    }


main_graph = StateGraph(MainState)
main_graph.add_node("process", main_node)
main_graph.add_edge(START, "process")
main_graph.add_edge("process", END)
app = main_graph.compile()

result = app.invoke({"value": 5, "processed": False})
# result: {'value': 30, 'processed': True}
十、最佳实践
10.1 设计原则
| 原则 | 说明 |
|---|---|
| 状态最小化 | 只存储必要的状态字段 |
| 节点单一职责 | 每个节点只做一件事 |
| 明确路由条件 | 条件函数返回值要明确 |
| 使用检查点 | 生产环境启用持久化 |
| 可视化验证 | 开发时查看图结构 |
10.2 错误处理
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
value: int
error: str | None
def risky_operation(state: State) -> dict:
try:
result = 100 / state["value"]
return {"value": result, "error": None}
except Exception as e:
return {"error": str(e)}
def handle_error(state: State) -> dict:
return {"value": 0, "error": None}
def check_error(state: State) -> str:
if state["error"]:
return "error"
return "success"
# Wire the operation to either the error handler or straight to END
graph = StateGraph(State)
graph.add_node("operation", risky_operation)
graph.add_node("error_handler", handle_error)
graph.add_edge(START, "operation")

outcome_routes = {"error": "error_handler", "success": END}
graph.add_conditional_edges("operation", check_error, outcome_routes)

graph.add_edge("error_handler", END)
app = graph.compile()
10.3 性能优化
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
import asyncio
class State(TypedDict):
    """State holding the results of the concurrent tasks."""

    results: list[str]


async def parallel_task(name: str) -> str:
    """Sleep for 0.1 s to stand in for real work, then report completion."""
    await asyncio.sleep(0.1)
    return f"{name} 完成"


async def parallel_node(state: State) -> dict:
    """Run tasks A, B and C concurrently and return their results in that order."""
    # Run several tasks concurrently
    results = await asyncio.gather(
        parallel_task("A"),
        parallel_task("B"),
        parallel_task("C"),
    )
    return {"results": list(results)}
# Single-node graph around the async node
graph = StateGraph(State)
graph.add_node("parallel", parallel_node)
for source, target in ((START, "parallel"), ("parallel", END)):
    graph.add_edge(source, target)
app = graph.compile()

# Run asynchronously
import asyncio
initial_state = {"results": []}
result = asyncio.run(app.ainvoke(initial_state))
十一、总结
11.1 LangGraph vs 传统 Agent
| 方面 | 传统 Agent | LangGraph |
|---|---|---|
| 状态管理 | 无/手动 | 内置 TypedDict |
| 控制流 | 线性/简单循环 | 图结构、复杂路由 |
| 持久化 | 需自行实现 | 内置检查点机制 |
| 可观测性 | 有限 | 完整的历史追踪 |
| 人机协作 | 困难 | 原生支持断点恢复 |
| 多代理 | 复杂 | 层级图支持 |
11.2 架构图
(原文此处的架构图未能保留。整体流程可概括为:用户输入 → StateGraph(节点、普通边与条件路由)→ 检查点持久化(支持暂停、恢复与人工审核)→ 输出结果。)
参考资料
更多推荐



所有评论(0)