This section surveys common workflow and agent patterns.

  • Workflows: follow predetermined code paths and execute in a fixed order.

  • Agents: are dynamic and autonomous, deciding their own control flow and how to use tools.

Building agents and workflows with LangGraph brings several benefits, including persistence and streaming, along with support for debugging and deployment.

Setup

You can build workflows and agents with any chat model that supports structured output and tool calling. The examples below use Anthropic:

  1. Install dependencies:

pip install langchain-core langchain-anthropic langgraph
  2. Initialize the LLM:

import os
import getpass
from langchain_anthropic import ChatAnthropic

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("ANTHROPIC_API_KEY")
llm = ChatAnthropic(model="claude-sonnet-4-5-20250929")
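
The examples only assume a chat model with tool calling and structured output. As a sketch (assuming the matching integration package is installed), `init_chat_model` can swap in another provider:

from langchain.chat_models import init_chat_model

# Sketch: provider-agnostic initialization. The provider-prefixed model
# string here is an assumption; any supported provider works once its
# integration package (e.g. langchain-anthropic) is installed.
llm = init_chat_model("anthropic:claude-sonnet-4-5-20250929")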

LLMs and augmentations

Workflow and agent systems are built from an LLM plus various augmentations.

Augmentations such as tool calling, structured output, and short-term memory can be tailored to your needs; the code below demonstrates the first two, and a short-term memory sketch follows it.


# Schema for structured output
from pydantic import BaseModel, Field

class SearchQuery(BaseModel):
    search_query: str = Field(None, description="Query that is optimized for web search.")
    justification: str = Field(
        None, description="Why this query is relevant to the user's request."
    )

# Augment the LLM with structured output
structured_llm = llm.with_structured_output(SearchQuery)

# Invoke the augmented LLM
output = structured_llm.invoke("How does Calcium CT score relate to high cholesterol?")

# Define a tool
def multiply(a: int, b: int) -> int:
    """Multiply a and b."""
    return a * b

# Augment the LLM with the tool
llm_with_tools = llm.bind_tools([multiply])

# An input that should trigger a tool call
msg = llm_with_tools.invoke("What is 2 times 3?")

# Inspect the tool calls
msg.tool_calls
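
The snippet above covers structured output and tool calling. Short-term memory, the third augmentation mentioned, is usually added by compiling a graph with a checkpointer so state persists per conversation thread. A minimal sketch using the in-memory checkpointer (the StateGraph API itself is introduced in the sections below):

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END, MessagesState

# Minimal sketch: a one-node chat graph with an in-memory checkpointer.
# Each thread_id keeps its own message history between invocations.
def chat(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("chat", chat)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
memory_graph = builder.compile(checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "1"}}
memory_graph.invoke({"messages": [("user", "Hi, I'm Lance")]}, config)
# The second call sees the first turn, because state was checkpointed.
memory_graph.invoke({"messages": [("user", "What is my name?")]}, config)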

Prompt chaining

In prompt chaining, each LLM call processes the output of the previous one. It suits well-defined tasks that decompose into small, verifiable steps, for example:

  • Translating a document into different languages

  • Checking generated content for consistency

Graph API


from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display

# Graph state
class State(TypedDict):
    topic: str
    joke: str
    improved_joke: str
    final_joke: str

# Nodes
def generate_joke(state: State):
    """First LLM call to generate an initial joke"""
    msg = llm.invoke(f"Write a short joke about {state['topic']}")
    return {"joke": msg.content}

def check_punchline(state: State):
    """Gate function to check if the joke has a punchline"""
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Pass"
    return "Fail"

def improve_joke(state: State):
    """Second LLM call to improve the joke"""
    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
    return {"improved_joke": msg.content}

def polish_joke(state: State):
    """Third LLM call for final polish"""
    msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
    return {"final_joke": msg.content}

# Build workflow
workflow = StateGraph(State)

# Add nodes
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)

# Add edges to connect nodes
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
    "generate_joke",
    check_punchline,
    {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)

# Compile
chain = workflow.compile()

# Show the workflow
display(Image(chain.get_graph().draw_mermaid_png()))

# Invoke
state = chain.invoke({"topic": "cats"})
print("Initial joke:")
print(state["joke"])
print("\n--- --- ---\n")
if "improved_joke" in state:
    print("Improved joke:")
    print(state["improved_joke"])
    print("\n--- --- ---\n")
    print("Final joke:")
    print(state["final_joke"])
else:
    print("Final joke:")
    print(state["joke"])

Functional API


from langgraph.func import entrypoint, task

# Tasks
@task
def generate_joke(topic: str):
    """First LLM call to generate an initial joke"""
    msg = llm.invoke(f"Write a short joke about {topic}")
    return msg.content

def check_punchline(joke: str):
    """Gate function to check if the joke has a punchline"""
    if "?" in joke or "!" in joke:
        return "Pass"
    return "Fail"

@task
def improve_joke(joke: str):
    """Second LLM call to improve the joke"""
    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {joke}")
    return msg.content

@task
def polish_joke(joke: str):
    """Third LLM call for final polish"""
    msg = llm.invoke(f"Add a surprising twist to this joke: {joke}")
    return msg.content

@entrypoint()
def prompt_chaining_workflow(topic: str):
    original_joke = generate_joke(topic).result()
    if check_punchline(original_joke) == "Pass":
        return original_joke
    improved_joke = improve_joke(original_joke).result()
    return polish_joke(improved_joke).result()

# Invoke
for step in prompt_chaining_workflow.stream("cats", stream_mode="updates"):
    print(step)
    print("\n")

Parallelization

With parallelization, multiple LLMs work on a task at the same time: several independent subtasks can run at once, or the same task can run multiple times so the outputs can be compared.

Common uses:

  • Splitting a task into subtasks that run in parallel, for speed

  • Running a task several times to check its outputs, for reliability (a voting sketch follows the Functional API example below)

For example:

  • One subtask extracts a document's keywords while another checks it for formatting errors

  • Scoring a document several times against different criteria to assess its accuracy

Graph API


# Graph state
class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str

# Nodes
def call_llm_1(state: State):
    """Generate a joke"""
    msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}

def call_llm_2(state: State):
    """Generate a story"""
    msg = llm.invoke(f"Write a story about {state['topic']}")
    return {"story": msg.content}

def call_llm_3(state: State):
    """Generate a poem"""
    msg = llm.invoke(f"Write a poem about {state['topic']}")
    return {"poem": msg.content}

def aggregator(state: State):
    """Combine the results"""
    combined = f"Here's a story, joke, and poem about {state['topic']}!\n\n"
    combined += f"STORY:\n{state['story']}\n\n"
    combined += f"JOKE:\n{state['joke']}\n\n"
    combined += f"POEM:\n{state['poem']}"
    return {"combined_output": combined}

# Build the parallel workflow
parallel_builder = StateGraph(State)

parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)

parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)

parallel_workflow = parallel_builder.compile()

# Show the workflow
display(Image(parallel_workflow.get_graph().draw_mermaid_png()))

# Invoke
state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"])

Functional API


@task
def call_llm_1(topic: str):
    """Generate a joke"""
    msg = llm.invoke(f"Write a joke about {topic}")
    return msg.content

@task
def call_llm_2(topic: str):
    """Generate a story"""
    msg = llm.invoke(f"Write a story about {topic}")
    return msg.content

@task
def call_llm_3(topic: str):
    """Generate a poem"""
    msg = llm.invoke(f"Write a poem about {topic}")
    return msg.content

@task
def aggregator(topic: str, joke: str, story: str, poem: str):
    """Combine the results"""
    combined = f"Here's a story, joke, and poem about {topic}!\n\n"
    combined += f"STORY:\n{story}\n\n"
    combined += f"JOKE:\n{joke}\n\n"
    combined += f"POEM:\n{poem}"
    return combined

@entrypoint()
def parallel_workflow(topic: str):
    joke_fut = call_llm_1(topic)
    story_fut = call_llm_2(topic)
    poem_fut = call_llm_3(topic)
    return aggregator(
        topic,
        joke_fut.result(),
        story_fut.result(),
        poem_fut.result()
    ).result()

# Invoke
for step in parallel_workflow.stream("cats", stream_mode="updates"):
    print(step)
    print("\n")

Routing

A routing workflow first classifies an input, then directs it to a specialized follow-up task, giving each kind of complex input its own dedicated flow.

For example, when answering product questions: first determine the question type, then route it to the appropriate flow for pricing, refunds, or returns.

Graph API


from typing_extensions import Literal
from langchain.messages import HumanMessage, SystemMessage

# Schema for structured output to use as routing logic
class Route(BaseModel):
    step: Literal["poem", "story", "joke"] = Field(
        None, description="The next step in the routing process"
    )
    )

# LLM augmented for routing
router = llm.with_structured_output(Route)

# State
class State(TypedDict):
    input: str
    decision: str
    output: str

# Nodes
def llm_call_1(state: State):
    """Write a story"""
    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_2(state: State):
    """Write a joke"""
    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_3(state: State):
    """Write a poem"""
    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_router(state: State):
    """Route the input to the appropriate node"""
    decision = router.invoke(
        [
            SystemMessage(content="Route the input to story, joke, or poem based on the user's request."),
            HumanMessage(content=state["input"]),
        ]
    )
    return {"decision": decision.step}

def route_decision(state: State):
    """Conditional edge function to route to the appropriate node"""
    if state["decision"] == "story":
        return "llm_call_1"
    elif state["decision"] == "joke":
        return "llm_call_2"
    elif state["decision"] == "poem":
        return "llm_call_3"

# Build the routing workflow
router_builder = StateGraph(State)

router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)

router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
    "llm_call_router",
    route_decision,
    {
        "llm_call_1": "llm_call_1",
        "llm_call_2": "llm_call_2",
        "llm_call_3": "llm_call_3",
    },
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)

router_workflow = router_builder.compile()

# Show the workflow
display(Image(router_workflow.get_graph().draw_mermaid_png()))

# Invoke
state = router_workflow.invoke({"input": "Write me a joke about cats"})
print(state["output"])

Functional API


from typing_extensions import Literal
from pydantic import BaseModel, Field
from langchain.messages import HumanMessage, SystemMessage

# Schema for structured output to use as routing logic
class Route(BaseModel):
    step: Literal["poem", "story", "joke"] = Field(
        None, description="The next step in the routing process"
    )
    )

router = llm.with_structured_output(Route)

@task
def llm_call_1(input_: str):
    """写故事"""
    result = llm.invoke(input_)
    return result.content

@task
def llm_call_2(input_: str):
    """写笑话"""
    result = llm.invoke(input_)
    return result.content

@task
def llm_call_3(input_: str):
    """写诗歌"""
    result = llm.invoke(input_)
    return result.content

def llm_call_router(input_: str):
    """路由到对应节点"""
    decision = router.invoke(
        [
            SystemMessage(content="Route the input to story, joke, or poem based on the user's request."),
            HumanMessage(content=input_),
        ]
    )
    return decision.step

@entrypoint()
def router_workflow(input_: str):
    next_step = llm_call_router(input_)
    if next_step == "story":
        llm_call = llm_call_1
    elif next_step == "joke":
        llm_call = llm_call_2
    elif next_step == "poem":
        llm_call = llm_call_3
    return llm_call(input_).result()

# Invoke
for step in router_workflow.stream("Write me a joke about cats", stream_mode="updates"):
    print(step)
    print("\n")

Orchestrator-worker

In the orchestrator-worker architecture:

  • Orchestrator: breaks the task into subtasks → delegates them to workers → combines their outputs into the final result

  • Workers: carry out the individual subtasks

This pattern is more flexible than plain parallelization and is common when the subtasks cannot be defined in advance, such as writing code or updating content across many documents.

Graph API

The Graph API version defines the planning schema here; the rest of the graph, which uses the Send API to spawn one worker per section, follows under "Creating workers in LangGraph" below.


from typing import Annotated, List
import operator

# Schema for structured output to use in planning
class Section(BaseModel):
    name: str = Field(description="Name for this section of the report.")
    description: str = Field(
        description="Brief overview of the main topics and concepts of the section."
    )

class Sections(BaseModel):
    sections: List[Section] = Field(description="Sections of the report.")

planner = llm.with_structured_output(Sections)

Functional API


from typing import List

# Schema for structured output to use in planning
class Section(BaseModel):
    name: str = Field(description="Name for this section of the report.")
    description: str = Field(
        description="Brief overview of the main topics and concepts of the section."
    )

class Sections(BaseModel):
    sections: List[Section] = Field(description="Sections of the report.")

planner = llm.with_structured_output(Sections)

@task
def orchestrator(topic: str):
    """Orchestrator that generates a plan for the report"""
    report_sections = planner.invoke(
        [
            SystemMessage(content="Generate a plan for the report."),
            HumanMessage(content=f"Here is the report topic: {topic}"),
        ]
    )
    return report_sections.sections

@task
def llm_call(section: Section):
    """Worker that writes a section of the report"""
    result = llm.invoke(
        [
            SystemMessage(content="Write a report section."),
            HumanMessage(
                content=f"Here is the section name: {section.name} and description: {section.description}"
            ),
        ]
    )
    return result.content

@task
def synthesizer(completed_sections: list[str]):
    """Synthesize the full report from the sections"""
    final_report = "\n\n---\n\n".join(completed_sections)
    return final_report

@entrypoint()
def orchestrator_worker(topic: str):
    sections = orchestrator(topic).result()
    section_futures = [llm_call(section) for section in sections]
    final_report = synthesizer(
        [section_fut.result() for section_fut in section_futures]
    ).result()
    return final_report

# Invoke
report = orchestrator_worker.invoke("Create a report on LLM scaling laws")
from IPython.display import Markdown
Markdown(report)

Creating workers in LangGraph

LangGraph has built-in support for the orchestrator-worker pattern. The Send API dynamically creates worker nodes and sends each one its own specific input.

Each worker has its own state, and every worker's output is written to a shared state key that the orchestrator reads from and combines.


from langgraph.types import Send

# Graph state
class State(TypedDict):
    topic: str                # Report topic
    sections: list[Section]   # List of report sections
    completed_sections: Annotated[list, operator.add]  # Workers write here in parallel
    final_report: str         # Final report

# Worker state
class WorkerState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add]

# Nodes
def orchestrator(state: State):
    """Orchestrator that generates a plan for the report"""
    report_sections = planner.invoke(
        [
            SystemMessage(content="Generate a plan for the report."),
            HumanMessage(content=f"Here is the report topic: {state['topic']}"),
        ]
    )
    return {"sections": report_sections.sections}

def llm_call(state: WorkerState):
    """Worker that writes a section of the report"""
    section = llm.invoke(
        [
            SystemMessage(
                content="Write a report section following the provided name and description. Use markdown formatting and include no preamble."
            ),
            HumanMessage(
                content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}"
            ),
        ]
    )
    return {"completed_sections": [section.content]}

def synthesizer(state: State):
    """Synthesize the full report from the sections"""
    completed_sections = state["completed_sections"]
    completed_report_sections = "\n\n---\n\n".join(completed_sections)
    return {"final_report": completed_report_sections}

def assign_workers(state: State):
    """Assign a worker to each section in the plan"""
    return [Send("llm_call", {"section": s}) for s in state["sections"]]

# Build workflow
orchestrator_worker_builder = StateGraph(State)

orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)

orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator",
    assign_workers,
    ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)

orchestrator_worker = orchestrator_worker_builder.compile()

# Show the workflow
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))

# Invoke
state = orchestrator_worker.invoke({"topic": "Create a report on LLM scaling laws"})
from IPython.display import Markdown
Markdown(state["final_report"])

Evaluator-optimizer

In an evaluator-optimizer workflow:

  1. One LLM generates a response

  2. Another LLM evaluates that response

  3. If the evaluator (or a human in the loop) decides the response needs work, it provides feedback and the response is regenerated (a human-in-the-loop sketch follows the Functional API example below)

  4. The loop repeats until the result is acceptable

This suits tasks with clear success criteria that benefit from iterative refinement, such as translation or copy editing.

Graph API


# Graph state
class State(TypedDict):
    joke: str
    topic: str
    feedback: str
    funny_or_not: str

# Schema for structured output to use in evaluation
class Feedback(BaseModel):
    grade: Literal["funny", "not funny"] = Field(
        description="Decide if the joke is funny or not."
    )
    feedback: str = Field(
        description="If the joke is not funny, provide feedback on how to improve it."
    )

evaluator = llm.with_structured_output(Feedback)

# Nodes
def llm_call_generator(state: State):
    """LLM generates a joke"""
    if state.get("feedback"):
        msg = llm.invoke(
            f"Write a joke about {state['topic']} but take into account the feedback: {state['feedback']}"
        )
    else:
        msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}

def llm_call_evaluator(state: State):
    """LLM evaluates the joke"""
    grade = evaluator.invoke(f"Grade the joke: {state['joke']}")
    return {"funny_or_not": grade.grade, "feedback": grade.feedback}

def route_joke(state: State):
    """Route back to the generator or end, based on the evaluator's verdict"""
    if state["funny_or_not"] == "funny":
        return "Accepted"
    elif state["funny_or_not"] == "not funny":
        return "Rejected + Feedback"

# Build the optimizer workflow
optimizer_builder = StateGraph(State)

optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)

optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
    "llm_call_evaluator",
    route_joke,
    {
        "Accepted": END,
        "Rejected + Feedback": "llm_call_generator",
    },
)

optimizer_workflow = optimizer_builder.compile()

# Show the workflow
display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))

# Invoke
state = optimizer_workflow.invoke({"topic": "Cats"})
print(state["joke"])

Functional API


# Schema for structured output to use in evaluation
class Feedback(BaseModel):
    grade: Literal["funny", "not funny"] = Field(
        description="Decide if the joke is funny or not."
    )
    feedback: str = Field(
        description="If the joke is not funny, provide feedback on how to improve it."
    )

evaluator = llm.with_structured_output(Feedback)

@task
def llm_call_generator(topic: str, feedback: Feedback):
    """LLM generates a joke"""
    if feedback:
        msg = llm.invoke(
            f"Write a joke about {topic} but take into account the feedback: {feedback}"
        )
    else:
        msg = llm.invoke(f"Write a joke about {topic}")
    return msg.content

@task
def llm_call_evaluator(joke: str):
    """LLM evaluates the joke"""
    return evaluator.invoke(f"Grade the joke: {joke}")

@entrypoint()
def optimizer_workflow(topic: str):
    feedback = None
    while True:
        joke = llm_call_generator(topic, feedback).result()
        feedback = llm_call_evaluator(joke).result()
        if feedback.grade == "funny":
            break
    return joke

# Invoke
for step in optimizer_workflow.stream("Cats", stream_mode="updates"):
    print(step)
    print("\n")

Agents

Agents are typically an LLM plus tools running in a continuous feedback loop, and they suit problems where the steps and solutions cannot be predicted in advance.

Agents are more autonomous than workflows: they decide for themselves which tools to use and how to solve the problem, though you still define the available tool set and the guardrails on their behavior.


# Using tools
from langchain.tools import tool

# Define tools
@tool
def multiply(a: int, b: int) -> int:
    """Multiply a and b.

    Args:
        a: first int
        b: second int
    """
    return a * b

@tool
def add(a: int, b: int) -> int:
    """Add a and b.

    Args:
        a: first int
        b: second int
    """
    return a + b

@tool
def divide(a: int, b: int) -> float:
    """Divide a by b.

    Args:
        a: first int
        b: second int
    """
    return a / b

# Augment the LLM with tools
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)

Graph API


from langgraph.graph import MessagesState
from langchain.messages import SystemMessage, HumanMessage, ToolMessage

# Nodes
def llm_call(state: MessagesState):
    """LLM decides whether to call a tool or not"""
    return {
        "messages": [
            llm_with_tools.invoke(
                [SystemMessage(content="You are an assistant that performs arithmetic operations.")]
                + state["messages"]
            )
        ]
    }

def tool_node(state: dict):
    """Performs the tool call"""
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}

def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    """Decide whether to continue the loop or stop, based on whether the LLM made a tool call"""
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tool_node"
    return END

# Build the agent
agent_builder = StateGraph(MessagesState)

agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")

agent = agent_builder.compile()

# Show the agent
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))

# Invoke
messages = [HumanMessage(content="What is 3 plus 4?")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
    m.pretty_print()

Functional API


from langgraph.graph import add_messages
from langchain.messages import (
    SystemMessage, HumanMessage, ToolCall
)
from langchain_core.messages import BaseMessage

@task
def call_llm(messages: list[BaseMessage]):
    """LLM decides whether to call a tool or not"""
    return llm_with_tools.invoke(
        [SystemMessage(content="You are an assistant that performs arithmetic operations.")]
        + messages
    )

@task
def call_tool(tool_call: ToolCall):
    """Performs the tool call"""
    tool = tools_by_name[tool_call["name"]]
    return tool.invoke(tool_call)

@entrypoint()
def agent(messages: list[BaseMessage]):
    llm_response = call_llm(messages).result()
    while True:
        if not llm_response.tool_calls:
            break
        # Execute tools
        tool_result_futures = [
            call_tool(tool_call) for tool_call in llm_response.tool_calls
        ]
        tool_results = [fut.result() for fut in tool_result_futures]
        messages = add_messages(messages, [llm_response, *tool_results])
        llm_response = call_llm(messages).result()
    messages = add_messages(messages, llm_response)
    return messages

# Invoke
messages = [HumanMessage(content="What is 3 plus 4?")]
for chunk in agent.stream(messages, stream_mode="updates"):
    print(chunk)
    print("\n")