本文主要介绍流式传输。

流式传输对于使基于LLM的应用程序能够响应用户至关重要,它显著改善用户体验。

1.stream()同步传输

在LangChain聊天模型,可以使用.stream()的方法,来同步生成流式响应的效果。

聊天模型的.stream()方法返回一个迭代器。该迭代器在生成输出时同步产生输出消息块。可以使用for循环实时处理每个块。代码如下:

from langchain_openai import ChatOpenAI

model=ChatOpenAI(model="gpt-4o-mini")
chunks=[]
for chunk in model.stream("讲一个笑话"):
    chunks.append(chunk)
    print(chunk.content,end="|",flush=True)

2.astream()异步传输

2.1异步相关概念

比喻:当需要洗盘子的时候可以同时烧水。

同步方式:做事情一件一件来。

import time
def boil_water():
    print("开始煮⽔...")
    time.sleep(5) # 模拟阻塞等待5秒
    print("⽔开了!")
def send_message():
    print("开始发短信...")
    time.sleep(2) # 模拟阻塞等待2秒
    print("短信发送成功!")
# 主程序
def main():
    boil_water() # 先花5秒煮⽔,期间什么也不能做
    send_message() # ⽔开后再花2秒发短信
main()

但是此任务效率低下,在等待的时候却不能做send_message任务。

我们使用异步方式:asyncio,协程和事件循环。

协程,作为一种轻量级的并发编程模型,可以被视为用户态的轻量级线程。与传统线程相比,它的优势在于其调度完全由用户控件掌握,避免了操作系统内核的频繁介入,从而降低了上下文切换的开销。

协程的切换由程序员和编程语言控制,程序员决定在何时暂停或恢复协程。协程是一个特殊的函数,它可以暂停并在稍后恢复执行。它用async def定义,并在哪个需要暂停的地方用await。

async def boil_water():
    print("开始煮水...")
    # 核心修改:使用 asyncio.sleep 而不是 time.sleep
    await asyncio.sleep(5)
    print("水开了!")


async def send_message():
    print("开始发短信...")
    # 核心修改:使用 asyncio.sleep
    await asyncio.sleep(2)
    print("短信发送成功!")

这是两个协程。

什么是事件循环?

它是asynico的核心,你可以把它看成一个总调度员或一个待办事项的管理员。

它的工作流程简单:

1)它维护着一个人物列表。

2)它不断循环检查每个任务:

a.如果任务处于等待I/O状态,就暂停它,立刻就去执行下一个就绪任务。

b.如果任务的等待时间已经到了或者I/O操作完成了,时间就恢复执行这个任务。

async def main():
    start_time = time.time()

    # create_task 会立即把任务丢进事件循环并开始执行
    task1 = asyncio.create_task(boil_water())
    task2 = asyncio.create_task(send_message())

    # 等待两个任务完成
    await task1
    await task2

    end_time = time.time()
    print(f"总共耗时: {end_time - start_time:.2f} 秒")


# 它负责创建事件循环,并将第一个协程放入其中运行
asyncio.run(main())

总代码:

import asyncio
import time  # 仅用于对比或记录总耗时


async def boil_water():
    print("开始煮水...")
    # 核心修改:使用 asyncio.sleep 而不是 time.sleep
    await asyncio.sleep(5)
    print("水开了!")


async def send_message():
    print("开始发短信...")
    # 核心修改:使用 asyncio.sleep
    await asyncio.sleep(2)
    print("短信发送成功!")


async def main():
    start_time = time.time()

    # create_task 会立即把任务丢进事件循环并开始执行
    task1 = asyncio.create_task(boil_water())
    task2 = asyncio.create_task(send_message())

    # 等待两个任务完成
    await task1
    await task2

    end_time = time.time()
    print(f"总共耗时: {end_time - start_time:.2f} 秒")


# 它负责创建事件循环,并将第一个协程放入其中运行
asyncio.run(main())

