流式传输: LangChain 实时交互的底层逻辑
本文探讨了大模型应用开发中的实时流式响应技术。首先介绍了异步编程的优势,通过协程和事件循环实现高效并发处理。然后详细解析了LangChain的流式传输实现,包括.stream()和.astream()方法的使用、Runnable接口的统一抽象以及自定义流式解析器开发。文章深入剖析了底层SSE协议的工作原理和LangChain源码中的关键转换机制,展示了如何将OpenAI原生数据块转换为统一的AIM
目录
3.3 核心转换:从 OpenAI 原生块到 AIMessageChunk
3.3.1 _convert_chunk_to_generation_chunk:顶层转换入口
3.3.2 _convert_delta_to_message_chunk:消息类型与内容解析
在大模型应用开发中,实时流式响应已经成为提升用户体验的标配。从 ChatGPT 的逐字输出,到各类 AI 助手的即时反馈,背后都离不开异步编程与流式传输技术的支撑。本文将从异步协程的基础概念出发,逐步深入到 LangChain 流式传输的实现原理,带你构建一个完整的技术认知框架。
一、异步编程:从 “死心眼” 到 “多任务协作”
在同步编程的世界里,程序就像一个 “死心眼” 的人,必须一件事做完才能做下一件。比如烧水和发短信,必须等水开了才能发短信,总耗时就是两者之和。而异步编程则让我们学会 “等待时切换任务”,极大提升了 CPU 利用率。
1.1 同步 vs 异步:一个生活化的例子
import time
def boil_water():
print("开始煮水...")
time.sleep(5) # 模拟阻塞等待5秒
print("水开了!")
def send_message():
print("开始发短信...")
time.sleep(2) # 模拟阻塞等待2秒
print("短信发送成功!")
def main():
boil_water() # 先花5秒煮水,期间什么也不能做
send_message() # 水开后再花2秒发短信
main()
# 总耗时:7秒
问题在于,boil_water 函数等待的 5 秒里,CPU 完全空闲,却不能去执行 send_message 任务,效率低下。这就是同步编程的瓶颈。
1.2 协程:轻量级的并发单元
协程(Coroutine)是一种用户态的 “轻量级线程”,它的切换完全由用户空间控制,避免了操作系统内核态的频繁介入,因此开销极低。在 Python 中,我们通过 asyncio 模块来实现协程。
import asyncio
async def boil_water_async():
print("开始煮水...")
await asyncio.sleep(5) # 关键!await表示“等待这个操作完成,但期间让事件循环去做别的事”
print("水开了!")
async def send_message_async():
print("开始发短信...")
await asyncio.sleep(2) # 同样,等待2秒,但让出控制权
print("短信发送成功!")
async def main():
# 创建两个任务,交给事件循环调度
task1 = asyncio.create_task(boil_water_async())
task2 = asyncio.create_task(send_message_async())
# 等待两个任务都完成
await task1
await task2
asyncio.run(main())
# 总耗时:5秒(两个任务的等待时间是并发的)
1.3 事件循环:协程的 “调度中心”
事件循环是 asyncio 的核心,你可以把它想象成一个高效的待办事项管理员:
- 它维护着一个任务列表(比如:煮水、发短信)。
- 不断循环检查每个任务:
- 如果任务处于 “等待 I/O” 状态(比如等水开、等网络响应),就暂停它,立即执行下一个 “就绪” 的任务。
- 如果任务的等待时间到了或者 I/O 操作完成,事件循环就恢复执行这个任务。
通过这种方式,我们可以在单线程内同时处理多个任务,实现高效的并发。
二、LangChain 流式传输:让大模型 “边想边说”
在大模型应用中,用户体验的关键在于 “即时反馈”。如果等模型生成完所有内容再一次性返回,用户等待时间过长,体验极差。LangChain 通过 .stream() 和 .astream() 方法,完美支持了流式输出。
2.1 基础流式调用:开箱即用
from langchain_openai import ChatOpenAI
# 定义大模型
model = ChatOpenAI(model="gpt-4o-mini")
# 异步调用
async def async_stream():
print("=== 异步调用 ===")
async for chunk in model.astream("讲一个50字的笑话"):
print(chunk.content, end="|", flush=True)
import asyncio
asyncio.run(async_stream())
运行这段代码,你会看到模型的输出像打字机一样逐字出现,而不是等待几秒后一次性弹出。
2.2 Runnable 接口:流式传输的统一抽象
LangChain 中的所有核心组件(模型、输出解析器、向量存储等)都实现了 Runnable 接口,这使得流式传输成为一种标准能力,而不是某个模型的特例。
Runnable 接口定义了几种核心模式:
- Invoke:同步调用,单个输入转换为输出。
- Batched:批处理,多个输入高效转换。
- Streamed:流式传输,输出在生成时逐块返回。
- Inspected:检查输入、输出和配置信息。
- Composed:组合多个 Runnable,创建复杂的处理管道。
这意味着,流式传输的能力可以无缝集成到你的整个处理链路中,从模型调用到输出解析,全程支持实时反馈。
2.3 自定义流式解析器:控制输出粒度
默认的流式传输是逐 token 返回的,但有时我们需要更高层次的语义单位,比如逐句输出。这时就可以通过自定义生成器函数来实现。
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from typing import Iterator, List
# 定义大模型
model = ChatOpenAI(model="gpt-4o-mini")
# 定义输出解析器
parser = StrOutputParser()
# 定义生成器
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
buffer = ""
for chunk in input:
buffer += chunk
# 只要缓冲区中包含句号,就找到第一个句号的位置
while "。" in buffer:
stop_index = buffer.index("。")
# 将句号之前的内容(去除首尾空格)作为一个句子放入列表中并产出
yield [buffer[:stop_index].strip()]
# 更新缓冲区,保留句号之后的内容
buffer = buffer[stop_index + 1 :]
# 如果缓冲区还有剩余内容,也产出
if buffer:
yield [buffer.strip()]
# 定义链
chain = model | parser | split_into_list
for chunk in chain.stream("写一段关于爱情的歌词,需要5句话,每句话用句号分割:"):
print(chunk, end="|", flush=True)
这样,模型的输出就会被自动切分成句子,逐句返回,大大提升了交互的自然度。
三、深度探索:流式传输的底层原理
LangChain 本身并不 “创造” 流式传输,它是依赖于底层大模型供应商(如 OpenAI)的能力。要真正理解流式传输,我们需要深入到网络协议和源码层面。
3.1 SSE 协议:服务器主动推送的轻量级方案
HTTP 协议本身是无状态的请求 - 响应模式,无法做到服务器主动推送消息。但通过 Server-Sent Events (SSE) 技术,我们可以实现流式传输。
SSE 是一种基于 HTTP 的轻量级实时通信协议,核心特点:
- 基于 HTTP 协议:无需额外端口或协议,兼容性好。
- 单向通信:仅支持服务器向客户端推送数据。
- 自动重连机制:客户端断开连接时,浏览器会自动尝试重新连接。
- 自定义消息类型:通过
Content-Type: text/event-stream标识为事件流格式。
服务器向浏览器发送 SSE 数据,需要设置必要的 HTTP 头信息:
Content-Type: text/event-stream;charset=utf-8
Connection: keep-alive
每个 SSE 消息由若干行组成,由 \n\n 分隔,常见字段包括:
data::数据内容(必需)。event::事件类型(非必需,默认是message)。id::数据标识符(非必需)。retry::重连间隔时间(非必需)。
3.2 LangChain 源码分析:流式传输的链路
当我们向 OpenAI 发起流式请求时,LangChain 实际上是通过 BaseChatOpenAI 类中的 _stream() 方法发起调用。
def _stream(
self,
messages: list[BaseMessage],
stop: Optional[list[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
options: Optional[dict] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
# 1. 流式配置
kwargs["stream"] = True # 强制启用流式模式
kwargs["stream_options"] = self._build_stream_usage(stream_usage, **kwargs)
# 2. 请求构建
payload = self._get_request_payload(messages, stop, stop, **kwargs)
# 3. 发起调用
if "response_format" in payload:
response_stream = self.root_client.beta.chat.completions.stream(**payload)
else:
response_stream = self.client.chat.completions.create(**payload, stream=True)
# 4. 响应处理
with context_manager as response:
for chunk in response:
# 将 OpenAI 原生块转换为 AIMessageChunk 数据块
generation_chunk = self._convert_chunk_to_generation_chunk(
chunk,
default_chunk_class,
base_generation_info if first_chunk else {},
)
if generation_chunk is None:
continue
# 4.2 触发新 token 回调
if run_manager:
run_manager.on_llm_new_token(
chunk_generation.chunk,
logprobs=logprobs,
)
# 4.3 产生生成块
yield generation_chunk
从源码中可以清晰地看到:
- LangChain 在请求中设置
stream=True,告诉 OpenAI 启用 SSE 流式传输。 - OpenAI 服务器会持续发送多个
data:块,每个块包含生成的部分内容。 - LangChain 接收这些原生块,并通过
_convert_chunk_to_generation_chunk方法,将其转换为统一的AIMessageChunk对象,供后续处理链路使用。
3.3 核心转换:从 OpenAI 原生块到 AIMessageChunk
OpenAI 返回的 SSE 事件块是原生的 JSON 格式,而 LangChain 需要将其转换为统一的 AIMessageChunk 类型,以便在整个框架中处理。这个转换过程主要由 _convert_chunk_to_generation_chunk 和 _convert_delta_to_message_chunk 两个核心方法完成。
3.3.1 _convert_chunk_to_generation_chunk:顶层转换入口
def _convert_chunk_to_generation_chunk(
self,
chunk: dict,
default_chunk_class: type,
base_generation_info: Optional[dict],
) -> Optional[ChatGenerationChunk]:
# 1. 提取选择项数据
choices = chunk.get("choices", [])
# 从 beta.chat.completions.stream
or chunk.get("chunk", {}).get("choices", [])
# 2. 处理有效选择项
choice = choices[0]
if choice["delta"] is None:
return None
# 3. 转换增量数据为消息块
message_chunk = self._convert_delta_to_message_chunk(
choice["delta"], default_chunk_class
)
# 4. 构建其他生成信息
generation_info = {}
if base_generation_info is not None:
generation_info.update(base_generation_info)
# 5. 返回生成块
return ChatGenerationChunk(
message=message_chunk, generation_info=generation_info or None
)
这个方法的核心作用是:
- 从 OpenAI 返回的原始 chunk 中提取
choices数据。 - 调用
_convert_delta_to_message_chunk将delta字段转换为消息块。 - 封装成
ChatGenerationChunk并返回,供上层流式迭代使用。
3.3.2 _convert_delta_to_message_chunk:消息类型与内容解析
def _convert_delta_to_message_chunk(
self,
_dict: Mapping[str, Any],
default_chunk_class: type[BaseMessageChunk],
) -> BaseMessageChunk:
# 1. 提取 OpenAI 格式数据
id_ = _dict.get("id")
role = cast(str, _dict.get("role") or "")
content = cast(str, _dict.get("content") or "")
additional_kwargs: dict = {}
# 处理 function_call
if "function_call" in _dict and function_call["name"] is not None:
additional_kwargs["function_call"] = function_call
# 处理 tool_calls
if "tool_calls" in _dict:
raw_tool_calls = _dict["tool_calls"]
try:
tool_call_chunks = [
ToolCallChunk(
name=rtcf("function").get("name"),
args=rtcf("function").get("arguments"),
id=rtcf("id"),
index=rtcf("index"),
)
for rtc in raw_tool_calls
]
except KeyError:
pass
# 2. 根据角色,构造消息
if role == "user" or default_chunk_class == HumanMessageChunk:
return HumanMessageChunk(content=content, id=id_)
elif role == "assistant" or default_chunk_class == AIMessageChunk:
return AIMessageChunk(
content=content,
additional_kwargs=additional_kwargs,
id=id_,
tool_call_chunks=tool_call_chunks,
)
elif role == "system" or default_chunk_class == SystemMessageChunk:
if role == "developer":
additional_kwargs["__openai_role__"] = "developer"
return SystemMessageChunk(
content=content, id=id_, additional_kwargs=additional_kwargs
)
elif role == "function" or default_chunk_class == FunctionMessageChunk:
return FunctionMessageChunk(content=content, name=dict["name"], id=id_)
elif role == "tool" or default_chunk_class == ToolMessageChunk:
return ToolMessageChunk(
content=content, tool_call_id=dict["tool_call_id"], id=id_
)
else:
return default_chunk_class(content=content, id=id_)
这个方法是转换的核心,它完成了:
- 从 OpenAI 的
delta结构中提取role、content、function_call、tool_calls等关键信息。 - 根据
role字段,创建对应的消息块类型(如AIMessageChunk、HumanMessageChunk、FunctionMessageChunk等)。 - 将工具调用、函数调用等复杂结构封装到消息的
additional_kwargs中,保持接口统一。
3.4 核心问题解答
-
发起调用时,底层使用什么协议?LangChain 使用 OpenAI 的官方 Python SDK,该 SDK 内部通过 HTTP/1.1 协议发起请求,并利用 SSE 技术接收流式响应。
-
如何支持流式传输?通过在请求中设置
stream=True,OpenAI 服务器会将响应打包成一系列 SSE 事件块,持续推送给客户端。 -
返回的块是什么格式,如何转换为 AIMessageChunk?OpenAI 返回的是 JSON 格式的原生块,LangChain 通过
_convert_chunk_to_generation_chunk和_convert_delta_to_message_chunk方法,将其解析、封装成统一的AIMessageChunk对象,确保与 LangChain 的其他组件无缝协作。
四、总结与展望
从异步协程到流式传输,我们构建了一个从基础到应用的完整技术栈:
- 异步协程:解决了单线程内的并发问题,让程序在等待时不 “死等”。
- SSE 协议:提供了服务器主动推送的轻量级方案,是大模型流式输出的网络基础。
- LangChain 抽象:通过 Runnable 接口和流式解析器,让开发者可以轻松构建端到端的实时交互应用。
- 底层转换机制:LangChain 通过
_convert_chunk_to_generation_chunk等核心方法,将不同模型供应商(OpenAI、Anthropic 等)的原生流式响应,统一转换为AIMessageChunk等自封装消息格式,实现了跨平台的流式处理能力。
未来,随着大模型能力的不断增强,流式传输将不仅仅是 “逐字输出”,还会支持更复杂的交互模式,比如多模态内容的实时生成、工具调用的中间步骤反馈等。掌握这些底层原理,将让你在大模型应用开发的浪潮中占据先机。
更多推荐


所有评论(0)