一、简介

LangChain 1.0 的工作流(Workflows)是为了实现复杂 LLM 应用编排的核心能力 —— 通过可视化、可复用、可调试的方式,将模型、工具、记忆、检索等组件串联成生产级应用。

1.1 核心定义

LangChain 1.0 的工作流是基于 LangGraph 构建的有状态、可中断、可重试的应用编排体系,核心特征是:

  • 将 LLM 应用的核心逻辑(模型调用、工具执行、分支判断、记忆管理、检索增强)抽象为节点(Node) 和边(Edge),通过可视化的图结构定义执行流程,支持循环、条件分支、多智能体协作等复杂逻辑。

1.2 核心定位

从线性链到智能流程图,用 LangGraph 构建可控AI流程,让你从“写代码”升级为“画流程图”,用节点和边来编排复杂的AI逻辑。旧版 Chain 仅用于极简线性场景(且逐步被 LCEL 链式调用替代)。

维度 旧版 Chain(0.x) 1.0 工作流(LangGraph)
核心形态 线性链式调用 有向图(支持分支 / 循环 / 并行)
状态管理 无原生状态,需手动维护 原生 Checkpoint 状态管理(可中断 / 恢复)
可调试性 弱(线性日志,难定位问题) 强(可视化图、节点级日志、断点调试)
复杂逻辑支持 弱(仅简单串联) 强(多智能体、条件分支、工具重试)
生产级特性 无(无重试 / 兜底 / 并发) 全支持(重试策略、异常兜底、异步并发)
可视化 原生支持(LangGraph UI)

1.3 核心价值

  1. 可视化编排:用图结构替代硬编码,降低复杂逻辑开发成本;
  2. 状态持久化:Checkpoint 支持中断 / 恢复,适配长时任务;
  3. 可复用性:节点可独立封装、跨工作流复用;
  4. 可调试性:节点级日志、可视化执行轨迹,快速定位问题;
  5. 生产级鲁棒性:内置重试、兜底、并发控制,适配高可用场景。

二、工作流核心架构(LangGraph)

LangChain 1.0 工作流的核心是 LangGraph,其核心优势是支持循环、分支、状态管理,且原生支持可视化 —— 能把复杂的智能体 / 多步骤流程转化为直观的图形,方便调试和理解。

2.1 核心概念

组件 作用 示例
State 工作流全局状态(数据载体) 包含用户输入、工具结果、模型输出、上下文
Node 工作流节点(执行单元) 模型调用节点、工具执行节点、判断节点
Edge 节点间的边(流程走向) 线性边(直接跳转)、条件边(分支判断)
Checkpoint 状态快照(可中断 / 恢复) 保存当前节点、状态数据,支持重启
Agent 智能体节点(决策 + 执行) 内置思考 - 行动循环的智能体节点
ToolNode 工具执行节点 封装工具调用逻辑,简化工具执行

2.2 状态(State)设计

State 是工作流的「数据总线」,所有节点的输入 / 输出均通过 State 传递,LangChain 1.0 提供 TypedDict 风格的强类型 State 定义:

from langgraph.graph import StateGraph, START, END
from langgraph.graph.state import StateGraph
from typing import TypedDict, List

# 定义工作流状态(强类型)
class WorkflowState(TypedDict):
    user_input: str                # 用户输入
    intermediate_steps: List[str]  # 中间步骤
    tool_results: dict             # 工具执行结果
    final_answer: str              # 最终答案
    need_retry: bool               # 是否需要重试

2.3 节点(Node)分类与实现

节点是工作流的「执行单元」,分为三类核心节点:

  1. 功能节点(Function Node)
    最基础的节点,封装任意自定义逻辑(模型调用、工具执行、数据处理):

    # 1. 模型调用节点
    def llm_node(state: WorkflowState):
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model="gpt-4o", temperature=0)
        # 调用模型
        response = llm.invoke([("human", state["user_input"])])
        # 更新状态
        return {
            "intermediate_steps": state["intermediate_steps"] + [f"模型调用结果:{response.content}"],
            "final_answer": response.content
        }
    
    # 2. 工具执行节点
    def tool_node(state: WorkflowState):
        from langchain_core.tools import tool
        @tool
        def query_sales(quarter: str) -> str:
            return "2025Q4销售额:1280万元"
        
        # 执行工具
        result = query_sales.invoke({"quarter": "2025Q4"})
        # 更新状态
        return {
            "tool_results": {"sales": result},
            "intermediate_steps": state["intermediate_steps"] + [f"工具执行结果:{result}"]
        }
    
    # 3. 条件判断节点(分支逻辑)
    def judge_node(state: WorkflowState):
        # 判断是否需要重试
        if "失败" in state["tool_results"].get("sales", ""):
            return "retry"  # 跳转到重试节点
        else:
            return "end"    # 跳转到结束节点
    
  2. 工具节点(ToolNode)
    LangGraph 内置的工具执行节点,简化工具调用逻辑:

    from langgraph.prebuilt import ToolNode
    
    # 定义工具列表
    tools = [query_sales]
    # 创建工具节点
    tool_node = ToolNode(tools)
    
  3. 智能体节点(AgentNode)
    内置思考 - 行动循环的智能体节点,适配复杂决策场景:

    from langgraph.prebuilt import create_react_agent
    
    # 创建 ReAct 智能体节点
    agent_node = create_react_agent(llm, tools)
    

