目录

引言:为什么需要 LangGraph?

一、LangGraph 核心概念再理解

1.1 State(状态):系统的“全局白板”

1.2 Node(节点):单一职责的执行单元

1.3 Edge(边):定义执行路径

1.4 Conditional Edge(条件边):动态路由的核心

二、多智能体协作:从单兵到军团

2.1 状态总线(State Bus)模式

2.2 子图(Sub-graph):逻辑隔离的黑盒

2.3 冲突处理:解决并发的“竞态条件”

三、上下文工程:从全量输入到精细化管理

3.1 动态裁剪(Dynamic Trimming)

3.2 状态摘要(Summarization Node)

3.3 节点级视图过滤

四、生产环境增强特性

4.1 持久化与断点续跑

4.2 错误处理与重试机制

4.3 节点缓存

五、完整实战:构建企业级客服智能体

5.1 系统架构设计

5.2 核心代码实现

六、性能对比与选型建议

结语:从链到图的范式转移


引言:为什么需要 LangGraph?

2026年,智能体开发正在经历从“链式驱动”到“图驱动”的范式变革。传统的 LangChain 链式调用在面对复杂业务场景时,逐渐暴露出三大瓶颈:

  • 状态管理碎片化:对话历史、工具调用记录分散在多个变量中,难以维护

  • 流程控制复杂:多轮工具调用需嵌套循环,错误处理代码臃肿

  • 扩展性差:新增工具需修改核心逻辑,难以支持人工干预等场景

LangGraph 的解决方案:用有向图模型重构智能体工作流,将 LLM 调用、工具执行等模块抽象为节点,通过条件边实现动态跳转。其核心优势包括:

✅ 循环图支持多轮思考与行动
✅ 状态持久化实现断点续跑
✅ 可视化调试降低维护成本
✅ 原生支持多智能体协作

一、LangGraph 核心概念再理解

1.1 State(状态):系统的“全局白板”

State 是整个 LangGraph 能够顺利运行的核心。它本质上是一个 TypedDict 或 Pydantic 模型,代表当前执行到这一步时系统所拥有的全部信息。

from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    messages: Annotated[list, operator.add]      # 消息自动累积
    user_preferences: dict                         # 用户偏好设置
    intermediate_steps: Annotated[list, custom_merge] # 自定义合并策略
    next_agent_instruction: str                    # 下一个Agent指令
    last_error: Annotated[str, lambda old, new: new] # 只保留最新错误

进阶技巧:通过 Annotated 元数据声明状态合并策略,可以实现:

  • operator.add:列表自动拼接(默认)

  • 自定义函数:实现消息更新替换等高级逻辑

1.2 Node(节点):单一职责的执行单元

每个 Node 本质上是一个函数,输入当前全局状态,返回修改后的状态子集。

def retrieve_node(state: AgentState) -> AgentState:
    """检索节点:根据查询获取相关文档"""
    query = state["messages"][-1].content
    docs = vector_store.similarity_search(query, k=3)
    return {"documents": docs}

def model_node(state: AgentState) -> AgentState:
    """模型节点:生成回答"""
    messages = state["messages"] + [SystemMessage(content="你是一个AI助手")]
    response = llm.invoke(messages)
    return {"messages": [response]}

设计原则

  • 单一职责:每个节点只负责一项职责

  • 无状态设计:节点本身不保存状态,所有数据通过状态传递

  • 幂等性:相同的输入应产生相同的输出

1.3 Edge(边):定义执行路径

Edge 将节点连接起来,形成完整的执行流程图。

from langgraph.graph import StateGraph, START, END

graph = StateGraph(AgentState)
graph.add_node("retrieve", retrieve_node)
graph.add_node("model", model_node)
graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "model")
graph.add_edge("model", END)

1.4 Conditional Edge(条件边):动态路由的核心

def route_based_on_tools(state: AgentState) -> str:
    """检测是否需要调用工具"""
    last_message = state["messages"][-1]
    if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
        return "tools"
    return "respond"

graph.add_conditional_edges(
    "agent",
    route_based_on_tools,
    {
        "tools": "execute_tools",
        "respond": "generate_response"
    }
)

