asyncio 是 Python 官方推荐的并发编程库,专门用于处理异步I/O。它使用 async/await 语法,用单线程高效处理成千上万个并发连接。

本文将从 asyncio 的核心概念入手,通过 gather 掌握基础并发,用 aiohttp 演示真实的网络I/O,最后深入到 create_task 了解更灵活的高级并发控制。

1. 核心概念:async / await

这是使用 asyncio 的基础。

  • async def:

    • 用来定义一个协程函数 (coroutine function)。

    • 调用它并不会立即执行函数体,而是返回一个协程对象。

  • await:

    • 只能在 async def 函数内部使用

    • await 后面通常跟着另一个协程对象或支持 await 的对象(比如 asyncio.sleep())。

    • 这是协程的“暂停点”。当程序执行到 await 时,它会暂停当前协程的执行,并将控制权交还给事件循环,事件循环可以去执行别的协程。

    • 当等待的任务完成后,事件循环会在 await 的地方“唤醒”这个协程,让它继续往下执行。

2. 启动器:asyncio.run()

asyncio.run(coroutine) 是 Python 3.7+ 推荐的、最简单的启动 asyncio 程序的方式。

  • 它会创建一个新的事件循环(Event Loop)。

  • 它会运行传入的主协程(通常是 main 函数)。

  • 它会管理所有 await 引发的暂停和唤醒。

  • 在主协程执行完毕后,它会自动关闭事件循环。

3. 【示例 1】基本的串行协程

先来看一个最简单的例子,它使用 asyncio.sleep() 来模拟一个非阻塞的I/O操作。

import asyncio
import time

async def say_hello(delay, name):
    """一个简单的协程,模拟I/O等待"""
    print(f"[{time.strftime('%X')}] {name}: 开始... (等待 {delay} 秒)")
    
    # 关键:使用 asyncio.sleep() 而不是 time.sleep()
    await asyncio.sleep(delay) 
    
    print(f"[{time.strftime('%X')}] {name}: ...结束!")
    return f"{name} 的任务完成"

async def main():
    """主协程"""
    print(f"[{time.strftime('%X')}] Main: 开始执行...")
    
    # 必须使用 await 来“驱动”协程
    result = await say_hello(2, "任务A") 
    
    print(f"[{time.strftime('%X')}] Main: 收到结果: {result}")
    print(f"[{time.strftime('%X')}] Main: 执行完毕。")

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main()) 
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f} 秒")

运行结果:

[20:30:01] Main: 开始执行...
[20:30:01] 任务A: 开始... (等待 2 秒)
[20:30:03] 任务A: ...结束!
[20:30:03] Main: 收到结果: 任务A 的任务完成
[20:30:03] Main: 执行完毕。
总耗时: 2.00 秒

在这个例子里,main 函数 awaitsay_hello,所以它必须等 say_hello 跑完才能继续。这还是串行的。

4. 核心用法:asyncio.gather() (实现并发)

asyncio 真正的威力在于并发执行。asyncio.gather(*coroutines) 就是用来实现这个目的的。它会接收一个或多个协程,并发地运行它们,并按顺序返回所有结果。

5. 【示例 2】并发的协程 (gather)

修改 main 函数,让它同时运行两个 say_hello 任务。

import asyncio
import time

# (say_hello 函数和上面一样,这里省略)
async def say_hello(delay, name):
    print(f"[{time.strftime('%X')}] {name}: 开始... (等待 {delay} 秒)")
    await asyncio.sleep(delay) 
    print(f"[{time.strftime('%X')}] {name}: ...结束!")
    return f"{name} 的任务完成"

async def main_concurrent():
    print(f"[{time.strftime('%X')}] Main: 开始执行 (并发模式)...")

    # 1. 创建协程对象
    coro_a = say_hello(2, "任务A")
    coro_b = say_hello(3, "任务B")

    # 2. 使用 asyncio.gather() 来“并发”运行它们
    print(f"[{time.strftime('%X')}] Main: 准备并发运行 A 和 B...")
    results = await asyncio.gather(coro_a, coro_b)
    
    print(f"[{time.strftime('%X')}] Main: A 和 B 都已完成。")
    print(f"[{time.strftime('%X')}] Main: 收到结果: {results}")

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main_concurrent()) 
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f} 秒")

