一、协程基础与核心概念

1.1 什么是协程

协程(Coroutine)是Python中实现并发编程的轻量级方案,它允许在单个线程内通过协作式多任务处理来执行多个任务。与多线程和多进程不同,协程的切换由程序自身控制,避免了上下文切换的开销,特别适合I/O密集型应用。

Python 3.5+引入了原生的async/await语法,使协程编程变得更加直观和易于使用。协程本质上是一种特殊的函数,可以在执行过程中暂停和恢复,同时保持局部状态。

1.2 协程与线程的对比

并发编程模型
多线程
多进程
协程
操作系统调度
上下文切换开销大
适合CPU密集型任务
完全隔离的内存空间
开销最大
真正并行执行
程序自身调度
几乎无上下文切换开销
适合I/O密集型任务
单线程内数万个并发

二、async/await语法详解

2.1 定义异步函数

使用async def定义异步函数,这类函数返回协程对象而不是直接执行:

import asyncio

async def simple_coroutine():
    """简单的协程示例"""
    print("开始执行协程")
    await asyncio.sleep(1)  # 模拟异步操作
    print("协程执行完毕")
    return "完成"

# 调用协程
result = asyncio.run(simple_coroutine())
print(f"结果: {result}")

2.2 await表达式

await关键字用于挂起当前协程的执行,等待异步操作完成:

async def complex_example():
    print("步骤1")
    await asyncio.sleep(0.5)  # 等待0.5秒
    
    print("步骤2")
    result = await some_async_operation()  # 等待其他异步操作
    
    print("步骤3")
    return result

2.3 异步函数调用链

协程可以调用其他协程,形成调用链:

async def inner_coroutine(x):
    await asyncio.sleep(0.1)
    return x * 2

async def middle_coroutine(y):
    result = await inner_coroutine(y)
    return result + 10

async def outer_coroutine(z):
    final_result = await middle_coroutine(z)
    return final_result * 3

# 执行调用链
result = asyncio.run(outer_coroutine(5))
print(f"最终结果: {result}")  # 输出: (5 * 2+10)*3 = 60

三、事件循环(Event Loop)详解

3.1 事件循环的概念

事件循环是asyncio的核心,负责调度和执行协程任务:

import asyncio

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    print(f"事件循环: {loop}")
    
    # 创建任务
    task1 = asyncio.create_task(async_task(1))
    task2 = asyncio.create_task(async_task(2))
    
    # 等待任务完成
    await task1
    await task2

async def async_task(id):
    print(f"任务 {id} 开始")
    await asyncio.sleep(1)
    print(f"任务 {id} 结束")

# 启动事件循环
asyncio.run(main())

3.2 手动管理事件循环

虽然通常使用asyncio.run(),但也可以手动管理事件循环:

async def manual_loop_management():
    print("手动事件循环示例")
    await asyncio.sleep(0.5)

# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    # 运行协程
    result = loop.run_until_complete(manual_loop_management())
    print(f"执行结果: {result}")
finally:
    # 关闭事件循环
    loop.close()

四、并发执行与任务管理

4.1 使用asyncio.gather并发执行

asyncio.gather()用于并发运行多个协程,并收集所有结果:

import asyncio
import random

async def fetch_data(id, delay):
    """模拟获取数据"""
    print(f"任务 {id} 开始,预计耗时: {delay}秒")
    await asyncio.sleep(delay)
    result = f"数据-{id}"
    print(f"任务 {id} 完成")
    return result

async def concurrent_execution():
    # 创建多个不同耗时的任务
    tasks = [
        fetch_data(1, random.uniform(0.5, 2.0)),
        fetch_data(2, random.uniform(0.5, 2.0)),
        fetch_data(3, random.uniform(0.5, 2.0))
    ]
    
    # 并发执行所有任务
    results = await asyncio.gather(*tasks)
    print(f"所有任务完成: {results}")

asyncio.run(concurrent_execution())

4.2 使用asyncio.create_task创建任务

asyncio.create_task()将协程包装为任务,允许后台执行:

async def task_management():
    # 创建后台任务
    background_task = asyncio.create_task(long_running_operation())
    
    # 执行其他操作
    for i in range(3):
        print(f"主程序执行中... {i}")
        await asyncio.sleep(0.5)
    
    # 等待后台任务完成
    result = await background_task
    print(f"后台任务结果: {result}")

async def long_running_operation():
    print("后台任务开始")
    await asyncio.sleep(2)
    print("后台任务结束")
    return "后台完成"

asyncio.run(task_management())

4.3 使用asyncio.wait控制任务执行

asyncio.wait()提供更灵活的任务控制:

