1. 引言

1.1 智能体的初始化:

  • 模型(Model):处理信息、做出决策的核心引擎
  • 工具(Tools):扩展模型能力,执行具体任务的功能集
  • 系统提示(System Prompt):引导模型行为的关键指令

初始化完成后,该智能体能够 🤖 自主学习如何调用工具,从而达成最终目标。此外,它还能生成结构化输出、集成记忆机制,并借助中间件协同处理,胜任更复杂的任务。

1.2 LangChain 核心概念

  • 理解智能体的三要素:模型、工具、提示词
  • 掌握消息(Messages)的四种类型和标准化处理
  • 学会使用 ToolRuntime 访问上下文和记忆
  • 理解短期记忆的四种管理策略
  • 掌握流式传输的多种模式
  • 学会定义和使用中间件

1.3 LangGraph 核心概念

  • 构建简单的状态图(计算器案例)
  • 实现复杂的多分支流程(邮件处理案例)
  • 使用条件边和中断实现动态路由
  • 集成持久化检查点管理会话
  • 添加人工审核环节

2. 模型(Model/LLM)

2.1 动态模型 vs 静态模型

模型类型 特点 应用场景
动态模型 根据输入上下文动态切换专用模型 多模态场景(文本+图像+音频)
静态模型 始终使用同一个固定的多模态大模型 统一接口、简化架构

例子: 动态模型中,用户传入图像时,先调用 VLM 进行内容识别,再将结果交由文本 LLM 生成响应;静态模型中,直接由多模态模型端到端处理。

2.2 大模型调用方法

2.2.1 单次调用:invoke()

实现单次无记忆的大模型调用

# 简单调用
response = model.invoke("Why do parrots have colorful feathers?")
print(response)

# 带对话历史的调用(具有记忆能力)
conversation = [
    {"role": "system", "content": "You are a helpful assistant that translates English to French."},
    {"role": "user", "content": "Translate: I love programming."},
    {"role": "assistant", "content": "J'adore la programmation."},
    {"role": "user", "content": "Translate: I love building applications."}
]

response = model.invoke(conversation)
print(response)  # AIMessage("J'adore créer des applications.")
2.2.2 流式输出:stream()

允许模型在生成响应过程中实时输出内容片段(Chunks),显著提升长文本生成的用户体验。

# 基础流式输出
for chunk in model.stream("天空是什么颜色的?"):
    for block in chunk.content_blocks:
        # 处理推理过程
        if block["type"] == "reasoning" and (reasoning := block.get("reasoning")):
            print(f"推理过程: {reasoning}")
        
        # 处理工具调用片段
        elif block["type"] == "tool_call_chunk":
            print(f"工具调用片段: {block}")
        
        # 处理普通文本输出
        elif block["type"] == "text":
            print(block["text"])

# 累加碎片重构完整消息
full = None  # None | AIMessageChunk
for chunk in model.stream("What color is the sky?"):
    full = chunk if full is None else full + chunk
    print(full.text)  # 逐步打印累积的文本

print(full.content_blocks)
# [{"type": "text", "text": "The sky is typically blue..."}]
2.2.3 批量输入输出:batch()

用于同时向模型发送多个独立的请求,通过并行处理显著提升效率。

# 同时发送三个请求
responses = model.batch([
    "将 'Hello' 翻译成中文",
    "将 'Goodbye' 翻译成中文",
    "将 'Thank you' 翻译成中文"
])

# 返回包含三个响应结果的列表
for response in responses:
    print(response.content)

# 限制并发数量
model.batch(
    large_list_of_inputs,
    config={"max_concurrency": 5}  # 最多同时 5 个并行请求
)

2.3 工具集

2.3.1 工具调用流程

工具调用的完整流程:

  1. LLM 分析请求 → 2. 识别需要的工具 → 3. 构造工具调用 → 4. 执行工具 → 5. 返回结果 → 6. LLM 处理结果
2.3.2 工具绑定与调用

绑定工具到模型:

from langchain_openai import ChatOpenAI
from langchain.tools import tool

@tool
def get_weather(location: str):
    """获取指定城市的当前天气。"""
    return f"{location}的天气晴朗,25度。"

model = ChatOpenAI(model="gpt-4o")
# 将工具绑定到模型
model_with_tools = model.bind_tools([get_weather])

# 调用模型
response = model_with_tools.invoke("上海天气怎么样?")
print(response.tool_calls)  # 包含工具调用列表

强制工具选择:

# 强制调用特定工具
model_with_forced_tool = model.bind_tools(
    [get_weather], 
    tool_choice="get_weather"
)

# 要求从提供的工具中选一个
model_with_any_tool = model.bind_tools(
    [get_weather, search_db], 
    tool_choice="required"
)

2.4 大模型结构化输出

支持三种结构化输出模式:PydanticTypedDictJSON Schema

Pydantic 方式:

from pydantic import BaseModel, Field

class Movie(BaseModel):
    """A movie with details."""
    title: str = Field(..., description="The title of the movie")
    year: int = Field(..., description="The year the movie was released")
    director: str = Field(..., description="The director of the movie")
    rating: float = Field(..., description="The movie's rating out of 10")

model_with_structure = model.with_structured_output(Movie)
response = model_with_structure.invoke("Provide details about the movie Inception")
print(response)  # Movie(title="Inception", year=2010, director="Christopher Nolan", rating=8.8)

TypedDict 方式:

from typing_extensions import TypedDict, Annotated

class MovieDict(TypedDict):
    """A movie with details."""
    title: Annotated[str, ..., "The title of the movie"]
    year: Annotated[int, ..., "The year the movie was released"]
    director: Annotated[str, ..., "The director of the movie"]
    rating: Annotated[float, ..., "The movie's rating out of 10"]

model_with_structure = model.with_structured_output(MovieDict)
response = model_with_structure.invoke("Provide details about the movie Inception")
print(response)  # {'title': 'Inception', 'year': 2010, 'director': 'Christopher Nolan', 'rating': 8.8}

