[大模型架构] LangGraph AI 工作流编排(15)
组件定义与核心作用关键特性状态(State)工作流数据流转的 “载体”,存储工作流执行过程中的所有数据(输入、中间结果、配置参数)1. 基于类 / 数据结构定义,支持任意字段扩展;2. 节点间通过修改 State 传递数据;3. 支持状态持久化与回溯节点(Node)工作流的 “执行单元”,封装具体业务逻辑(如数据处理、模型调用、工具调用)1. 输入输出均为 State 实例;2. 支持同步 / 异
一、LangGraph 核心概念拆解(理解工作流的 “骨架”)
视频首先系统讲解 LangGraph 的核心组件与设计思想,避免开发者因概念模糊导致后续开发踩坑,核心围绕 “状态(State)、节点(Node)、边(Edge)、图(Graph)” 四大核心要素展开:
(一)核心组件定义与作用
| 组件 | 定义与核心作用 | 关键特性 |
|---|---|---|
| 状态(State) | 工作流数据流转的 “载体”,存储工作流执行过程中的所有数据(输入、中间结果、配置参数) | 1. 基于类 / 数据结构定义,支持任意字段扩展;2. 节点间通过修改 State 传递数据;3. 支持状态持久化与回溯 |
| 节点(Node) | 工作流的 “执行单元”,封装具体业务逻辑(如数据处理、模型调用、工具调用) | 1. 输入输出均为 State 实例;2. 支持同步 / 异步函数;3. 可复用、可组合,支持自定义节点与内置节点 |
| 边(Edge) | 节点间的 “连接关系”,定义数据流转路径(如 “节点 A 执行完成→节点 B”) | 1. 分为 “普通边”(固定流转)与 “条件边”(动态流转);2. 支持单节点→多节点、多节点→单节点连接 |
| 图(Graph) | 工作流的 “容器”,整合 State、Node、Edge,形成完整的执行逻辑 | 1. 需指定 “入口节点”(Entry Point)与 “结束节点”(Finish Point);2. 编译后生成可执行的 App 实例 |
(二)核心设计思想:“状态驱动的有向图”
- 状态驱动:所有节点的执行逻辑围绕 State 展开,节点通过读取 State 中的数据进行计算,执行完成后修改 State 并传递给下一个节点,确保数据流转的一致性;
- 有向图结构:工作流的执行路径由 Edge 明确定义,支持线性、分支、循环等复杂结构,可视化程度高,便于理解与调试;
- 可扩展性:组件化设计允许灵活新增 Node、Edge,或扩展 State 字段,适配不同业务场景。
二、线性工作流开发实操(入门必学)
视频以 “文本处理工作流” 为例,演示线性工作流(无分支、无循环,节点按固定顺序执行)的完整开发流程,步骤清晰可复刻:
(一)开发步骤:从定义到执行
-
步骤 1:定义 State(数据载体)
- 基于 LangGraph 的
State类,定义工作流所需的所有数据字段,示例:python
运行
from langgraph.graph import State from typing import Optional class TextProcessState(State): # 输入数据 raw_text: str # 原始文本(必填) # 中间结果 cleaned_text: Optional[str] = None # 清洗后的文本(可选,默认None) summarized_text: Optional[str] = None # 摘要文本(可选,默认None) # 配置参数 summary_length: int = 50 # 摘要长度(默认50字) - 关键说明:字段需明确类型注解,可选字段设置默认值,确保数据类型一致性。
- 基于 LangGraph 的
-
步骤 2:编写 Node 函数(执行单元)
- 每个 Node 函数接收
State实例作为输入,修改后返回新的State,示例实现两个节点(文本清洗、文本摘要):python
运行
# 节点1:文本清洗(去除空格、特殊字符) def clean_text_node(state: TextProcessState) -> TextProcessState: # 读取State中的原始文本 raw_text = state.raw_text # 执行清洗逻辑 cleaned = raw_text.strip().replace("\n", " ").replace(" ", " ") # 修改State并返回 state.cleaned_text = cleaned return state # 节点2:文本摘要(基于简单规则生成摘要) def summarize_text_node(state: TextProcessState) -> TextProcessState: # 读取State中的清洗后文本与配置参数 cleaned_text = state.cleaned_text summary_length = state.summary_length # 执行摘要逻辑(实际场景可替换为LLM调用) if len(cleaned_text) <= summary_length: summary = cleaned_text else: summary = cleaned_text[:summary_length] + "..." # 修改State并返回 state.summarized_text = summary return state - 关键说明:Node 函数必须返回
State实例,不可直接返回原始数据;逻辑需围绕 State 中的字段展开。
- 每个 Node 函数接收
-
步骤 3:构建 Graph(整合节点与边)
- 创建 Graph 实例,添加节点并定义流转顺序(线性:入口→清洗节点→摘要节点→结束):
python
运行
from langgraph import Graph # 1. 创建Graph实例(指定State类型) graph = Graph(TextProcessState) # 2. 添加节点(参数:节点ID,节点函数) graph.add_node("clean_text", clean_text_node) graph.add_node("summarize_text", summarize_text_node) # 3. 定义边(固定流转顺序) graph.add_edge("clean_text", "summarize_text") # 清洗节点→摘要节点 # 4. 指定入口与结束节点 graph.set_entry_point("clean_text") # 入口:清洗节点 graph.set_finish_point("summarize_text") # 结束:摘要节点
- 创建 Graph 实例,添加节点并定义流转顺序(线性:入口→清洗节点→摘要节点→结束):
-
步骤 4:编译与执行工作流
- 将 Graph 编译为可执行的 App 实例,传入初始 State 执行,获取结果:
python
运行
# 1. 编译Graph app = graph.compile() # 2. 定义初始State(仅传入必填字段) initial_state = TextProcessState( raw_text=" 这是一段需要处理的原始文本,包含多余的空格和换行符。\n\n 下面是第二段内容,我们需要对其进行清洗和摘要生成。 ", summary_length=80 # 自定义摘要长度(覆盖默认值) ) # 3. 执行工作流(invoke方法接收初始State,返回最终State) final_state = app.invoke(initial_state) # 4. 查看结果 print("清洗后文本:", final_state.cleaned_text) print("生成摘要:", final_state.summarized_text)
- 将 Graph 编译为可执行的 App 实例,传入初始 State 执行,获取结果:
-
步骤 5:执行结果验证
- 预期输出:
plaintext
清洗后文本: 这是一段需要处理的原始文本,包含多余的空格和换行符。 下面是第二段内容,我们需要对其进行清洗和摘要生成。 生成摘要: 这是一段需要处理的原始文本,包含多余的空格和换行符。 下面是第二段内容,我们需要对其进行清洗和摘要生成。...
- 预期输出:
(二)关键注意事项
- 节点函数必须接收并返回
State实例,不可遗漏; - 边的连接必须存在对应的节点(避免节点 ID 拼写错误);
- 初始 State 需包含所有必填字段,可选字段可按需覆盖默认值。
三、分支工作流开发实操(进阶入门)
视频在 linear 工作流基础上,扩展至分支工作流(根据 State 中的数据动态选择执行路径),以 “数据类型判断工作流” 为例,演示条件边的使用:
(一)核心新增组件:条件边(ConditionalEdge)
- 作用:根据 State 中的数据,动态选择下一个执行节点,实现 “一进多出” 的分支逻辑;
- 核心依赖:需定义 “分支判断函数”,输入 State,返回目标节点 ID。
(二)开发步骤:分支工作流实现
-
步骤 1:扩展 State(增加分支判断字段)
python
运行
class DataProcessState(State): data: str # 输入数据 data_type: Optional[str] = None # 数据类型(text/image/other) result: Optional[str] = None # 处理结果 -
步骤 2:编写分支判断函数
python
运行
def route_by_data_type(state: DataProcessState) -> str: """根据数据类型选择下一个节点""" data = state.data # 简单判断数据类型(实际场景可替换为更复杂的识别逻辑) if data.startswith("text:"): state.data_type = "text" return "process_text" # 文本处理节点ID elif data.startswith("image:"): state.data_type = "image" return "process_image" # 图像处理节点ID else: state.data_type = "other" return "process_other" # 其他类型处理节点ID -
步骤 3:编写分支节点函数
python
运行
# 文本处理节点 def process_text_node(state: DataProcessState) -> DataProcessState: state.result = f"文本处理完成:{state.data[5:]}" # 截取"text:"后的内容 return state # 图像处理节点 def process_image_node(state: DataProcessState) -> DataProcessState: state.result = f"图像处理完成:{state.data[6:]}" # 截取"image:"后的内容 return state # 其他类型处理节点 def process_other_node(state: DataProcessState) -> DataProcessState: state.result = f"其他类型数据处理完成:{state.data}" return state -
步骤 4:构建分支 Graph
python
运行
graph = Graph(DataProcessState) # 添加所有节点 graph.add_node("route", route_by_data_type) # 分支判断节点(可选,也可直接用条件边) graph.add_node("process_text", process_text_node) graph.add_node("process_image", process_image_node) graph.add_node("process_other", process_other_node) # 定义条件边(入口→分支判断→目标节点) graph.add_conditional_edges( source="route", # 源节点ID(分支判断节点) condition=route_by_data_type, # 分支判断函数 # 映射:判断函数返回值→目标节点ID mapping={ "process_text": "process_text", "process_image": "process_image", "process_other": "process_other" } ) # 指定入口节点(分支判断节点) graph.set_entry_point("route") # 所有分支共享同一个结束节点(可选,也可分别设置) graph.set_finish_point("process_text") graph.set_finish_point("process_image") graph.set_finish_point("process_other") -
步骤 5:执行与验证
python
运行
app = graph.compile() # 测试文本类型数据 text_state = DataProcessState(data="text:这是一段文本数据") text_result = app.invoke(text_state) print(text_result.result) # 预期输出:文本处理完成:这是一段文本数据 # 测试图像类型数据 image_state = DataProcessState(data="image:这是一张图片的路径") image_result = app.invoke(image_state) print(image_result.result) # 预期输出:图像处理完成:这是一张图片的路径
四、调试与优化技巧(提升开发效率)
视频针对 LangGraph 开发中的调试痛点,提供了 3 个核心技巧:
(一)日志输出与状态查看
-
启用详细日志:通过
logging模块输出节点执行顺序、State 变化,便于定位问题:python
运行
import logging logging.basicConfig(level=logging.INFO) # 输出INFO级别日志- 执行后会打印节点执行日志(如 “Entering node 'clean_text'”“Exiting node 'clean_text'”)。
-
中间状态查看:在节点函数中添加
print语句,实时查看 State 的变化:python
运行
def clean_text_node(state: TextProcessState) -> TextProcessState: print("清洗前State:", state.raw_text) # 清洗逻辑... print("清洗后State:", state.cleaned_text) return state
(二)节点执行耗时分析
- 添加耗时统计:在节点函数中记录执行时间,识别低效节点:
python
运行
import time def summarize_text_node(state: TextProcessState) -> TextProcessState: start_time = time.time() # 摘要逻辑... end_time = time.time() print(f"摘要节点执行耗时:{end_time - start_time:.2f}s") return state
(三)错误处理与容错
- 节点内异常捕获:在节点函数中添加
try-except,避免单个节点报错导致整个工作流中断:python
运行
def process_image_node(state: DataProcessState) -> DataProcessState: try: # 图像处理逻辑... state.result = f"图像处理完成:{state.data[6:]}" except Exception as e: state.result = f"图像处理失败:{str(e)}" return state
五、常见问题与避坑指南
| 问题现象 | 核心原因 | 解决方案 |
|---|---|---|
| 节点函数执行后 State 字段未更新 | 忘记修改 State 或未返回 State 实例 | 确保节点函数中修改 State 字段后,返回该实例(return state) |
| 条件边分支判断失效 | 判断函数返回值与mapping中的键不匹配 |
检查返回值类型(需为字符串)与键名一致性,避免拼写错误 |
| 工作流执行提示 “节点 ID 不存在” | 添加边时使用了未定义的节点 ID | 确保add_edge/add_conditional_edges中的节点 ID 已通过add_node注册 |
| 状态字段类型错误 | 初始 State 传入的字段类型与定义不一致 | 严格按照 State 类的类型注解传入数据(如raw_text需为字符串) |
| 异步节点执行卡顿 | 未使用异步函数或未等待异步操作 | 异步节点需定义为async def,并在执行时使用await app.ainvoke() |
更多推荐


所有评论(0)