3.流式传输的使用

可以使用.astream()的方法,来异步生成流式响应的效果,这专为非阻塞工作流程而设计,可以在异步代码中使用它来实现相同的实时流式处理行为。代码如下:

from langchain_openai import ChatOpenAI
# 定义⼤模型
model = ChatOpenAI(model="gpt-4o-mini")
# 异步调⽤
async def async_stream():
    print("=== 异步调⽤ ===")
    async for chunk in model.astream("讲⼀个50字的笑话"):
         print(chunk.content, end="|", flush=True)
import asyncio
asyncio.run(async_stream())

4.使用StrOutputParser解析模型的输出

事实上,流式传输并不是聊天模型定义的能力,而是只要实现了Runnable接口的实例都必须具备的能力。

那么可以得出一个流式传输的结论:.astream()和stream()方法产生的块类型取决于正在流式传输的组件。例如,当前正在使用聊天模型的流式输出,返回的每个块都是一个AIMessageChunk,但是对于其他组件,块类型可能不同。

接下来,我们将使用StrOutputParser 来解析模型输出,它从AIMessageChunk中提取模型返回的令牌。代码如下:

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 定义⼤模型
model = ChatOpenAI(model="gpt-4o-mini")
# 定义输出解析器
parser = StrOutputParser()
# 定义链
chain = model | parser
for chunk in chain.stream("写⼀段关于爱情的歌词,需要5句话"):
    print(chunk, end="|", flush=True)

5.自定义流式输出解析器

若我们希望修改上面的输出格式,将输出改为一句话一句话的输出,同时保留流式处理功能。那我们需要在链中使用生成器函数,即可完成自定义流式输出的能力。

from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from typing import Iterator,List
model=ChatOpenAI(model="gpt-4o-mini")
parser=StrOutputParser()
def split_into_list(input:Iterator[str])->Iterator[str]:
    buffer=""
    for chunk in input:
        buffer += chunk
        while "。" in buffer:
            stop_index=buffer.index("。")
            yield [buffer[:stop_index].strip()]#不结束函数先输出
            buffer=buffer[stop_index+1:]
    yield[buffer.strip()]
chain=model|parser|split_into_list
for chunk in chain.stream("写一份关于爱情的歌词,需要五句话每句话用句号分割"):
    print(chunk,end="|",flush=True)

6.深度探索流式传输

6.1SSE协议介绍

HTTP本身设计为无状态的请求-响应模式。无法做到服务器主动推送消息到客户端,通过Server-Sent Events(SSE)技术可实现流式传输,允许服务器主动向浏览器推送数据流。

SSE是一种基于HTTP的轻量级实时通信协议,浏览器可以通过内置的API接收并处理这些实时事件。

核心特点:

1)基于HTTP协议

2)单向通信机制:SSE仅支持服务器向客户端的单项数据推送,客户端通过HTTP建立连接后,服务器可持续发送数据流,但客户端无法通过同一连接向服务器发送数据。

3)自动重连机制:支持断线重连。

4)自定义消息类型:客户端发起请求后,服务器保持连接开放,响应头设置Content-Type,标识为事件流格式,持续推送事件流。

数据格式:

服务端向浏览器发送SSE格式需要设置必要的HTTP头信息:

Content-Type: text/event-stream;charset=utf-8
Connection: keep-alive

6.2LangChain流式传输与底层协议:

1.langchain-openai包通过集成OpenAI Python SDK,提供了一个HTTP的客户端。

2.因此支持LangChain向OpenAI的API发起调用请求。

3.若希望发起流式传输请求,则需要在请求中加入stream=True,向OpenAI说明以SSE协议进行流式返回。

4.LangChain接收OpenAI的SSE格式的响应。并将其转换为LangChain子封装的消息格式,如AIMessageChunk消息。这样就可以以统一的方式处理来自不同模型提供商(OpenAI,Anthropic等)

的流式响应。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