2.4 边(Edge)设计

边定义节点间的执行流向,分为两类:

  1. 线性边(Direct Edge)
    无条件跳转,适用于线性流程:

    # 从 START 到 tool_node
    graph.add_edge(START, "tool_node")
    # 从 tool_node 到 llm_node
    graph.add_edge("tool_node", "llm_node")
    # 从 llm_node 到 END
    graph.add_edge("llm_node", END)
    
  2. 条件边(Conditional Edge)
    根据节点输出动态跳转,适用于分支逻辑:

    # 从 judge_node 跳转到不同节点
    graph.add_conditional_edges(
        "judge_node",          # 源节点
        judge_node,            # 判断函数(返回目标节点名)
        {
            "retry": "tool_node",  # 返回 "retry" 跳转到 tool_node
            "end": END             # 返回 "end" 跳转到 END
        }
    )
    

三、工作流核心开发流程

以「销售数据查询 + 分析」工作流为例,完整展示 1.0 工作流的开发步骤:

步骤 1:定义状态(State)

from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END

# 定义工作流状态
class SalesWorkflowState(TypedDict):
    user_input: str                # 用户输入(如“查询2025Q4销售额并分析”)
    quarter: str                   # 提取的季度参数
    tool_results: dict             # 工具执行结果
    analysis_result: str           # 分析结果
    need_retry: bool = False       # 是否需要重试

步骤 2:定义核心节点

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

# 初始化模型
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# 节点1:提取季度参数
def extract_quarter_node(state: SalesWorkflowState):
    # 调用模型提取参数
    prompt = f"从用户输入中提取季度参数(格式:2025Q4):{state['user_input']}"
    quarter = llm.invoke([("human", prompt)]).content.strip()
    return {"quarter": quarter}

# 节点2:查询销售数据(工具节点)
@tool
def query_sales_tool(quarter: str) -> str:
    """查询指定季度销售额"""
    return f"{quarter}销售额:1280万元,同比增长15.3%"

# 节点3:分析销售数据
def analyze_sales_node(state: SalesWorkflowState):
    prompt = f"分析销售数据:{state['tool_results']['query_sales_tool']}"
    analysis = llm.invoke([("human", prompt)]).content
    return {"analysis_result": analysis}

# 节点4:判断是否需要重试
def judge_retry_node(state: SalesWorkflowState):
    if not state["quarter"]:
        return "retry_extract"  # 季度提取失败,重试
    elif not state["tool_results"]:
        return "retry_query"    # 工具执行失败,重试
    else:
        return "end"            # 成功,结束

步骤 3:构建工作流图

from langgraph.prebuilt import ToolNode

# 1. 创建状态图
graph = StateGraph(SalesWorkflowState)

# 2. 添加节点
graph.add_node("extract_quarter", extract_quarter_node)
graph.add_node("query_sales", ToolNode([query_sales_tool]))
graph.add_node("analyze_sales", analyze_sales_node)
graph.add_node("judge_retry", judge_retry_node)

# 3. 添加边(定义流程)
# 初始流程:START → 提取季度 → 判断重试
graph.add_edge(START, "extract_quarter")
graph.add_edge("extract_quarter", "judge_retry")

# 条件边:根据判断结果跳转
graph.add_conditional_edges(
    "judge_retry",
    judge_retry_node,
    {
        "retry_extract": "extract_quarter",  # 重试提取季度
        "retry_query": "query_sales",        # 重试查询数据
        "end": "analyze_sales"               # 成功则分析数据
    }
)

# 最终流程:分析数据 → END
graph.add_edge("query_sales", "analyze_sales")
graph.add_edge("analyze_sales", END)

