LangChain Streaming-Overview:流式处理使用完全指南

LangChain 的流式处理是提升大语言模型(LLM)应用响应性的核心功能,通过渐进式输出结果(无需等待完整响应生成),大幅优化用户体验,尤其适用于处理 LLM 延迟较高的场景(如长文本生成、多工具调用、复杂逻辑推理)。

流式处理的核心价值在于「实时反馈」——用户无需等待最终结果,即可看到中间过程(如工具调用进度、令牌生成、自定义状态更新),让交互更流畅、透明。

一、核心概念与流式模式 🚀

1.1 什么是 LangChain 流式处理?

LangChain 的流式系统允许从智能体(Agent)运行中实时提取反馈,支持 4 类核心能力:

  • 📊 流智能体进度:每一步执行后输出状态更新(如工具调用请求、工具执行结果);

  • 🔤 流 LLM 令牌:模型生成令牌时实时输出(类似 ChatGPT 的打字机效果);

  • 🎯 流自定义更新:输出用户定义的信号(如「已获取 10/100 条数据」);

  • 📦 多模式流式:同时启用多种流式模式(如进度+自定义更新)。

1.2 支持的 3 大核心流式模式

通过 stream_mode 参数指定流式模式(支持单个模式或列表形式的多模式),核心模式对比如下:

模式 描述 输出格式 典型场景
updates 智能体每步执行后的状态更新(如模型调用、工具执行) 字典(含步骤名、消息内容等) 跟踪多步骤任务进度(如「模型思考中→工具调用中→结果生成中」)
messages LLM 生成的令牌+元数据(如节点名称、令牌类型) 元组 (token, metadata) 实现打字机式实时输出、跟踪工具调用的增量生成
custom 自定义数据流(通过流写入器发送任意数据) 用户定义的任意格式(字符串、字典等) 长任务进度反馈(如「正在查询数据库→已完成 50%→已完成」)

1.3 关键 API 与工具

  • agent.stream():同步流式调用(最常用);

  • agent.astream():异步流式调用(适用于异步应用);

  • get_stream_writer():自定义流写入器(用于 custom 模式发送自定义数据);

  • AIMessageChunk:LLM 生成的令牌片段(messages 模式的核心数据类型)。

二、核心流式模式用法详解 🛠️

2.1 模式 1:updates - 流智能体进度

核心逻辑:智能体每完成一步执行(模型调用、工具执行),就输出一次状态更新,适合跟踪任务执行流程。

代码示例+详解(天气查询智能体)
from langchain.agents import create_agent

# 1. 定义工具:查询天气
def get_weather(city: str) -> str:
    """查询指定城市的天气"""
    return f"{city} 的天气:晴,25℃"

# 2. 创建智能体(绑定模型和工具)
# model:使用的LLM(如gpt-5-nano、gpt-4o)
agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],  # 绑定天气查询工具
)

# 3. 流式调用智能体,stream_mode="updates"
# 输入:用户查询旧金山天气
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]},
    stream_mode="updates",  # 启用进度流式
):
    # chunk 是字典,key为步骤名(model/tools),value为该步骤的状态数据
    for step, data in chunk.items():
        print(f"🔧 步骤:{step}")
        # 输出该步骤的最新消息内容(content_blocks 包含消息类型和文本)
        print(f"📄 内容:{data['messages'][-1].content_blocks}\n")
输出结果(清晰展示每一步进度)
🔧 步骤:model
📄 内容:[{'type': 'tool_call', 'name': 'get_weather', 'args': {'city': '旧金山'}, 'id': 'call_OW2NYNsNSKhRZpjW0wm2Aszd'}]

🔧 步骤:tools
📄 内容:[{'type': 'text', 'text': '旧金山 的天气:晴,25℃'}]

🔧 步骤:model
📄 内容:[{'type': 'text', 'text': '旧金山的天气:晴,25℃'}]
关键要点
  • 步骤流程:model(模型决定调用工具)→ tools(工具执行)→ model(模型生成最终回复);

  • data['messages'][-1]:获取该步骤的最新消息(避免冗余历史);

  • 适用场景:任务拆解、多工具调用的流程跟踪(如「订单查询→支付验证→物流跟踪」)。