async def advanced_task_control():
    # 创建多个任务
    tasks = [asyncio.create_task(fetch_data(i, random.uniform(1, 3))) for i in range(5)]
    
    # 等待部分任务完成
    done, pending = await asyncio.wait(tasks, timeout=1.5, return_when=asyncio.FIRST_COMPLETED)
    
    print(f"已完成: {len(done)} 个任务")
    print(f"仍在进行: {len(pending)} 个任务")
    
    # 取消未完成的任务
    for task in pending:
        task.cancel()
    
    # 等待取消操作完成
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(advanced_task_control())

五、异步上下文管理器与迭代器

5.1 异步上下文管理器

使用async with管理异步资源:

import aiohttp

class AsyncWebClient:
    """异步Web客户端"""
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        await self.session.close()
    
    async def fetch(self, url):
        async with self.session.get(url) as response:
            return await response.text()

async def use_async_context():
    async with AsyncWebClient() as client:
        content = await client.fetch("https://httpbin.org/get")
        print(f"获取内容长度: {len(content)}")

asyncio.run(use_async_context())

5.2 异步迭代器

实现和使用异步迭代器:

class AsyncDataStream:
    """模拟异步数据流"""
    def __init__(self, limit):
        self.limit = limit
        self.current = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.limit:
            raise StopAsyncIteration
        
        # 模拟异步数据获取
        await asyncio.sleep(0.1)
        self.current += 1
        return f"数据项-{self.current}"

async def process_data_stream():
    async for data in AsyncDataStream(5):
        print(f"处理: {data}")

asyncio.run(process_data_stream())

六、高级模式与技巧

6.1 使用信号量控制并发数

async def controlled_concurrency():
    # 限制同时只有2个任务执行
    semaphore = asyncio.Semaphore(2)
    
    async def limited_task(id):
        async with semaphore:
            print(f"任务 {id} 开始执行")
            await asyncio.sleep(1)
            print(f"任务 {id} 完成")
    
    # 创建多个任务
    tasks = [limited_task(i) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(controlled_concurrency())

6.2 超时与错误处理

async def robust_operations():
    # 设置超时
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=1.0)
        print(f"成功: {result}")
    except asyncio.TimeoutError:
        print("操作超时")
    
    # 使用shield防止取消
    try:
        result = await asyncio.shield(important_operation())
        print(f"重要操作完成: {result}")
    except asyncio.CancelledError:
        print("重要操作被取消")

async def slow_operation():
    await asyncio.sleep(2)  # 超过超时时间
    return "慢操作结果"

async def important_operation():
    await asyncio.sleep(0.5)
    return "重要操作结果"

asyncio.run(robust_operations())

6.3 协程与线程池的混合使用

import concurrent.futures
import time

def blocking_io_operation(duration):
    """模拟阻塞I/O操作"""
    time.sleep(duration)
    return f"阻塞操作完成,耗时 {duration} 秒"

async def hybrid_approach():
    loop = asyncio.get_running_loop()
    
    # 在线程池中执行阻塞操作
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # 执行单个阻塞操作
        result = await loop.run_in_executor(pool, blocking_io_operation, 1)
        print(result)
        
        # 并发执行多个阻塞操作
        tasks = [
            loop.run_in_executor(pool, blocking_io_operation, i)
            for i in [1, 2, 3]
        ]
        results = await asyncio.gather(*tasks)
        print(f"所有阻塞操作完成: {results}")

asyncio.run(hybrid_approach())

七、实战应用示例

7.1 异步Web爬虫

import aiohttp
import asyncio
from urllib.parse import urljoin

async def async_crawler(base_url, max_concurrent=5):
    """异步网页爬虫"""
    semaphore = asyncio.Semaphore(max_concurrent)
    visited = set()
    
    async def crawl(url):
        if url in visited:
            return []
        
        async with semaphore:
            visited.add(url)
            print(f"爬取: {url}")
            
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=10) as response:
                        if response.status != 200:
                            return []
                        
                        content = await response.text()
                        # 这里可以解析内容并提取新链接
                        print(f"完成: {url}, 长度: {len(content)}")
                        return []  # 返回新发现的链接
            except Exception as e:
                print(f"错误 {url}: {e}")
                return []
    
    # 开始爬取
    tasks = [crawl(base_url)]
    await asyncio.gather(*tasks)

asyncio.run(async_crawler("https://httpbin.org/html"))

7.2 实时数据处理管道

import asyncio
import random

