1. 说在前面:为什么异步编程是高并发时代的必修课?

兄弟们,在上一讲里我们聊了性能优化。但有一种性能问题,光靠算法优化是解决不了的——那就是等待

想象一下这个场景:

  • 你的爬虫要下载 1000 张图片,每张图片需要 1 秒。
  • 同步下载:1000 秒 = 16 分钟。
  • 异步下载:可能只需要 10 秒。

为什么差距这么大?因为下载图片的大部分时间都在"等网络响应",CPU 是闲着的。异步编程就是让 CPU 在等待的时候去干别的事,把"串行等待"变成"并行等待"。

这一讲,咱们深入剖析 asyncio 的底层原理,让你真正理解异步编程的精髓。


2. 同步 vs 异步:用生活案例理解

2.1 同步模式:排队打饭

import time

def cook_rice():
    print("开始煮饭...")
    time.sleep(3)
    print("饭煮好了!")

def cook_dish():
    print("开始炒菜...")
    time.sleep(5)
    print("菜炒好了!")

def main_sync():
    start = time.time()
    cook_rice()
    cook_dish()
    print(f"总耗时:{time.time() - start:.1f}秒")

main_sync()

输出:

开始煮饭...
饭煮好了!
开始炒菜...
菜炒好了!
总耗时:8.0秒

问题:煮饭的时候,你就傻站着等,完全可以在等饭熟的时候去炒菜啊!

2.2 异步模式:统筹安排

import asyncio
import time

async def cook_rice():
    print("开始煮饭...")
    await asyncio.sleep(3)
    print("饭煮好了!")

async def cook_dish():
    print("开始炒菜...")
    await asyncio.sleep(5)
    print("菜炒好了!")

async def main_async():
    start = time.time()
    await asyncio.gather(cook_rice(), cook_dish())
    print(f"总耗时:{time.time() - start:.1f}秒")

asyncio.run(main_async())

输出:

开始煮饭...
开始炒菜...
饭煮好了!
菜炒好了!
总耗时:5.0秒

效果:总耗时从 8 秒变成 5 秒!因为在等饭熟的时候,我们同时开始炒菜了。

一句话总结:同步是"做完一件事再做下一件",异步是"启动一件事,不等它完成就去做下一件"。


3. asyncio 事件循环:异步编程的心脏

3.1 什么是事件循环?

事件循环(Event Loop)是 asyncio 的核心。你可以把它想象成一个任务调度员

  1. 它维护着一个任务队列。
  2. 每次从队列里取出一个任务执行。
  3. 如果任务遇到 await,就暂停它,把控制权交还给事件循环。
  4. 事件循环继续执行下一个任务。
  5. 当被暂停的任务等待完成(比如网络请求返回了),事件循环会恢复它的执行。

打个比方:事件循环就像一个餐厅的服务员,同时服务多桌客人。他不会傻站在一桌等客人点菜,而是轮流服务每桌,谁准备好了就服务谁。

3.2 手动操控事件循环

import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def main():
    loop = asyncio.get_event_loop()
    print(f"事件循环:{loop}")
  
    task = loop.create_task(say_hello())
    print(f"任务创建:{task}")
  
    await task
    print(f"任务完成:{task}")

asyncio.run(main())

输出:

事件循环:<ProactorEventLoop running=True closed=False debug=False>
任务创建:<Task pending name='Task-2' coro=<say_hello() running at test.py:4>>
Hello
World
任务完成:<Task finished name='Task-2' coro=<say_hello() done, defined at test.py:4> result=None>

3.3 Task vs Coroutine:别搞混了

协程(Coroutine):用 async def 定义的函数,调用它返回的是一个协程对象。

async def my_coro():
    await asyncio.sleep(1)
    return "完成"

coro = my_coro()
print(type(coro))

任务(Task):协程的"包装器",可以被事件循环调度。

task = asyncio.create_task(my_coro())
print(type(task))

关键区别

  • 协程:只是定义了一个异步操作,不会自动执行。
  • 任务:把协程包装起来,扔进事件循环,让它开始执行。

4. async/await 底层原理

4.1 协程的本质:生成器的进化

Python 的协程是基于生成器实现的。在 Python 3.5 之前,协程是用 yield from 实现的:

def old_coro():
    yield from asyncio.sleep(1)
    return "完成"

Python 3.5 引入了 async/await 语法糖,让代码更清晰:

async def new_coro():
    await asyncio.sleep(1)
    return "完成"

底层机制

  • async def 定义的函数,调用时返回一个协程对象。
  • 协程对象实现了 __await__ 方法。
  • await 实际上是在调用 __await__ 方法,并使用迭代器协议获取结果。