# 4. 编译工作流(开启Checkpoint支持状态持久化)
app = graph.compile(checkpointer=None)  # 生产环境需指定checkpointer(如Redis)

步骤 4:运行工作流

# 输入初始状态
initial_state = {
    "user_input": "查询2025Q4的销售额并做简单分析"
}

# 运行工作流
result = app.invoke(initial_state)

# 输出结果
print(f"季度:{result['quarter']}")
print(f"工具结果:{result['tool_results']}")
print(f"分析结果:{result['analysis_result']}")

步骤 5:可视化工作流

LangGraph 提供 3 种可视化方式:

  • Mermaid 文本:生成可复制到在线工具的 Mermaid 代码(最通用);
  • HTML 可视化:生成交互式 HTML 页面(可本地打开,支持缩放 / 拖拽);
  • Jupyter 可视化:在 Notebook 中直接渲染(适合数据分析场景)。
  1. 生成 Mermaid 代码
    Mermaid 是通用的流程图语法,可复制到 Mermaid Live Editor 在线渲染:

    # 生成 Mermaid 代码
    mermaid_code = app.get_graph().draw_mermaid()
    print("=== Mermaid 可视化代码 ===")
    print(mermaid_code)
    
  2. 生成交互式 HTML 页面
    LangGraph 支持生成 HTML 文件,本地打开即可交互(缩放、拖拽、查看节点详情):

    # 生成 HTML 可视化文件
    html_content = app.get_graph().draw_html()
    
    # 保存为 HTML 文件
    with open("langgraph_workflow.html", "w", encoding="utf-8") as f:
        f.write(html_content)
    
    print("HTML 可视化文件已保存:langgraph_workflow.html")
    
  3. Jupyter Notebook 中直接渲染
    如果用 Jupyter 环境,可直接渲染流程图(无需保存文件):

    # 在 Jupyter 中渲染(需安装 ipywidgets)
    from IPython.display import display
    display(app.get_graph().draw_widget())
    

四、工作流高级特性

4.1 状态管理高级特性

4.1.1 可中断、可恢复(Checkpoint / Checkpointer)

  • 特性说明:工作流执行中可暂停,重启后从断点继续,支持长任务 / 多轮对话 / 人工介入。
  • 核心价值:解决服务重启、崩溃导致的流程中断问题,是生产级应用必备。
from langgraph.graph import StateGraph, START, END
from langchain_community.checkpoint.sqlite import SqliteCheckpointSaver
from typing import TypedDict

# 1. 定义状态
class WorkflowState(TypedDict):
    user_input: str
    step: str  # 记录当前执行步骤
    result: str

# 2. 初始化Checkpoint(持久化到SQLite)
checkpointer = SqliteCheckpointSaver.from_conn_string("checkpoint.db")

# 3. 定义节点
def step1(state: WorkflowState):
    print("执行步骤1:处理用户输入")
    return {"step": "step1", "result": f"处理完成:{state['user_input']}"}

def step2(state: WorkflowState):
    print("执行步骤2:生成结果")
    return {"step": "step2", "result": f"最终结果:{state['user_input']} - 已完成"}

# 4. 构建工作流
graph = StateGraph(WorkflowState)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", END)

# 5. 编译时绑定Checkpoint(核心)
app = graph.compile(checkpointer=checkpointer)

# 6. 第一次运行(执行到step1后中断)
config = {"configurable": {"thread_id": "user_123"}}  # 会话唯一标识
initial_state = {"user_input": "测试可中断工作流"}
# 执行step1(手动中断演示)
app.invoke(initial_state, config=config, stop_after=["step1"])

# 7. 恢复执行(从step1继续到结束)
resumed_result = app.invoke({}, config=config)
print("恢复执行结果:", resumed_result)

关键说明:

  • checkpointer 支持 Redis/MongoDB/SQLite 等,生产推荐 Redis;
  • thread_id 区分不同用户 / 会话,保证状态隔离;
  • stop_after 手动指定中断节点,模拟异常 / 人工暂停场景。

4.1.2 状态增量更新

  • 特性说明:节点仅返回需要更新的字段,LangGraph 自动合并到全局状态,无需全量赋值。
  • 核心价值:简化节点逻辑,避免状态覆盖丢失。
# 基于上面的State定义
def step1_incremental(state: WorkflowState):
    # 仅返回需要更新的字段(无需包含user_input)
    return {"step": "step1"}  

def step2_incremental(state: WorkflowState):
    # 仅更新result字段,保留step和user_input
    return {"result": f"增量更新结果:{state['user_input']}"}

