深入理解 LangGraph:构建复杂AI应用的Graph图框架 - Agent框架底层的压舱石
在当今的 AI 应用开发中,我们经常面临多步骤、有状态、需要记忆的复杂场景。传统的线性处理方式已无法满足这些需求,这就催生了 LangGraph 框架的诞生。LangGraph 是一个专门用于构建状态化、多步骤 AI 应用的 Python 库,它基于 Pregel 计算模型,提供了构建复杂 AI 工作流的高级抽象。
一、什么是 LangGraph?
LangGraph 是一个用于构建状态化、多步骤 AI 应用的框架。与传统的单次调用 AI 模型不同,LangGraph 允许我们创建可以记住历史、做出决策、执行多个步骤并根据结果调整行为的复杂 AI 系统。
LangGraph 的核心特点包括:
- 状态化: 保持执行过程中的状态信息
- 多步骤: 支持复杂的多步骤工作流
- 可记忆: 通过检查点机制实现"短期记忆"
- 可中断: 支持人工干预和决策
- 可恢复: 支持暂停和恢复执行
二、为什么需要 LangGraph?
传统 AI 应用的局限性
传统的 AI 应用通常采用单次调用模式:输入 → AI 模型 → 输出。这种模式在以下场景中存在局限性:
- 多步骤推理: 需要多次调用 AI 模型进行推理
- 状态管理: 需要记住历史交互信息
- 循环处理: 需要根据结果决定是否继续
- 人工干预: 需要在某些节点等待人工确认

LangGraph 解决的问题
LangGraph 通过图结构解决了上述问题:
- 图结构: 使用节点和边定义复杂的执行流程
- 状态管理: 通过状态对象维护执行过程中的数据
- 条件分支: 根据条件动态选择执行路径
- 检查点: 持久化状态,支持恢复和重放

三、LangGraph 的核心概念
1. Pregel 模型
LangGraph 基于 Pregel 计算模型,这是一种用于大规模图计算的编程模型。Pregel 模型采用 Bulk Synchronous Parallel (BSP) 范式,将计算过程分为多个超步(Superstep):

每个超步包含三个阶段:
- 计划阶段: 确定此步骤中要执行的节点
- 执行阶段: 并行执行选定的节点
- 更新阶段: 更新通道中的值
2. StateGraph
StateGraph 是 LangGraph 中用于创建状态化工作流的主要抽象。它由以下几个核心组件构成:
- 状态(State): 定义图中共享的数据结构
- 节点(Node): 执行特定功能的函数或可运行对象
- 边(Edge): 定义节点间的执行顺序
- 条件边(Conditional Edge): 基于条件动态选择下一个节点

from typing import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict): value: intdef increment(state):return {"value": state["value"] + 1}def multiply_by_two(state):return {"value": state["value"] * 2}# 创建图builder = StateGraph(State)builder.add_node("increment", increment)builder.add_node("multiply", multiply_by_two)builder.add_edge(START, "increment")builder.add_edge("increment", "multiply")builder.add_edge("multiply", END)graph = builder.compile()# 执行图result = graph.invoke({"value": 1})print(result) # 输出: {'value': 4}
3. Channels(通道)
Channels 是节点间通信的主要机制,定义了数据如何在图中流动。主要类型包括:
- LastValue: 存储通道的最后值,适用于需要单一值的状态字段
- Topic: 发布-订阅通道,可累积多个值,适用于消息列表等场景
- BinaryOperatorAggregate: 使用二元操作符聚合值,适用于数值累加等场景

from typing import TypedDictfrom typing_extensions import Annotatedimport operatorclass State(TypedDict):# 使用 reducer 累积消息 messages: Annotated[list, operator.add]# 使用 reducer 累积总数 total: Annotated[int, operator.add]
4. Checkpoints(检查点)
检查点系统提供持久化状态的能力,支持暂停、恢复和重放:

from langgraph.checkpoint.memory import MemorySaver# 创建检查点保存器memory = MemorySaver()graph = builder.compile(checkpointer=memory)# 执行图并持久化状态config = {"configurable": {"thread_id": "1"}}result = graph.invoke({"value": 0}, config)
5. Interrupts(中断)
中断机制允许在图执行过程中暂停,以便进行人工干预:

from langgraph.types import interruptdef node_with_interrupt(state):# 中断并等待用户输入 user_input = interrupt("请输入一个数字:")return {"user_input": str(user_input)}
四、LangGraph 的应用场景
1. AI 代理系统
AI 代理通常需要多步骤推理、工具调用和状态管理:

from langgraph.prebuilt import create_react_agentfrom langchain_openai import ChatOpenAIfrom langchain.tools import tool@tooldef get_weather(location: str) -> str:"""获取指定位置的天气"""return f"{location}的天气是晴朗的,温度22°C"tools = [get_weather]model = ChatOpenAI(model="gpt-3.5-turbo")agent = create_react_agent(model, tools)result = agent.invoke({"messages": [("user", "北京的天气如何?")]})
2. 多步骤工作流
复杂的业务逻辑需要多个步骤协同完成:

