目录

一、异步编程:从 “死心眼” 到 “多任务协作”

1.1 同步 vs 异步:一个生活化的例子

1.2 协程:轻量级的并发单元

1.3 事件循环:协程的 “调度中心”

二、LangChain 流式传输:让大模型 “边想边说”

2.1 基础流式调用:开箱即用

2.2 Runnable 接口:流式传输的统一抽象

2.3 自定义流式解析器:控制输出粒度

三、深度探索:流式传输的底层原理

3.1 SSE 协议:服务器主动推送的轻量级方案

3.2 LangChain 源码分析:流式传输的链路

3.3 核心转换:从 OpenAI 原生块到 AIMessageChunk

3.3.1 _convert_chunk_to_generation_chunk:顶层转换入口

3.3.2 _convert_delta_to_message_chunk:消息类型与内容解析

3.4 核心问题解答

四、总结与展望


在大模型应用开发中,实时流式响应已经成为提升用户体验的标配。从 ChatGPT 的逐字输出,到各类 AI 助手的即时反馈,背后都离不开异步编程与流式传输技术的支撑。本文将从异步协程的基础概念出发,逐步深入到 LangChain 流式传输的实现原理,带你构建一个完整的技术认知框架。


一、异步编程:从 “死心眼” 到 “多任务协作”

在同步编程的世界里,程序就像一个 “死心眼” 的人,必须一件事做完才能做下一件。比如烧水和发短信,必须等水开了才能发短信,总耗时就是两者之和。而异步编程则让我们学会 “等待时切换任务”,极大提升了 CPU 利用率。

1.1 同步 vs 异步:一个生活化的例子

import time

def boil_water():
    print("开始煮水...")
    time.sleep(5)  # 模拟阻塞等待5秒
    print("水开了!")

def send_message():
    print("开始发短信...")
    time.sleep(2)  # 模拟阻塞等待2秒
    print("短信发送成功!")

def main():
    boil_water()    # 先花5秒煮水,期间什么也不能做
    send_message()  # 水开后再花2秒发短信

main()
# 总耗时:7秒

问题在于,boil_water 函数等待的 5 秒里,CPU 完全空闲,却不能去执行 send_message 任务,效率低下。这就是同步编程的瓶颈。

1.2 协程:轻量级的并发单元

协程(Coroutine)是一种用户态的 “轻量级线程”,它的切换完全由用户空间控制,避免了操作系统内核态的频繁介入,因此开销极低。在 Python 中,我们通过 asyncio 模块来实现协程。

import asyncio

async def boil_water_async():
    print("开始煮水...")
    await asyncio.sleep(5)  # 关键!await表示“等待这个操作完成,但期间让事件循环去做别的事”
    print("水开了!")

async def send_message_async():
    print("开始发短信...")
    await asyncio.sleep(2)  # 同样,等待2秒,但让出控制权
    print("短信发送成功!")

async def main():
    # 创建两个任务,交给事件循环调度
    task1 = asyncio.create_task(boil_water_async())
    task2 = asyncio.create_task(send_message_async())
    
    # 等待两个任务都完成
    await task1
    await task2

asyncio.run(main())
# 总耗时:5秒(两个任务的等待时间是并发的)

1.3 事件循环:协程的 “调度中心”

事件循环是 asyncio 的核心,你可以把它想象成一个高效的待办事项管理员:

  1. 它维护着一个任务列表(比如:煮水、发短信)。
  2. 不断循环检查每个任务:
    • 如果任务处于 “等待 I/O” 状态(比如等水开、等网络响应),就暂停它,立即执行下一个 “就绪” 的任务。
    • 如果任务的等待时间到了或者 I/O 操作完成,事件循环就恢复执行这个任务。

通过这种方式,我们可以在单线程内同时处理多个任务,实现高效的并发。


二、LangChain 流式传输:让大模型 “边想边说”

