asyncio.gather 内部原理与运行机制(详解)

适用版本:本文以 CPython 3.12asyncio.gather() 实现为准(不同版本细节可能略有差异,但核心机制一致)。

典型用法:

note_details = await asyncio.gather(*task_list)

其中 task_list 可以是 协程对象Task、或 Future(更广义叫 “awaitable”)。


1. 先用一句话理解 gather

asyncio.gather() 做的事可以概括为:

  1. 把你给的一堆 awaitable 同时“安排”到事件循环里执行(协程会被包成 Task 并被调度);
  2. 返回一个“外层 Future”(聚合器),你 await 它就能拿到 所有结果的列表(结果顺序与入参顺序一致);
  3. 在异常/取消时,按规则把异常或取消传播到这个聚合器上。

它是 async 世界里的典型 fan-out / fan-in

  • fan-out:把多个子任务并发跑起来
  • fan-in:等它们都结束后汇总结果

2. 必须掌握的 3 个概念:Coroutine / Task / Future

理解 gather 的内部机制,关键是搞清楚这三者的关系:

2.1 Coroutine(协程对象)

async def f(): ...
co = f()  # co 是 coroutine 对象
  • coroutine 对象本身 不会自动执行,只有被事件循环调度(通常包成 Task)后才会运行。

2.2 Task(任务)

Task 是 Future 的子类(更像“可调度的 Future”):

t = asyncio.create_task(f())
  • Task 一创建,就会被事件循环“尽快”执行(严格来说:等你把控制权 await 回事件循环之后)。
  • Task 负责驱动协程一步步运行,并在结束时把结果/异常写回自己(Future 状态)。

2.3 Future(未来对象)

Future 表示一个“未来会有结果”的占位符。它有状态机:

  • pending(未完成)
  • finished(set_result / set_exception)
  • cancelled(或以 CancelledError 形式结束)

你可以 await future,事件循环会在 future 完成时唤醒你。


3. gather 的对外语义(你看到的行为)

3.1 入参类型:协程 / Task / Future 都行

asyncio.gather(a, b, c) 会把入参统一变成 Future 语义:

  • 如果入参是协程对象:ensure_future() 包成 Task 并调度
  • 如果入参本身就是 Future/Task:直接使用(但需要同一个 event loop)

3.2 结果顺序:按入参顺序,不按完成顺序

哪怕 ba 先完成,返回的 list 仍然是 [a_result, b_result, c_result]

3.3 return_exceptions=False(默认)

  • 只要有任何一个子任务 抛异常或被取消,gather 会立刻把这个异常“传播”到外层聚合器(你 await gather 会 raise)。
  • 重要:其它子任务不会因此被自动取消,它们通常会继续在后台跑完(这一点和 TaskGroup 不同)。

3.4 return_exceptions=True

  • 子任务的异常不会 raise 出去,而是当作“结果”放进结果列表里(异常对象本身会出现在 list 中)。
  • 子任务被取消时,会在结果列表里对应位置放一个 CancelledError(...) 对象。

3.5 “重复入参”去重

gather 内部会对 相同的 Future 对象做去重(同一个对象传两次不会创建两份执行单元):

fut = asyncio.create_task(f())
await asyncio.gather(fut, fut)  # 结果列表会有两个位置,但背后只跑一个 fut

4. gather 的内部实现:CPython 3.12 代码级拆解

下面按 CPython 3.12 的核心实现思路拆解(用“伪代码 + 关键点”讲清楚,不贴满全部源码)。

4.1 第一阶段:把所有入参统一成 children futures(关键:ensure_future)

核心逻辑类似:

children = []
for arg in coros_or_futures:
    fut = ensure_future(arg)  # coroutine -> Task;Future/Task -> 原样
    children.append(fut)

ensure_future() 的意义:

  • 如果 arg 是 coroutine:创建一个 Task,让它进入事件循环调度队列
  • 如果 arg 是 Future:直接返回它(并检查 loop 一致)

同时 gather 会缓存 arg_to_fut,避免同一个 Future 被重复包装。

4.2 第二阶段:创建“外层聚合器” _GatheringFuture

gather 返回的不是 Task,而是一个 _GatheringFuture(Future 子类):

  • 它持有 children 列表
  • 它重写了 cancel()取消外层时,会把取消传播到每个 child

CPython 3.12 的 _GatheringFuture.cancel() 逻辑本质是:

  1. 遍历 children,逐个 child.cancel(msg)
  2. 只要真的取消到了某些 child,就把 self._cancel_requested=True
  3. 注意:它不像普通 Future 那样立刻把自己标记为 cancelled,而是等后面回调收尾

这让 gather 的 cancel 更像 Task.cancel():先“请求取消”,等协程/任务实际响应后才最终结束。

4.3 第三阶段:给每个 child 挂 done_callback 做“汇总”

gather 会给每个 未完成 的 child 挂一个 _done_callback

