玄同 765

大语言模型 (LLM) 开发工程师 | 中国传媒大学 · 数字媒体技术(智能交互与游戏设计)

CSDN · 个人主页 | GitHub · Follow


关于作者

  • 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
  • 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案

「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!


LangGraph 状态图编排深度解析

摘要

LangGraph 是 LangChain 生态中用于构建有状态、多代理应用的框架。它基于图结构定义应用逻辑,支持循环、分支、持久化等高级特性。本文将深入解析 LangGraph 的核心概念、状态管理、节点与边、条件路由、人机协作等特性,帮助开发者构建复杂的生产级 AI 应用。


一、LangGraph 概述

1.1 为什么需要 LangGraph

传统的 Chain 和 Agent 存在以下局限:

局限性 说明 LangGraph 解决方案
无状态 Chain 不保存状态 内置状态管理
线性流程 难以实现复杂控制流 图结构支持循环和分支
难以调试 执行过程不透明 可视化和检查支持
无法暂停 必须一次性执行完 支持断点和恢复

1.2 LangGraph 核心概念

核心概念

State
状态

Node
节点

Edge

Graph

TypedDict 定义
状态在节点间传递

函数/Runnable
处理状态并返回更新

定义执行路径
支持条件路由

编译后的可执行图
可持久化、可恢复

1.3 Hello LangGraph

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

# 1. 定义状态
class State(TypedDict):
    count: int
    message: str

# 2. 定义节点函数
def increment(state: State) -> dict:
    return {"count": state["count"] + 1}

def greet(state: State) -> dict:
    return {"message": f"计数: {state['count']}"}

# 3. 构建图
graph = StateGraph(State)
graph.add_node("increment", increment)
graph.add_node("greet", greet)

# 4. 添加边
graph.add_edge(START, "increment")
graph.add_edge("increment", "greet")
graph.add_edge("greet", END)

# 5. 编译并运行
app = graph.compile()
result = app.invoke({"count": 0, "message": ""})
print(result)  # {'count': 1, 'message': '计数: 1'}

二、状态管理

2.1 状态定义

from typing import TypedDict, Annotated
from langgraph.graph import add_messages
from langchain_core.messages import BaseMessage

# 基础状态定义
class BasicState(TypedDict):
    count: int
    name: str
    data: list[str]

# 带消息的状态(对话场景)
class ChatState(TypedDict):
    # add_messages 会自动合并消息列表
    messages: Annotated[list[BaseMessage], add_messages]
    user_id: str

# 嵌套状态
class NestedState(TypedDict):
    user: dict[str, str]
    session: dict[str, int]
    context: dict[str, list]

2.2 状态更新

from typing import TypedDict
from langgraph.graph import StateGraph

class State(TypedDict):
    count: int
    items: list[str]

def node_a(state: State) -> dict:
    # 返回要更新的字段(部分更新)
    return {"count": state["count"] + 1}

def node_b(state: State) -> dict:
    # 可以返回多个字段
    return {
        "count": state["count"] * 2,
        "items": state["items"] + ["new_item"],
    }

# 状态更新规则:
# 1. 返回的字典会与原状态合并
# 2. 未返回的字段保持不变
# 3. 列表类型默认替换,可用 Annotated 自定义合并

2.3 Reducer 函数

from typing import TypedDict, Annotated
from operator import add

# 自定义 Reducer
def merge_dicts(left: dict, right: dict) -> dict:
    """合并字典。"""
    return {**left, **right}

class State(TypedDict):
    # 使用 add 合并列表(而非替换)
    items: Annotated[list[str], add]
    # 自定义合并函数
    metadata: Annotated[dict, merge_dicts]
    # 普通字段(替换)
    count: int

# 示例
def add_item(state: State) -> dict:
    return {"items": ["new_item"]}  # 会追加到现有列表

def update_meta(state: State) -> dict:
    return {"metadata": {"key": "value"}}  # 会合并到现有字典

三、节点与边

3.1 节点类型

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    value: int

