asyncio.gather 内部原理与运行机制(详解)
适用版本:本文以的实现为准(不同版本细节可能略有差异,但核心机制一致)。其中task_list可以是Task、或Future(更广义叫 “awaitable”)。
asyncio.gather 内部原理与运行机制(详解)
适用版本:本文以 CPython 3.12 的
asyncio.gather()实现为准(不同版本细节可能略有差异,但核心机制一致)。典型用法:
note_details = await asyncio.gather(*task_list)其中
task_list可以是 协程对象、Task、或Future(更广义叫 “awaitable”)。
1. 先用一句话理解 gather
asyncio.gather() 做的事可以概括为:
- 把你给的一堆 awaitable 同时“安排”到事件循环里执行(协程会被包成
Task并被调度); - 返回一个“外层 Future”(聚合器),你
await它就能拿到 所有结果的列表(结果顺序与入参顺序一致); - 在异常/取消时,按规则把异常或取消传播到这个聚合器上。
它是 async 世界里的典型 fan-out / fan-in:
- fan-out:把多个子任务并发跑起来
- fan-in:等它们都结束后汇总结果
2. 必须掌握的 3 个概念:Coroutine / Task / Future
理解 gather 的内部机制,关键是搞清楚这三者的关系:
2.1 Coroutine(协程对象)
async def f(): ...
co = f() # co 是 coroutine 对象
- coroutine 对象本身 不会自动执行,只有被事件循环调度(通常包成 Task)后才会运行。
2.2 Task(任务)
Task 是 Future 的子类(更像“可调度的 Future”):
t = asyncio.create_task(f())
- Task 一创建,就会被事件循环“尽快”执行(严格来说:等你把控制权
await回事件循环之后)。 - Task 负责驱动协程一步步运行,并在结束时把结果/异常写回自己(Future 状态)。
2.3 Future(未来对象)
Future 表示一个“未来会有结果”的占位符。它有状态机:
- pending(未完成)
- finished(set_result / set_exception)
- cancelled(或以 CancelledError 形式结束)
你可以 await future,事件循环会在 future 完成时唤醒你。
3. gather 的对外语义(你看到的行为)
3.1 入参类型:协程 / Task / Future 都行
asyncio.gather(a, b, c) 会把入参统一变成 Future 语义:
- 如果入参是协程对象:用
ensure_future()包成 Task 并调度 - 如果入参本身就是 Future/Task:直接使用(但需要同一个 event loop)
3.2 结果顺序:按入参顺序,不按完成顺序
哪怕 b 比 a 先完成,返回的 list 仍然是 [a_result, b_result, c_result]。
3.3 return_exceptions=False(默认)
- 只要有任何一个子任务 抛异常或被取消,gather 会立刻把这个异常“传播”到外层聚合器(你
await gather会 raise)。 - 重要:其它子任务不会因此被自动取消,它们通常会继续在后台跑完(这一点和
TaskGroup不同)。
3.4 return_exceptions=True
- 子任务的异常不会 raise 出去,而是当作“结果”放进结果列表里(异常对象本身会出现在 list 中)。
- 子任务被取消时,会在结果列表里对应位置放一个
CancelledError(...)对象。
3.5 “重复入参”去重
gather 内部会对 相同的 Future 对象做去重(同一个对象传两次不会创建两份执行单元):
fut = asyncio.create_task(f())
await asyncio.gather(fut, fut) # 结果列表会有两个位置,但背后只跑一个 fut
4. gather 的内部实现:CPython 3.12 代码级拆解
下面按 CPython 3.12 的核心实现思路拆解(用“伪代码 + 关键点”讲清楚,不贴满全部源码)。
4.1 第一阶段:把所有入参统一成 children futures(关键:ensure_future)
核心逻辑类似:
children = []
for arg in coros_or_futures:
fut = ensure_future(arg) # coroutine -> Task;Future/Task -> 原样
children.append(fut)
ensure_future() 的意义:
- 如果
arg是 coroutine:创建一个 Task,让它进入事件循环调度队列 - 如果
arg是 Future:直接返回它(并检查 loop 一致)
同时 gather 会缓存 arg_to_fut,避免同一个 Future 被重复包装。
4.2 第二阶段:创建“外层聚合器” _GatheringFuture
gather 返回的不是 Task,而是一个 _GatheringFuture(Future 子类):
- 它持有
children列表 - 它重写了
cancel():取消外层时,会把取消传播到每个 child
CPython 3.12 的 _GatheringFuture.cancel() 逻辑本质是:
- 遍历 children,逐个
child.cancel(msg) - 只要真的取消到了某些 child,就把
self._cancel_requested=True - 注意:它不像普通 Future 那样立刻把自己标记为 cancelled,而是等后面回调收尾
这让 gather 的 cancel 更像 Task.cancel():先“请求取消”,等协程/任务实际响应后才最终结束。
4.3 第三阶段:给每个 child 挂 done_callback 做“汇总”
gather 会给每个 未完成 的 child 挂一个 _done_callback:
这个回调做 4 件事:
nfinished += 1(计数)- 如果外层 outer 已经 done:
- 调用
fut.exception()把异常标记为“已取走”,避免 “Task exception was never retrieved”
- 调用
- 如果
return_exceptions=False且本 fut 有异常/取消:- 立刻
outer.set_exception(...),让await gather马上抛出
- 立刻
- 如果
nfinished == nfuts(全部结束):- 按 children 原顺序构造
results[]:对每个 child- cancelled ->
CancelledError(...) - exception -> 异常对象(或 raise 出去取到异常)
- success ->
fut.result()
- cancelled ->
- 若 outer 之前被请求 cancel(
_cancel_requested=True),则无论return_exceptions取值,最终都让 outer 以 CancelledError 结束 - 否则
outer.set_result(results)
- 按 children 原顺序构造
4.4 “已完成的 child”如何处理(eager completion)
如果某些 child 在 gather 创建时就已经 done,gather 会把它们先放进 done_futs,最后再手动跑一遍 _done_callback(fut),使得:
- gather 可能 同步地就完成(所有 child 都 done 的情况下)
- 同时保证 outer 已经创建完成后再做汇总(避免 outer 还没创建就 set_result)
5. 运行机制:用时间线理解它怎么“跑起来”
以 await asyncio.gather(*task_list) 为例:
- 你调用
asyncio.gather(...)- gather 立刻把协程包装成 Task(
ensure_future) - 给每个 child 安装 done_callback
- 返回 outer(一个 Future)
- gather 立刻把协程包装成 Task(
- 你
await outer- 当前协程把控制权交回事件循环
- 事件循环开始调度这些 Task 运行(并发推进)
- 每个 child 完成时,事件循环触发它的 done_callback
- 计数、处理异常、或在全部完成时汇总 results
- 当 outer 被
set_result或set_exception- 事件循环唤醒
await gather的地方 - 你拿到结果 list 或抛出异常
- 事件循环唤醒
6. 取消与异常:最容易踩坑的地方
6.1 外层 gather 被 cancel:会取消所有子任务
当你对外层聚合器 cancel()(或 await gather 的 task 被取消)时:
- gather 会向所有 children 发出 cancel 请求
- 但外层不会立刻变成 cancelled,而是等 children 回调收尾
- 最终外层会以
CancelledError结束(即使return_exceptions=True也一样)
6.2 某个子任务被 cancel:外层不会“跟着被 cancel”
CPython 3.12 的语义是:
- 子任务取消被当作它“抛出了 CancelledError”
- 外层 gather 不会变成 cancelled 状态(而是以异常
CancelledError完成,表现为await gatherraise CancelledError) - 这样做是为了避免“一个子任务取消导致其它子任务被连带取消”(更可控)
6.3 默认(return_exceptions=False)遇到异常:外层立刻抛,但其它子任务继续跑
这是 gather 和 TaskGroup 最大的区别之一:
- gather:一个子任务异常 -> outer 立刻
set_exception,你这边await立刻抛出;但其它子任务一般不会自动停。 - TaskGroup(3.11+):一个任务异常 -> 会取消同组其它任务(结构化并发)
因此:如果你用 gather 做“必须全成全败”的批处理,建议用 TaskGroup 或自己在 except 里取消剩余任务。
7. 实战示例:理解“异常不取消兄弟任务”
import asyncio
async def ok():
await asyncio.sleep(1)
print("ok done")
return "OK"
async def bad():
await asyncio.sleep(0.2)
raise RuntimeError("boom")
async def main():
try:
await asyncio.gather(ok(), bad()) # 默认 return_exceptions=False
except Exception as e:
print("gather raised:", e)
await asyncio.sleep(2) # 观察 ok 还会不会继续跑
asyncio.run(main())
你会发现 bad() 抛出后,gather 很快抛异常,但 ok() 仍然会在后台继续执行并打印 ok done。
8. 什么时候用 gather,什么时候别用
适合用 gather
- “并发发起多个 I/O 请求,然后等全部结果回来”的 fan-out/fan-in。
- “允许部分失败”,用
return_exceptions=True汇总再逐个处理。
不太适合用 gather(建议 TaskGroup / 自己管理取消)
- “只要一个失败就要停掉剩余任务”的全成全败任务组(结构化并发诉求)。
- 你需要严格的资源上限控制(gather 不限流,需要配合 Semaphore / 队列)。
9. 限流提醒:gather 不等于“并发控制”
gather 只负责“聚合等待”,不负责“并发上限”。
如果 task_list 很大,你通常需要配合:
asyncio.Semaphore控制并发上限- 或分批 gather(batch)
- 或用
asyncio.as_completed边完成边消费
10. 小结(记住这 5 条就够用了)
- gather 会用
ensure_future():协程会被包成 Task 并被调度执行 - 结果顺序按入参顺序,不按完成顺序
- 默认遇到异常会立刻抛,但其它任务通常继续跑(不会自动取消)
- 外层被 cancel 会 cancel 所有子任务(最终以 CancelledError 结束)
return_exceptions=True会把异常当“结果”收集,但外层取消仍会传播取消
更多推荐


所有评论(0)