本文适合谁:了解 Java Iterator 或 Stream API 的工程师,想理解 Python 生成器如何处理流式 LLM 输出和大数据集的开发者。读完本篇,你能实现 LLM 流式输出处理,并构建内存高效的数据处理流水线。

处理大规模数据时,一次性把所有数据加载进内存是最常见的性能问题。读取一个 2GB 的日志文件,直接 f.readlines() 会把整个文件塞进内存。调用 LLM 流式接口时,如果等待所有 token 返回再处理,用户体验会很差。Python 的迭代器和生成器就是解决这类问题的工具:按需生产数据,用多少取多少。

1.1 迭代器协议

在这里插入图片描述

列表全量加载与生成器按需生成的内存使用对比

Python 的 for 循环背后有一套协议(约定好的接口规则):只要一个对象实现了 __iter__(返回迭代器自身)和 __next__(返回下一个元素)方法,就可以被 for 循环遍历。这套协议就是迭代器协议。

# 手动实现一个迭代器,理解底层机制
class CountDown:
    """倒计时迭代器:从 n 倒数到 1"""

    def __init__(self, start: int):
        self.current = start

    def __iter__(self):
        # 返回迭代器对象本身,让 for 循环能够调用 __next__
        return self

    def __next__(self):
        if self.current <= 0:
            # 抛出 StopIteration 告诉 for 循环:数据耗尽了
            raise StopIteration
        value = self.current
        self.current -= 1
        return value

# for 循环会自动处理 StopIteration,无需手动捕获
for n in CountDown(5):
    print(n)  # 5 4 3 2 1

# 手动驱动迭代器,等价于 for 循环的内部行为
counter = CountDown(3)
it = iter(counter)        # 调用 __iter__
print(next(it))           # 3,调用 __next__
print(next(it))           # 2
print(next(it))           # 1
# print(next(it))         # StopIteration

手动实现迭代器类需要维护状态(self.current),代码繁琐。生成器提供了更简洁的方式。

1.2 生成器函数:yield 的本质

生成器函数是包含 yield 语句的普通函数。调用生成器函数不会立即执行函数体,而是返回一个生成器对象。每次调用 next() 时,函数从上次 yield 的地方继续执行,直到遇到下一个 yield

def count_down(start: int):
    """用生成器重写 CountDown,代码量减少一半"""
    current = start
    while current > 0:
        yield current      # 暂停执行,返回当前值给调用方
        current -= 1       # 调用方调用 next() 后,从这里继续执行

# 用法完全一样
for n in count_down(5):
    print(n)  # 5 4 3 2 1

生成器函数 vs 普通函数的核心差异:

def regular_squares(n: int) -> list[int]:
    """普通函数:一次性计算所有结果,全部存入内存"""
    result = []
    for i in range(n):
        result.append(i ** 2)
    return result  # n=10_000_000 时返回一个 ~76MB 的列表

def gen_squares(n: int):
    """生成器函数:每次只计算一个值,内存占用固定"""
    for i in range(n):
        yield i ** 2  # 每次只保存一个整数

# 内存对比(n=10_000_000)
import sys
squares_list = regular_squares(100)
squares_gen = gen_squares(100)

print(sys.getsizeof(squares_list))  # 920 字节(100个元素的列表)
print(sys.getsizeof(squares_gen))   # 208 字节(生成器对象,固定大小)
# 当 n=10_000_000 时,列表约 76MB,生成器依然是 208 字节
特性 普通函数(返回 list) 生成器函数(yield)
执行时机 调用时立即执行完毕 调用时返回生成器对象,按需执行
内存占用 O(n),全部数据在内存 O(1),只保存当前状态
可重复遍历 可以(列表可多次遍历) 不可以(消耗一次即耗尽)
适用场景 数据量小,需要随机访问 数据量大,流式处理

1.3 生成器表达式

生成器表达式是生成器函数的简写形式,语法和列表推导式几乎一样,区别是用圆括号而不是方括号:

# 列表推导式:立即生成所有数据
squares_list = [x ** 2 for x in range(1_000_000)]  # 约 8MB

# 生成器表达式:延迟计算,几乎不占内存
squares_gen = (x ** 2 for x in range(1_000_000))   # 约 200 字节

# 作为函数参数时,可以省略外层括号
total = sum(x ** 2 for x in range(1_000_000))       # 直接传入,不用多一对括号

当生成器表达式作为函数唯一参数时,外层圆括号可以省略,这是 Python 常见的习惯写法。

