028-异步编程与协程

🔴 难度: 高级 | ⏱️ 预计时间: 6小时 | 📋 前置: 027-多线程与并发编程

学习目标

完成本章节后,你将能够:

  • 理解异步编程的概念和优势
  • 掌握Python协程的基本语法和使用
  • 熟练使用asyncio库进行异步编程
  • 实现高效的异步I/O操作
  • 设计和优化异步应用程序
  • 处理异步编程中的常见问题

异步编程基础

什么是异步编程

异步编程是一种编程范式,允许程序在等待某些操作(如I/O操作)完成时,继续执行其他任务,而不是阻塞等待。

# Demo: basic concepts of asynchronous programming
print("=== 异步编程基础概念 ===")

import asyncio
import time
from typing import List, Any

# 同步版本 - 阻塞执行
def sync_task(name: str, duration: int) -> str:
    """Run a blocking task and return its result text.

    Prints a start line, blocks the calling thread for ``duration`` seconds
    via ``time.sleep``, prints a completion line, and returns the result
    string for ``name``.
    """
    print(f"开始执行任务 {name}")
    # Blocking sleep stands in for slow work; the thread does nothing else meanwhile.
    time.sleep(duration)
    print(f"任务 {name} 完成")
    outcome = f"任务 {name} 的结果"
    return outcome

def run_sync_tasks():
    """Run tasks A, B and C one after another and return their results.

    Each ``sync_task`` call blocks until it finishes, so the elapsed time
    printed at the end is roughly the sum of the individual durations.
    """
    print("\n--- 同步执行 ---")
    began = time.time()

    # Strictly sequential: the next task starts only after the previous one returns.
    outputs = []
    for label, seconds in (("A", 2), ("B", 1), ("C", 3)):
        outputs.append(sync_task(label, seconds))

    finished = time.time()
    print(f"同步执行总时间: {finished - began:.2f} 秒")
    return outputs

# 异步版本 - 非阻塞执行
async def async_task(name: str, duration: int) -> str:
    """Run a non-blocking task and return its result text.

    Prints a start line, suspends for ``duration`` seconds with
    ``asyncio.sleep`` (other coroutines may run meanwhile), prints a
    completion line, and returns the result string for ``name``.
    """
    print(f"开始执行任务 {name}")
    # Awaiting the sleep yields control to the event loop instead of blocking.
    await asyncio.sleep(duration)
    print(f"任务 {name} 完成")
    outcome = f"任务 {name} 的结果"
    return outcome

async def run_async_tasks():
    """Run tasks A, B and C concurrently and return their results.

    The three coroutines are handed to ``asyncio.gather``, which returns the
    results in the order the coroutines were passed. The elapsed time is
    printed before returning.
    """
    print("\n--- 异步执行 ---")
    began = time.time()

    # gather schedules all three at once and waits for every one of them.
    results = await asyncio.gather(
        async_task("A", 2),
        async_task("B", 1),
        async_task("C", 3),
    )

    finished = time.time()
    print(f"异步执行总时间: {finished - began:.2f} 秒")
    return results

# Run the synchronous demo
sync_results = run_sync_tasks()
print(f"同步结果: {sync_results}")

# Run the asynchronous demo; asyncio.run drives the coroutine to completion
async_results = asyncio.run(run_async_tasks())
print(f"异步结果: {async_results}")

协程基础

协程(Coroutine)是异步编程的核心概念,它是一种可以暂停和恢复执行的函数。

# Basic coroutine syntax
print("\n=== 协程基础语法 ===")

import asyncio
import inspect
from typing import Generator, Awaitable

# 定义协程函数
async def simple_coroutine(name: str) -> str:
    """Minimal coroutine: announce start, wait one second, announce completion.

    Returns a result string that embeds ``name``.
    """
    print(f"协程 {name} 开始执行")

    # ``await`` suspends this coroutine until the awaited operation finishes.
    await asyncio.sleep(1)

    print(f"协程 {name} 执行完成")
    message = f"来自协程 {name} 的结果"
    return message

# Calling a coroutine function only creates a coroutine object; nothing runs yet
coro = simple_coroutine("test")
print(f"协程对象类型: {type(coro)}")
print(f"是否为协程: {inspect.iscoroutine(coro)}")

# Run the coroutine to completion
result = asyncio.run(coro)
print(f"协程结果: {result}")

# 协程的生命周期
async def coroutine_lifecycle():
    """Show the stages a coroutine passes through: start, progress, finish, cleanup.

    Three tasks run the same staged coroutine concurrently. Results are
    collected with ``return_exceptions=True`` so a failure would be reported
    per task rather than raised.
    """
    print("\n--- 协程生命周期 ---")

    async def lifecycle_demo(stage: str):
        # One coroutine run: two half-second waits with progress output.
        print(f"阶段 {stage}: 协程开始")
        try:
            await asyncio.sleep(0.5)
            print(f"阶段 {stage}: 协程执行中")
            await asyncio.sleep(0.5)
            print(f"阶段 {stage}: 协程即将结束")
            return f"阶段 {stage} 完成"
        except asyncio.CancelledError:
            print(f"阶段 {stage}: 协程被取消")
            # Re-raise so the cancellation is propagated to the awaiting side.
            raise
        finally:
            # Runs on normal completion and on cancellation alike.
            print(f"阶段 {stage}: 协程清理")

    # Schedule the three coroutines as tasks so they run concurrently.
    scheduled = [
        asyncio.create_task(lifecycle_demo(stage)) for stage in ("1", "2", "3")
    ]

    # Wait for all of them; exceptions come back as values.
    outcomes = await asyncio.gather(*scheduled, return_exceptions=True)

    for index, outcome in enumerate(outcomes, 1):
        if not isinstance(outcome, Exception):
            print(f"任务 {index} 结果: {outcome}")
        else:
            print(f"任务 {index} 异常: {outcome}")

# Run the lifecycle demo
asyncio.run(coroutine_lifecycle())

async/await 语法

async 和 await 是Python异步编程的核心关键字。

# async/await syntax in detail
print("\n=== async/await 语法详解 ===")

import asyncio
import aiohttp
import json
from typing import Dict, List, Optional

# async 函数定义
async def fetch_data(url: str, session) -> Dict:
    """Fetch ``url`` through ``session`` and return a summary dict.

    Args:
        url: Address to request.
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``json()`` and a ``status``
            attribute.

    Returns:
        ``{'url', 'status', 'data'}`` on success; ``{'url', 'status': 0,
        'error'}`` when any exception is raised along the way.
    """
    try:
        async with session.get(url) as response:
            # Decode the body first, then snapshot the status code.
            payload = await response.json()
            return dict(url=url, status=response.status, data=payload)
    except Exception as exc:
        # Failures are reported in-band so callers can gather many fetches.
        return dict(url=url, status=0, error=str(exc))

# 异步上下文管理器
class AsyncResourceManager:
    """Async context manager that simulates acquiring and releasing a resource."""

    def __init__(self, resource_name: str):
        self.resource_name = resource_name
        # Holds the acquired resource string while inside the ``async with`` body.
        self.resource = None

    async def __aenter__(self):
        """Acquire the simulated resource and return it to the ``as`` target."""
        print(f"获取资源: {self.resource_name}")
        await asyncio.sleep(0.1)  # simulated acquisition latency
        acquired = f"Resource-{self.resource_name}"
        self.resource = acquired
        return acquired

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release the simulated resource; exceptions from the body are not suppressed."""
        print(f"释放资源: {self.resource_name}")
        await asyncio.sleep(0.1)  # simulated release latency
        self.resource = None

# 异步迭代器
class AsyncNumberGenerator:
    """Async iterator yielding the integers from ``start`` up to (not including) ``end``.

    Each value is produced after awaiting ``delay`` seconds. The object is its
    own iterator, so it can be consumed only once.
    """

    def __init__(self, start: int, end: int, delay: float = 0.1):
        self.start = start
        self.end = end
        self.delay = delay
        # Next value to hand out.
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current < self.end:
            await asyncio.sleep(self.delay)
            produced = self.current
            self.current = produced + 1
            return produced
        # Range exhausted: signal the end of async iteration.
        raise StopAsyncIteration

# 综合示例
async def async_syntax_demo():
    """Walk through the async syntax forms: ``async with``, ``async for``,
    ``await`` inside a comprehension-style build, and async generators."""
    print("\n--- 异步语法综合演示 ---")

    # 1. Async context manager
    async with AsyncResourceManager("Database") as handle:
        print(f"使用资源: {handle}")
        await asyncio.sleep(0.5)

    # 2. Async iterator
    print("\n异步迭代器:")
    async for value in AsyncNumberGenerator(1, 6):
        print(f"生成数字: {value}")

    # 3. Awaiting inside a list build
    print("\n异步列表推导式:")

    async def square(x):
        await asyncio.sleep(0.1)
        return x ** 2

    # Each square is awaited in turn, so the values are computed sequentially.
    squares = []
    for base in range(1, 6):
        squares.append(await square(base))
    print(f"平方数: {squares}")

    # 4. Async generator
    async def async_fibonacci(n):
        """Yield the first ``n`` Fibonacci numbers, pausing before each one."""
        prev, nxt = 0, 1
        for _ in range(n):
            await asyncio.sleep(0.1)
            yield prev
            prev, nxt = nxt, prev + nxt

    print("\n异步斐波那契数列:")
    fib_numbers = [fib async for fib in async_fibonacci(8)]
    print(f"斐波那契数列: {fib_numbers}")

# Run the syntax demo
asyncio.run(async_syntax_demo())

asyncio 核心概念

事件循环(Event Loop)

事件循环是asyncio的核心,负责调度和执行协程。

# The event loop in detail
print("\n=== 事件循环详解 ===")

import asyncio
import threading
import time
from typing import Callable, Any

class EventLoopDemo:
    """Small demos around the asyncio event loop.

    Covers loop inspection, task scheduling with done-callbacks, task
    cancellation, running a loop inside a worker thread, and loop-level
    helpers (``call_soon``, ``call_later``, ``run_in_executor``).
    """

    def __init__(self):
        # Neither attribute is read by the methods of this class.
        self.loop = None
        self.tasks = []

    def demonstrate_event_loop(self):
        """Print the running loop (if any), a freshly created loop, and the loop policy."""
        print("\n--- 事件循环基础 ---")

        # get_running_loop raises RuntimeError when called outside a running loop.
        try:
            loop = asyncio.get_running_loop()
            print(f"当前运行的事件循环: {loop}")
        except RuntimeError:
            print("当前没有运行的事件循环")

        # Create a new loop only to display it, then close it so its
        # selector and self-pipe sockets are not leaked.
        new_loop = asyncio.new_event_loop()
        try:
            print(f"新创建的事件循环: {new_loop}")
        finally:
            new_loop.close()

        # Inspect the event loop policy.
        # NOTE(review): the policy API is reported as deprecated in recent
        # Python releases (3.14+) - confirm before relying on it.
        policy = asyncio.get_event_loop_policy()
        print(f"事件循环策略: {policy}")

    async def simple_task(self, name: str, duration: float) -> str:
        """Sleep for ``duration`` seconds and return a result string for ``name``."""
        print(f"任务 {name} 开始 (线程: {threading.current_thread().name})")
        await asyncio.sleep(duration)
        print(f"任务 {name} 完成")
        return f"任务 {name} 的结果"

    def callback_function(self, future):
        """Done-callback: report how ``future`` finished.

        Handles all three terminal states. The cancelled state is checked
        first because ``future.exception()`` raises ``CancelledError`` on a
        cancelled future.
        """
        if future.cancelled():
            print("任务已被取消")
            return

        error = future.exception()
        if error is not None:
            print(f"任务异常: {error}")
        else:
            print(f"任务完成,结果: {future.result()}")

    async def demonstrate_task_scheduling(self):
        """Create three tasks, attach done-callbacks, await them and print their state."""
        print("\n--- 任务调度演示 ---")

        # 1. Create the tasks; they start running at the next suspension point.
        task1 = asyncio.create_task(self.simple_task("A", 1.0))
        task2 = asyncio.create_task(self.simple_task("B", 0.5))
        task3 = asyncio.create_task(self.simple_task("C", 1.5))

        # Callbacks are invoked by the loop once each task is done.
        for task in (task1, task2, task3):
            task.add_done_callback(self.callback_function)

        # 2. Wait for all tasks; results come back in argument order.
        print("等待所有任务完成...")
        results = await asyncio.gather(task1, task2, task3)
        print(f"所有任务结果: {results}")

        # 3. Inspect the final task state.
        print(f"\n任务状态:")
        for i, task in enumerate([task1, task2, task3], 1):
            print(f"  任务 {i}: done={task.done()}, cancelled={task.cancelled()}")

    async def demonstrate_task_cancellation(self):
        """Start a 5-second task, cancel it after one second, and confirm the cancellation."""
        print("\n--- 任务取消演示 ---")

        long_task = asyncio.create_task(self.simple_task("Long", 5.0))

        # Let it run for a while before cancelling.
        await asyncio.sleep(1.0)
        print("取消长时间任务...")
        long_task.cancel()

        # Awaiting a cancelled task raises CancelledError in the awaiter.
        try:
            await long_task
        except asyncio.CancelledError:
            print("任务已被取消")

    def run_in_thread(self):
        """Run a dedicated event loop inside a worker thread and wait for it to finish."""
        print("\n--- 线程中的事件循环 ---")

        def thread_worker():
            # Each thread needs its own loop; create one and make it current.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            async def thread_task():
                print(f"线程任务开始 (线程: {threading.current_thread().name})")
                await asyncio.sleep(1)
                print("线程任务完成")
                return "线程任务结果"

            try:
                result = loop.run_until_complete(thread_task())
                print(f"线程任务结果: {result}")
            finally:
                loop.close()

        thread = threading.Thread(target=thread_worker, name="AsyncThread")
        thread.start()
        thread.join()

    async def demonstrate_loop_methods(self):
        """Show ``call_soon``, ``call_later`` and ``run_in_executor`` on the running loop."""
        print("\n--- 事件循环方法演示 ---")

        loop = asyncio.get_running_loop()

        # 1. Schedule plain (synchronous) callbacks.
        def sync_callback(name):
            print(f"同步回调 {name} 执行")

        loop.call_soon(sync_callback, "soon")
        loop.call_later(1.0, sync_callback, "later")

        # 2. Run a blocking function in the default executor so the loop stays responsive.
        def blocking_function(duration):
            print(f"阻塞函数开始 (线程: {threading.current_thread().name})")
            time.sleep(duration)
            print("阻塞函数完成")
            return "阻塞函数结果"

        result = await loop.run_in_executor(None, blocking_function, 1.0)
        print(f"阻塞函数结果: {result}")

        # 3. Keep the coroutine alive long enough for the delayed callback to fire.
        await asyncio.sleep(2.0)

# 运行事件循环演示
async def run_event_loop_demo():
    """Run the event-loop demos in order: basics, scheduling, cancellation, loop methods."""
    demo = EventLoopDemo()

    # The basics demo is synchronous and is called directly.
    demo.demonstrate_event_loop()

    # The remaining demos are coroutines and are awaited one after another.
    for stage in (
        demo.demonstrate_task_scheduling,
        demo.demonstrate_task_cancellation,
        demo.demonstrate_loop_methods,
    ):
        await stage()

# Run the demo
asyncio.run(run_event_loop_demo())

# Event loop inside a thread (needs to be run outside the main program)
demo = EventLoopDemo()
demo.run_in_thread()

任务和Future

Task和Future是asyncio中管理异步操作的重要概念。

# Task and Future in detail
print("\n=== Task和Future详解 ===")

import asyncio
import random
import time
from typing import List, Optional, Union

class TaskFutureDemo:
    """Demos for ``asyncio.Future`` and ``asyncio.Task``: creation, monitoring,
    cancellation, and task groups."""

    async def demonstrate_future(self):
        """Create a bare Future, resolve it from a helper task, and inspect its state."""
        print("\n--- Future演示 ---")

        # Ask the running loop for the Future so it is bound to the right loop.
        future = asyncio.get_running_loop().create_future()

        async def set_future_result():
            """Resolve the future with a value after one second."""
            await asyncio.sleep(1)
            if not future.done():
                future.set_result("Future的结果")

        async def set_future_exception():
            """Fail the future after two seconds (shown for reference; not scheduled below)."""
            await asyncio.sleep(2)
            if not future.done():
                future.set_exception(ValueError("Future异常"))

        # Keep a reference to the task: the loop holds tasks only weakly, so an
        # unreferenced task may be garbage-collected before it finishes.
        setter = asyncio.create_task(set_future_result())

        try:
            result = await future
            print(f"Future结果: {result}")
        except Exception as e:
            print(f"Future异常: {e}")

        # Let the helper task run to completion before moving on.
        await setter

        # Inspect the Future's final state.
        print(f"Future状态: done={future.done()}, cancelled={future.cancelled()}")
        if future.done() and not future.cancelled():
            try:
                print(f"Future结果: {future.result()}")
            except Exception:
                print(f"Future异常: {future.exception()}")

    async def demonstrate_task_creation(self):
        """Show the three ways to wrap a coroutine in a Task."""
        print("\n--- Task创建方式 ---")

        async def sample_coroutine(name: str, delay: float) -> str:
            await asyncio.sleep(delay)
            return f"协程 {name} 完成"

        # Option 1: asyncio.create_task()
        task1 = asyncio.create_task(sample_coroutine("Task1", 1.0))

        # Option 2: asyncio.ensure_future()
        task2 = asyncio.ensure_future(sample_coroutine("Task2", 0.5))

        # Option 3: loop.create_task()
        loop = asyncio.get_running_loop()
        task3 = loop.create_task(sample_coroutine("Task3", 1.5))

        # Wait for all of them; results come back in argument order.
        results = await asyncio.gather(task1, task2, task3)
        print(f"任务结果: {results}")

    async def demonstrate_task_management(self):
        """Run four tasks with random failures, wait with a timeout, cancel stragglers, report."""
        print("\n--- Task管理 ---")

        async def monitored_task(name: str, duration: float, fail_chance: float = 0.0) -> str:
            """Work in 0.1 s steps for ``duration`` seconds; may fail randomly at each step."""
            print(f"任务 {name} 开始")

            try:
                for _ in range(int(duration * 10)):
                    # Cancellation is delivered here as CancelledError, so no
                    # explicit polling of the task's cancelled() flag is needed
                    # (that flag only becomes true once the task has finished).
                    await asyncio.sleep(0.1)

                    # Random failure
                    if random.random() < fail_chance:
                        raise ValueError(f"任务 {name} 随机失败")

                print(f"任务 {name} 正常完成")
                return f"任务 {name} 的结果"

            except asyncio.CancelledError:
                print(f"任务 {name} 被取消")
                # Simulated resource cleanup before acknowledging the cancellation.
                await asyncio.sleep(0.1)
                raise
            except Exception as e:
                print(f"任务 {name} 异常: {e}")
                raise

        # Create the tasks.
        tasks = [
            asyncio.create_task(monitored_task("A", 2.0, 0.1)),
            asyncio.create_task(monitored_task("B", 1.5, 0.2)),
            asyncio.create_task(monitored_task("C", 3.0, 0.0)),
            asyncio.create_task(monitored_task("D", 1.0, 0.3))
        ]

        # Name the tasks ManagedTask-A .. ManagedTask-D.
        for i, task in enumerate(tasks):
            task.set_name(f"ManagedTask-{chr(65+i)}")

        completed_tasks = []
        failed_tasks = []

        try:
            # Return at the first failure, or after 2.5 s, whichever comes first.
            done, pending = await asyncio.wait(
                tasks,
                timeout=2.5,
                return_when=asyncio.FIRST_EXCEPTION
            )

            # Sort finished tasks into successes and failures.
            for task in done:
                try:
                    result = await task
                    completed_tasks.append((task.get_name(), result))
                except Exception as e:
                    failed_tasks.append((task.get_name(), str(e)))

            # Cancel whatever is still running.
            for task in pending:
                print(f"取消未完成的任务: {task.get_name()}")
                task.cancel()

            # Wait until the cancellations have been processed.
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

        except Exception as e:
            print(f"任务管理异常: {e}")

        # Report.
        print(f"\n任务执行报告:")
        print(f"  完成任务: {len(completed_tasks)}")
        for name, result in completed_tasks:
            print(f"    {name}: {result}")

        print(f"  失败任务: {len(failed_tasks)}")
        for name, error in failed_tasks:
            print(f"    {name}: {error}")

    async def demonstrate_task_groups(self):
        """Run three tasks in a ``TaskGroup`` when available, else fall back to ``gather``."""
        print("\n--- 任务组演示 ---")

        async def group_task(name: str, duration: float, should_fail: bool = False) -> str:
            await asyncio.sleep(duration)
            if should_fail:
                raise ValueError(f"任务 {name} 故意失败")
            return f"任务 {name} 成功"

        try:
            # TaskGroup exists from Python 3.11 on.
            if hasattr(asyncio, 'TaskGroup'):
                async with asyncio.TaskGroup() as tg:
                    task1 = tg.create_task(group_task("G1", 1.0))
                    task2 = tg.create_task(group_task("G2", 0.5))
                    task3 = tg.create_task(group_task("G3", 1.5))

                print(f"任务组结果: {[task1.result(), task2.result(), task3.result()]}")
            else:
                print("当前Python版本不支持TaskGroup,使用gather替代")
                results = await asyncio.gather(
                    group_task("G1", 1.0),
                    group_task("G2", 0.5),
                    group_task("G3", 1.5),
                    return_exceptions=True
                )
                print(f"任务组结果: {results}")

        except Exception as e:
            # A plain ``except`` is used on purpose: ``except*`` cannot share a
            # ``try`` with ``except`` and is a syntax error before 3.11, which
            # would break the gather fallback above. Exception groups are
            # recognised by their ``split`` method instead.
            split = getattr(e, "split", None)
            if split is None:
                print(f"任务组异常: {e}")
            else:
                failed, others = split(ValueError)
                if failed is not None:
                    print(f"任务组中有任务失败: {[str(err) for err in failed.exceptions]}")
                if others is not None:
                    print(f"任务组异常: {others}")

# Run the Task and Future demos
async def run_task_future_demo():
    """Await each ``TaskFutureDemo`` demo in sequence."""
    demo = TaskFutureDemo()

    for stage in (
        demo.demonstrate_future,
        demo.demonstrate_task_creation,
        demo.demonstrate_task_management,
        demo.demonstrate_task_groups,
    ):
        await stage()

# Drive the Task/Future demo coroutine to completion
asyncio.run(run_task_future_demo())

异步I/O操作

文件I/O

异步文件操作可以避免阻塞事件循环。

# Asynchronous file I/O
print("\n=== 异步文件I/O ===")

import asyncio
import aiofiles
import os
import json
from pathlib import Path
from typing import List, Dict, Any
import tempfile

class AsyncFileIO:
    """Demos of asynchronous file I/O built on ``aiofiles``.

    All files are created inside a temporary directory made in ``__init__``;
    call ``cleanup`` to delete it.
    """

    def __init__(self):
        self.temp_dir = tempfile.mkdtemp()
        print(f"临时目录: {self.temp_dir}")

    async def demonstrate_basic_file_operations(self):
        """Write a three-line text file, read it back whole, then line by line."""
        print("\n--- 基本异步文件操作 ---")

        target = os.path.join(self.temp_dir, "test.txt")

        # Asynchronous write
        async with aiofiles.open(target, 'w', encoding='utf-8') as out:
            for text in ("这是异步写入的内容\n", "第二行内容\n", "第三行内容\n"):
                await out.write(text)

        print(f"文件写入完成: {target}")

        # Asynchronous read of the whole file
        async with aiofiles.open(target, 'r', encoding='utf-8') as src:
            whole = await src.read()
            print(f"文件内容:\n{whole}")

        # Line-by-line read
        print("逐行读取:")
        async with aiofiles.open(target, 'r', encoding='utf-8') as src:
            number = 0
            async for line in src:
                number += 1
                print(f"  第{number}行: {line.strip()}")

    async def demonstrate_concurrent_file_operations(self):
        """Write three files concurrently, then read them back concurrently."""
        print("\n--- 并发文件操作 ---")

        async def write_file(filename: str, content: str, delay: float = 0) -> str:
            """Write ``content`` to ``filename`` in the temp dir after an optional delay."""
            if delay:
                await asyncio.sleep(delay)

            destination = os.path.join(self.temp_dir, filename)
            async with aiofiles.open(destination, 'w', encoding='utf-8') as out:
                await out.write(content)

            return f"文件 {filename} 写入完成"

        async def read_file(filename: str, delay: float = 0) -> str:
            """Read ``filename`` from the temp dir after an optional delay."""
            if delay:
                await asyncio.sleep(delay)

            origin = os.path.join(self.temp_dir, filename)
            try:
                async with aiofiles.open(origin, 'r', encoding='utf-8') as src:
                    text = await src.read()
                return f"文件 {filename} 内容: {text.strip()}"
            except FileNotFoundError:
                return f"文件 {filename} 不存在"

        # Concurrent writes
        write_specs = (
            ("file1.txt", "文件1的内容", 0.1),
            ("file2.txt", "文件2的内容", 0.2),
            ("file3.txt", "文件3的内容", 0.15),
        )
        write_results = await asyncio.gather(
            *[write_file(name, body, pause) for name, body, pause in write_specs]
        )
        for line in write_results:
            print(f"  {line}")

        # Concurrent reads
        read_specs = (("file1.txt", 0.1), ("file2.txt", 0.05), ("file3.txt", 0.2))
        read_results = await asyncio.gather(
            *[read_file(name, pause) for name, pause in read_specs]
        )
        for line in read_results:
            print(f"  {line}")

    async def demonstrate_json_operations(self):
        """Serialize a small dataset to a JSON file and load it back."""
        print("\n--- 异步JSON操作 ---")

        # Sample data
        data = {
            "users": [
                {"id": 1, "name": "张三", "email": "zhangsan@example.com"},
                {"id": 2, "name": "李四", "email": "lisi@example.com"},
                {"id": 3, "name": "王五", "email": "wangwu@example.com"}
            ],
            "metadata": {
                "version": "1.0",
                "created_at": "2024-01-01"
            }
        }

        json_file = os.path.join(self.temp_dir, "data.json")

        # Serialize in memory, then write the text asynchronously.
        serialized = json.dumps(data, ensure_ascii=False, indent=2)
        async with aiofiles.open(json_file, 'w', encoding='utf-8') as out:
            await out.write(serialized)

        print("JSON文件写入完成")

        # Read the text asynchronously, then parse it.
        async with aiofiles.open(json_file, 'r', encoding='utf-8') as src:
            raw = await src.read()
            loaded_data = json.loads(raw)

        users = loaded_data['users']
        print(f"读取的JSON数据:")
        print(f"  用户数量: {len(users)}")
        for user in users:
            print(f"    {user['name']} ({user['email']})")

    async def demonstrate_large_file_processing(self):
        """Generate a 10000-line file, then count its lines and whitespace-separated words."""
        print("\n--- 大文件处理 ---")

        large_file = os.path.join(self.temp_dir, "large_file.txt")

        async def generate_large_file():
            async with aiofiles.open(large_file, 'w', encoding='utf-8') as out:
                for index in range(10000):
                    await out.write(f"这是第 {index+1} 行内容\n")
                    if index % 1000 == 0:
                        await asyncio.sleep(0.01)  # yield control to the loop

        print("生成大文件...")
        await generate_large_file()

        async def process_large_file():
            seen_lines = 0
            seen_words = 0

            async with aiofiles.open(large_file, 'r', encoding='utf-8') as src:
                async for line in src:
                    seen_lines += 1
                    seen_words += len(line.split())

                    # Yield control once every 1000 lines.
                    if seen_lines % 1000 == 0:
                        await asyncio.sleep(0.01)

            return seen_lines, seen_words

        print("处理大文件...")
        lines, words = await process_large_file()
        print(f"文件统计: {lines} 行, {words} 个单词")

        # Report the file size in KB.
        size_bytes = os.path.getsize(large_file)
        print(f"文件大小: {size_bytes / 1024:.2f} KB")

    def cleanup(self):
        """Delete the temporary directory and everything in it."""
        import shutil
        shutil.rmtree(self.temp_dir)
        print(f"清理临时目录: {self.temp_dir}")

# Run the async file I/O demos
async def run_async_file_demo():
    """Await every ``AsyncFileIO`` demo in order; always remove the temp directory afterwards."""
    demo = AsyncFileIO()

    try:
        for stage in (
            demo.demonstrate_basic_file_operations,
            demo.demonstrate_concurrent_file_operations,
            demo.demonstrate_json_operations,
            demo.demonstrate_large_file_processing,
        ):
            await stage()
    finally:
        # Runs even when a demo raises, so the temp directory never lingers.
        demo.cleanup()

# Run the aiofiles demo when aiofiles is installed; otherwise fall back to the
# standard library. Only the import is guarded, so an ImportError raised while
# the demo itself runs is not mistaken for "aiofiles is missing".
try:
    import aiofiles
except ImportError:
    print("需要安装aiofiles: pip install aiofiles")

    # Standard-library alternative
    async def basic_file_demo():
        """Do file I/O without aiofiles by running blocking calls in the default executor."""
        print("\n使用标准库进行异步文件操作:")

        loop = asyncio.get_running_loop()

        def write_file_sync(filename, content):
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"文件 {filename} 写入完成"

        def read_file_sync(filename):
            try:
                with open(filename, 'r', encoding='utf-8') as f:
                    return f.read()
            except FileNotFoundError:
                return None

        temp_file = "temp_async.txt"

        try:
            # Write in a worker thread so the event loop is not blocked.
            result = await loop.run_in_executor(None, write_file_sync, temp_file, "异步写入的内容")
            print(f"  {result}")

            # Read in a worker thread as well.
            content = await loop.run_in_executor(None, read_file_sync, temp_file)
            print(f"  读取内容: {content}")
        finally:
            # Clean up even if a step above failed; the file may not exist
            # when the write itself was the step that failed.
            if os.path.exists(temp_file):
                os.remove(temp_file)

    asyncio.run(basic_file_demo())
else:
    asyncio.run(run_async_file_demo())

网络I/O

异步网络编程是asyncio的主要应用场景之一。

# Asynchronous network I/O
print("\n=== 异步网络I/O ===")

import asyncio
import socket
import json
from typing import Dict, List, Optional, Tuple
import time

class AsyncNetworkDemo:
    """异步网络演示类"""
    
    async def demonstrate_tcp_server(self):
        """Start a line-based TCP server on localhost:8888 and exercise it with the test client.

        Protocol: each request is one newline-terminated UTF-8 line and each
        reply is one newline-terminated line. ``time`` returns the current
        time, ``quit`` says goodbye and closes the connection, and anything
        else is echoed back.
        """
        print("\n--- 异步TCP服务器 ---")

        async def handle_client(reader, writer):
            """Serve one client connection until it quits or disconnects."""
            addr = writer.get_extra_info('peername')
            print(f"客户端连接: {addr}")

            try:
                while True:
                    # TCP is a byte stream, so frame on newlines: a fixed-size
                    # read could split one request or merge several.
                    data = await reader.readline()
                    if not data:
                        # Empty read means the peer closed the connection.
                        break

                    message = data.decode('utf-8').strip()
                    print(f"收到消息: {message}")

                    command = message.lower()
                    if command == 'quit':
                        response = "再见!"
                    elif command == 'time':
                        response = f"当前时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
                    else:
                        response = f"回声: {message}"

                    # Every reply, including the goodbye, ends with a newline
                    # so a client using readline() gets a complete line.
                    writer.write(f"{response}\n".encode('utf-8'))
                    await writer.drain()

                    if command == 'quit':
                        break

            except Exception as e:
                print(f"处理客户端异常: {e}")
            finally:
                print(f"客户端断开: {addr}")
                writer.close()
                await writer.wait_closed()

        # Start the server.
        server = await asyncio.start_server(
            handle_client,
            'localhost',
            8888
        )

        addr = server.sockets[0].getsockname()
        print(f"服务器启动在 {addr}")

        # The server is closed when the ``async with`` block exits.
        async with server:
            # Give the server a moment, then run the test client against it.
            await asyncio.sleep(0.1)
            await self.test_tcp_client()
    
    async def test_tcp_client(self):
        """Connect to localhost:8888, send a few test messages and print each reply.

        Errors are printed rather than raised. The connection is closed in
        every case, including when an exception interrupts the conversation.
        """
        print("\n--- TCP客户端测试 ---")

        writer = None
        try:
            # Connect to the server.
            reader, writer = await asyncio.open_connection(
                'localhost', 8888
            )

            test_messages = ['Hello', 'time', 'Python异步编程', 'quit']

            for message in test_messages:
                print(f"发送: {message}")
                writer.write(f"{message}\n".encode('utf-8'))
                await writer.drain()

                # Replies are newline-terminated lines.
                response = await reader.readline()
                print(f"响应: {response.decode('utf-8').strip()}")

                await asyncio.sleep(0.5)

        except Exception as e:
            print(f"客户端异常: {e}")
        finally:
            # ``writer`` is None when the connection attempt itself failed.
            if writer is not None:
                writer.close()
                try:
                    await writer.wait_closed()
                except Exception as e:
                    print(f"客户端异常: {e}")
    
    async def demonstrate_http_client(self):
        """Fetch a few plain-HTTP URLs one after another with a minimal hand-written client."""
        print("\n--- 异步HTTP客户端 ---")

        # Simplified stand-in for aiohttp, built directly on asyncio streams.
        async def simple_http_request(url: str) -> Dict:
            """Send a GET request to port 80 and summarize the raw response.

            Returns ``{'status', 'length', 'preview'}`` on success or
            ``{'error': ...}`` on any failure (HTTPS is rejected up front).
            """
            try:
                # Split the URL into host and path.
                if url.startswith('http://'):
                    url = url[7:]
                elif url.startswith('https://'):
                    # Kept simple on purpose: no TLS support in this demo.
                    return {'error': '此演示不支持HTTPS'}

                host, *path_parts = url.split('/', 1)
                path = '/' + (path_parts[0] if path_parts else '')

                reader, writer = await asyncio.open_connection(host, 80)
                try:
                    # "Connection: close" makes the server end the stream
                    # after the response, so read() below returns at EOF.
                    request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
                    writer.write(request.encode('utf-8'))
                    await writer.drain()

                    response_data = await reader.read()
                finally:
                    # Close the connection even when sending or reading fails.
                    writer.close()
                    await writer.wait_closed()

                response_text = response_data.decode('utf-8', errors='ignore')

                # The first line of the response is the HTTP status line.
                lines = response_text.split('\r\n')
                status_line = lines[0] if lines else ''

                return {
                    'status': status_line,
                    'length': len(response_data),
                    'preview': response_text[:200] + '...' if len(response_text) > 200 else response_text
                }

            except Exception as e:
                return {'error': str(e)}

        test_urls = [
            'http://httpbin.org/get',
            'http://httpbin.org/json',
            'http://example.com'
        ]

        for url in test_urls:
            print(f"\n请求: {url}")
            result = await simple_http_request(url)

            if 'error' in result:
                print(f"  错误: {result['error']}")
            else:
                print(f"  状态: {result['status']}")
                print(f"  大小: {result['length']} 字节")
                print(f"  预览: {result['preview'][:100]}...")
    
    async def demonstrate_concurrent_requests(self):
        """Simulate five concurrent network requests and print timing statistics.

        No real network traffic occurs; each request is modelled with
        ``asyncio.sleep``.
        """
        print("\n--- 并发网络请求 ---")

        async def fetch_url(session_id: int, url: str, delay: float = 0) -> Dict:
            """Simulate one request and return a dict describing its outcome."""
            began = time.time()

            if delay:
                await asyncio.sleep(delay)

            try:
                # Simulated latency that varies with the session id.
                await asyncio.sleep(0.5 + (session_id % 3) * 0.2)

                finished = time.time()
                return {
                    'session_id': session_id,
                    'url': url,
                    'status': 'success',
                    'duration': finished - began,
                    'data_size': len(url) * 100  # simulated payload size
                }

            except Exception as e:
                finished = time.time()
                return {
                    'session_id': session_id,
                    'url': url,
                    'status': 'error',
                    'duration': finished - began,
                    'error': str(e)
                }

        urls = [
            'http://api1.example.com/data',
            'http://api2.example.com/users',
            'http://api3.example.com/posts',
            'http://api4.example.com/comments',
            'http://api5.example.com/tags'
        ]

        print(f"发起 {len(urls)} 个并发请求...")
        start_time = time.time()

        # Stagger the start of each request by 0.1 s per index.
        pending = []
        for index, url in enumerate(urls):
            pending.append(fetch_url(index, url, index * 0.1))

        results = await asyncio.gather(*pending, return_exceptions=True)

        total_duration = time.time() - start_time

        # Classify every outcome in a single pass.
        successful_requests = []
        failed_requests = []
        exceptions = []
        for outcome in results:
            if isinstance(outcome, Exception):
                exceptions.append(outcome)
            elif isinstance(outcome, dict):
                status = outcome.get('status')
                if status == 'success':
                    successful_requests.append(outcome)
                elif status == 'error':
                    failed_requests.append(outcome)

        print(f"\n请求结果统计:")
        print(f"  总耗时: {total_duration:.2f} 秒")
        print(f"  成功请求: {len(successful_requests)}")
        print(f"  失败请求: {len(failed_requests)}")
        print(f"  异常请求: {len(exceptions)}")

        # Per-request details
        for entry in successful_requests:
            print(f"  会话 {entry['session_id']}: {entry['url']} - {entry['duration']:.2f}s")
    
    async def demonstrate_websocket_simulation(self):
        """Simulate three WebSocket clients exchanging messages with a mock server.

        Everything is in-memory: ``MockWebSocket`` prints outgoing messages and
        hands incoming ones over through an ``asyncio.Queue``; no socket is
        opened. For every client a server-side handler task sends a welcome
        message, queues five messages 0.5 seconds apart, sends a closing
        message and closes the connection, while the client prints each
        message it receives.
        """
        print("\n--- WebSocket模拟 ---")

        class MockWebSocket:
            """In-memory stand-in for a single WebSocket connection."""

            def __init__(self, client_id: str):
                self.client_id = client_id
                self.connected = True
                # Messages the server has queued for the client to receive.
                self.message_queue = asyncio.Queue()

            async def send(self, message: str):
                """Print *message* and simulate network latency (no-op once closed)."""
                if self.connected:
                    print(f"[{self.client_id}] 发送: {message}")
                    await asyncio.sleep(0.1)  # simulated network delay

            async def receive(self) -> str:
                """Return the next queued message.

                Raises:
                    ConnectionError: if the connection is already closed.
                """
                if self.connected:
                    return await self.message_queue.get()
                raise ConnectionError("连接已断开")

            async def close(self):
                """Mark the connection as closed."""
                self.connected = False
                print(f"[{self.client_id}] 连接关闭")

        class MockWebSocketServer:
            """Keeps track of the connected clients, keyed by client id."""

            def __init__(self):
                self.clients = {}
                self.running = True

            async def handle_client(self, client_id: str):
                """Serve one client: welcome, five queued messages, goodbye, close."""
                ws = MockWebSocket(client_id)
                # Registered synchronously, before the first await, so a client
                # that yields to the loop once can already find its connection.
                self.clients[client_id] = ws

                try:
                    await ws.send(f"欢迎 {client_id}!")

                    for i in range(5):
                        message = f"服务器消息 {i+1}"
                        await ws.message_queue.put(message)
                        await asyncio.sleep(0.5)

                    await ws.send("会话结束")

                except Exception as e:
                    print(f"客户端 {client_id} 异常: {e}")
                finally:
                    await ws.close()
                    # pop() with a default tolerates an already-removed entry.
                    self.clients.pop(client_id, None)

            async def broadcast(self, message: str):
                """Send *message* to every connected client concurrently."""
                if self.clients:
                    # Snapshot the connections so that clients disconnecting
                    # while we await cannot disturb the iteration.
                    sends = [
                        ws.send(f"广播: {message}")
                        for ws in list(self.clients.values())
                    ]
                    await asyncio.gather(*sends, return_exceptions=True)

        async def websocket_client(client_id: str, server: MockWebSocketServer):
            """Connect as *client_id* and print every message pushed by the server."""
            print(f"客户端 {client_id} 连接")

            # Start the server-side handler for this client.
            server_task = asyncio.create_task(server.handle_client(client_id))

            try:
                # create_task() only schedules the handler. Yield to the event
                # loop until it has registered the connection; otherwise the
                # lookup below returns None and no message is ever received.
                while client_id not in server.clients and not server_task.done():
                    await asyncio.sleep(0)

                ws = server.clients.get(client_id)
                if ws is not None:
                    while ws.connected:
                        try:
                            message = await asyncio.wait_for(ws.receive(), timeout=1.0)
                            print(f"[{client_id}] 收到: {message}")
                        except asyncio.TimeoutError:
                            # Nothing arrived for a full second: stop listening.
                            break
                        except Exception as e:
                            print(f"[{client_id}] 接收异常: {e}")
                            break

            finally:
                # Always let the handler finish so the connection gets closed.
                await server_task

        server = MockWebSocketServer()

        # One coroutine per simulated client.
        client_tasks = [
            websocket_client(f"Client-{i}", server)
            for i in range(3)
        ]

        # Run all clients concurrently.
        await asyncio.gather(*client_tasks)

        print("WebSocket演示完成")

# 运行异步网络演示
async def run_network_demo():
    """Run every network demo in sequence.

    The TCP server demo is capped at 10 seconds and its failures are only
    reported; the remaining demos run unguarded, one after another.
    """
    demo = AsyncNetworkDemo()

    # Heads-up: the TCP server demo can take a while, hence the timeout below.
    print("开始网络演示...")

    try:
        # Simplified TCP server demo, bounded by a timeout.
        tcp_demo = demo.demonstrate_tcp_server()
        await asyncio.wait_for(tcp_demo, timeout=10.0)
    except asyncio.TimeoutError:
        print("TCP服务器演示超时")
    except Exception as exc:
        print(f"TCP服务器演示异常: {exc}")

    # Remaining demos, looked up lazily and awaited in order.
    for method_name in (
        "demonstrate_http_client",
        "demonstrate_concurrent_requests",
        "demonstrate_websocket_simulation",
    ):
        await getattr(demo, method_name)()

# Script entry: run the network demos, reporting interruption or failure.
try:
    asyncio.run(run_network_demo())
except KeyboardInterrupt:
    print("\n演示被用户中断")
except Exception as exc:
    print(f"网络演示异常: {exc}")

异步编程模式

生产者-消费者模式

异步队列是实现生产者-消费者模式的重要工具。

# 异步生产者-消费者模式
print("\n=== 异步生产者-消费者模式 ===")

import asyncio
import random
import time
from typing import Any, Optional, List
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    """Lifecycle states of a task in the producer/consumer demo."""
    PENDING = "pending"        # default state of a newly created Task
    PROCESSING = "processing"  # set by a consumer when it picks the task up
    COMPLETED = "completed"    # set after the task's result was computed
    FAILED = "failed"          # set when processing the task raised

@dataclass
class Task:
    """A unit of work passed from producers to consumers.

    ``created_at`` may be omitted (or passed as ``None``); it is then set to
    the current ``time.time()`` right after construction.
    """
    id: str  # identifier chosen by the creator of the task
    data: Any  # payload; the producers in this file store a dict here
    priority: int = 0
    created_at: Optional[float] = None  # None means "stamp at construction"
    status: TaskStatus = TaskStatus.PENDING
    
    def __post_init__(self):
        """Fill in the creation timestamp when the caller did not supply one."""
        if self.created_at is None:
            self.created_at = time.time()

class AsyncProducerConsumer:
    """Producer/consumer pipeline demo built on ``asyncio.Queue``.

    Producers put ``Task`` objects on a bounded work queue; consumers take
    them off, compute a result and push the finished task onto an unbounded
    results queue; a collector drains the results and a monitor periodically
    prints the counters.

    Attributes:
        queue: Bounded work queue shared by producers and consumers.
        results: Unbounded queue of processed (completed or failed) tasks.
        stats: Counters for produced, successfully consumed and failed tasks.
        running: Master switch; consumers, monitor and collector stop once
            it is False.
        producing: Producers stop creating tasks once this (or ``running``)
            is False. Kept separate so shutdown can stop the producers
            first and let the consumers drain the queue.
    """

    def __init__(self, queue_size: int = 10):
        """Create the queues and counters.

        Args:
            queue_size: Maximum number of tasks waiting in the work queue.
        """
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.results = asyncio.Queue()
        self.stats = {
            'produced': 0,
            'consumed': 0,
            'failed': 0
        }
        self.running = True
        self.producing = True

    @staticmethod
    def _compute(value, operation):
        """Apply *operation* ('square', 'double' or 'half') to *value*.

        Raises:
            ValueError: if *operation* is not one of the known names.
        """
        if operation == 'square':
            return value ** 2
        if operation == 'double':
            return value * 2
        if operation == 'half':
            return value / 2
        raise ValueError(f"未知操作: {operation}")

    async def producer(self, producer_id: str, task_count: int, delay_range: tuple = (0.1, 0.5)):
        """Create up to *task_count* random tasks and put them on the work queue.

        Args:
            producer_id: Label used in task ids and log lines.
            task_count: Maximum number of tasks to create.
            delay_range: ``(low, high)`` bounds in seconds of the random
                pause after each task.
        """
        print(f"生产者 {producer_id} 开始工作")

        try:
            for i in range(task_count):
                if not (self.running and self.producing):
                    break

                task = Task(
                    id=f"{producer_id}-{i+1}",
                    data={
                        'value': random.randint(1, 100),
                        'operation': random.choice(['square', 'double', 'half'])
                    },
                    priority=random.randint(1, 5)
                )

                # Blocks while the bounded queue is full (back-pressure).
                await self.queue.put(task)
                self.stats['produced'] += 1

                print(f"生产者 {producer_id} 生产任务: {task.id}")

                delay = random.uniform(*delay_range)
                await asyncio.sleep(delay)

        except Exception as e:
            print(f"生产者 {producer_id} 异常: {e}")
        finally:
            print(f"生产者 {producer_id} 完成工作")

    async def consumer(self, consumer_id: str, delay_range: tuple = (0.2, 0.8)):
        """Process tasks from the work queue until ``running`` becomes False.

        Every processed task, successful or not, is forwarded to the results
        queue. About 10% of the tasks fail on purpose.

        Args:
            consumer_id: Label used in log lines.
            delay_range: ``(low, high)`` bounds in seconds of the simulated
                processing time.
        """
        print(f"消费者 {consumer_id} 开始工作")

        try:
            while self.running:
                try:
                    # The timeout lets the loop re-check self.running regularly.
                    task = await asyncio.wait_for(self.queue.get(), timeout=1.0)

                    try:
                        print(f"消费者 {consumer_id} 处理任务: {task.id}")
                        task.status = TaskStatus.PROCESSING

                        # Simulated processing time.
                        await asyncio.sleep(random.uniform(*delay_range))

                        try:
                            result = self._compute(
                                task.data['value'], task.data['operation']
                            )

                            # Injected random failure (10% of the tasks).
                            if random.random() < 0.1:
                                raise RuntimeError("随机处理失败")

                            task.status = TaskStatus.COMPLETED
                            task.data['result'] = result

                            print(f"消费者 {consumer_id} 完成任务: {task.id} -> {result}")
                            self.stats['consumed'] += 1

                        except Exception as e:
                            task.status = TaskStatus.FAILED
                            task.data['error'] = str(e)
                            print(f"消费者 {consumer_id} 任务失败: {task.id} - {e}")
                            self.stats['failed'] += 1

                        await self.results.put(task)
                    finally:
                        # Always balance the get(); an unmatched get() would
                        # make queue.join() wait forever.
                        self.queue.task_done()

                except asyncio.TimeoutError:
                    # Queue was empty; loop around and re-check self.running.
                    continue
                except Exception as e:
                    print(f"消费者 {consumer_id} 异常: {e}")
                    break

        finally:
            print(f"消费者 {consumer_id} 停止工作")

    async def monitor(self, interval: float = 2.0):
        """Print queue sizes and counters every *interval* seconds while running."""
        print("监控器开始工作")

        try:
            while self.running:
                await asyncio.sleep(interval)

                queue_size = self.queue.qsize()
                results_size = self.results.qsize()
                consumed = self.stats['consumed']
                processed = consumed + self.stats['failed']
                # Avoid dividing by zero before anything has been processed,
                # while still printing the label.
                success_rate = f"{consumed / processed * 100:.1f}%" if processed > 0 else "N/A"

                print(f"\n=== 系统状态 ===")
                print(f"队列大小: {queue_size}")
                print(f"结果队列大小: {results_size}")
                print(f"已生产: {self.stats['produced']}")
                print(f"已消费: {self.stats['consumed']}")
                print(f"失败数: {self.stats['failed']}")
                print(f"成功率: {success_rate}")

        except Exception as e:
            print(f"监控器异常: {e}")
        finally:
            print("监控器停止工作")

    async def result_collector(self):
        """Drain the results queue and return the collected tasks as a list.

        Keeps going after ``running`` turns False until the results queue is
        empty, so nothing that was already processed is dropped.
        """
        print("结果收集器开始工作")
        collected_results = []

        try:
            while self.running or not self.results.empty():
                try:
                    result = await asyncio.wait_for(self.results.get(), timeout=1.0)
                    collected_results.append(result)

                    if result.status == TaskStatus.COMPLETED:
                        print(f"收集成功结果: {result.id} = {result.data.get('result')}")
                    else:
                        print(f"收集失败结果: {result.id} - {result.data.get('error')}")

                except asyncio.TimeoutError:
                    continue
                except Exception as e:
                    print(f"结果收集器异常: {e}")
                    break

        finally:
            print(f"结果收集器完成,共收集 {len(collected_results)} 个结果")

        # Returned outside ``finally`` so that cancellation and unexpected
        # errors propagate instead of being silently swallowed.
        return collected_results

    async def run_demo(self, duration: float = 10.0):
        """Run 2 producers, 3 consumers, a monitor and a collector.

        After *duration* seconds the pipeline is shut down in phases:
        producers stop first, the consumers drain the work queue, and only
        then are consumers, monitor and collector told to stop.

        Args:
            duration: Seconds to let the pipeline run before shutting down.
        """
        print(f"\n开始生产者-消费者演示 (运行 {duration} 秒)")

        producers = [
            asyncio.create_task(self.producer(f"P{i}", 10, (0.1, 0.3)))
            for i in range(2)
        ]

        consumers = [
            asyncio.create_task(self.consumer(f"C{i}", (0.2, 0.6)))
            for i in range(3)
        ]

        monitor_task = asyncio.create_task(self.monitor(2.0))
        collector_task = asyncio.create_task(self.result_collector())

        try:
            await asyncio.sleep(duration)

            print("\n停止系统...")

            # Phase 1: stop creating work. Consumers keep running so that a
            # producer blocked on a full queue can still finish its put().
            self.producing = False
            await asyncio.gather(*producers, return_exceptions=True)

            # Phase 2: let the consumers drain what is still queued. This
            # would hang forever if the consumers had been stopped first.
            await self.queue.join()

            # Phase 3: everything is processed; stop the remaining workers.
            self.running = False
            await asyncio.gather(
                *consumers, monitor_task, collector_task,
                return_exceptions=True
            )

        except Exception as e:
            print(f"演示异常: {e}")

        finally:
            # On an abnormal exit (e.g. cancellation) make sure no background
            # task is left running; on the normal path all are already done.
            self.producing = False
            self.running = False
            for background in (*producers, *consumers, monitor_task, collector_task):
                if not background.done():
                    background.cancel()

            print("\n=== 最终统计 ===")
            print(f"总生产: {self.stats['produced']}")
            print(f"总消费: {self.stats['consumed']}")
            print(f"总失败: {self.stats['failed']}")
            print(f"队列剩余: {self.queue.qsize()}")

# 运行生产者-消费者演示
async def run_producer_consumer_demo():
    """Run the producer/consumer demo for 8 seconds with a 5-slot work queue."""
    run_seconds = 8.0
    await AsyncProducerConsumer(queue_size=5).run_demo(run_seconds)

# Script entry point for this section.
asyncio.run(run_producer_consumer_demo())

异步上下文管理器

异步上下文管理器用于管理异步资源的获取和释放。

# 异步上下文管理器
print("\n=== 异步上下文管理器 ===")

import asyncio
import aiofiles
import tempfile
import os
from typing import Optional, Any, AsyncGenerator
from contextlib import asynccontextmanager
import time

class AsyncResourceManager:
    """Async context manager that simulates acquiring and releasing a resource.

    Entering sleeps for ``setup_time`` seconds and yields a dict describing
    the resource; leaving sleeps for ``cleanup_time`` seconds and clears the
    state. Exceptions raised inside the ``async with`` body are reported but
    never suppressed.
    """

    def __init__(self, resource_name: str, setup_time: float = 0.1, cleanup_time: float = 0.1):
        self.resource_name = resource_name
        self.setup_time = setup_time
        self.cleanup_time = cleanup_time
        self.resource = None
        self.is_acquired = False

    async def __aenter__(self):
        """Acquire the resource and return its descriptor dict."""
        name = self.resource_name
        print(f"获取资源: {name}")

        # Simulated acquisition latency.
        await asyncio.sleep(self.setup_time)

        self.resource = dict(
            name=name,
            id=id(self),
            acquired_at=time.time(),
            data=f"Resource-{name}-Data",
        )
        self.is_acquired = True

        print(f"资源 {name} 获取成功")
        return self.resource

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release the resource; returns False so exceptions propagate."""
        name = self.resource_name
        print(f"释放资源: {name}")

        if exc_type is not None:
            print(f"资源 {name} 在异常情况下释放: {exc_type.__name__}: {exc_val}")

        # Simulated clean-up latency.
        await asyncio.sleep(self.cleanup_time)

        self.resource, self.is_acquired = None, False
        print(f"资源 {name} 释放完成")

        # False: do not suppress an exception raised in the body.
        return False

class AsyncDatabaseConnection:
    """Simulated async database connection usable as an async context manager.

    Entering the context "connects"; leaving it commits an open transaction
    (or rolls it back if the body raised) and then "disconnects". All
    latencies are simulated with ``asyncio.sleep``.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None
        self.transaction = None
        self.is_connected = False

    def _require_connection(self):
        """Raise ``RuntimeError`` unless the connection is open."""
        if not self.is_connected:
            raise RuntimeError("数据库未连接")

    async def __aenter__(self):
        """Open the simulated connection and return this object."""
        print(f"连接数据库: {self.connection_string}")

        # Simulated connection handshake.
        await asyncio.sleep(0.2)

        self.connection = dict(
            connection_id=f"conn_{id(self)}",
            connected_at=time.time(),
            status='connected',
        )
        self.is_connected = True

        print("数据库连接建立成功")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Finish any open transaction, then close the connection."""
        if self.transaction:
            # Roll back when the body raised, commit otherwise.
            if exc_type:
                announcement, finish = "回滚事务", self.rollback
            else:
                announcement, finish = "提交事务", self.commit
            print(announcement)
            await finish()

        print("关闭数据库连接")
        await asyncio.sleep(0.1)

        self.connection, self.is_connected = None, False
        print("数据库连接已关闭")

    async def begin_transaction(self):
        """Start a transaction.

        Raises:
            RuntimeError: if the connection is not open.
        """
        self._require_connection()

        print("开始事务")
        self.transaction = dict(
            transaction_id=f"tx_{id(self)}_{time.time()}",
            started_at=time.time(),
        )
        await asyncio.sleep(0.05)

    async def commit(self):
        """Commit the open transaction; does nothing when none is open."""
        if not self.transaction:
            return
        print(f"提交事务: {self.transaction['transaction_id']}")
        await asyncio.sleep(0.05)
        self.transaction = None

    async def rollback(self):
        """Roll back the open transaction; does nothing when none is open."""
        if not self.transaction:
            return
        print(f"回滚事务: {self.transaction['transaction_id']}")
        await asyncio.sleep(0.05)
        self.transaction = None

    async def execute(self, query: str) -> dict:
        """Run *query* and return a dict describing the simulated result.

        Raises:
            RuntimeError: if the connection is not open.
        """
        self._require_connection()

        print(f"执行查询: {query}")
        await asyncio.sleep(0.1)  # simulated query latency

        return dict(
            query=query,
            result=f"查询结果_{time.time()}",
            rows_affected=1,
        )

# 使用装饰器创建异步上下文管理器
@asynccontextmanager
async def async_file_processor(file_path: str) -> AsyncGenerator[dict, None]:
    """Async context manager yielding a mutable progress dict for *file_path*.

    No file is actually opened; the "open" step is a simulated delay. The
    body may update ``lines_processed`` and ``status`` on the yielded dict.
    If the body raises, the dict is marked as failed and the exception is
    re-raised. On exit the timing fields are filled in and a summary is
    printed.
    """
    print(f"开始处理文件: {file_path}")

    # Progress record shared with the ``async with`` body.
    state = dict(
        file_path=file_path,
        start_time=time.time(),
        lines_processed=0,
        status='processing',
    )

    try:
        # Simulated file open.
        await asyncio.sleep(0.1)
        print(f"文件 {file_path} 打开成功")

        yield state

    except Exception as exc:
        print(f"文件处理异常: {exc}")
        state.update(status='error', error=str(exc))
        raise

    finally:
        # Record timing and print the summary, on success and failure alike.
        finished = time.time()
        state['end_time'] = finished
        state['duration'] = finished - state['start_time']

        print(f"文件处理完成: {file_path}")
        print(f"  处理时间: {state['duration']:.2f} 秒")
        print(f"  处理行数: {state['lines_processed']}")
        print(f"  最终状态: {state['status']}")

@asynccontextmanager
async def async_connection_pool(pool_size: int = 5) -> AsyncGenerator[list, None]:
    """Async context manager yielding a list of simulated connections.

    Each connection is a dict with ``id``, ``created_at`` and ``in_use``
    keys. Connections are created one by one with a small simulated delay
    and "closed" one by one when the context exits, even if the body raised.
    """
    print(f"创建连接池 (大小: {pool_size})")

    connections = []
    try:
        for index in range(pool_size):
            connections.append({
                'id': f"conn_{index}",
                'created_at': time.time(),
                'in_use': False
            })
            # Simulated time needed to establish one connection.
            await asyncio.sleep(0.02)

        print(f"连接池创建完成,共 {len(connections)} 个连接")
        yield connections

    finally:
        # Tear down whatever was created, one connection at a time.
        print("清理连接池")
        for entry in connections:
            print(f"  关闭连接: {entry['id']}")
            await asyncio.sleep(0.01)

        print("连接池清理完成")

class AsyncContextDemo:
    """Walk-through of the async context managers defined in this section."""

    async def demonstrate_basic_context_manager(self):
        """Use a class-based manager on the happy path and with an exception."""
        print("\n--- 基本异步上下文管理器 ---")

        # Happy path: acquire, use, release.
        database = AsyncResourceManager("Database", setup_time=0.1, cleanup_time=0.1)
        async with database as resource:
            print(f"使用资源: {resource['name']}")
            await asyncio.sleep(0.5)
            print(f"资源数据: {resource['data']}")

        print("\n--- 异常情况下的资源管理 ---")

        # Failure path: the resource is still released, the error propagates.
        cache = AsyncResourceManager("Cache", setup_time=0.05, cleanup_time=0.05)
        try:
            async with cache as resource:
                print(f"使用资源: {resource['name']}")
                await asyncio.sleep(0.2)
                raise ValueError("模拟业务异常")
        except ValueError as error:
            print(f"捕获异常: {error}")

    async def demonstrate_database_context(self):
        """Show commit on success and rollback on error for the mock database."""
        print("\n--- 数据库上下文管理器 ---")
        dsn = "postgresql://localhost:5432/testdb"

        # Successful unit of work; the transaction is committed on exit.
        async with AsyncDatabaseConnection(dsn) as db:
            await db.begin_transaction()

            inserted = await db.execute("INSERT INTO users (name) VALUES ('Alice')")
            print(f"插入结果: {inserted['rows_affected']} 行")

            selected = await db.execute("SELECT * FROM users WHERE name = 'Alice'")
            print(f"查询结果: {selected['result']}")

        print("\n--- 数据库异常处理 ---")

        # Failing unit of work; the transaction is rolled back on exit.
        try:
            async with AsyncDatabaseConnection(dsn) as db:
                await db.begin_transaction()

                for statement in (
                    "INSERT INTO users (name) VALUES ('Bob')",
                    "UPDATE users SET email = 'invalid' WHERE name = 'Bob'",
                ):
                    await db.execute(statement)

                # Simulated database error.
                raise RuntimeError("数据库约束违反")

        except RuntimeError as error:
            print(f"数据库操作异常: {error}")

    async def demonstrate_decorator_context_manager(self):
        """Use the two managers built with ``@asynccontextmanager``."""
        print("\n--- 装饰器上下文管理器 ---")

        # File processor: "process" ten lines, reporting every third one.
        async with async_file_processor("/tmp/test_file.txt") as processor:
            print(f"处理器状态: {processor['status']}")

            for line_number in range(1, 11):
                await asyncio.sleep(0.1)
                processor['lines_processed'] += 1

                # Lines 1, 4, 7 and 10 trigger a progress report.
                if line_number % 3 == 1:
                    print(f"  已处理 {processor['lines_processed']} 行")

            processor['status'] = 'completed'

        # Connection pool: three coroutines each borrow one connection.
        async with async_connection_pool(3) as pool:
            print(f"\n获得连接池,共 {len(pool)} 个连接")

            async def use_connection(conn_id: int):
                """Borrow the connection at index *conn_id* for 0.3 seconds."""
                if conn_id >= len(pool):
                    return
                conn = pool[conn_id]
                conn['in_use'] = True
                print(f"  使用连接: {conn['id']}")
                await asyncio.sleep(0.3)
                conn['in_use'] = False
                print(f"  释放连接: {conn['id']}")

            borrowers = [use_connection(index) for index in range(3)]
            await asyncio.gather(*borrowers)

    async def demonstrate_nested_context_managers(self):
        """Nest three managers; they are released innermost-first."""
        print("\n--- 嵌套上下文管理器 ---")

        outer_manager = AsyncResourceManager("OuterResource", 0.1, 0.1)
        async with outer_manager as outer:
            print(f"外层资源: {outer['name']}")

            inner_manager = AsyncResourceManager("InnerResource", 0.05, 0.05)
            async with inner_manager as inner:
                print(f"内层资源: {inner['name']}")

                async with async_file_processor("/tmp/nested_file.txt") as processor:
                    print(f"文件处理器: {processor['file_path']}")

                    # Simulated work while all three resources are held.
                    await asyncio.sleep(0.3)
                    processor.update(lines_processed=5, status='completed')

                    print("所有资源都已获取,执行业务逻辑")

        print("所有资源已按顺序释放")

    async def demonstrate_multiple_context_managers(self):
        """Enter several managers in a single ``async with`` statement."""
        print("\n--- 多个上下文管理器 ---")

        async with (
            AsyncResourceManager("Resource1", 0.05, 0.05) as first,
            AsyncResourceManager("Resource2", 0.05, 0.05) as second,
            async_connection_pool(2) as pool
        ):
            print(f"同时获得多个资源:")
            print(f"  资源1: {first['name']}")
            print(f"  资源2: {second['name']}")
            print(f"  连接池: {len(pool)} 个连接")

            # Hold everything for a moment, then release in reverse order.
            await asyncio.sleep(0.5)
            print("所有资源使用完毕")

# 运行异步上下文管理器演示
async def run_context_manager_demo():
    """Run every async-context-manager demo, in the order listed below."""
    demo = AsyncContextDemo()

    for method_name in (
        "demonstrate_basic_context_manager",
        "demonstrate_database_context",
        "demonstrate_decorator_context_manager",
        "demonstrate_nested_context_managers",
        "demonstrate_multiple_context_managers",
    ):
        await getattr(demo, method_name)()

# Script entry point for this section.
asyncio.run(run_context_manager_demo())

性能优化与最佳实践

性能优化技巧

异步编程的性能优化需要考虑多个方面:批处理、缓存、连接池、并发数限制,以及将CPU密集型任务移出事件循环。下面的示例依次演示这些技巧。

# 异步编程性能优化
print("\n=== 异步编程性能优化 ===")

import asyncio
import time
import concurrent.futures
from typing import List, Callable, Any
import functools
import weakref

class AsyncPerformanceOptimizer:
    """Demonstrations of common asyncio performance techniques.

    Attributes:
        cache: Result cache used by ``demonstrate_caching``.
        connection_pool: Connections created by
            ``demonstrate_connection_pooling``.
        max_concurrency: Number of tasks the semaphore lets run at once.
        semaphore: Semaphore used by ``demonstrate_semaphore_limiting``.
    """

    def __init__(self, max_concurrency: int = 10):
        """Set up the shared state.

        Args:
            max_concurrency: Concurrency limit enforced by ``semaphore``.
        """
        self.cache = {}
        self.connection_pool = []
        # Kept as a public attribute so the limit can be reported without
        # reaching into the semaphore's private ``_value``.
        self.max_concurrency = max_concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def demonstrate_batching(self):
        """Compare ten concurrent single operations with one batched operation."""
        print("\n--- 批处理优化 ---")

        async def single_db_operation(item_id: int) -> dict:
            """Fetch one item (simulated 0.1 s round trip)."""
            await asyncio.sleep(0.1)
            return {'id': item_id, 'data': f'item_{item_id}'}

        async def batch_db_operation(item_ids: List[int]) -> List[dict]:
            """Fetch all items in one simulated 0.2 s round trip."""
            await asyncio.sleep(0.2)
            return [{'id': item_id, 'data': f'item_{item_id}'} for item_id in item_ids]

        print("单个操作测试:")
        start_time = time.time()

        single_tasks = [single_db_operation(i) for i in range(10)]
        single_results = await asyncio.gather(*single_tasks)

        single_duration = time.time() - start_time
        print(f"  单个操作耗时: {single_duration:.2f} 秒")
        print(f"  结果数量: {len(single_results)}")

        print("\n批量操作测试:")
        start_time = time.time()

        batch_results = await batch_db_operation(list(range(10)))

        batch_duration = time.time() - start_time
        print(f"  批量操作耗时: {batch_duration:.2f} 秒")
        print(f"  结果数量: {len(batch_results)}")
        print(f"  性能提升: {single_duration/batch_duration:.1f}x")

    async def demonstrate_caching(self):
        """Show how caching avoids repeating an expensive awaitable call."""
        print("\n--- 缓存优化 ---")

        async def expensive_operation(key: str) -> str:
            """Simulated slow computation (0.5 s)."""
            print(f"  执行昂贵操作: {key}")
            await asyncio.sleep(0.5)
            return f"result_for_{key}"

        async def cached_operation(key: str) -> str:
            """Return the cached result for *key*, computing it on a miss."""
            if key in self.cache:
                print(f"  缓存命中: {key}")
                return self.cache[key]

            result = await expensive_operation(key)
            self.cache[key] = result
            return result

        test_keys = ['key1', 'key2', 'key1', 'key3', 'key2', 'key1']

        print("缓存测试:")
        start_time = time.time()

        # Sequential on purpose: every repeated key is then a guaranteed hit.
        for key in test_keys:
            result = await cached_operation(key)
            print(f"  {key} -> {result}")

        cache_duration = time.time() - start_time
        print(f"\n缓存测试耗时: {cache_duration:.2f} 秒")
        print(f"缓存命中率: {(len(test_keys) - len(set(test_keys))) / len(test_keys) * 100:.1f}%")

    async def demonstrate_connection_pooling(self):
        """Run 20 concurrent tasks over a pool of at most 5 connections."""
        print("\n--- 连接池优化 ---")

        max_connections = 5

        class MockConnection:
            """Simulated database connection."""

            def __init__(self, conn_id: int):
                self.id = conn_id
                self.in_use = False
                self.created_at = time.time()

            async def execute(self, query: str) -> str:
                await asyncio.sleep(0.1)
                return f"Connection {self.id}: {query} executed"

        async def create_connection() -> MockConnection:
            """Create a connection that is already registered and checked out.

            The connection is added to the pool *before* the first await.
            If the size check and the append were separated by an await,
            every concurrent caller would pass the check and the pool would
            grow far beyond ``max_connections``.
            """
            conn = MockConnection(len(self.connection_pool))
            conn.in_use = True
            self.connection_pool.append(conn)
            try:
                await asyncio.sleep(0.2)  # simulated connection set-up time
            except BaseException:
                # Set-up was interrupted: give the reserved slot back.
                self.connection_pool.remove(conn)
                raise
            return conn

        async def get_connection() -> MockConnection:
            """Check a connection out of the pool, creating or waiting as needed."""
            while True:
                # Prefer an idle connection.
                for conn in self.connection_pool:
                    if not conn.in_use:
                        conn.in_use = True
                        return conn

                # None idle: open a new one while below the limit.
                if len(self.connection_pool) < max_connections:
                    return await create_connection()

                # Pool exhausted: wait briefly, then look again.
                await asyncio.sleep(0.01)

        def release_connection(conn: MockConnection):
            """Return *conn* to the pool."""
            conn.in_use = False

        async def database_task(task_id: int) -> str:
            """Run one query on a pooled connection."""
            conn = await get_connection()
            try:
                result = await conn.execute(f"SELECT * FROM table WHERE id = {task_id}")
                await asyncio.sleep(0.1)  # simulated query time
                return result
            finally:
                release_connection(conn)

        print("连接池测试:")
        start_time = time.time()

        tasks = [database_task(i) for i in range(20)]
        results = await asyncio.gather(*tasks)

        pool_duration = time.time() - start_time
        print(f"  连接池测试耗时: {pool_duration:.2f} 秒")
        print(f"  创建连接数: {len(self.connection_pool)}")
        print(f"  执行任务数: {len(results)}")

    async def demonstrate_semaphore_limiting(self):
        """Run 30 tasks with at most ``max_concurrency`` executing at once."""
        print("\n--- 信号量限制并发 ---")

        async def limited_task(task_id: int) -> str:
            """Task body that only runs while holding the semaphore."""
            async with self.semaphore:
                print(f"  任务 {task_id} 开始执行")
                await asyncio.sleep(0.2)
                print(f"  任务 {task_id} 执行完成")
                return f"task_{task_id}_result"

        print(f"创建 30 个任务,并发限制为 {self.max_concurrency}:")
        start_time = time.time()

        tasks = [limited_task(i) for i in range(30)]
        results = await asyncio.gather(*tasks)

        limited_duration = time.time() - start_time
        print(f"\n限制并发测试耗时: {limited_duration:.2f} 秒")
        print(f"完成任务数: {len(results)}")

    async def demonstrate_cpu_bound_optimization(self):
        """Contrast running CPU-bound work inline with offloading it to threads.

        Offloading to a thread pool keeps the event loop free to service
        other coroutines while the work runs. It does not make pure-Python
        computation faster on the default (GIL) CPython build; for real
        parallelism use ``concurrent.futures.ProcessPoolExecutor``, which
        requires a picklable, module-level function.
        """
        print("\n--- CPU密集型任务优化 ---")

        def cpu_intensive_task(n: int) -> int:
            """Sum of squares below *n*, computed in a plain Python loop."""
            result = 0
            for i in range(n):
                result += i * i
            return result

        # Inline execution (discouraged): nothing else can run on the event
        # loop until this call returns.
        print("在事件循环中执行CPU密集型任务:")
        start_time = time.time()

        direct_result = cpu_intensive_task(1000000)

        direct_duration = time.time() - start_time
        print(f"  直接执行耗时: {direct_duration:.2f} 秒")
        print(f"  结果: {direct_result}")

        # Offloaded execution: the loop stays responsive while worker
        # threads run the computation.
        print("\n在线程池中执行CPU密集型任务:")
        start_time = time.time()

        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            tasks = [
                loop.run_in_executor(executor, cpu_intensive_task, 500000)
                for _ in range(4)
            ]

            thread_results = await asyncio.gather(*tasks)

        thread_duration = time.time() - start_time
        print(f"  线程池执行耗时: {thread_duration:.2f} 秒")
        print(f"  结果数量: {len(thread_results)}")
        print(f"  性能对比: 直接执行 vs 线程池 = {direct_duration:.2f}s vs {thread_duration:.2f}s")

# 运行性能优化演示
async def run_performance_demo():
    """Run every performance demo, in the order listed below."""
    optimizer = AsyncPerformanceOptimizer()

    for method_name in (
        "demonstrate_batching",
        "demonstrate_caching",
        "demonstrate_connection_pooling",
        "demonstrate_semaphore_limiting",
        "demonstrate_cpu_bound_optimization",
    ):
        await getattr(optimizer, method_name)()

# Script entry point for this section.
asyncio.run(run_performance_demo())

实践练习

练习1:异步Web爬虫系统

创建一个高性能的异步Web爬虫,支持并发控制、错误处理和结果统计。

# 异步Web爬虫系统
print("\n=== 异步Web爬虫系统 ===")

import asyncio
import aiohttp
import time
from typing import List, Dict, Optional, Set
from urllib.parse import urljoin, urlparse
import re
from dataclasses import dataclass, field
from collections import defaultdict
import json

@dataclass
class CrawlResult:
    """Outcome of crawling a single URL.

    Only ``url`` and ``status_code`` are required; every other field has a
    default. NOTE(review): units and exact meaning of the optional fields
    are set by the crawler code, which is not shown here; confirm there.
    """
    url: str
    status_code: int
    title: str = ""
    content_length: int = 0
    links: List[str] = field(default_factory=list)  # fresh list per instance
    error: Optional[str] = None  # None unless an error was recorded
    response_time: float = 0.0  # presumably seconds; verify against the crawler
    timestamp: float = field(default_factory=time.time)  # time.time() at creation

@dataclass
class CrawlStats:
    """Running counters for one crawl session, plus derived summary metrics."""
    total_urls: int = 0  # URLs processed so far
    successful_crawls: int = 0  # fetches counted as successful
    failed_crawls: int = 0  # fetches counted as failed
    total_response_time: float = 0.0  # summed response time of successful fetches, in seconds
    status_codes: Dict[int, int] = field(default_factory=lambda: defaultdict(int))  # status -> count
    errors: Dict[str, int] = field(default_factory=lambda: defaultdict(int))  # error category -> count
    start_time: float = field(default_factory=time.time)  # time.time() when the stats were created
    end_time: Optional[float] = None  # set once the crawl is over; None while still running

    @property
    def success_rate(self) -> float:
        """Share of processed URLs that succeeded, as a percentage (0.0 when nothing was processed)."""
        return self.successful_crawls / self.total_urls * 100 if self.total_urls else 0.0

    @property
    def average_response_time(self) -> float:
        """Mean response time over successful fetches (0.0 when there were none)."""
        return self.total_response_time / self.successful_crawls if self.successful_crawls else 0.0

    @property
    def duration(self) -> float:
        """Seconds elapsed since start_time, up to end_time or, if unset, up to now."""
        finished_at = self.end_time if self.end_time else time.time()
        return finished_at - self.start_time

class AsyncWebCrawler:
    """Concurrent web crawler built on aiohttp.

    Use it as an async context manager so the HTTP session is opened and
    closed automatically::

        async with AsyncWebCrawler(max_concurrent=5) as crawler:
            results = await crawler.crawl_urls(urls)

    Args:
        max_concurrent: Maximum number of URLs fetched at the same time.
        timeout: Total per-request timeout in seconds.
        delay: Pause in seconds inserted before every request.
    """

    def __init__(self, max_concurrent: int = 10, timeout: float = 10.0, delay: float = 0.1):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.delay = delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None
        self.stats = CrawlStats()
        self.visited_urls: Set[str] = set()
        self.results: List[CrawlResult] = []

    async def __aenter__(self):
        """Open the shared aiohttp session and return the crawler."""
        connector = aiohttp.TCPConnector(limit=self.max_concurrent * 2)
        timeout = aiohttp.ClientTimeout(total=self.timeout)

        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'AsyncWebCrawler/1.0 (Educational Purpose)'
            }
        )

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session and stamp the end time of the crawl."""
        if self.session:
            await self.session.close()

        self.stats.end_time = time.time()

    def _record_failure(self, result: CrawlResult, message: str, category: str) -> None:
        """Store an error on the result and count it in the statistics."""
        result.error = message
        self.stats.failed_crawls += 1
        self.stats.errors[category] += 1

    async def crawl_url(self, url: str) -> CrawlResult:
        """Fetch one URL and return a CrawlResult describing the outcome.

        Never raises for network or HTTP problems: failures are reported
        through ``CrawlResult.error`` and counted in ``self.stats``.
        """
        async with self.semaphore:
            result = CrawlResult(url=url, status_code=0)

            try:
                # Pause before the request to avoid hammering the server.
                if self.delay > 0:
                    await asyncio.sleep(self.delay)

                # Start the clock only after the pause, so response_time
                # reflects the request itself and not our own throttling.
                request_started = time.perf_counter()

                async with self.session.get(url) as response:
                    result.status_code = response.status
                    result.response_time = time.perf_counter() - request_started

                    if response.status == 200:
                        content = await response.text()
                        result.content_length = len(content)

                        # Extract the page title
                        title_match = re.search(r'<title[^>]*>([^<]+)</title>', content, re.IGNORECASE)
                        if title_match:
                            result.title = title_match.group(1).strip()

                        # Extract outgoing links
                        result.links = self._extract_links(content, url)

                        self.stats.successful_crawls += 1
                        self.stats.total_response_time += result.response_time
                    else:
                        result.error = f"HTTP {response.status}"
                        self.stats.failed_crawls += 1

                    self.stats.status_codes[response.status] += 1

            except asyncio.TimeoutError:
                self._record_failure(result, "Timeout", "Timeout")

            except aiohttp.ClientError as e:
                self._record_failure(result, f"Client Error: {str(e)}", "Client Error")

            except Exception as e:
                self._record_failure(result, f"Unknown Error: {str(e)}", "Unknown Error")

            self.stats.total_urls += 1
            self.results.append(result)
            self.visited_urls.add(url)

            return result

    def _extract_links(self, content: str, base_url: str) -> List[str]:
        """Return the distinct absolute http(s) links found in ``content``.

        Relative hrefs are resolved against ``base_url``. Links are returned
        in order of first appearance, so repeated crawls of the same page
        yield the same list.
        """
        href_pattern = r'href=["\']([^"\'>]+)["\']'
        absolute_urls = (
            urljoin(base_url, href)
            for href in re.findall(href_pattern, content, re.IGNORECASE)
        )
        http_links = [
            link for link in absolute_urls
            if link.startswith(('http://', 'https://'))
        ]

        # dict.fromkeys removes duplicates while keeping first-seen order.
        return list(dict.fromkeys(http_links))

    async def crawl_urls(self, urls: List[str]) -> List[CrawlResult]:
        """Fetch all ``urls`` concurrently and return one CrawlResult per URL, in input order."""
        print(f"开始爬取 {len(urls)} 个URL,最大并发数: {self.max_concurrent}")

        # Create the crawl coroutines
        tasks = [self.crawl_url(url) for url in urls]

        # Run them; exceptions come back as values instead of propagating
        results = await asyncio.gather(*tasks, return_exceptions=True)

        processed_results = []
        for url, result in zip(urls, results):
            # BaseException (not Exception) so that a CancelledError handed
            # back by gather() is also turned into an error result.
            if isinstance(result, BaseException):
                error_result = CrawlResult(
                    url=url,
                    status_code=0,
                    error=f"Task Error: {str(result)}"
                )
                processed_results.append(error_result)
                # Account for it the same way crawl_url would have.
                self.stats.total_urls += 1
                self.stats.failed_crawls += 1
                self.stats.errors["Task Error"] += 1
                self.results.append(error_result)
                self.visited_urls.add(url)
            else:
                processed_results.append(result)

        return processed_results

    async def crawl_with_depth(self, start_urls: List[str], max_depth: int = 2, max_urls: int = 50) -> List[CrawlResult]:
        """Crawl breadth-first from ``start_urls``, following extracted links.

        Args:
            start_urls: URLs crawled at depth 0.
            max_depth: Deepest link level to follow (0 crawls only the start URLs).
            max_urls: Upper bound on the total number of URLs visited.

        Returns:
            The results of every URL crawled, level by level.
        """
        print(f"开始深度爬取,最大深度: {max_depth},最大URL数: {max_urls}")

        current_urls = list(start_urls)
        all_results = []

        for depth in range(max_depth + 1):
            if not current_urls or len(self.visited_urls) >= max_urls:
                break

            print(f"\n--- 深度 {depth} ---")
            print(f"待爬取URL数: {len(current_urls)}")

            # Drop duplicates (keeping order) and URLs that were already visited
            new_urls = [
                url for url in dict.fromkeys(current_urls)
                if url not in self.visited_urls
            ]

            if not new_urls:
                print("没有新的URL需要爬取")
                break

            # Respect the overall URL budget
            new_urls = new_urls[:max_urls - len(self.visited_urls)]

            # Crawl this level
            depth_results = await self.crawl_urls(new_urls)
            all_results.extend(depth_results)

            # Collect the links of successfully fetched pages for the next level
            next_urls = []
            for result in depth_results:
                if result.error is None and result.links:
                    next_urls.extend(result.links)

            current_urls = [
                url for url in dict.fromkeys(next_urls)
                if url not in self.visited_urls
            ]

            print(f"完成深度 {depth},发现 {len(current_urls)} 个新链接")

        return all_results

    def print_stats(self):
        """Print a human-readable summary of the crawl statistics."""
        stats = self.stats

        print(f"\n=== 爬取统计 ===")
        print(f"总URL数: {stats.total_urls}")
        print(f"成功爬取: {stats.successful_crawls}")
        print(f"失败爬取: {stats.failed_crawls}")
        print(f"成功率: {stats.success_rate:.1f}%")
        print(f"平均响应时间: {stats.average_response_time:.2f} 秒")
        print(f"总耗时: {stats.duration:.2f} 秒")

        if stats.status_codes:
            print(f"\n状态码分布:")
            for code, count in sorted(stats.status_codes.items()):
                print(f"  {code}: {count}")

        if stats.errors:
            print(f"\n错误分布:")
            for error, count in stats.errors.items():
                print(f"  {error}: {count}")

    def export_results(self, filename: str = "crawl_results.json"):
        """Write the statistics and every per-URL result to ``filename`` as UTF-8 JSON."""
        export_data = {
            'stats': {
                'total_urls': self.stats.total_urls,
                'successful_crawls': self.stats.successful_crawls,
                'failed_crawls': self.stats.failed_crawls,
                'success_rate': self.stats.success_rate,
                'average_response_time': self.stats.average_response_time,
                'duration': self.stats.duration,
                'status_codes': dict(self.stats.status_codes),
                'errors': dict(self.stats.errors)
            },
            'results': [
                {
                    'url': result.url,
                    'status_code': result.status_code,
                    'title': result.title,
                    'content_length': result.content_length,
                    'links_count': len(result.links),
                    'error': result.error,
                    'response_time': result.response_time,
                    'timestamp': result.timestamp
                }
                for result in self.results
            ]
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)

        print(f"\n结果已导出到: {filename}")

# Crawler demo
async def run_crawler_demo():
    """Exercise AsyncWebCrawler: a flat batch crawl first, then a depth-limited crawl."""

    # Public test endpoints used for the flat crawl
    test_urls = [
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/json",
        "https://httpbin.org/html",
    ]

    print("=== 异步Web爬虫演示 ===")

    # --- Part 1: crawl a fixed list of URLs ---
    async with AsyncWebCrawler(max_concurrent=3, timeout=5.0, delay=0.5) as basic_crawler:
        print("\n--- 基本爬取演示 ---")
        crawl_results = await basic_crawler.crawl_urls(test_urls)

        print("\n爬取结果:")
        for item in crawl_results:
            if item.error is None:
                marker = "✅"
            else:
                marker = "❌"
            shown_title = item.title if item.title else 'N/A'

            print(f"  {marker} {item.url}")
            print(f"    状态码: {item.status_code}")
            print(f"    标题: {shown_title}")
            print(f"    响应时间: {item.response_time:.2f}s")
            if item.error:
                print(f"    错误: {item.error}")
            print()

        basic_crawler.print_stats()
        basic_crawler.export_results("basic_crawl_results.json")

    # --- Part 2: follow links one level deep ---
    print("\n--- 深度爬取演示 ---")
    async with AsyncWebCrawler(max_concurrent=2, timeout=3.0, delay=0.3) as depth_crawler:
        # NOTE: test URL only; honour robots.txt when crawling real sites.
        await depth_crawler.crawl_with_depth(
            start_urls=["https://httpbin.org/links/5"],
            max_depth=1,
            max_urls=10
        )

        depth_crawler.print_stats()
        depth_crawler.export_results("depth_crawl_results.json")

# Run the crawler demo
asyncio.run(run_crawler_demo())

练习2:异步任务调度系统

创建一个支持定时任务、优先级队列和任务依赖的异步调度系统。

# Async task scheduling system: print the section banner for this exercise
print("\n=== 异步任务调度系统 ===")

import asyncio
import time
from typing import Dict, List, Optional, Callable, Any, Set
from dataclasses import dataclass, field
from enum import Enum
import heapq
from datetime import datetime, timedelta
import uuid
import json

class TaskStatus(Enum):
    """Lifecycle states recorded for a task by the scheduler."""
    PENDING = "pending"  # registered or queued, not started yet
    RUNNING = "running"  # currently executing
    COMPLETED = "completed"  # finished and produced a result
    FAILED = "failed"  # raised an exception or timed out
    CANCELLED = "cancelled"  # cancelled through cancel_task()
    WAITING_DEPENDENCIES = "waiting_dependencies"  # held back until its dependencies complete

class TaskPriority(Enum):
    """Task priority levels; a smaller value sorts first in the scheduler's heap."""
    LOW = 3
    NORMAL = 2
    HIGH = 1
    CRITICAL = 0

@dataclass
class TaskResult:
    """Execution record of one task."""
    task_id: str  # ID of the task this record belongs to
    status: TaskStatus  # current lifecycle state
    result: Any = None  # return value of the task function, once completed
    error: Optional[str] = None  # failure description, if any
    start_time: Optional[float] = None  # time.time() when execution started
    end_time: Optional[float] = None  # time.time() when execution ended
    execution_time: Optional[float] = None  # end_time - start_time, stored by the scheduler on success

    @property
    def duration(self) -> Optional[float]:
        """Seconds between start and end, or None while either timestamp is missing.

        Compares against None explicitly so that a timestamp of 0.0 (for
        example with an injected clock) still counts as "set".
        """
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

@dataclass
class Task:
    """Definition of a unit of work handled by the scheduler."""
    id: str  # unique task identifier
    name: str  # human-readable name used in log output
    func: Callable  # sync or async callable to execute
    args: tuple = field(default_factory=tuple)  # positional arguments for func
    kwargs: dict = field(default_factory=dict)  # keyword arguments for func
    priority: TaskPriority = TaskPriority.NORMAL  # smaller enum value runs first
    dependencies: Set[str] = field(default_factory=set)  # IDs of tasks that must complete first
    scheduled_time: Optional[float] = None  # earliest start as a time.time() timestamp; None = immediately
    timeout: Optional[float] = None  # execution time limit in seconds; None = no limit
    retry_count: int = 0  # retries performed so far
    max_retries: int = 0  # retries allowed after a failure
    retry_delay: float = 1.0  # seconds to wait before a retry
    created_at: float = field(default_factory=time.time)  # time.time() at creation

    def __lt__(self, other):
        """Heap ordering: timed tasks first (earliest first), then priority, then age.

        heapq is not stable, so without the final ``created_at`` tie-break
        tasks of equal priority would leave the queue in arbitrary order.
        """
        if not isinstance(other, Task):
            return NotImplemented

        mine, theirs = self.scheduled_time, other.scheduled_time

        if mine is not None and theirs is not None:
            if mine != theirs:
                return mine < theirs
        elif mine is not None:
            return True
        elif theirs is not None:
            return False

        # Both untimed, or due at exactly the same moment.
        if self.priority.value != other.priority.value:
            return self.priority.value < other.priority.value
        return self.created_at < other.created_at

class AsyncTaskScheduler:
    """Asynchronous task scheduler with priorities, timers, dependencies and retries.

    Tasks are registered with ``add_task`` (or one of its convenience
    wrappers) and executed by a background loop started with ``start``.
    The loop ends by itself once every registered task has reached a final
    state, or when ``stop`` is called.

    Args:
        max_concurrent_tasks: Maximum number of tasks executing at once.
    """

    def __init__(self, max_concurrent_tasks: int = 10):
        self.max_concurrent_tasks = max_concurrent_tasks
        self.semaphore = asyncio.Semaphore(max_concurrent_tasks)

        # Task storage
        self.tasks: Dict[str, Task] = {}
        self.task_results: Dict[str, TaskResult] = {}
        self.task_queue: List[Task] = []  # heap of tasks ready to run
        self.scheduled_tasks: List[Task] = []  # heap of tasks waiting for their start time

        # Runtime state
        self.running_tasks: Dict[str, asyncio.Task] = {}
        self.is_running = False
        self.scheduler_task: Optional[asyncio.Task] = None

        # Statistics
        self.stats = {
            'total_tasks': 0,
            'completed_tasks': 0,
            'failed_tasks': 0,
            'cancelled_tasks': 0,
            'start_time': None,
            'end_time': None
        }

    def add_task(self,
                 name: str,
                 func: Callable,
                 args: tuple = (),
                 kwargs: dict = None,
                 priority: TaskPriority = TaskPriority.NORMAL,
                 dependencies: Set[str] = None,
                 scheduled_time: Optional[float] = None,
                 timeout: Optional[float] = None,
                 max_retries: int = 0,
                 retry_delay: float = 1.0) -> str:
        """Register a task and return its generated ID.

        Args:
            name: Human-readable task name used in log output.
            func: Sync or async callable to execute.
            args: Positional arguments for ``func``.
            kwargs: Keyword arguments for ``func``.
            priority: Queue priority; smaller enum values run first.
            dependencies: IDs of tasks that must complete before this one runs.
            scheduled_time: Earliest start as a ``time.time()`` timestamp.
            timeout: Execution time limit in seconds.
            max_retries: Number of retries after a failure.
            retry_delay: Seconds to wait before each retry.

        Raises:
            ValueError: If ``dependencies`` names a task ID that was never
                registered (such a task could never become runnable).
        """
        # Work on a copy: the scheduler removes IDs from this set as
        # dependencies complete and must not touch the caller's object.
        requested = set(dependencies) if dependencies else set()

        unknown = requested - self.tasks.keys()
        if unknown:
            raise ValueError(f"未知的依赖任务ID: {sorted(unknown)}")

        # Dependencies that already completed need no waiting.
        outstanding = {
            dep_id for dep_id in requested
            if self.task_results[dep_id].status != TaskStatus.COMPLETED
        }
        # Dependencies that ended without success can never be satisfied.
        unsatisfiable = [
            dep_id for dep_id in outstanding
            if self.task_results[dep_id].status in (TaskStatus.FAILED, TaskStatus.CANCELLED)
        ]

        task_id = str(uuid.uuid4())
        task = Task(
            id=task_id,
            name=name,
            func=func,
            args=args,
            kwargs=dict(kwargs) if kwargs else {},
            priority=priority,
            dependencies=outstanding,
            scheduled_time=scheduled_time,
            timeout=timeout,
            max_retries=max_retries,
            retry_delay=retry_delay
        )

        result = TaskResult(task_id=task_id, status=TaskStatus.PENDING)
        self.tasks[task_id] = task
        self.task_results[task_id] = result
        self.stats['total_tasks'] += 1

        if unsatisfiable:
            result.status = TaskStatus.CANCELLED
            result.error = f"依赖任务未成功完成: {self.tasks[unsatisfiable[0]].name}"
            self.stats['cancelled_tasks'] += 1
        elif outstanding:
            result.status = TaskStatus.WAITING_DEPENDENCIES
        else:
            self._enqueue(task)

        print(f"添加任务: {name} (ID: {task_id[:8]}...)")

        return task_id

    def add_scheduled_task(self,
                          name: str,
                          func: Callable,
                          delay_seconds: float,
                          args: tuple = (),
                          kwargs: dict = None,
                          priority: TaskPriority = TaskPriority.NORMAL) -> str:
        """Register a task that becomes runnable ``delay_seconds`` from now."""
        scheduled_time = time.time() + delay_seconds
        return self.add_task(
            name=name,
            func=func,
            args=args,
            kwargs=kwargs,
            priority=priority,
            scheduled_time=scheduled_time
        )

    def add_dependent_task(self,
                          name: str,
                          func: Callable,
                          dependencies: List[str],
                          args: tuple = (),
                          kwargs: dict = None,
                          priority: TaskPriority = TaskPriority.NORMAL) -> str:
        """Register a task that runs only after all ``dependencies`` completed."""
        return self.add_task(
            name=name,
            func=func,
            args=args,
            kwargs=kwargs,
            priority=priority,
            dependencies=set(dependencies)
        )

    def _enqueue(self, task: Task) -> None:
        """Put a runnable task on the timer heap or the ready heap."""
        if task.scheduled_time is not None:
            heapq.heappush(self.scheduled_tasks, task)
        else:
            heapq.heappush(self.task_queue, task)

    def _discard_queued(self, task_id: str) -> None:
        """Remove a task from both heaps so it can no longer be launched."""
        for queue in (self.task_queue, self.scheduled_tasks):
            remaining = [queued for queued in queue if queued.id != task_id]
            if len(remaining) != len(queue):
                queue[:] = remaining
                heapq.heapify(queue)

    def _cancel_dependents(self, blocking_task_id: str) -> None:
        """Cancel, transitively, every task still waiting on ``blocking_task_id``.

        Without this, dependents of a failed or cancelled task would stay in
        WAITING_DEPENDENCIES forever and keep the scheduler loop alive.
        """
        to_visit = [blocking_task_id]
        while to_visit:
            blocker_id = to_visit.pop()
            for task_id, task in self.tasks.items():
                dependent_result = self.task_results[task_id]
                if (blocker_id in task.dependencies and
                        dependent_result.status == TaskStatus.WAITING_DEPENDENCIES):
                    dependent_result.status = TaskStatus.CANCELLED
                    dependent_result.error = f"依赖任务未成功完成: {self.tasks[blocker_id].name}"
                    self.stats['cancelled_tasks'] += 1
                    print(f"依赖任务已取消: {task.name}")
                    to_visit.append(task_id)

    def _mark_failed(self, result: TaskResult, message: str) -> None:
        """Record a final failure on ``result`` and in the statistics."""
        result.error = message
        result.status = TaskStatus.FAILED
        result.end_time = time.time()
        self.stats['failed_tasks'] += 1

    async def _execute_task(self, task: Task) -> TaskResult:
        """Run one task, handling timeout, retry, cancellation and dependents."""
        result = self.task_results[task.id]
        retry_pending = False

        try:
            async with self.semaphore:
                # The task may have been cancelled before it got a slot.
                if result.status == TaskStatus.CANCELLED:
                    return result

                result.status = TaskStatus.RUNNING
                result.start_time = time.time()

                print(f"开始执行任务: {task.name} (ID: {task.id[:8]}...)")

                try:
                    runner = self._run_task_function(task)
                    if task.timeout:
                        runner = asyncio.wait_for(runner, timeout=task.timeout)
                    task_result = await runner

                except asyncio.TimeoutError:
                    self._mark_failed(result, f"任务超时 (>{task.timeout}s)")
                    print(f"任务超时: {task.name}")

                except Exception as e:
                    if task.retry_count < task.max_retries:
                        task.retry_count += 1
                        result.error = str(e)
                        result.end_time = time.time()
                        # Back to PENDING: the task is not finished yet.
                        result.status = TaskStatus.PENDING
                        retry_pending = True
                        print(f"任务失败,准备重试 ({task.retry_count}/{task.max_retries}): {task.name}")
                    else:
                        self._mark_failed(result, str(e))
                        print(f"任务失败: {task.name} - {str(e)}")

                else:
                    result.result = task_result
                    result.error = None  # drop the message of an earlier failed attempt
                    result.status = TaskStatus.COMPLETED
                    result.end_time = time.time()
                    result.execution_time = result.end_time - result.start_time

                    self.stats['completed_tasks'] += 1
                    print(f"任务完成: {task.name} (耗时: {result.execution_time:.2f}s)")

            if retry_pending:
                # Wait outside the semaphore so the slot is free for other tasks.
                await asyncio.sleep(task.retry_delay)
                # Skip the re-queue if the task was cancelled during the wait.
                if result.status == TaskStatus.PENDING:
                    heapq.heappush(self.task_queue, task)
                return result

            if result.status == TaskStatus.COMPLETED:
                # Release tasks that were waiting for this one
                await self._check_and_release_dependent_tasks(task.id)
            else:
                # Final failure: dependents can never become runnable
                self._cancel_dependents(task.id)

            return result

        except asyncio.CancelledError:
            result.end_time = time.time()
            if result.status != TaskStatus.CANCELLED:
                # Cancelled from outside cancel_task (e.g. loop shutdown)
                result.status = TaskStatus.CANCELLED
                self.stats['cancelled_tasks'] += 1
            self._cancel_dependents(task.id)
            raise

    async def _run_task_function(self, task: Task) -> Any:
        """Await coroutine functions directly; run plain functions in the default executor."""
        if asyncio.iscoroutinefunction(task.func):
            return await task.func(*task.args, **task.kwargs)
        else:
            # Run the synchronous function in a worker thread
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, lambda: task.func(*task.args, **task.kwargs))

    async def _check_and_release_dependent_tasks(self, completed_task_id: str):
        """Queue every waiting task whose last open dependency was ``completed_task_id``."""
        for task_id, task in self.tasks.items():
            if (completed_task_id in task.dependencies and
                self.task_results[task_id].status == TaskStatus.WAITING_DEPENDENCIES):

                # Tick off the dependency that just completed
                task.dependencies.remove(completed_task_id)

                # Once nothing is left to wait for, the task becomes runnable
                if not task.dependencies:
                    self.task_results[task_id].status = TaskStatus.PENDING
                    self._enqueue(task)

                    print(f"依赖任务已就绪: {task.name}")

    def _promote_due_tasks(self) -> None:
        """Move timed tasks whose start time has arrived onto the ready heap."""
        current_time = time.time()
        while (self.scheduled_tasks and
               self.scheduled_tasks[0].scheduled_time <= current_time):

            scheduled_task = heapq.heappop(self.scheduled_tasks)
            heapq.heappush(self.task_queue, scheduled_task)
            print(f"定时任务就绪: {scheduled_task.name}")

    def _launch_ready_tasks(self) -> None:
        """Start as many ready tasks as the concurrency limit allows."""
        while self.task_queue and len(self.running_tasks) < self.max_concurrent_tasks:
            task = heapq.heappop(self.task_queue)

            execution = asyncio.create_task(self._execute_task(task))
            self.running_tasks[task.id] = execution

            # Forget the asyncio task as soon as it finishes; task_id is bound
            # as a default argument to avoid the late-binding closure pitfall.
            execution.add_done_callback(
                lambda t, task_id=task.id: self.running_tasks.pop(task_id, None)
            )

    async def _scheduler_loop(self):
        """Main loop: promote due tasks, launch ready ones, stop when all work is done."""
        print("任务调度器启动")
        self.stats['start_time'] = time.time()

        while self.is_running:
            try:
                self._promote_due_tasks()
                self._launch_ready_tasks()

                # Nothing queued, nothing running, nothing waiting: we are done
                if (not self.task_queue and
                    not self.scheduled_tasks and
                    not self.running_tasks and
                    self._all_tasks_processed()):

                    print("所有任务已完成,调度器准备停止")
                    break

                # Short nap so the loop does not spin
                await asyncio.sleep(0.1)

            except Exception as e:
                print(f"调度器错误: {e}")
                await asyncio.sleep(1)

        # Let in-flight tasks finish before reporting the end
        if self.running_tasks:
            print(f"等待 {len(self.running_tasks)} 个运行中的任务完成...")
            await asyncio.gather(*self.running_tasks.values(), return_exceptions=True)

        self.stats['end_time'] = time.time()
        # Clear the flag so start() can be called again after a natural finish.
        self.is_running = False
        print("任务调度器已停止")

    def _all_tasks_processed(self) -> bool:
        """Return True when no task is pending, running or waiting on dependencies."""
        unfinished = (TaskStatus.PENDING, TaskStatus.RUNNING, TaskStatus.WAITING_DEPENDENCIES)
        return all(result.status not in unfinished for result in self.task_results.values())

    async def start(self):
        """Start the scheduler loop in the background (no-op if already running)."""
        if self.is_running:
            print("调度器已在运行")
            return

        self.is_running = True
        self.scheduler_task = asyncio.create_task(self._scheduler_loop())

    async def stop(self):
        """Ask the loop to stop and wait until in-flight tasks have finished.

        Tasks that were still queued are not executed.
        """
        print("正在停止调度器...")
        self.is_running = False

        if self.scheduler_task:
            await self.scheduler_task

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a task that has not finished yet.

        Tasks that were waiting on the cancelled task are cancelled as well,
        because they could never become runnable.

        Returns:
            True if the task was cancelled, False if it is unknown or has
            already reached a final state.
        """
        if task_id not in self.tasks:
            return False

        result = self.task_results[task_id]

        if result.status == TaskStatus.RUNNING:
            running = self.running_tasks.get(task_id)
            if running is None:
                return False
            running.cancel()
        elif result.status in (TaskStatus.PENDING, TaskStatus.WAITING_DEPENDENCIES):
            # Take it off the heaps so the loop can never launch it
            self._discard_queued(task_id)
        else:
            return False

        result.status = TaskStatus.CANCELLED
        self.stats['cancelled_tasks'] += 1
        self._cancel_dependents(task_id)
        return True

    def get_task_status(self, task_id: str) -> Optional[TaskResult]:
        """Return the TaskResult for ``task_id``, or None if the ID is unknown."""
        return self.task_results.get(task_id)

    def get_all_results(self) -> Dict[str, TaskResult]:
        """Return a shallow copy of the task-ID -> TaskResult mapping."""
        return self.task_results.copy()

    def print_stats(self):
        """Print counters, timing and the per-status task distribution."""
        duration = None
        if self.stats['start_time'] is not None:
            end_time = self.stats['end_time'] or time.time()
            duration = end_time - self.stats['start_time']

        print(f"\n=== 任务调度统计 ===")
        print(f"总任务数: {self.stats['total_tasks']}")
        print(f"已完成: {self.stats['completed_tasks']}")
        print(f"已失败: {self.stats['failed_tasks']}")
        print(f"已取消: {self.stats['cancelled_tasks']}")
        print(f"运行中: {len(self.running_tasks)}")

        if duration is not None:
            print(f"总耗时: {duration:.2f} 秒")

        # Average the measured execution times; dividing the wall-clock
        # duration by the task count would be wrong for concurrent tasks.
        execution_times = [
            result.execution_time
            for result in self.task_results.values()
            if result.status == TaskStatus.COMPLETED and result.execution_time is not None
        ]
        if execution_times:
            print(f"平均任务耗时: {sum(execution_times) / len(execution_times):.2f} 秒")

        # Count tasks per status
        status_counts = {}
        for result in self.task_results.values():
            status = result.status.value
            status_counts[status] = status_counts.get(status, 0) + 1

        print(f"\n任务状态分布:")
        for status, count in status_counts.items():
            print(f"  {status}: {count}")

# Task scheduling demo
async def run_scheduler_demo():
    """Register one task of each kind, run the scheduler to completion and report.

    The demo waits for the scheduler loop to finish by itself before calling
    ``stop()``; stopping right after the one-second status check would abort
    the run before the timed task (due after two seconds) became runnable.
    """

    # Example task functions
    async def async_task(name: str, duration: float) -> str:
        """Coroutine task that sleeps without blocking the event loop."""
        print(f"  异步任务 {name} 开始执行")
        await asyncio.sleep(duration)
        print(f"  异步任务 {name} 执行完成")
        return f"异步任务 {name} 的结果"

    def sync_task(name: str, duration: float) -> str:
        """Blocking task; the scheduler runs it in a worker thread."""
        print(f"  同步任务 {name} 开始执行")
        time.sleep(duration)
        print(f"  同步任务 {name} 执行完成")
        return f"同步任务 {name} 的结果"

    def failing_task(name: str) -> str:
        """Task that always raises, used to show the retry logic."""
        print(f"  失败任务 {name} 开始执行")
        raise ValueError(f"任务 {name} 故意失败")

    def cpu_intensive_task(name: str, n: int) -> int:
        """CPU-bound task: sum of squares below n."""
        print(f"  CPU密集型任务 {name} 开始执行")
        result = sum(i * i for i in range(n))
        print(f"  CPU密集型任务 {name} 执行完成")
        return result

    print("=== 异步任务调度系统演示 ===")

    # Create the scheduler
    scheduler = AsyncTaskScheduler(max_concurrent_tasks=3)

    # Register tasks of every kind
    print("\n--- 添加任务 ---")

    # Plain tasks
    task1_id = scheduler.add_task(
        "普通任务1",
        async_task,
        args=("Task1", 1.0),
        priority=TaskPriority.NORMAL
    )

    task2_id = scheduler.add_task(
        "普通任务2",
        sync_task,
        args=("Task2", 0.5),
        priority=TaskPriority.HIGH
    )

    # Timed task
    scheduler.add_scheduled_task(
        "定时任务",
        async_task,
        delay_seconds=2.0,
        args=("Scheduled", 0.5),
        priority=TaskPriority.CRITICAL
    )

    # Task that depends on the two plain tasks
    scheduler.add_dependent_task(
        "依赖任务",
        async_task,
        dependencies=[task1_id, task2_id],
        args=("Dependent", 0.3),
        priority=TaskPriority.NORMAL
    )

    # Task that fails and is retried
    scheduler.add_task(
        "重试任务",
        failing_task,
        args=("Retry",),
        max_retries=2,
        retry_delay=0.5
    )

    # CPU-bound task with a timeout
    scheduler.add_task(
        "CPU密集型任务",
        cpu_intensive_task,
        args=("CPU", 100000),
        timeout=5.0
    )

    # Start the scheduler
    print("\n--- 启动调度器 ---")
    await scheduler.start()

    # Peek at the state after one second
    await asyncio.sleep(1)
    print(f"\n--- 1秒后状态检查 ---")
    print(f"运行中任务数: {len(scheduler.running_tasks)}")

    # Wait for the loop to drain every task by itself; stop() on its own
    # would end the loop at once and leave queued tasks unexecuted.
    if scheduler.scheduler_task is not None:
        await scheduler.scheduler_task
    await scheduler.stop()

    # Final statistics
    scheduler.print_stats()

    # Per-task details
    print(f"\n--- 详细任务结果 ---")
    for task_id, result in scheduler.get_all_results().items():
        task = scheduler.tasks[task_id]
        print(f"任务: {task.name}")
        print(f"  状态: {result.status.value}")
        print(f"  结果: {result.result}")
        if result.error:
            print(f"  错误: {result.error}")
        if result.duration is not None:
            print(f"  耗时: {result.duration:.2f}s")
        print()

# Run the task scheduling demo
asyncio.run(run_scheduler_demo())

总结

核心知识点

  1. 异步编程基础

    • 同步与异步的区别
    • 事件循环机制
    • 协程的概念和优势
  2. 协程语法

    • async/await 语法
    • 协程函数定义和调用
    • 异步上下文管理器
    • 异步迭代器和生成器
  3. asyncio核心组件

    • 事件循环管理
    • 任务和Future对象
    • 并发控制和同步原语
  4. 网络I/O编程

    • 异步TCP服务器和客户端
    • HTTP客户端编程
    • 并发网络请求处理
  5. 异步编程模式

    • 生产者-消费者模式
    • 任务队列和工作池
    • 异步上下文管理
  6. 性能优化技巧

    • 批处理优化
    • 缓存策略
    • 连接池管理
    • 并发控制

技能掌握

基础技能

  • 理解异步编程概念
  • 掌握基本的async/await语法
  • 能够编写简单的异步函数
  • 了解事件循环的工作原理

中级技能

  • 熟练使用asyncio库的各种功能
  • 能够处理异步异常和错误
  • 掌握异步上下文管理器的使用
  • 理解并发控制和同步机制

高级技能

  • 能够设计复杂的异步系统架构
  • 掌握性能优化技巧
  • 能够处理大规模并发场景
  • 理解异步编程的最佳实践

最佳实践

设计原则

  • 避免在异步函数中使用阻塞操作
  • 合理使用并发控制机制
  • 正确处理异步异常
  • 遵循异步编程模式

性能考虑

  • 使用连接池减少连接开销
  • 实施批处理优化
  • 合理设置并发限制
  • 使用缓存减少重复计算

错误处理

  • 使用try-except处理异步异常
  • 实现重试机制
  • 设置合理的超时时间
  • 记录详细的错误信息

可维护性

  • 使用异步上下文管理器管理资源
  • 编写清晰的异步代码
  • 添加适当的日志和监控
  • 遵循代码规范

常见陷阱

阻塞操作问题

  • 在异步函数中使用同步I/O
  • 使用time.sleep而不是asyncio.sleep
  • 在事件循环中执行CPU密集型任务

并发控制问题

  • 没有限制并发数量
  • 资源竞争和死锁
  • 不当的异常处理

内存泄漏问题

  • 未正确关闭连接和资源
  • 任务引用循环
  • 事件循环未正确清理

性能问题

  • 过度创建协程
  • 不必要的上下文切换
  • 缺乏批处理优化

性能考虑

内存使用

  • 协程比线程更轻量
  • 注意避免内存泄漏
  • 合理管理连接池大小

计算效率

  • I/O密集型任务的理想选择
  • CPU密集型任务需要特殊处理
  • 合理选择执行器:阻塞式I/O使用线程池,CPU密集型计算优先使用进程池(受GIL限制,线程池无法并行执行Python字节码)

优化策略

  • 使用uvloop提升性能
  • 实施连接复用
  • 采用批处理模式
  • 优化序列化和反序列化

下一步学习

深入主题

  • 学习网络编程基础
  • 掌握数据库异步操作
  • 了解微服务架构
  • 研究高性能Web框架

相关技术

  • FastAPI和Starlette框架
  • aiohttp和aiofiles库
  • 异步数据库驱动
  • 消息队列和任务调度

实践项目

  • 构建异步Web API
  • 开发实时聊天系统
  • 创建数据采集爬虫
  • 实现分布式任务系统

扩展阅读

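官方文档

  • Python asyncio 官方文档(https://docs.python.org/zh-cn/3/library/asyncio.html)
  • PEP 492 – Coroutines with async and await syntax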

推荐书籍

  • 《Using Asyncio in Python》
  • 《Python Tricks: The Book》
  • 《Effective Python》

在线资源

  • Real Python异步编程教程
  • AsyncIO官方教程
  • Python异步编程最佳实践

实践项目

  • 异步Web爬虫项目
  • 实时数据处理系统
  • 高并发API服务
  • 分布式任务调度器

下一章预告:在下一章中,我们将学习网络编程基础,包括Socket编程、HTTP协议、网络安全等内容,这将为构建网络应用程序奠定坚实的基础。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