JSON Schema 方式:

json_schema = {
    "title": "Movie",
    "description": "A movie with details",
    "type": "object",
    "properties": {
        "title": {"type": "string", "description": "The title of the movie"},
        "year": {"type": "integer", "description": "The year the movie was released"},
        "director": {"type": "string", "description": "The director of the movie"},
        "rating": {"type": "number", "description": "The movie's rating out of 10"}
    },
    "required": ["title", "year", "director", "rating"]
}

model_with_structure = model.with_structured_output(
    json_schema,
    method="json_schema",
)
response = model_with_structure.invoke("Provide details about the movie Inception")
print(response)  # {'title': 'Inception', 'year': 2010, ...}

2.5 其他高级用法

2.5.1 模型概况
# 获取模型基本信息
model.profile
# {
#   "max_input_tokens": 400000,
#   "image_inputs": True,
#   "reasoning_output": True,
#   "tool_calling": True,
#   ...
# }
2.5.2 多模态输出
# 部分模型支持输出多种格式内容
response = model.invoke("Create a picture of a cat")
print(response.content_blocks)
# [
#     {"type": "text", "text": "这是一只猫的图片"},
#     {"type": "image", "base64": "...", "mime_type": "image/jpeg"},
# ]
2.5.3 推理模型

许多模型能通过多步推理解决复杂问题:

# 流式输出推理过程
for chunk in model.stream("Why do parrots have colorful feathers?"):
    reasoning_steps = [r for r in chunk.content_blocks if r["type"] == "reasoning"]
    print(reasoning_steps if reasoning_steps else chunk.text)

# 输出完整的推理过程
response = model.invoke("Why do parrots have colorful feathers?")
reasoning_steps = [b for b in response.content_blocks if b["type"] == "reasoning"]
print(" ".join(step["reasoning"] for step in reasoning_steps))
2.5.4 invoke 额外配置
response = model.invoke(
    "Tell me a joke",
    config={
        "run_name": "joke_generation",      # 自定义运行名称
        "tags": ["humor", "demo"],          # 给调用打标签
        "metadata": {"user_id": "123"},     # 附加自定义元数据
        "callbacks": [my_callback_handler], # 注册回调处理器
    }
)
2.5.5 配置可变模型

允许在运行时动态切换模型和参数:

from langchain.chat_models import init_chat_model

# 创建可配置模型(不指定具体模型)
model = init_chat_model()

# 运行时指定 OpenAI
model.invoke(
    "你好", 
    config={"configurable": {"model": "gpt-4o"}}
)

# 或切换到 Anthropic
model.invoke(
    "你好", 
    config={"configurable": {"model": "claude-3-5-sonnet-20240620"}}
)

3. 消息(Messages)

3.1 消息的组成

消息是 LangChain 中模型上下文的基本单位,包含:

字段 说明 示例
role 消息类型标识 systemuserassistant
content 消息实际内容 文本、图像、音频、文档等
metadata 可选元数据 响应信息、消息ID、token使用等

3.2 消息传入方式

对象列表形式:

from langchain.messages import SystemMessage, HumanMessage, AIMessage

messages = [
    SystemMessage("You are a poetry expert"),
    HumanMessage("Write a haiku about spring"),
    AIMessage("Cherry blossoms bloom...")
]
response = model.invoke(messages)

字典形式:

messages = [
    {"role": "system", "content": "You are a poetry expert"},
    {"role": "user", "content": "Write a haiku about spring"},
    {"role": "assistant", "content": "Cherry blossoms bloom..."}
]
response = model.invoke(messages)

3.3 消息的四种类型

3.3.1 系统消息(SystemMessage)

表示一组初始指令,用于引导模型行为:

system_msg = SystemMessage(
    content="You are a helpful assistant that always responds in a friendly tone."
)
3.3.2 人类消息(HumanMessage)

表示用户输入和交互,支持多模态内容:

from langchain.messages import HumanMessage

human_msg = HumanMessage(
    content="Hello!",
    name="alice",  # 可选:标识用户
    id="msg_123",  # 可选:唯一标识符
)
3.3.3 AI消息(AIMessage)

表示模型调用的输出,包含丰富的属性:

from langchain.messages import AIMessage, SystemMessage, HumanMessage

# 手动创建 AI 消息
ai_msg = AIMessage("I'd be happy to help you with that question!")

# 添加到对话历史
messages = [
    SystemMessage("You are a helpful assistant"),
    HumanMessage("Can you help me?"),
    ai_msg,  # 插入就像来自模型一样
    HumanMessage("Great! What's 2+2?")
]

response = model.invoke(messages)

AI消息的关键属性:

  • text:消息的文本内容
  • content:消息的原始内容(字符串或字典列表)
  • content_blocks:标准化内容块列表
  • tool_calls:模型进行的工具调用(如果有)
  • id:消息的唯一标识符
  • usage_metadata:token计数等使用信息
  • response_metadata:响应元数据
3.3.4 工具消息(ToolMessage)

用于将工具执行结果传回模型:

from langchain.messages import ToolMessage

# 模型进行工具调用后
ai_message = AIMessage(
    content=[],
    tool_calls=[{
        "name": "get_weather",
        "args": {"location": "San Francisco"},
        "id": "call_123"
    }]
)

# 执行工具并创建结果消息
weather_result = "Sunny, 72°F"
tool_message = ToolMessage(
    content=weather_result,
    tool_call_id="call_123"  # 必须匹配调用 ID
)

# 继续对话
messages = [
    HumanMessage("What's the weather in San Francisco?"),
    ai_message,    # 模型的工具调用
    tool_message,  # 工具执行结果
]
response = model.invoke(messages)  # 模型处理结果

3.4 消息内容

消息的 content 属性支持字符串和未类型对象列表,允许直接支持多模态内容:

from langchain.messages import HumanMessage

# 字符串内容
human_message = HumanMessage("Hello, how are you?")

# 提供商原生格式(例如 OpenAI)
human_message = HumanMessage(content=[
    {"type": "text", "text": "Hello, how are you?"},
    {"type": "image_url", "image_url": {"url": "https://example.com/image.jpg"}}
])

# 标准内容块列表
human_message = HumanMessage(content_blocks=[
    {"type": "text", "text": "Hello, how are you?"},
    {"type": "image", "url": "https://example.com/image.jpg"},
])
3.4.1 标准化输入输出

不同模型厂商对推理过程的支持格式不一致:

厂商 格式 字段
Anthropic(Claude) "type": "thinking" thinkingsignature
OpenAI(o1、gpt-5等) "type": "reasoning" summary(可能包含多个子项)

LangChain 的 content_blocks 标准化处理:

from langchain.messages import AIMessage

# OpenAI 返回的原始格式
message = AIMessage(
    content=[
        {
            "type": "reasoning",
            "id": "rs_abc123",
            "summary": [
                {"type": "summary_text", "text": "summary 1"},
                {"type": "summary_text", "text": "summary 2"},
            ],
        },
        {"type": "text", "text": "...", "id": "msg_abc123"},
    ],
    response_metadata={"model_provider": "openai"}
)

# 使用 LangChain 统一格式输出
print(message.content_blocks)
# [{'type': 'reasoning', 'id': 'rs_abc123', 'reasoning': 'summary 1'},
#  {'type': 'reasoning', 'id': 'rs_abc123', 'reasoning': 'summary 2'},
#  {'type': 'text', 'text': '...', 'id': 'msg_abc123'}]
3.4.2 多模态支持矩阵
数据类型 URL输入 Base64编码输入 提供商文件ID
图像
PDF
音频
视频
3.4.3 内容块类型详解
类别 块类型 用途 标记字段
核心 TextContentBlock 标准文本输出 type=“text”
ReasoningContentBlock 模型推理步骤 type=“reasoning”
多模态 ImageContentBlock 图像数据 type=“image”
AudioContentBlock 音频数据 type=“audio”
VideoContentBlock 视频数据 type=“video”
FileContentBlock 通用文件(PDF等) type=“file”
PlainTextContentBlock 文档文本(.txt、.md) type=“text-plain”
工具调用 ToolCall 函数调用 type=“tool_call”
ToolCallChunk 流式工具调用片段 type=“tool_call_chunk”
InvalidToolCall 格式错误的调用 type=“invalid_tool_call”
服务器端工具 ServerToolCall 服务器端执行的工具调用 type=“server_tool_call”
ServerToolCallChunk 流式服务器端工具片段 type=“server_tool_call_chunk”
ServerToolResult 搜索结果 type=“server_tool_result”
提供商特定 NonStandardContentBlock 提供商特定的内容 type=“non_standard”

推理块输入示例:

{
    "type": "reasoning",
    "reasoning": "The user is asking about...",
    "extras": {"signature": "abc123"}
}

4. 工具(Tools)

4.1 自定义工具

创建工具最简单的方法是使用 @tool 装饰器。被大模型读取的部分:

  • 函数名 → 作为工具名称(除非手动指定)
  • 类型提示(Type hints) → 自动生成工具的 JSON Schema
  • 文档字符串(Docstring) → 作为工具描述,帮助模型决定何时调用
  • 默认值 → 可能体现在 schema 的 default 值中
  • 装饰器参数 → 如 name=description= 手动覆盖

不会被读取的部分:

  • 函数体代码逻辑
  • 函数内部注释(非 docstring)
from langchain.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """将 a 和 b 相乘。
    
    参数:
        a: 第一个整数
        b: 第二个整数
    """
    return a * b

# 自定义工具名称和描述
@tool("CustomName", description="Custom description here")
def my_function(x: str):
    """默认文档字符串会被覆盖"""
    return x.upper()

# 支持复杂输入架构
@tool(args_schema=ComplexSchema)
def complex_tool(data: dict):
    """接收复杂的参数结构"""
    return data

4.2 访问上下文:ToolRuntime

ToolRuntime 是一个统一的参数接口,允许工具在执行时直接访问智能体的状态、上下文和持久化记忆,但不将内部信息暴露给大模型。

无需手动实例化,LangChain 会在调用工具时自动"注入"进去。

使用 ToolRuntime 可以:

  • 读取当前对话状态 (runtime.state):获取消息列表或自定义中间变量
  • 访问外部配置 (runtime.context):获取用户ID等调用时传入的参数
  • 操作长效记忆 (runtime.store):跨会话存取数据

示例 1:保存用户笔记

@tool
def save_user_note(note: str, runtime: ToolRuntime):
    """为当前用户保存一条笔记。"""
    # 从 context 获取用户 ID
    user_id = runtime.context.user_id
    # 将笔记存入跨会话的长效记忆中
    runtime.store.put(("notes",), user_id, {"content": note})
    return f"已为用户 {user_id} 成功保存笔记。"

示例 2:提取用户笔记

@tool
def get_user_notes(runtime: ToolRuntime) -> str:
    """提取当前用户之前保存的所有笔记。"""
    # 从 context 获取当前用户的 ID
    user_id = runtime.context.user_id
    # 从 store 中提取数据(命名空间必须一致)
    stored_item = runtime.store.get(("notes",), user_id)
    
    if stored_item and stored_item.value: 
        note_content = stored_item.value.get("content")
        return f"找到您的笔记内容:{note_content}"
    return "您还没有保存过任何笔记。"

5. 短期记忆(Short-term Memory)

5.1 自定义状态

通过继承 AgentState 为智能体添加额外的上下文信息:

from langchain.agents import create_agent, AgentState
from langgraph.checkpoint.memory import InMemorySaver