# 1. 函数节点
def function_node(state: State) -> dict:
    return {"value": state["value"] + 1}

# 2. Runnable 节点
from langchain_core.runnables import RunnableLambda

runnable_node = RunnableLambda(
    lambda state: {"value": state["value"] * 2}
)

# 添加节点
graph = StateGraph(State)
graph.add_node("func", function_node)
graph.add_node("runnable", runnable_node)

3.2 边的类型

边类型

普通边
直接连接

条件边
动态路由

循环边
回到之前节点

START

END

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    value: int
    status: str

graph = StateGraph(State)

# 1. 普通边
graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")

# 2. 条件边
def route_by_value(state: State) -> Literal["high", "low"]:
    if state["value"] > 50:
        return "high"
    return "low"

graph.add_conditional_edges(
    "node_b",
    route_by_value,
    {
        "high": "high_handler",
        "low": "low_handler",
    }
)

# 3. 循环边(回到之前的节点)
graph.add_edge("high_handler", "node_a")  # 循环
graph.add_edge("low_handler", END)

3.3 完整示例

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    count: int
    total: int
    status: str

def increment(state: State) -> dict:
    return {"count": state["count"] + 1}

def add_to_total(state: State) -> dict:
    return {"total": state["total"] + state["count"]}

def should_continue(state: State) -> Literal["continue", "finish"]:
    if state["count"] < 5:
        return "continue"
    return "finish"

def finish(state: State) -> dict:
    return {"status": "completed"}

# 构建图
graph = StateGraph(State)

# 添加节点
graph.add_node("increment", increment)
graph.add_node("add_to_total", add_to_total)
graph.add_node("finish", finish)

# 添加边
graph.add_edge(START, "increment")
graph.add_edge("increment", "add_to_total")

# 条件边(循环)
graph.add_conditional_edges(
    "add_to_total",
    should_continue,
    {
        "continue": "increment",  # 循环
        "finish": "finish",
    }
)

graph.add_edge("finish", END)

# 编译运行
app = graph.compile()
result = app.invoke({"count": 0, "total": 0, "status": ""})
print(result)
# {'count': 5, 'total': 15, 'status': 'completed'}

四、条件路由

4.1 基本条件路由

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    input: str
    category: str
    result: str

def classify(state: State) -> dict:
    text = state["input"].lower()
    if "代码" in text or "编程" in text:
        return {"category": "tech"}
    elif "天气" in text:
        return {"category": "weather"}
    else:
        return {"category": "general"}

def tech_handler(state: State) -> dict:
    return {"result": "技术问题回答"}

def weather_handler(state: State) -> dict:
    return {"result": "天气信息"}

def general_handler(state: State) -> dict:
    return {"result": "通用回答"}

def route(state: State) -> Literal["tech", "weather", "general"]:
    return state["category"]

# 构建图
graph = StateGraph(State)
graph.add_node("classify", classify)
graph.add_node("tech", tech_handler)
graph.add_node("weather", weather_handler)
graph.add_node("general", general_handler)

graph.add_edge(START, "classify")
graph.add_conditional_edges(
    "classify",
    route,
    {
        "tech": "tech",
        "weather": "weather",
        "general": "general",
    }
)
graph.add_edge("tech", END)
graph.add_edge("weather", END)
graph.add_edge("general", END)

app = graph.compile()

4.2 多条件组合

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    user_type: str  # "vip", "normal", "guest"
    request_type: str  # "query", "write", "admin"
    result: str

def check_permission(state: State) -> Literal["allowed", "denied", "upgrade"]:
    user = state["user_type"]
    request = state["request_type"]
  
    if request == "admin":
        if user == "vip":
            return "allowed"
        else:
            return "denied"
    elif request == "write":
        if user in ("vip", "normal"):
            return "allowed"
        else:
            return "upgrade"
    else:  # query
        return "allowed"

def handle_allowed(state: State) -> dict:
    return {"result": "操作已执行"}