2.2 模式 2:messages - 流 LLM 令牌

核心逻辑:LLM 生成令牌(字符/单词)时实时输出,实现「打字机效果」,同时返回元数据(如生成令牌的节点名称)。

代码示例+详解(实时输出回复令牌)
from langchain.agents import create_agent

# 1. 复用天气查询工具
def get_weather(city: str) -> str:
    """查询指定城市的天气"""
    return f"{city} 的天气:晴,25℃"

# 2. 创建智能体
agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

# 3. 流式调用,stream_mode="messages"
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]},
    stream_mode="messages",  # 启用令牌流式
):
    # token:LLM生成的令牌片段(AIMessageChunk类型)
    # metadata:元数据(含生成令牌的节点名称 langgraph_node)
    print(f"📍 节点:{metadata['langgraph_node']}")
    print(f"✅ 令牌片段:{token.content_blocks}\n")
输出结果(截取核心部分)
📍 节点:model
✅ 令牌片段:[{'type': 'tool_call_chunk', 'id': 'call_vbCyBcP8VuneUzyYlSBZZsVa', 'name': 'get_weather', 'args': '', 'index': 0}]

📍 节点:model
✅ 令牌片段:[{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '{"', 'index': 0}]

📍 节点:model
✅ 令牌片段:[{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'city', 'index': 0}]

# ... 中间省略工具调用参数生成过程 ...

📍 节点:tools
✅ 令牌片段:[{'type': 'text', 'text': '旧金山 的天气:晴,25℃'}]

📍 节点:model
✅ 令牌片段:[{'type': 'text', 'text': '旧金'}]

📍 节点:model
✅ 令牌片段:[{'type': 'text', 'text': '山的天气:'}]

📍 节点:model
✅ 令牌片段:[{'type': 'text', 'text': '晴,25℃'}]
关键要点
  • 令牌片段类型:工具调用时为 tool_call_chunk,生成文本时为 text

  • 元数据用途:langgraph_node 可区分令牌来自「模型节点(model)」还是「工具节点(tools)」;

  • 适用场景:实时展示回复生成过程(如聊天机器人的打字机效果)、调试工具调用的参数生成逻辑。

2.3 模式 3:custom - 流自定义更新

核心逻辑:通过 get_stream_writer() 在工具/中间件中发送自定义数据(如进度提示、状态说明),适合长任务的进度反馈。

代码示例+详解(带进度反馈的天气查询)
from langchain.agents import create_agent
from langgraph.config import get_stream_writer  # 导入自定义流写入器

# 1. 定义工具:带进度反馈的天气查询
def get_weather(city: str) -> str:
    """查询指定城市的天气(带实时进度反馈)"""
    # 获取流写入器(用于发送自定义数据)
    writer = get_stream_writer()
    
    # 发送自定义进度更新(会在stream_mode="custom"中输出)
    writer(f"🔍 正在查询 {city} 的天气数据...")
    writer(f"📥 已获取 {city} 的原始气象数据,正在解析...")
    
    # 模拟查询逻辑
    return f"{city} 的天气:晴,25℃"

# 2. 创建智能体(使用Claude模型)
agent = create_agent(
    model="claude-sonnet-4-5-20250929",
    tools=[get_weather],
)

# 3. 流式调用,stream_mode="custom"
print("📢 自定义进度反馈:")
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]},
    stream_mode="custom",  # 启用自定义流式
):
    # chunk 为工具中 writer 发送的自定义数据
    print(chunk)
输出结果
📢 自定义进度反馈:
🔍 正在查询 旧金山 的天气数据...
📥 已获取 旧金山 的原始气象数据,正在解析...
关键要点
  • get_stream_writer() 只能在 LangGraph 执行上下文 中使用(即智能体/工具调用时),无法单独调用;

  • 自定义数据格式:支持字符串、字典、列表等任意可序列化类型;

  • 适用场景:长耗时任务(如大数据查询、文件生成)的进度提示(如「已处理 30%」「即将完成」)。