class CustomAgentState(AgentState):
    user_id: str          # 自定义字段:用户ID
    preferences: dict     # 自定义字段:用户偏好
    # messages 是 AgentState 内置的

agent = create_agent(
    "openai:gpt-5",
    [get_user_info],
    state_schema=CustomAgentState,
    checkpointer=InMemorySaver(),
)

# 调用时传入自定义状态
result = agent.invoke(
    {
        "messages": [{"role": "user", "content": "Hello"}],
        "user_id": "user_123",
        "preferences": {"theme": "dark"}
    },
    {"configurable": {"thread_id": "1"}}
)

5.2 消息处理策略

处理长记忆通常有四种模式:修剪删除总结访问

5.2.1 修剪消息
from langchain.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model
from langgraph.runtime import Runtime
from langchain_core.runnables import RunnableConfig
from typing import Any

@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """保留首条消息和最近几条交互,防止超出上下文窗口。"""
    messages = state["messages"]

    if len(messages) <= 3:
        return None  # 无需修剪

    first_msg = messages[0]
    recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
    new_messages = [first_msg] + recent_messages

    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),  # 清空旧消息
            *new_messages  # 注入精简后的列表
        ]
    }

agent = create_agent(
    model,
    tools=tools,
    middleware=[trim_messages],
    checkpointer=InMemorySaver(),
)

# 使用 thread_id 保持会话一致性
config: RunnableConfig = {"configurable": {"thread_id": "1"}}

agent.invoke({"messages": "hi, my name is bob"}, config)
agent.invoke({"messages": "write a short poem about cats"}, config)
agent.invoke({"messages": "now do the same but for dogs"}, config)
final_response = agent.invoke({"messages": "what's my name?"}, config)

final_response["messages"][-1].pretty_print()
# 输出:Your name is Bob. You told me that earlier...

InMemorySaver 说明: 在同一 thread_id 下,持久化图的整个状态(包括消息历史、中间变量等),使后续调用可在此基础上继续执行。

5.2.2 删除消息
from langchain.messages import RemoveMessage

def delete_messages(state):
    """如果消息数量超过 2 条,删除最早的两条消息。"""
    messages = state["messages"]
    if len(messages) > 2:
        return {"messages": [RemoveMessage(id=m.id) for m in messages[:2]]}

# 一次性删除所有消息
RemoveMessage(id=REMOVE_ALL_MESSAGES)
5.2.3 总结消息

修剪或删除消息可能丢失信息,使用聊天模型总结历史记录:

from langchain.agents.middleware import SummarizationMiddleware

agent = create_agent(
    model="openai:gpt-4o",
    tools=[],
    middleware=[
        SummarizationMiddleware(
            model="openai:gpt-4o-mini",           # 用于总结的轻量模型
            max_tokens_before_summary=4000,       # 触发总结的token阈值
            messages_to_keep=20,                  # 保留的最近消息条数
        )
    ],
    checkpointer=checkpointer,
)

5.3 访问消息的四种方式

5.3.1 在工具中访问记忆
# 读取记忆
@tool
def get_user_info(runtime: ToolRuntime) -> str:
    """使用 ToolRuntime 访问短期记忆。"""
    # runtime 自动注入,对 LLM 隐藏
    return runtime.state["messages"][-1]

# 写入记忆(通过返回 Command)
from langgraph.types import Command

@tool
def update_context(info: str, runtime: ToolRuntime) -> Command:
    """工具返回 Command 对象会自动更新状态。"""
    return Command(
        update={"custom_field": info}
    )
5.3.2 在提示词中访问记忆
from langchain.agents.middleware import dynamic_prompt

@dynamic_prompt
def dynamic_system_prompt(request) -> str:
    """根据运行时上下文动态调整系统提示词。"""
    user_name = request.runtime.context["user_name"]
    return f"你是一个小助手,请称呼用户为:{user_name}."

agent = create_agent(
    model="openai:gpt-5-nano",
    tools=[get_weather],
    middleware=[dynamic_system_prompt],
    context_schema=CustomContext
)
5.3.3 在模型前后访问记忆
from langchain.agents.middleware import before_model, after_model

