Python协程完全指南:掌握async/await异步编程
本文全面介绍了Python协程的最新语法和应用,从基础的async/await语法到高级的并发模式,涵盖了事件循环、任务管理、异步上下文管理器等核心概念。通过丰富的实战示例,展示了协程在Web爬虫、数据处理、实时通信等场景的应用。文章还提供了性能优化和调试技巧,帮助开发者编写高效、健壮的异步代码。
·
一、协程基础与核心概念
1.1 什么是协程
协程(Coroutine)是Python中实现并发编程的轻量级方案,它允许在单个线程内通过协作式多任务处理来执行多个任务。与多线程和多进程不同,协程的切换由程序自身控制,避免了上下文切换的开销,特别适合I/O密集型应用。
Python 3.5+引入了原生的async/await
语法,使协程编程变得更加直观和易于使用。协程本质上是一种特殊的函数,可以在执行过程中暂停和恢复,同时保持局部状态。
1.2 协程与线程的对比
二、async/await语法详解
2.1 定义异步函数
使用async def
定义异步函数,这类函数返回协程对象而不是直接执行:
import asyncio
async def simple_coroutine():
"""简单的协程示例"""
print("开始执行协程")
await asyncio.sleep(1) # 模拟异步操作
print("协程执行完毕")
return "完成"
# 调用协程
result = asyncio.run(simple_coroutine())
print(f"结果: {result}")
2.2 await表达式
await
关键字用于挂起当前协程的执行,等待异步操作完成:
async def complex_example():
print("步骤1")
await asyncio.sleep(0.5) # 等待0.5秒
print("步骤2")
result = await some_async_operation() # 等待其他异步操作
print("步骤3")
return result
2.3 异步函数调用链
协程可以调用其他协程,形成调用链:
async def inner_coroutine(x):
await asyncio.sleep(0.1)
return x * 2
async def middle_coroutine(y):
result = await inner_coroutine(y)
return result + 10
async def outer_coroutine(z):
final_result = await middle_coroutine(z)
return final_result * 3
# 执行调用链
result = asyncio.run(outer_coroutine(5))
print(f"最终结果: {result}") # 输出: (5 * 2+10)*3 = 60
三、事件循环(Event Loop)详解
3.1 事件循环的概念
事件循环是asyncio
的核心,负责调度和执行协程任务:
import asyncio
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
print(f"事件循环: {loop}")
# 创建任务
task1 = asyncio.create_task(async_task(1))
task2 = asyncio.create_task(async_task(2))
# 等待任务完成
await task1
await task2
async def async_task(id):
print(f"任务 {id} 开始")
await asyncio.sleep(1)
print(f"任务 {id} 结束")
# 启动事件循环
asyncio.run(main())
3.2 手动管理事件循环
虽然通常使用asyncio.run()
,但也可以手动管理事件循环:
async def manual_loop_management():
print("手动事件循环示例")
await asyncio.sleep(0.5)
# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 运行协程
result = loop.run_until_complete(manual_loop_management())
print(f"执行结果: {result}")
finally:
# 关闭事件循环
loop.close()
四、并发执行与任务管理
4.1 使用asyncio.gather
并发执行
asyncio.gather()
用于并发运行多个协程,并收集所有结果:
import asyncio
import random
async def fetch_data(id, delay):
"""模拟获取数据"""
print(f"任务 {id} 开始,预计耗时: {delay}秒")
await asyncio.sleep(delay)
result = f"数据-{id}"
print(f"任务 {id} 完成")
return result
async def concurrent_execution():
# 创建多个不同耗时的任务
tasks = [
fetch_data(1, random.uniform(0.5, 2.0)),
fetch_data(2, random.uniform(0.5, 2.0)),
fetch_data(3, random.uniform(0.5, 2.0))
]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
print(f"所有任务完成: {results}")
asyncio.run(concurrent_execution())
4.2 使用asyncio.create_task
创建任务
asyncio.create_task()
将协程包装为任务,允许后台执行:
async def task_management():
# 创建后台任务
background_task = asyncio.create_task(long_running_operation())
# 执行其他操作
for i in range(3):
print(f"主程序执行中... {i}")
await asyncio.sleep(0.5)
# 等待后台任务完成
result = await background_task
print(f"后台任务结果: {result}")
async def long_running_operation():
print("后台任务开始")
await asyncio.sleep(2)
print("后台任务结束")
return "后台完成"
asyncio.run(task_management())
4.3 使用asyncio.wait
控制任务执行
asyncio.wait()
提供更灵活的任务控制:
async def advanced_task_control():
# 创建多个任务
tasks = [asyncio.create_task(fetch_data(i, random.uniform(1, 3))) for i in range(5)]
# 等待部分任务完成
done, pending = await asyncio.wait(tasks, timeout=1.5, return_when=asyncio.FIRST_COMPLETED)
print(f"已完成: {len(done)} 个任务")
print(f"仍在进行: {len(pending)} 个任务")
# 取消未完成的任务
for task in pending:
task.cancel()
# 等待取消操作完成
await asyncio.gather(*pending, return_exceptions=True)
asyncio.run(advanced_task_control())
五、异步上下文管理器与迭代器
5.1 异步上下文管理器
使用async with
管理异步资源:
import aiohttp
class AsyncWebClient:
"""异步Web客户端"""
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.session.close()
async def fetch(self, url):
async with self.session.get(url) as response:
return await response.text()
async def use_async_context():
async with AsyncWebClient() as client:
content = await client.fetch("https://httpbin.org/get")
print(f"获取内容长度: {len(content)}")
asyncio.run(use_async_context())
5.2 异步迭代器
实现和使用异步迭代器:
class AsyncDataStream:
"""模拟异步数据流"""
def __init__(self, limit):
self.limit = limit
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.limit:
raise StopAsyncIteration
# 模拟异步数据获取
await asyncio.sleep(0.1)
self.current += 1
return f"数据项-{self.current}"
async def process_data_stream():
async for data in AsyncDataStream(5):
print(f"处理: {data}")
asyncio.run(process_data_stream())
六、高级模式与技巧
6.1 使用信号量控制并发数
async def controlled_concurrency():
# 限制同时只有2个任务执行
semaphore = asyncio.Semaphore(2)
async def limited_task(id):
async with semaphore:
print(f"任务 {id} 开始执行")
await asyncio.sleep(1)
print(f"任务 {id} 完成")
# 创建多个任务
tasks = [limited_task(i) for i in range(5)]
await asyncio.gather(*tasks)
asyncio.run(controlled_concurrency())
6.2 超时与错误处理
async def robust_operations():
# 设置超时
try:
result = await asyncio.wait_for(slow_operation(), timeout=1.0)
print(f"成功: {result}")
except asyncio.TimeoutError:
print("操作超时")
# 使用shield防止取消
try:
result = await asyncio.shield(important_operation())
print(f"重要操作完成: {result}")
except asyncio.CancelledError:
print("重要操作被取消")
async def slow_operation():
await asyncio.sleep(2) # 超过超时时间
return "慢操作结果"
async def important_operation():
await asyncio.sleep(0.5)
return "重要操作结果"
asyncio.run(robust_operations())
6.3 协程与线程池的混合使用
import concurrent.futures
import time
def blocking_io_operation(duration):
"""模拟阻塞I/O操作"""
time.sleep(duration)
return f"阻塞操作完成,耗时 {duration} 秒"
async def hybrid_approach():
loop = asyncio.get_running_loop()
# 在线程池中执行阻塞操作
with concurrent.futures.ThreadPoolExecutor() as pool:
# 执行单个阻塞操作
result = await loop.run_in_executor(pool, blocking_io_operation, 1)
print(result)
# 并发执行多个阻塞操作
tasks = [
loop.run_in_executor(pool, blocking_io_operation, i)
for i in [1, 2, 3]
]
results = await asyncio.gather(*tasks)
print(f"所有阻塞操作完成: {results}")
asyncio.run(hybrid_approach())
七、实战应用示例
7.1 异步Web爬虫
import aiohttp
import asyncio
from urllib.parse import urljoin
async def async_crawler(base_url, max_concurrent=5):
"""异步网页爬虫"""
semaphore = asyncio.Semaphore(max_concurrent)
visited = set()
async def crawl(url):
if url in visited:
return []
async with semaphore:
visited.add(url)
print(f"爬取: {url}")
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
if response.status != 200:
return []
content = await response.text()
# 这里可以解析内容并提取新链接
print(f"完成: {url}, 长度: {len(content)}")
return [] # 返回新发现的链接
except Exception as e:
print(f"错误 {url}: {e}")
return []
# 开始爬取
tasks = [crawl(base_url)]
await asyncio.gather(*tasks)
asyncio.run(async_crawler("https://httpbin.org/html"))
7.2 实时数据处理管道
import asyncio
import random
async def data_producer(queue):
"""数据生产者"""
for i in range(10):
data = f"数据-{i}"
await queue.put(data)
print(f"生产: {data}")
await asyncio.sleep(random.uniform(0.1, 0.5))
await queue.put(None) # 结束信号
async def data_consumer(queue, consumer_id):
"""数据消费者"""
while True:
data = await queue.get()
if data is None:
# 将结束信号放回,让其他消费者也能看到
await queue.put(None)
break
print(f"消费者 {consumer_id} 处理: {data}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def data_processing_pipeline():
"""数据处理管道"""
queue = asyncio.Queue(maxsize=5)
# 启动生产者和消费者
producer_task = asyncio.create_task(data_producer(queue))
consumer_tasks = [
asyncio.create_task(data_consumer(queue, i))
for i in range(3) # 3个消费者
]
# 等待生产者完成
await producer_task
# 等待所有数据被处理
await queue.join()
# 取消消费者任务
for task in consumer_tasks:
task.cancel()
# 等待取消完成
await asyncio.gather(*consumer_tasks, return_exceptions=True)
asyncio.run(data_processing_pipeline())
7.3 WebSocket实时通信
import asyncio
import websockets # 需要安装: pip install websockets
async def websocket_server(websocket, path):
"""WebSocket服务器"""
print("客户端连接")
try:
# 发送欢迎消息
await websocket.send("欢迎连接到服务器!")
# 处理客户端消息
async for message in websocket:
print(f"收到消息: {message}")
response = f"回声: {message}"
await websocket.send(response)
except websockets.exceptions.ConnectionClosed:
print("客户端断开连接")
async def websocket_client():
"""WebSocket客户端"""
uri = "ws://localhost:8765"
try:
async with websockets.connect(uri) as websocket:
# 接收欢迎消息
greeting = await websocket.recv()
print(f"服务器说: {greeting}")
# 发送几条消息
for i in range(3):
message = f"消息 {i}"
await websocket.send(message)
print(f"发送: {message}")
response = await websocket.recv()
print(f"收到: {response}")
await asyncio.sleep(1)
except Exception as e:
print(f"连接错误: {e}")
async def run_websocket_example():
"""运行WebSocket示例"""
# 启动服务器
server = await websockets.serve(websocket_server, "localhost", 8765)
print("WebSocket服务器已启动")
# 运行客户端
await websocket_client()
# 关闭服务器
server.close()
await server.wait_closed()
asyncio.run(run_websocket_example())
八、性能优化与调试
8.1 使用UVLoop
提升性能
UVLoop
是asyncio
的事件循环替代实现,性能更高:
import asyncio
import uvloop
async def performance_test():
"""性能测试"""
start = asyncio.get_running_loop().time()
# 执行大量异步操作
await asyncio.gather(*[asyncio.sleep(0.001) for _ in range(1000)])
end = asyncio.get_running_loop().time()
print(f"执行时间: {end - start:.3f} 秒")
# 使用UVLoop
uvloop.install()
asyncio.run(performance_test())
8.2 异步代码调试
import asyncio
import logging
# 设置日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
async def debuggable_coroutine():
"""可调试的协程"""
logger.debug("协程开始")
try:
result = await potentially_failing_operation()
logger.debug(f"操作成功: {result}")
return result
except Exception as e:
logger.error(f"操作失败: {e}")
raise
async def potentially_failing_operation():
"""可能失败的操作"""
await asyncio.sleep(0.1)
if random.random() < 0.3: # 30%概率失败
raise ValueError("随机错误")
return "成功"
# 启用调试模式
asyncio.run(debuggable_coroutine(), debug=True)
九、总结与最佳实践
9.1 协程使用原则
- 优先使用async/await:避免混用不同风格的协程
- 合理控制并发数:使用信号量限制资源使用
- 正确处理错误:为所有协程添加适当的错误处理
- 避免阻塞操作:将阻塞操作转移到线程池执行
- 使用类型注解:提高代码可读性和可维护性
9.2 常见陷阱与解决方案
陷阱 | 解决方案 |
---|---|
忘记使用await | 使用静态类型检查器如mypy |
在同步代码中调用协程 | 使用asyncio.run() 或事件循环 |
未处理协程异常 | 使用try/except包裹await表达式 |
资源未正确释放 | 使用异步上下文管理器 |
9.3 性能优化建议
- 使用
UVLoop
:替代默认事件循环提升性能 - 合理设置并发限制:避免过度并发导致资源竞争
- 使用连接池:复用HTTP/数据库连接
- 批量处理操作:减少频繁的小操作
更多推荐
所有评论(0)