LangChain 系列 · 第九篇:LangGraph——当 Agent 需要"记住状态"

🎯 适合人群:已掌握 Agent 与 AgentExecutor,想构建有状态、可分支、支持人工干预的复杂工作流的工程师
⏱️ 阅读时间:约 35 分钟
💬 本文从 AgentExecutor 的局限出发,介绍 LangGraph 的 StateGraph 模型,涵盖节点、边、条件路由、状态持久化与 Human-in-the-loop


一、AgentExecutor 的天花板

第八篇结尾列出了 AgentExecutor 无法处理的几类场景。用一个具体例子来感受这个问题:

设计一个内容审核 Agent:用户提交文章后,Agent 先自动检查合规性。如果自动检查通过,直接发布;如果存疑,暂停并通知人工审核员;人工决定通过或拒绝后,Agent 继续执行后续操作。

用 AgentExecutor 实现这个流程,会遇到三个无法绕过的障碍:

  1. 无法暂停:AgentExecutor 的循环一旦启动就无法中途挂起,等待外部输入
  2. 无法分支:自动通过和人工审核是两条不同路径,AgentExecutor 只能线性执行
  3. 无法持久化:如果人工审核需要 10 分钟,进程中断后状态全部丢失

LangGraph 正是为了解决这些问题而设计的。它将工作流建模为有向图:节点是处理步骤,边是流转路径,状态在节点之间流动并可持久化。


二、核心概念

在写代码之前,先理解 LangGraph 的四个核心概念。

2.1 State(状态)

State 是贯穿整个工作流的共享数据结构,每个节点都可以读取和修改它。LangGraph 使用 Python 的 TypedDict 定义 State:

from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 对话消息列表,add_messages 表示追加而非覆盖
    user_name: str                           # 普通字段,直接覆盖
    step_count: int                          # 普通字段,直接覆盖

Annotated[list, add_messages] 中的 add_messages 是一个归约函数(Reducer)——它定义了当多个节点同时更新同一字段时如何合并结果。add_messages 的行为是追加新消息而不是覆盖整个列表,这对消息历史来说是正确的语义。

📝 归约函数(Reducer) 是 LangGraph 的状态合并机制。没有指定归约函数的字段(如 user_name)采用默认行为:后写入的值覆盖先写入的值。

2.2 Node(节点)

Node 是工作流中的一个处理步骤,本质上是一个 Python 函数:接收当前 State,返回对 State 的更新:

def my_node(state: AgentState) -> dict:
    # 读取状态
    messages = state["messages"]

    # 执行逻辑
    result = do_something(messages)

    # 返回对状态的更新(只需返回要修改的字段)
    return {"messages": [result], "step_count": state["step_count"] + 1}

节点只返回需要变更的字段,不需要返回完整的 State 对象。

2.3 Edge(边)

Edge 定义节点之间的流转关系,分为两种:

  • 普通边:固定跳转,节点 A 执行完始终跳到节点 B
  • 条件边:根据当前 State 的内容动态决定下一个节点
  Normal Edge:        Conditional Edge:
  A ──► B             A ──► router_fn ──► B (if condition_1)
                                     └──► C (if condition_2)
                                     └──► END (if done)

2.4 特殊节点:START 和 END

LangGraph 内置两个特殊节点:

  • START:图的入口,第一个节点从这里接收输入
  • END:图的出口,流转到 END 表示工作流结束

三、安装与最小示例

pip install langgraph langchain-openai

用 LangGraph 重现第八篇的天气查询 Agent,感受两者的差异:

from dotenv import load_dotenv
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode

load_dotenv()

# --- State 定义 ---

class State(TypedDict):
    messages: Annotated[list, add_messages]

# --- 工具定义 ---

@tool
def get_weather(city: str) -> str:
    """查询城市当前天气。

    Args:
        city: 城市中文名
    """
    data = {"北京": "晴,22°C", "上海": "多云,26°C"}
    return data.get(city, f"未找到 {city} 的数据")

tools = [get_weather]
tool_node = ToolNode(tools)  # LangGraph 内置的工具执行节点

# --- 节点定义 ---

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)

def call_llm(state: State) -> dict:
    """调用 LLM,可能输出工具调用或最终答案"""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: State) -> str:
    """条件路由:判断是继续调用工具还是结束"""
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "call_tools"   # 有工具调用,跳转到工具节点
    return "end"              # 没有工具调用,结束

# --- 构建图 ---

graph_builder = StateGraph(State)

# 添加节点
graph_builder.add_node("llm", call_llm)
graph_builder.add_node("tools", tool_node)