二、多智能体协作:从单兵到军团

2.1 状态总线(State Bus)模式

在多智能体系统中,信息同步的成败取决于你如何管理那个共享的 State。LangGraph 的 State 本质上是一个分布式状态总线。

class TeamState(TypedDict):
    # 共享记忆:所有Agent都能看到并追加对话历史
    messages: Annotated[Sequence[BaseMessage], operator.add]
    
    # 任务成果:Agent A 写入草稿,Agent B 读取并修改
    document_draft: str
    
    # 专家私有指令:仅由 Supervisor 写入,引导下一个Agent执行
    next_agent_instruction: str

2.2 子图(Sub-graph):逻辑隔离的黑盒

当执行如“深度搜索”等任务时,子智能体可能会翻阅数十篇网页,产生数万字的原始素材。如果这些“过程垃圾”全部存入父图的消息列表,主智能体会因为看到太多噪音而“溺水”。

class ResearchSubState(TypedDict):
    query: str
    raw_web_pages: List[str]  # 海量原始数据,只在子图内部流转
    summary: str               # 最终要上报给父图的精简结论

def call_researcher_expert(state: ParentState):
    """父图节点:像调用函数一样调用子图"""
    # 输入映射:只给子图需要的最小化信息
    child_input = {"query": state["current_query"]}
    
    # 执行子图逻辑(子图内部的50步循环对父图完全透明)
    child_output = research_subgraph.invoke(child_input)
    
    # 输出映射:仅将精炼后的摘要同步回父图
    return {"research_summary": child_output["summary"]}

2.3 冲突处理:解决并发的“竞态条件”

当图发生分支且多个节点并行运行时,它们可能同时尝试更新同一个 State 字段。LangGraph 通过 Reducer 来定义合并规则。

def merge_product_info(old_val: dict, new_val: dict) -> dict:
    """自定义Reducer:实现字典的智能合并,而非简单覆盖"""
    merged = old_val.copy() if old_val else {}
    for key, value in new_val.items():
        if key in merged and merged[key] != value:
            # 生产级策略:记录冲突或保留置信度更高的值
            merged[f"conflict_{key}"] = value
        else:
            merged[key] = value
    return merged

class ParallelState(TypedDict):
    product_metadata: Annotated[dict, merge_product_info]

三、上下文工程:从全量输入到精细化管理

3.1 动态裁剪(Dynamic Trimming)

随着交互轮数增加,对话历史会呈指数级增长,引发“Lost in the Middle”现象。

from langchain_core.messages import trim_messages

trimmer = trim_messages(
    max_tokens=2000,
    strategy="last",
    token_counter=lambda msgs: sum(len(m.content or "") for m in msgs),
    start_on="human",          # 从用户消息开始保留
    include_system=True        # 必须保留系统提示
)

def call_model_with_trim(state: AgentState):
    trimmed_messages = trimmer.invoke(state["messages"])
    response = llm.invoke(trimmed_messages)
    return {"messages": [response]}

3.2 状态摘要(Summarization Node)

def summarize_conversation(state: AgentState):
    """当消息过长时进行总结"""
    messages = state["messages"]
    if len(messages) > 20:
        summary = state.get("summary", "")
        summary_prompt = f"现有摘要: {summary}\n新消息: {messages}\n请更新摘要。"
        new_summary = llm.invoke(summary_prompt).content
        return {"summary": new_summary, "messages": messages[-2:]}
    return {}

3.3 节点级视图过滤

给节点它完成工作所必须的数据投影,防止注意力分散和泄露敏感信息。

def expert_node(state: AgentState):
    """专家节点只需要任务背景,不需要其他人的中间过程"""
    node_view = {
        "task": state["document_draft"],
        "instructions": state["next_agent_instruction"]
    }
    response = llm.invoke(f"根据任务 {node_view['task']} 执行操作")
    return {"expert_result": response}

四、生产环境增强特性

4.1 持久化与断点续跑

from langgraph.checkpoint.memory import InMemorySaver