1.4 AI 开发中的典型场景

1.4.1 流式输出逐 token 处理

LLM 的流式 API 本质上就是一个迭代器,每次产出一个 token 或一个 chunk。处理流式输出时,生成器是最自然的抽象:

from openai import OpenAI

client = OpenAI()

def stream_llm_response(prompt: str):
    """将 OpenAI 流式响应封装成生成器,逐 chunk 产出文本"""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,  # 开启流式返回
    )
    for chunk in response:
        # chunk.choices[0].delta.content 可能是 None(流结束时)
        content = chunk.choices[0].delta.content
        if content is not None:
            yield content  # 每次产出一个 token 或短文本片段

# 实时打印,用户看到文字逐渐出现
for token in stream_llm_response("用三句话介绍 Python"):
    print(token, end="", flush=True)  # flush=True 确保立即输出,不缓冲

print()  # 换行

1.4.2 大文件逐行读取

from pathlib import Path
from typing import Generator

def read_jsonl(file_path: str) -> Generator[dict, None, None]:
    """
    逐行读取 JSONL 文件(JSONL:每行一个完整 JSON 对象的文本格式,常用于大规模训练数据)
    训练数据集动辄几 GB,必须流式处理
    """
    import json
    with open(file_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # 跳过空行
                yield json.loads(line)

# 处理 5GB 的训练数据集,内存占用依然很低
for record in read_jsonl("train_data.jsonl"):
    # 每次只有一条记录在内存中
    process(record)

1.4.3 批量数据处理

调用 LLM API 时,通常需要把数据分批处理(避免超过 rate limit 或 API 限制):

from typing import TypeVar, Generator

T = TypeVar("T")

def batched(iterable, batch_size: int) -> Generator[list, None, None]:
    """
    将任意可迭代对象按批次分割
    输入可以是生成器(无限流),无需预先知道总量
    """
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []  # 清空,开始下一批
    if batch:             # 处理最后一批(可能不满 batch_size)
        yield batch

# 从文件读取 -> 分批 -> 调用 API,全程流式,内存恒定
texts = read_jsonl("documents.jsonl")  # 生成器,尚未读取任何数据
for batch in batched(texts, batch_size=20):
    # batch 是最多 20 条记录的列表
    embeddings = get_embeddings(batch)  # 批量调用 Embedding API
    save_to_db(embeddings)

1.5 itertools:生成器工具箱

itertools 是标准库里专门为迭代器设计的工具集,所有函数都是惰性的(lazy,即延迟计算:不到用的时候不计算,按需产出数据),不会提前计算。

import itertools

# chain:把多个可迭代对象串联成一个
train_data = read_jsonl("train.jsonl")
val_data   = read_jsonl("val.jsonl")
all_data   = itertools.chain(train_data, val_data)  # 无缝拼接,不复制数据

# islice:从迭代器中截取前 n 个元素
# 当只需要调试前100条时,不用读完整个文件
sample = list(itertools.islice(read_jsonl("train.jsonl"), 100))

# batched(Python 3.12+):官方的分批工具,等价于上面手写的 batched
from itertools import batched as std_batched
for batch in std_batched(range(100), 10):
    print(list(batch))  # [0,1,...,9], [10,...,19], ...

# Python 3.11 及更早版本,用 islice 实现等价效果
def batched_compat(iterable, n: int):
    it = iter(iterable)
    while batch := list(itertools.islice(it, n)):
        yield batch

# takewhile:当条件为真时继续,一旦为假就停止
# 适合处理带哨兵值的流式数据
numbers = iter([1, 3, 5, 2, 4, 6])
odd_prefix = list(itertools.takewhile(lambda x: x % 2 == 1, numbers))
# [1, 3, 5],遇到第一个偶数 2 就停止

# count + islice 组合:生成固定数量的序号
ids = list(itertools.islice(itertools.count(1000), 5))
# [1000, 1001, 1002, 1003, 1004]

1.6 生成器的完整数据处理流水线

数据源
(文件 / API / 数据库)

read_jsonl()
生成器:逐行读取

filter()
生成器:过滤无效数据

map()
生成器:数据转换

batched()
生成器:分批

LLM API 调用
每批处理

结果存储

把多个生成器串联成流水线,每个环节都是惰性的:

import json
import itertools
from typing import Iterator

def read_jsonl(path: str) -> Iterator[dict]:
    with open(path) as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

def filter_valid(records: Iterator[dict]) -> Iterator[dict]:
    """过滤掉缺少必要字段的记录"""
    for record in records:
        if record.get("text") and len(record["text"]) > 10:
            yield record

def extract_text(records: Iterator[dict]) -> Iterator[str]:
    """只取 text 字段"""
    for record in records:
        yield record["text"].strip()

def process_large_file(path: str, batch_size: int = 32):
    """
    完整流水线:读取 -> 过滤 -> 提取 -> 分批处理
    整个过程内存占用约等于 batch_size 条记录
    """
    records = read_jsonl(path)
    valid   = filter_valid(records)
    texts   = extract_text(valid)

    for batch in itertools.batched(texts, batch_size):
        yield batch  # 每次产出一批文本,供调用方处理

1.7 与 Java Iterator 的对比

Java 有类似的 Iterator<T> 接口,但在使用上有显著差异:

维度 Python 生成器 Java Iterator
定义方式 函数里写 yield,自动变成生成器 实现 Iterator<T> 接口,手动维护状态
代码量 极少 较多(需要 hasNextnext 方法)
状态管理 由 Python 运行时自动保存函数栈帧 开发者手动维护(indexcursor 等字段)
惰性求值 天然惰性 需要 Stream.of(...).lazy() 或自己实现
Stream 支持 所有生成器都可用于 forsumlist Java 8+ 的 Stream API
可组合性 itertools 标准库 Stream.filter().map() 链式调用

Java 8+ 引入的 Stream API 在概念上和 Python 生成器最接近,都是惰性计算。区别在于 Python 生成器更轻量,不需要额外 API,一个 yield 关键字就够了。

1.8 异步生成器:在 FastAPI 中实现流式响应

LLM 流式输出在 FastAPI 服务中的标准实现是异步生成器(async generator)。异步生成器结合了 async/awaityield,让流式处理与 asyncio 完美配合:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from typing import AsyncIterator

app = FastAPI()
client = AsyncOpenAI()

async def stream_llm_response(prompt: str) -> AsyncIterator[str]:
    """
    异步生成器:逐 token yield LLM 输出。
    与普通生成器的区别:async def + yield,调用方用 async for。
    """
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,  # 开启流式返回
    )
    async for chunk in response:
        content = chunk.choices[0].delta.content
        if content is not None:
            yield content  # 每次产出一个 token


