本文档记录了一次从同步迭代到异步并发的完整探索过程,以一系列由浅入深的问题为线索,旨在彻底澄清 Python 异步编程中的核心概念。


第零章:什么是异步迭代器 (Asynchronous Iterator)?

在开始比较之前,我们首先需要定义我们的主角。

定义: 一个对象如果实现了异步迭代协议 (Asynchronous Iteration Protocol),那么它就是一个异步可迭代对象。调用它的 __aiter__() 方法会返回一个异步迭代器

这个协议要求实现以下两个核心的 async def 方法:

  1. __aiter__(self):

    • async for 循环在开始时自动调用。
    • 必须返回一个异步迭代器对象。
  2. __anext__(self):

    • async for 循环在每次迭代时自动 await
    • 它负责异步地获取并返回下一个元素。
    • 当没有更多元素时,它必须抛出 StopAsyncIteration 异常。

“经典”实现示例

让我们用一个手写的类来创建一个异步迭代器,它会每隔 0.5 秒计数一次。

import asyncio

class AsyncCounter:
    """一个经典的异步迭代器类"""
    def __init__(self, limit):
        self.limit = limit
        self.current = 0

    async def __aiter__(self):
        # 返回自身,因为它自己就是迭代器
        return self

    async def __anext__(self):
        if self.current < self.limit:
            # 模拟一个耗时的 I/O 操作。
            # 这里的 await 是关键:它只能在 async def 方法中被使用。
            # 一个普通的同步迭代器的 __next__ 方法内不能 await 任何东西。
            await asyncio.sleep(0.5)
            self.current += 1
            return self.current
        else:
            # 迭代结束,抛出 StopAsyncIteration
            raise StopAsyncIteration

async def main():
    print("开始遍历异步迭代器...")
    # async for 是唯一能与异步迭代器交互的语法
    async for number in AsyncCounter(3):
        print(f"拿到了数字: {number}")

# asyncio.run(main())
# 输出会每隔 0.5 秒打印一个数字。

这个 AsyncCounter 类通过手动实现 __aiter____anext__,完整地展示了异步迭代器协议的原理。


第一章:本质区别 —— forasync for 的协议鸿沟

这是理解一切异步迭代的基础。forasync for 并非可以互换的工具,它们服务于两个完全不同的世界。

核心结论:async for 是唯一能够遍历“异步迭代器”的语法,而普通 for 只能遍历“同步迭代器”。

  • 同步世界 (for):

    • 协议: __iter__()__next__()
    • 流程: for 循环调用对象的 __iter__() 获得一个同步迭代器,然后反复调用其 __next__() 方法获取下一个元素。
    • 限制: __next__() 是一个同步方法,它不能 await I/O 操作。如果遇到 I/O,它只能阻塞整个程序。
  • 异步世界 (async for):

    • 协议: __aiter__()__anext__()
    • 流程: async for 循环调用对象的 __aiter__() 获得一个异步迭代器,然后反复 await__anext__() 方法来获取下一个元素。
    • 能力: 因为 __anext__ 是一个 async def 方法,所以它可以在内部 await 耗时的 I/O 操作(如网络请求、数据库查询),并在等待期间让出 CPU。

第二章:异步的意义 —— “非阻塞 I/O” 的价值

问题: 既然循环本身是串行的,异步的意义何在?

结论: 异步的核心价值不是让循环并发,而是在面对 I/O 等待时**“非阻塞” (Non-Blocking)** 的能力。

比喻:两位厨师烧水

  • 同步厨师 (同步 I/O): 把水壶放上炉子,然后站在原地死等,直到水烧开。在此期间,厨房(程序)完全停滞。
  • 异步厨师 (异步 I/O): 把水壶放上炉子,然后立刻转身去干别的事(切菜、洗盘子)。当水烧开时(I/O 完成),他再回来处理。

await 一个 I/O 操作,就相当于异步厨师转身去干别的事,这让程序在等待期间能保持响应或执行其他任务。


第三章:异步如何实现并发 —— “分发-收集”模型

问题: 既然 await 能让出 CPU,为何我的异步串行循环没有并发加速?

核心结论: await 只是创造了“让出 CPU”的机会。如果事件循环的任务队列中没有其他任务,CPU 在让出后也只能原地等待。真正的并发来自于**“串行创建,并发执行”**。

伪代码对比

  • 异步串行 (性能差)

    async for item in source:
        # await 让出 CPU,但没有其他任务可做,只能等
        await process(item) 
    
  • 异步并发 (性能好)

    tasks = []
    # 1. for 循环作为“分发器”,快速、串行地创建任务
    for item in data_source:
        tasks.append(asyncio.create_task(process(item)))
    
    # 2. gather 作为“收集者”,并发地执行所有任务
    await asyncio.gather(*tasks)
    

关键点

并发的发生需要两个条件:

  1. 等待点: await 在任务内部提供了暂停和让出 CPU 的机会。
  2. 并发任务: asyncio.create_task 在外部创造了多个可以让 CPU 在等待期间去切换执行的任务。

第四章:常见误区澄清

1. async for 本身的循环会并发吗?

绝对不会。 async for 循环本身是严格串行的。它必须完全结束第 N 次循环(包括获取元素和执行循环体),才能开始第 N+1 次循环。

2. asyncio.create_task 是多线程吗?

绝对不是。 asyncio 的核心是单线程并发

  • asyncio.create_task(coro): 只是在同一个线程内创建了一个由事件循环管理的协程任务。
  • loop.run_in_executor(func): 这才是真正使用多线程的地方,它是一个“桥梁”,用于将同步阻塞函数“外包”给线程池执行。

最终结论:一切都回到了协议的本质

我们所有问题的最终答案,都归结于 Python 中同步与异步两个世界之间不可逾越的语法和协议界限。

同步迭代器的 __next__() 是一个同步方法,它不能 await I/O,所以它在等待时必然会阻塞整个程序。

因此,我们才需要异步迭代器,它拥有一个可等待的 __anext__() 方法,允许在等待 I/O 时让出 CPU,从而为并发执行创造了可能性。

async for 就是为了驱动这个 __anext__() 方法而存在的专用语法。

通过这次探索,我们完整地构建了从“为什么需要异步”到“如何正确地实现异步并发”的完整知识链条。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