4.2 可等待对象(Awaitable)

在 Python 中,可以被 await 的对象叫做"可等待对象"。有三种:

  1. 协程(Coroutine)async def 定义的函数。
  2. 任务(Task)asyncio.create_task() 创建的对象。
  3. Future:代表一个未来的结果,是 Task 的父类。
import asyncio

async def demo_awaitable():
    coro = asyncio.sleep(1)
    print(f"sleep 返回的是协程:{type(coro)}")
  
    task = asyncio.create_task(asyncio.sleep(1))
    print(f"create_task 返回的是任务:{type(task)}")
  
    future = asyncio.Future()
    future.set_result("我是 Future")
    print(f"Future 的结果:{await future}")

asyncio.run(demo_awaitable())

4.3 自己实现一个可等待对象

import asyncio

class MyAwaitable:
    def __init__(self, value):
        self.value = value
  
    def __await__(self):
        async def inner():
            await asyncio.sleep(0.1)
            return self.value
      
        return inner().__await__()

async def main():
    result = await MyAwaitable("自定义可等待对象")
    print(result)

asyncio.run(main())

5. 异步数据库操作

在实际项目中,数据库操作往往是性能瓶颈。使用异步数据库驱动可以大幅提升并发性能。

5.1 异步 MySQL:aiomysql

首先安装:

pip install aiomysql

基本用法:

import asyncio
import aiomysql

async def query_user(user_id: int):
    conn = await aiomysql.connect(
        host='localhost',
        port=3306,
        user='root',
        password='password',
        db='test_db',
        charset='utf8mb4'
    )
  
    async with conn.cursor() as cur:
        await cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        result = await cur.fetchone()
        print(f"查询结果:{result}")
  
    conn.close()

asyncio.run(query_user(1))

5.2 连接池:复用连接,提升性能

每次建立数据库连接都有开销,使用连接池可以复用连接:

import asyncio
import aiomysql

async def create_pool():
    pool = await aiomysql.create_pool(
        host='localhost',
        port=3306,
        user='root',
        password='password',
        db='test_db',
        minsize=5,
        maxsize=20
    )
    return pool

async def query_with_pool(pool, user_id: int):
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return await cur.fetchone()

async def batch_query():
    pool = await create_pool()
  
    tasks = [query_with_pool(pool, i) for i in range(1, 101)]
    results = await asyncio.gather(*tasks)
  
    print(f"查询了 {len(results)} 条记录")
    pool.close()
    await pool.wait_closed()

asyncio.run(batch_query())

5.3 异步 PostgreSQL:asyncpg

PostgreSQL 用户推荐使用 asyncpg,性能更优:

pip install asyncpg
import asyncio
import asyncpg

async def query_postgres():
    conn = await asyncpg.connect(
        host='localhost',
        port=5432,
        user='postgres',
        password='password',
        database='test_db'
    )
  
    rows = await conn.fetch('SELECT * FROM users WHERE id = $1', 1)
    for row in rows:
        print(row)
  
    await conn.close()

asyncio.run(query_postgres())

6. aiohttp:高并发爬虫实战

aiohttp 是 Python 最流行的异步 HTTP 客户端,非常适合写高并发爬虫。

6.1 基本用法

pip install aiohttp
import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch_url(session, 'https://httpbin.org/get')
        print(html[:200])

asyncio.run(main())

6.2 并发爬取多个 URL

import asyncio
import aiohttp
import time

async def fetch_with_retry(session, url, max_retries=3):
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                if resp.status == 200:
                    return await resp.text()
                else:
                    print(f"状态码错误:{resp.status}")
        except Exception as e:
            print(f"第 {attempt + 1} 次尝试失败:{e}")
            await asyncio.sleep(1)
    return None

async def crawl_batch(urls):
    connector = aiohttp.TCPConnector(limit=10)
  
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
      
        success = sum(1 for r in results if r and not isinstance(r, Exception))
        print(f"成功:{success}/{len(urls)}")
      
        return results

async def main():
    urls = [f"https://httpbin.org/delay/{i % 3}" for i in range(20)]
  
    start = time.time()
    results = await crawl_batch(urls)
    print(f"总耗时:{time.time() - start:.2f}秒")

asyncio.run(main())

6.3 限制并发数:避免被封 IP

爬虫最怕的就是并发太高被封 IP。我们可以使用 asyncio.Semaphore 来限制并发数:

import asyncio
import aiohttp

