Python 异步迭代深度探索
本文系统探讨了Python异步编程的核心概念,重点分析了同步迭代与异步迭代的本质区别。文章指出async for是遍历异步迭代器的唯一语法,其关键在于__anext__()方法允许在I/O等待时让出CPU。通过类比厨师烧水的例子,阐释了异步I/O的"非阻塞"价值。同时强调真正的并发需要结合asyncio.create_task的任务分发机制,而非单纯依赖await。文章澄清了常
本文档记录了一次从同步迭代到异步并发的完整探索过程,以一系列由浅入深的问题为线索,旨在彻底澄清 Python 异步编程中的核心概念。
第零章:什么是异步迭代器 (Asynchronous Iterator)?
在开始比较之前,我们首先需要定义我们的主角。
定义: 一个对象如果实现了异步迭代协议 (Asynchronous Iteration Protocol),那么它就是一个异步可迭代对象。调用它的 __aiter__() 方法会返回一个异步迭代器。
这个协议要求实现以下两个核心的 async def 方法:
-
__aiter__(self):- 由
async for循环在开始时自动调用。 - 它必须返回一个异步迭代器对象。
- 由
-
__anext__(self):- 由
async for循环在每次迭代时自动await。 - 它负责异步地获取并返回下一个元素。
- 当没有更多元素时,它必须抛出
StopAsyncIteration异常。
- 由
“经典”实现示例
让我们用一个手写的类来创建一个异步迭代器,它会每隔 0.5 秒计数一次。
import asyncio
class AsyncCounter:
"""一个经典的异步迭代器类"""
def __init__(self, limit):
self.limit = limit
self.current = 0
async def __aiter__(self):
# 返回自身,因为它自己就是迭代器
return self
async def __anext__(self):
if self.current < self.limit:
# 模拟一个耗时的 I/O 操作。
# 这里的 await 是关键:它只能在 async def 方法中被使用。
# 一个普通的同步迭代器的 __next__ 方法内不能 await 任何东西。
await asyncio.sleep(0.5)
self.current += 1
return self.current
else:
# 迭代结束,抛出 StopAsyncIteration
raise StopAsyncIteration
async def main():
print("开始遍历异步迭代器...")
# async for 是唯一能与异步迭代器交互的语法
async for number in AsyncCounter(3):
print(f"拿到了数字: {number}")
# asyncio.run(main())
# 输出会每隔 0.5 秒打印一个数字。
这个 AsyncCounter 类通过手动实现 __aiter__ 和 __anext__,完整地展示了异步迭代器协议的原理。
第一章:本质区别 —— for 与 async for 的协议鸿沟
这是理解一切异步迭代的基础。for 和 async for 并非可以互换的工具,它们服务于两个完全不同的世界。
核心结论:async for 是唯一能够遍历“异步迭代器”的语法,而普通 for 只能遍历“同步迭代器”。
-
同步世界 (
for):- 协议:
__iter__()和__next__()。 - 流程:
for循环调用对象的__iter__()获得一个同步迭代器,然后反复调用其__next__()方法获取下一个元素。 - 限制:
__next__()是一个同步方法,它不能awaitI/O 操作。如果遇到 I/O,它只能阻塞整个程序。
- 协议:
-
异步世界 (
async for):- 协议:
__aiter__()和__anext__()。 - 流程:
async for循环调用对象的__aiter__()获得一个异步迭代器,然后反复await其__anext__()方法来获取下一个元素。 - 能力: 因为
__anext__是一个async def方法,所以它可以在内部await耗时的 I/O 操作(如网络请求、数据库查询),并在等待期间让出 CPU。
- 协议:
第二章:异步的意义 —— “非阻塞 I/O” 的价值
问题: 既然循环本身是串行的,异步的意义何在?
结论: 异步的核心价值不是让循环并发,而是在面对 I/O 等待时**“非阻塞” (Non-Blocking)** 的能力。
比喻:两位厨师烧水
- 同步厨师 (同步 I/O): 把水壶放上炉子,然后站在原地死等,直到水烧开。在此期间,厨房(程序)完全停滞。
- 异步厨师 (异步 I/O): 把水壶放上炉子,然后立刻转身去干别的事(切菜、洗盘子)。当水烧开时(I/O 完成),他再回来处理。
await 一个 I/O 操作,就相当于异步厨师转身去干别的事,这让程序在等待期间能保持响应或执行其他任务。
第三章:异步如何实现并发 —— “分发-收集”模型
问题: 既然 await 能让出 CPU,为何我的异步串行循环没有并发加速?
核心结论: await 只是创造了“让出 CPU”的机会。如果事件循环的任务队列中没有其他任务,CPU 在让出后也只能原地等待。真正的并发来自于**“串行创建,并发执行”**。
伪代码对比
-
异步串行 (性能差)
async for item in source: # await 让出 CPU,但没有其他任务可做,只能等 await process(item) -
异步并发 (性能好)
tasks = [] # 1. for 循环作为“分发器”,快速、串行地创建任务 for item in data_source: tasks.append(asyncio.create_task(process(item))) # 2. gather 作为“收集者”,并发地执行所有任务 await asyncio.gather(*tasks)
关键点
并发的发生需要两个条件:
- 等待点:
await在任务内部提供了暂停和让出 CPU 的机会。 - 并发任务:
asyncio.create_task在外部创造了多个可以让 CPU 在等待期间去切换执行的任务。
第四章:常见误区澄清
1. async for 本身的循环会并发吗?
绝对不会。 async for 循环本身是严格串行的。它必须完全结束第 N 次循环(包括获取元素和执行循环体),才能开始第 N+1 次循环。
2. asyncio.create_task 是多线程吗?
绝对不是。 asyncio 的核心是单线程并发。
asyncio.create_task(coro): 只是在同一个线程内创建了一个由事件循环管理的协程任务。loop.run_in_executor(func): 这才是真正使用多线程的地方,它是一个“桥梁”,用于将同步阻塞函数“外包”给线程池执行。
最终结论:一切都回到了协议的本质
我们所有问题的最终答案,都归结于 Python 中同步与异步两个世界之间不可逾越的语法和协议界限。
同步迭代器的
__next__()是一个同步方法,它不能awaitI/O,所以它在等待时必然会阻塞整个程序。因此,我们才需要异步迭代器,它拥有一个可等待的
__anext__()方法,允许在等待 I/O 时让出 CPU,从而为并发执行创造了可能性。
async for 就是为了驱动这个 __anext__() 方法而存在的专用语法。
通过这次探索,我们完整地构建了从“为什么需要异步”到“如何正确地实现异步并发”的完整知识链条。
更多推荐



所有评论(0)