# 添加边
graph_builder.add_edge(START, "llm")                    # 入口 → LLM
graph_builder.add_conditional_edges(                     # LLM → 条件分支
    "llm",
    should_continue,
    {"call_tools": "tools", "end": END},
)
graph_builder.add_edge("tools", "llm")                  # 工具结果 → 回到 LLM

graph = graph_builder.compile()

# --- 运行 ---

result = graph.invoke({"messages": [HumanMessage(content="北京今天天气怎么样?")]})
print(result["messages"][-1].content)
# "北京今天天气晴,22°C。"

这段代码与第八篇的 AgentExecutor 实现的是完全相同的逻辑,但架构完全不同:执行路径由图结构显式描述,每一步流转都一目了然。


四、可视化图结构

LangGraph 支持将图结构导出为 ASCII 或 Mermaid 格式,方便调试:

# ASCII 可视化
print(graph.get_graph().draw_ascii())

# Mermaid 格式(可粘贴到 mermaid.live 渲染)
print(graph.get_graph().draw_mermaid())

输出示意:

        +-----------+
        | __start__ |
        +-----------+
              |
              v
          +-------+
          |  llm  |
          +-------+
         /         \
        v           v
   +-------+      +-----+
   | tools |      | END |
   +-------+      +-----+
       |
       v
   +-------+
   |  llm  |  (loop back)
   +-------+

五、条件路由:实现分支逻辑

条件路由是 LangGraph 相对于 AgentExecutor 最显著的优势之一。通过 add_conditional_edges,可以根据 State 的内容将流程导向不同的节点。

下面构建一个内容分类路由器:根据用户问题的类型,分发给不同的专家节点处理。

from typing import Literal
from langchain_core.prompts import ChatPromptTemplate

class RouterState(TypedDict):
    messages: Annotated[list, add_messages]
    question_type: str   # 问题分类结果

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 分类节点:判断问题属于哪个领域
def classify_question(state: RouterState) -> dict:
    user_message = state["messages"][-1].content
    classify_prompt = ChatPromptTemplate.from_messages([
        ("system",
         "将用户问题分类为以下类别之一,只输出类别名称,不要其他内容:\n"
         "weather(天气相关)、math(数学计算)、general(其他)"),
        ("human", "{question}"),
    ])
    chain = classify_prompt | llm
    result = chain.invoke({"question": user_message})
    return {"question_type": result.content.strip().lower()}

# 三个专家节点
def handle_weather(state: RouterState) -> dict:
    response = llm.invoke([
        *state["messages"],
        HumanMessage(content="(系统提示:你是天气专家,详细回答天气相关问题)"),
    ])
    return {"messages": [response]}

def handle_math(state: RouterState) -> dict:
    response = llm.invoke([
        *state["messages"],
        HumanMessage(content="(系统提示:你是数学专家,一步步展示计算过程)"),
    ])
    return {"messages": [response]}

def handle_general(state: RouterState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# 路由函数:根据分类结果决定下一个节点
def route_by_type(state: RouterState) -> Literal["weather", "math", "general"]:
    return state["question_type"]

# 构建图
builder = StateGraph(RouterState)
builder.add_node("classify", classify_question)
builder.add_node("weather", handle_weather)
builder.add_node("math", handle_math)
builder.add_node("general", handle_general)

builder.add_edge(START, "classify")
builder.add_conditional_edges(
    "classify",
    route_by_type,
    {
        "weather": "weather",
        "math": "math",
        "general": "general",
    },
)
builder.add_edge("weather", END)
builder.add_edge("math", END)
builder.add_edge("general", END)

router_graph = builder.compile()

# 测试
result = router_graph.invoke({
    "messages": [HumanMessage(content="明天北京会下雨吗?")],
    "question_type": "",
})
print(result["messages"][-1].content)

六、Checkpointer:持久化状态

Checkpointer 是 LangGraph 的状态持久化机制。每次节点执行完毕,Checkpointer 都会将当前 State 保存到存储后端,使得工作流可以在中断后从断点恢复。

📝 Checkpointer 类似于游戏的存档系统:每执行一步自动存档,下次运行时可以从任意历史存档点继续,而不必从头开始。

6.1 内存 Checkpointer(开发调试用)

from langgraph.checkpoint.memory import MemorySaver

# MemorySaver 将状态保存在内存中,进程重启后丢失
# 适合开发阶段快速验证逻辑
memory = MemorySaver()

graph = graph_builder.compile(checkpointer=memory)

# thread_id 是会话标识符,相同 thread_id 的调用共享同一个状态历史
config = {"configurable": {"thread_id": "user_session_001"}}

# 第一轮对话
result1 = graph.invoke(
    {"messages": [HumanMessage(content="北京天气怎么样?")]},
    config=config,
)

# 第二轮对话(同一 thread_id,LangGraph 自动加载上一轮的状态)
result2 = graph.invoke(
    {"messages": [HumanMessage(content="那上海呢?")]},
    config=config,
)

# 查看完整的对话历史
state = graph.get_state(config)
for msg in state.values["messages"]:
    print(f"{type(msg).__name__}: {msg.content[:50]}")

6.2 数据库 Checkpointer(生产环境)

pip install langgraph-checkpoint-postgres  # PostgreSQL
pip install langgraph-checkpoint-sqlite    # SQLite(轻量级)
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3

# SQLite:数据持久化到文件,进程重启后状态依然存在
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)