@app.get("/chat-stream")
async def chat_stream(message: str) -> StreamingResponse:
    """
    FastAPI 流式响应端点。
    StreamingResponse 会持续从生成器取数据并发送给客户端。
    """
    async def generate():
        async for token in stream_llm_response(message):
            # 格式:SSE(Server-Sent Events)格式,浏览器可直接消费
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"}
    )


# 客户端测试(命令行):
# curl -N "http://localhost:8000/chat-stream?message=你好"
# 预期输出:
# data: 你
# data: 好
# data: ,
# data: 有什么
# data: 可以
# data: 帮助
# data: 你的
# data: ?
# data: [DONE]

为什么用生成器而不是一次性返回:

# 方式一:等待完整响应(用户体验差)
async def slow_endpoint(message: str) -> dict:
    full_response = ""
    async for chunk in client.chat.completions.create(stream=True, ...):
        full_response += chunk.choices[0].delta.content or ""
    return {"reply": full_response}  # 等 10 秒后一次性返回

# 方式二:流式响应(用户立刻看到文字)
async def fast_endpoint(message: str) -> StreamingResponse:
    # 生成器每产出一个 token,浏览器就立刻显示
    # 用户感受到的延迟是首个 token 的时间(约 0.5 秒),不是完整响应时间
    return StreamingResponse(stream_llm_response(message), ...)

1.9 小结

概念 关键字/语法 适用场景
迭代器 __iter__ + __next__ 理解 for 循环底层
同步生成器 def + yield 大文件读取、批量处理
生成器表达式 (x for x in ...) 简单转换,内存节省
异步生成器 async def + yield LLM 流式输出、SSE 接口
itertools chain, islice, batched 流水线组合

在 AI 开发中,生成器的使用场景非常集中:处理流式 LLM 输出、读取大型训练数据集、构建批量调用流水线。核心原则始终是:数据量超过内存的 10%,就应该考虑流式处理;LLM 响应需要实时展示,就应该用流式生成器。

itertools 是标准库提供的生成器工具集,chainislicebatched(3.12+)是最常用的三个函数,熟悉它们能避免很多重复造轮子。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