Python-协程

参考:豆包-跟着ai学习

  • 本章学习知识点

    • 协程基础概念
    • 协程:gevent\async\await\asyncio使用

一、协程基础概念

  • 什么是协程

    • 协程(Coroutine),又称微线程、纤程,是运行在单线程内的 “并发” 编程方式。其核心特性是:
      • 程序可在执行函数 A 时主动中断,切换到函数 B 执行;
      • 切换过程由程序自身控制,而非操作系统调度;
      • 切换时会自动保存 / 恢复执行状态;
      • 全程仅占用一个线程,无线程切换开销.
  • 协程的核心价值

    优势 适用场景 局限性
    切换开销远低于线程 / 进程 IO 密集型任务(网络请求、文件读写、数据库操作) 无法利用多核 CPU
    无需锁机制,避免资源竞争 高并发轻量级任务处理 不适合 CPU 密集型任务
    单线程内实现高并发 爬虫、API 服务、消息队列消费 需手动处理所有 IO 切换
  • 并发的本质

    • 并发 = 任务切换 + 状态保存
    • 协程正是通过在单线程内实现这两点,达到 “伪并发” 效果。
  • Python 协程发展历程

    • Python 2.x:基于生成器yield/send()实现原始协程
    • Python 3.4:引入asyncio模块,@asyncio.coroutine + yield from
    • Python 3.5+:新增async/await语法(推荐使用)
    • Python 3.6+:asyncio模块正式稳定

二、协程使用