2.4 模式 4:多模式流式 - 同时启用多种模式

核心逻辑:通过列表指定多个流式模式(如 ["updates", "custom"]),同时获取进度更新和自定义反馈,适合复杂场景的全面跟踪。

代码示例+详解
from langchain.agents import create_agent
from langgraph.config import get_stream_writer

# 1. 定义带自定义反馈的工具
def get_weather(city: str) -> str:
    writer = get_stream_writer()
    writer(f"🔍 正在查询 {city} 的天气...")
    writer(f"✅ 查询完成!")
    return f"{city} 的天气:晴,25℃"

# 2. 创建智能体
agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

# 3. 多模式流式:同时启用 updates 和 custom
for stream_mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]},
    stream_mode=["updates", "custom"],  # 多模式组合
):
    print(f"📌 流式模式:{stream_mode}")
    print(f"内容:{chunk}\n")
输出结果(交替输出两种模式的内容)
📌 流式模式:updates
内容:{'model': {'messages': [AIMessage(content='', tool_calls=[{'name': 'get_weather', 'args': {'city': '旧金山'}, 'id': 'call_KTNQIftMrl9vgNwEfAJMVu7r', 'type': 'tool_call'}])]}}

📌 流式模式:custom
内容:🔍 正在查询 旧金山 的天气...

📌 流式模式:custom
内容:✅ 查询完成!

📌 流式模式:updates
内容:{'tools': {'messages': [ToolMessage(content='旧金山 的天气:晴,25℃', name='get_weather', tool_call_id='call_KTNQIftMrl9vgNwEfAJMVu7r')]}}

📌 流式模式:updates
内容:{'model': {'messages': [AIMessage(content='旧金山的天气:晴,25℃')]}}

三、常见流式场景实战 🎯

3.1 场景 1:流式工具调用(跟踪工具调用过程)

核心需求:同时跟踪工具调用的增量生成(如参数拼接)和最终执行结果,适合调试工具调用逻辑或向用户展示调用过程。

代码示例+详解
from typing import Any
from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage

# 1. 定义工具
def get_weather(city: str) -> str:
    """查询指定城市的天气"""
    return f"{city} 的天气:晴,25℃"

# 2. 创建智能体(使用OpenAI gpt-5.2模型)
agent = create_agent("openai:gpt-5.2", tools=[get_weather])

# 3. 定义令牌片段渲染函数(美化输出)
def _render_message_chunk(token: AIMessageChunk) -> None:
    if token.text:
        print(f"💬 生成文本:{token.text}", end="|")  # 打字机效果
    if token.tool_call_chunks:
        print(f"\n🔧 工具调用片段:{token.tool_call_chunks}")

# 4. 定义完整消息渲染函数(输出工具调用结果/响应)
def _render_completed_message(message: AnyMessage) -> None:
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"\n✅ 完整工具调用:{message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"\n📊 工具响应:{message.content_blocks}")

# 5. 多模式流式:messages(令牌)+ updates(进度)
input_message = {"role": "user", "content": "波士顿的天气怎么样?"}
for stream_mode, data in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],  # 双模式
):
    if stream_mode == "messages":
        # 处理令牌片段
        token, metadata = data
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    if stream_mode == "updates":
        # 处理步骤进度(输出完整工具调用和响应)
        for source, update in data.items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