# 构建并运行
graph = StateGraph(WorkflowState)
graph.add_node("step1", step1_incremental)
graph.add_node("step2", step2_incremental)
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", END)

app = graph.compile()
result = app.invoke({"user_input": "测试增量更新"})
print("最终状态:", result)  # 包含user_input/step/result所有字段

4.1.3 消息自动合并(Message Graph)

  • 特性说明:对 messages 列表字段自动追加 / 去重,无需手动拼接多轮对话上下文。
  • 核心价值:简化多轮对话 / 工具调用的上下文管理。
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import HumanMessage, AIMessage
from typing import Annotated, List
from langgraph.graph import add_messages

# 1. 定义带消息合并的状态
class ChatState(TypedDict):
    # add_messages 标记自动合并消息列表
    messages: Annotated[List, add_messages]

# 2. 定义聊天节点
def chat_node(state: ChatState):
    last_msg = state["messages"][-1].content
    return {"messages": [AIMessage(content=f"回复:{last_msg}")]}

# 3. 构建工作流
graph = StateGraph(ChatState)
graph.add_node("chat", chat_node)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)

app = graph.compile()

# 4. 多轮调用(消息自动追加)
result1 = app.invoke({"messages": [HumanMessage(content="你好")]})
result2 = app.invoke({"messages": [HumanMessage(content="再聊下工作流")]}, 
                    config={"configurable": {"thread_id": "chat_123"}})
print("所有消息:", [msg.content for msg in result2["messages"]])

4.2 流程控制高级特性

4.2.1 条件分支(Conditional Edge)

  • 特性说明:根据节点输出动态决定下一步执行路径,支持分支逻辑(核心的 Agent 思考 - 行动循环依赖此特性)。
  • 核心价值:替代硬编码的 if-else,实现灵活的流程分支。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class BranchState(TypedDict):
    user_input: str
    need_tool: bool  # 是否需要调用工具
    result: str

# 1. 定义节点
def judge_node(state: BranchState):
    # 判断是否需要调用工具
    if "查询" in state["user_input"]:
        return "call_tool"  # 分支1:调用工具
    else:
        return "direct_answer"  # 分支2:直接回答

def tool_node(state: BranchState):
    return {"result": "工具调用结果:2025Q4销售额1280万", "need_tool": False}

def answer_node(state: BranchState):
    return {"result": f"直接回答:{state['user_input']}"}

# 2. 构建工作流
graph = StateGraph(BranchState)
graph.add_node("judge", judge_node)
graph.add_node("tool", tool_node)
graph.add_node("answer", answer_node)

# 3. 添加条件边(核心)
graph.add_edge(START, "judge")
graph.add_conditional_edges(
    "judge",  # 源节点
    judge_node,  # 分支判断函数
    {
        "call_tool": "tool",  # 返回"call_tool"跳转到tool节点
        "direct_answer": "answer"  # 返回"direct_answer"跳转到answer节点
    }
)
graph.add_edge("tool", END)
graph.add_edge("answer", END)

app = graph.compile()

# 测试分支1:调用工具
result1 = app.invoke({"user_input": "查询2025Q4销售额"})
print("分支1结果:", result1["result"])

# 测试分支2:直接回答
result2 = app.invoke({"user_input": "介绍下LangChain工作流"})
print("分支2结果:", result2["result"])

4.2.1 循环执行(Loop)

  • 特性说明:支持流程循环执行(如工具调用重试、多轮思考),是 ReAct Agent 的核心。
  • 核心价值:实现 “思考→行动→观察→再思考” 的循环逻辑。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class LoopState(TypedDict):
    count: int
    result: str

# 1. 定义节点
def loop_node(state: LoopState):
    new_count = state["count"] + 1
    return {"count": new_count}

def judge_loop(state: LoopState):
    # 循环3次后结束
    if state["count"] < 3:
        return "loop"  # 继续循环
    else:
        return "end"  # 结束循环

# 2. 构建工作流
graph = StateGraph(LoopState)
graph.add_node("loop", loop_node)
graph.add_node("judge", judge_loop)

# 3. 添加循环边
graph.add_edge(START, "loop")
graph.add_edge("loop", "judge")
graph.add_conditional_edges(
    "judge",
    judge_loop,
    {
        "loop": "loop",  # 循环回到loop节点
        "end": END
    }
)

app = graph.compile()

# 运行循环
result = app.invoke({"count": 0})
print("循环结果:", result)  # count=3,循环3次后结束