graph = graph_builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "session_123"}}

# 第一次运行
graph.invoke({"messages": [HumanMessage(content="你好")]}, config=config)

# 即使进程重启,下次用相同 thread_id 运行时,LangGraph 会从数据库恢复之前的状态
graph.invoke({"messages": [HumanMessage(content="还记得我吗?")]}, config=config)

6.3 查看与回滚历史状态

Checkpointer 保存的不只是最新状态,而是完整的历史快照,支持时间旅行式调试:

# 列出所有历史状态(从最新到最早)
for checkpoint in graph.get_state_history(config):
    print(f"步骤:{checkpoint.metadata.get('step', 0)}")
    print(f"消息数:{len(checkpoint.values['messages'])}")
    print(f"checkpoint_id:{checkpoint.config['configurable']['checkpoint_id']}")
    print("---")

# 回滚到某个历史状态,从那个点重新执行
old_config = {
    "configurable": {
        "thread_id": "session_123",
        "checkpoint_id": "某个历史 checkpoint_id",
    }
}
graph.invoke({"messages": [HumanMessage(content="重新开始")]}, config=old_config)

七、Human-in-the-loop:中断与恢复

Human-in-the-loop(人机协作)是 LangGraph 相对于 AgentExecutor 最独特的能力:工作流可以在指定节点暂停,等待人工输入或审批后再继续。

📝 Human-in-the-loop 直译为"人在回路中",指在自动化流程的关键决策点引入人工判断,兼顾效率与安全。

7.1 interrupt_before:执行节点前暂停

# 在编译时指定需要暂停的节点
graph = graph_builder.compile(
    checkpointer=memory,
    interrupt_before=["tools"],  # 在执行 tools 节点之前暂停
)

config = {"configurable": {"thread_id": "review_session"}}

# 第一次调用:运行到 tools 节点前自动暂停
result = graph.invoke(
    {"messages": [HumanMessage(content="帮我删除所有临时文件")]},
    config=config,
)

# 查看 Agent 计划调用什么工具
state = graph.get_state(config)
last_msg = state.values["messages"][-1]
print("Agent 计划执行:")
for tc in last_msg.tool_calls:
    print(f"  工具:{tc['name']},参数:{tc['args']}")

# 人工审核:确认是否允许执行
user_input = input("是否允许执行以上操作?(y/n): ")

if user_input.lower() == "y":
    # 传入 None 表示不修改状态,从暂停点继续执行
    graph.invoke(None, config=config)
else:
    # 注入一条工具结果,告知 Agent 操作被拒绝
    from langchain_core.messages import ToolMessage
    tool_call_id = last_msg.tool_calls[0]["id"]
    graph.invoke(
        {"messages": [ToolMessage(
            content="操作已被用户拒绝,请告知用户并停止执行。",
            tool_call_id=tool_call_id,
        )]},
        config=config,
    )

7.2 interrupt_after:执行节点后暂停

# 工具执行完后暂停,让人工审核工具结果再决定是否继续
graph = graph_builder.compile(
    checkpointer=memory,
    interrupt_after=["tools"],  # 工具执行完毕后暂停
)

7.3 完整的审批工作流示例

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool, ToolException
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph.prebuilt import ToolNode

# --- 定义有风险的工具 ---

@tool
def delete_files(pattern: str) -> str:
    """删除匹配指定模式的文件。这是一个危险操作,需要人工确认。

    Args:
        pattern: 文件匹配模式,如 "*.tmp"、"/var/log/*.log"
    """
    # 实际场景中执行删除操作
    return f"已删除匹配 '{pattern}' 的文件(模拟)"

@tool
def send_notification(email: str, message: str) -> str:
    """向指定邮箱发送通知邮件。

    Args:
        email: 收件人邮箱
        message: 邮件内容
    """
    return f"邮件已发送至 {email}(模拟)"