输出结果
🔧 工具调用片段:[{'name': 'get_weather', 'args': '', 'id': 'call_D3Orjr89KgsLTZ9hTzYv7Hpf', 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': 'city', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': 'Boston', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
✅ 完整工具调用:[{'name': 'get_weather', 'args': {'city': '波士顿'}, 'id': 'call_D3Orjr89KgsLTZ9hTzYv7Hpf', 'type': 'tool_call'}]
📊 工具响应:[{'type': 'text', 'text': '波士顿 的天气:晴,25℃'}]
💬 生成文本:波士|💬 生成文本:顿的天气:|💬 生成文本:晴,25℃|

3.2 场景 2:人机交互中断(Human-in-the-Loop)

核心需求:智能体执行工具前触发中断,等待人工审批后再继续,适合需要人工确认的敏感操作(如支付、数据修改)。

代码示例+详解
from typing import Any
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
from langgraph.checkpoint.memory import InMemorySaver  # 内存持久化(保存中断状态)
from langgraph.types import Command, Interrupt

# 1. 定义工具
def get_weather(city: str) -> str:
    """查询指定城市的天气"""
    return f"{city} 的天气:晴,25℃"

# 2. 初始化持久化器(保存会话状态,支持中断后恢复)
checkpointer = InMemorySaver()

# 3. 创建智能体(启用人机交互中断)
agent = create_agent(
    "openai:gpt-5.2",
    tools=[get_weather],
    middleware=[
        # 配置中断:调用get_weather工具时触发中断
        HumanInTheLoopMiddleware(interrupt_on={"get_weather": True}),
    ],
    checkpointer=checkpointer,  # 必须启用持久化才能支持中断
)

# 4. 定义渲染函数
def _render_message_chunk(token: AIMessageChunk) -> None:
    if token.text:
        print(token.text, end="|")
    if token.tool_call_chunks:
        print(token.tool_call_chunks)

def _render_completed_message(message: AnyMessage) -> None:
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"\n✅ 工具调用:{message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"\n📊 工具响应:{message.content_blocks}")

def _render_interrupt(interrupt: Interrupt) -> None:
    """渲染中断请求(提示人工审批)"""
    interrupts = interrupt.value
    for request in interrupts["action_requests"]:
        print(f"\n⚠️  需要人工审批:{request['description']}")

# 5. 流式调用(跟踪中断和进度)
input_message = {
    "role": "user",
    "content": "查询波士顿和旧金山的天气"
}
config = {"configurable": {"thread_id": "session_001"}}  # 会话ID(关联中断状态)
interrupts = []  # 存储中断请求

print("📢 智能体执行中...")
for stream_mode, data in agent.stream(
    {"messages": [input_message]},
    config=config,
    stream_mode=["messages", "updates"],
):
    if stream_mode == "messages":
        token, metadata = data
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    if stream_mode == "updates":
        for source, update in data.items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
            if source == "__interrupt__":
                # 捕获中断请求
                interrupts.extend(update)
                for interrupt in update:
                    _render_interrupt(interrupt)

# 6. 人工审批后,恢复智能体执行
def _get_interrupt_decisions(interrupt: Interrupt) -> list[dict]:
    """定义审批决策:编辑波士顿查询,批准旧金山查询"""
    return [
        {
            "type": "edit",  # 编辑工具调用参数
            "edited_action": {
                "name": "get_weather",
                "args": {"city": "波士顿, 英国"}  # 修改城市参数
            },
        }
        if "boston" in request["description"].lower()
        else {"type": "approve"}  # 批准其他调用
        for request in interrupt.value["action_requests"]
    ]

# 生成审批决策
decisions = {}
for interrupt in interrupts:
    decisions[interrupt.id] = {
        "decisions": _get_interrupt_decisions(interrupt)
    }

# 7. 发送审批决策,恢复流式执行
print("\n📢 人工审批完成,恢复执行...")
for stream_mode, data in agent.stream(
    Command(resume=decisions),  # 发送恢复命令
    config=config,
    stream_mode=["messages", "updates"],
):
    if stream_mode == "messages":
        token, metadata = data
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    if stream_mode == "updates":
        for source, update in data.items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
输出结果(含中断审批流程)
📢 智能体执行中...
[{'name': 'get_weather', 'args': '', 'id': 'call_GOwNaQHeqMixay2qy80padfE', 'index': 0, 'type': 'tool_call_chunk'}]
# ... 省略工具调用参数生成过程 ...
✅ 工具调用:[{'name': 'get_weather', 'args': {'city': '波士顿'}, 'id': 'call_GOwNaQHeqMixay2qy80padfE', 'type': 'tool_call'}, {'name': 'get_weather', 'args': {'city': '旧金山'}, 'id': 'call_Ndb4jvWm2uMA0JDQXu37wDH6', 'type': 'tool_call'}]

⚠️  需要人工审批:Tool execution requires approval. Tool: get_weather, Args: {'city': '波士顿'}
⚠️  需要人工审批:Tool execution requires approval. Tool: get_weather, Args: {'city': '旧金山'}

📢 人工审批完成,恢复执行...
📊 工具响应:[{'type': 'text', 'text': '波士顿, 英国 的天气:晴,25℃'}]
📊 工具响应:[{'type': 'text', 'text': '旧金山 的天气:晴,25℃'}]
💬 生成文本:-|💬 生成文本: 波士顿, 英国|💬 生成文本::晴,25℃|
💬 生成文本:-|💬 生成文本: 旧金山|💬 生成文本::晴,25℃|

3.3 场景 3:子智能体流式(多智能体协作)

核心需求:多个智能体协作时,区分每个智能体的流式输出(如「监督智能体」调用「天气智能体」),避免混淆。

代码示例+详解
from typing import Any
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage

# 1. 定义基础工具:查询天气
def get_weather(city: str) -> str:
    """查询指定城市的天气"""
    return f"{city} 的天气:晴,25℃"

# 2. 创建子智能体:天气查询智能体(命名为 weather_agent)
weather_model = init_chat_model("openai:gpt-5.2")
weather_agent = create_agent(
    model=weather_model,
    tools=[get_weather],
    name="weather_agent",  # 子智能体命名(关键:用于区分流式来源)
)

# 3. 定义工具:调用天气子智能体
def call_weather_agent(query: str) -> str:
    """调用天气智能体处理查询"""
    result = weather_agent.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    return result["messages"][-1].text  # 返回子智能体的回复

# 4. 创建主智能体:监督智能体(命名为 supervisor)
supervisor_model = init_chat_model("openai:gpt-5.2")
agent = create_agent(
    model=supervisor_model,
    tools=[call_weather_agent],
    name="supervisor",  # 主智能体命名
)

# 5. 定义渲染函数(区分智能体来源)
def _render_message_chunk(token: AIMessageChunk) -> None:
    if token.text:
        print(token.text, end="|")
    if token.tool_call_chunks:
        print(token.tool_call_chunks)

def _render_completed_message(message: AnyMessage) -> None:
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"\n✅ 工具调用:{message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"\n📊 工具响应:{message.content_blocks}")

# 6. 子智能体流式:启用 subgraphs=True(关键)
input_message = {"role": "user", "content": "波士顿的天气怎么样?"}
current_agent = None  # 跟踪当前活跃的智能体

for _, stream_mode, data in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],
    subgraphs=True,  # 启用子图流式(支持多智能体区分)
):
    if stream_mode == "messages":
        token, metadata = data
        # 从元数据中获取智能体名称(lc_agent_name)
        if agent_name := metadata.get("lc_agent_name"):
            if agent_name != current_agent:
                print(f"\n🤖 当前智能体:{agent_name}")
                current_agent = agent_name
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    if stream_mode == "updates":
        for source, update in data.items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
