Python网络并发编程-5-协程
协程核心:协程通过单线程内的任务切换 + 状态保存实现并发,最适合 IO 密集型任务,无线程切换开销和锁机制问题。实现方式:第三方库:Gevent(自动检测 IO,使用简单)原生实现:asyncio + async/await(Python3.5 + 推荐)IO 模型选择:简单场景:阻塞 IO + 多线程高并发场景:IO 多路复用(select/epoll)或协程现代 Python:优先使用 as
Python-协程
参考:豆包-跟着ai学习
-
本章学习知识点
- 协程基础概念
- 协程:gevent\async\await\asyncio使用
一、协程基础概念
-
什么是协程
- 协程(Coroutine),又称微线程、纤程,是运行在单线程内的 “并发” 编程方式。其核心特性是:
- 程序可在执行函数 A 时主动中断,切换到函数 B 执行;
- 切换过程由程序自身控制,而非操作系统调度;
- 切换时会自动保存 / 恢复执行状态;
- 全程仅占用一个线程,无线程切换开销.
- 协程(Coroutine),又称微线程、纤程,是运行在单线程内的 “并发” 编程方式。其核心特性是:
-
协程的核心价值
优势 适用场景 局限性 切换开销远低于线程 / 进程 IO 密集型任务(网络请求、文件读写、数据库操作) 无法利用多核 CPU 无需锁机制,避免资源竞争 高并发轻量级任务处理 不适合 CPU 密集型任务 单线程内实现高并发 爬虫、API 服务、消息队列消费 需手动处理所有 IO 切换 -
并发的本质
- 并发 = 任务切换 + 状态保存
- 协程正是通过在单线程内实现这两点,达到 “伪并发” 效果。
-
Python 协程发展历程
- Python 2.x:基于生成器
yield/send()实现原始协程 - Python 3.4:引入
asyncio模块,@asyncio.coroutine+yield from - Python 3.5+:新增
async/await语法(推荐使用) - Python 3.6+:
asyncio模块正式稳定
- Python 2.x:基于生成器
二、协程使用
2.1、Gevent 协程框架
-
原理
Gevent 是基于 greenlet 实现的第三方协程库,核心逻辑:
- 自动检测 IO 操作(网络、文件、睡眠等)
- 遇到 IO 时自动切换到其他协程
- IO 完成后自动切回原协程继续执行
- 始终保证有协程在运行,而非等待 IO
-
安装与基础使用:
pip install gevent -
基础示例
-
基础示例1
# demo-1: 基础协程示例 import time import gevent from threading import current_thread from gevent import monkey # 打补丁:将标准库的阻塞IO替换为gevent的非阻塞版本 monkey.patch_all() def task1(): """示例任务1""" print(f"[Task1] 启动 | 线程: {current_thread().name}") time.sleep(2) # 模拟IO操作(已被monkey.patch_all()替换) print(f"[Task1] 完成 | 线程: {current_thread().name}") return "结果:task1执行完成" def task2(): """示例任务2""" print(f"[Task2] 启动 | 线程: {current_thread().name}") time.sleep(3) # 模拟IO操作 print(f"[Task2] 完成 | 线程: {current_thread().name}") return "结果:task2执行完成" if __name__ == "__main__": start_time = time.time() # 创建协程对象 g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) # 等待所有协程完成(批量等待更高效) gevent.joinall([g1, g2]) # 输出结果 print(f"\n总耗时: {time.time() - start_time:.2f}秒") print(f"Task1返回值: {g1.value}") print(f"Task2返回值: {g2.value}") -
示例2: 高 IO 场景性能对比
# demo-2: 同步vs协程性能对比 import time from gevent import monkey, spawn, joinall # 打补丁 monkey.patch_all() def io_bound_task(task_id): """模拟IO密集型任务""" time.sleep(0.4) # 模拟网络IO/文件IO return f"任务{task_id}完成" def sync_execution(): """同步执行""" start = time.time() for i in range(100): io_bound_task(i) print(f"同步执行耗时: {time.time() - start:.2f}秒") def async_execution(): """协程异步执行""" start = time.time() # 创建100个协程 greenlets = [spawn(io_bound_task, i) for i in range(100)] # 等待所有协程完成 joinall(greenlets) # 获取所有结果 results = [g.value for g in greenlets] print(f"协程执行耗时: {time.time() - start:.2f}秒") print(f"完成任务数: {len(results)}") if __name__ == "__main__": print("=== 同步执行 ===") sync_execution() print("\n=== 协程执行 ===") async_execution() -
示例3 - 协程版 Socket 服务器(优化版)
-
服务端
# server.py - 协程Socket服务器 import socket from gevent import spawn, monkey # 打补丁 monkey.patch_all() def handle_client(conn: socket.socket, addr: tuple): """处理单个客户端连接""" print(f"新连接: {addr}") try: while True: # 接收客户端数据 data = conn.recv(1024) if not data: # 客户端关闭连接 break # 模拟处理耗时(IO操作) time.sleep(0.1) # 发送响应(转大写) conn.send(data.upper()) except ConnectionResetError: print(f"客户端{addr}强制断开连接") finally: conn.close() print(f"连接关闭: {addr}") def start_server(host: str = "127.0.0.1", port: int = 8080): """启动协程服务器""" # 创建Socket对象 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 端口复用 server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind((host, port)) server_socket.listen(100) # 最大监听数 print(f"服务器启动: {host}:{port}") try: while True: # 接受客户端连接(已被patch为非阻塞) conn, addr = server_socket.accept() # 创建协程处理客户端 spawn(handle_client, conn, addr) except KeyboardInterrupt: print("\n服务器正在关闭...") finally: server_socket.close() if __name__ == "__main__": start_server() -
客户端
# client.py - 多线程客户端测试 import socket from threading import Thread, current_thread import time def client_task(): """单个客户端任务""" client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(("127.0.0.1", 8080)) try: for i in range(5): msg = f"{current_thread().name} - 消息{i}".encode("utf-8") client.send(msg) # 接收响应 response = client.recv(1024).decode("utf-8") print(f"{current_thread().name} 收到: {response}") time.sleep(0.05) finally: client.close() if __name__ == "__main__": # 创建100个客户端线程 threads = [Thread(target=client_task) for _ in range(100)] # 启动所有线程 for t in threads: t.start() # 等待所有线程完成 for t in threads: t.join() print("所有客户端任务完成")
-
-
2.2、async/await
协程的核心价值是在单线程内高效处理大量 IO 密集型任务,以下是最典型、最常用的应用场景:
-
基础示例
-
示例一:替代 Gevent
# asyncio基础示例(Python3.7+) import asyncio import time async def async_task1(): """异步任务1""" print("Task1: 启动") await asyncio.sleep(2) # 异步睡眠(非阻塞) print("Task1: 完成") return "Task1结果" async def async_task2(): """异步任务2""" print("Task2: 启动") await asyncio.sleep(3) # 异步睡眠 print("Task2: 完成") return "Task2结果" async def main(): """主协程""" # 创建任务 task1 = asyncio.create_task(async_task1()) task2 = asyncio.create_task(async_task2()) # 等待任务完成并获取结果 result1 = await task1 result2 = await task2 print(f"\n结果1: {result1}") print(f"结果2: {result2}") if __name__ == "__main__": start_time = time.time() # 运行主协程 asyncio.run(main()) print(f"总耗时: {time.time() - start_time:.2f}秒")
-
-
网络请求 / 爬虫开发(最经典场景)
-
适用原因:爬虫 / API 调用的核心耗时在「等待网络响应」(IO 操作),而非 CPU 计算。协程可以在等待一个请求响应时,自动切换处理下一个请求,最大化利用单线程资源。
-
具体案例:
- 批量爬取网页数据(比如爬取 1000 个商品详情页)
- 批量调用第三方 API(比如短信发送、支付回调、数据查询)
- 接口压力测试(模拟大量并发请求)
-
对比效果:
- 同步方式:1000 个请求,每个等待 1 秒,总耗时≈1000 秒
- 协程方式:总耗时≈1 秒(仅等待最长的那个请求)
-
示例代码(asyncio 版爬虫)
import asyncio import aiohttp # 异步HTTP库 import time async def fetch_url(session, url): """异步请求单个URL""" async with session.get(url) as response: # 等待响应(IO操作,协程自动切换) return await response.text() async def batch_crawl(urls): """批量爬取URL""" async with aiohttp.ClientSession() as session: # 创建所有爬取任务 tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls] # 等待所有任务完成 results = await asyncio.gather(*tasks) return results if __name__ == "__main__": # 模拟10个待爬取的URL test_urls = ["https://httpbin.org/get"] * 10 # 同步方式(对比) start = time.time() import requests for url in test_urls: requests.get(url) print(f"同步耗时: {time.time() - start:.2f}秒") # 协程方式 start = time.time() asyncio.run(batch_crawl(test_urls)) print(f"协程耗时: {time.time() - start:.2f}秒")
-
-
数据库 / 文件 IO 密集型任务
-
适用原因:数据库查询、文件读写属于典型的 IO 操作,协程可在等待数据库响应 / 文件读写时切换处理其他任务,提升整体处理效率。
-
具体案例:
- 批量数据库查询(比如从数据库读取 1000 条数据并处理)
- 大文件分块读写(比如日志分析、数据导出)
- 数据库批量写入(比如爬虫数据入库)
-
示例
import asyncio import asyncpg # 异步PostgreSQL库 async def batch_query(db_config, user_ids): """批量查询用户数据""" # 建立异步数据库连接 conn = await asyncpg.connect(**db_config) try: # 批量创建查询任务 tasks = [] for user_id in user_ids: task = asyncio.create_task( conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id) ) tasks.append(task) # 等待所有查询完成 results = await asyncio.gather(*tasks) return results finally: await conn.close() # 配置示例 DB_CONFIG = { "user": "postgres", "password": "123456", "database": "test", "host": "127.0.0.1" } if __name__ == "__main__": # 批量查询100个用户 user_ids = list(range(1, 101)) results = asyncio.run(batch_query(DB_CONFIG, user_ids)) print(f"查询到 {len(results)} 条数据")
-
2.3、场景说明
-
不适合用协程的场景
了解适用场景的同时,也要明确协程的「短板」,避免误用:
- CPU 密集型任务:比如大数据计算、视频编解码、机器学习训练等,协程无法利用多核 CPU,此时应使用多进程(或多进程 + 协程结合)。
- 简单的同步任务:比如单步的文件读写、单次 API 调用,使用协程会增加代码复杂度,反而不如同步代码简洁。
- 需要与老旧同步库强耦合的场景:如果核心依赖的库不支持异步(比如某些老旧的数据库驱动),强行用协程会导致频繁的线程切换,反而降低效率。
-
协程的最佳实践组合
- IO 密集型 + 少量 CPU 计算:纯协程(asyncio/Gevent)
- IO 密集型 + 大量 CPU 计算:多进程 + 协程(每个进程内跑协程,既利用多核,又提升 IO 效率)
- 高并发网络服务:协程 + IO 多路复用(asyncio 底层已整合 epoll/select)
三、IO 模型
-
IO操作的两个阶段
所有网络 IO 操作(以 read 为例)都包含两个核心阶段:
- 等待数据(wait data):等待数据从网络 / 磁盘到达内核缓冲区
- 拷贝数据(copy data):将数据从内核缓冲区拷贝到用户进程内存
-
阻塞 IO(Blocking IO)
- 特点:两个阶段都阻塞进程 / 线程
- 优点:实现简单,编程直观
- 缺点:单线程只能处理一个连接,并发差
- 解决方案:多线程 / 多进程(但资源消耗大)
-
非阻塞 IO(Non-blocking IO)
- 特点:等待数据阶段不阻塞,返回错误;拷贝数据阶段仍阻塞
- 实现:设置 socket 为非阻塞模式
- 优点:单线程可处理多个连接
- 缺点:轮询消耗 CPU,响应延迟大
-
IO 多路复用(IO Multiplexing)
-
核心:通过 select/epoll/kqueue 等系统调用,让内核监听多个 fd(文件描述符)
-
流程:
- 进程调用 select,内核监听所有注册的 fd
- 任意 fd 有数据到达,select 返回
- 进程调用 read,拷贝数据到用户空间
-
优势:单线程可处理大量连接
-
优化版 select 示例:
# IO多路复用(select)优化版 import socket import select import time def start_select_server(host: str = "127.0.0.1", port: int = 28312): """启动select服务器""" # 创建服务器socket server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.bind((host, port)) server_sock.listen(100) server_sock.setblocking(False) # 设置为非阻塞 print(f"Select服务器启动: {host}:{port}") # 待监听的读列表 read_fds = [server_sock] # 存储客户端数据 client_data = {} try: while True: # 监听读/写/异常事件(超时1秒) readable, writable, exceptional = select.select( read_fds, [], read_fds, 1.0 ) # 处理可读事件 for sock in readable: # 新连接 if sock == server_sock: conn, addr = server_sock.accept() conn.setblocking(False) read_fds.append(conn) client_data[conn] = b"" print(f"新连接: {addr}, 总连接数: {len(read_fds)-1}") # 已有连接数据 else: try: data = sock.recv(1024) if data: # 存储数据并响应 client_data[sock] += data sock.send(f"收到数据: {data.decode('utf-8')}".encode("utf-8")) else: # 客户端关闭连接 print(f"客户端断开: {sock.getpeername()}") read_fds.remove(sock) del client_data[sock] sock.close() except (ConnectionResetError, OSError): # 客户端强制断开 print(f"客户端异常断开: {sock.getpeername()}") read_fds.remove(sock) del client_data[sock] sock.close() # 处理异常事件 for sock in exceptional: print(f"连接异常: {sock.getpeername()}") read_fds.remove(sock) del client_data[sock] sock.close() except KeyboardInterrupt: print("\n服务器关闭中...") finally: # 关闭所有连接 for sock in read_fds: sock.close() if __name__ == "__main__": start_select_server()
-
四、总结
- 协程核心:协程通过单线程内的任务切换 + 状态保存实现并发,最适合 IO 密集型任务,无线程切换开销和锁机制问题。
- 实现方式:
- 第三方库:Gevent(自动检测 IO,使用简单)
- 原生实现:asyncio + async/await(Python3.5 + 推荐)
- IO 模型选择:
- 简单场景:阻塞 IO + 多线程
- 高并发场景:IO 多路复用(select/epoll)或协程
- 现代 Python:优先使用 asyncio 原生协程
先记录文档,后续用上在重新看吧、2026年3月5日11:16:43
更多推荐


所有评论(0)