# 使用检查点保存状态
checkpointer = InMemorySaver()
graph = graph.compile(checkpointer=checkpointer)

# 使用同一个 thread_id 实现跨会话记忆
config = {"configurable": {"thread_id": "user-123"}}

# 第一轮对话
result1 = graph.invoke({"messages": [HumanMessage(content="你好")]}, config=config)

# 第二轮对话(保留上下文)
result2 = graph.invoke({"messages": [HumanMessage(content="我刚才说了什么")]}, config=config)

4.2 错误处理与重试机制

from langgraph.pregel import RetryPolicy

retry_policy = RetryPolicy(
    max_attempts=3,               # 最大重试次数
    initial_interval=1,           # 初始间隔
    jitter=True,                   # 避免重试风暴
    backoff_factor=2,              # 退避乘数
    retry_on=[RequestException, Timeout]  # 只重试这些异常
)

graph.add_node("unreliable_api", call_api, retry=retry_policy)

4.3 节点缓存

from langgraph.cache import InMemoryCache, CachePolicy

def expensive_embedding(state):
    # 耗时的嵌入计算
    return {"embeddings": result}

graph.add_node(
    "embedding",
    expensive_embedding,
    cache_policy=CachePolicy(ttl=3600)  # 缓存1小时
)
graph.compile(cache=InMemoryCache())

五、完整实战:构建企业级客服智能体

5.1 系统架构设计

5.2 核心代码实现

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
import operator

class CustomerServiceState(TypedDict):
    messages: Annotated[list, operator.add]
    intent: str
    order_info: dict
    product_info: dict
    final_response: str

def intent_classifier(state: CustomerServiceState) -> dict:
    query = state["messages"][-1].content
    prompt = f"请判断用户意图:{query},只输出'order'或'product'"
    intent = llm.invoke(prompt).content.strip()
    return {"intent": intent}

def router(state: CustomerServiceState) -> Literal["order", "product"]:
    return state["intent"]

def order_handler(state: CustomerServiceState) -> dict:
    # 调用订单API
    order_id = extract_order_id(state["messages"][-1].content)
    order_info = query_order_api(order_id)
    return {"order_info": order_info}

def product_handler(state: CustomerServiceState) -> dict:
    # 检索商品知识库
    product_name = extract_product(state["messages"][-1].content)
    docs = knowledge_base.search(product_name)
    return {"product_info": docs}

def response_merger(state: CustomerServiceState) -> dict:
    if state.get("order_info"):
        response = f"订单信息:{state['order_info']}"
    elif state.get("product_info"):
        response = f"商品信息:{state['product_info']}"
    else:
        response = "抱歉,未能理解您的需求"
    return {"final_response": response}

# 构建图
builder = StateGraph(CustomerServiceState)
builder.add_node("classifier", intent_classifier)
builder.add_node("order", order_handler)
builder.add_node("product", product_handler)
builder.add_node("merger", response_merger)

builder.add_edge(START, "classifier")
builder.add_conditional_edges("classifier", router)
builder.add_edge("order", "merger")
builder.add_edge("product", "merger")
builder.add_edge("merger", END)

# 编译并执行
graph = builder.compile()
result = graph.invoke({
    "messages": [HumanMessage(content="查询订单123456的状态")]
})
print(result["final_response"])

六、性能对比与选型建议

指标 传统手写Agent LangGraph
工具扩展成本 高(需改核心逻辑) 低(增删节点)
多轮对话支持 循环嵌套复杂 原生支持
状态追溯 不可追溯 完整快照
开发效率 200+行代码 50行内实现
并行执行 难以实现 原生支持
人工干预 需额外编码 节点级支持

结语:从链到图的范式转移

如果说传统 Chain 是线性的能力组合,那么 StateGraph 就是有状态的全局控制流机;如果说 Chain 是工具驱动流程片段,Graph 就是工程级的智能协同平台。

在智能体需求越来越复杂的今天,单靠 Chain 已无法应对多步骤决策、逻辑分支和记忆维护。LangGraph 的图式设计正是为可控、可扩展、可观测的智能体而生。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