# 在模型调用前修改状态(如修剪消息)
@before_model
def prepare_context(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """在 LLM 推理前调整上下文。"""
    # 访问和修改状态
    return {"messages": state["messages"]}

# 在模型调用后处理结果
@after_model
def process_response(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """在 LLM 返回后进行后处理。"""
    return None

agent = create_agent(
    model,
    tools=tools,
    middleware=[prepare_context, process_response],
)

6. 流式传输(Streaming)

流式传输能够渐进式地显示输出,显著改善用户体验。LangGraph 支持多种流式模式(stream_mode),允许根据应用场景选择数据接收方式。

6.1 updates 模式(最常用)

适合跟踪哪些节点正在运行以及它们返回了什么:

# 仅流式传输状态更新
for chunk in graph.stream(
    {"messages": [("user", "你好")]}, 
    stream_mode="updates"
):
    # chunk 格式为 {"node_name": {"state_key": "update_value"}}
    print(chunk)
    # 输出示例:
    # {'llm_node': {'messages': [AIMessage(...)]}}
    # {'tool_node': {'messages': [ToolMessage(...)]}}

6.2 messages 模式

直接获取 LLM 生成的 Token,适合构建聊天机器人:

# 获取 LLM 的实时 Token 输出
for mode, chunk in graph.stream(
    {"messages": [("user", "写一首诗")]}, 
    stream_mode="messages"
):
    # chunk 是 (BaseMessageChunk, metadata) 的元组
    token, metadata = chunk
    if token.content:
        print(token.content, end="|", flush=True)
    # 输出示例:
    # 春|暖|花|开|白|云|飘...

6.3 混合模式输出

传入列表时,LangGraph 会并发流出多种数据:

# 同时监控节点更新和 Token 流
for mode, chunk in graph.stream(
    inputs, 
    stream_mode=["updates", "messages"]
):
    if mode == "updates":
        print(f"\n--- 节点运行完毕: {list(chunk.keys())[0]} ---")
    elif mode == "messages":
        # chunk 格式:(BaseMessageChunk, metadata)
        print(chunk[0].content, end="", flush=True)

7. 结构化输出(Structured Output)

7.1 基于供应商策略

某些模型提供商原生支持结构化输出(目前仅 OpenAIGrok)。在可用时,这是最可靠的方法:

from pydantic import BaseModel, Field

class ContactInfo(BaseModel):
    """联系方式信息。"""
    name: str = Field(..., description="联系人名字")
    email: str = Field(..., description="电子邮件地址")
    phone: str = Field(..., description="电话号码")

# 以下三种写法等价
agent = create_agent(
    model="openai:gpt-5",
    tools=tools,
    response_format=ContactInfo  # 简写(自动选择 ProviderStrategy)
)

# 标准写法
# response_format=ProviderStrategy(ContactInfo)

# 优先使用原生支持
# response_format=ToolStrategy(ContactInfo)

result = agent.invoke({
    "messages": [{
        "role": "user", 
        "content": "提取联系信息:John Doe, john@example.com, (555) 123-4567"
    }]
})

print(result["structured_response"])
# ContactInfo(name='John Doe', email='john@example.com', phone='(555) 123-4567')

7.2 工具调用策略

对于不支持原生结构化输出的模型,LangChain 使用工具调用实现相同效果:

from pydantic import BaseModel

class ProductReview(BaseModel):
    """产品评论分析。"""
    rating: int
    sentiment: str
    key_points: list[str]

agent = create_agent(
    model="openai:gpt-5",
    tools=tools,
    response_format=ToolStrategy(ProductReview)
)

result = agent.invoke({
    "messages": [{
        "role": "user", 
        "content": "分析评论:'很好的产品,5颗星。快速发货,但很贵'"
    }]
})

print(result["structured_response"])
# ProductReview(rating=5, sentiment='positive', key_points=['fast shipping', 'expensive'])

7.3 自定义消息内容

在工具策略中,可以添加额外参数自定义消息内容:

agent = create_agent(
    model="openai:gpt-5",
    tools=[],
    response_format=ToolStrategy(
        schema=MeetingAction,
        tool_message_content="行动事项已捕获并添加到会议记录中!"
    )
)

7.4 错误处理 ⚠️

当模型在工具调用中出现错误时,错误信息会返回到 ToolMessage:

# 示例错误信息
"""
Error: Failed to parse structured output for tool 'ProductRating': 
1 validation error for ProductRating.rating
  Input should be less than or equal to 5 
  [type=less_than_equal, input_value=10, input_type=int].
  Please fix your mistakes.
"""

处理错误的 5 种方法:

  1. 自定义错误消息:修改发送给 LLM 的错误消息
  2. 仅处理特定异常:如仅在 ValueError 时重试
  3. 处理多个异常类型:如在 ValueError 和 TypeError 时重试
  4. 自定义错误处理函数:完全自定义处理逻辑
  5. 不进行错误处理:让错误直接抛出

8. 中间件(Middleware)

8.1 预置中间件

LangChain 提供的常用中间件:

中间件 功能 应用场景
SummarizationMiddleware 自动摘要长对话 接近token限制时压缩历史
HumanInTheLoopMiddleware 暂停以供人工批准 重要工具调用需要审核
AnthropicPromptCachingMiddleware 缓存提示词前缀 使用 Anthropic 模型降低成本
ModelCallLimitMiddleware 限制模型调用次数 防止无限循环
ToolCallLimitMiddleware 限制工具调用次数 控制特定工具使用频率
ModelFallbackMiddleware 主模型失败时自动回退 增强系统可靠性
PIIMiddleware 检测和处理个人信息 保护敏感数据
LLMToolSelectorMiddleware 智能选择相关工具 减少工具数量,提升准确性
ToolRetryMiddleware 自动重试失败的工具调用 处理临时故障
ContextEditingMiddleware 管理对话上下文 优化上下文窗口使用

8.2 自定义中间件

8.2.1 使用装饰器定义中间件

节点式装饰器(在特定执行点运行):

from langchain.agents.middleware import before_agent, before_model, after_model, after_agent

# 代理启动前(每次调用一次)
@before_agent
def setup_agent(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print("Agent starting...")
    return None

# 每次模型调用前
@before_model
def prepare_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print("Preparing for model call...")
    return None

# 每次模型响应后
@after_model
def process_model_output(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print("Processing model output...")
    return None

# 代理完成时(每次调用最多一次)
@after_agent
def cleanup_agent(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print("Agent completed!")
    return None

包装式装饰器(拦截并控制执行):

from langchain.agents.middleware import wrap_model_call, wrap_tool_call

# 拦截每次模型调用
@wrap_model_call
async def log_model_calls(handler, request):
    """记录模型调用的输入和输出。"""
    print(f"Calling model with: {request.model}")
    response = await handler(request)
    print(f"Model returned: {response}")
    return response

# 拦截每次工具调用
@wrap_tool_call
async def log_tool_calls(handler, request):
    """记录工具调用和结果。"""
    tool_name = request.tool_name
    print(f"Executing tool: {tool_name}")
    result = await handler(request)
    print(f"Tool result: {result}")
    return result

便利装饰器

from langchain.agents.middleware import dynamic_prompt

# 生成动态系统提示词
@dynamic_prompt
def adaptive_system_prompt(request) -> str:
    """根据运行时信息动态生成提示词。"""
    user_level = request.runtime.context.get("user_level", "beginner")
    if user_level == "advanced":
        return "You are an expert assistant. Provide detailed, technical responses."
    else:
        return "You are a helpful assistant. Explain concepts clearly."
8.2.2 执行顺序(洋葱模型)

当提供中间件列表 [m1, m2, m3] 时:

Before execution:  m1 → m2 → m3 → Handler → m3 → m2 → m1
                   ↓正序执行before钩子        倒序执行after钩子↑

包装式:
m1 包裹 m2 包裹 m3 包裹最终的处理器

示例:中间件执行顺序可视化

@before_model
def m1_before(state, runtime):
    print("m1 before")
    return None

@before_model
def m2_before(state, runtime):
    print("m2 before")
    return None

@after_model
def m1_after(state, runtime):
    print("m1 after")
    return None

@after_model
def m2_after(state, runtime):
    print("m2 after")
    return None

agent = create_agent(model, tools, middleware=[m1_before, m2_before, m1_after, m2_after])

# 执行顺序:
# m1 before → m2 before → [模型调用] → m2 after → m1 after

9. 高级特性(Other Features)

9.1 守卫(Guards)

在中间件中定义个人信息检测和人工审批:

from langchain.agents.middleware import PIIMiddleware, HumanInTheLoopMiddleware

# PII 检测
pii_middleware = PIIMiddleware(
    keywords=["email", "phone", "address"],  # 要检测的敏感词
    redact=True  # 自动脱敏
)

# 人类在环
human_loop = HumanInTheLoopMiddleware(
    require_approval_for_tools=["delete_data", "send_email"],  # 需要批准的工具
)

agent = create_agent(
    model,
    tools=tools,
    middleware=[pii_middleware, human_loop]
)

9.2 运行时上下文(Runtime)

LangGraph 暴露的 Runtime 对象包含:

from langchain.agents import create_agent
from typing import TypedDict

# 自定义上下文类型
class CustomContext(TypedDict):
    user_id: str
    user_name: str
    api_key: str

agent = create_agent(
    model="openai:gpt-5-nano",
    tools=[...],
    context_schema=CustomContext  # 定义上下文结构
)

# 调用时传入上下文
agent.invoke(
    {"messages": [{"role": "user", "content": "What's my name?"}]},
    context=CustomContext(
        user_id="user_123",
        user_name="John Smith",
        api_key="sk-xxx..."
    )
)

Runtime 中的三个关键对象:

对象 说明 用途
Context 静态信息,调用时传入 用户ID、数据库连接等
Store BaseStore 实例,用于长期记忆 跨会话存取数据
StreamWriter 用于自定义流式传输 通过 “custom” 模式流出数据

在工具中访问 Runtime:

@tool
def access_runtime(runtime: ToolRuntime[CustomContext]) -> str:
    """工具自动接收 runtime 参数。"""
    # 访问上下文
    user_id = runtime.context.user_id
    
    # 访问或修改存储
    runtime.store.put(("users",), user_id, {"status": "active"})
    result = runtime.store.get(("users",), user_id)
    
    return f"User {user_id} processed"

在中间件中访问 Runtime:

# 节点式中间件(直接使用 runtime 参数)
@before_model
def node_style(state, runtime):
    user_id = runtime.context.user_id
    return None

# 包装式中间件(从 request 访问)
@wrap_model_call
async def wrap_style(handler, request):
    user_id = request.runtime.context.user_id
    return await handler(request)

9.3 上下文工程

9.3.1 模型上下文优化

模型上下文包括:系统提示、消息、工具、模型选择、响应格式

# 动态调整系统提示词
@dynamic_prompt
def smart_system_prompt(request) -> str:
    task_type = request.runtime.state.get("task_type")
    if task_type == "code":
        return "You are a senior software engineer. Provide production-ready code."
    else:
        return "You are a helpful assistant."

# 动态选择工具
def select_tools(state: AgentState, runtime: Runtime):
    """根据用户的查询动态选择相关工具。"""
    tools = get_all_tools()
    user_query = state["messages"][-1].content
    
    # 使用轻量级 LLM 选择工具
    selected = lightweight_model.select_tools(user_query, tools)
    return selected

# 动态选择模型
def choose_model(state: AgentState):
    """根据任务复杂度选择合适的模型。"""
    complexity = estimate_complexity(state["messages"])
    if complexity == "simple":
        return "gpt-4-turbo"  # 快速、便宜
    else:
        return "gpt-4"  # 更强大
9.3.2 工具上下文

工具需要的不仅仅是 LLM 的参数,还需要用户ID、API密钥等上下文信息:

@tool
def database_query(query: str, runtime: ToolRuntime[CustomContext]) -> str:
    """从数据库查询信息。"""
    # 从 runtime 获取用户 ID 进行权限检查
    user_id = runtime.context.user_id
    
    # 从 runtime 获取数据库连接
    db_connection = runtime.context.db_connection
    
    # 从状态获取查询历史
    history = runtime.state.get("query_history", [])
    
    # 执行查询...
    result = db_connection.execute(query)
    
    # 更新状态
    return Command(
        update={"query_history": history + [query]}
    )

9.4 模型上下文协议(MCP)

MCP 允许 LLM 通过标准化接口访问外部资源(数据库、API、文件系统等),而无需硬编码集成。

MCP 的核心优势:

  • 标准化的工具定义
  • 动态工具发现
  • 资源管理和权限控制
  • 可扩展的架构

9.5 人机交互

通过 interrupt() 实现智能体的暂停和恢复,允许人工干预:

from langgraph.types import interrupt, Command

@tool
def critical_action(action: str, runtime: ToolRuntime) -> Command:
    """执行关键操作前暂停以获得人工批准。"""
    # 暂停并等待人工决策
    human_decision = interrupt({
        "action": action,
        "request": "Please review and approve this critical action"
    })
    
    if human_decision.get("approved"):
        # 执行操作
        result = execute_action(action)
        return Command(update={"result": result})
    else:
        return Command(update={"result": "Action cancelled by human"})

9.6 多智能体系统

构建多个互相协作的智能体:

# Agent A: 分析师
analyst = create_agent(
    model="gpt-4",
    tools=[data_analysis_tools],
    name="analyst"
)

# Agent B: 执行者
executor = create_agent(
    model="gpt-4",
    tools=[action_tools],
    name="executor"
)

# Agent C: 协调者
coordinator = create_agent(
    model="gpt-4",
    tools=[analyst, executor],  # 可调用其他 agent
    name="coordinator"
)

9.7 检索增强生成(RAG)

将外部知识库集成到智能体中:

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma

# 初始化向量数据库
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents, embeddings)

@tool
def search_knowledge_base(query: str) -> str:
    """从知识库中检索相关信息。"""
    # 向量相似性搜索
    results = vectorstore.similarity_search(query, k=3)
    # 格式化结果
    context = "\n".join([doc.page_content for doc in results])
    return context

agent = create_agent(
    model,
    tools=[search_knowledge_base, ...],
)

10. LangGraph 完整指南

10.1 简单案例:计算器智能体

第一步:定义工具和模型
from langchain_core.chat_models import init_chat_model
from langchain.tools import tool

# 初始化模型
model = init_chat_model(
    "claude-sonnet-4-5-20250929",
    temperature=0
)

# 定义数学工具
@tool
def multiply(a: int, b: int) -> int:
    """将 a 和 b 相乘。
    
    参数:
        a: 第一个整数
        b: 第二个整数
    """
    return a * b

@tool
def add(a: int, b: int) -> int:
    """将 a 和 b 相加。
    
    参数:
        a: 第一个整数
        b: 第二个整数
    """
    return a + b

@tool
def divide(a: int, b: int) -> float:
    """将 a 除以 b。
    
    参数:
        a: 被除数
        b: 除数
    """
    if b == 0:
        return "Error: Division by zero"
    return a / b

# 将工具绑定到模型
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
model_with_tools = model.bind_tools(tools)
第二步:定义状态
from typing import TypedDict, Annotated
from langchain.messages import AnyMessage
import operator

class MessagesState(TypedDict):
    """定义智能体的状态结构。"""
    messages: Annotated[list[AnyMessage], operator.add]  # 消息列表(支持追加)
    llm_calls: int  # 追踪 LLM 调用次数

operator.add 说明: 确保新消息被追加到列表中,而不是替换整个列表。

第三步:定义节点
from langchain.messages import SystemMessage

def llm_call(state: dict):
    """LLM 节点:决定是否调用工具。"""
    return {
        "messages": [
            model_with_tools.invoke(
                [
                    SystemMessage(
                        content="你是一个有用的助手,负责对一组输入执行算术运算。"
                    )
                ]
                + state["messages"]
            )
        ],
        "llm_calls": state.get('llm_calls', 0) + 1
    }

def tool_node(state: dict):
    """工具节点:执行 LLM 的工具调用。"""
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        # 根据工具名称查找并执行工具
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        # 创建工具结果消息
        from langchain.messages import ToolMessage
        result.append(
            ToolMessage(content=str(observation), tool_call_id=tool_call["id"])
        )
    return {"messages": result}
第四步:定义条件边
from typing import Literal
from langgraph.graph import END

def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    """决定流程:继续执行工具或结束。"""
    messages = state["messages"]
    last_message = messages[-1]
    
    # 如果 LLM 调用了工具,则执行工具
    if last_message.tool_calls:
        return "tool_node"
    
    # 否则,返回给用户
    return END
第五步:构建并编译图
from langgraph.graph import StateGraph, START

# 创建图构建器
agent_builder = StateGraph(MessagesState)

# 添加节点
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

# 添加边
agent_builder.add_edge(START, "llm_call")  # 开始 → LLM
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]  # 条件边:工具或结束
)
agent_builder.add_edge("tool_node", "llm_call")  # 工具 → LLM

# 编译智能体
agent = agent_builder.compile()

# 可视化图结构
from IPython.display import Image, display
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))
第六步:调用智能体
from langchain.messages import HumanMessage