def handle_denied(state: State) -> dict:
    return {"result": "权限不足"}

def handle_upgrade(state: State) -> dict:
    return {"result": "请升级账户"}

# 构建图
graph = StateGraph(State)
graph.add_node("allowed", handle_allowed)
graph.add_node("denied", handle_denied)
graph.add_node("upgrade", handle_upgrade)

graph.add_conditional_edges(
    START,
    check_permission,
    {
        "allowed": "allowed",
        "denied": "denied",
        "upgrade": "upgrade",
    }
)

graph.add_edge("allowed", END)
graph.add_edge("denied", END)
graph.add_edge("upgrade", END)

五、工具调用集成

5.1 工具节点

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage

# 定义工具
@tool
def search(query: str) -> str:
    """搜索网络信息。"""
    return f"搜索结果: {query}"

@tool
def calculator(expression: str) -> str:
    """计算数学表达式。"""
    return str(eval(expression))

tools = [search, calculator]

# 定义状态
class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

# 创建模型和工具节点
llm = ChatOpenAI(model="gpt-4o").bind_tools(tools)
tool_node = ToolNode(tools)

# 定义节点函数
def chatbot(state: State) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: State) -> str:
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"
    return END

# 构建图
from langgraph.graph.message import add_messages

graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_node("tools", tool_node)

graph.add_edge(START, "chatbot")
graph.add_conditional_edges("chatbot", should_continue)
graph.add_edge("tools", "chatbot")

app = graph.compile()

5.2 ReAct Agent 模式

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage

# 定义工具
@tool
def get_weather(city: str) -> str:
    """获取城市天气。"""
    return f"{city} 天气晴,温度 25°C"

@tool
def get_time(timezone: str) -> str:
    """获取时区时间。"""
    return f"当前时间: 14:00 ({timezone})"

tools = [get_weather, get_time]

# 使用预构建的 ReAct Agent
llm = ChatOpenAI(model="gpt-4o")
app = create_react_agent(llm, tools)

# 运行
result = app.invoke({
    "messages": [{"role": "user", "content": "北京天气怎么样?现在几点了?"}]
})

六、人机协作

6.1 断点与恢复

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    input: str
    approved: bool
    result: str

def process(state: State) -> dict:
    return {"result": f"处理: {state['input']}"}

def wait_for_approval(state: State) -> dict:
    # 这个节点会等待人工批准
    return {"approved": False}

# 构建图
graph = StateGraph(State)
graph.add_node("process", process)
graph.add_node("wait_approval", wait_for_approval)

graph.add_edge(START, "wait_approval")
graph.add_conditional_edges(
    "wait_approval",
    lambda s: "approved" if s["approved"] else END,
    {"approved": "process", END: END}
)
graph.add_edge("process", END)

# 添加检查点(支持暂停/恢复)
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# 第一次运行(会暂停在 wait_approval)
config = {"configurable": {"thread_id": "user_123"}}
result1 = app.invoke({"input": "test", "approved": False, "result": ""}, config)
print(result1)  # {'input': 'test', 'approved': False, 'result': ''}

# 恢复并提供批准
result2 = app.invoke({"input": "test", "approved": True, "result": ""}, config)
print(result2)  # {'input': 'test', 'approved': True, 'result': '处理: test'}

6.2 人工审核节点

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import interrupt

class State(TypedDict):
    draft: str
    approved: bool
    final: str

def generate_draft(state: State) -> dict:
    return {"draft": "这是生成的草稿内容..."}

def human_review(state: State) -> dict:
    # 暂停等待人工输入
    user_input = interrupt("请审核草稿,输入 'approve' 批准或提供修改意见:")
  
    if user_input == "approve":
        return {"approved": True}
    else:
        return {
            "approved": False,
            "draft": f"{state['draft']}\n修改意见: {user_input}",
        }

def finalize(state: State) -> dict:
    return {"final": state["draft"]}

# 构建图
graph = StateGraph(State)
graph.add_node("generate", generate_draft)
graph.add_node("review", human_review)
graph.add_node("finalize", finalize)

