2.9 Python 生成器与迭代器:内存高效的数据处理
摘要(150字): Python生成器通过yield实现按需生产数据,避免一次性加载大文件或LLM流式输出的内存问题。迭代器协议(__iter__和__next__)是for循环的底层机制,生成器函数和表达式可简化实现。典型应用包括:逐token处理LLM流式响应(如OpenAI API)、逐行读取GB级JSONL文件、分批处理数据(如控制API调用频率)。生成器内存占用恒定(O(1)),适合流式
本文适合谁:了解 Java Iterator 或 Stream API 的工程师,想理解 Python 生成器如何处理流式 LLM 输出和大数据集的开发者。读完本篇,你能实现 LLM 流式输出处理,并构建内存高效的数据处理流水线。
处理大规模数据时,一次性把所有数据加载进内存是最常见的性能问题。读取一个 2GB 的日志文件,直接 f.readlines() 会把整个文件塞进内存。调用 LLM 流式接口时,如果等待所有 token 返回再处理,用户体验会很差。Python 的迭代器和生成器就是解决这类问题的工具:按需生产数据,用多少取多少。
1.1 迭代器协议

列表全量加载与生成器按需生成的内存使用对比
Python 的 for 循环背后有一套协议(约定好的接口规则):只要一个对象实现了 __iter__(返回迭代器自身)和 __next__(返回下一个元素)方法,就可以被 for 循环遍历。这套协议就是迭代器协议。
# 手动实现一个迭代器,理解底层机制
class CountDown:
"""倒计时迭代器:从 n 倒数到 1"""
def __init__(self, start: int):
self.current = start
def __iter__(self):
# 返回迭代器对象本身,让 for 循环能够调用 __next__
return self
def __next__(self):
if self.current <= 0:
# 抛出 StopIteration 告诉 for 循环:数据耗尽了
raise StopIteration
value = self.current
self.current -= 1
return value
# for 循环会自动处理 StopIteration,无需手动捕获
for n in CountDown(5):
print(n) # 5 4 3 2 1
# 手动驱动迭代器,等价于 for 循环的内部行为
counter = CountDown(3)
it = iter(counter) # 调用 __iter__
print(next(it)) # 3,调用 __next__
print(next(it)) # 2
print(next(it)) # 1
# print(next(it)) # StopIteration
手动实现迭代器类需要维护状态(self.current),代码繁琐。生成器提供了更简洁的方式。
1.2 生成器函数:yield 的本质
生成器函数是包含 yield 语句的普通函数。调用生成器函数不会立即执行函数体,而是返回一个生成器对象。每次调用 next() 时,函数从上次 yield 的地方继续执行,直到遇到下一个 yield。
def count_down(start: int):
"""用生成器重写 CountDown,代码量减少一半"""
current = start
while current > 0:
yield current # 暂停执行,返回当前值给调用方
current -= 1 # 调用方调用 next() 后,从这里继续执行
# 用法完全一样
for n in count_down(5):
print(n) # 5 4 3 2 1
生成器函数 vs 普通函数的核心差异:
def regular_squares(n: int) -> list[int]:
"""普通函数:一次性计算所有结果,全部存入内存"""
result = []
for i in range(n):
result.append(i ** 2)
return result # n=10_000_000 时返回一个 ~76MB 的列表
def gen_squares(n: int):
"""生成器函数:每次只计算一个值,内存占用固定"""
for i in range(n):
yield i ** 2 # 每次只保存一个整数
# 内存对比(n=10_000_000)
import sys
squares_list = regular_squares(100)
squares_gen = gen_squares(100)
print(sys.getsizeof(squares_list)) # 920 字节(100个元素的列表)
print(sys.getsizeof(squares_gen)) # 208 字节(生成器对象,固定大小)
# 当 n=10_000_000 时,列表约 76MB,生成器依然是 208 字节
| 特性 | 普通函数(返回 list) | 生成器函数(yield) |
|---|---|---|
| 执行时机 | 调用时立即执行完毕 | 调用时返回生成器对象,按需执行 |
| 内存占用 | O(n),全部数据在内存 | O(1),只保存当前状态 |
| 可重复遍历 | 可以(列表可多次遍历) | 不可以(消耗一次即耗尽) |
| 适用场景 | 数据量小,需要随机访问 | 数据量大,流式处理 |
1.3 生成器表达式
生成器表达式是生成器函数的简写形式,语法和列表推导式几乎一样,区别是用圆括号而不是方括号:
# 列表推导式:立即生成所有数据
squares_list = [x ** 2 for x in range(1_000_000)] # 约 8MB
# 生成器表达式:延迟计算,几乎不占内存
squares_gen = (x ** 2 for x in range(1_000_000)) # 约 200 字节
# 作为函数参数时,可以省略外层括号
total = sum(x ** 2 for x in range(1_000_000)) # 直接传入,不用多一对括号
当生成器表达式作为函数唯一参数时,外层圆括号可以省略,这是 Python 常见的习惯写法。
1.4 AI 开发中的典型场景
1.4.1 流式输出逐 token 处理
LLM 的流式 API 本质上就是一个迭代器,每次产出一个 token 或一个 chunk。处理流式输出时,生成器是最自然的抽象:
from openai import OpenAI
client = OpenAI()
def stream_llm_response(prompt: str):
"""将 OpenAI 流式响应封装成生成器,逐 chunk 产出文本"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True, # 开启流式返回
)
for chunk in response:
# chunk.choices[0].delta.content 可能是 None(流结束时)
content = chunk.choices[0].delta.content
if content is not None:
yield content # 每次产出一个 token 或短文本片段
# 实时打印,用户看到文字逐渐出现
for token in stream_llm_response("用三句话介绍 Python"):
print(token, end="", flush=True) # flush=True 确保立即输出,不缓冲
print() # 换行
1.4.2 大文件逐行读取
from pathlib import Path
from typing import Generator
def read_jsonl(file_path: str) -> Generator[dict, None, None]:
"""
逐行读取 JSONL 文件(JSONL:每行一个完整 JSON 对象的文本格式,常用于大规模训练数据)
训练数据集动辄几 GB,必须流式处理
"""
import json
with open(file_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line: # 跳过空行
yield json.loads(line)
# 处理 5GB 的训练数据集,内存占用依然很低
for record in read_jsonl("train_data.jsonl"):
# 每次只有一条记录在内存中
process(record)
1.4.3 批量数据处理
调用 LLM API 时,通常需要把数据分批处理(避免超过 rate limit 或 API 限制):
from typing import TypeVar, Generator
T = TypeVar("T")
def batched(iterable, batch_size: int) -> Generator[list, None, None]:
"""
将任意可迭代对象按批次分割
输入可以是生成器(无限流),无需预先知道总量
"""
batch = []
for item in iterable:
batch.append(item)
if len(batch) >= batch_size:
yield batch
batch = [] # 清空,开始下一批
if batch: # 处理最后一批(可能不满 batch_size)
yield batch
# 从文件读取 -> 分批 -> 调用 API,全程流式,内存恒定
texts = read_jsonl("documents.jsonl") # 生成器,尚未读取任何数据
for batch in batched(texts, batch_size=20):
# batch 是最多 20 条记录的列表
embeddings = get_embeddings(batch) # 批量调用 Embedding API
save_to_db(embeddings)
1.5 itertools:生成器工具箱
itertools 是标准库里专门为迭代器设计的工具集,所有函数都是惰性的(lazy,即延迟计算:不到用的时候不计算,按需产出数据),不会提前计算。
import itertools
# chain:把多个可迭代对象串联成一个
train_data = read_jsonl("train.jsonl")
val_data = read_jsonl("val.jsonl")
all_data = itertools.chain(train_data, val_data) # 无缝拼接,不复制数据
# islice:从迭代器中截取前 n 个元素
# 当只需要调试前100条时,不用读完整个文件
sample = list(itertools.islice(read_jsonl("train.jsonl"), 100))
# batched(Python 3.12+):官方的分批工具,等价于上面手写的 batched
from itertools import batched as std_batched
for batch in std_batched(range(100), 10):
print(list(batch)) # [0,1,...,9], [10,...,19], ...
# Python 3.11 及更早版本,用 islice 实现等价效果
def batched_compat(iterable, n: int):
it = iter(iterable)
while batch := list(itertools.islice(it, n)):
yield batch
# takewhile:当条件为真时继续,一旦为假就停止
# 适合处理带哨兵值的流式数据
numbers = iter([1, 3, 5, 2, 4, 6])
odd_prefix = list(itertools.takewhile(lambda x: x % 2 == 1, numbers))
# [1, 3, 5],遇到第一个偶数 2 就停止
# count + islice 组合:生成固定数量的序号
ids = list(itertools.islice(itertools.count(1000), 5))
# [1000, 1001, 1002, 1003, 1004]
1.6 生成器的完整数据处理流水线
把多个生成器串联成流水线,每个环节都是惰性的:
import json
import itertools
from typing import Iterator
def read_jsonl(path: str) -> Iterator[dict]:
with open(path) as f:
for line in f:
if line.strip():
yield json.loads(line)
def filter_valid(records: Iterator[dict]) -> Iterator[dict]:
"""过滤掉缺少必要字段的记录"""
for record in records:
if record.get("text") and len(record["text"]) > 10:
yield record
def extract_text(records: Iterator[dict]) -> Iterator[str]:
"""只取 text 字段"""
for record in records:
yield record["text"].strip()
def process_large_file(path: str, batch_size: int = 32):
"""
完整流水线:读取 -> 过滤 -> 提取 -> 分批处理
整个过程内存占用约等于 batch_size 条记录
"""
records = read_jsonl(path)
valid = filter_valid(records)
texts = extract_text(valid)
for batch in itertools.batched(texts, batch_size):
yield batch # 每次产出一批文本,供调用方处理
1.7 与 Java Iterator 的对比
Java 有类似的 Iterator<T> 接口,但在使用上有显著差异:
| 维度 | Python 生成器 | Java Iterator |
|---|---|---|
| 定义方式 | 函数里写 yield,自动变成生成器 |
实现 Iterator<T> 接口,手动维护状态 |
| 代码量 | 极少 | 较多(需要 hasNext、next 方法) |
| 状态管理 | 由 Python 运行时自动保存函数栈帧 | 开发者手动维护(index、cursor 等字段) |
| 惰性求值 | 天然惰性 | 需要 Stream.of(...).lazy() 或自己实现 |
| Stream 支持 | 所有生成器都可用于 for、sum、list |
Java 8+ 的 Stream API |
| 可组合性 | itertools 标准库 |
Stream.filter().map() 链式调用 |
Java 8+ 引入的 Stream API 在概念上和 Python 生成器最接近,都是惰性计算。区别在于 Python 生成器更轻量,不需要额外 API,一个 yield 关键字就够了。
1.8 异步生成器:在 FastAPI 中实现流式响应
LLM 流式输出在 FastAPI 服务中的标准实现是异步生成器(async generator)。异步生成器结合了 async/await 和 yield,让流式处理与 asyncio 完美配合:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from typing import AsyncIterator
app = FastAPI()
client = AsyncOpenAI()
async def stream_llm_response(prompt: str) -> AsyncIterator[str]:
"""
异步生成器:逐 token yield LLM 输出。
与普通生成器的区别:async def + yield,调用方用 async for。
"""
response = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True, # 开启流式返回
)
async for chunk in response:
content = chunk.choices[0].delta.content
if content is not None:
yield content # 每次产出一个 token
@app.get("/chat-stream")
async def chat_stream(message: str) -> StreamingResponse:
"""
FastAPI 流式响应端点。
StreamingResponse 会持续从生成器取数据并发送给客户端。
"""
async def generate():
async for token in stream_llm_response(message):
# 格式:SSE(Server-Sent Events)格式,浏览器可直接消费
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache"}
)
# 客户端测试(命令行):
# curl -N "http://localhost:8000/chat-stream?message=你好"
# 预期输出:
# data: 你
# data: 好
# data: ,
# data: 有什么
# data: 可以
# data: 帮助
# data: 你的
# data: ?
# data: [DONE]
为什么用生成器而不是一次性返回:
# 方式一:等待完整响应(用户体验差)
async def slow_endpoint(message: str) -> dict:
full_response = ""
async for chunk in client.chat.completions.create(stream=True, ...):
full_response += chunk.choices[0].delta.content or ""
return {"reply": full_response} # 等 10 秒后一次性返回
# 方式二:流式响应(用户立刻看到文字)
async def fast_endpoint(message: str) -> StreamingResponse:
# 生成器每产出一个 token,浏览器就立刻显示
# 用户感受到的延迟是首个 token 的时间(约 0.5 秒),不是完整响应时间
return StreamingResponse(stream_llm_response(message), ...)
1.9 小结
| 概念 | 关键字/语法 | 适用场景 |
|---|---|---|
| 迭代器 | __iter__ + __next__ |
理解 for 循环底层 |
| 同步生成器 | def + yield |
大文件读取、批量处理 |
| 生成器表达式 | (x for x in ...) |
简单转换,内存节省 |
| 异步生成器 | async def + yield |
LLM 流式输出、SSE 接口 |
| itertools | chain, islice, batched |
流水线组合 |
在 AI 开发中,生成器的使用场景非常集中:处理流式 LLM 输出、读取大型训练数据集、构建批量调用流水线。核心原则始终是:数据量超过内存的 10%,就应该考虑流式处理;LLM 响应需要实时展示,就应该用流式生成器。
itertools 是标准库提供的生成器工具集,chain、islice、batched(3.12+)是最常用的三个函数,熟悉它们能避免很多重复造轮子。
更多推荐


所有评论(0)