4.2.3 节点中断(NodeInterrupt)

  • 特性说明:在节点中主动抛出中断异常,暂停工作流,支持人工审核 / 外部回调场景。
  • 核心价值:实现 “机器执行 + 人工介入” 的混合流程。
from langgraph.graph import StateGraph, START, END
from langgraph.errors import NodeInterrupt
from typing import TypedDict

class InterruptState(TypedDict):
    user_input: str
    need_audit: bool
    audit_result: str

# 1. 定义节点
def check_audit(state: InterruptState):
    # 敏感内容需要人工审核
    if "敏感信息" in state["user_input"]:
        # 抛出中断异常,暂停工作流
        raise NodeInterrupt("需要人工审核敏感内容")
    return {"need_audit": False}

def generate_result(state: InterruptState):
    return {"result": f"生成结果:{state['user_input']}"}

# 2. 构建工作流
graph = StateGraph(InterruptState)
graph.add_node("check", check_audit)
graph.add_node("generate", generate_result)
graph.add_edge(START, "check")
graph.add_edge("check", "generate")
graph.add_edge("generate", END)

app = graph.compile()

# 测试中断
try:
    app.invoke({"user_input": "包含敏感信息的请求"})
except NodeInterrupt as e:
    print("工作流中断:", e)
    # 人工审核后恢复执行
    resumed_result = app.invoke(
        {"user_input": "包含敏感信息的请求", "audit_result": "审核通过"},
        config={"configurable": {"thread_id": "audit_123"}}
    )
    print("恢复执行结果:", resumed_result)

4.2.4 并行执行(Parallel Node)

  • 特性说明:同时执行多个节点,提升流程效率(如并行调用多个工具、并行检索)。
  • 核心价值:减少流程总耗时,适配高并发场景。
from langgraph.graph import StateGraph, START, END, create_parallel_node
from typing import TypedDict

class ParallelState(TypedDict):
    tool1_result: str
    tool2_result: str
    merged_result: str

# 1. 定义并行节点
def tool1_node(state: ParallelState):
    return {"tool1_result": "工具1结果"}

def tool2_node(state: ParallelState):
    return {"tool2_result": "工具2结果"}

# 2. 合并并行结果
def merge_node(state: ParallelState):
    return {"merged_result": f"{state['tool1_result']} + {state['tool2_result']}"}

# 3. 构建并行工作流
graph = StateGraph(ParallelState)
# 创建并行节点(同时执行tool1和tool2)
parallel_node = create_parallel_node(
    graph,
    {"tool1": tool1_node, "tool2": tool2_node}
)
graph.add_node("parallel", parallel_node)
graph.add_node("merge", merge_node)

# 4. 定义流程
graph.add_edge(START, "parallel")
graph.add_edge("parallel", "merge")
graph.add_edge("merge", END)

app = graph.compile()

# 运行并行流程
result = app.invoke({})
print("并行执行结果:", result["merged_result"])

4.3 调试 / 观测高级特性

4.3.1 工作流可视化

  • 特性说明:将工作流导出为 Mermaid 图形(可可视化),支持 PNG 导出。
  • 核心价值:直观展示流程结构,便于调试和团队协作。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class VisState(TypedDict):
    input: str

def node1(state: VisState):
    return {"input": state["input"] + "_node1"}

def node2(state: VisState):
    return {"input": state["input"] + "_node2"}

# 构建工作流
graph = StateGraph(VisState)
graph.add_node("node1", node1)
graph.add_node("node2", node2)
graph.add_edge(START, "node1")
graph.add_edge("node1", "node2")
graph.add_edge("node2", END)

app = graph.compile()

# 1. 导出Mermaid格式(可在Mermaid编辑器中可视化)
mermaid_code = app.get_graph().draw_mermaid()
print("Mermaid可视化代码:")
print(mermaid_code)

# 2. 导出PNG图片(需安装pygraphviz)
# app.get_graph().draw_png("workflow.png")

4.3.2 流式输出整个工作流

  • 特性说明:实时流式输出工作流的节点执行、状态变化、模型结果,支持前端打字机效果。
  • 核心价值:提升用户体验,实时展示 AI 思考 / 执行过程。
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict

class StreamState(TypedDict):
    question: str
    answer: str

# 1. 初始化模型
llm = ChatOpenAI(model="gpt-4o", streaming=True)

# 2. 定义流式节点
def stream_answer_node(state: StreamState):
    # 流式生成回答
    response = ""
    for chunk in llm.stream([("human", state["question"])]):
        response += chunk.content
        # 实时返回流式结果
        yield {"answer": response}