graph.add_edge(START, "generate")
graph.add_edge("generate", "review")
graph.add_conditional_edges(
    "review",
    lambda s: "approved" if s["approved"] else "generate",
    {"approved": "finalize", "generate": "generate"}
)
graph.add_edge("finalize", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

七、持久化与恢复

7.1 检查点机制

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3

class State(TypedDict):
    count: int
    history: list[str]

def increment(state: State) -> dict:
    return {
        "count": state["count"] + 1,
        "history": state["history"] + [f"Step {state['count'] + 1}"],
    }

# 内存检查点
memory_checkpointer = MemorySaver()

# SQLite 持久化
conn = sqlite3.connect("checkpoints.db")
sqlite_checkpointer = SqliteSaver(conn)

# 构建图
graph = StateGraph(State)
graph.add_node("increment", increment)
graph.add_edge(START, "increment")
graph.add_edge("increment", END)

# 使用持久化检查点编译
app = graph.compile(checkpointer=sqlite_checkpointer)

# 运行(状态会被持久化)
config = {"configurable": {"thread_id": "session_001"}}
result = app.invoke({"count": 0, "history": []}, config)

7.2 恢复执行

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    step: int
    data: str

def step_a(state: State) -> dict:
    return {"step": 1, "data": "A 完成"}

def step_b(state: State) -> dict:
    return {"step": 2, "data": state["data"] + " -> B 完成"}

def step_c(state: State) -> dict:
    return {"step": 3, "data": state["data"] + " -> C 完成"}

graph = StateGraph(State)
graph.add_node("a", step_a)
graph.add_node("b", step_b)
graph.add_node("c", step_c)

graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# 运行
config = {"configurable": {"thread_id": "test"}}
result = app.invoke({"step": 0, "data": ""}, config)

# 查看历史状态
history = list(app.get_state_history(config))
for state in history:
    print(f"Step: {state.values['step']}, Data: {state.values['data']}")

# 恢复到特定状态
app.update_state(config, {"step": 1, "data": "A 完成(恢复)"})

八、可视化与调试

8.1 图可视化

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    value: int

def node_a(state: State) -> dict:
    return {"value": state["value"] + 1}

def node_b(state: State) -> dict:
    return {"value": state["value"] * 2}

graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", END)

app = graph.compile()

# 获取 Mermaid 图
mermaid = app.get_graph().draw_mermaid()
print(mermaid)

# 保存为图片(需要安装 graphviz)
# app.get_graph().draw_mermaid_png(output_file_path="graph.png")

8.2 执行追踪

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    value: int
    logs: list[str]

def node_a(state: State) -> dict:
    return {
        "value": state["value"] + 1,
        "logs": state["logs"] + ["执行了 node_a"],
    }

def node_b(state: State) -> dict:
    return {
        "value": state["value"] * 2,
        "logs": state["logs"] + ["执行了 node_b"],
    }

graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# 运行
config = {"configurable": {"thread_id": "debug"}}
result = app.invoke({"value": 0, "logs": []}, config)

# 查看执行历史
for state in app.get_state_history(config):
    print(f"Value: {state.values['value']}")
    print(f"Logs: {state.values['logs']}")
    print(f"Next: {state.next}")
    print("---")

九、多代理协作

9.1 主从模式

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import BaseMessage
from langchain_openai import ChatOpenAI

class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    next_agent: str

llm = ChatOpenAI(model="gpt-4o")

def researcher(state: State) -> dict:
    response = llm.invoke([
        {"role": "system", "content": "你是研究员,负责收集信息。"},
        *state["messages"],
    ])
    return {
        "messages": [response],
        "next_agent": "writer",
    }

def writer(state: State) -> dict:
    response = llm.invoke([
        {"role": "system", "content": "你是作家,负责撰写内容。"},
        *state["messages"],
    ])
    return {
        "messages": [response],
        "next_agent": "reviewer",
    }

def reviewer(state: State) -> dict:
    response = llm.invoke([
        {"role": "system", "content": "你是审核员,负责检查质量。"},
        *state["messages"],
    ])
    return {
        "messages": [response],
        "next_agent": END,
    }

def route(state: State) -> str:
    return state["next_agent"]

# 构建图
graph = StateGraph(State)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)
graph.add_node("reviewer", reviewer)