tools = [delete_files, send_notification]
tool_node = ToolNode(tools)

# --- 图定义 ---

class WorkflowState(TypedDict):
    messages: Annotated[list, add_messages]

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)

def call_llm(state: WorkflowState) -> dict:
    return {"messages": [llm.invoke(state["messages"])]}

def should_continue(state: WorkflowState) -> Literal["tools", "end"]:
    if state["messages"][-1].tool_calls:
        return "tools"
    return "end"

builder = StateGraph(WorkflowState)
builder.add_node("llm", call_llm)
builder.add_node("tools", tool_node)
builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", should_continue, {"tools": "tools", "end": END})
builder.add_edge("tools", "llm")

memory = MemorySaver()
# interrupt_before=["tools"]:每次执行工具前都需要人工确认
approval_graph = builder.compile(checkpointer=memory, interrupt_before=["tools"])

# --- 审批流程 ---

def run_with_approval(user_input: str, session_id: str):
    config = {"configurable": {"thread_id": session_id}}

    # 启动工作流
    approval_graph.invoke(
        {"messages": [HumanMessage(content=user_input)]},
        config=config,
    )

    # 检查是否在等待人工审批
    while True:
        state = approval_graph.get_state(config)

        # next 为空表示工作流已完成
        if not state.next:
            final = state.values["messages"][-1].content
            print(f"\n最终答案:{final}")
            break

        # 显示待审批的工具调用
        pending_calls = state.values["messages"][-1].tool_calls
        print("\n=== 等待人工审批 ===")
        for tc in pending_calls:
            print(f"工具:{tc['name']}")
            print(f"参数:{tc['args']}")

        decision = input("批准执行?(y=批准 / n=拒绝): ").strip().lower()

        if decision == "y":
            # 批准:继续执行
            approval_graph.invoke(None, config=config)
        else:
            # 拒绝:注入拒绝消息,让 LLM 做出响应
            rejected_msgs = [
                ToolMessage(
                    content="操作已被管理员拒绝,请告知用户并停止执行。",
                    tool_call_id=tc["id"],
                )
                for tc in pending_calls
            ]
            approval_graph.invoke({"messages": rejected_msgs}, config=config)

# 测试
run_with_approval(
    "帮我清理 /tmp 目录下的临时文件,并发邮件通知 admin@example.com",
    session_id="approval_demo"
)

八、实战:带记忆的多轮 Agent

将 Checkpointer 与完整的工具调用 Agent 结合,构建一个跨会话保持记忆的助手:

from dotenv import load_dotenv
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver

load_dotenv()

# --- 工具 ---

@tool
def get_weather(city: str) -> str:
    """查询城市天气。Args: city: 城市中文名"""
    data = {"北京": "晴 22°C", "上海": "多云 26°C", "广州": "阵雨 31°C"}
    return data.get(city, f"未找到 {city}")

@tool
def calculate(expression: str) -> str:
    """计算数学表达式。Args: expression: Python 数学表达式"""
    try:
        return str(eval(expression, {"__builtins__": {}}))
    except Exception as e:
        return f"计算失败:{e}"

tools = [get_weather, calculate]

# --- 图 ---

class State(TypedDict):
    messages: Annotated[list, add_messages]

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)
tool_node = ToolNode(tools)

SYSTEM_PROMPT = SystemMessage(content=(
    "你是一个智能助手,能查询天气和进行数学计算。"
    "记住对话历史,前后保持上下文连贯。"
))

def call_llm(state: State) -> dict:
    messages = [SYSTEM_PROMPT] + state["messages"]
    return {"messages": [llm.invoke(messages)]}

def should_continue(state: State) -> str:
    if state["messages"][-1].tool_calls:
        return "tools"
    return "end"

builder = StateGraph(State)
builder.add_node("llm", call_llm)
builder.add_node("tools", tool_node)
builder.add_edge(START, "llm")
builder.add_conditional_edges("llm", should_continue, {"tools": "tools", "end": END})
builder.add_edge("tools", "llm")

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# --- 多轮对话 ---

def chat(session_id: str, user_input: str) -> str:
    config = {"configurable": {"thread_id": session_id}}
    result = graph.invoke(
        {"messages": [HumanMessage(content=user_input)]},
        config=config,
    )
    return result["messages"][-1].content

# 同一 session_id,状态跨轮次保持
sid = "demo_session"
print(chat(sid, "北京和上海今天天气怎么样?"))
print(chat(sid, "两个城市的温度差是多少?"))   # Agent 记得上一轮的温度数据
print(chat(sid, "北京那边适合穿什么衣服?"))    # 继续追问北京