from typing import TypedDict, Literalfrom typing_extensions import Annotatedfrom langgraph.graph import StateGraph, START, ENDimport operatorclass Task(TypedDict): id: str description: str status: Literal["pending", "in_progress", "completed", "failed"] created_at: str completed_at: str | Noneclass TaskManagerState(TypedDict): tasks: Annotated[list[Task], operator.add] current_task_id: str | None completed_tasks: Annotated[list[Task], operator.add]def add_task(state): new_task = {"id": f"task_{len(state['tasks']) + 1}","description": f"Sample task {len(state['tasks']) + 1}","status": "pending","created_at": datetime.now().isoformat(),"completed_at": None }return {"tasks": [new_task]}def process_task(state): pending_tasks = [t for t in state["tasks"] if t["status"] == "pending"]ifnot pending_tasks:return {"current_task_id": None} task = pending_tasks[0]# 模拟任务处理 updated_task = {**task, "status": "completed", "completed_at": datetime.now().isoformat()}return {"current_task_id": task["id"],"completed_tasks": [updated_task] }def should_continue(state): pending_tasks = [t for t in state["tasks"] if t["status"] == "pending"]return"process"if pending_tasks else ENDbuilder = StateGraph(TaskManagerState)builder.add_node("add", add_task)builder.add_node("process", process_task)builder.add_conditional_edges(START, lambda s: "add")builder.add_conditional_edges("add", should_continue, ["process", END])builder.add_edge("process", "add")graph = builder.compile()
3. 数据处理流水线
构建复杂的数据处理流水线:

class DataPipelineState(TypedDict): raw_data: list processed_data: Annotated[list, operator.add] validation_errors: Annotated[list, operator.add] pipeline_status: strdef extract_data(state):"""从源提取数据""" raw_data = [ {"id": 1, "name": "Alice", "email": "alice@example.com"}, {"id": 2, "name": "Bob", "email": "invalid-email"}, {"id": 3, "name": "Charlie", "email": "charlie@example.com"} ]return {"raw_data": raw_data, "pipeline_status": "extracted"}def validate_data(state):"""验证数据""" errors = [] valid_data = []for item in state["raw_data"]:if"@"in item["email"] and"."in item["email"]: valid_data.append(item)else: errors.append(f"Invalid email for {item['name']}: {item['email']}") result = {"processed_data": valid_data}if errors: result["validation_errors"] = errorsreturn resultdef transform_data(state):"""转换数据格式""" transformed = []for item in state["processed_data"]: transformed.append({"user_id": item["id"],"full_name": item["name"].upper(),"contact": item["email"] })return {"processed_data": transformed, "pipeline_status": "completed"}builder = StateGraph(DataPipelineState)builder.add_node("extract", extract_data)builder.add_node("validate", validate_data)builder.add_node("transform", transform_data)builder.add_edge(START, "extract")builder.add_edge("extract", "validate")builder.add_edge("validate", "transform")builder.add_conditional_edges("transform", lambda s: END if s["pipeline_status"] == "completed"else"transform")graph = builder.compile()
五、高级特性
1. 条件分支
根据条件动态选择执行路径:

def should_continue(state):if state["value"] < 5:return"increment"else:return ENDdef increment(state):return {"value": state["value"] + 1}builder = StateGraph(State)builder.add_node("increment", increment)builder.add_conditional_edges(START, should_continue, ["increment", END])builder.add_edge("increment", START)graph = builder.compile()Result = graph.invoke({"value": 1})print(result) # 输出: {'value': 5}
2. 并行处理
使用 Send API 实现并行处理:

from langgraph.types import Senddef distribute_work(state):# 为每个项目创建任务return [Send("process_item", {"item": item}) for item in state["items"]]def should_continue(state):return"square"if len(state["results"]) < len(state["numbers"]) else END
3. 错误处理和重试
内置的错误处理和重试机制:

from langgraph.types import RetryPolicy# 定义重试策略retry_policy = RetryPolicy( initial_interval=0.5, backoff_factor=2.0, max_attempts=5, retry_on=Exception)builder.add_node("operation", unreliable_operation, retry_policy=retry_policy)
4. 流式处理
支持多种流式输出模式:

# 流式输出每个步骤的更新for chunk in graph.stream({"value": 1}, stream_mode="updates"): print(chunk)# 流式输出最终值for chunk in graph.stream({"value": 1}, stream_mode="values"): print(chunk)
六、架构模式
1. ReAct 模式
ReAct (Reasoning + Acting) 模式是 LangGraph 中常用的 AI 代理模式:

2. Map-Reduce 模式
使用 Send API 实现 Map-Reduce 模式:

def map_tasks(state):# 为每个项目创建任务return [Send("process", {"item": item}) for item in state["items"]]def reduce_results(state):# 合并结果return {"result": sum(state["partial_results"])}
七、最佳实践
1. 状态设计
- 使用 TypedDict 或 Pydantic 模型明确定义状态
- 为需要累积的字段使用适当的 reducer
- 避免状态过于复杂,考虑分解为多个图