运行结果 (重点看时间!):

[20:35:01] Main: 开始执行 (并发模式)...
[20:35:01] Main: 准备并发运行 A 和 B...
[20:35:01] 任务A: 开始... (等待 2 秒)  <-- 任务A开始
[20:35:01] 任务B: 开始... (等待 3 秒)  <-- 任务B几乎同时开始
[20:35:03] 任务A: ...结束!            <-- 2秒后,A结束
[20:35:04] 任务B: ...结束!            <-- 3秒后,B结束
[20:35:04] Main: A 和 B 都已完成。
[20:35:04] Main: 收到结果: ['任务A 的任务完成', '任务B 的任务完成']
总耗时: 3.01 秒

分析:

两个任务并发执行,总耗时只取决于最长的那个任务(3秒)。如果是串行,总耗时会是 2 + 3 = 5 秒。

⚠️ 重要提醒:asyncio.sleep vs time.sleep

这是新手最容易犯的错误:

  • await asyncio.sleep(1): 非阻塞 (Non-blocking)

    • 它告诉事件循环:“我暂停1秒,请先去忙别的协程。” 此时CPU可以去执行其他任务。

  • time.sleep(1): 阻塞 (Blocking)

    • 它告诉整个线程:“所有人,原地暂停1秒。” time.sleep()冻结整个事件循环,导致所有并发任务都一起卡住。

async 函数中,永远不要使用 time.sleep()requests.get() 这类同步阻塞的I/O代码。必须使用异步库(如 asyncio.sleep()aiohttp.get())并配合 await

6. 【示例 3】真实I/O:使用 aiohttp 并发抓取网页

asyncio 配合 aiohttp(一个异步网络请求库)能真正展示其在I/O密集型任务上的优势。

准备工作:

pip install aiohttp

代码:

import asyncio
import aiohttp
import time

# "delay/1" 会让服务器等待1秒后再返回数据
URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

async def fetch(session, url):
    """使用 aiohttp session 发起一个异步 GET 请求"""
    print(f"[{time.strftime('%X')}] 开始请求: {url}")
    try:
        # 关键:使用 aiohttp 的 session.get(),并用 await 挂起
        async with session.get(url) as response:
            data = await response.text()
            print(f"[{time.strftime('%X')}] 完成请求: {url}, 状态: {response.status}")
            return f"{url} - 响应长度: {len(data)}"
            
    except Exception as e:
        print(f"[{time.strftime('%X')}] 请求失败: {url}, 错误: {e}")
        return f"{url} - 请求失败"

async def main_aiohttp():
    """主协程,用于创建并并发执行所有抓取任务"""
    print(f"[{time.strftime('%X')}] Main: 开始执行 (aiohttp 并发模式)...")
    
    async with aiohttp.ClientSession() as session:
        # 1. 创建所有任务(协程对象)的列表
        tasks = []
        for url in URLS:
            tasks.append(fetch(session, url))
            
        # 2. 使用 asyncio.gather() 来并发运行所有任务
        print(f"[{time.strftime('%X')}] Main: 准备并发运行 {len(tasks)} 个抓取任务...")
        results = await asyncio.gather(*tasks)
        
        print(f"[{time.strftime('%X')}] Main: 所有任务已完成。")
        print("--- 运行结果 ---")
        for res in results:
            print(res)
        print("------------------")

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main_aiohttp())
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f} 秒")

预期运行结果:

[10:01:15] Main: 开始执行 (aiohttp 并发模式)...
[10:01:15] Main: 准备并发运行 3 个抓取任务...
[10:01:15] 开始请求: https://httpbin.org/delay/1  <-- 任务1开始
[10:01:15] 开始请求: https://httpbin.org/delay/1  <-- 任务2开始
[10:01:15] 开始请求: https://httpbin.org/delay/1  <-- 任务3开始
[10:01:16] 完成请求: https://httpbin.org/delay/1, 状态: 200  <-- 约1秒后,任务1完成
[10:01:16] 完成请求: https://httpbin.org/delay/1, 状态: 200  <-- 任务2几乎同时完成
[10:01:16] 完成请求: https://httpbin.org/delay/1, 状态: 200  <-- 任务3几乎同时完成
[10:01:16] Main: 所有任务已完成。
--- 运行结果 ---
... (省略3行响应长度) ...
------------------
总耗时: 1.12 秒  <-- 关键点!

分析: 三个“耗时1秒”的网络请求,总耗时仅为 1.12 秒。这证明了 asyncio 把所有等待网络响应的时间都利用起来去处理其他任务了。

7. 高级并发:asyncio.create_task()

asyncio.gather() “安排”并“等待”所有任务。而 asyncio.create_task() 是一个更低级的接口,它只做一件事:立即将一个协程安排到事件循环上(“后台”执行),并返回一个 Task 对象。

不会自动 await 等待任务完成。这提供了极大的灵活性,因为可以将“启动”和“等待”分离。

8. 【示例 4】使用 create_task (灵活控制)

create_task 的真正威力在于“边运行边干活”。

import asyncio
import time

# (say_hello 函数和上面一样,这里省略)
async def say_hello(delay, name):
    print(f"[{time.strftime('%X')}] {name}: 开始... (等待 {delay} 秒)")
    await asyncio.sleep(delay) 
    print(f"[{time.strftime('%X')}] {name}: ...结束!")
    return f"{name} 的任务完成"

async def main_flexible():
    print(f"[{time.strftime('%X')}] Main: 正在创建任务...")
    # 1. 立即创建任务,任务会“在后台”开始执行
    task_a = asyncio.create_task(say_hello(2, "任务A"))
    task_b = asyncio.create_task(say_hello(3, "任务B"))
    
    # --- 灵活性的体现 ---
    print(f"[{time.strftime('%X')}] Main: 任务已在后台运行... 我先干点别的活 (等待1秒)")
    await asyncio.sleep(1) # 模拟 main 函数自己也在做一些I/O工作
    print(f"[{time.strftime('%X')}] Main: 我自己的活干完了。")
    # ----------------------

    # T=0s: A(2s) 和 B(3s) 开始
    # T=1s: Main 自己的活干完了。
    
    # 2. 现在,我们必须分别 await 它们来“收集”结果
    print(f"[{time.strftime('%X')}] Main: 开始等待 任务A...")
    result_a = await task_a 
    # T=2s: A 完成。 (Main 从 T=1s 开始等,只需再等 1s)

    print(f"[{time.strftime('%X')}] Main: 任务A 完成。开始等待 任务B...")
    result_b = await task_b
    # T=3s: B 完成。 (Main 从 T=2s 开始等,只需再等 1s)
    
    print(f"[{time.strftime('%X')}] Main: 任务B 完成。")
    print(f"[{time.strftime('%X')}] Main: 收到结果: {[result_a, result_b]}")

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main_flexible())
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f} 秒") # 总耗时依然是 3.xx 秒

9. 总结:gather vs create_task

  1. asyncio.gather()

    • 用途:最常用的方式。用于“一次性启动一组任务,并等待它们全部完成”。

    • 优点:简洁,意图明确。自动收集所有结果并按顺序返回。

    • 用法results = await asyncio.gather(coro1, coro2, ...)

  2. asyncio.create_task()

    • 用途:需要更精细控制任务时使用。例如“启动一个后台任务”、“在等待A的同时做B”、“给某个任务设置超时”。

    • 优点:非常灵活。将“启动”和“等待”解耦。

    • 用法

      task1 = asyncio.create_task(coro1) # 立即安排执行
      task2 = asyncio.create_task(coro2) # 立即安排执行
      
      # ... 在这里可以做其他工作 ...
      
      result1 = await task1 # 稍后单独等待
      result2 = await task2 # 再单独等待
      

对于绝大多数“并发运行N个请求”的场景,asyncio.gather() 是更简单、更推荐的选择。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