async def data_producer(queue):
    """数据生产者"""
    for i in range(10):
        data = f"数据-{i}"
        await queue.put(data)
        print(f"生产: {data}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    
    await queue.put(None)  # 结束信号

async def data_consumer(queue, consumer_id):
    """数据消费者"""
    while True:
        data = await queue.get()
        if data is None:
            # 将结束信号放回,让其他消费者也能看到
            await queue.put(None)
            break
        
        print(f"消费者 {consumer_id} 处理: {data}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def data_processing_pipeline():
    """数据处理管道"""
    queue = asyncio.Queue(maxsize=5)
    
    # 启动生产者和消费者
    producer_task = asyncio.create_task(data_producer(queue))
    consumer_tasks = [
        asyncio.create_task(data_consumer(queue, i))
        for i in range(3)  # 3个消费者
    ]
    
    # 等待生产者完成
    await producer_task
    
    # 等待所有数据被处理
    await queue.join()
    
    # 取消消费者任务
    for task in consumer_tasks:
        task.cancel()
    
    # 等待取消完成
    await asyncio.gather(*consumer_tasks, return_exceptions=True)

asyncio.run(data_processing_pipeline())

7.3 WebSocket实时通信

import asyncio
import websockets  # 需要安装: pip install websockets

async def websocket_server(websocket, path):
    """WebSocket服务器"""
    print("客户端连接")
    
    try:
        # 发送欢迎消息
        await websocket.send("欢迎连接到服务器!")
        
        # 处理客户端消息
        async for message in websocket:
            print(f"收到消息: {message}")
            response = f"回声: {message}"
            await websocket.send(response)
            
    except websockets.exceptions.ConnectionClosed:
        print("客户端断开连接")

async def websocket_client():
    """WebSocket客户端"""
    uri = "ws://localhost:8765"
    
    try:
        async with websockets.connect(uri) as websocket:
            # 接收欢迎消息
            greeting = await websocket.recv()
            print(f"服务器说: {greeting}")
            
            # 发送几条消息
            for i in range(3):
                message = f"消息 {i}"
                await websocket.send(message)
                print(f"发送: {message}")
                
                response = await websocket.recv()
                print(f"收到: {response}")
                
                await asyncio.sleep(1)
                
    except Exception as e:
        print(f"连接错误: {e}")

async def run_websocket_example():
    """运行WebSocket示例"""
    # 启动服务器
    server = await websockets.serve(websocket_server, "localhost", 8765)
    print("WebSocket服务器已启动")
    
    # 运行客户端
    await websocket_client()
    
    # 关闭服务器
    server.close()
    await server.wait_closed()

asyncio.run(run_websocket_example())

八、性能优化与调试

8.1 使用UVLoop提升性能

UVLoopasyncio的事件循环替代实现,性能更高:

import asyncio
import uvloop

async def performance_test():
    """性能测试"""
    start = asyncio.get_running_loop().time()
    
    # 执行大量异步操作
    await asyncio.gather(*[asyncio.sleep(0.001) for _ in range(1000)])
    
    end = asyncio.get_running_loop().time()
    print(f"执行时间: {end - start:.3f} 秒")

# 使用UVLoop
uvloop.install()
asyncio.run(performance_test())

8.2 异步代码调试

import asyncio
import logging

# 设置日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debuggable_coroutine():
    """可调试的协程"""
    logger.debug("协程开始")
    
    try:
        result = await potentially_failing_operation()
        logger.debug(f"操作成功: {result}")
        return result
    except Exception as e:
        logger.error(f"操作失败: {e}")
        raise

async def potentially_failing_operation():
    """可能失败的操作"""
    await asyncio.sleep(0.1)
    if random.random() < 0.3:  # 30%概率失败
        raise ValueError("随机错误")
    return "成功"

# 启用调试模式
asyncio.run(debuggable_coroutine(), debug=True)

九、总结与最佳实践

9.1 协程使用原则

  1. 优先使用async/await:避免混用不同风格的协程
  2. 合理控制并发数:使用信号量限制资源使用
  3. 正确处理错误:为所有协程添加适当的错误处理
  4. 避免阻塞操作:将阻塞操作转移到线程池执行
  5. 使用类型注解:提高代码可读性和可维护性

9.2 常见陷阱与解决方案

陷阱 解决方案
忘记使用await 使用静态类型检查器如mypy
在同步代码中调用协程 使用asyncio.run()或事件循环
未处理协程异常 使用try/except包裹await表达式
资源未正确释放 使用异步上下文管理器

9.3 性能优化建议

  1. 使用UVLoop:替代默认事件循环提升性能
  2. 合理设置并发限制:避免过度并发导致资源竞争
  3. 使用连接池:复用HTTP/数据库连接
  4. 批量处理操作:减少频繁的小操作
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