python 流式请求(基于 httpx)
python 流式响应 StreamingResponse 和 EventSourceResponse

1. 流式输出概述

流式响应在大模型服务中可大大提高用户体验,在 Python 中主要有两种方式实现流式响应,即 fastapi 的 StreamingResponseSEE 模块的 EventSourceResponse,既然两者都可以实现流式响应,那么我们在实际应用中应该如何选择呢?

2. StreamingResponse(基于 fastapi)

参考:
https://apifox.com/apiskills/how-to-use-fastapi-streamingresponse/

2.1. 安装必要的库

pip install fastapi

2.2. 使用 StreamingResponse

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
 
app = FastAPI()
 
# 示例1:实时日志流(纯文本流,无结构要求)
async def log_generator():
    for i in range(5):
        yield f"Log entry {i}\n"
        await asyncio.sleep(1)
 
@app.get("/logs_stream")
async def stream_logs():
    return StreamingResponse(
        log_generator(),
        media_type="text/plain"
    )
 
# 示例2:大文件分块下载(二进制)
async def file_chunker(file_path: str):
    with open(file_path, "rb") as f:
        while chunk := f.read(1024*1024):  # 1MB chunks
            yield chunk
 
@app.get("/download-large-file")
async def download_large_file():
    return StreamingResponse(
        file_chunker("bigfile.zip"),
        media_type="application/octet-stream",
        headers={"Content-Disposition": "attachment; filename=bigfile.zip"}
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

3. EventSourceResponse(基于 SSE)

参考:
https://blog.csdn.net/xiaodoudouer/article/details/143906428

SSE 的数据流由一系列的字段组成,每个字段都以键值对的形式出现,字段之间用换行符分隔。

3.1. 安装必要的库

首先,确保已经安装了 sse-starlette 库,因为 EventSourceResponse 是该库提供的一个组件。

pip install sse-starlette

3.2. 使用 EventSourceResponse

在路由处理函数中,将生成器传递给 EventSourceResponse 来创建一个 SSE 响应对象。当客户端请求该路由时,服务器将保持连接打开,并根据生成器的输出向客户端发送事件:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import asyncio

from datetime import datetime
import time
import json

app = FastAPI()

# 定义事件流生成器
async def event_stream():
    while True:
        timestamp = time.time()
        readable_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        data = {"message": "Hello, world!", "timestamp": readable_time}

        yield f"{json.dumps(data)}"
        await asyncio.sleep(1)  # 每秒发送一次数据

@app.get("/events", response_class=StreamingResponse)
async def get_events():
    return EventSourceResponse(event_stream(), media_type="text/event-stream")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

4. 异同对比

参考:
https://blog.csdn.net/wamp0001/article/details/147458595
https://blog.csdn.net/xiaodoudouer/article/details/143906428

4.1. 核心相同点

4.1.1. 流式传输能力

  • 均支持异步生成器 (async generator),实现数据分块(chunked)传输。
  • 适用于需要实时、持续下发数据的场景(如实时监控、大文件下载、消息推送)。

4.1.2. 长连接特性

  • 保持 HTTP 连接长时间开放,持续发送数据,而非一次性返回完整响应。
  • 可在客户端和服务器之间建立持久化通信通道。

4.1.3. 异步兼容性

  • 完美适配 FastAPI 异步框架,能够结合 await 和异步 I/O 操作。
  • 避免阻塞事件循环,支持高并发场景。

4.2. 核心不同点

4.2.1. 用途

  • StreamingResponse:FastAPI 提供的一个通用类。通用性强,适用于多种流式数据传输场景,如大文件传输、实时日志、动态生成的数据等。
from fastapi.responses import StreamingResponse
  • EventSourceResponse:sse-starlette 库提供的一个类。专门用于实现SSE(Server-Sent Events),适用于服务器向客户端推送实时更新的场景,如实时通知、股票价格更新等。
from sse_starlette.sse import EventSourceResponse

4.2.2. 响应头

  • StreamingResponse:需要手动设置响应头,如 Content-Type 和 Cache-Control。
  • EventSourceResponse:自动设置响应头,包括 Content-Type: text/event-streamCache-Control: no-cache,确保数据不会被缓存。

4.2.3. 数据格式

  • StreamingResponse:可以处理任何形式的数据,只要数据源是可迭代的。
  • EventSourceResponse:生成的数据必须符合 SSE 的格式要求,即每个事件由 data:字段开头,以两个换行符 \n\n 结束

4.2.4. 使用场景

  • StreamingResponse:适用于需要流式传输大量数据的场景,如视频流、文件下载等。
  • EventSourceResponse:适用于需要服务器向客户端推送实时更新的场景,如实时通知、股票价格更新等。

4.2.5. 客户端响应

  • StreamingResponse:客户端可以使用普通的 HTTP 请求来下载文件,例如使用 fetch 或 axios。
  • EventSourceResponse:客户端可以使用 EventSource 对象来接收 SSE 事件。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