# 3. 构建工作流
graph = StateGraph(StreamState)
graph.add_node("stream_answer", stream_answer_node)
graph.add_edge(START, "stream_answer")
graph.add_edge("stream_answer", END)

app = graph.compile()

# 4. 流式运行工作流
print("流式输出:", end="")
for chunk in app.stream({"question": "详细介绍LangChain 1.0工作流"}):
    print(chunk["answer"][-10:], end="", flush=True)  # 打印最后10个字符

4.4 生产级稳定性特性

4.4.1 重试机制

  • 特性说明:为节点添加重试逻辑,处理工具调用 / 模型调用失败场景。
  • 核心价值:提升工作流鲁棒性,减少临时故障导致的失败。
from langgraph.graph import StateGraph, START, END
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import TypedDict

class RetryState(TypedDict):
    input: str
    result: str

# 1. 定义带重试的节点
@retry(
    stop=stop_after_attempt(3),  # 最多重试3次
    wait=wait_exponential(multiplier=1, min=1, max=5)  # 指数退避等待
)
def unreliable_node(state: RetryState):
    # 模拟随机失败
    import random
    if random.random() < 0.7:
        raise Exception("节点执行失败")
    return {"result": f"成功:{state['input']}"}

# 2. 构建工作流
graph = StateGraph(RetryState)
graph.add_node("unreliable", unreliable_node)
graph.add_edge(START, "unreliable")
graph.add_edge("unreliable", END)

app = graph.compile()

# 运行带重试的节点
result = app.invoke({"input": "测试重试机制"})
print("重试结果:", result["result"])

4.4.2 异步执行

  • 特性说明:支持异步调用工作流,适配 FastAPI / 异步框架,提升并发能力。
  • 核心价值:解决同步调用的性能瓶颈,支持高并发场景。
import asyncio
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class AsyncState(TypedDict):
    input: str
    result: str

# 1. 定义异步节点
async def async_node(state: AsyncState):
    # 模拟异步操作(如异步调用模型/工具)
    await asyncio.sleep(1)
    return {"result": f"异步结果:{state['input']}"}

# 2. 构建工作流
graph = StateGraph(AsyncState)
graph.add_node("async_node", async_node)
graph.add_edge(START, "async_node")
graph.add_edge("async_node", END)

app = graph.compile()

# 3. 异步调用工作流
async def run_async_workflow():
    result = await app.ainvoke({"input": "测试异步执行"})
    print("异步执行结果:", result["result"])

    # 批量异步调用
    batch_results = await app.abatch([
        {"input": "批量1"},
        {"input": "批量2"}
    ])
    print("批量异步结果:", [r["result"] for r in batch_results])

asyncio.run(run_async_workflow())

4.4.3 中间件(Middleware)

  • 特性说明:通过中间件实现日志、监控、脱敏等横切功能,不侵入业务逻辑。
  • 核心价值:解耦业务与非业务逻辑,提升代码可维护性。
from langgraph.graph import StateGraph, START, END
from langchain_core.middleware import BaseMiddleware
from typing import TypedDict

class MiddlewareState(TypedDict):
    input: str
    result: str

# 1. 定义日志中间件
class LogMiddleware(BaseMiddleware):
    def invoke(self, input, config, next):
        # 前置处理:记录输入
        print(f"节点 {config['node']} 输入:{input}")
        # 执行原节点逻辑
        output = next(input, config)
        # 后置处理:记录输出
        print(f"节点 {config['node']} 输出:{output}")
        return output

# 2. 定义业务节点
def business_node(state: MiddlewareState):
    return {"result": f"业务结果:{state['input']}"}

# 3. 构建工作流并绑定中间件
graph = StateGraph(MiddlewareState)
graph.add_node("business", business_node)
graph.add_edge(START, "business")
graph.add_edge("business", END)

# 绑定中间件
app = graph.compile(middlewares=[LogMiddleware()])

# 运行工作流(中间件自动记录日志)
app.invoke({"input": "测试中间件"})

4.5 架构扩展高级特性

4.5.1 子图模块化(Graph as Node)

  • 特性说明:将一个完整工作流作为子图嵌入到更大的工作流中,实现模块化开发。
  • 核心价值:支持大型应用的分层架构(如 RAG 子图、工具调用子图)。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

# 1. 定义子图状态
class SubGraphState(TypedDict):
    sub_input: str
    sub_result: str

