python 流式响应 StreamingResponse 和 EventSourceResponse
·
python 流式请求(基于 httpx)
python 流式响应 StreamingResponse 和 EventSourceResponse
文章目录
1. 流式输出概述
流式响应在大模型服务中可大大提高用户体验,在 Python 中主要有两种方式实现流式响应,即 fastapi 的 StreamingResponse 和 SEE 模块的 EventSourceResponse,既然两者都可以实现流式响应,那么我们在实际应用中应该如何选择呢?
2. StreamingResponse(基于 fastapi)
参考:
https://apifox.com/apiskills/how-to-use-fastapi-streamingresponse/
2.1. 安装必要的库
pip install fastapi
2.2. 使用 StreamingResponse
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
# 示例1:实时日志流(纯文本流,无结构要求)
async def log_generator():
for i in range(5):
yield f"Log entry {i}\n"
await asyncio.sleep(1)
@app.get("/logs_stream")
async def stream_logs():
return StreamingResponse(
log_generator(),
media_type="text/plain"
)
# 示例2:大文件分块下载(二进制)
async def file_chunker(file_path: str):
with open(file_path, "rb") as f:
while chunk := f.read(1024*1024): # 1MB chunks
yield chunk
@app.get("/download-large-file")
async def download_large_file():
return StreamingResponse(
file_chunker("bigfile.zip"),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=bigfile.zip"}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
3. EventSourceResponse(基于 SSE)
参考:
https://blog.csdn.net/xiaodoudouer/article/details/143906428
SSE 的数据流由一系列的字段组成,每个字段都以键值对的形式出现,字段之间用换行符分隔。
3.1. 安装必要的库
首先,确保已经安装了 sse-starlette 库,因为 EventSourceResponse 是该库提供的一个组件。
pip install sse-starlette
3.2. 使用 EventSourceResponse
在路由处理函数中,将生成器传递给 EventSourceResponse 来创建一个 SSE 响应对象。当客户端请求该路由时,服务器将保持连接打开,并根据生成器的输出向客户端发送事件:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import asyncio
from datetime import datetime
import time
import json
app = FastAPI()
# 定义事件流生成器
async def event_stream():
while True:
timestamp = time.time()
readable_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
data = {"message": "Hello, world!", "timestamp": readable_time}
yield f"{json.dumps(data)}"
await asyncio.sleep(1) # 每秒发送一次数据
@app.get("/events", response_class=StreamingResponse)
async def get_events():
return EventSourceResponse(event_stream(), media_type="text/event-stream")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
4. 异同对比
参考:
https://blog.csdn.net/wamp0001/article/details/147458595
https://blog.csdn.net/xiaodoudouer/article/details/143906428
4.1. 核心相同点
4.1.1. 流式传输能力
- 均支持异步生成器 (async generator),实现数据分块(chunked)传输。
- 适用于需要实时、持续下发数据的场景(如实时监控、大文件下载、消息推送)。
4.1.2. 长连接特性
- 保持 HTTP 连接长时间开放,持续发送数据,而非一次性返回完整响应。
- 可在客户端和服务器之间建立持久化通信通道。
4.1.3. 异步兼容性
- 完美适配 FastAPI 异步框架,能够结合 await 和异步 I/O 操作。
- 避免阻塞事件循环,支持高并发场景。
4.2. 核心不同点
4.2.1. 用途
- StreamingResponse:FastAPI 提供的一个通用类。通用性强,适用于多种流式数据传输场景,如大文件传输、实时日志、动态生成的数据等。
from fastapi.responses import StreamingResponse
- EventSourceResponse:sse-starlette 库提供的一个类。专门用于实现SSE(Server-Sent Events),适用于服务器向客户端推送实时更新的场景,如实时通知、股票价格更新等。
from sse_starlette.sse import EventSourceResponse
4.2.2. 响应头
- StreamingResponse:需要手动设置响应头,如 Content-Type 和 Cache-Control。
- EventSourceResponse:自动设置响应头,包括 Content-Type: text/event-stream 和 Cache-Control: no-cache,确保数据不会被缓存。
4.2.3. 数据格式
- StreamingResponse:可以处理任何形式的数据,只要数据源是可迭代的。
- EventSourceResponse:生成的数据必须符合 SSE 的格式要求,即每个事件由 data:字段开头,以两个换行符 \n\n 结束。
4.2.4. 使用场景
- StreamingResponse:适用于需要流式传输大量数据的场景,如视频流、文件下载等。
- EventSourceResponse:适用于需要服务器向客户端推送实时更新的场景,如实时通知、股票价格更新等。
4.2.5. 客户端响应
- StreamingResponse:客户端可以使用普通的 HTTP 请求来下载文件,例如使用 fetch 或 axios。
- EventSourceResponse:客户端可以使用 EventSource 对象来接收 SSE 事件。
更多推荐
所有评论(0)