2.1、Gevent 协程框架

  • 原理

    Gevent 是基于 greenlet 实现的第三方协程库,核心逻辑:

    • 自动检测 IO 操作(网络、文件、睡眠等)
    • 遇到 IO 时自动切换到其他协程
    • IO 完成后自动切回原协程继续执行
    • 始终保证有协程在运行,而非等待 IO
  • 安装与基础使用pip install gevent

  • 基础示例

    • 基础示例1

      # demo-1: 基础协程示例
      import time
      import gevent
      from threading import current_thread
      from gevent import monkey
      
      # 打补丁:将标准库的阻塞IO替换为gevent的非阻塞版本
      monkey.patch_all()
      
      def task1():
          """示例任务1"""
          print(f"[Task1] 启动 | 线程: {current_thread().name}")
          time.sleep(2)  # 模拟IO操作(已被monkey.patch_all()替换)
          print(f"[Task1] 完成 | 线程: {current_thread().name}")
          return "结果:task1执行完成"
      
      def task2():
          """示例任务2"""
          print(f"[Task2] 启动 | 线程: {current_thread().name}")
          time.sleep(3)  # 模拟IO操作
          print(f"[Task2] 完成 | 线程: {current_thread().name}")
          return "结果:task2执行完成"
      
      if __name__ == "__main__":
          start_time = time.time()
          
          # 创建协程对象
          g1 = gevent.spawn(task1)
          g2 = gevent.spawn(task2)
          
          # 等待所有协程完成(批量等待更高效)
          gevent.joinall([g1, g2])
          
          # 输出结果
          print(f"\n总耗时: {time.time() - start_time:.2f}秒")
          print(f"Task1返回值: {g1.value}")
          print(f"Task2返回值: {g2.value}")
      
    • 示例2: 高 IO 场景性能对比

      # demo-2: 同步vs协程性能对比
      import time
      from gevent import monkey, spawn, joinall
      
      # 打补丁
      monkey.patch_all()
      
      def io_bound_task(task_id):
          """模拟IO密集型任务"""
          time.sleep(0.4)  # 模拟网络IO/文件IO
          return f"任务{task_id}完成"
      
      def sync_execution():
          """同步执行"""
          start = time.time()
          for i in range(100):
              io_bound_task(i)
          print(f"同步执行耗时: {time.time() - start:.2f}秒")
      
      def async_execution():
          """协程异步执行"""
          start = time.time()
          # 创建100个协程
          greenlets = [spawn(io_bound_task, i) for i in range(100)]
          # 等待所有协程完成
          joinall(greenlets)
          # 获取所有结果
          results = [g.value for g in greenlets]
          print(f"协程执行耗时: {time.time() - start:.2f}秒")
          print(f"完成任务数: {len(results)}")
      
      if __name__ == "__main__":
          print("=== 同步执行 ===")
          sync_execution()
          
          print("\n=== 协程执行 ===")
          async_execution()
      
    • 示例3 - 协程版 Socket 服务器(优化版)

      • 服务端

        # server.py - 协程Socket服务器
        import socket
        from gevent import spawn, monkey
        
        # 打补丁
        monkey.patch_all()
        
        def handle_client(conn: socket.socket, addr: tuple):
            """处理单个客户端连接"""
            print(f"新连接: {addr}")
            try:
                while True:
                    # 接收客户端数据
                    data = conn.recv(1024)
                    if not data:  # 客户端关闭连接
                        break
                    # 模拟处理耗时(IO操作)
                    time.sleep(0.1)
                    # 发送响应(转大写)
                    conn.send(data.upper())
            except ConnectionResetError:
                print(f"客户端{addr}强制断开连接")
            finally:
                conn.close()
                print(f"连接关闭: {addr}")
        
        def start_server(host: str = "127.0.0.1", port: int = 8080):
            """启动协程服务器"""
            # 创建Socket对象
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # 端口复用
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((host, port))
            server_socket.listen(100)  # 最大监听数
            print(f"服务器启动: {host}:{port}")
            
            try:
                while True:
                    # 接受客户端连接(已被patch为非阻塞)
                    conn, addr = server_socket.accept()
                    # 创建协程处理客户端
                    spawn(handle_client, conn, addr)
            except KeyboardInterrupt:
                print("\n服务器正在关闭...")
            finally:
                server_socket.close()
        
        if __name__ == "__main__":
            start_server()
        
      • 客户端

        # client.py - 多线程客户端测试
        import socket
        from threading import Thread, current_thread
        import time
        
        def client_task():
            """单个客户端任务"""
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client.connect(("127.0.0.1", 8080))
            
            try:
                for i in range(5):
                    msg = f"{current_thread().name} - 消息{i}".encode("utf-8")
                    client.send(msg)
                    # 接收响应
                    response = client.recv(1024).decode("utf-8")
                    print(f"{current_thread().name} 收到: {response}")
                    time.sleep(0.05)
            finally:
                client.close()
        
        if __name__ == "__main__":
            # 创建100个客户端线程
            threads = [Thread(target=client_task) for _ in range(100)]
            # 启动所有线程
            for t in threads:
                t.start()
            # 等待所有线程完成
            for t in threads:
                t.join()
            print("所有客户端任务完成")
        

2.2、async/await

