LangChain 系列 ·(九):LangGraph——当 Agent 需要“记住状态“
概念核心要点State工作流的共享数据,用 TypedDict 定义;表示追加语义Node接收 State,返回需要更新的字段;不能直接修改传入的 state 对象条件边根据路由函数的返回值决定下一个节点,返回值必须在 mapping 中每步自动保存状态;thread_id是会话标识符;开发用 MemorySaver,生产用 SqliteSaver 或 PostgreSQL在指定节点暂停;从断点继
LangChain 系列 · 第九篇:LangGraph——当 Agent 需要"记住状态"
🎯 适合人群:已掌握 Agent 与 AgentExecutor,想构建有状态、可分支、支持人工干预的复杂工作流的工程师
⏱️ 阅读时间:约 35 分钟
💬 本文从 AgentExecutor 的局限出发,介绍 LangGraph 的 StateGraph 模型,涵盖节点、边、条件路由、状态持久化与 Human-in-the-loop
一、AgentExecutor 的天花板
第八篇结尾列出了 AgentExecutor 无法处理的几类场景。用一个具体例子来感受这个问题:
设计一个内容审核 Agent:用户提交文章后,Agent 先自动检查合规性。如果自动检查通过,直接发布;如果存疑,暂停并通知人工审核员;人工决定通过或拒绝后,Agent 继续执行后续操作。
用 AgentExecutor 实现这个流程,会遇到三个无法绕过的障碍:
- 无法暂停:AgentExecutor 的循环一旦启动就无法中途挂起,等待外部输入
- 无法分支:自动通过和人工审核是两条不同路径,AgentExecutor 只能线性执行
- 无法持久化:如果人工审核需要 10 分钟,进程中断后状态全部丢失
LangGraph 正是为了解决这些问题而设计的。它将工作流建模为有向图:节点是处理步骤,边是流转路径,状态在节点之间流动并可持久化。
二、核心概念
在写代码之前,先理解 LangGraph 的四个核心概念。
2.1 State(状态)
State 是贯穿整个工作流的共享数据结构,每个节点都可以读取和修改它。LangGraph 使用 Python 的 TypedDict 定义 State:
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 对话消息列表,add_messages 表示追加而非覆盖
user_name: str # 普通字段,直接覆盖
step_count: int # 普通字段,直接覆盖
Annotated[list, add_messages] 中的 add_messages 是一个归约函数(Reducer)——它定义了当多个节点同时更新同一字段时如何合并结果。add_messages 的行为是追加新消息而不是覆盖整个列表,这对消息历史来说是正确的语义。
📝 归约函数(Reducer) 是 LangGraph 的状态合并机制。没有指定归约函数的字段(如
user_name)采用默认行为:后写入的值覆盖先写入的值。
2.2 Node(节点)
Node 是工作流中的一个处理步骤,本质上是一个 Python 函数:接收当前 State,返回对 State 的更新:
def my_node(state: AgentState) -> dict:
# 读取状态
messages = state["messages"]
# 执行逻辑
result = do_something(messages)
# 返回对状态的更新(只需返回要修改的字段)
return {"messages": [result], "step_count": state["step_count"] + 1}
节点只返回需要变更的字段,不需要返回完整的 State 对象。
2.3 Edge(边)
Edge 定义节点之间的流转关系,分为两种:
- 普通边:固定跳转,节点 A 执行完始终跳到节点 B
- 条件边:根据当前 State 的内容动态决定下一个节点
Normal Edge: Conditional Edge:
A ──► B A ──► router_fn ──► B (if condition_1)
└──► C (if condition_2)
└──► END (if done)
2.4 特殊节点:START 和 END
LangGraph 内置两个特殊节点:
START:图的入口,第一个节点从这里接收输入END:图的出口,流转到 END 表示工作流结束
三、安装与最小示例
pip install langgraph langchain-openai
用 LangGraph 重现第八篇的天气查询 Agent,感受两者的差异:
from dotenv import load_dotenv
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
load_dotenv()
# --- State 定义 ---
class State(TypedDict):
messages: Annotated[list, add_messages]
# --- 工具定义 ---
@tool
def get_weather(city: str) -> str:
"""查询城市当前天气。
Args:
city: 城市中文名
"""
data = {"北京": "晴,22°C", "上海": "多云,26°C"}
return data.get(city, f"未找到 {city} 的数据")
tools = [get_weather]
tool_node = ToolNode(tools) # LangGraph 内置的工具执行节点
# --- 节点定义 ---
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)
def call_llm(state: State) -> dict:
"""调用 LLM,可能输出工具调用或最终答案"""
response = llm.invoke(state["messages"])
return {"messages": [response]}
def should_continue(state: State) -> str:
"""条件路由:判断是继续调用工具还是结束"""
last_message = state["messages"][-1]
if last_message.tool_calls:
return "call_tools" # 有工具调用,跳转到工具节点
return "end" # 没有工具调用,结束
# --- 构建图 ---
graph_builder = StateGraph(State)
# 添加节点
graph_builder.add_node("llm", call_llm)
graph_builder.add_node("tools", tool_node)
# 添加边
graph_builder.add_edge(START, "llm") # 入口 → LLM
graph_builder.add_conditional_edges( # LLM → 条件分支
"llm",
should_continue,
{"call_tools": "tools", "end": END},
)
graph_builder.add_edge("tools", "llm") # 工具结果 → 回到 LLM
graph = graph_builder.compile()
# --- 运行 ---
result = graph.invoke({"messages": [HumanMessage(content="北京今天天气怎么样?")]})
print(result["messages"][-1].content)
# "北京今天天气晴,22°C。"
这段代码与第八篇的 AgentExecutor 实现的是完全相同的逻辑,但架构完全不同:执行路径由图结构显式描述,每一步流转都一目了然。
四、可视化图结构
LangGraph 支持将图结构导出为 ASCII 或 Mermaid 格式,方便调试:
# ASCII 可视化
print(graph.get_graph().draw_ascii())
# Mermaid 格式(可粘贴到 mermaid.live 渲染)
print(graph.get_graph().draw_mermaid())
输出示意:
+-----------+
| __start__ |
+-----------+
|
v
+-------+
| llm |
+-------+
/ \
v v
+-------+ +-----+
| tools | | END |
+-------+ +-----+
|
v
+-------+
| llm | (loop back)
+-------+
五、条件路由:实现分支逻辑
条件路由是 LangGraph 相对于 AgentExecutor 最显著的优势之一。通过 add_conditional_edges,可以根据 State 的内容将流程导向不同的节点。
下面构建一个内容分类路由器:根据用户问题的类型,分发给不同的专家节点处理。
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
class RouterState(TypedDict):
messages: Annotated[list, add_messages]
question_type: str # 问题分类结果
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 分类节点:判断问题属于哪个领域
def classify_question(state: RouterState) -> dict:
user_message = state["messages"][-1].content
classify_prompt = ChatPromptTemplate.from_messages([
("system",
"将用户问题分类为以下类别之一,只输出类别名称,不要其他内容:\n"
"weather(天气相关)、math(数学计算)、general(其他)"),
("human", "{question}"),
])
chain = classify_prompt | llm
result = chain.invoke({"question": user_message})
return {"question_type": result.content.strip().lower()}
# 三个专家节点
def handle_weather(state: RouterState) -> dict:
response = llm.invoke([
*state["messages"],
HumanMessage(content="(系统提示:你是天气专家,详细回答天气相关问题)"),
])
return {"messages": [response]}
def handle_math(state: RouterState) -> dict:
response = llm.invoke([
*state["messages"],
HumanMessage(content="(系统提示:你是数学专家,一步步展示计算过程)"),
])
return {"messages": [response]}
def handle_general(state: RouterState) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response]}
# 路由函数:根据分类结果决定下一个节点
def route_by_type(state: RouterState) -> Literal["weather", "math", "general"]:
return state["question_type"]
# 构建图
builder = StateGraph(RouterState)
builder.add_node("classify", classify_question)
builder.add_node("weather", handle_weather)
builder.add_node("math", handle_math)
builder.add_node("general", handle_general)
builder.add_edge(START, "classify")
builder.add_conditional_edges(
"classify",
route_by_type,
{
"weather": "weather",
"math": "math",
"general": "general",
},
)
builder.add_edge("weather", END)
builder.add_edge("math", END)
builder.add_edge("general", END)
router_graph = builder.compile()
# 测试
result = router_graph.invoke({
"messages": [HumanMessage(content="明天北京会下雨吗?")],
"question_type": "",
})
print(result["messages"][-1].content)
六、Checkpointer:持久化状态
Checkpointer 是 LangGraph 的状态持久化机制。每次节点执行完毕,Checkpointer 都会将当前 State 保存到存储后端,使得工作流可以在中断后从断点恢复。
📝 Checkpointer 类似于游戏的存档系统:每执行一步自动存档,下次运行时可以从任意历史存档点继续,而不必从头开始。
6.1 内存 Checkpointer(开发调试用)
from langgraph.checkpoint.memory import MemorySaver
# MemorySaver 将状态保存在内存中,进程重启后丢失
# 适合开发阶段快速验证逻辑
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# thread_id 是会话标识符,相同 thread_id 的调用共享同一个状态历史
config = {"configurable": {"thread_id": "user_session_001"}}
# 第一轮对话
result1 = graph.invoke(
{"messages": [HumanMessage(content="北京天气怎么样?")]},
config=config,
)
# 第二轮对话(同一 thread_id,LangGraph 自动加载上一轮的状态)
result2 = graph.invoke(
{"messages": [HumanMessage(content="那上海呢?")]},
config=config,
)
# 查看完整的对话历史
state = graph.get_state(config)
for msg in state.values["messages"]:
print(f"{type(msg).__name__}: {msg.content[:50]}")
6.2 数据库 Checkpointer(生产环境)
pip install langgraph-checkpoint-postgres # PostgreSQL
pip install langgraph-checkpoint-sqlite # SQLite(轻量级)
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
# SQLite:数据持久化到文件,进程重启后状态依然存在
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
graph = graph_builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "session_123"}}
# 第一次运行
graph.invoke({"messages": [HumanMessage(content="你好")]}, config=config)
# 即使进程重启,下次用相同 thread_id 运行时,LangGraph 会从数据库恢复之前的状态
graph.invoke({"messages": [HumanMessage(content="还记得我吗?")]}, config=config)
6.3 查看与回滚历史状态
Checkpointer 保存的不只是最新状态,而是完整的历史快照,支持时间旅行式调试:
# 列出所有历史状态(从最新到最早)
for checkpoint in graph.get_state_history(config):
print(f"步骤:{checkpoint.metadata.get('step', 0)}")
print(f"消息数:{len(checkpoint.values['messages'])}")
print(f"checkpoint_id:{checkpoint.config['configurable']['checkpoint_id']}")
print("---")
# 回滚到某个历史状态,从那个点重新执行
old_config = {
"configurable": {
"thread_id": "session_123",
"checkpoint_id": "某个历史 checkpoint_id",
}
}
graph.invoke({"messages": [HumanMessage(content="重新开始")]}, config=old_config)
七、Human-in-the-loop:中断与恢复
Human-in-the-loop(人机协作)是 LangGraph 相对于 AgentExecutor 最独特的能力:工作流可以在指定节点暂停,等待人工输入或审批后再继续。
📝 Human-in-the-loop 直译为"人在回路中",指在自动化流程的关键决策点引入人工判断,兼顾效率与安全。
7.1 interrupt_before:执行节点前暂停
# 在编译时指定需要暂停的节点
graph = graph_builder.compile(
checkpointer=memory,
interrupt_before=["tools"], # 在执行 tools 节点之前暂停
)
config = {"configurable": {"thread_id": "review_session"}}
# 第一次调用:运行到 tools 节点前自动暂停
result = graph.invoke(
{"messages": [HumanMessage(content="帮我删除所有临时文件")]},
config=config,
)
# 查看 Agent 计划调用什么工具
state = graph.get_state(config)
last_msg = state.values["messages"][-1]
print("Agent 计划执行:")
for tc in last_msg.tool_calls:
print(f" 工具:{tc['name']},参数:{tc['args']}")
# 人工审核:确认是否允许执行
user_input = input("是否允许执行以上操作?(y/n): ")
if user_input.lower() == "y":
# 传入 None 表示不修改状态,从暂停点继续执行
graph.invoke(None, config=config)
else:
# 注入一条工具结果,告知 Agent 操作被拒绝
from langchain_core.messages import ToolMessage
tool_call_id = last_msg.tool_calls[0]["id"]
graph.invoke(
{"messages": [ToolMessage(
content="操作已被用户拒绝,请告知用户并停止执行。",
tool_call_id=tool_call_id,
)]},
config=config,
)
7.2 interrupt_after:执行节点后暂停
# 工具执行完后暂停,让人工审核工具结果再决定是否继续
graph = graph_builder.compile(
checkpointer=memory,
interrupt_after=["tools"], # 工具执行完毕后暂停
)
7.3 完整的审批工作流示例
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool, ToolException
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph.prebuilt import ToolNode
# --- 定义有风险的工具 ---
@tool
def delete_files(pattern: str) -> str:
"""删除匹配指定模式的文件。这是一个危险操作,需要人工确认。
Args:
pattern: 文件匹配模式,如 "*.tmp"、"/var/log/*.log"
"""
# 实际场景中执行删除操作
return f"已删除匹配 '{pattern}' 的文件(模拟)"
@tool
def send_notification(email: str, message: str) -> str:
"""向指定邮箱发送通知邮件。
Args:
email: 收件人邮箱
message: 邮件内容
"""
return f"邮件已发送至 {email}(模拟)"
tools = [delete_files, send_notification]
tool_node = ToolNode(tools)
# --- 图定义 ---
class WorkflowState(TypedDict):
messages: Annotated[list, add_messages]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)
def call_llm(state: WorkflowState) -> dict:
return {"messages": [llm.invoke(state["messages"])]}
def should_continue(state: WorkflowState) -> Literal["tools", "end"]:
if state["messages"][-1].tool_calls:
return "tools"
return "end"
builder = StateGraph(WorkflowState)
builder.add_node("llm", call_llm)
builder.add_node("tools", tool_node)
builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", should_continue, {"tools": "tools", "end": END})
builder.add_edge("tools", "llm")
memory = MemorySaver()
# interrupt_before=["tools"]:每次执行工具前都需要人工确认
approval_graph = builder.compile(checkpointer=memory, interrupt_before=["tools"])
# --- 审批流程 ---
def run_with_approval(user_input: str, session_id: str):
config = {"configurable": {"thread_id": session_id}}
# 启动工作流
approval_graph.invoke(
{"messages": [HumanMessage(content=user_input)]},
config=config,
)
# 检查是否在等待人工审批
while True:
state = approval_graph.get_state(config)
# next 为空表示工作流已完成
if not state.next:
final = state.values["messages"][-1].content
print(f"\n最终答案:{final}")
break
# 显示待审批的工具调用
pending_calls = state.values["messages"][-1].tool_calls
print("\n=== 等待人工审批 ===")
for tc in pending_calls:
print(f"工具:{tc['name']}")
print(f"参数:{tc['args']}")
decision = input("批准执行?(y=批准 / n=拒绝): ").strip().lower()
if decision == "y":
# 批准:继续执行
approval_graph.invoke(None, config=config)
else:
# 拒绝:注入拒绝消息,让 LLM 做出响应
rejected_msgs = [
ToolMessage(
content="操作已被管理员拒绝,请告知用户并停止执行。",
tool_call_id=tc["id"],
)
for tc in pending_calls
]
approval_graph.invoke({"messages": rejected_msgs}, config=config)
# 测试
run_with_approval(
"帮我清理 /tmp 目录下的临时文件,并发邮件通知 admin@example.com",
session_id="approval_demo"
)
八、实战:带记忆的多轮 Agent
将 Checkpointer 与完整的工具调用 Agent 结合,构建一个跨会话保持记忆的助手:
from dotenv import load_dotenv
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
load_dotenv()
# --- 工具 ---
@tool
def get_weather(city: str) -> str:
"""查询城市天气。Args: city: 城市中文名"""
data = {"北京": "晴 22°C", "上海": "多云 26°C", "广州": "阵雨 31°C"}
return data.get(city, f"未找到 {city}")
@tool
def calculate(expression: str) -> str:
"""计算数学表达式。Args: expression: Python 数学表达式"""
try:
return str(eval(expression, {"__builtins__": {}}))
except Exception as e:
return f"计算失败:{e}"
tools = [get_weather, calculate]
# --- 图 ---
class State(TypedDict):
messages: Annotated[list, add_messages]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)
tool_node = ToolNode(tools)
SYSTEM_PROMPT = SystemMessage(content=(
"你是一个智能助手,能查询天气和进行数学计算。"
"记住对话历史,前后保持上下文连贯。"
))
def call_llm(state: State) -> dict:
messages = [SYSTEM_PROMPT] + state["messages"]
return {"messages": [llm.invoke(messages)]}
def should_continue(state: State) -> str:
if state["messages"][-1].tool_calls:
return "tools"
return "end"
builder = StateGraph(State)
builder.add_node("llm", call_llm)
builder.add_node("tools", tool_node)
builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", should_continue, {"tools": "tools", "end": END})
builder.add_edge("tools", "llm")
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# --- 多轮对话 ---
def chat(session_id: str, user_input: str) -> str:
config = {"configurable": {"thread_id": session_id}}
result = graph.invoke(
{"messages": [HumanMessage(content=user_input)]},
config=config,
)
return result["messages"][-1].content
# 同一 session_id,状态跨轮次保持
sid = "demo_session"
print(chat(sid, "北京和上海今天天气怎么样?"))
print(chat(sid, "两个城市的温度差是多少?")) # Agent 记得上一轮的温度数据
print(chat(sid, "北京那边适合穿什么衣服?")) # 继续追问北京
九、LangGraph vs AgentExecutor 对比
| 维度 | AgentExecutor | LangGraph |
|---|---|---|
| 执行路径 | 线性循环,代码固定 | 有向图,动态路由 |
| 分支逻辑 | 不支持 | 条件边,任意分支 |
| 状态持久化 | 不支持 | Checkpointer,支持内存/SQLite/PostgreSQL |
| Human-in-the-loop | 不支持 | interrupt_before/after,原生支持 |
| 调试可视化 | verbose=True 打印日志 | draw_ascii/draw_mermaid 图形化 |
| 适用场景 | 简单线性工具调用 | 复杂工作流、生产级 Agent |
| 学习曲线 | 低 | 中(需理解图模型) |
🤔 什么时候该用 LangGraph 而不是 AgentExecutor? 判断标准很简单:如果工作流需要"等待外部输入"或"根据结果走不同路径",就用 LangGraph;如果只是"调几个工具然后给答案",AgentExecutor 已经足够。
十、常见坑与最佳实践
坑一:State 字段更新方式理解错误
# ❌ 以为直接赋值就能更新 State
class State(TypedDict):
count: int
def my_node(state: State) -> dict:
state["count"] += 1 # 错误!不能直接修改传入的 state
return {} # 返回空 dict,count 不会变化
# ✅ 在返回值中声明需要更新的字段
def my_node(state: State) -> dict:
return {"count": state["count"] + 1}
坑二:messages 字段忘记使用 add_messages
# ❌ 没有使用 add_messages,每次更新都会覆盖整个消息列表
class State(TypedDict):
messages: list # 没有 Annotated + add_messages
# ✅ 正确定义,确保消息历史是追加而非覆盖
class State(TypedDict):
messages: Annotated[list, add_messages]
坑三:条件路由函数的返回值与边的 mapping 不匹配
# ❌ 路由函数返回 "call_tools",但 mapping 里写的是 "tools"
def should_continue(state):
return "call_tools" # 实际返回值
graph.add_conditional_edges(
"llm",
should_continue,
{"tools": "tool_node", "end": END}, # ❌ mapping 中没有 "call_tools" 这个 key
)
# ✅ 路由函数返回值必须在 mapping 的 key 中
def should_continue(state):
return "tools" # 与 mapping key 完全一致
graph.add_conditional_edges(
"llm",
should_continue,
{"tools": "tool_node", "end": END},
)
坑四:使用 Checkpointer 但忘记传 config
# ❌ 没有传 config,Checkpointer 无法关联 thread_id,状态无法跨轮次保持
graph.invoke({"messages": [HumanMessage(content="你好")]})
# ✅ 每次调用都传入相同的 config
config = {"configurable": {"thread_id": "my_session"}}
graph.invoke({"messages": [HumanMessage(content="你好")]}, config=config)
graph.invoke({"messages": [HumanMessage(content="还记得我吗?")]}, config=config)
坑五:interrupt_before 的节点名称拼写错误
# ❌ 节点名是 "tool_node",interrupt_before 写成了 "tools"
builder.add_node("tool_node", tool_node_fn)
graph = builder.compile(
checkpointer=memory,
interrupt_before=["tools"], # 节点不存在,不会暂停,也不报错
)
# ✅ 名称必须与 add_node 时完全一致
graph = builder.compile(
checkpointer=memory,
interrupt_before=["tool_node"],
)
十一、总结
| 概念 | 核心要点 |
|---|---|
| State | 工作流的共享数据,用 TypedDict 定义;Annotated[list, add_messages] 表示追加语义 |
| Node | 接收 State,返回需要更新的字段;不能直接修改传入的 state 对象 |
| 条件边 | add_conditional_edges 根据路由函数的返回值决定下一个节点,返回值必须在 mapping 中 |
| Checkpointer | 每步自动保存状态;thread_id 是会话标识符;开发用 MemorySaver,生产用 SqliteSaver 或 PostgreSQL |
| Human-in-the-loop | interrupt_before/after 在指定节点暂停;graph.invoke(None, config) 从断点继续 |
| ToolNode | LangGraph 内置的工具执行节点,自动处理工具调用和 ToolMessage 的构造 |
🎯 LangGraph 的核心价值不在于"让 Agent 更智能",而在于"让 Agent 的行为可控、可追溯、可恢复"。Checkpointer + Human-in-the-loop 是将 Agent 从实验室带入生产环境的关键基础设施。
参考资料
下期预告
工作流搭好了,但 LLM 应用上线后怎么知道它运行得好不好?调用哪一步出了问题?哪条 Prompt 效果最好?
第十篇《LangSmith:调试和监控你的 LLM 应用》 将介绍 LangSmith 的链路追踪(Tracing)、评估数据集管理和实验对比功能——从"靠日志猜问题"升级为"用数据找问题"。
更多推荐



所有评论(0)