LangChain入门(三)
若我们希望修改上面的输出格式,将输出改为一句话一句话的输出,同时保留流式处理功能。那我们需要在链中使用生成器函数,即可完成自定义流式输出的能力。")yield [buffer[:stop_index].strip()]#不结束函数先输出for chunk in chain.stream("写一份关于爱情的歌词,需要五句话每句话用句号分割"):HTTP本身设计为无状态的请求-响应模式。
本文主要介绍流式传输。
流式传输对于使基于LLM的应用程序能够响应用户至关重要,它显著改善用户体验。
1.stream()同步传输
在LangChain聊天模型,可以使用.stream()的方法,来同步生成流式响应的效果。
聊天模型的.stream()方法返回一个迭代器。该迭代器在生成输出时同步产生输出消息块。可以使用for循环实时处理每个块。代码如下:
from langchain_openai import ChatOpenAI
model=ChatOpenAI(model="gpt-4o-mini")
chunks=[]
for chunk in model.stream("讲一个笑话"):
chunks.append(chunk)
print(chunk.content,end="|",flush=True)
2.astream()异步传输
2.1异步相关概念
比喻:当需要洗盘子的时候可以同时烧水。
同步方式:做事情一件一件来。
import time
def boil_water():
print("开始煮⽔...")
time.sleep(5) # 模拟阻塞等待5秒
print("⽔开了!")
def send_message():
print("开始发短信...")
time.sleep(2) # 模拟阻塞等待2秒
print("短信发送成功!")
# 主程序
def main():
boil_water() # 先花5秒煮⽔,期间什么也不能做
send_message() # ⽔开后再花2秒发短信
main()
但是此任务效率低下,在等待的时候却不能做send_message任务。
我们使用异步方式:asyncio,协程和事件循环。
协程,作为一种轻量级的并发编程模型,可以被视为用户态的轻量级线程。与传统线程相比,它的优势在于其调度完全由用户控件掌握,避免了操作系统内核的频繁介入,从而降低了上下文切换的开销。
协程的切换由程序员和编程语言控制,程序员决定在何时暂停或恢复协程。协程是一个特殊的函数,它可以暂停并在稍后恢复执行。它用async def定义,并在哪个需要暂停的地方用await。
async def boil_water():
print("开始煮水...")
# 核心修改:使用 asyncio.sleep 而不是 time.sleep
await asyncio.sleep(5)
print("水开了!")
async def send_message():
print("开始发短信...")
# 核心修改:使用 asyncio.sleep
await asyncio.sleep(2)
print("短信发送成功!")
这是两个协程。
什么是事件循环?
它是asynico的核心,你可以把它看成一个总调度员或一个待办事项的管理员。
它的工作流程简单:
1)它维护着一个人物列表。
2)它不断循环检查每个任务:
a.如果任务处于等待I/O状态,就暂停它,立刻就去执行下一个就绪任务。
b.如果任务的等待时间已经到了或者I/O操作完成了,时间就恢复执行这个任务。
async def main():
start_time = time.time()
# create_task 会立即把任务丢进事件循环并开始执行
task1 = asyncio.create_task(boil_water())
task2 = asyncio.create_task(send_message())
# 等待两个任务完成
await task1
await task2
end_time = time.time()
print(f"总共耗时: {end_time - start_time:.2f} 秒")
# 它负责创建事件循环,并将第一个协程放入其中运行
asyncio.run(main())
总代码:
import asyncio
import time # 仅用于对比或记录总耗时
async def boil_water():
print("开始煮水...")
# 核心修改:使用 asyncio.sleep 而不是 time.sleep
await asyncio.sleep(5)
print("水开了!")
async def send_message():
print("开始发短信...")
# 核心修改:使用 asyncio.sleep
await asyncio.sleep(2)
print("短信发送成功!")
async def main():
start_time = time.time()
# create_task 会立即把任务丢进事件循环并开始执行
task1 = asyncio.create_task(boil_water())
task2 = asyncio.create_task(send_message())
# 等待两个任务完成
await task1
await task2
end_time = time.time()
print(f"总共耗时: {end_time - start_time:.2f} 秒")
# 它负责创建事件循环,并将第一个协程放入其中运行
asyncio.run(main())
3.流式传输的使用
可以使用.astream()的方法,来异步生成流式响应的效果,这专为非阻塞工作流程而设计,可以在异步代码中使用它来实现相同的实时流式处理行为。代码如下:
from langchain_openai import ChatOpenAI
# 定义⼤模型
model = ChatOpenAI(model="gpt-4o-mini")
# 异步调⽤
async def async_stream():
print("=== 异步调⽤ ===")
async for chunk in model.astream("讲⼀个50字的笑话"):
print(chunk.content, end="|", flush=True)
import asyncio
asyncio.run(async_stream())
4.使用StrOutputParser解析模型的输出
事实上,流式传输并不是聊天模型定义的能力,而是只要实现了Runnable接口的实例都必须具备的能力。
那么可以得出一个流式传输的结论:.astream()和stream()方法产生的块类型取决于正在流式传输的组件。例如,当前正在使用聊天模型的流式输出,返回的每个块都是一个AIMessageChunk,但是对于其他组件,块类型可能不同。
接下来,我们将使用StrOutputParser 来解析模型输出,它从AIMessageChunk中提取模型返回的令牌。代码如下:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 定义⼤模型
model = ChatOpenAI(model="gpt-4o-mini")
# 定义输出解析器
parser = StrOutputParser()
# 定义链
chain = model | parser
for chunk in chain.stream("写⼀段关于爱情的歌词,需要5句话"):
print(chunk, end="|", flush=True)
5.自定义流式输出解析器
若我们希望修改上面的输出格式,将输出改为一句话一句话的输出,同时保留流式处理功能。那我们需要在链中使用生成器函数,即可完成自定义流式输出的能力。
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from typing import Iterator,List
model=ChatOpenAI(model="gpt-4o-mini")
parser=StrOutputParser()
def split_into_list(input:Iterator[str])->Iterator[str]:
buffer=""
for chunk in input:
buffer += chunk
while "。" in buffer:
stop_index=buffer.index("。")
yield [buffer[:stop_index].strip()]#不结束函数先输出
buffer=buffer[stop_index+1:]
yield[buffer.strip()]
chain=model|parser|split_into_list
for chunk in chain.stream("写一份关于爱情的歌词,需要五句话每句话用句号分割"):
print(chunk,end="|",flush=True)
6.深度探索流式传输
6.1SSE协议介绍
HTTP本身设计为无状态的请求-响应模式。无法做到服务器主动推送消息到客户端,通过Server-Sent Events(SSE)技术可实现流式传输,允许服务器主动向浏览器推送数据流。
SSE是一种基于HTTP的轻量级实时通信协议,浏览器可以通过内置的API接收并处理这些实时事件。

核心特点:
1)基于HTTP协议
2)单向通信机制:SSE仅支持服务器向客户端的单项数据推送,客户端通过HTTP建立连接后,服务器可持续发送数据流,但客户端无法通过同一连接向服务器发送数据。
3)自动重连机制:支持断线重连。
4)自定义消息类型:客户端发起请求后,服务器保持连接开放,响应头设置Content-Type,标识为事件流格式,持续推送事件流。
数据格式:
服务端向浏览器发送SSE格式需要设置必要的HTTP头信息:
Content-Type: text/event-stream;charset=utf-8
Connection: keep-alive
6.2LangChain流式传输与底层协议:
1.langchain-openai包通过集成OpenAI Python SDK,提供了一个HTTP的客户端。
2.因此支持LangChain向OpenAI的API发起调用请求。
3.若希望发起流式传输请求,则需要在请求中加入stream=True,向OpenAI说明以SSE协议进行流式返回。
4.LangChain接收OpenAI的SSE格式的响应。并将其转换为LangChain子封装的消息格式,如AIMessageChunk消息。这样就可以以统一的方式处理来自不同模型提供商(OpenAI,Anthropic等)
的流式响应。
更多推荐



所有评论(0)