在大模型应用中,用户体验的关键在于 “即时反馈”。如果等模型生成完所有内容再一次性返回,用户等待时间过长,体验极差。LangChain 通过 .stream().astream() 方法,完美支持了流式输出。

2.1 基础流式调用:开箱即用

from langchain_openai import ChatOpenAI

# 定义大模型
model = ChatOpenAI(model="gpt-4o-mini")

# 异步调用
async def async_stream():
    print("=== 异步调用 ===")
    async for chunk in model.astream("讲一个50字的笑话"):
        print(chunk.content, end="|", flush=True)

import asyncio
asyncio.run(async_stream())

运行这段代码,你会看到模型的输出像打字机一样逐字出现,而不是等待几秒后一次性弹出。

2.2 Runnable 接口:流式传输的统一抽象

LangChain 中的所有核心组件(模型、输出解析器、向量存储等)都实现了 Runnable 接口,这使得流式传输成为一种标准能力,而不是某个模型的特例。

Runnable 接口定义了几种核心模式:

  • Invoke:同步调用,单个输入转换为输出。
  • Batched:批处理,多个输入高效转换。
  • Streamed:流式传输,输出在生成时逐块返回。
  • Inspected:检查输入、输出和配置信息。
  • Composed:组合多个 Runnable,创建复杂的处理管道。

这意味着,流式传输的能力可以无缝集成到你的整个处理链路中,从模型调用到输出解析,全程支持实时反馈。

2.3 自定义流式解析器:控制输出粒度

默认的流式传输是逐 token 返回的,但有时我们需要更高层次的语义单位,比如逐句输出。这时就可以通过自定义生成器函数来实现。

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from typing import Iterator, List

# 定义大模型
model = ChatOpenAI(model="gpt-4o-mini")
# 定义输出解析器
parser = StrOutputParser()

# 定义生成器
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in input:
        buffer += chunk
        # 只要缓冲区中包含句号,就找到第一个句号的位置
        while "。" in buffer:
            stop_index = buffer.index("。")
            # 将句号之前的内容(去除首尾空格)作为一个句子放入列表中并产出
            yield [buffer[:stop_index].strip()]
            # 更新缓冲区,保留句号之后的内容
            buffer = buffer[stop_index + 1 :]
    # 如果缓冲区还有剩余内容,也产出
    if buffer:
        yield [buffer.strip()]

# 定义链
chain = model | parser | split_into_list

for chunk in chain.stream("写一段关于爱情的歌词,需要5句话,每句话用句号分割:"):
    print(chunk, end="|", flush=True)

这样,模型的输出就会被自动切分成句子,逐句返回,大大提升了交互的自然度。


三、深度探索:流式传输的底层原理

LangChain 本身并不 “创造” 流式传输,它是依赖于底层大模型供应商(如 OpenAI)的能力。要真正理解流式传输,我们需要深入到网络协议和源码层面。

3.1 SSE 协议:服务器主动推送的轻量级方案

HTTP 协议本身是无状态的请求 - 响应模式,无法做到服务器主动推送消息。但通过 Server-Sent Events (SSE) 技术,我们可以实现流式传输。

SSE 是一种基于 HTTP 的轻量级实时通信协议,核心特点:

  • 基于 HTTP 协议:无需额外端口或协议,兼容性好。
  • 单向通信:仅支持服务器向客户端推送数据。
  • 自动重连机制:客户端断开连接时,浏览器会自动尝试重新连接。
  • 自定义消息类型:通过 Content-Type: text/event-stream 标识为事件流格式。

服务器向浏览器发送 SSE 数据,需要设置必要的 HTTP 头信息:

Content-Type: text/event-stream;charset=utf-8
Connection: keep-alive

每个 SSE 消息由若干行组成,由 \n\n 分隔,常见字段包括:

  • data::数据内容(必需)。
  • event::事件类型(非必需,默认是 message)。
  • id::数据标识符(非必需)。
  • retry::重连间隔时间(非必需)。

3.2 LangChain 源码分析:流式传输的链路

