LangChain V1.0 Streaming 详细指南
LangChain Streaming 指南摘要 LangChain 的流式传输系统为LLM应用提供实时响应能力,显著提升用户体验。核心功能包括: 流式模式: 支持values/updates/messages/custom/debug五种模式 可输出token级/步骤级/事件级数据 核心应用: 模型流式:逐token输出内容 工具流式:实时显示工具调用过程 Agent流式:观察Agent执行步骤
LangChain Streaming 详细指南
基于官方文档 https://docs.langchain.com/oss/python/langchain/streaming 的完整中文总结
概述
LangChain 实现了一个强大的流式传输系统,用于实时显示更新。流式传输对于增强基于 LLM 的应用程序的响应性至关重要。通过逐步显示输出(即使在完整响应准备好之前),流式传输显著改善了用户体验(UX),特别是在处理 LLM 的延迟时。
关键优势:
- 🚀 实时反馈 - 用户立即看到进展
- ⚡ 改善感知性能 - 即使总时间相同,流式传输让应用感觉更快
- 📊 进度可视化 - 可以显示中间步骤和状态
- 🎯 更好的 UX - 特别是对于长响应
为什么需要 Streaming
1. LLM 延迟问题
大型语言模型生成响应需要时间,特别是对于长输出:
- GPT-4 生成 500 字可能需要 10-20 秒
- 用户期望即时反馈
- 流式传输让等待过程更加可控
2. 用户体验
# 非流式:用户等待 15 秒,然后看到完整响应
response = model.invoke("写一篇关于 AI 的文章")
print(response.content) # 15 秒后显示
# 流式:用户立即看到文字逐渐出现
for chunk in model.stream("写一篇关于 AI 的文章"):
print(chunk.text, end="", flush=True) # 实时显示
3. Agent 可观察性
在复杂的 Agent 系统中,流式传输让你看到:
- Agent 正在调用哪个工具
- 工具执行的进度
- 中间思考过程
核心概念
Stream Mode(流式模式)
LangChain 支持多种流式模式,每种模式提供不同级别的信息:
| 模式 | 描述 | 用途 |
|---|---|---|
values |
每步后的完整状态 | 查看完整的图状态 |
updates |
每步的状态更新(增量) | 只看变化部分 |
messages |
LLM token 流 + 元数据 | 流式显示 LLM 输出 |
custom |
自定义用户数据 | 进度更新、日志等 |
debug |
详细的执行信息 | 调试和故障排除 |
流式输出类型
- Token-level streaming - 逐个 token 输出
- Step-level streaming - 每个节点/步骤的输出
- Event streaming - 语义事件(开始、流式、结束)
LangChain 中的 Streaming
Model Streaming
基本用法
from langchain.chat_models import init_chat_model
model = init_chat_model(model="glm-4.5-air")
# 流式输出 tokens
for chunk in model.stream("什么颜色是天空?"):
print(chunk.text, end="", flush=True)
输出:
天
天空
天空通常
天空通常是
天空通常是蓝色
...
累积消息块
from langchain_core.messages import AIMessageChunk
# 累积完整消息
full = None
for chunk in model.stream("你好"):
full = chunk if full is None else full + chunk
print(full.text)
# 最终的 full 是一个完整的 AIMessage
print(full.content_blocks)
# [{"type": "text", "text": "你好!有什么可以帮你的吗?"}]
流式工具调用
from langchain.tools import tool
@tool
def get_weather(city: str) -> str:
"""获取城市天气"""
return f"{city} 今天晴天"
model_with_tools = model.bind_tools([get_weather])
# 工具调用在流式传输中逐步构建
for chunk in model_with_tools.stream("波士顿天气如何?"):
if chunk.tool_call_chunks:
for tool_chunk in chunk.tool_call_chunks:
print(f"Tool: {tool_chunk.get('name', '')}")
print(f"Args: {tool_chunk.get('args', '')}")
输出:
Tool: get_weather
Args:
Tool:
Args: {"city
Tool:
Args: ": "Boston"}
Agent Streaming
流式 Agent 进度
from langchain.agents import create_agent
agent = create_agent(
model="glm-4.5-air",
tools=[get_weather],
)
# stream_mode="updates" 显示每个步骤后的更新
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "SF 的天气"}]},
stream_mode="updates"
):
print(chunk)
输出示例:
# LLM 节点输出(带工具调用)
{'model': {'messages': [AIMessage(tool_calls=[...])]}}
# 工具节点输出
{'tools': {'messages': [ToolMessage(content="晴天")]}}
# LLM 最终响应
{'model': {'messages': [AIMessage(content="旧金山今天晴天")]}}
自动流式传输(invoke())
LangChain 的一个强大特性是自动流式传输:即使在节点内使用 invoke(),如果整个应用程序处于流式模式,LangChain 也会自动切换到流式传输。
from langgraph.graph import StateGraph, START
def my_node(state):
# 使用 invoke(),但在流式上下文中会自动流式传输
response = model.invoke(state["messages"])
return {"messages": [response]}
graph = StateGraph(MessagesState).add_node("model", my_node).compile()
# graph.stream() 会自动触发 model 的流式传输
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "你好"}]},
stream_mode="messages" # 流式传输 LLM tokens
):
print(chunk)
工作原理:
- 当检测到整体应用程序在流式模式时,
invoke()自动切换到内部流式模式 - LangChain 触发
on_llm_new_token回调事件 - LangGraph 的
stream()和astream_events()实时显示输出
LangGraph 中的 Streaming
LangGraph 提供更强大的流式传输功能,适用于复杂的 Agent 工作流。
Stream 模式详解
1. Values Mode - 完整状态
每个超步骤后流式传输完整的图状态。
from langgraph.graph import StateGraph, START
from typing import TypedDict
class State(TypedDict):
topic: str
joke: str
def generate_joke(state: State):
return {"joke": f"关于 {state['topic']} 的笑话"}
graph = StateGraph(State).add_node("generate", generate_joke).compile()
for chunk in graph.stream(
{"topic": "冰淇淋"},
stream_mode="values" # 完整状态
):
print(chunk)
输出:
{'topic': '冰淇淋', 'joke': ''} # 初始状态
{'topic': '冰淇淋', 'joke': '关于 冰淇淋 的笑话'} # 执行后
2. Updates Mode - 增量更新
只流式传输状态的更新部分。
for chunk in graph.stream(
{"topic": "冰淇淋"},
stream_mode="updates" # 只有更新
):
print(chunk)
输出:
{'generate': {'joke': '关于 冰淇淋 的笑话'}} # 只显示更新
流式传输图状态
from typing import Annotated
from langgraph.graph import StateGraph, START, END
class GraphState(TypedDict):
count: Annotated[int, lambda x, y: x + y] # reducer
data: str
def node_a(state: GraphState):
return {"count": 1, "data": "A"}
def node_b(state: GraphState):
return {"count": 1, "data": "B"}
graph = (
StateGraph(GraphState)
.add_node("a", node_a)
.add_node("b", node_b)
.add_edge(START, "a")
.add_edge("a", "b")
.compile()
)
# 流式传输状态更新
for chunk in graph.stream(
{"count": 0, "data": ""},
stream_mode="updates"
):
print(chunk)
输出:
{'a': {'count': 1, 'data': 'A'}}
{'b': {'count': 1, 'data': 'B'}}
流式传输 LLM Tokens
使用 stream_mode="messages" 实时流式传输 LLM 生成的 tokens。
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
@dataclass
class MyState:
topic: str
joke: str = ""
model = init_chat_model(model="glm-4.5-air-mini")
def call_model(state: MyState):
"""调用 LLM 生成笑话"""
# 即使使用 invoke,在 messages 模式下也会流式传输
response = model.invoke([
{"role": "user", "content": f"生成一个关于 {state.topic} 的笑话"}
])
return {"joke": response.content}
graph = (
StateGraph(MyState)
.add_node("call_model", call_model)
.add_edge(START, "call_model")
.compile()
)
# 流式传输 LLM tokens
for message_chunk, metadata in graph.stream(
{"topic": "冰淇淋"},
stream_mode="messages" # Token 流式传输
):
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)
输出:
为|什么|冰淇淋|从不|感到|孤独|?|因为|它|总是|和|它的|朋友|在|一起|!|
元数据包含:
{
'langgraph_node': 'call_model', # 节点名称
'langgraph_triggers': [...], # 触发器
'langgraph_path': [...], # 执行路径
}
过滤特定节点的 Tokens
from langgraph.graph import StateGraph, START
def write_joke(state):
response = model.invoke([
{"role": "user", "content": f"写一个关于 {state['topic']} 的笑话"}
])
return {"joke": response.content}
def write_poem(state):
response = model.invoke([
{"role": "user", "content": f"写一首关于 {state['topic']} 的诗"}
])
return {"poem": response.content}
graph = (
StateGraph(State)
.add_node("write_joke", write_joke)
.add_node("write_poem", write_poem)
.add_edge(START, "write_joke")
.add_edge(START, "write_poem")
.compile()
)
# 只流式传输 write_poem 节点的输出
for msg, metadata in graph.stream(
{"topic": "猫"},
stream_mode="messages"
):
# 按节点过滤
if msg.content and metadata.get("langgraph_node") == "write_poem":
print(msg.content, end="|", flush=True)
流式传输自定义数据
从工具或节点内部发送自定义进度更新。
使用 get_stream_writer
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
class State(TypedDict):
query: str
answer: str
def node(state: State):
# 获取流写入器
writer = get_stream_writer()
# 发送自定义进度更新
writer({"progress": "开始处理查询..."})
# 模拟处理
import time
time.sleep(1)
writer({"progress": "正在生成答案..."})
time.sleep(1)
writer({"progress": "完成!"})
return {"answer": "答案内容"}
graph = (
StateGraph(State)
.add_node("process", node)
.add_edge(START, "process")
.compile()
)
# stream_mode="custom" 接收自定义数据
for chunk in graph.stream(
{"query": "示例查询"},
stream_mode="custom"
):
print(chunk)
输出:
{'progress': '开始处理查询...'}
{'progress': '正在生成答案...'}
{'progress': '完成!'}
在工具中使用流式传输
from langchain.tools import tool
from langgraph.config import get_stream_writer
@tool
def query_database(query: str) -> str:
"""查询数据库"""
writer = get_stream_writer()
# 发送进度更新
writer({"data": "已检索 0/100 条记录", "type": "progress"})
# 模拟查询
import time
for i in range(0, 101, 20):
time.sleep(0.5)
writer({"data": f"已检索 {i}/100 条记录", "type": "progress"})
return "查询结果"
# 在 agent 中使用
agent = create_agent(model="glm-4.5-air", tools=[query_database])
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "查询数据库"}]},
stream_mode="custom"
):
print(chunk)
输出:
{'data': '已检索 0/100 条记录', 'type': 'progress'}
{'data': '已检索 20/100 条记录', 'type': 'progress'}
{'data': '已检索 40/100 条记录', 'type': 'progress'}
...
Python < 3.11 异步注意事项
在 Python < 3.11 的异步代码中,get_stream_writer 不可用,需要直接使用 StreamWriter:
from langgraph.types import StreamWriter
async def async_node(state: State, writer: StreamWriter):
writer({"status": "开始"})
# ... 异步处理
writer({"status": "完成"})
return {"result": "data"}
流式传输子图输出
当使用嵌套子图时,可以流式传输来自父图和子图的输出。
from langgraph.graph import START, StateGraph
from typing import TypedDict
# 定义子图
class SubgraphState(TypedDict):
foo: str # 与父图共享的键
bar: str
def subgraph_node_1(state: SubgraphState):
return {"bar": "bar"}
def subgraph_node_2(state: SubgraphState):
return {"foo": state["foo"] + state["bar"]}
subgraph = (
StateGraph(SubgraphState)
.add_node("subgraph_node_1", subgraph_node_1)
.add_node("subgraph_node_2", subgraph_node_2)
.add_edge(START, "subgraph_node_1")
.add_edge("subgraph_node_1", "subgraph_node_2")
.compile()
)
# 定义父图
class ParentState(TypedDict):
foo: str
def node_1(state: ParentState):
return {"foo": "hi! " + state["foo"]}
graph = (
StateGraph(ParentState)
.add_node("node_1", node_1)
.add_node("node_2", subgraph) # 子图作为节点
.add_edge(START, "node_1")
.add_edge("node_1", "node_2")
.compile()
)
# 流式传输时包含子图输出
for chunk in graph.stream(
{"foo": "foo"},
stream_mode="updates",
subgraphs=True # 启用子图流式传输
):
print(chunk)
输出:
((), {'node_1': {'foo': 'hi! foo'}}) # 父图
(('node_2:UUID',), {'subgraph_node_1': {'bar': 'bar'}}) # 子图节点 1
(('node_2:UUID',), {'subgraph_node_2': {'foo': 'hi! foobar'}}) # 子图节点 2
((), {'node_2': {'foo': 'hi! foobar'}}) # 父图
命名空间说明:
()- 父图('node_2:UUID',)- 子图路径- 可以通过命名空间识别来自哪个图/子图
调试模式
debug 模式流式传输尽可能多的执行信息,包括完整状态和所有中间步骤。
for chunk in graph.stream(
{"topic": "冰淇淋"},
stream_mode="debug"
):
print(chunk)
输出包含:
- 节点名称
- 完整状态
- 执行元数据
- 时间戳
- 所有中间结果
高级特性
多模式组合
可以同时使用多个流式模式:
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
@tool
def get_weather(city: str) -> str:
"""获取城市天气"""
writer = get_stream_writer()
writer(f"正在查找 {city} 的数据")
writer(f"已获取 {city} 的数据")
return f"{city} 总是晴天!"
agent = create_agent(
model="glm-4.5-air-mini",
tools=[get_weather],
)
# 同时使用 updates、messages 和 custom 模式
for stream_mode, chunk in agent.stream(
{"messages": [{"role": "user", "content": "旧金山天气如何?"}]},
stream_mode=["updates", "messages", "custom"]
):
print(f"{stream_mode}: {chunk}\n")
输出:
updates: {'model': {'messages': [AIMessage(tool_calls=[...])]}}
custom: 正在查找 San Francisco 的数据
custom: 已获取 San Francisco 的数据
updates: {'tools': {'messages': [ToolMessage(...)]}}
messages: (AIMessageChunk(content='旧金山'), {...})
updates: {'model': {'messages': [AIMessage(content='...')]}}
禁用 Streaming
在某些场景(如多 Agent 系统),可能需要禁用特定模型的流式传输。
Python
from langchain_openai import ChatOpenAI
# 禁用流式传输
model = ChatOpenAI(
model="o1-preview",
disable_streaming=True # 显式禁用
)
或使用 init_chat_model:
from langchain.chat_models import init_chat_model
model = init_chat_model(
"claude-sonnet-4-5-20250929",
disable_streaming=True
)
JavaScript
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "o1-preview",
streaming: false // 禁用流式传输
});
使用场景:
- 某些模型不支持流式传输(如 OpenAI o1 系列)
- 多 Agent 系统中控制哪些 Agent 流式传输
- 需要等待完整响应再处理
过滤特定节点
# 只显示特定节点的流式输出
for msg, metadata in graph.stream(
inputs,
stream_mode="messages"
):
# 过滤指定节点
if msg.content and metadata.get("langgraph_node") == "target_node":
print(msg.content, end="", flush=True)
消息块处理
AIMessageChunk
在流式传输过程中,你会收到 AIMessageChunk 对象,可以累积成完整消息。
Python
from langchain_core.messages import AIMessageChunk
chunks = []
full_message = None
for chunk in model.stream("你好"):
chunks.append(chunk)
print(chunk.text)
# 累积完整消息
full_message = chunk if full_message is None else full_message + chunk
# full_message 现在是完整的 AIMessage
print(full_message.content_blocks)
JavaScript
import { AIMessageChunk } from "langchain";
let finalChunk = undefined;
for (const chunk of chunks) {
finalChunk = finalChunk ? finalChunk.concat(chunk) : chunk;
}
console.log(finalChunk.contentBlocks);
流式传输工具调用
# 工具调用逐步构建
full = None
for chunk in model_with_tools.stream("波士顿天气?"):
full = chunk if full is None else full + chunk
print(full.content_blocks)
输出示例:
[{"type": "tool_call_chunk", "name": "get_weather", "args": ""}]
[{"type": "tool_call_chunk", "name": "get_weather", "args": "{\"city"}]
[{"type": "tool_call_chunk", "name": "get_weather", "args": "\": \"Boston\"}"}]
# ... 最终变为完整的 tool_call
实际应用场景
1. 聊天应用
from langchain.chat_models import init_chat_model
from langchain.agents import create_agent
model = init_chat_model("glm-4.5-air")
agent = create_agent(model=model, tools=[])
def chat_stream(user_message: str):
"""流式聊天响应"""
for chunk in agent.stream(
{"messages": [{"role": "user", "content": user_message}]},
stream_mode="messages"
):
msg, metadata = chunk
if msg.content:
yield msg.content
# 使用
for text in chat_stream("介绍一下 LangChain"):
print(text, end="", flush=True)
2. 进度跟踪的文档处理
@tool
def process_document(file_path: str) -> str:
"""处理文档"""
writer = get_stream_writer()
writer({"stage": "reading", "progress": 0})
# 读取文档
writer({"stage": "analyzing", "progress": 30})
# 分析内容
writer({"stage": "summarizing", "progress": 70})
# 生成摘要
writer({"stage": "complete", "progress": 100})
return "处理完成"
agent = create_agent(model="glm-4.5-air", tools=[process_document])
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "处理 report.pdf"}]},
stream_mode="custom"
):
# 更新 UI 进度条
update_progress_bar(chunk["progress"])
3. 多步骤 Agent 工作流
from langgraph.graph import StateGraph, START, END
class WorkflowState(TypedDict):
query: str
search_results: list
analysis: str
summary: str
def search(state: WorkflowState):
writer = get_stream_writer()
writer({"step": "搜索中..."})
# 执行搜索
return {"search_results": [...]}
def analyze(state: WorkflowState):
writer = get_stream_writer()
writer({"step": "分析结果..."})
# 分析
return {"analysis": "..."}
def summarize(state: WorkflowState):
writer = get_stream_writer()
writer({"step": "生成摘要..."})
# 总结
return {"summary": "..."}
graph = (
StateGraph(WorkflowState)
.add_node("search", search)
.add_node("analyze", analyze)
.add_node("summarize", summarize)
.add_edge(START, "search")
.add_edge("search", "analyze")
.add_edge("analyze", "summarize")
.compile()
)
# 同时流式传输步骤和自定义进度
for mode, chunk in graph.stream(
{"query": "LangChain 是什么?"},
stream_mode=["updates", "custom"]
):
if mode == "custom":
print(f"进度: {chunk['step']}")
elif mode == "updates":
print(f"完成节点: {list(chunk.keys())[0]}")
4. 实时数据处理
@tool
def fetch_realtime_data(source: str) -> str:
"""获取实时数据"""
writer = get_stream_writer()
for i in range(10):
# 模拟实时数据流
data = fetch_batch(i)
writer({
"batch": i,
"data": data,
"timestamp": datetime.now()
})
time.sleep(0.5)
return "数据获取完成"
# 实时显示数据
for chunk in agent.stream(..., stream_mode="custom"):
plot_data(chunk["data"]) # 实时绘图
最佳实践(重点)
1. 选择合适的流式模式
# 聊天应用 - 使用 messages 模式
for msg, _ in agent.stream(..., stream_mode="messages"):
display_to_user(msg.content)
# 调试 - 使用 debug 模式
for chunk in graph.stream(..., stream_mode="debug"):
log_to_console(chunk)
# 进度跟踪 - 使用 custom 模式
for progress in agent.stream(..., stream_mode="custom"):
update_progress_bar(progress)
# 状态监控 - 使用 updates 模式
for update in graph.stream(..., stream_mode="updates"):
update_state_display(update)
2. 正确处理消息块累积
# ✅ 正确:累积块
full = None
for chunk in model.stream("长文本"):
full = chunk if full is None else full + chunk
# 可以安全地使用 full
# ❌ 错误:不累积,丢失上下文
for chunk in model.stream("长文本"):
print(chunk.text) # 只显示部分内容
3. 使用 flush 立即显示
# ✅ 正确:立即刷新输出
for chunk in model.stream("你好"):
print(chunk.text, end="", flush=True)
# ❌ 错误:可能被缓冲
for chunk in model.stream("你好"):
print(chunk.text, end="") # 没有 flush
4. 错误处理
from langgraph.errors import GraphRecursionError
try:
for chunk in graph.stream(
inputs,
stream_mode="updates",
config={"recursion_limit": 10}
):
process_chunk(chunk)
except GraphRecursionError:
print("达到递归限制,但获得了部分结果")
except Exception as e:
print(f"流式传输错误: {e}")
5. 组合多种模式
# 同时获取状态更新和自定义进度
for mode, chunk in agent.stream(
inputs,
stream_mode=["updates", "custom", "messages"]
):
if mode == "updates":
log_state_change(chunk)
elif mode == "custom":
update_ui_progress(chunk)
elif mode == "messages":
display_llm_output(chunk[0].content)
6. 避免阻塞
import asyncio
# ✅ 异步流式传输(非阻塞)
async def async_stream():
async for chunk in agent.astream(inputs):
await process_chunk(chunk)
# ❌ 同步流式传输(阻塞)
def sync_stream():
for chunk in agent.stream(inputs):
process_chunk(chunk) # 阻塞主线程
7. 内存管理
# 对于长时间运行的流,定期清理
chunks_buffer = []
MAX_BUFFER_SIZE = 100
for chunk in model.stream("超长文本"):
chunks_buffer.append(chunk)
# 定期处理并清空缓冲区
if len(chunks_buffer) >= MAX_BUFFER_SIZE:
process_chunks(chunks_buffer)
chunks_buffer = []
8. 子图流式传输
# ✅ 启用子图流式传输以获得完整可见性
for chunk in graph.stream(
inputs,
stream_mode="updates",
subgraphs=True # 包含子图输出
):
process_chunk(chunk)
# ❌ 不启用子图流式传输可能丢失信息
for chunk in graph.stream(inputs, stream_mode="updates"):
# 只能看到父图输出
process_chunk(chunk)
性能优化
1. 使用流式传输减少感知延迟
import time
# 非流式:用户等待完整响应
start = time.time()
response = model.invoke("写一篇长文章")
print(response.content) # 15 秒后显示
print(f"总时间: {time.time() - start}s")
# 流式:立即开始显示
start = time.time()
first_token_time = None
for chunk in model.stream("写一篇长文章"):
if first_token_time is None:
first_token_time = time.time()
print(f"首个 token 时间: {first_token_time - start}s") # ~0.5s
print(chunk.text, end="", flush=True)
print(f"\n总时间: {time.time() - start}s") # 总时间相同,但体验更好
2. 批量处理
# ✅ 批量处理流式块
batch = []
BATCH_SIZE = 10
for chunk in model.stream("长文本"):
batch.append(chunk)
if len(batch) >= BATCH_SIZE:
process_batch(batch)
batch = []
# 处理剩余
if batch:
process_batch(batch)
3. 异步并发
import asyncio
async def stream_multiple_agents():
"""并发运行多个 agent 流"""
tasks = [
agent1.astream(input1, stream_mode="messages"),
agent2.astream(input2, stream_mode="messages"),
agent3.astream(input3, stream_mode="messages"),
]
# 并发处理所有流
results = await asyncio.gather(*tasks)
return results
4. 选择性流式传输
# 只流式传输需要的节点
for msg, metadata in graph.stream(
inputs,
stream_mode="messages"
):
# 只处理特定节点
if metadata.get("langgraph_node") in ["important_node_1", "important_node_2"]:
process_message(msg)
# 忽略其他节点,减少处理开销
5. 缓存和复用
from functools import lru_cache
@lru_cache(maxsize=100)
def get_cached_stream_result(query: str):
"""缓存流式结果"""
result = []
for chunk in model.stream(query):
result.append(chunk)
return result
# 复用缓存的流
cached = get_cached_stream_result("常见问题")
for chunk in cached:
print(chunk.text, end="")
快速参考
Stream 模式对比表
| 模式 | 返回内容 | 使用场景 | 示例输出 |
|---|---|---|---|
values |
完整状态 | 查看所有状态 | {"topic": "AI", "joke": "..."} |
updates |
增量更新 | 只关心变化 | {"node": {"field": "value"}} |
messages |
LLM tokens + 元数据 | 流式聊天 | (AIMessageChunk("Hi"), {...}) |
custom |
自定义数据 | 进度/日志 | {"progress": 50, "status": "..."} |
debug |
详细信息 | 调试 | {node, state, metadata, ...} |
常用代码片段
基本流式传输
# Model streaming
for chunk in model.stream("query"):
print(chunk.text, end="", flush=True)
# Agent streaming
for chunk in agent.stream({"messages": [...]}, stream_mode="updates"):
print(chunk)
# Graph streaming
for chunk in graph.stream(inputs, stream_mode="values"):
print(chunk)
累积消息
full = None
for chunk in model.stream("query"):
full = chunk if full is None else full + chunk
自定义进度
from langgraph.config import get_stream_writer
def my_node(state):
writer = get_stream_writer()
writer({"progress": "开始"})
# ... 处理
writer({"progress": "完成"})
return state
# 流式传输
for chunk in graph.stream(inputs, stream_mode="custom"):
print(chunk["progress"])
多模式
for mode, chunk in agent.stream(
inputs,
stream_mode=["updates", "messages", "custom"]
):
if mode == "updates":
print(f"State: {chunk}")
elif mode == "messages":
print(f"Token: {chunk[0].content}")
elif mode == "custom":
print(f"Progress: {chunk}")
子图流式传输
for chunk in graph.stream(
inputs,
stream_mode="updates",
subgraphs=True # 包含子图
):
namespace, data = chunk
if namespace == ():
print("父图:", data)
else:
print(f"子图 {namespace}:", data)
事件类型(astream_events)
async for event in model.astream_events("Hello"):
if event["event"] == "on_chat_model_start":
print(f"开始: {event['data']['input']}")
elif event["event"] == "on_chat_model_stream":
print(f"Token: {event['data']['chunk'].text}")
elif event["event"] == "on_chat_model_end":
print(f"完整消息: {event['data']['output'].text}")
禁用流式传输
# Python
model = ChatOpenAI(model="o1-preview", disable_streaming=True)
# JavaScript
const model = new ChatOpenAI({ model: "o1-preview", streaming: false });
总结
LangChain 和 LangGraph 的流式传输系统提供了强大而灵活的实时数据传输能力:
核心优势:
✅ 改善用户体验 - 即时反馈和进度可视化
✅ 多种流式模式 - 适应不同场景需求
✅ 自动流式传输 - 简化开发复杂度
✅ 子图支持 - 复杂工作流的完整可见性
✅ 自定义数据 - 灵活的进度和日志传输
关键要点:
- 使用
stream()或astream()方法启动流式传输 - 通过
stream_mode参数选择合适的流式模式 - 使用
get_stream_writer()发送自定义进度更新 - 正确累积
AIMessageChunk以获得完整消息 - 对于复杂工作流,启用
subgraphs=True - 使用多模式组合获得全面的可观察性
通过合理使用流式传输,你可以构建响应迅速、用户体验优秀的 LLM 应用程序!
更多推荐



所有评论(0)