【Python 进阶之路】第2讲 | 异步编程深度解析:掌握 asyncio 的核心原理
摘要:Python异步编程与asyncio原理剖析 本文深入讲解了Python异步编程的核心概念和asyncio库的底层原理。主要内容包括: 同步与异步编程对比:通过"煮饭炒菜"的生活案例,展示了异步编程如何通过任务并行化大幅提升效率(从8秒降到5秒) asyncio事件循环机制:解释了事件循环作为"任务调度员"的工作原理,以及协程(Coroutine)与任
1. 说在前面:为什么异步编程是高并发时代的必修课?
兄弟们,在上一讲里我们聊了性能优化。但有一种性能问题,光靠算法优化是解决不了的——那就是等待。
想象一下这个场景:
- 你的爬虫要下载 1000 张图片,每张图片需要 1 秒。
- 同步下载:1000 秒 = 16 分钟。
- 异步下载:可能只需要 10 秒。
为什么差距这么大?因为下载图片的大部分时间都在"等网络响应",CPU 是闲着的。异步编程就是让 CPU 在等待的时候去干别的事,把"串行等待"变成"并行等待"。
这一讲,咱们深入剖析 asyncio 的底层原理,让你真正理解异步编程的精髓。
2. 同步 vs 异步:用生活案例理解
2.1 同步模式:排队打饭
import time
def cook_rice():
print("开始煮饭...")
time.sleep(3)
print("饭煮好了!")
def cook_dish():
print("开始炒菜...")
time.sleep(5)
print("菜炒好了!")
def main_sync():
start = time.time()
cook_rice()
cook_dish()
print(f"总耗时:{time.time() - start:.1f}秒")
main_sync()
输出:
开始煮饭...
饭煮好了!
开始炒菜...
菜炒好了!
总耗时:8.0秒
问题:煮饭的时候,你就傻站着等,完全可以在等饭熟的时候去炒菜啊!
2.2 异步模式:统筹安排
import asyncio
import time
async def cook_rice():
print("开始煮饭...")
await asyncio.sleep(3)
print("饭煮好了!")
async def cook_dish():
print("开始炒菜...")
await asyncio.sleep(5)
print("菜炒好了!")
async def main_async():
start = time.time()
await asyncio.gather(cook_rice(), cook_dish())
print(f"总耗时:{time.time() - start:.1f}秒")
asyncio.run(main_async())
输出:
开始煮饭...
开始炒菜...
饭煮好了!
菜炒好了!
总耗时:5.0秒
效果:总耗时从 8 秒变成 5 秒!因为在等饭熟的时候,我们同时开始炒菜了。
一句话总结:同步是"做完一件事再做下一件",异步是"启动一件事,不等它完成就去做下一件"。
3. asyncio 事件循环:异步编程的心脏
3.1 什么是事件循环?
事件循环(Event Loop)是 asyncio 的核心。你可以把它想象成一个任务调度员:
- 它维护着一个任务队列。
- 每次从队列里取出一个任务执行。
- 如果任务遇到
await,就暂停它,把控制权交还给事件循环。 - 事件循环继续执行下一个任务。
- 当被暂停的任务等待完成(比如网络请求返回了),事件循环会恢复它的执行。
打个比方:事件循环就像一个餐厅的服务员,同时服务多桌客人。他不会傻站在一桌等客人点菜,而是轮流服务每桌,谁准备好了就服务谁。
3.2 手动操控事件循环
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1)
print("World")
async def main():
loop = asyncio.get_event_loop()
print(f"事件循环:{loop}")
task = loop.create_task(say_hello())
print(f"任务创建:{task}")
await task
print(f"任务完成:{task}")
asyncio.run(main())
输出:
事件循环:<ProactorEventLoop running=True closed=False debug=False>
任务创建:<Task pending name='Task-2' coro=<say_hello() running at test.py:4>>
Hello
World
任务完成:<Task finished name='Task-2' coro=<say_hello() done, defined at test.py:4> result=None>
3.3 Task vs Coroutine:别搞混了
协程(Coroutine):用 async def 定义的函数,调用它返回的是一个协程对象。
async def my_coro():
await asyncio.sleep(1)
return "完成"
coro = my_coro()
print(type(coro))
任务(Task):协程的"包装器",可以被事件循环调度。
task = asyncio.create_task(my_coro())
print(type(task))
关键区别:
- 协程:只是定义了一个异步操作,不会自动执行。
- 任务:把协程包装起来,扔进事件循环,让它开始执行。
4. async/await 底层原理
4.1 协程的本质:生成器的进化
Python 的协程是基于生成器实现的。在 Python 3.5 之前,协程是用 yield from 实现的:
def old_coro():
yield from asyncio.sleep(1)
return "完成"
Python 3.5 引入了 async/await 语法糖,让代码更清晰:
async def new_coro():
await asyncio.sleep(1)
return "完成"
底层机制:
async def定义的函数,调用时返回一个协程对象。- 协程对象实现了
__await__方法。 await实际上是在调用__await__方法,并使用迭代器协议获取结果。
4.2 可等待对象(Awaitable)
在 Python 中,可以被 await 的对象叫做"可等待对象"。有三种:
- 协程(Coroutine):
async def定义的函数。 - 任务(Task):
asyncio.create_task()创建的对象。 - Future:代表一个未来的结果,是 Task 的父类。
import asyncio
async def demo_awaitable():
coro = asyncio.sleep(1)
print(f"sleep 返回的是协程:{type(coro)}")
task = asyncio.create_task(asyncio.sleep(1))
print(f"create_task 返回的是任务:{type(task)}")
future = asyncio.Future()
future.set_result("我是 Future")
print(f"Future 的结果:{await future}")
asyncio.run(demo_awaitable())
4.3 自己实现一个可等待对象
import asyncio
class MyAwaitable:
def __init__(self, value):
self.value = value
def __await__(self):
async def inner():
await asyncio.sleep(0.1)
return self.value
return inner().__await__()
async def main():
result = await MyAwaitable("自定义可等待对象")
print(result)
asyncio.run(main())
5. 异步数据库操作
在实际项目中,数据库操作往往是性能瓶颈。使用异步数据库驱动可以大幅提升并发性能。
5.1 异步 MySQL:aiomysql
首先安装:
pip install aiomysql
基本用法:
import asyncio
import aiomysql
async def query_user(user_id: int):
conn = await aiomysql.connect(
host='localhost',
port=3306,
user='root',
password='password',
db='test_db',
charset='utf8mb4'
)
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
result = await cur.fetchone()
print(f"查询结果:{result}")
conn.close()
asyncio.run(query_user(1))
5.2 连接池:复用连接,提升性能
每次建立数据库连接都有开销,使用连接池可以复用连接:
import asyncio
import aiomysql
async def create_pool():
pool = await aiomysql.create_pool(
host='localhost',
port=3306,
user='root',
password='password',
db='test_db',
minsize=5,
maxsize=20
)
return pool
async def query_with_pool(pool, user_id: int):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
return await cur.fetchone()
async def batch_query():
pool = await create_pool()
tasks = [query_with_pool(pool, i) for i in range(1, 101)]
results = await asyncio.gather(*tasks)
print(f"查询了 {len(results)} 条记录")
pool.close()
await pool.wait_closed()
asyncio.run(batch_query())
5.3 异步 PostgreSQL:asyncpg
PostgreSQL 用户推荐使用 asyncpg,性能更优:
pip install asyncpg
import asyncio
import asyncpg
async def query_postgres():
conn = await asyncpg.connect(
host='localhost',
port=5432,
user='postgres',
password='password',
database='test_db'
)
rows = await conn.fetch('SELECT * FROM users WHERE id = $1', 1)
for row in rows:
print(row)
await conn.close()
asyncio.run(query_postgres())
6. aiohttp:高并发爬虫实战
aiohttp 是 Python 最流行的异步 HTTP 客户端,非常适合写高并发爬虫。
6.1 基本用法
pip install aiohttp
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch_url(session, 'https://httpbin.org/get')
print(html[:200])
asyncio.run(main())
6.2 并发爬取多个 URL
import asyncio
import aiohttp
import time
async def fetch_with_retry(session, url, max_retries=3):
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
return await resp.text()
else:
print(f"状态码错误:{resp.status}")
except Exception as e:
print(f"第 {attempt + 1} 次尝试失败:{e}")
await asyncio.sleep(1)
return None
async def crawl_batch(urls):
connector = aiohttp.TCPConnector(limit=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_with_retry(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
success = sum(1 for r in results if r and not isinstance(r, Exception))
print(f"成功:{success}/{len(urls)}")
return results
async def main():
urls = [f"https://httpbin.org/delay/{i % 3}" for i in range(20)]
start = time.time()
results = await crawl_batch(urls)
print(f"总耗时:{time.time() - start:.2f}秒")
asyncio.run(main())
6.3 限制并发数:避免被封 IP
爬虫最怕的就是并发太高被封 IP。我们可以使用 asyncio.Semaphore 来限制并发数:
import asyncio
import aiohttp
class AsyncCrawler:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def fetch(self, url):
async with self.semaphore:
async with self.session.get(url) as resp:
return await resp.text()
async def crawl(self, urls):
self.session = aiohttp.ClientSession()
try:
tasks = [self.fetch(url) for url in urls]
return await asyncio.gather(*tasks)
finally:
await self.session.close()
async def main():
crawler = AsyncCrawler(max_concurrent=3)
urls = [f"https://httpbin.org/get?id={i}" for i in range(10)]
results = await crawler.crawl(urls)
print(f"爬取了 {len(results)} 个页面")
asyncio.run(main())
7. 异步上下文管理器与异步迭代器
7.1 异步上下文管理器
普通的上下文管理器使用 __enter__ 和 __exit__,异步版本使用 __aenter__ 和 __aexit__:
import asyncio
class AsyncTimer:
def __init__(self, name):
self.name = name
async def __aenter__(self):
self.start = asyncio.get_event_loop().time()
print(f"[{self.name}] 开始计时...")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
elapsed = asyncio.get_event_loop().time() - self.start
print(f"[{self.name}] 耗时:{elapsed:.3f}秒")
return False
async def main():
async with AsyncTimer("测试任务"):
await asyncio.sleep(1)
print("任务执行中...")
asyncio.run(main())
输出:
[测试任务] 开始计时...
任务执行中...
[测试任务] 耗时:1.003秒
7.2 异步迭代器
异步迭代器允许你在 async for 循环中异步获取数据:
import asyncio
class AsyncRange:
def __init__(self, count):
self.count = count
def __aiter__(self):
self.current = 0
return self
async def __anext__(self):
if self.current >= self.count:
raise StopAsyncIteration
await asyncio.sleep(0.1)
value = self.current
self.current += 1
return value
async def main():
async for num in AsyncRange(5):
print(f"获取到:{num}")
asyncio.run(main())
7.3 异步生成器
异步生成器使用 async def 定义,用 yield 返回值:
import asyncio
async def async_countdown(n):
while n > 0:
await asyncio.sleep(0.5)
yield n
n -= 1
async def main():
async for num in async_countdown(5):
print(f"倒计时:{num}")
print("发射!")
asyncio.run(main())
8. 异步编程常见陷阱
8.1 陷阱一:在异步函数中使用同步阻塞操作
错误示范:
import asyncio
import time
async def bad_example():
print("开始...")
time.sleep(3)
print("结束...")
async def main():
await asyncio.gather(bad_example(), bad_example())
asyncio.run(main())
问题:time.sleep(3) 会阻塞整个事件循环,两个任务实际上是串行执行的。
正确做法:
async def good_example():
print("开始...")
await asyncio.sleep(3)
print("结束...")
8.2 陷阱二:忘记 await
错误示范:
async def fetch_data():
await asyncio.sleep(1)
return "数据"
async def main():
result = fetch_data()
print(result)
输出:
<coroutine object fetch_data at 0x...>
问题:忘记 await,得到的是协程对象,而不是结果。
正确做法:
async def main():
result = await fetch_data()
print(result)
8.3 陷阱三:异常处理不当
import asyncio
async def may_fail():
raise ValueError("出错了!")
async def main():
try:
await may_fail()
except ValueError as e:
print(f"捕获到异常:{e}")
asyncio.run(main())
注意:如果使用 asyncio.gather(),需要设置 return_exceptions=True 来避免一个任务失败导致所有任务失败:
async def main():
results = await asyncio.gather(
may_fail(),
asyncio.sleep(1),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 失败:{result}")
else:
print(f"任务 {i} 成功")
9. 综合实战:异步 API 服务
咱们来写一个完整的异步 API 服务,整合数据库、缓存和外部 API 调用。
import asyncio
import aiohttp
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
import time
app = FastAPI()
class UserResponse(BaseModel):
user_id: int
name: str
email: str
orders_count: int
external_data: Optional[dict] = None
mock_db = {
1: {"name": "张三", "email": "zhangsan@example.com"},
2: {"name": "李四", "email": "lisi@example.com"},
}
mock_orders = {
1: [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}],
2: [{"id": 3, "amount": 300}],
}
async def get_user_from_db(user_id: int):
await asyncio.sleep(0.1)
return mock_db.get(user_id)
async def get_orders_count(user_id: int):
await asyncio.sleep(0.1)
orders = mock_orders.get(user_id, [])
return len(orders)
async def get_external_data(user_id: int):
async with aiohttp.ClientSession() as session:
try:
async with session.get(
f"https://httpbin.org/get?user_id={user_id}",
timeout=aiohttp.ClientTimeout(total=2)
) as resp:
if resp.status == 200:
return await resp.json()
except Exception as e:
print(f"外部 API 调用失败:{e}")
return None
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
start = time.time()
user_info, orders_count, external_data = await asyncio.gather(
get_user_from_db(user_id),
get_orders_count(user_id),
get_external_data(user_id)
)
if not user_info:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="用户不存在")
elapsed = time.time() - start
print(f"请求耗时:{elapsed:.3f}秒")
return UserResponse(
user_id=user_id,
name=user_info["name"],
email=user_info["email"],
orders_count=orders_count,
external_data=external_data
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
亮点:
- 三个数据源(数据库、订单、外部 API)并发获取,总耗时约等于最慢的那个。
- 使用
asyncio.gather()实现真正的并行等待。 - 异常处理完善,外部 API 失败不影响主流程。
10. 💡 避坑小贴士(老司机的私房话)
- 别在异步代码里用
time.sleep():它会阻塞整个事件循环,用asyncio.sleep()代替。 - CPU 密集型任务不适合 asyncio:asyncio 解决的是 I/O 等待问题。如果你的任务是计算密集型,应该用多进程(
multiprocessing)。 - 善用
asyncio.gather()和asyncio.create_task():前者是等待多个任务完成,后者是启动一个后台任务。 - 注意连接池大小:数据库和 HTTP 连接池不是越大越好,要根据实际情况调整。
- 调试异步代码:使用
asyncio.run(main(), debug=True)可以开启调试模式,帮助定位问题。
11. 实战演练:巩固你的内功
题目 1:异步下载器
需求:
编写一个异步下载器,下载 10 张图片(用 https://httpbin.org/image 模拟)。
限制并发数为 3。
显示每个下载任务的开始和结束时间。
import asyncio
import aiohttp
import time
async def download_image(session, semaphore, index):
async with semaphore:
print(f"[{time.strftime('%H:%M:%S')}] 开始下载图片 {index}")
async with session.get("https://httpbin.org/image") as resp:
content = await resp.read()
print(f"[{time.strftime('%H:%M:%S')}] 图片 {index} 下载完成,大小:{len(content)} 字节")
return content
async def main():
semaphore = asyncio.Semaphore(3)
async with aiohttp.ClientSession() as session:
tasks = [download_image(session, semaphore, i) for i in range(10)]
results = await asyncio.gather(*tasks)
print(f"总共下载了 {len(results)} 张图片")
asyncio.run(main())
题目 2:异步超时处理
需求:
编写一个异步函数,模拟调用一个可能超时的 API。
设置 2 秒超时,如果超时则返回默认值。
使用 asyncio.wait_for() 实现。
import asyncio
async def slow_api():
await asyncio.sleep(5)
return "API 响应"
async def call_api_with_timeout():
try:
result = await asyncio.wait_for(slow_api(), timeout=2)
return result
except asyncio.TimeoutError:
return "默认值(超时)"
async def main():
result = await call_api_with_timeout()
print(f"结果:{result}")
asyncio.run(main())
题目 3:异步生产者-消费者模式
需求:
实现一个异步的生产者-消费者模型。
生产者向队列中放入 10 个任务。
两个消费者从队列中取出任务并处理。
使用 asyncio.Queue 实现。
import asyncio
import random
async def producer(queue, producer_id):
for i in range(5):
item = f"生产者{producer_id}-任务{i}"
await queue.put(item)
print(f"生产:{item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue, consumer_id):
while True:
item = await queue.get()
print(f" 消费者{consumer_id} 处理:{item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
producers = [producer(queue, i) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(2)]
await asyncio.gather(*producers)
await queue.join()
for c in consumers:
c.cancel()
print("所有任务完成")
asyncio.run(main())
12. 系列索引
写在最后:
异步编程是 Python 高并发开发的核心技能。掌握了asyncio,你就能写出处理成千上万并发请求的高性能服务。
记住:异步不是万能的,它解决的是 I/O 等待问题。选对场景,才能发挥它的威力。
如果觉得有收获,点赞、收藏、关注! 咱们下一讲聊聊分布式系统!
更多推荐


所有评论(0)