这个回调做 4 件事:

  1. nfinished += 1(计数)
  2. 如果外层 outer 已经 done:
    • 调用 fut.exception() 把异常标记为“已取走”,避免 “Task exception was never retrieved”
  3. 如果 return_exceptions=False 且本 fut 有异常/取消:
    • 立刻 outer.set_exception(...),让 await gather 马上抛出
  4. 如果 nfinished == nfuts(全部结束):
    • 按 children 原顺序构造 results[]:对每个 child
      • cancelled -> CancelledError(...)
      • exception -> 异常对象(或 raise 出去取到异常)
      • success -> fut.result()
    • 若 outer 之前被请求 cancel(_cancel_requested=True),则无论 return_exceptions 取值,最终都让 outer 以 CancelledError 结束
    • 否则 outer.set_result(results)

4.4 “已完成的 child”如何处理(eager completion)

如果某些 child 在 gather 创建时就已经 done,gather 会把它们先放进 done_futs,最后再手动跑一遍 _done_callback(fut),使得:

  • gather 可能 同步地就完成(所有 child 都 done 的情况下)
  • 同时保证 outer 已经创建完成后再做汇总(避免 outer 还没创建就 set_result)

5. 运行机制:用时间线理解它怎么“跑起来”

await asyncio.gather(*task_list) 为例:

  1. 你调用 asyncio.gather(...)
    • gather 立刻把协程包装成 Task(ensure_future
    • 给每个 child 安装 done_callback
    • 返回 outer(一个 Future)
  2. await outer
    • 当前协程把控制权交回事件循环
  3. 事件循环开始调度这些 Task 运行(并发推进)
  4. 每个 child 完成时,事件循环触发它的 done_callback
    • 计数、处理异常、或在全部完成时汇总 results
  5. 当 outer 被 set_resultset_exception
    • 事件循环唤醒 await gather 的地方
    • 你拿到结果 list 或抛出异常

6. 取消与异常:最容易踩坑的地方

6.1 外层 gather 被 cancel:会取消所有子任务

当你对外层聚合器 cancel()(或 await gather 的 task 被取消)时:

  • gather 会向所有 children 发出 cancel 请求
  • 但外层不会立刻变成 cancelled,而是等 children 回调收尾
  • 最终外层会以 CancelledError 结束(即使 return_exceptions=True 也一样)

6.2 某个子任务被 cancel:外层不会“跟着被 cancel”

CPython 3.12 的语义是:

  • 子任务取消被当作它“抛出了 CancelledError”
  • 外层 gather 不会变成 cancelled 状态(而是以异常 CancelledError 完成,表现为 await gather raise CancelledError)
  • 这样做是为了避免“一个子任务取消导致其它子任务被连带取消”(更可控)

6.3 默认(return_exceptions=False)遇到异常:外层立刻抛,但其它子任务继续跑

这是 gather 和 TaskGroup 最大的区别之一:

  • gather:一个子任务异常 -> outer 立刻 set_exception,你这边 await 立刻抛出;但其它子任务一般不会自动停
  • TaskGroup(3.11+):一个任务异常 -> 会取消同组其它任务(结构化并发)

因此:如果你用 gather 做“必须全成全败”的批处理,建议用 TaskGroup 或自己在 except 里取消剩余任务。


7. 实战示例:理解“异常不取消兄弟任务”

import asyncio

async def ok():
    await asyncio.sleep(1)
    print("ok done")
    return "OK"

async def bad():
    await asyncio.sleep(0.2)
    raise RuntimeError("boom")

async def main():
    try:
        await asyncio.gather(ok(), bad())  # 默认 return_exceptions=False
    except Exception as e:
        print("gather raised:", e)
    await asyncio.sleep(2)  # 观察 ok 还会不会继续跑

asyncio.run(main())

你会发现 bad() 抛出后,gather 很快抛异常,但 ok() 仍然会在后台继续执行并打印 ok done


8. 什么时候用 gather,什么时候别用

适合用 gather

  • “并发发起多个 I/O 请求,然后等全部结果回来”的 fan-out/fan-in。
  • “允许部分失败”,用 return_exceptions=True 汇总再逐个处理。

不太适合用 gather(建议 TaskGroup / 自己管理取消)

  • “只要一个失败就要停掉剩余任务”的全成全败任务组(结构化并发诉求)。
  • 你需要严格的资源上限控制(gather 不限流,需要配合 Semaphore / 队列)。

9. 限流提醒:gather 不等于“并发控制”

gather 只负责“聚合等待”,不负责“并发上限”。
如果 task_list 很大,你通常需要配合:

  • asyncio.Semaphore 控制并发上限
  • 或分批 gather(batch)
  • 或用 asyncio.as_completed 边完成边消费

10. 小结(记住这 5 条就够用了)

  1. gather 会用 ensure_future():协程会被包成 Task 并被调度执行
  2. 结果顺序按入参顺序,不按完成顺序
  3. 默认遇到异常会立刻抛,但其它任务通常继续跑(不会自动取消)
  4. 外层被 cancel 会 cancel 所有子任务(最终以 CancelledError 结束)
  5. return_exceptions=True 会把异常当“结果”收集,但外层取消仍会传播取消
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