协程的核心价值是在单线程内高效处理大量 IO 密集型任务,以下是最典型、最常用的应用场景:

  1. 基础示例

    • 示例一:替代 Gevent

      # asyncio基础示例(Python3.7+)
      import asyncio
      import time
      
      async def async_task1():
          """异步任务1"""
          print("Task1: 启动")
          await asyncio.sleep(2)  # 异步睡眠(非阻塞)
          print("Task1: 完成")
          return "Task1结果"
      
      async def async_task2():
          """异步任务2"""
          print("Task2: 启动")
          await asyncio.sleep(3)  # 异步睡眠
          print("Task2: 完成")
          return "Task2结果"
      
      async def main():
          """主协程"""
          # 创建任务
          task1 = asyncio.create_task(async_task1())
          task2 = asyncio.create_task(async_task2())
          
          # 等待任务完成并获取结果
          result1 = await task1
          result2 = await task2
          
          print(f"\n结果1: {result1}")
          print(f"结果2: {result2}")
      
      if __name__ == "__main__":
          start_time = time.time()
          # 运行主协程
          asyncio.run(main())
          print(f"总耗时: {time.time() - start_time:.2f}秒")
      
  2. 网络请求 / 爬虫开发(最经典场景)

    • 适用原因:爬虫 / API 调用的核心耗时在「等待网络响应」(IO 操作),而非 CPU 计算。协程可以在等待一个请求响应时,自动切换处理下一个请求,最大化利用单线程资源。

    • 具体案例

      • 批量爬取网页数据(比如爬取 1000 个商品详情页)
      • 批量调用第三方 API(比如短信发送、支付回调、数据查询)
      • 接口压力测试(模拟大量并发请求)
    • 对比效果

      • 同步方式:1000 个请求,每个等待 1 秒,总耗时≈1000 秒
      • 协程方式:总耗时≈1 秒(仅等待最长的那个请求)
    • 示例代码(asyncio 版爬虫)

      import asyncio
      import aiohttp  # 异步HTTP库
      import time
      
      async def fetch_url(session, url):
          """异步请求单个URL"""
          async with session.get(url) as response:
              # 等待响应(IO操作,协程自动切换)
              return await response.text()
      
      async def batch_crawl(urls):
          """批量爬取URL"""
          async with aiohttp.ClientSession() as session:
              # 创建所有爬取任务
              tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
              # 等待所有任务完成
              results = await asyncio.gather(*tasks)
              return results
      
      if __name__ == "__main__":
          # 模拟10个待爬取的URL
          test_urls = ["https://httpbin.org/get"] * 10
          
          # 同步方式(对比)
          start = time.time()
          import requests
          for url in test_urls:
              requests.get(url)
          print(f"同步耗时: {time.time() - start:.2f}秒")
          
          # 协程方式
          start = time.time()
          asyncio.run(batch_crawl(test_urls))
          print(f"协程耗时: {time.time() - start:.2f}秒")
      
  3. 数据库 / 文件 IO 密集型任务

    • 适用原因:数据库查询、文件读写属于典型的 IO 操作,协程可在等待数据库响应 / 文件读写时切换处理其他任务,提升整体处理效率。

    • 具体案例

      • 批量数据库查询(比如从数据库读取 1000 条数据并处理)
      • 大文件分块读写(比如日志分析、数据导出)
      • 数据库批量写入(比如爬虫数据入库)
    • 示例

      import asyncio
      import asyncpg  # 异步PostgreSQL库
      
      async def batch_query(db_config, user_ids):
          """批量查询用户数据"""
          # 建立异步数据库连接
          conn = await asyncpg.connect(**db_config)
          
          try:
              # 批量创建查询任务
              tasks = []
              for user_id in user_ids:
                  task = asyncio.create_task(
                      conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
                  )
                  tasks.append(task)
              
              # 等待所有查询完成
              results = await asyncio.gather(*tasks)
              return results
          finally:
              await conn.close()
      
      # 配置示例
      DB_CONFIG = {
          "user": "postgres",
          "password": "123456",
          "database": "test",
          "host": "127.0.0.1"
      }
      
      if __name__ == "__main__":
          # 批量查询100个用户
          user_ids = list(range(1, 101))
          results = asyncio.run(batch_query(DB_CONFIG, user_ids))
          print(f"查询到 {len(results)} 条数据")
      

2.3、场景说明

  • 不适合用协程的场景

    了解适用场景的同时,也要明确协程的「短板」,避免误用:

    1. CPU 密集型任务:比如大数据计算、视频编解码、机器学习训练等,协程无法利用多核 CPU,此时应使用多进程(或多进程 + 协程结合)。
    2. 简单的同步任务:比如单步的文件读写、单次 API 调用,使用协程会增加代码复杂度,反而不如同步代码简洁。
    3. 需要与老旧同步库强耦合的场景:如果核心依赖的库不支持异步(比如某些老旧的数据库驱动),强行用协程会导致频繁的线程切换,反而降低效率。
  • 协程的最佳实践组合

    • IO 密集型 + 少量 CPU 计算:纯协程(asyncio/Gevent)
    • IO 密集型 + 大量 CPU 计算:多进程 + 协程(每个进程内跑协程,既利用多核,又提升 IO 效率)
    • 高并发网络服务:协程 + IO 多路复用(asyncio 底层已整合 epoll/select)