-
2. 错误处理
-
使用重试策略处理临时性错误
-
实现适当的错误恢复机制
-
记录和监控错误情况
3. 性能优化
-
使用检查点避免重复计算
-
合理设置递归限制
-
使用异步 API 提高并发性能
4. 调试和监控
-
使用流式输出监控执行过程
-
启用调试模式获取详细信息
-
使用检查点进行状态回溯
八、与其他组件的集成
1. LangChain 集成
LangGraph 与 LangChain 生态系统无缝集成:
-
与 LangChain 的 LLM、工具、记忆等组件无缝集成
-
使用 Runnable 接口实现节点
-
利用 LangChain 的回调系统
2. 外部系统集成
-
通过自定义节点集成数据库
-
与外部 API 交互
-
集成消息队列和事件系统
九、总结
LangGraph 提供了一个强大而灵活的框架来构建复杂的 AI 工作流。通过理解其核心概念(Pregel 模型、StateGraph、Channels、Checkpoints 等),开发者可以构建从简单到复杂的各种 AI 应用,从基本的数值处理到复杂的多步骤推理系统。
LangGraph 的主要优势包括:
-
状态化管理
提供了完整的状态管理机制
-
图结构
支持复杂的执行流程定义
-
可扩展性
支持子图、并行处理等高级特性
-
容错性
内置错误处理和重试机制
-
可观察性
提供丰富的监控和调试功能
随着 AI 应用复杂度的不断增加,LangGraph 为开发者提供了一个强大的工具,能够构建更加智能、灵活和可靠的 AI 系统。无论是构建 AI 代理、多步骤工作流,还是复杂的决策系统,LangGraph 都能提供相应的解决方案。
如何学习大模型 AI ?
由于新岗位的生产效率,要优于被取代岗位的生产效率,所以实际上整个社会的生产效率是提升的。
但是具体到个人,只能说是:
“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。
这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。
我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。
我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将并将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。

第一阶段(10天):初阶应用
该阶段让大家对大模型 AI有一个最前沿的认识,对大模型 AI 的理解超过 95% 的人,可以在相关讨论时发表高级、不跟风、又接地气的见解,别人只会和 AI 聊天,而你能调教 AI,并能用代码将大模型和业务衔接。
- 大模型 AI 能干什么?
- 大模型是怎样获得「智能」的?
- 用好 AI 的核心心法
- 大模型应用业务架构
- 大模型应用技术架构
- 代码示例:向 GPT-3.5 灌入新知识
- 提示工程的意义和核心思想
- Prompt 典型构成
- 指令调优方法论
- 思维链和思维树
- Prompt 攻击和防范
- …
第二阶段(30天):高阶应用
该阶段我们正式进入大模型 AI 进阶实战学习,学会构造私有知识库,扩展 AI 的能力。快速开发一个完整的基于 agent 对话机器人。掌握功能最强的大模型开发框架,抓住最新的技术进展,适合 Python 和 JavaScript 程序员。
- 为什么要做 RAG
- 搭建一个简单的 ChatPDF
- 检索的基础概念
- 什么是向量表示(Embeddings)
- 向量数据库与向量检索
- 基于向量检索的 RAG
- 搭建 RAG 系统的扩展知识
- 混合检索与 RAG-Fusion 简介
- 向量模型本地部署
- …
第三阶段(30天):模型训练
恭喜你,如果学到这里,你基本可以找到一份大模型 AI相关的工作,自己也能训练 GPT 了!通过微调,训练自己的垂直大模型,能独立训练开源多模态大模型,掌握更多技术方案。
到此为止,大概2个月的时间。你已经成为了一名“AI小子”。那么你还想往下探索吗?
- 为什么要做 RAG
- 什么是模型
- 什么是模型训练
- 求解器 & 损失函数简介
- 小实验2:手写一个简单的神经网络并训练它
- 什么是训练/预训练/微调/轻量化微调
- Transformer结构简介
- 轻量化微调
- 实验数据集的构建
- …
第四阶段(20天):商业闭环
对全球大模型从性能、吞吐量、成本等方面有一定的认知,可以在云端和本地等多种环境下部署大模型,找到适合自己的项目/创业方向,做一名被 AI 武装的产品经理。
- 硬件选型
- 带你了解全球大模型
- 使用国产大模型服务
- 搭建 OpenAI 代理
- 热身:基于阿里云 PAI 部署 Stable Diffusion
- 在本地计算机运行大模型
- 大模型的私有化部署
- 基于 vLLM 部署大模型
- 案例:如何优雅地在阿里云私有部署开源大模型
- 部署一套开源 LLM 项目
- 内容安全
- 互联网信息服务算法备案
- …
学习是一个过程,只要学习就会有挑战。天道酬勤,你越努力,就会成为越优秀的自己。
如果你能在15天内完成所有的任务,那你堪称天才。然而,如果你能完成 60-70% 的内容,你就已经开始具备成为一名大模型 AI 的正确特征了。
这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】

更多推荐
所有评论(0)