class AsyncCrawler:
    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
  
    async def fetch(self, url):
        async with self.semaphore:
            async with self.session.get(url) as resp:
                return await resp.text()
  
    async def crawl(self, urls):
        self.session = aiohttp.ClientSession()
        try:
            tasks = [self.fetch(url) for url in urls]
            return await asyncio.gather(*tasks)
        finally:
            await self.session.close()

async def main():
    crawler = AsyncCrawler(max_concurrent=3)
    urls = [f"https://httpbin.org/get?id={i}" for i in range(10)]
  
    results = await crawler.crawl(urls)
    print(f"爬取了 {len(results)} 个页面")

asyncio.run(main())

7. 异步上下文管理器与异步迭代器

7.1 异步上下文管理器

普通的上下文管理器使用 __enter____exit__,异步版本使用 __aenter____aexit__

import asyncio

class AsyncTimer:
    def __init__(self, name):
        self.name = name
  
    async def __aenter__(self):
        self.start = asyncio.get_event_loop().time()
        print(f"[{self.name}] 开始计时...")
        return self
  
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        elapsed = asyncio.get_event_loop().time() - self.start
        print(f"[{self.name}] 耗时:{elapsed:.3f}秒")
        return False

async def main():
    async with AsyncTimer("测试任务"):
        await asyncio.sleep(1)
        print("任务执行中...")

asyncio.run(main())

输出:

[测试任务] 开始计时...
任务执行中...
[测试任务] 耗时:1.003秒

7.2 异步迭代器

异步迭代器允许你在 async for 循环中异步获取数据:

import asyncio

class AsyncRange:
    def __init__(self, count):
        self.count = count
  
    def __aiter__(self):
        self.current = 0
        return self
  
    async def __anext__(self):
        if self.current >= self.count:
            raise StopAsyncIteration
      
        await asyncio.sleep(0.1)
        value = self.current
        self.current += 1
        return value

async def main():
    async for num in AsyncRange(5):
        print(f"获取到:{num}")

asyncio.run(main())

7.3 异步生成器

异步生成器使用 async def 定义,用 yield 返回值:

import asyncio

async def async_countdown(n):
    while n > 0:
        await asyncio.sleep(0.5)
        yield n
        n -= 1

async def main():
    async for num in async_countdown(5):
        print(f"倒计时:{num}")
    print("发射!")

asyncio.run(main())

8. 异步编程常见陷阱

8.1 陷阱一:在异步函数中使用同步阻塞操作

错误示范

import asyncio
import time

async def bad_example():
    print("开始...")
    time.sleep(3)
    print("结束...")

async def main():
    await asyncio.gather(bad_example(), bad_example())

asyncio.run(main())

问题time.sleep(3) 会阻塞整个事件循环,两个任务实际上是串行执行的。

正确做法

async def good_example():
    print("开始...")
    await asyncio.sleep(3)
    print("结束...")

8.2 陷阱二:忘记 await

错误示范

async def fetch_data():
    await asyncio.sleep(1)
    return "数据"

async def main():
    result = fetch_data()
    print(result)

输出:

<coroutine object fetch_data at 0x...>

问题:忘记 await,得到的是协程对象,而不是结果。

正确做法

async def main():
    result = await fetch_data()
    print(result)

8.3 陷阱三:异常处理不当

import asyncio

async def may_fail():
    raise ValueError("出错了!")

async def main():
    try:
        await may_fail()
    except ValueError as e:
        print(f"捕获到异常:{e}")

asyncio.run(main())

注意:如果使用 asyncio.gather(),需要设置 return_exceptions=True 来避免一个任务失败导致所有任务失败:

async def main():
    results = await asyncio.gather(
        may_fail(),
        asyncio.sleep(1),
        return_exceptions=True
    )
  
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败:{result}")
        else:
            print(f"任务 {i} 成功")

9. 综合实战:异步 API 服务

咱们来写一个完整的异步 API 服务,整合数据库、缓存和外部 API 调用。

import asyncio
import aiohttp
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
import time

app = FastAPI()

class UserResponse(BaseModel):
    user_id: int
    name: str
    email: str
    orders_count: int
    external_data: Optional[dict] = None

mock_db = {
    1: {"name": "张三", "email": "zhangsan@example.com"},
    2: {"name": "李四", "email": "lisi@example.com"},
}

mock_orders = {
    1: [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}],
    2: [{"id": 3, "amount": 300}],
}

async def get_user_from_db(user_id: int):
    await asyncio.sleep(0.1)
    return mock_db.get(user_id)

async def get_orders_count(user_id: int):
    await asyncio.sleep(0.1)
    orders = mock_orders.get(user_id, [])
    return len(orders)