输出结果(清晰区分两个智能体的输出)
🤖 当前智能体:supervisor
[{'name': 'call_weather_agent', 'args': '', 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'index': 0, 'type': 'tool_call_chunk'}]
# ... 省略主智能体工具调用参数生成 ...
✅ 工具调用:[{'name': 'call_weather_agent', 'args': {'query': '波士顿的天气怎么样?'}, 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'type': 'tool_call'}]

🤖 当前智能体:weather_agent
[{'name': 'get_weather', 'args': '', 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'index': 0, 'type': 'tool_call_chunk'}]
# ... 省略子智能体工具调用参数生成 ...
✅ 工具调用:[{'name': 'get_weather', 'args': {'city': '波士顿'}, 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'type': 'tool_call'}]
📊 工具响应:[{'type': 'text', 'text': '波士顿 的天气:晴,25℃'}]
波士|顿的天气:|晴,25℃|

🤖 当前智能体:supervisor
波士|顿的天气:|晴,25℃|

四、禁用流式处理 ❌

部分场景下(如多智能体协作、模型不支持流式)需要禁用流式,只需在初始化模型时设置 streaming=False 即可。

代码示例

from langchain_openai import ChatOpenAI

# 初始化模型,禁用流式
model = ChatOpenAI(
    model="gpt-4.1",
    streaming=False  # 禁用令牌流式输出
)