# 创建输入
messages = [HumanMessage(content="3 加 4 等于多少?")]

# 调用智能体
result = agent.invoke({"messages": messages})

# 打印结果
for m in result["messages"]:
    m.pretty_print()

10.2 复杂案例:客户邮件处理系统

这是一个完整的企业级应用案例。

系统设计

任务描述: 自动处理客户邮件,包括分类、搜索相关文档、起草回复、升级处理等。

可能的邮件类型:

  • 简单的产品问题:“如何重置密码?”
  • 错误报告:“导出PDF时功能崩溃”
  • 紧急账单问题:“我被重复收费了!”
  • 功能请求:“能否添加深色模式?”
  • 复杂技术问题:“API集成间歇性失败,504错误”

工作流程图:

读取邮件 → 分类 → 分支:
                  ├─ 计费/紧急 → 人工审核
                  ├─ 问题/功能 → 搜索文档
                  ├─ BUG → 创建工单
                  └─ 其他 → 直接起草
                              ↓
                         起草回复 → 条件判断 → 发送/人工审核 → 结束
完整实现代码
"""
LangGraph 复杂案例:客户邮件处理系统
这是一个基于 LangGraph 的生产级邮件自动化解决方案
"""

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langchain.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel, Field

# ============== 数据类型定义 ==============