graph.add_edge(START, "researcher")
graph.add_conditional_edges("researcher", route)
graph.add_conditional_edges("writer", route)
graph.add_conditional_edges("reviewer", route)

app = graph.compile()

9.2 层级图

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

# 子图
class SubState(TypedDict):
    value: int

def sub_node_a(state: SubState) -> dict:
    return {"value": state["value"] + 10}

def sub_node_b(state: SubState) -> dict:
    return {"value": state["value"] * 2}

sub_graph = StateGraph(SubState)
sub_graph.add_node("a", sub_node_a)
sub_graph.add_node("b", sub_node_b)
sub_graph.add_edge(START, "a")
sub_graph.add_edge("a", "b")
sub_graph.add_edge("b", END)
sub_app = sub_graph.compile()

# 主图
class MainState(TypedDict):
    value: int
    processed: bool

def main_node(state: MainState) -> dict:
    # 调用子图
    result = sub_app.invoke({"value": state["value"]})
    return {
        "value": result["value"],
        "processed": True,
    }

main_graph = StateGraph(MainState)
main_graph.add_node("process", main_node)
main_graph.add_edge(START, "process")
main_graph.add_edge("process", END)

app = main_graph.compile()
result = app.invoke({"value": 5, "processed": False})
# result: {'value': 30, 'processed': True}

十、最佳实践

10.1 设计原则

原则 说明
状态最小化 只存储必要的状态字段
节点单一职责 每个节点只做一件事
明确路由条件 条件函数返回值要明确
使用检查点 生产环境启用持久化
可视化验证 开发时查看图结构

10.2 错误处理

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    value: int
    error: str | None

def risky_operation(state: State) -> dict:
    try:
        result = 100 / state["value"]
        return {"value": result, "error": None}
    except Exception as e:
        return {"error": str(e)}

def handle_error(state: State) -> dict:
    return {"value": 0, "error": None}

def check_error(state: State) -> str:
    if state["error"]:
        return "error"
    return "success"

graph = StateGraph(State)
graph.add_node("operation", risky_operation)
graph.add_node("error_handler", handle_error)

graph.add_edge(START, "operation")
graph.add_conditional_edges(
    "operation",
    check_error,
    {"error": "error_handler", "success": END}
)
graph.add_edge("error_handler", END)

app = graph.compile()

10.3 性能优化

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
import asyncio

class State(TypedDict):
    results: list[str]

async def parallel_task(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"{name} 完成"

async def parallel_node(state: State) -> dict:
    # 并行执行多个任务
    results = await asyncio.gather(
        parallel_task("A"),
        parallel_task("B"),
        parallel_task("C"),
    )
    return {"results": list(results)}

graph = StateGraph(State)
graph.add_node("parallel", parallel_node)
graph.add_edge(START, "parallel")
graph.add_edge("parallel", END)

app = graph.compile()

# 异步运行
import asyncio
result = asyncio.run(app.ainvoke({"results": []}))

十一、总结

11.1 LangGraph vs 传统 Agent

方面 传统 Agent LangGraph
状态管理 无/手动 内置 TypedDict
控制流 线性/简单循环 图结构、复杂路由
持久化 需自行实现 内置检查点机制
可观测性 有限 完整的历史追踪
人机协作 困难 原生支持断点恢复
多代理 复杂 层级图支持

11.2 架构图

输出层

最终结果

执行历史

可视化

执行层

同步执行

异步执行

流式输出

LangGraph 核心

状态 State

节点 Nodes

边 Edges

检查点 Checkpoint

输入层

用户请求

配置信息


参考资料

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