# 2. 构建子图(独立的小工作流)
sub_graph = StateGraph(SubGraphState)
def sub_node(state: SubGraphState):
    return {"sub_result": f"子图处理:{state['sub_input']}"}
sub_graph.add_node("sub_node", sub_node)
sub_graph.add_edge(START, "sub_node")
sub_graph.add_edge("sub_node", END)
sub_app = sub_graph.compile()

# 3. 定义主工作流状态
class MainGraphState(TypedDict):
    main_input: str
    main_result: str
    sub_result: str

# 4. 主工作流中调用子图
def call_subgraph_node(state: MainGraphState):
    # 调用子图
    sub_result = sub_app.invoke({"sub_input": state["main_input"]})
    return {"sub_result": sub_result["sub_result"]}

def main_node(state: MainGraphState):
    return {"main_result": f"主图合并:{state['sub_result']}"}

# 5. 构建主工作流
main_graph = StateGraph(MainGraphState)
main_graph.add_node("call_subgraph", call_subgraph_node)
main_graph.add_node("main_node", main_node)
main_graph.add_edge(START, "call_subgraph")
main_graph.add_edge("call_subgraph", "main_node")
main_graph.add_edge("main_node", END)

main_app = main_graph.compile()

# 运行主工作流
result = main_app.invoke({"main_input": "测试子图模块化"})
print("主工作流结果:", result["main_result"])

4.5.2 多智能体协作

  • 特性说明:支持多智能体分工协作(如 Supervisor + Worker 架构),实现复杂任务拆解。
  • 核心价值:解决单一智能体无法处理的复杂任务,提升任务处理能力。
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from typing import TypedDict

# 1. 初始化模型
llm = ChatOpenAI(model="gpt-4o")

# 2. 定义工具
@tool
def sales_tool(quarter: str) -> str:
    """查询销售额"""
    return f"{quarter}销售额:1280万"

@tool
def analysis_tool(data: str) -> str:
    """分析销售数据"""
    return f"分析结果:{data},同比增长15.3%"

# 3. 定义多智能体状态
class MultiAgentState(TypedDict):
    user_input: str
    sales_data: str
    analysis_result: str

# 4. 创建Worker智能体
sales_agent = create_react_agent(llm, [sales_tool], name="sales_agent")
analysis_agent = create_react_agent(llm, [analysis_tool], name="analysis_agent")

# 5. 定义智能体节点
def sales_agent_node(state: MultiAgentState):
    result = sales_agent.invoke({"input": state["user_input"]})
    return {"sales_data": result["output"]}

def analysis_agent_node(state: MultiAgentState):
    result = analysis_agent.invoke({"input": state["sales_data"]})
    return {"analysis_result": result["output"]}

# 6. 构建多智能体工作流
graph = StateGraph(MultiAgentState)
graph.add_node("sales_agent", sales_agent_node)
graph.add_node("analysis_agent", analysis_agent_node)
graph.add_edge(START, "sales_agent")
graph.add_edge("sales_agent", "analysis_agent")
graph.add_edge("analysis_agent", END)

app = graph.compile()

# 运行多智能体工作流
result = app.invoke({"user_input": "查询2025Q4销售额并分析"})
print("多智能体结果:", result["analysis_result"])

五、完整实战——多智能体协作工作流

构建一个复杂的例子:研究助手团队,包含研究员、分析师、写作者三个智能体协同工作。

  • 多智能体分工:研究员→分析师→写作者→编辑
  • 迭代优化:编辑可以退回给写作者修改
  • 条件判断:直到质量达标才结束
from typing import TypedDict, List
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

# 定义状态
class ResearchState(TypedDict):
    topic: str
    research_notes: str
    analysis: str
    draft: str
    final_report: str

# 初始化模型
model = ChatOpenAI(model="gpt-4o")

# 节点1:研究员
def researcher(state: ResearchState):
    prompt = f"""你是一个研究员,请对以下主题进行深入研究:
    主题:{state['topic']}
    
    请提供:
    1. 关键事实和数据
    2. 主要观点和争议
    3. 相关引用来源
    """
    response = model.invoke([HumanMessage(content=prompt)])
    return {"research_notes": response.content}

# 节点2:分析师
def analyst(state: ResearchState):
    prompt = f"""你是一个分析师,请基于以下研究笔记进行分析:
    {state['research_notes']}
    
    请提供:
    1. 核心洞察
    2. 趋势分析
    3. 优缺点评估
    """
    response = model.invoke([HumanMessage(content=prompt)])
    return {"analysis": response.content}