三、IO 模型

  • IO操作的两个阶段

    所有网络 IO 操作(以 read 为例)都包含两个核心阶段:

    • 等待数据(wait data):等待数据从网络 / 磁盘到达内核缓冲区
    • 拷贝数据(copy data):将数据从内核缓冲区拷贝到用户进程内存
  • 阻塞 IO(Blocking IO)

    • 特点:两个阶段都阻塞进程 / 线程
    • 优点:实现简单,编程直观
    • 缺点:单线程只能处理一个连接,并发差
    • 解决方案:多线程 / 多进程(但资源消耗大)
  • 非阻塞 IO(Non-blocking IO)

    • 特点:等待数据阶段不阻塞,返回错误;拷贝数据阶段仍阻塞
    • 实现:设置 socket 为非阻塞模式
    • 优点:单线程可处理多个连接
    • 缺点:轮询消耗 CPU,响应延迟大
  • IO 多路复用(IO Multiplexing)

    • 核心:通过 select/epoll/kqueue 等系统调用,让内核监听多个 fd(文件描述符)

    • 流程:

      1. 进程调用 select,内核监听所有注册的 fd
      2. 任意 fd 有数据到达,select 返回
      3. 进程调用 read,拷贝数据到用户空间
    • 优势:单线程可处理大量连接

    • 优化版 select 示例

      # IO多路复用(select)优化版
      import socket
      import select
      import time
      
      def start_select_server(host: str = "127.0.0.1", port: int = 28312):
          """启动select服务器"""
          # 创建服务器socket
          server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
          server_sock.bind((host, port))
          server_sock.listen(100)
          server_sock.setblocking(False)  # 设置为非阻塞
          print(f"Select服务器启动: {host}:{port}")
          
          # 待监听的读列表
          read_fds = [server_sock]
          # 存储客户端数据
          client_data = {}
          
          try:
              while True:
                  # 监听读/写/异常事件(超时1秒)
                  readable, writable, exceptional = select.select(
                      read_fds, [], read_fds, 1.0
                  )
                  
                  # 处理可读事件
                  for sock in readable:
                      # 新连接
                      if sock == server_sock:
                          conn, addr = server_sock.accept()
                          conn.setblocking(False)
                          read_fds.append(conn)
                          client_data[conn] = b""
                          print(f"新连接: {addr}, 总连接数: {len(read_fds)-1}")
                      # 已有连接数据
                      else:
                          try:
                              data = sock.recv(1024)
                              if data:
                                  # 存储数据并响应
                                  client_data[sock] += data
                                  sock.send(f"收到数据: {data.decode('utf-8')}".encode("utf-8"))
                              else:
                                  # 客户端关闭连接
                                  print(f"客户端断开: {sock.getpeername()}")
                                  read_fds.remove(sock)
                                  del client_data[sock]
                                  sock.close()
                          except (ConnectionResetError, OSError):
                              # 客户端强制断开
                              print(f"客户端异常断开: {sock.getpeername()}")
                              read_fds.remove(sock)
                              del client_data[sock]
                              sock.close()
                  
                  # 处理异常事件
                  for sock in exceptional:
                      print(f"连接异常: {sock.getpeername()}")
                      read_fds.remove(sock)
                      del client_data[sock]
                      sock.close()
                      
          except KeyboardInterrupt:
              print("\n服务器关闭中...")
          finally:
              # 关闭所有连接
              for sock in read_fds:
                  sock.close()
      
      if __name__ == "__main__":
          start_select_server()
      

四、总结

  1. 协程核心:协程通过单线程内的任务切换 + 状态保存实现并发,最适合 IO 密集型任务,无线程切换开销和锁机制问题。
  2. 实现方式:
    • 第三方库:Gevent(自动检测 IO,使用简单)
    • 原生实现:asyncio + async/await(Python3.5 + 推荐)
  3. IO 模型选择:
    • 简单场景:阻塞 IO + 多线程
    • 高并发场景:IO 多路复用(select/epoll)或协程
    • 现代 Python:优先使用 asyncio 原生协程

先记录文档,后续用上在重新看吧、2026年3月5日11:16:43

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