九、LangGraph vs AgentExecutor 对比

维度 AgentExecutor LangGraph
执行路径 线性循环,代码固定 有向图,动态路由
分支逻辑 不支持 条件边,任意分支
状态持久化 不支持 Checkpointer,支持内存/SQLite/PostgreSQL
Human-in-the-loop 不支持 interrupt_before/after,原生支持
调试可视化 verbose=True 打印日志 draw_ascii/draw_mermaid 图形化
适用场景 简单线性工具调用 复杂工作流、生产级 Agent
学习曲线 中(需理解图模型)

🤔 什么时候该用 LangGraph 而不是 AgentExecutor? 判断标准很简单:如果工作流需要"等待外部输入"或"根据结果走不同路径",就用 LangGraph;如果只是"调几个工具然后给答案",AgentExecutor 已经足够。


十、常见坑与最佳实践

坑一:State 字段更新方式理解错误

# ❌ 以为直接赋值就能更新 State
class State(TypedDict):
    count: int

def my_node(state: State) -> dict:
    state["count"] += 1  # 错误!不能直接修改传入的 state
    return {}            # 返回空 dict,count 不会变化

# ✅ 在返回值中声明需要更新的字段
def my_node(state: State) -> dict:
    return {"count": state["count"] + 1}

坑二:messages 字段忘记使用 add_messages

# ❌ 没有使用 add_messages,每次更新都会覆盖整个消息列表
class State(TypedDict):
    messages: list   # 没有 Annotated + add_messages

# ✅ 正确定义,确保消息历史是追加而非覆盖
class State(TypedDict):
    messages: Annotated[list, add_messages]

坑三:条件路由函数的返回值与边的 mapping 不匹配

# ❌ 路由函数返回 "call_tools",但 mapping 里写的是 "tools"
def should_continue(state):
    return "call_tools"  # 实际返回值

graph.add_conditional_edges(
    "llm",
    should_continue,
    {"tools": "tool_node", "end": END},  # ❌ mapping 中没有 "call_tools" 这个 key
)

# ✅ 路由函数返回值必须在 mapping 的 key 中
def should_continue(state):
    return "tools"  # 与 mapping key 完全一致

graph.add_conditional_edges(
    "llm",
    should_continue,
    {"tools": "tool_node", "end": END},
)

坑四:使用 Checkpointer 但忘记传 config

# ❌ 没有传 config,Checkpointer 无法关联 thread_id,状态无法跨轮次保持
graph.invoke({"messages": [HumanMessage(content="你好")]})

# ✅ 每次调用都传入相同的 config
config = {"configurable": {"thread_id": "my_session"}}
graph.invoke({"messages": [HumanMessage(content="你好")]}, config=config)
graph.invoke({"messages": [HumanMessage(content="还记得我吗?")]}, config=config)

坑五:interrupt_before 的节点名称拼写错误

# ❌ 节点名是 "tool_node",interrupt_before 写成了 "tools"
builder.add_node("tool_node", tool_node_fn)
graph = builder.compile(
    checkpointer=memory,
    interrupt_before=["tools"],   # 节点不存在,不会暂停,也不报错
)

# ✅ 名称必须与 add_node 时完全一致
graph = builder.compile(
    checkpointer=memory,
    interrupt_before=["tool_node"],
)

十一、总结

概念 核心要点
State 工作流的共享数据,用 TypedDict 定义;Annotated[list, add_messages] 表示追加语义
Node 接收 State,返回需要更新的字段;不能直接修改传入的 state 对象
条件边 add_conditional_edges 根据路由函数的返回值决定下一个节点,返回值必须在 mapping 中
Checkpointer 每步自动保存状态;thread_id 是会话标识符;开发用 MemorySaver,生产用 SqliteSaver 或 PostgreSQL
Human-in-the-loop interrupt_before/after 在指定节点暂停;graph.invoke(None, config) 从断点继续
ToolNode LangGraph 内置的工具执行节点,自动处理工具调用和 ToolMessage 的构造

🎯 LangGraph 的核心价值不在于"让 Agent 更智能",而在于"让 Agent 的行为可控、可追溯、可恢复"。Checkpointer + Human-in-the-loop 是将 Agent 从实验室带入生产环境的关键基础设施。


参考资料


下期预告

工作流搭好了,但 LLM 应用上线后怎么知道它运行得好不好?调用哪一步出了问题?哪条 Prompt 效果最好?

第十篇《LangSmith:调试和监控你的 LLM 应用》 将介绍 LangSmith 的链路追踪(Tracing)、评估数据集管理和实验对比功能——从"靠日志猜问题"升级为"用数据找问题"。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