class EmailClassification(TypedDict):
    """邮件分类结果。"""
    intent: Literal["question", "bug", "billing", "feature", "complex"]
    urgency: Literal["low", "medium", "high", "critical"]
    topic: str
    summary: str

class EmailAgentState(TypedDict):
    """智能体的完整状态。"""
    # 邮件数据
    email_content: str
    sender_email: str
    email_id: str
    
    # 处理结果
    classification: EmailClassification | None
    search_results: list[str] | None  # 知识库搜索结果
    customer_history: dict | None      # CRM 数据
    
    # 生成的内容
    draft_response: str | None
    messages: list[str] | None

# ============== 模型初始化 ==============

from langchain_community.chat_models import ChatTongyi
import os

llm = ChatTongyi(
    model="qwen-plus",
    api_key=os.getenv("QWEN_API_KEY")
)

# ============== 节点定义 ==============

def read_email(state: EmailAgentState) -> dict:
    """第一步:读取和准备邮件。"""
    return {
        "messages": [HumanMessage(content=f"处理邮件: {state['email_content']}")]
    }

def classify_intent(state: EmailAgentState) -> Command:
    """第二步:使用结构化输出分类邮件。"""
    # 使用结构化输出
    structured_llm = llm.with_structured_output(EmailClassification)
    
    classification_prompt = f"""
    分析下列客户邮件并分类:
    
    邮件内容:{state['email_content']}
    发件人:{state['sender_email']}
    
    请提供分类信息:意图、紧急程度、主题、摘要。
    """
    
    classification = structured_llm.invoke(classification_prompt)
    
    # 根据分类结果路由到不同节点
    if classification['intent'] == 'billing' or classification['urgency'] == 'critical':
        next_node = "human_review"
    elif classification['intent'] in ['question', 'feature']:
        next_node = "search_documentation"
    elif classification['intent'] == 'bug':
        next_node = "bug_tracking"
    else:
        next_node = "draft_response"
    
    return Command(
        update={"classification": classification},
        goto=next_node
    )

