LangChain Streaming-Overview:流式处理使用完全指南
LangChain 的流式处理是提升大语言模型(LLM)应用响应性的核心功能,通过渐进式输出结果(无需等待完整响应生成),大幅优化用户体验,尤其适用于处理 LLM 延迟较高的场景(如长文本生成、多工具调用、复杂逻辑推理)。
LangChain Streaming-Overview:流式处理使用完全指南
LangChain 的流式处理是提升大语言模型(LLM)应用响应性的核心功能,通过渐进式输出结果(无需等待完整响应生成),大幅优化用户体验,尤其适用于处理 LLM 延迟较高的场景(如长文本生成、多工具调用、复杂逻辑推理)。
流式处理的核心价值在于「实时反馈」——用户无需等待最终结果,即可看到中间过程(如工具调用进度、令牌生成、自定义状态更新),让交互更流畅、透明。
一、核心概念与流式模式 🚀
1.1 什么是 LangChain 流式处理?
LangChain 的流式系统允许从智能体(Agent)运行中实时提取反馈,支持 4 类核心能力:
-
📊 流智能体进度:每一步执行后输出状态更新(如工具调用请求、工具执行结果);
-
🔤 流 LLM 令牌:模型生成令牌时实时输出(类似 ChatGPT 的打字机效果);
-
🎯 流自定义更新:输出用户定义的信号(如「已获取 10/100 条数据」);
-
📦 多模式流式:同时启用多种流式模式(如进度+自定义更新)。
1.2 支持的 3 大核心流式模式
通过 stream_mode 参数指定流式模式(支持单个模式或列表形式的多模式),核心模式对比如下:
| 模式 | 描述 | 输出格式 | 典型场景 |
|---|---|---|---|
updates |
智能体每步执行后的状态更新(如模型调用、工具执行) | 字典(含步骤名、消息内容等) | 跟踪多步骤任务进度(如「模型思考中→工具调用中→结果生成中」) |
messages |
LLM 生成的令牌+元数据(如节点名称、令牌类型) | 元组 (token, metadata) |
实现打字机式实时输出、跟踪工具调用的增量生成 |
custom |
自定义数据流(通过流写入器发送任意数据) | 用户定义的任意格式(字符串、字典等) | 长任务进度反馈(如「正在查询数据库→已完成 50%→已完成」) |
1.3 关键 API 与工具
-
agent.stream():同步流式调用(最常用); -
agent.astream():异步流式调用(适用于异步应用); -
get_stream_writer():自定义流写入器(用于custom模式发送自定义数据); -
AIMessageChunk:LLM 生成的令牌片段(messages模式的核心数据类型)。
二、核心流式模式用法详解 🛠️
2.1 模式 1:updates - 流智能体进度
核心逻辑:智能体每完成一步执行(模型调用、工具执行),就输出一次状态更新,适合跟踪任务执行流程。
代码示例+详解(天气查询智能体)
from langchain.agents import create_agent
# 1. 定义工具:查询天气
def get_weather(city: str) -> str:
"""查询指定城市的天气"""
return f"{city} 的天气:晴,25℃"
# 2. 创建智能体(绑定模型和工具)
# model:使用的LLM(如gpt-5-nano、gpt-4o)
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather], # 绑定天气查询工具
)
# 3. 流式调用智能体,stream_mode="updates"
# 输入:用户查询旧金山天气
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]},
stream_mode="updates", # 启用进度流式
):
# chunk 是字典,key为步骤名(model/tools),value为该步骤的状态数据
for step, data in chunk.items():
print(f"🔧 步骤:{step}")
# 输出该步骤的最新消息内容(content_blocks 包含消息类型和文本)
print(f"📄 内容:{data['messages'][-1].content_blocks}\n")
输出结果(清晰展示每一步进度)
🔧 步骤:model
📄 内容:[{'type': 'tool_call', 'name': 'get_weather', 'args': {'city': '旧金山'}, 'id': 'call_OW2NYNsNSKhRZpjW0wm2Aszd'}]
🔧 步骤:tools
📄 内容:[{'type': 'text', 'text': '旧金山 的天气:晴,25℃'}]
🔧 步骤:model
📄 内容:[{'type': 'text', 'text': '旧金山的天气:晴,25℃'}]
关键要点
-
步骤流程:
model(模型决定调用工具)→tools(工具执行)→model(模型生成最终回复); -
data['messages'][-1]:获取该步骤的最新消息(避免冗余历史); -
适用场景:任务拆解、多工具调用的流程跟踪(如「订单查询→支付验证→物流跟踪」)。
2.2 模式 2:messages - 流 LLM 令牌
核心逻辑:LLM 生成令牌(字符/单词)时实时输出,实现「打字机效果」,同时返回元数据(如生成令牌的节点名称)。
代码示例+详解(实时输出回复令牌)
from langchain.agents import create_agent
# 1. 复用天气查询工具
def get_weather(city: str) -> str:
"""查询指定城市的天气"""
return f"{city} 的天气:晴,25℃"
# 2. 创建智能体
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
# 3. 流式调用,stream_mode="messages"
for token, metadata in agent.stream(
{"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]},
stream_mode="messages", # 启用令牌流式
):
# token:LLM生成的令牌片段(AIMessageChunk类型)
# metadata:元数据(含生成令牌的节点名称 langgraph_node)
print(f"📍 节点:{metadata['langgraph_node']}")
print(f"✅ 令牌片段:{token.content_blocks}\n")
输出结果(截取核心部分)
📍 节点:model
✅ 令牌片段:[{'type': 'tool_call_chunk', 'id': 'call_vbCyBcP8VuneUzyYlSBZZsVa', 'name': 'get_weather', 'args': '', 'index': 0}]
📍 节点:model
✅ 令牌片段:[{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '{"', 'index': 0}]
📍 节点:model
✅ 令牌片段:[{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'city', 'index': 0}]
# ... 中间省略工具调用参数生成过程 ...
📍 节点:tools
✅ 令牌片段:[{'type': 'text', 'text': '旧金山 的天气:晴,25℃'}]
📍 节点:model
✅ 令牌片段:[{'type': 'text', 'text': '旧金'}]
📍 节点:model
✅ 令牌片段:[{'type': 'text', 'text': '山的天气:'}]
📍 节点:model
✅ 令牌片段:[{'type': 'text', 'text': '晴,25℃'}]
关键要点
-
令牌片段类型:工具调用时为
tool_call_chunk,生成文本时为text; -
元数据用途:
langgraph_node可区分令牌来自「模型节点(model)」还是「工具节点(tools)」; -
适用场景:实时展示回复生成过程(如聊天机器人的打字机效果)、调试工具调用的参数生成逻辑。
2.3 模式 3:custom - 流自定义更新
核心逻辑:通过 get_stream_writer() 在工具/中间件中发送自定义数据(如进度提示、状态说明),适合长任务的进度反馈。
代码示例+详解(带进度反馈的天气查询)
from langchain.agents import create_agent
from langgraph.config import get_stream_writer # 导入自定义流写入器
# 1. 定义工具:带进度反馈的天气查询
def get_weather(city: str) -> str:
"""查询指定城市的天气(带实时进度反馈)"""
# 获取流写入器(用于发送自定义数据)
writer = get_stream_writer()
# 发送自定义进度更新(会在stream_mode="custom"中输出)
writer(f"🔍 正在查询 {city} 的天气数据...")
writer(f"📥 已获取 {city} 的原始气象数据,正在解析...")
# 模拟查询逻辑
return f"{city} 的天气:晴,25℃"
# 2. 创建智能体(使用Claude模型)
agent = create_agent(
model="claude-sonnet-4-5-20250929",
tools=[get_weather],
)
# 3. 流式调用,stream_mode="custom"
print("📢 自定义进度反馈:")
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]},
stream_mode="custom", # 启用自定义流式
):
# chunk 为工具中 writer 发送的自定义数据
print(chunk)
输出结果
📢 自定义进度反馈:
🔍 正在查询 旧金山 的天气数据...
📥 已获取 旧金山 的原始气象数据,正在解析...
关键要点
-
get_stream_writer()只能在 LangGraph 执行上下文 中使用(即智能体/工具调用时),无法单独调用; -
自定义数据格式:支持字符串、字典、列表等任意可序列化类型;
-
适用场景:长耗时任务(如大数据查询、文件生成)的进度提示(如「已处理 30%」「即将完成」)。
2.4 模式 4:多模式流式 - 同时启用多种模式
核心逻辑:通过列表指定多个流式模式(如 ["updates", "custom"]),同时获取进度更新和自定义反馈,适合复杂场景的全面跟踪。
代码示例+详解
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
# 1. 定义带自定义反馈的工具
def get_weather(city: str) -> str:
writer = get_stream_writer()
writer(f"🔍 正在查询 {city} 的天气...")
writer(f"✅ 查询完成!")
return f"{city} 的天气:晴,25℃"
# 2. 创建智能体
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
# 3. 多模式流式:同时启用 updates 和 custom
for stream_mode, chunk in agent.stream(
{"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]},
stream_mode=["updates", "custom"], # 多模式组合
):
print(f"📌 流式模式:{stream_mode}")
print(f"内容:{chunk}\n")
输出结果(交替输出两种模式的内容)
📌 流式模式:updates
内容:{'model': {'messages': [AIMessage(content='', tool_calls=[{'name': 'get_weather', 'args': {'city': '旧金山'}, 'id': 'call_KTNQIftMrl9vgNwEfAJMVu7r', 'type': 'tool_call'}])]}}
📌 流式模式:custom
内容:🔍 正在查询 旧金山 的天气...
📌 流式模式:custom
内容:✅ 查询完成!
📌 流式模式:updates
内容:{'tools': {'messages': [ToolMessage(content='旧金山 的天气:晴,25℃', name='get_weather', tool_call_id='call_KTNQIftMrl9vgNwEfAJMVu7r')]}}
📌 流式模式:updates
内容:{'model': {'messages': [AIMessage(content='旧金山的天气:晴,25℃')]}}
三、常见流式场景实战 🎯
3.1 场景 1:流式工具调用(跟踪工具调用过程)
核心需求:同时跟踪工具调用的增量生成(如参数拼接)和最终执行结果,适合调试工具调用逻辑或向用户展示调用过程。
代码示例+详解
from typing import Any
from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
# 1. 定义工具
def get_weather(city: str) -> str:
"""查询指定城市的天气"""
return f"{city} 的天气:晴,25℃"
# 2. 创建智能体(使用OpenAI gpt-5.2模型)
agent = create_agent("openai:gpt-5.2", tools=[get_weather])
# 3. 定义令牌片段渲染函数(美化输出)
def _render_message_chunk(token: AIMessageChunk) -> None:
if token.text:
print(f"💬 生成文本:{token.text}", end="|") # 打字机效果
if token.tool_call_chunks:
print(f"\n🔧 工具调用片段:{token.tool_call_chunks}")
# 4. 定义完整消息渲染函数(输出工具调用结果/响应)
def _render_completed_message(message: AnyMessage) -> None:
if isinstance(message, AIMessage) and message.tool_calls:
print(f"\n✅ 完整工具调用:{message.tool_calls}")
if isinstance(message, ToolMessage):
print(f"\n📊 工具响应:{message.content_blocks}")
# 5. 多模式流式:messages(令牌)+ updates(进度)
input_message = {"role": "user", "content": "波士顿的天气怎么样?"}
for stream_mode, data in agent.stream(
{"messages": [input_message]},
stream_mode=["messages", "updates"], # 双模式
):
if stream_mode == "messages":
# 处理令牌片段
token, metadata = data
if isinstance(token, AIMessageChunk):
_render_message_chunk(token)
if stream_mode == "updates":
# 处理步骤进度(输出完整工具调用和响应)
for source, update in data.items():
if source in ("model", "tools"):
_render_completed_message(update["messages"][-1])
输出结果
🔧 工具调用片段:[{'name': 'get_weather', 'args': '', 'id': 'call_D3Orjr89KgsLTZ9hTzYv7Hpf', 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': 'city', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': 'Boston', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
🔧 工具调用片段:[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
✅ 完整工具调用:[{'name': 'get_weather', 'args': {'city': '波士顿'}, 'id': 'call_D3Orjr89KgsLTZ9hTzYv7Hpf', 'type': 'tool_call'}]
📊 工具响应:[{'type': 'text', 'text': '波士顿 的天气:晴,25℃'}]
💬 生成文本:波士|💬 生成文本:顿的天气:|💬 生成文本:晴,25℃|
3.2 场景 2:人机交互中断(Human-in-the-Loop)
核心需求:智能体执行工具前触发中断,等待人工审批后再继续,适合需要人工确认的敏感操作(如支付、数据修改)。
代码示例+详解
from typing import Any
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
from langgraph.checkpoint.memory import InMemorySaver # 内存持久化(保存中断状态)
from langgraph.types import Command, Interrupt
# 1. 定义工具
def get_weather(city: str) -> str:
"""查询指定城市的天气"""
return f"{city} 的天气:晴,25℃"
# 2. 初始化持久化器(保存会话状态,支持中断后恢复)
checkpointer = InMemorySaver()
# 3. 创建智能体(启用人机交互中断)
agent = create_agent(
"openai:gpt-5.2",
tools=[get_weather],
middleware=[
# 配置中断:调用get_weather工具时触发中断
HumanInTheLoopMiddleware(interrupt_on={"get_weather": True}),
],
checkpointer=checkpointer, # 必须启用持久化才能支持中断
)
# 4. 定义渲染函数
def _render_message_chunk(token: AIMessageChunk) -> None:
if token.text:
print(token.text, end="|")
if token.tool_call_chunks:
print(token.tool_call_chunks)
def _render_completed_message(message: AnyMessage) -> None:
if isinstance(message, AIMessage) and message.tool_calls:
print(f"\n✅ 工具调用:{message.tool_calls}")
if isinstance(message, ToolMessage):
print(f"\n📊 工具响应:{message.content_blocks}")
def _render_interrupt(interrupt: Interrupt) -> None:
"""渲染中断请求(提示人工审批)"""
interrupts = interrupt.value
for request in interrupts["action_requests"]:
print(f"\n⚠️ 需要人工审批:{request['description']}")
# 5. 流式调用(跟踪中断和进度)
input_message = {
"role": "user",
"content": "查询波士顿和旧金山的天气"
}
config = {"configurable": {"thread_id": "session_001"}} # 会话ID(关联中断状态)
interrupts = [] # 存储中断请求
print("📢 智能体执行中...")
for stream_mode, data in agent.stream(
{"messages": [input_message]},
config=config,
stream_mode=["messages", "updates"],
):
if stream_mode == "messages":
token, metadata = data
if isinstance(token, AIMessageChunk):
_render_message_chunk(token)
if stream_mode == "updates":
for source, update in data.items():
if source in ("model", "tools"):
_render_completed_message(update["messages"][-1])
if source == "__interrupt__":
# 捕获中断请求
interrupts.extend(update)
for interrupt in update:
_render_interrupt(interrupt)
# 6. 人工审批后,恢复智能体执行
def _get_interrupt_decisions(interrupt: Interrupt) -> list[dict]:
"""定义审批决策:编辑波士顿查询,批准旧金山查询"""
return [
{
"type": "edit", # 编辑工具调用参数
"edited_action": {
"name": "get_weather",
"args": {"city": "波士顿, 英国"} # 修改城市参数
},
}
if "boston" in request["description"].lower()
else {"type": "approve"} # 批准其他调用
for request in interrupt.value["action_requests"]
]
# 生成审批决策
decisions = {}
for interrupt in interrupts:
decisions[interrupt.id] = {
"decisions": _get_interrupt_decisions(interrupt)
}
# 7. 发送审批决策,恢复流式执行
print("\n📢 人工审批完成,恢复执行...")
for stream_mode, data in agent.stream(
Command(resume=decisions), # 发送恢复命令
config=config,
stream_mode=["messages", "updates"],
):
if stream_mode == "messages":
token, metadata = data
if isinstance(token, AIMessageChunk):
_render_message_chunk(token)
if stream_mode == "updates":
for source, update in data.items():
if source in ("model", "tools"):
_render_completed_message(update["messages"][-1])
输出结果(含中断审批流程)
📢 智能体执行中...
[{'name': 'get_weather', 'args': '', 'id': 'call_GOwNaQHeqMixay2qy80padfE', 'index': 0, 'type': 'tool_call_chunk'}]
# ... 省略工具调用参数生成过程 ...
✅ 工具调用:[{'name': 'get_weather', 'args': {'city': '波士顿'}, 'id': 'call_GOwNaQHeqMixay2qy80padfE', 'type': 'tool_call'}, {'name': 'get_weather', 'args': {'city': '旧金山'}, 'id': 'call_Ndb4jvWm2uMA0JDQXu37wDH6', 'type': 'tool_call'}]
⚠️ 需要人工审批:Tool execution requires approval. Tool: get_weather, Args: {'city': '波士顿'}
⚠️ 需要人工审批:Tool execution requires approval. Tool: get_weather, Args: {'city': '旧金山'}
📢 人工审批完成,恢复执行...
📊 工具响应:[{'type': 'text', 'text': '波士顿, 英国 的天气:晴,25℃'}]
📊 工具响应:[{'type': 'text', 'text': '旧金山 的天气:晴,25℃'}]
💬 生成文本:-|💬 生成文本: 波士顿, 英国|💬 生成文本::晴,25℃|
💬 生成文本:-|💬 生成文本: 旧金山|💬 生成文本::晴,25℃|
3.3 场景 3:子智能体流式(多智能体协作)
核心需求:多个智能体协作时,区分每个智能体的流式输出(如「监督智能体」调用「天气智能体」),避免混淆。
代码示例+详解
from typing import Any
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
# 1. 定义基础工具:查询天气
def get_weather(city: str) -> str:
"""查询指定城市的天气"""
return f"{city} 的天气:晴,25℃"
# 2. 创建子智能体:天气查询智能体(命名为 weather_agent)
weather_model = init_chat_model("openai:gpt-5.2")
weather_agent = create_agent(
model=weather_model,
tools=[get_weather],
name="weather_agent", # 子智能体命名(关键:用于区分流式来源)
)
# 3. 定义工具:调用天气子智能体
def call_weather_agent(query: str) -> str:
"""调用天气智能体处理查询"""
result = weather_agent.invoke({
"messages": [{"role": "user", "content": query}]
})
return result["messages"][-1].text # 返回子智能体的回复
# 4. 创建主智能体:监督智能体(命名为 supervisor)
supervisor_model = init_chat_model("openai:gpt-5.2")
agent = create_agent(
model=supervisor_model,
tools=[call_weather_agent],
name="supervisor", # 主智能体命名
)
# 5. 定义渲染函数(区分智能体来源)
def _render_message_chunk(token: AIMessageChunk) -> None:
if token.text:
print(token.text, end="|")
if token.tool_call_chunks:
print(token.tool_call_chunks)
def _render_completed_message(message: AnyMessage) -> None:
if isinstance(message, AIMessage) and message.tool_calls:
print(f"\n✅ 工具调用:{message.tool_calls}")
if isinstance(message, ToolMessage):
print(f"\n📊 工具响应:{message.content_blocks}")
# 6. 子智能体流式:启用 subgraphs=True(关键)
input_message = {"role": "user", "content": "波士顿的天气怎么样?"}
current_agent = None # 跟踪当前活跃的智能体
for _, stream_mode, data in agent.stream(
{"messages": [input_message]},
stream_mode=["messages", "updates"],
subgraphs=True, # 启用子图流式(支持多智能体区分)
):
if stream_mode == "messages":
token, metadata = data
# 从元数据中获取智能体名称(lc_agent_name)
if agent_name := metadata.get("lc_agent_name"):
if agent_name != current_agent:
print(f"\n🤖 当前智能体:{agent_name}")
current_agent = agent_name
if isinstance(token, AIMessageChunk):
_render_message_chunk(token)
if stream_mode == "updates":
for source, update in data.items():
if source in ("model", "tools"):
_render_completed_message(update["messages"][-1])
输出结果(清晰区分两个智能体的输出)
🤖 当前智能体:supervisor
[{'name': 'call_weather_agent', 'args': '', 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'index': 0, 'type': 'tool_call_chunk'}]
# ... 省略主智能体工具调用参数生成 ...
✅ 工具调用:[{'name': 'call_weather_agent', 'args': {'query': '波士顿的天气怎么样?'}, 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'type': 'tool_call'}]
🤖 当前智能体:weather_agent
[{'name': 'get_weather', 'args': '', 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'index': 0, 'type': 'tool_call_chunk'}]
# ... 省略子智能体工具调用参数生成 ...
✅ 工具调用:[{'name': 'get_weather', 'args': {'city': '波士顿'}, 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'type': 'tool_call'}]
📊 工具响应:[{'type': 'text', 'text': '波士顿 的天气:晴,25℃'}]
波士|顿的天气:|晴,25℃|
🤖 当前智能体:supervisor
波士|顿的天气:|晴,25℃|
四、禁用流式处理 ❌
部分场景下(如多智能体协作、模型不支持流式)需要禁用流式,只需在初始化模型时设置 streaming=False 即可。
代码示例
from langchain_openai import ChatOpenAI
# 初始化模型,禁用流式
model = ChatOpenAI(
model="gpt-4.1",
streaming=False # 禁用令牌流式输出
)
# 若模型不支持 streaming 参数,使用 disable_streaming=True
# model = SomeChatModel(disable_streaming=True)
五、核心代码整合(可直接运行) 🚀
以下代码整合了「多模式流式、自定义进度反馈、工具调用跟踪」的核心功能,基于 OpenAI 模型,只需替换 API Key 即可运行。
完整可运行代码
# 安装依赖
# pip install langchain langgraph openai tiktoken
import os
from typing import Any
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
from langgraph.config import get_stream_writer
from langgraph.checkpoint.memory import InMemorySaver
# -------------------------- 1. 环境配置 --------------------------
# 替换为自己的 OpenAI API Key
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
# 初始化模型(支持流式)
model = ChatOpenAI(model="gpt-4o", streaming=True)
# 初始化持久化器(用于会话状态保存)
checkpointer = InMemorySaver()
# -------------------------- 2. 定义工具(带自定义流式反馈) --------------------------
def get_weather(city: str) -> str:
"""查询指定城市的天气(带实时进度反馈)"""
writer = get_stream_writer()
writer(f"🔍 正在查询 {city} 的天气数据...")
writer(f"📥 已获取 {city} 的气象数据,正在解析...")
return f"{city} 的天气:晴,25℃,微风"
# -------------------------- 3. 定义渲染函数 --------------------------
def _render_message_chunk(token: AIMessageChunk) -> None:
"""渲染LLM令牌片段(打字机效果)"""
if token.text:
print(f"💬 {token.text}", end="|")
if token.tool_call_chunks:
print(f"\n🔧 工具调用片段:{token.tool_call_chunks}")
def _render_completed_message(message: AnyMessage) -> None:
"""渲染完整消息(工具调用/响应)"""
if isinstance(message, AIMessage) and message.tool_calls:
print(f"\n✅ 完整工具调用:{message.tool_calls}")
if isinstance(message, ToolMessage):
print(f"\n📊 工具响应:{message.content_blocks}")
# -------------------------- 4. 创建智能体 --------------------------
agent = create_agent(
model=model,
tools=[get_weather],
checkpointer=checkpointer,
name="weather_supervisor", # 智能体命名
)
# -------------------------- 5. 多模式流式调用 --------------------------
print("📢 天气查询智能体启动(支持实时进度+令牌流式)\n")
input_message = {"role": "user", "content": "旧金山的天气怎么样?"}
config = {"configurable": {"thread_id": "session_001"}} # 会话ID
for stream_mode, data in agent.stream(
{"messages": [input_message]},
config=config,
stream_mode=["messages", "updates", "custom"], # 三模式同时启用
):
if stream_mode == "messages":
# 处理LLM令牌流式
token, metadata = data
if isinstance(token, AIMessageChunk):
_render_message_chunk(token)
elif stream_mode == "updates":
# 处理智能体进度流式
for source, update in data.items():
if source in ("model", "tools"):
_render_completed_message(update["messages"][-1])
elif stream_mode == "custom":
# 处理自定义进度流式
print(f"\n📌 自定义进度:{data}")
print("\n\n📢 查询完成!")
运行说明
-
替换
os.environ["OPENAI_API_KEY"]为真实的 OpenAI API Key; -
支持切换模型(如
gpt-5-nano、claude-sonnet),只需修改model参数; -
运行后会输出:自定义进度反馈→工具调用片段→完整工具调用→工具响应→LLM 令牌流式输出。
六、开发要点总结 📌
-
模式选择:跟踪进度用
updates,打字机效果用messages,自定义反馈用custom; -
多模式组合:复杂场景用
["updates", "custom"],同时获取进度和自定义反馈; -
子智能体流式:必须给每个智能体命名,并启用
subgraphs=True,否则无法区分来源; -
持久化:人机交互中断、会话恢复必须启用
checkpointer(如InMemorySaver/PostgresSaver); -
模型兼容性:部分模型不支持流式,需用
disable_streaming=True禁用; -
性能优化:长文本生成时,避免过度打印令牌片段,可每隔N个令牌合并输出。
更多推荐




所有评论(0)