当我们向 OpenAI 发起流式请求时,LangChain 实际上是通过 BaseChatOpenAI 类中的 _stream() 方法发起调用。

def _stream(
    self,
    messages: list[BaseMessage],
    stop: Optional[list[str]] = None,
    run_manager: Optional[CallbackManagerForLLMRun] = None,
    options: Optional[dict] = None,
    **kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
    # 1. 流式配置
    kwargs["stream"] = True  # 强制启用流式模式
    kwargs["stream_options"] = self._build_stream_usage(stream_usage, **kwargs)
    
    # 2. 请求构建
    payload = self._get_request_payload(messages, stop, stop, **kwargs)
    
    # 3. 发起调用
    if "response_format" in payload:
        response_stream = self.root_client.beta.chat.completions.stream(**payload)
    else:
        response_stream = self.client.chat.completions.create(**payload, stream=True)
    
    # 4. 响应处理
    with context_manager as response:
        for chunk in response:
            # 将 OpenAI 原生块转换为 AIMessageChunk 数据块
            generation_chunk = self._convert_chunk_to_generation_chunk(
                chunk,
                default_chunk_class,
                base_generation_info if first_chunk else {},
            )
            if generation_chunk is None:
                continue
            # 4.2 触发新 token 回调
            if run_manager:
                run_manager.on_llm_new_token(
                    chunk_generation.chunk,
                    logprobs=logprobs,
                )
            # 4.3 产生生成块
            yield generation_chunk

从源码中可以清晰地看到:

  1. LangChain 在请求中设置 stream=True,告诉 OpenAI 启用 SSE 流式传输。
  2. OpenAI 服务器会持续发送多个 data: 块,每个块包含生成的部分内容。
  3. LangChain 接收这些原生块,并通过 _convert_chunk_to_generation_chunk 方法,将其转换为统一的 AIMessageChunk 对象,供后续处理链路使用。

3.3 核心转换:从 OpenAI 原生块到 AIMessageChunk

OpenAI 返回的 SSE 事件块是原生的 JSON 格式,而 LangChain 需要将其转换为统一的 AIMessageChunk 类型,以便在整个框架中处理。这个转换过程主要由 _convert_chunk_to_generation_chunk_convert_delta_to_message_chunk 两个核心方法完成。

3.3.1 _convert_chunk_to_generation_chunk:顶层转换入口
def _convert_chunk_to_generation_chunk(
    self,
    chunk: dict,
    default_chunk_class: type,
    base_generation_info: Optional[dict],
) -> Optional[ChatGenerationChunk]:
    # 1. 提取选择项数据
    choices = chunk.get("choices", [])
    # 从 beta.chat.completions.stream
    or chunk.get("chunk", {}).get("choices", [])
    
    # 2. 处理有效选择项
    choice = choices[0]
    if choice["delta"] is None:
        return None
    
    # 3. 转换增量数据为消息块
    message_chunk = self._convert_delta_to_message_chunk(
        choice["delta"], default_chunk_class
    )
    
    # 4. 构建其他生成信息
    generation_info = {}
    if base_generation_info is not None:
        generation_info.update(base_generation_info)
    
    # 5. 返回生成块
    return ChatGenerationChunk(
        message=message_chunk, generation_info=generation_info or None
    )

这个方法的核心作用是:

  • 从 OpenAI 返回的原始 chunk 中提取 choices 数据。
  • 调用 _convert_delta_to_message_chunkdelta 字段转换为消息块。
  • 封装成 ChatGenerationChunk 并返回,供上层流式迭代使用。
3.3.2 _convert_delta_to_message_chunk:消息类型与内容解析
def _convert_delta_to_message_chunk(
    self,
    _dict: Mapping[str, Any],
    default_chunk_class: type[BaseMessageChunk],
) -> BaseMessageChunk:
    # 1. 提取 OpenAI 格式数据
    id_ = _dict.get("id")
    role = cast(str, _dict.get("role") or "")
    content = cast(str, _dict.get("content") or "")
    additional_kwargs: dict = {}
    
    # 处理 function_call
    if "function_call" in _dict and function_call["name"] is not None:
        additional_kwargs["function_call"] = function_call
    
    # 处理 tool_calls
    if "tool_calls" in _dict:
        raw_tool_calls = _dict["tool_calls"]
        try:
            tool_call_chunks = [
                ToolCallChunk(
                    name=rtcf("function").get("name"),
                    args=rtcf("function").get("arguments"),
                    id=rtcf("id"),
                    index=rtcf("index"),
                )
                for rtc in raw_tool_calls
            ]
        except KeyError:
            pass
    
    # 2. 根据角色,构造消息
    if role == "user" or default_chunk_class == HumanMessageChunk:
        return HumanMessageChunk(content=content, id=id_)
    elif role == "assistant" or default_chunk_class == AIMessageChunk:
        return AIMessageChunk(
            content=content,
            additional_kwargs=additional_kwargs,
            id=id_,
            tool_call_chunks=tool_call_chunks,
        )
    elif role == "system" or default_chunk_class == SystemMessageChunk:
        if role == "developer":
            additional_kwargs["__openai_role__"] = "developer"
        return SystemMessageChunk(
            content=content, id=id_, additional_kwargs=additional_kwargs
        )
    elif role == "function" or default_chunk_class == FunctionMessageChunk:
        return FunctionMessageChunk(content=content, name=dict["name"], id=id_)
    elif role == "tool" or default_chunk_class == ToolMessageChunk:
        return ToolMessageChunk(
            content=content, tool_call_id=dict["tool_call_id"], id=id_
        )
    else:
        return default_chunk_class(content=content, id=id_)

这个方法是转换的核心,它完成了:

  • 从 OpenAI 的 delta 结构中提取 rolecontentfunction_calltool_calls 等关键信息。
  • 根据 role 字段,创建对应的消息块类型(如 AIMessageChunkHumanMessageChunkFunctionMessageChunk 等)。
  • 将工具调用、函数调用等复杂结构封装到消息的 additional_kwargs 中,保持接口统一。

3.4 核心问题解答

  1. 发起调用时,底层使用什么协议?LangChain 使用 OpenAI 的官方 Python SDK,该 SDK 内部通过 HTTP/1.1 协议发起请求,并利用 SSE 技术接收流式响应。

  2. 如何支持流式传输?通过在请求中设置 stream=True,OpenAI 服务器会将响应打包成一系列 SSE 事件块,持续推送给客户端。

  3. 返回的块是什么格式,如何转换为 AIMessageChunk?OpenAI 返回的是 JSON 格式的原生块,LangChain 通过 _convert_chunk_to_generation_chunk_convert_delta_to_message_chunk 方法,将其解析、封装成统一的 AIMessageChunk 对象,确保与 LangChain 的其他组件无缝协作。


四、总结与展望

从异步协程到流式传输,我们构建了一个从基础到应用的完整技术栈:

  • 异步协程:解决了单线程内的并发问题,让程序在等待时不 “死等”。
  • SSE 协议:提供了服务器主动推送的轻量级方案,是大模型流式输出的网络基础。
  • LangChain 抽象:通过 Runnable 接口和流式解析器,让开发者可以轻松构建端到端的实时交互应用。
  • 底层转换机制:LangChain 通过 _convert_chunk_to_generation_chunk 等核心方法,将不同模型供应商(OpenAI、Anthropic 等)的原生流式响应,统一转换为 AIMessageChunk 等自封装消息格式,实现了跨平台的流式处理能力。

未来,随着大模型能力的不断增强,流式传输将不仅仅是 “逐字输出”,还会支持更复杂的交互模式,比如多模态内容的实时生成、工具调用的中间步骤反馈等。掌握这些底层原理,将让你在大模型应用开发的浪潮中占据先机。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