Python异步实战:把asyncio真正讲明白,从基础到进阶(下)
上半篇我们已经把asyncio最核心的几件事理顺了:什么是协程,await到底在等什么,事件循环怎么调度任务,以及为什么很多人明明写了异步,结果程序还是串行但真正到了项目里,问题往往不会停留在语法会不会写这一层,更常见的挑战是:一批任务怎么并发收结果,超时怎么处理,任务怎么取消,并发量怎么限制,老的阻塞代码又该怎么接进异步系统里。
上半篇我们已经把asyncio最核心的几件事理顺了:什么是协程,await到底在等什么,事件循环怎么调度任务,以及为什么很多人明明写了异步,结果程序还是串行
但真正到了项目里,问题往往不会停留在语法会不会写这一层,更常见的挑战是:一批任务怎么并发收结果,超时怎么处理,任务怎么取消,并发量怎么限制,老的阻塞代码又该怎么接进异步系统里
所以下半篇我们不再停留在概念层面,而是直接往工程实战走,你会看到asyncio在真实项目里最像什么、最常解决哪些问题,以及最容易踩到哪些坑,看完这一部分,你对异步的理解就不只是能看懂代码,而是会更接近知道在项目里该怎么用
四、代码实战
4.1 批量请求接口为什么适合asyncio
假设你现在要同时请求4个远程服务,同步写法会一个一个等,异步写法则可以一起发出去这里我先用asyncio.sleep()模拟网络延迟:
import asyncio
import time
async def fake_request(name, delay):
print(f"{name} 发起请求")
await asyncio.sleep(delay)
print(f"{name} 返回结果")
return f"{name} 的数据"
async def main():
start = time.perf_counter()
results = await asyncio.gather(
fake_request("接口A", 2),
fake_request("接口B", 1),
fake_request("接口C", 3),
fake_request("接口D", 2),
)
cost = time.perf_counter() - start
print(results)
print(f"总耗时: {cost:.2f} 秒")
asyncio.run(main())
这个例子虽然是假的延迟,但思路和真实接口调用是一模一样的:
请求可以一起发
执行层面上谁先回来谁先完成
gather() 收到的结果列表仍然按传入顺序排列
总耗时取决于最慢的那个,不是所有耗时相加
补充一点实战里非常关键的语义:asyncio.gather()默认return_exceptions=False,只要有一个任务抛异常,调用方会立刻收到异常,其他任务不会自动帮你兜底处理。如果你希望所有任务都跑完,再统一收结果和错误,可以用return_exceptions=True,然后自己分类处理成功值和异常对象
4.2 放到真实项目里,它像什么角色
如果把一个Web服务或者AI应用拆开看,asyncio 很像底层的调度中枢
比如一个聊天系统里,可能同时有这些等待型动作:
- 请求大模型接口
- 查询向量数据库
- 读取历史消息
- 调用外部天气服务
- 写日志和埋点
如果这些都串行排队,响应时间会非常难看;如果其中能并发的部分并发起来,链路就会顺很多
所以你可以把asyncio理解成把等待时间捡回来的那一层
五、进阶内容:真正写项目时会用到这些能力
5.1 超时控制:wait_for
项目里最怕的不是报错,而是请求一直卡着不回来。这个时候就要给任务加超时
import asyncio
async def slow_task():
try:
await asyncio.sleep(5)
return "完成"
finally:
print("slow_task 清理资源")
async def main():
task = asyncio.create_task(slow_task())
try:
result = await asyncio.wait_for(task, timeout=2)
print(result)
except asyncio.TimeoutError:
print("任务超时")
# wait_for 超时会取消被等待任务,这里通常要继续做降级或补偿
asyncio.run(main())
这个设计非常实用,因为线上系统不能无限等,超时就是系统边界的一部分,要特别注意:wait_for超时时会取消底层任务,所以协程内部应做好finally清理
5.2 任务取消:CancelledError
有时候用户主动退出、服务准备关闭、上游请求已经失效,这时候继续跑任务就没意义了
import asyncio
async def worker():
try:
while True:
print("任务运行中")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("任务被取消,开始做清理")
raise
async def main():
task = asyncio.create_task(worker())
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("主协程确认任务已取消")
asyncio.run(main())
这里的关键点不是会不会取消,而是任务取消后有没有正确做清理,比如数据库连接、文件句柄、网络会话,这些都不能随手丢着不管
另外,CancelledError一般不建议吞掉,清理完成后应继续raise,让上层知道这个任务确实被取消了
5.3 并发限制:Semaphore
异步不是说任务越多越好,有些接口有限流,有些数据库连接数有限,有些下游服务扛不住你一下打太多请求,这时候就要给并发加闸门:
import asyncio
semaphore = asyncio.Semaphore(3)
async def fetch(name):
async with semaphore:
print(f"{name} 开始")
await asyncio.sleep(2)
print(f"{name} 结束")
async def main():
tasks = [asyncio.create_task(fetch(f"任务{i}")) for i in range(1, 8)]
await asyncio.gather(*tasks)
asyncio.run(main())
这里最多只会有3个任务同时运行,这个设计在爬虫、批量接口调用、异步任务平台里特别常见
5.4 队列通信:Queue
如果你要做生产者--消费者模型,比如一个协程负责生产消息,多个协程负责消费处理,asyncio.Queue非常好用
import asyncio
async def producer(queue, n_items, n_consumers):
for i in range(n_items):
await queue.put(f"消息{i}")
print(f"生产: 消息{i}")
# 给每个消费者一个结束信号
for _ in range(n_consumers):
await queue.put(None)
async def consumer(queue, name):
while True:
item = await queue.get()
try:
if item is None:
print(f"{name} 结束")
return
print(f"{name} 消费: {item}")
await asyncio.sleep(1)
finally:
queue.task_done()
async def main():
queue = asyncio.Queue()
n_consumers = 3
consumers = [asyncio.create_task(consumer(queue, f"消费者{i}")) for i in range(n_consumers)]
await producer(queue, n_items=5, n_consumers=n_consumers)
await queue.join() # 等待所有消息被处理完
await asyncio.gather(*consumers)
asyncio.run(main())
这套模式很适合日志处理、任务调度、消息消费,实战里建议配合task_done()和join(),这样你能明确知道队列里的任务是否真的处理完成
如果后面你真要扩成多个消费者,通常还要给每个消费者都准备一个结束信号,这一点在实战里要注意
5.5 把阻塞代码接进异步系统:to_thread
很多项目不是从零开始写的,老代码里经常会混着阻塞函数,这个时候最常见的问题:协程写得没问题,但里面偷偷塞了一个同步阻塞调用,整个异步链路就被拖慢了这种情况可以用asyncio.to_thread()先做过渡
import asyncio
import time
def blocking_io():
time.sleep(3)
return "同步任务完成"
async def main():
result = await asyncio.to_thread(blocking_io)
print(result)
asyncio.run(main())
这招特别适合改造旧项目里的阻塞I/O(文件、老HTTP客户端、老SDK),但它不是CPU重计算的加速器;CPU密集任务通常要考虑多进程或独立任务系统
5.6 TaskGroup:更现代的任务管理方式
import asyncio
async def work(name, delay, fail=False):
await asyncio.sleep(delay)
if fail:
raise RuntimeError(f"{name} 失败")
print(f"{name} 完成")
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(work("任务A", 1))
tg.create_task(work("任务B", 2, fail=True))
tg.create_task(work("任务C", 3))
except* RuntimeError as eg:
print(f"捕获到 {len(eg.exceptions)} 个运行时错误")
asyncio.run(main())
TaskGroup的关键价值是结构化并发:同组里一个任务失败,其他任务会被取消,并把错误以 ExceptionGroup形式交给上层统一处理,这种要么一起受控运行,要么一起收束的语义,比散落的create_task()更稳
5.7 组合用法:并发 + 限流 + 超时 + 统一收束
import asyncio
sem = asyncio.Semaphore(3)
async def call_api(i):
async with sem:
await asyncio.sleep(0.5 + i * 0.1)
if i == 4:
raise RuntimeError("下游服务异常")
return f"ok-{i}"
async def guarded_call(i, timeout=1.2):
return await asyncio.wait_for(call_api(i), timeout=timeout)
async def main():
try:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(guarded_call(i)) for i in range(1, 7)]
except* Exception as eg:
print(f"批任务失败,共 {len(eg.exceptions)} 个异常")
else:
print([t.result() for t in tasks])
asyncio.run(main())
这段代码演示了项目里的常见组合:Semaphore控并发、wait_for控超时、TaskGroup负责整组任务的生命周期和异常收束
六、实战里容易踩的坑
6.1 只写了协程,没真正运行
错误写法:
async def hello():
print("hello")
hello()
这只是创建了协程对象,没有真正执行,正确方式是放到事件循环里运行,例如:
async def hello():
print("hello")
async def main():
task = asyncio.create_task(hello()) # create_task 需要在事件循环中调用
await task
asyncio.run(main())
6.2 协程里混入阻塞函数
这类问题特别隐蔽,最常见的几个坑:
time.sleep()
- 同步版数据库驱动
- 同步版HTTP请求库
- 很重的本地文件处理
一旦这些东西直接塞进协程,表面看是异步,实际上事件循环已经被卡住了
6.3 以为异步天然更快
异步不是无脑提速按钮,它只是在I/O等待型任务里更有效,如果你的任务本质是大量CPU计算,异步不会让它飞起来,很多时候应该考虑:
- 多进程
- C扩展
- 任务队列
- 把CPU重活拆到别的执行单元
6.4 异常处理想当然
异步场景下异常比同步代码更容易散掉,特别是一批任务并发执行时,如果你没提前想清楚:谁负责收异常、一个任务失败后要不要中断全部、超时后要不要取消其他任务,那代码很容易在边界场景里出问题
落地时建议先定一条团队约定:
- 并发批任务默认是否失败即中断全部
- 超时后是否取消同组其他任务
- 异常是就地记录后继续,还是上抛给调用方统一处理
把这三件事提前定清楚,异步代码的边界问题会少很多
更多推荐

所有评论(0)