async def get_external_data(user_id: int):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(
                f"https://httpbin.org/get?user_id={user_id}",
                timeout=aiohttp.ClientTimeout(total=2)
            ) as resp:
                if resp.status == 200:
                    return await resp.json()
        except Exception as e:
            print(f"外部 API 调用失败:{e}")
    return None

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    start = time.time()
  
    user_info, orders_count, external_data = await asyncio.gather(
        get_user_from_db(user_id),
        get_orders_count(user_id),
        get_external_data(user_id)
    )
  
    if not user_info:
        from fastapi import HTTPException
        raise HTTPException(status_code=404, detail="用户不存在")
  
    elapsed = time.time() - start
    print(f"请求耗时:{elapsed:.3f}秒")
  
    return UserResponse(
        user_id=user_id,
        name=user_info["name"],
        email=user_info["email"],
        orders_count=orders_count,
        external_data=external_data
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

亮点

  • 三个数据源(数据库、订单、外部 API)并发获取,总耗时约等于最慢的那个。
  • 使用 asyncio.gather() 实现真正的并行等待。
  • 异常处理完善,外部 API 失败不影响主流程。

10. 💡 避坑小贴士(老司机的私房话)

  1. 别在异步代码里用 time.sleep():它会阻塞整个事件循环,用 asyncio.sleep() 代替。
  2. CPU 密集型任务不适合 asyncio:asyncio 解决的是 I/O 等待问题。如果你的任务是计算密集型,应该用多进程(multiprocessing)。
  3. 善用 asyncio.gather()asyncio.create_task():前者是等待多个任务完成,后者是启动一个后台任务。
  4. 注意连接池大小:数据库和 HTTP 连接池不是越大越好,要根据实际情况调整。
  5. 调试异步代码:使用 asyncio.run(main(), debug=True) 可以开启调试模式,帮助定位问题。

11. 实战演练:巩固你的内功

题目 1:异步下载器

需求
编写一个异步下载器,下载 10 张图片(用 https://httpbin.org/image 模拟)。
限制并发数为 3。
显示每个下载任务的开始和结束时间。

点击查看参考答案
import asyncio
import aiohttp
import time

async def download_image(session, semaphore, index):
    async with semaphore:
        print(f"[{time.strftime('%H:%M:%S')}] 开始下载图片 {index}")
        async with session.get("https://httpbin.org/image") as resp:
            content = await resp.read()
            print(f"[{time.strftime('%H:%M:%S')}] 图片 {index} 下载完成,大小:{len(content)} 字节")
            return content

async def main():
    semaphore = asyncio.Semaphore(3)
  
    async with aiohttp.ClientSession() as session:
        tasks = [download_image(session, semaphore, i) for i in range(10)]
        results = await asyncio.gather(*tasks)
        print(f"总共下载了 {len(results)} 张图片")

asyncio.run(main())

题目 2:异步超时处理

需求
编写一个异步函数,模拟调用一个可能超时的 API。
设置 2 秒超时,如果超时则返回默认值。
使用 asyncio.wait_for() 实现。

点击查看参考答案
import asyncio

async def slow_api():
    await asyncio.sleep(5)
    return "API 响应"

async def call_api_with_timeout():
    try:
        result = await asyncio.wait_for(slow_api(), timeout=2)
        return result
    except asyncio.TimeoutError:
        return "默认值(超时)"

async def main():
    result = await call_api_with_timeout()
    print(f"结果:{result}")

asyncio.run(main())

题目 3:异步生产者-消费者模式

需求
实现一个异步的生产者-消费者模型。
生产者向队列中放入 10 个任务。
两个消费者从队列中取出任务并处理。
使用 asyncio.Queue 实现。

点击查看参考答案
import asyncio
import random

async def producer(queue, producer_id):
    for i in range(5):
        item = f"生产者{producer_id}-任务{i}"
        await queue.put(item)
        print(f"生产:{item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, consumer_id):
    while True:
        item = await queue.get()
        print(f"  消费者{consumer_id} 处理:{item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
  
    producers = [producer(queue, i) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(2)]
  
    await asyncio.gather(*producers)
    await queue.join()
  
    for c in consumers:
        c.cancel()
  
    print("所有任务完成")

asyncio.run(main())

12. 系列索引


写在最后
异步编程是 Python 高并发开发的核心技能。掌握了 asyncio,你就能写出处理成千上万并发请求的高性能服务。
记住:异步不是万能的,它解决的是 I/O 等待问题。选对场景,才能发挥它的威力。
如果觉得有收获,点赞、收藏、关注! 咱们下一讲聊聊分布式系统!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