# 若模型不支持 streaming 参数,使用 disable_streaming=True
# model = SomeChatModel(disable_streaming=True)

五、核心代码整合(可直接运行) 🚀

以下代码整合了「多模式流式、自定义进度反馈、工具调用跟踪」的核心功能,基于 OpenAI 模型,只需替换 API Key 即可运行。

完整可运行代码

# 安装依赖
# pip install langchain langgraph openai tiktoken
import os
from typing import Any
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
from langgraph.config import get_stream_writer
from langgraph.checkpoint.memory import InMemorySaver

# -------------------------- 1. 环境配置 --------------------------
# 替换为自己的 OpenAI API Key
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
# 初始化模型(支持流式)
model = ChatOpenAI(model="gpt-4o", streaming=True)
# 初始化持久化器(用于会话状态保存)
checkpointer = InMemorySaver()

# -------------------------- 2. 定义工具(带自定义流式反馈) --------------------------
def get_weather(city: str) -> str:
    """查询指定城市的天气(带实时进度反馈)"""
    writer = get_stream_writer()
    writer(f"🔍 正在查询 {city} 的天气数据...")
    writer(f"📥 已获取 {city} 的气象数据,正在解析...")
    return f"{city} 的天气:晴,25℃,微风"

# -------------------------- 3. 定义渲染函数 --------------------------
def _render_message_chunk(token: AIMessageChunk) -> None:
    """渲染LLM令牌片段(打字机效果)"""
    if token.text:
        print(f"💬 {token.text}", end="|")
    if token.tool_call_chunks:
        print(f"\n🔧 工具调用片段:{token.tool_call_chunks}")

def _render_completed_message(message: AnyMessage) -> None:
    """渲染完整消息(工具调用/响应)"""
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"\n✅ 完整工具调用:{message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"\n📊 工具响应:{message.content_blocks}")

# -------------------------- 4. 创建智能体 --------------------------
agent = create_agent(
    model=model,
    tools=[get_weather],
    checkpointer=checkpointer,
    name="weather_supervisor",  # 智能体命名
)

# -------------------------- 5. 多模式流式调用 --------------------------
print("📢 天气查询智能体启动(支持实时进度+令牌流式)\n")
input_message = {"role": "user", "content": "旧金山的天气怎么样?"}
config = {"configurable": {"thread_id": "session_001"}}  # 会话ID

for stream_mode, data in agent.stream(
    {"messages": [input_message]},
    config=config,
    stream_mode=["messages", "updates", "custom"],  # 三模式同时启用
):
    if stream_mode == "messages":
        # 处理LLM令牌流式
        token, metadata = data
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    elif stream_mode == "updates":
        # 处理智能体进度流式
        for source, update in data.items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
    elif stream_mode == "custom":
        # 处理自定义进度流式
        print(f"\n📌 自定义进度:{data}")

print("\n\n📢 查询完成!")

运行说明

  1. 替换 os.environ["OPENAI_API_KEY"] 为真实的 OpenAI API Key;

  2. 支持切换模型(如 gpt-5-nanoclaude-sonnet),只需修改 model 参数;

  3. 运行后会输出:自定义进度反馈→工具调用片段→完整工具调用→工具响应→LLM 令牌流式输出。

六、开发要点总结 📌

  1. 模式选择:跟踪进度用 updates,打字机效果用 messages,自定义反馈用 custom

  2. 多模式组合:复杂场景用 ["updates", "custom"],同时获取进度和自定义反馈;

  3. 子智能体流式:必须给每个智能体命名,并启用 subgraphs=True,否则无法区分来源;

  4. 持久化:人机交互中断、会话恢复必须启用 checkpointer(如 InMemorySaver/PostgresSaver);

  5. 模型兼容性:部分模型不支持流式,需用 disable_streaming=True 禁用;

  6. 性能优化:长文本生成时,避免过度打印令牌片段,可每隔N个令牌合并输出。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