# 节点3:写作者
def writer(state: ResearchState):
    prompt = f"""你是一个专业写作者,请基于研究和分析撰写最终报告:
    
    研究笔记:{state['research_notes']}
    分析:{state['analysis']}
    
    请用正式、清晰的报告格式输出,包含:
    - 执行摘要
    - 主要发现
    - 结论和建议
    """
    response = model.invoke([HumanMessage(content=prompt)])
    return {"draft": response.content}

# 节点4:编辑(质检)
def editor(state: ResearchState):
    prompt = f"""你是一个编辑,请审阅以下报告草稿并提出修改建议:
    {state['draft']}
    
    如果质量合格,输出"QUALITY_PASS"加上最终版本;
    如果需要改进,输出具体修改意见。
    """
    response = model.invoke([HumanMessage(content=prompt)])
    
    if "QUALITY_PASS" in response.content:
        final = response.content.replace("QUALITY_PASS", "").strip()
        return {"final_report": final}
    else:
        # 如果不合格,把修改意见返回给写作者(通过状态)
        return {"draft": f"修改意见:{response.content}\n\n请根据以上意见修改。"}

# 条件判断函数
def check_quality(state: ResearchState):
    """判断是否需要继续迭代"""
    if "final_report" in state and state["final_report"]:
        return "end"
    else:
        return "revise"

# 构建图
builder = StateGraph(ResearchState)

builder.add_node("researcher", researcher)
builder.add_node("analyst", analyst)
builder.add_node("writer", writer)
builder.add_node("editor", editor)

builder.set_entry_point("researcher")

builder.add_edge("researcher", "analyst")
builder.add_edge("analyst", "writer")
builder.add_edge("writer", "editor")

# 条件边:如果质检通过就结束,否则回写作者重写
builder.add_conditional_edges(
    "editor",
    check_quality,
    {
        "end": END,
        "revise": "writer"  # 回到writer节点继续修改
    }
)

research_app = builder.compile()

# 执行
result = research_app.invoke({"topic": "人工智能在医疗领域的应用"})
print(result["final_report"])

六、总结

6.1 核心关键点

  1. LangChain 1.0 工作流核心是 LangGraph:基于有向图的编排体系,替代旧版线性 Chain,支持分支、循环、多智能体等复杂逻辑;
  2. 核心组件:State(状态)、Node(节点)、Edge(边)、Checkpoint(状态持久化);
  3. 开发流程:定义状态 → 实现节点 → 编排图结构 → 编译运行 → 可视化调试;
  4. 核心特性:状态持久化、异步执行、异常重试、多智能体协作、可视化;
  5. 生产级关键:开启 Checkpoint、节点单一职责、流程简化、添加重试 / 兜底。

6.2 核心应用场景

场景 工作流设计要点
RAG 检索增强生成 拆分「文档加载→切分→嵌入→检索→生成」节点,支持检索重试
智能体工具调用 「参数提取→工具执行→结果处理→模型生成」分支流程,支持工具重试
多智能体协作 按职责拆分智能体节点,定义节点间数据传递规则
长文本生成 「分段生成→拼接→校验」循环流程,支持断点续生成
数据分析 「数据查询→清洗→分析→可视化」线性流程,支持异常兜底

6.3 生产级最佳实践

  1. 状态设计原则:
    • 仅存储核心数据(避免状态过大);
    • 使用强类型 TypedDict 定义状态(提升可维护性);
    • 状态字段命名统一(如 user_input/tool_results)。
  2. 节点设计原则:
    • 单一职责(一个节点只做一件事);
    • 节点逻辑可复用(封装为独立函数);
    • 节点输出仅更新状态(不做全局操作)。
  3. 流程设计原则:
    • 避免复杂嵌套分支(降低调试成本);
    • 关键节点添加重试 / 兜底;
    • 开启 Checkpoint 支持状态持久化。
  4. 性能优化:
    • 异步执行高耗时节点(如工具调用、模型生成);
    • 批量处理重复节点调用;
    • 缓存高频节点结果(如嵌入生成)。

6.4 常见问题与解决方案

问题 原因 解决方案
工作流死循环 条件边逻辑错误 增加最大循环次数限制、完善判断逻辑
状态数据丢失 未开启 Checkpoint 生产环境必须指定 checkpointer(Redis/MongoDB)
节点执行超时 模型 / 工具响应慢 增加节点超时控制、开启重试
可视化失败 依赖缺失(pygraphviz) 安装依赖或使用 Mermaid 格式可视化
多智能体数据冲突 状态字段未隔离 为不同智能体设置独立状态字段
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