Python异步编程:asyncio高效并发指南
本文介绍了Python异步编程库asyncio的核心用法。主要内容包括:1)async/await基础语法,通过事件循环实现协程的暂停与唤醒;2)asyncio.run()作为程序启动器;3)asyncio.gather()实现并发执行多个协程;4)aiohttp库在网络请求中的实际应用;5)create_task()提供更灵活的任务控制。文章通过多个示例演示了异步编程的优势,比较了gather和
asyncio 是 Python 官方推荐的并发编程库,专门用于处理异步I/O。它使用 async/await 语法,用单线程高效处理成千上万个并发连接。
本文将从 asyncio 的核心概念入手,通过 gather 掌握基础并发,用 aiohttp 演示真实的网络I/O,最后深入到 create_task 了解更灵活的高级并发控制。
1. 核心概念:async / await
这是使用 asyncio 的基础。
-
async def:-
用来定义一个协程函数 (coroutine function)。
-
调用它并不会立即执行函数体,而是返回一个协程对象。
-
-
await:-
只能在
async def函数内部使用。 -
await后面通常跟着另一个协程对象或支持await的对象(比如asyncio.sleep())。 -
这是协程的“暂停点”。当程序执行到
await时,它会暂停当前协程的执行,并将控制权交还给事件循环,事件循环可以去执行别的协程。 -
当等待的任务完成后,事件循环会在
await的地方“唤醒”这个协程,让它继续往下执行。
-
2. 启动器:asyncio.run()
asyncio.run(coroutine) 是 Python 3.7+ 推荐的、最简单的启动 asyncio 程序的方式。
-
它会创建一个新的事件循环(Event Loop)。
-
它会运行传入的主协程(通常是
main函数)。 -
它会管理所有
await引发的暂停和唤醒。 -
在主协程执行完毕后,它会自动关闭事件循环。
3. 【示例 1】基本的串行协程
先来看一个最简单的例子,它使用 asyncio.sleep() 来模拟一个非阻塞的I/O操作。
import asyncio
import time
async def say_hello(delay, name):
"""一个简单的协程,模拟I/O等待"""
print(f"[{time.strftime('%X')}] {name}: 开始... (等待 {delay} 秒)")
# 关键:使用 asyncio.sleep() 而不是 time.sleep()
await asyncio.sleep(delay)
print(f"[{time.strftime('%X')}] {name}: ...结束!")
return f"{name} 的任务完成"
async def main():
"""主协程"""
print(f"[{time.strftime('%X')}] Main: 开始执行...")
# 必须使用 await 来“驱动”协程
result = await say_hello(2, "任务A")
print(f"[{time.strftime('%X')}] Main: 收到结果: {result}")
print(f"[{time.strftime('%X')}] Main: 执行完毕。")
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main())
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")
运行结果:
[20:30:01] Main: 开始执行...
[20:30:01] 任务A: 开始... (等待 2 秒)
[20:30:03] 任务A: ...结束!
[20:30:03] Main: 收到结果: 任务A 的任务完成
[20:30:03] Main: 执行完毕。
总耗时: 2.00 秒
在这个例子里,
main函数await了say_hello,所以它必须等say_hello跑完才能继续。这还是串行的。
4. 核心用法:asyncio.gather() (实现并发)
asyncio 真正的威力在于并发执行。asyncio.gather(*coroutines) 就是用来实现这个目的的。它会接收一个或多个协程,并发地运行它们,并按顺序返回所有结果。
5. 【示例 2】并发的协程 (gather)
修改 main 函数,让它同时运行两个 say_hello 任务。
import asyncio
import time
# (say_hello 函数和上面一样,这里省略)
async def say_hello(delay, name):
print(f"[{time.strftime('%X')}] {name}: 开始... (等待 {delay} 秒)")
await asyncio.sleep(delay)
print(f"[{time.strftime('%X')}] {name}: ...结束!")
return f"{name} 的任务完成"
async def main_concurrent():
print(f"[{time.strftime('%X')}] Main: 开始执行 (并发模式)...")
# 1. 创建协程对象
coro_a = say_hello(2, "任务A")
coro_b = say_hello(3, "任务B")
# 2. 使用 asyncio.gather() 来“并发”运行它们
print(f"[{time.strftime('%X')}] Main: 准备并发运行 A 和 B...")
results = await asyncio.gather(coro_a, coro_b)
print(f"[{time.strftime('%X')}] Main: A 和 B 都已完成。")
print(f"[{time.strftime('%X')}] Main: 收到结果: {results}")
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main_concurrent())
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")
运行结果 (重点看时间!):
[20:35:01] Main: 开始执行 (并发模式)...
[20:35:01] Main: 准备并发运行 A 和 B...
[20:35:01] 任务A: 开始... (等待 2 秒) <-- 任务A开始
[20:35:01] 任务B: 开始... (等待 3 秒) <-- 任务B几乎同时开始
[20:35:03] 任务A: ...结束! <-- 2秒后,A结束
[20:35:04] 任务B: ...结束! <-- 3秒后,B结束
[20:35:04] Main: A 和 B 都已完成。
[20:35:04] Main: 收到结果: ['任务A 的任务完成', '任务B 的任务完成']
总耗时: 3.01 秒
分析:
两个任务并发执行,总耗时只取决于最长的那个任务(3秒)。如果是串行,总耗时会是 2 + 3 = 5 秒。
⚠️ 重要提醒:asyncio.sleep vs time.sleep
这是新手最容易犯的错误:
-
await asyncio.sleep(1): 非阻塞 (Non-blocking)。-
它告诉事件循环:“我暂停1秒,请先去忙别的协程。” 此时CPU可以去执行其他任务。
-
-
time.sleep(1): 阻塞 (Blocking)。-
它告诉整个线程:“所有人,原地暂停1秒。”
time.sleep()会冻结整个事件循环,导致所有并发任务都一起卡住。
-
在
async函数中,永远不要使用time.sleep()、requests.get()这类同步阻塞的I/O代码。必须使用异步库(如asyncio.sleep()、aiohttp.get())并配合await。
6. 【示例 3】真实I/O:使用 aiohttp 并发抓取网页
asyncio 配合 aiohttp(一个异步网络请求库)能真正展示其在I/O密集型任务上的优势。
准备工作:
pip install aiohttp
代码:
import asyncio
import aiohttp
import time
# "delay/1" 会让服务器等待1秒后再返回数据
URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
async def fetch(session, url):
"""使用 aiohttp session 发起一个异步 GET 请求"""
print(f"[{time.strftime('%X')}] 开始请求: {url}")
try:
# 关键:使用 aiohttp 的 session.get(),并用 await 挂起
async with session.get(url) as response:
data = await response.text()
print(f"[{time.strftime('%X')}] 完成请求: {url}, 状态: {response.status}")
return f"{url} - 响应长度: {len(data)}"
except Exception as e:
print(f"[{time.strftime('%X')}] 请求失败: {url}, 错误: {e}")
return f"{url} - 请求失败"
async def main_aiohttp():
"""主协程,用于创建并并发执行所有抓取任务"""
print(f"[{time.strftime('%X')}] Main: 开始执行 (aiohttp 并发模式)...")
async with aiohttp.ClientSession() as session:
# 1. 创建所有任务(协程对象)的列表
tasks = []
for url in URLS:
tasks.append(fetch(session, url))
# 2. 使用 asyncio.gather() 来并发运行所有任务
print(f"[{time.strftime('%X')}] Main: 准备并发运行 {len(tasks)} 个抓取任务...")
results = await asyncio.gather(*tasks)
print(f"[{time.strftime('%X')}] Main: 所有任务已完成。")
print("--- 运行结果 ---")
for res in results:
print(res)
print("------------------")
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main_aiohttp())
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")
预期运行结果:
[10:01:15] Main: 开始执行 (aiohttp 并发模式)...
[10:01:15] Main: 准备并发运行 3 个抓取任务...
[10:01:15] 开始请求: https://httpbin.org/delay/1 <-- 任务1开始
[10:01:15] 开始请求: https://httpbin.org/delay/1 <-- 任务2开始
[10:01:15] 开始请求: https://httpbin.org/delay/1 <-- 任务3开始
[10:01:16] 完成请求: https://httpbin.org/delay/1, 状态: 200 <-- 约1秒后,任务1完成
[10:01:16] 完成请求: https://httpbin.org/delay/1, 状态: 200 <-- 任务2几乎同时完成
[10:01:16] 完成请求: https://httpbin.org/delay/1, 状态: 200 <-- 任务3几乎同时完成
[10:01:16] Main: 所有任务已完成。
--- 运行结果 ---
... (省略3行响应长度) ...
------------------
总耗时: 1.12 秒 <-- 关键点!
分析: 三个“耗时1秒”的网络请求,总耗时仅为 1.12 秒。这证明了
asyncio把所有等待网络响应的时间都利用起来去处理其他任务了。
7. 高级并发:asyncio.create_task()
asyncio.gather() “安排”并“等待”所有任务。而 asyncio.create_task() 是一个更低级的接口,它只做一件事:立即将一个协程安排到事件循环上(“后台”执行),并返回一个 Task 对象。
它不会自动 await 等待任务完成。这提供了极大的灵活性,因为可以将“启动”和“等待”分离。
8. 【示例 4】使用 create_task (灵活控制)
create_task 的真正威力在于“边运行边干活”。
import asyncio
import time
# (say_hello 函数和上面一样,这里省略)
async def say_hello(delay, name):
print(f"[{time.strftime('%X')}] {name}: 开始... (等待 {delay} 秒)")
await asyncio.sleep(delay)
print(f"[{time.strftime('%X')}] {name}: ...结束!")
return f"{name} 的任务完成"
async def main_flexible():
print(f"[{time.strftime('%X')}] Main: 正在创建任务...")
# 1. 立即创建任务,任务会“在后台”开始执行
task_a = asyncio.create_task(say_hello(2, "任务A"))
task_b = asyncio.create_task(say_hello(3, "任务B"))
# --- 灵活性的体现 ---
print(f"[{time.strftime('%X')}] Main: 任务已在后台运行... 我先干点别的活 (等待1秒)")
await asyncio.sleep(1) # 模拟 main 函数自己也在做一些I/O工作
print(f"[{time.strftime('%X')}] Main: 我自己的活干完了。")
# ----------------------
# T=0s: A(2s) 和 B(3s) 开始
# T=1s: Main 自己的活干完了。
# 2. 现在,我们必须分别 await 它们来“收集”结果
print(f"[{time.strftime('%X')}] Main: 开始等待 任务A...")
result_a = await task_a
# T=2s: A 完成。 (Main 从 T=1s 开始等,只需再等 1s)
print(f"[{time.strftime('%X')}] Main: 任务A 完成。开始等待 任务B...")
result_b = await task_b
# T=3s: B 完成。 (Main 从 T=2s 开始等,只需再等 1s)
print(f"[{time.strftime('%X')}] Main: 任务B 完成。")
print(f"[{time.strftime('%X')}] Main: 收到结果: {[result_a, result_b]}")
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main_flexible())
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒") # 总耗时依然是 3.xx 秒
9. 总结:gather vs create_task
-
asyncio.gather()-
用途:最常用的方式。用于“一次性启动一组任务,并等待它们全部完成”。
-
优点:简洁,意图明确。自动收集所有结果并按顺序返回。
-
用法:
results = await asyncio.gather(coro1, coro2, ...)
-
-
asyncio.create_task()-
用途:需要更精细控制任务时使用。例如“启动一个后台任务”、“在等待A的同时做B”、“给某个任务设置超时”。
-
优点:非常灵活。将“启动”和“等待”解耦。
-
用法:
task1 = asyncio.create_task(coro1) # 立即安排执行 task2 = asyncio.create_task(coro2) # 立即安排执行 # ... 在这里可以做其他工作 ... result1 = await task1 # 稍后单独等待 result2 = await task2 # 再单独等待
-
对于绝大多数“并发运行N个请求”的场景,asyncio.gather() 是更简单、更推荐的选择。
更多推荐


所有评论(0)