def search_documentation(state: EmailAgentState) -> Command:
    """第三步:搜索知识库中的相关文档。"""
    classification = state.get('classification', {})
    
    # 构建搜索查询
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}"
    
    try:
        # 调用知识库或搜索 API
        # 这里使用模拟数据代替实际 API 调用
        search_results = [
            "从设置 > 安全 > 修改密码重置密码",
            "密码长度至少 12 个字符",
            "必须包含大小写字母、数字和特殊符号"
        ]
    except Exception as e:
        # API 出现可恢复的错误时,存储错误信息
        search_results = [f"搜索暂时不可用: {str(e)}"]
    
    return Command(
        update={"search_results": search_results},
        goto="draft_response"
    )

def bug_tracking(state: EmailAgentState) -> Command:
    """第三步(备选):为 BUG 问题创建工单。"""
    # 调用缺陷跟踪系统的 API
    ticket_id = "BUG-12345"  # 实际应通过 API 生成
    
    return Command(
        update={
            "search_results": [f"BUG工单 {ticket_id} 已创建"],
            "current_step": "bug_tracked"
        },
        goto="draft_response"
    )

def draft_response(state: EmailAgentState) -> Command:
    """第四步:根据上下文起草邮件回复。"""
    classification = state.get('classification', {})
    
    # 构建上下文
    context_sections = []
    
    if state.get('search_results'):
        # 格式化搜索结果
        formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']])
        context_sections.append(f"相关文档:\n{formatted_docs}")
    
    if state.get('customer_history'):
        # 格式化客户信息
        context_sections.append(f"客户等级: {state['customer_history'].get('tier', '标准')}")
    
    # 构建提示词
    draft_prompt = f"""
    请起草对下列客户邮件的回复:
    {state['email_content']}
    
    邮件意图:{classification.get('intent', '未知')}
    紧急程度:{classification.get('urgency', '中等')}
    
    {chr(10).join(context_sections)}
    
    回复指南:
    - 保持专业和友好的语气
    - 直接解决客户的具体问题
    - 在相关时使用提供的文档
    """
    
    response = llm.invoke(draft_prompt)
    
    # 根据紧急程度决定是否需要人工审核
    needs_review = (
        classification.get('urgency') in ['high', 'critical'] or
        classification.get('intent') == 'complex'
    )
    
    next_node = "human_review" if needs_review else "send_reply"
    
    return Command(
        update={"draft_response": response.content},
        goto=next_node
    )

def human_review(state: EmailAgentState) -> Command:
    """第五步:暂停以供人工审核。"""
    classification = state.get('classification', {})
    
    # 暂停流程,等待人工决策
    human_decision = interrupt({
        "email_id": state.get('email_id', ''),
        "original_email": state.get('email_content', ''),
        "draft_response": state.get('draft_response', ''),
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
        "action": "请审核并批准/编辑此回复"
    })
    
    # 处理人工决策
    if human_decision.get("approved"):
        # 使用编辑后的回复或原始回复
        edited = human_decision.get("edited_response", state.get('draft_response', ''))
        return Command(
            update={"draft_response": edited},
            goto="send_reply"
        )
    else:
        # 拒绝:由人工直接处理,结束流程
        return Command(update={}, goto=END)

def send_reply(state: EmailAgentState) -> dict:
    """第六步:发送邮件回复。"""
    print(f"发送回复: {state['draft_response']}...")
    # 在实际系统中,这里会调用邮件服务 API
    return {}

# ============== 构建图 ==============

workflow = StateGraph(EmailAgentState)

# 添加所有节点
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)  # 设置重试策略
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)

# 连接节点
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)

# 编译并配置持久化
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# ============== 使用示例 ==============

# 创建测试邮件
initial_state = {
    "email_content": "我的订阅被重复收费了!这很紧急!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
    "messages": []
}

# 使用线程保持会话一致性
config = {"configurable": {"thread_id": "customer_123"}}

# 执行流程,会在需要人工审核时暂停
result = app.invoke(initial_state, config)

# 获取中断信息
if "__interrupt__" in result:
    print("等待人工审核中...")
    print(result['__interrupt__'])
    
    # 人工审核和决策
    user_input = input("你批准此回复吗?(yes/no): ")
    
    # 恢复流程
    human_response = Command(
        resume={
            "approved": user_input.lower() == "yes",
            "edited_response": "我们对此重复收费深表歉意。已立即启动退款流程..."
        }
    )
    
    final_result = app.invoke(human_response, config)
    print("邮件已发送!")
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