Python面试手册AI版——Part4 并发编程(多线程/多进程/协程)
Part 4: 并发编程(多线程/多进程/协程)
一、并发 vs 并行 基础概念
1. 1 核心概念区分
"""
并发 (Concurrency):
- 同一时间段内处理多个任务
- 任务之间交替执行(时间片轮转)
- 单核 CPU 也能实现
- 关注:任务调度、资源共享
并行 (Parallelism):
- 同一时刻同时执行多个任务
- 需要多核 CPU 或多台机器
- 关注:任务分解、负载均衡
I/O 密集型 vs CPU 密集型:
- I/O 密集型:大部分时间等待 I/O(网络、磁盘)
→ 适合多线程/协程
- CPU 密集型:大部分时间进行计算
→ 适合多进程
"""
# ============ Python 并发模型选择 ============
"""
场景 推荐方案 原因
─────────────────────────────────────────────────────
Web 爬虫 asyncio + aiohttp I/O 密集,协程最高效
Web 服务器 asyncio / 多线程 处理大量并发连接
图像/视频处理 多进程 CPU 密集,绕过 GIL
数据计算 多进程 + NumPy CPU 密集
文件批量处理 多线程 / 多进程 取决于是否 I/O 密集
API 调用聚合 asyncio I/O 密集
"""
1.2 GIL 深度解析(必考!)
"""
GIL (Global Interpreter Lock) - 全局解释器锁
什么是 GIL?
- CPython 解释器的一个互斥锁
- 同一时刻只允许一个线程执行 Python 字节码
- 即使在多核 CPU 上,多线程也无法并行执行 Python 代码
为什么需要 GIL?
- CPython 的内存管理(引用计数)不是线程安全的
- GIL 简化了 CPython 的实现
- 保护内置数据结构的线程安全
GIL 的影响:
- CPU 密集型任务:多线程几乎无法提升性能
- I/O 密集型任务:GIL 会在 I/O 等待时释放,影响较小
GIL 何时释放?
1. I/O 操作(文件读写、网络请求)
2. time.sleep()
3. 每执行一定数量的字节码(默认 100 个 tick,Python 3.2+ 改为时间片)
4. 调用 C 扩展(如 NumPy)时可以主动释放
"""
import sys
# 查看 GIL 切换间隔(Python 3.2+)
print(sys.getswitchinterval()) # 0.005 秒 (5ms)
# 可以修改切换间隔
# sys.setswitchinterval(0.001)
# ============ GIL 对性能的影响演示 ============
import threading
import time
def cpu_bound(n: int) -> int:
"""CPU 密集型任务"""
count = 0
for i in range(n):
count += i
return count
# 单线程
start = time.perf_counter()
cpu_bound(10**7)
cpu_bound(10**7)
print(f"单线程: {time.perf_counter() - start:.2f}s")
# 多线程(因为 GIL,不会更快)
start = time.perf_counter()
t1 = threading.Thread(target=cpu_bound, args=(10**7,))
t2 = threading.Thread(target=cpu_bound, args=(10**7,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多线程: {time.perf_counter() - start:.2f}s")
# 结果:多线程可能比单线程还慢!(线程切换开销)
二、多线程(threading)
2.1 线程基础
import threading
import time
from typing import List
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ============ 方式1:直接创建 Thread ============
def worker(name: str, delay: float) -> None:
"""工作线程函数"""
logger.info(f"Worker {name} starting")
time.sleep(delay)
logger.info(f"Worker {name} finished")
# 创建线程
t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1), name="Thread-B")
# 启动线程
t1.start()
t2.start()
# 等待线程完成
t1.join() # 阻塞直到 t1 完成
t2.join(timeout=5) # 最多等待 5 秒
# ============ 方式2:继承 Thread 类 ============
class DownloadThread(threading.Thread):
def __init__(self, url: str, filename: str):
super().__init__()
self.url = url
self.filename = filename
self.result = None
def run(self):
"""重写 run 方法"""
logger.info(f"Downloading {self.url}")
time.sleep(1) # 模拟下载
self.result = f"Content from {self.url}"
logger.info(f"Saved to {self.filename}")
thread = DownloadThread("http://example.com", "file.txt")
thread.start()
thread.join()
print(thread.result)
# ============ 守护线程 ============
def background_task():
while True:
logger.info("Background task running...")
time.sleep(1)
daemon = threading.Thread(target=background_task, daemon=True)
daemon.start()
# 主线程结束时,守护线程自动终止
time.sleep(3)
logger.info("Main thread exiting")
# ============ 线程相关信息 ============
print(f"当前线程: {threading.current_thread().name}")
print(f"主线程: {threading.main_thread().name}")
print(f"活跃线程数: {threading.active_count()}")
print(f"所有线程: {threading.enumerate()}")
2.2 线程同步原语
Lock(互斥锁)
import threading
import time
from typing import List
# ============ 没有锁的问题 ============
class UnsafeCounter:
def __init__(self):
self.count = 0
def increment(self):
# 这不是原子操作!
# 实际是: temp = self.count; temp += 1; self.count = temp
self.count += 1
unsafe = UnsafeCounter()
def unsafe_worker():
for _ in range(100000):
unsafe.increment()
threads = [threading.Thread(target=unsafe_worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"期望: 1000000, 实际: {unsafe.count}") # 通常小于 1000000!
# ============ 使用 Lock 保护 ============
class SafeCounter:
def __init__(self):
self.count = 0
self._lock = threading.Lock()
def increment(self):
with self._lock: # 自动获取和释放锁
self.count += 1
# 等价于:
def increment_manual(self):
self._lock.acquire()
try:
self.count += 1
finally:
self._lock.release()
safe = SafeCounter()
def safe_worker():
for _ in range(100000):
safe.increment()
threads = [threading.Thread(target=safe_worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"期望: 1000000, 实际: {safe.count}") # 正好 1000000 ✅
# ============ 死锁示例与避免 ============
lock_a = threading.Lock()
lock_b = threading.Lock()
def worker_1():
with lock_a:
time.sleep(0.1)
with lock_b: # 等待 lock_b
print("Worker 1 done")
def worker_2():
with lock_b:
time.sleep(0.1)
with lock_a: # 等待 lock_a → 死锁!
print("Worker 2 done")
# 避免死锁的方法(前两种的示意代码见下方):
# 1. 所有线程按固定顺序获取锁
# 2. 使用 lock.acquire(timeout=...),获取失败时释放已持有的锁并稍后重试
# 3. 缩小锁的粒度,或合并为一把锁
#    (RLock 只解决同一线程重复获取同一把锁的问题,无法避免上面这种跨锁死锁)
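下面给出前两种方法的简化示意,沿用上面定义的 lock_a / lock_b(函数名为示例,仅演示思路):

def ordered_worker(name: str):
    # 方法1:所有线程都按 lock_a → lock_b 的固定顺序获取,不会交叉等待
    with lock_a:
        with lock_b:
            print(f"{name} done (ordered)")

def timeout_worker(name: str):
    # 方法2:带超时获取,失败则释放已持有的锁、稍后重试,避免永久等待
    while True:
        with lock_a:
            if lock_b.acquire(timeout=0.1):
                try:
                    print(f"{name} done (timeout)")
                    return
                finally:
                    lock_b.release()
        time.sleep(0.05)  # 回退后稍作等待,降低反复冲突的概率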
RLock(可重入锁)
import threading
# 普通 Lock 不能重复获取
lock = threading.Lock()
lock.acquire()
# lock.acquire() # 死锁!
# RLock 可以被同一线程多次获取
rlock = threading.RLock()
rlock.acquire()
rlock.acquire() # OK,但必须释放相同次数
rlock.release()
rlock.release()
# ============ 实际应用:递归函数 ============
class RecursiveClass:
def __init__(self):
self._lock = threading.RLock()
self.data = []
def add(self, item):
with self._lock:
self.data.append(item)
def add_many(self, items):
with self._lock:
for item in items:
self.add(item) # 再次获取同一把锁
Condition(条件变量)
import threading
import time
from collections import deque
from typing import Any
class BlockingQueue:
"""阻塞队列:生产者-消费者模式"""
def __init__(self, maxsize: int = 10):
self._queue: deque = deque()
self._maxsize = maxsize
self._condition = threading.Condition()
def put(self, item: Any) -> None:
"""放入元素,队列满时阻塞"""
with self._condition:
while len(self._queue) >= self._maxsize:
self._condition.wait() # 释放锁并等待通知
self._queue.append(item)
self._condition.notify() # 通知一个等待的消费者
def get(self) -> Any:
"""取出元素,队列空时阻塞"""
with self._condition:
while len(self._queue) == 0:
self._condition.wait()
item = self._queue.popleft()
self._condition.notify() # 通知一个等待的生产者
return item
# 使用示例
queue = BlockingQueue(maxsize=5)
def producer():
for i in range(10):
queue.put(i)
print(f"Produced: {i}")
time.sleep(0.1)
def consumer():
for _ in range(10):
item = queue.get()
print(f"Consumed: {item}")
time.sleep(0.2)
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
Semaphore(信号量)
import threading
import time
# 信号量:限制同时访问资源的线程数
semaphore = threading.Semaphore(3) # 最多 3 个线程同时访问
def limited_resource(thread_id: int):
with semaphore:
print(f"Thread {thread_id} acquired semaphore")
time.sleep(2)
print(f"Thread {thread_id} released semaphore")
# 启动 10 个线程,但最多 3 个能同时运行
threads = [threading.Thread(target=limited_resource, args=(i,)) for i in range(10)]
for t in threads:
t.start()
# ============ BoundedSemaphore ============
# 防止 release() 调用次数超过 acquire()
bounded = threading.BoundedSemaphore(3)
# bounded.release() # ValueError: Semaphore released too many times
Event(事件)
import threading
import time
# Event:线程间的简单通信机制
event = threading.Event()
def waiter(name: str):
print(f"{name} waiting for event...")
event.wait() # 阻塞直到 event 被 set
print(f"{name} received event!")
def setter():
time.sleep(2)
print("Setting event!")
event.set() # 唤醒所有等待的线程
threading.Thread(target=waiter, args=("Thread-1",)).start()
threading.Thread(target=waiter, args=("Thread-2",)).start()
threading.Thread(target=setter).start()
# event.clear() # 重置事件
# event.is_set() # 检查状态
Barrier(屏障)
import threading
import time
import random
# Barrier:让多个线程在某一点同步
barrier = threading.Barrier(3) # 等待 3 个线程
def worker(thread_id: int):
# 第一阶段
work_time = random.uniform(0.5, 2)
print(f"Thread {thread_id} working for {work_time:.1f}s")
time.sleep(work_time)
print(f"Thread {thread_id} waiting at barrier")
barrier.wait() # 等待其他线程
# 第二阶段:所有线程同时开始
print(f"Thread {thread_id} passed barrier")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
2.3 线程池(ThreadPoolExecutor)
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, TimeoutError, FIRST_COMPLETED, FIRST_EXCEPTION
from concurrent.futures import Future
import time
from typing import List
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def download(url: str) -> str:
"""模拟下载任务"""
logger.info(f"Downloading {url}")
time.sleep(1)
return f"Content from {url}"
# ============ 基础用法 ============
urls = [f"http://example.com/page{i}" for i in range(5)]
# 方式1:使用 submit
with ThreadPoolExecutor(max_workers=3) as executor:
# submit 返回 Future 对象
futures: List[Future] = [executor.submit(download, url) for url in urls]
# 获取结果
for future in futures:
result = future.result() # 阻塞等待结果
print(result)
# 方式2:使用 map(保持顺序)
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(download, urls)
for result in results: # 按提交顺序返回
print(result)
# 方式3:使用 as_completed(先完成先返回)
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(download, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
result = future.result()
print(f"{url}: {result}")
except Exception as e:
print(f"{url} generated an exception: {e}")
# ============ 超时控制 ============
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(download, "http://slow.com")
try:
result = future.result(timeout=0.5) # 最多等待 0.5 秒
except TimeoutError:
print("Task timed out")
future.cancel() # 尝试取消(不保证成功)
# ============ 等待策略 ============
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(download, url) for url in urls]
# 等待所有完成
done, not_done = wait(futures)
# 等待第一个完成
# done, not_done = wait(futures, return_when=FIRST_COMPLETED)
# 等待任一异常
# done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
# ============ 回调函数 ============
def callback(future: Future):
result = future.result()
logger.info(f"Callback received: {result}")
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(download, "http://example.com")
future.add_done_callback(callback)
2.4 线程局部数据(Thread-Local)
import threading
from typing import Optional
# 每个线程独享的数据
thread_local = threading.local()
def process_request(request_id: int):
# 设置线程局部变量
thread_local.request_id = request_id
thread_local.user = f"user_{request_id}"
# 在其他函数中访问
do_something()
do_another_thing()
def do_something():
# 不需要参数传递,直接访问
print(f"Processing request {thread_local.request_id}")
def do_another_thing():
print(f"User: {thread_local.user}")
# 每个线程有自己的 request_id 和 user
threads = [
threading.Thread(target=process_request, args=(i,))
for i in range(3)
]
for t in threads:
t.start()
for t in threads:
t.join()
# ============ 实际应用:数据库连接 ============
class ConnectionManager:
def __init__(self):
self._local = threading.local()
def get_connection(self):
if not hasattr(self._local, 'connection'):
self._local.connection = self._create_connection()
return self._local.connection
def _create_connection(self):
# 创建数据库连接
return f"Connection-{threading.current_thread().name}"
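下面是一个假设性的使用示例:多个线程共用同一个 ConnectionManager,各自拿到独立的"连接",同一线程内重复调用则复用同一个连接:

manager = ConnectionManager()

def handle():
    conn = manager.get_connection()
    assert conn is manager.get_connection()  # 同一线程内返回同一个连接对象
    print(f"{threading.current_thread().name} -> {conn}")

workers = [threading.Thread(target=handle, name=f"Worker-{i}") for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()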
三、多进程(multiprocessing)
3.1 进程基础
import multiprocessing as mp
import os
import time
from typing import List
def worker(name: str) -> None:
"""工作进程函数"""
print(f"Worker {name} starting, PID: {os.getpid()}")
time.sleep(2)
print(f"Worker {name} finished")
# ============ 方式1:直接创建 Process ============
if __name__ == "__main__": # Windows 上必须有这个保护!
p1 = mp.Process(target=worker, args=("A",))
p2 = mp.Process(target=worker, args=("B",))
p1.start()
p2.start()
p1.join()
p2.join()
# ============ 方式2:继承 Process 类 ============
class ComputeProcess(mp.Process):
def __init__(self, data: List[int]):
super().__init__()
self.data = data
self.result = None # 注意:这个不能直接共享!
def run(self):
print(f"Computing in PID: {os.getpid()}")
self.result = sum(x ** 2 for x in self.data)
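子进程对 self.result 的赋值发生在独立的进程空间里,父进程看不到。常见做法是通过 Queue 把结果传回父进程,下面是一个示意写法(类名为示例,需在 __main__ 保护下运行):

class ComputeProcessWithQueue(mp.Process):
    """通过 Queue 把结果传回父进程的示意写法"""
    def __init__(self, data: List[int], result_queue: mp.Queue):
        super().__init__()
        self.data = data
        self.result_queue = result_queue

    def run(self):
        self.result_queue.put(sum(x ** 2 for x in self.data))

if __name__ == "__main__":
    q = mp.Queue()
    p = ComputeProcessWithQueue([1, 2, 3, 4], q)
    p.start()
    p.join()
    print(f"子进程计算结果: {q.get()}")  # 1 + 4 + 9 + 16 = 30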
# ============ 获取进程信息 ============
print(f"当前进程 PID: {os.getpid()}")
print(f"父进程 PID: {os.getppid()}")
print(f"CPU 核心数: {mp.cpu_count()}")
3.2 进程间通信(IPC)
Queue(队列)
import multiprocessing as mp
import time
from typing import Any
def producer(queue: mp.Queue, items: list) -> None:
"""生产者"""
for item in items:
print(f"Producing: {item}")
queue.put(item)
time.sleep(0.5)
queue.put(None) # 结束信号
def consumer(queue: mp.Queue) -> None:
"""消费者"""
while True:
item = queue.get()
if item is None:
break
print(f"Consuming: {item}")
time.sleep(1)
if __name__ == "__main__":
queue = mp.Queue()
p1 = mp.Process(target=producer, args=(queue, [1, 2, 3, 4, 5]))
p2 = mp.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
# ============ JoinableQueue ============
# 支持 task_done() 和 join(),可以等待所有任务完成
def worker(queue: mp.JoinableQueue):
while True:
item = queue.get()
if item is None:
queue.task_done()
break
print(f"Processing: {item}")
queue.task_done() # 标记任务完成
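配合 queue.join() 的一个简化用法示意(沿用上面的 worker,主进程等待所有已放入的任务被处理完后再发结束信号):

if __name__ == "__main__":
    jq = mp.JoinableQueue()
    p = mp.Process(target=worker, args=(jq,))
    p.start()
    for i in range(5):
        jq.put(i)
    jq.join()      # 阻塞直到已放入的 5 个任务都被 task_done()
    jq.put(None)   # 发送结束信号,让 worker 退出
    p.join()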
Pipe(管道)
import multiprocessing as mp
def sender(conn):
"""发送端"""
for i in range(5):
conn.send(f"Message {i}")
conn.send(None) # 结束信号
conn.close()
def receiver(conn):
"""接收端"""
while True:
msg = conn.recv()
if msg is None:
break
print(f"Received: {msg}")
conn.close()
if __name__ == "__main__":
# 创建双向管道
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=sender, args=(parent_conn,))
p2 = mp.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
共享内存(Value / Array)
import multiprocessing as mp
from multiprocessing import Value, Array
import ctypes
def increment_counter(counter: Value, lock: mp.Lock):
"""使用共享内存的计数器"""
for _ in range(10000):
with lock:
counter.value += 1
if __name__ == "__main__":
# 共享变量
# 类型代码:'i'=int, 'd'=double, 'c'=char
counter = Value('i', 0) # 共享整数
lock = mp.Lock()
processes = [
mp.Process(target=increment_counter, args=(counter, lock))
for _ in range(4)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"Counter: {counter.value}") # 40000
# 共享数组
arr = Array('d', [0.0, 1.0, 2.0, 3.0])
# ============ 使用 shared_memory(Python 3.8+) ============
from multiprocessing import shared_memory
import numpy as np
def worker_shared(shm_name: str, shape: tuple, dtype):
"""使用共享内存处理 NumPy 数组"""
existing_shm = shared_memory.SharedMemory(name=shm_name)
arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
arr[:] = arr[:] * 2 # 原地修改
existing_shm.close()
if __name__ == "__main__":
# 创建原始数组
original = np.array([1, 2, 3, 4, 5], dtype=np.float64)
# 创建共享内存
shm = shared_memory.SharedMemory(create=True, size=original.nbytes)
shared_arr = np.ndarray(original.shape, dtype=original.dtype, buffer=shm.buf)
shared_arr[:] = original[:] # 复制数据
# 在子进程中处理
p = mp.Process(target=worker_shared, args=(shm.name, original.shape, original.dtype))
p.start()
p.join()
print(f"Result: {shared_arr}") # [2. 4. 6. 8. 10.]
# 清理
shm.close()
shm.unlink()
Manager(管理器)
import multiprocessing as mp
from multiprocessing import Manager
def worker(shared_dict: dict, shared_list: list, key: str):
"""使用 Manager 共享复杂数据结构"""
shared_dict[key] = f"value_{key}"
shared_list.append(key)
if __name__ == "__main__":
with Manager() as manager:
# 可以共享复杂数据结构
shared_dict = manager.dict()
shared_list = manager.list()
processes = [
mp.Process(target=worker, args=(shared_dict, shared_list, f"key_{i}"))
for i in range(5)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"Dict: {dict(shared_dict)}")
print(f"List: {list(shared_list)}")
3.3 进程池(Pool / ProcessPoolExecutor)
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
from typing import List
def cpu_intensive(n: int) -> int:
"""CPU 密集型任务"""
return sum(i ** 2 for i in range(n))
# ============ 方式1:multiprocessing.Pool ============
if __name__ == "__main__":
# 创建进程池
with mp.Pool(processes=4) as pool:
# map:阻塞,保持顺序
results = pool.map(cpu_intensive, [10**6] * 8)
print(f"Map results: {len(results)} items")
# imap:惰性求值,保持顺序
for result in pool.imap(cpu_intensive, [10**6] * 8):
print(f"imap result: {result}")
# imap_unordered:惰性求值,不保持顺序(更快)
for result in pool.imap_unordered(cpu_intensive, [10**6] * 8):
print(f"imap_unordered result: {result}")
# apply_async:异步提交单个任务
result = pool.apply_async(cpu_intensive, args=(10**6,))
print(f"Async result: {result.get()}")
# starmap:支持多参数(注意:Windows/spawn 启动方式下,被调用函数需定义在模块顶层才能被 pickle)
def add(a, b):
return a + b
results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
# ============ 方式2:ProcessPoolExecutor(推荐) ============
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=4) as executor:
# submit
futures = [executor.submit(cpu_intensive, 10**6) for _ in range(8)]
for future in as_completed(futures):
print(f"Result: {future.result()}")
# map
results = executor.map(cpu_intensive, [10**6] * 8)
for result in results:
print(f"Map result: {result}")
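补充一点:两种进程池都要求提交的函数及其参数可以被 pickle,因此函数必须定义在模块顶层;lambda、局部函数提交后会在取结果时报错。下面是一个简化示意:

def square(x: int) -> int:  # 模块顶层函数,可以被 pickle
    return x * x

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]
        # executor.submit(lambda x: x * x, 3)  # lambda 无法被 pickle,future.result() 会抛异常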
# ============ 进程池 vs 线程池对比 ============
"""
ProcessPoolExecutor:
- 适合 CPU 密集型任务
- 绕过 GIL
- 进程创建开销大
- 需要序列化数据(pickle)
ThreadPoolExecutor:
- 适合 I/O 密集型任务
- 受 GIL 限制
- 线程创建开销小
- 直接共享内存
"""
3.4 多进程最佳实践
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import logging
# ============ 1. Windows 兼容性 ============
# Windows 使用 spawn 方式创建进程,必须有 __main__ 保护
if __name__ == "__main__":
mp.set_start_method('spawn') # 'fork'(Linux 默认)、'spawn'(Windows/macOS 3.8+ 默认)、'forkserver'
# ============ 2. 日志处理 ============
def worker_with_logging(x):
logger = logging.getLogger(__name__)
logger.info(f"Processing {x}")
return x ** 2
# ============ 3. 异常处理 ============
def safe_worker(x):
try:
return x ** 2
except Exception as e:
return {"error": str(e), "input": x}
# ============ 4. 初始化函数 ============
def init_worker():
"""进程初始化函数"""
import signal
signal.signal(signal.SIGINT, signal.SIG_IGN) # 忽略 Ctrl+C
if __name__ == "__main__":
with mp.Pool(4, initializer=init_worker) as pool:
results = pool.map(safe_worker, range(10))
# ============ 5. 大数据传输优化 ============
# 使用共享内存而非 pickle
from multiprocessing import shared_memory
import numpy as np
def process_shared_data(shm_name, shape, dtype):
    shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    result = arr.sum()  # 处理数据:必须在 close() 之前完成
    del arr             # 释放对共享缓冲区的引用后再关闭
    shm.close()
    return result
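一个假设性的调用示例:父进程创建共享内存并写入数据,子进程只接收共享内存的名字(字符串)和形状、类型信息,而不是整个数组,从而避开 pickle 大对象的开销:

if __name__ == "__main__":
    data = np.arange(1_000_000, dtype=np.float64)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    buf = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    buf[:] = data  # 只复制一次,写入共享内存

    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(process_shared_data, shm.name, data.shape, data.dtype)
            for _ in range(4)
        ]
        print([f.result() for f in futures])  # 4 个相同的求和结果

    del buf        # 释放对共享缓冲区的引用
    shm.close()
    shm.unlink()   # 由创建者负责销毁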
四、协程与异步编程(asyncio)
4.1 协程基础
import asyncio
from typing import List
# ============ 定义协程 ============
async def hello(name: str) -> str:
"""async 定义协程函数"""
print(f"Hello {name}, starting...")
await asyncio.sleep(1) # 非阻塞等待
print(f"Hello {name}, finished!")
return f"Result from {name}"
# ============ 运行协程 ============
# 方式1:asyncio.run()(推荐,Python 3.7+)
async def main():
result = await hello("World")
print(result)
asyncio.run(main())
# 方式2:获取事件循环(低级 API)
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
# ============ 并发执行多个协程 ============
async def main_concurrent():
# 创建任务
task1 = asyncio.create_task(hello("Alice"))
task2 = asyncio.create_task(hello("Bob"))
# 等待所有任务完成
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
# 更简洁的方式
async def main_gather():
results = await asyncio.gather(
hello("Alice"),
hello("Bob"),
hello("Charlie"),
)
print(f"All results: {results}")
asyncio.run(main_gather())
# 总耗时约 1 秒(并发执行),而非 3 秒(串行执行)
4.2 异步任务控制
import asyncio
from typing import List, Any
# ============ asyncio.gather:并发执行,收集所有结果 ============
async def fetch(url: str) -> str:
await asyncio.sleep(1)
return f"Content from {url}"
async def main_gather():
urls = ["url1", "url2", "url3"]
# 收集所有结果(保持顺序)
results = await asyncio.gather(*[fetch(url) for url in urls])
# 允许部分失败
results = await asyncio.gather(
*[fetch(url) for url in urls],
return_exceptions=True # 异常作为返回值,不抛出
)
# ============ asyncio.wait:更灵活的等待控制 ============
async def main_wait():
tasks = [asyncio.create_task(fetch(f"url{i}")) for i in range(5)]
# 等待所有完成
done, pending = await asyncio.wait(tasks)
# 等待第一个完成
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# 带超时
done, pending = await asyncio.wait(tasks, timeout=0.5)
# 取消未完成的任务
for task in pending:
task.cancel()
# ============ asyncio.as_completed:先完成先处理 ============
async def main_as_completed():
tasks = [fetch(f"url{i}") for i in range(5)]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Got: {result}")
# ============ asyncio.wait_for:单任务超时 ============
async def main_timeout():
try:
result = await asyncio.wait_for(
fetch("slow_url"),
timeout=0.5
)
except asyncio.TimeoutError:
print("Task timed out!")
# ============ asyncio.shield:保护任务不被取消 ============
async def main_shield():
task = asyncio.create_task(fetch("important_url"))
try:
result = await asyncio.wait_for(
asyncio.shield(task), # 即使超时,任务也会继续执行
timeout=0.5
)
except asyncio.TimeoutError:
# task 仍在运行
result = await task # 可以继续等待
4.3 异步同步原语
import asyncio
# ============ asyncio.Lock ============
async def protected_operation(lock: asyncio.Lock, name: str):
async with lock:
print(f"{name} acquired lock")
await asyncio.sleep(1)
print(f"{name} releasing lock")
async def main_lock():
lock = asyncio.Lock()
await asyncio.gather(
protected_operation(lock, "Task1"),
protected_operation(lock, "Task2"),
)
# ============ asyncio.Semaphore ============
async def limited_operation(sem: asyncio.Semaphore, n: int):
async with sem:
print(f"Task {n} started")
await asyncio.sleep(1)
print(f"Task {n} finished")
async def main_semaphore():
sem = asyncio.Semaphore(3) # 最多 3 个并发
await asyncio.gather(*[
limited_operation(sem, i) for i in range(10)
])
# ============ asyncio.Event ============
async def waiter(event: asyncio.Event, name: str):
print(f"{name} waiting...")
await event.wait()
print(f"{name} got event!")
async def main_event():
event = asyncio.Event()
asyncio.create_task(waiter(event, "Task1"))
asyncio.create_task(waiter(event, "Task2"))
await asyncio.sleep(1)
event.set() # 通知所有等待者
await asyncio.sleep(0.1)
# ============ asyncio.Queue ============
async def producer(queue: asyncio.Queue):
for i in range(5):
await queue.put(i)
print(f"Produced: {i}")
await asyncio.sleep(0.5)
await queue.put(None) # 结束信号
async def consumer(queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
print(f"Consumed: {item}")
queue.task_done()
async def main_queue():
queue = asyncio.Queue(maxsize=3)
await asyncio.gather(
producer(queue),
consumer(queue),
)
4.4 实战:异步 HTTP 客户端
import asyncio
import aiohttp
from typing import List, Dict, Any
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Response:
url: str
status: int
data: Any
class AsyncHTTPClient:
"""异步 HTTP 客户端"""
def __init__(self, max_concurrent: int = 10, timeout: float = 30):
self._semaphore = asyncio.Semaphore(max_concurrent)
self._timeout = aiohttp.ClientTimeout(total=timeout)
self._session: aiohttp.ClientSession | None = None
async def __aenter__(self):
self._session = aiohttp.ClientSession(timeout=self._timeout)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._session:
await self._session.close()
async def fetch(self, url: str) -> Response:
"""获取单个 URL"""
async with self._semaphore:
try:
async with self._session.get(url) as response:
data = await response.json()
return Response(url=url, status=response.status, data=data)
except Exception as e:
logger.error(f"Error fetching {url}: {e}")
return Response(url=url, status=-1, data={"error": str(e)})
async def fetch_all(self, urls: List[str]) -> List[Response]:
"""并发获取多个 URL"""
tasks = [self.fetch(url) for url in urls]
return await asyncio.gather(*tasks)
# 使用示例
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
]
async with AsyncHTTPClient(max_concurrent=5) as client:
responses = await client.fetch_all(urls)
for resp in responses:
logger.info(f"{resp.url}: {resp.status}")
# asyncio.run(main())
4.5 异步上下文管理器与迭代器
import asyncio
from typing import AsyncIterator
# ============ 异步上下文管理器 ============
class AsyncDatabase:
"""异步数据库连接"""
async def __aenter__(self):
print("Connecting to database...")
await asyncio.sleep(0.5) # 模拟连接
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing database connection...")
await asyncio.sleep(0.1)
async def query(self, sql: str):
await asyncio.sleep(0.1)
return [{"id": 1, "name": "Alice"}]
async def main_context():
async with AsyncDatabase() as db:
result = await db.query("SELECT * FROM users")
print(result)
# 使用 contextlib
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_timer():
import time
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"Elapsed: {elapsed:.4f}s")
async def main_timer():
async with async_timer():
await asyncio.sleep(1)
# ============ 异步迭代器 ============
class AsyncCounter:
"""异步计数器"""
def __init__(self, stop: int):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.1) # 模拟异步操作
self.current += 1
return self.current
async def main_aiter():
async for num in AsyncCounter(5):
print(num)
# ============ 异步生成器 ============
async def async_range(start: int, stop: int) -> AsyncIterator[int]:
"""异步生成器"""
for i in range(start, stop):
await asyncio.sleep(0.1)
yield i
async def main_agen():
async for num in async_range(0, 5):
print(num)
4.6 asyncio 实战模式
import asyncio
from typing import Callable, Any
import functools
# ============ 1. 同步函数转异步 ============
def blocking_io(x: int) -> int:
"""阻塞的 I/O 操作"""
import time
time.sleep(1)
return x ** 2
async def main_executor():
loop = asyncio.get_running_loop()  # 协程内部推荐 get_running_loop()
# 在线程池中执行阻塞函数
result = await loop.run_in_executor(None, blocking_io, 5)
print(result)
# 指定线程池
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as pool:
result = await loop.run_in_executor(pool, blocking_io, 10)
# 装饰器版本
def async_wrap(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None,
functools.partial(func, *args, **kwargs)
)
return wrapper
@async_wrap
def sync_function(x: int) -> int:
import time
time.sleep(1)
return x ** 2
# ============ 2. 重试装饰器 ============
def async_retry(max_attempts: int = 3, delay: float = 1.0):
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
await asyncio.sleep(delay * (attempt + 1))
raise last_exception
return wrapper
return decorator
@async_retry(max_attempts=3, delay=1.0)
async def unreliable_operation():
import random
if random.random() < 0.7:
raise ValueError("Random failure")
return "Success"
# ============ 3. 限速器 ============
class RateLimiter:
"""令牌桶限速器"""
def __init__(self, rate: float, max_tokens: int = 10):
self.rate = rate # 每秒产生的令牌数
self.max_tokens = max_tokens
self.tokens = max_tokens
self.last_update = asyncio.get_event_loop().time()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = asyncio.get_event_loop().time()
elapsed = now - self.last_update
self.tokens = min(self.max_tokens, self.tokens + elapsed * self.rate)
self.last_update = now
if self.tokens < 1:
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
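一个假设性的使用示例(注意:RateLimiter 的 __init__ 用到了事件循环时间,应在事件循环内部创建),把并发请求限制在约每秒 5 个:

async def limited_request(limiter: RateLimiter, i: int):
    await limiter.acquire()  # 没有令牌时在这里等待
    print(f"request {i} sent")

async def main_rate_limit():
    limiter = RateLimiter(rate=5, max_tokens=5)  # 约每秒 5 个请求
    await asyncio.gather(*[limited_request(limiter, i) for i in range(20)])

# asyncio.run(main_rate_limit())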
# ============ 4. 优雅关闭 ============
async def graceful_shutdown(loop: asyncio.AbstractEventLoop):
"""优雅关闭所有任务"""
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
def main_with_shutdown():
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
loop.run_until_complete(graceful_shutdown(loop))
finally:
loop.close()
五、并发编程对比总结
5.1 三种方式对比
| 特性 | 多线程 | 多进程 | 协程 |
|---|---|---|---|
| 适用场景 | I/O 密集型 | CPU 密集型 | I/O 密集型 |
| GIL 影响 | 受限制 | 不受限制 | 不受限制 |
| 内存占用 | 中等 (~1MB/线程) | 高 (~10MB/进程) | 低 (~KB/协程) |
| 创建开销 | 中等 | 高 | 低 |
| 通信方式 | 共享内存 | IPC (Queue/Pipe) | 直接 await |
| 切换开销 | 系统调度 | 系统调度 | 用户态切换 |
| 编程难度 | 中等 | 中等 | 较高 |
| 调试难度 | 高 | 中等 | 中等 |
5.2 选择决策树
需要并发处理?
│
├─ CPU 密集型任务?
│ ├─ 是 → 多进程 (ProcessPoolExecutor)
│ └─ 否 → I/O 密集型
│ │
│ ├─ 高并发 (>1000)?
│ │ └─ 是 → asyncio
│ │
│ ├─ 需要调用阻塞库?
│ │ └─ 是 → 多线程 (ThreadPoolExecutor)
│ │
│ └─ 其他 → asyncio (推荐)
│
└─ 混合场景?
└─ asyncio + run_in_executor
5.3 性能对比代码
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
# ============ I/O 密集型测试 ============
def io_task():
time.sleep(0.1)
return 1
async def async_io_task():
await asyncio.sleep(0.1)
return 1
def test_io_bound(n: int = 100):
# 串行
start = time.perf_counter()
for _ in range(n):
io_task()
print(f"串行: {time.perf_counter() - start:.2f}s")
# 多线程
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=20) as executor:
list(executor.map(io_task, range(n)))
print(f"多线程: {time.perf_counter() - start:.2f}s")
# 协程
async def run_async():
tasks = [async_io_task() for _ in range(n)]
await asyncio.gather(*tasks)
start = time.perf_counter()
asyncio.run(run_async())
print(f"协程: {time.perf_counter() - start:.2f}s")
# ============ CPU 密集型测试 ============
def cpu_task(n: int) -> int:
return sum(i ** 2 for i in range(n))
def test_cpu_bound(n: int = 8, size: int = 10**6):
# 串行
start = time.perf_counter()
for _ in range(n):
cpu_task(size)
print(f"串行: {time.perf_counter() - start:.2f}s")
# 多线程(受 GIL 限制)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
list(executor.map(cpu_task, [size] * n))
print(f"多线程: {time.perf_counter() - start:.2f}s")
# 多进程
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
list(executor.map(cpu_task, [size] * n))
print(f"多进程: {time.perf_counter() - start:.2f}s")
if __name__ == "__main__":
print("=== I/O 密集型 ===")
test_io_bound(100)
print("\n=== CPU 密集型 ===")
test_cpu_bound(8)
典型输出:
=== I/O 密集型 ===
串行: 10.05s
多线程: 0.52s
协程: 0.11s ← 协程最快
=== CPU 密集型 ===
串行: 3.20s
多线程: 3.35s ← GIL 限制,比串行还慢
多进程: 0.95s ← 多进程最快
📝 Part 4 总结
面试高频考点
| 知识点 | 面试频率 | 难度 |
|---|---|---|
| GIL 是什么?有什么影响? | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| 多线程 vs 多进程 vs 协程区别 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| 什么场景用什么并发模型? | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| 线程安全与锁 | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| ThreadPoolExecutor 用法 | ⭐⭐⭐⭐ | ⭐⭐ |
| asyncio 基础用法 | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| async/await 原理 | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| 进程间通信方式 | ⭐⭐⭐ | ⭐⭐⭐ |
| 生产者-消费者模式 | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| 死锁的原因与避免 | ⭐⭐⭐ | ⭐⭐⭐ |
面试常见问题
- 什么是 GIL?它对多线程有什么影响?
- Python 的多线程是真正的并行吗?
- 什么情况下用多线程,什么情况下用多进程?
- asyncio 的工作原理是什么?和多线程有什么区别?
- 如何实现一个线程安全的单例模式?(参考示例见文末)
- 什么是死锁?如何避免?
- 如何实现生产者-消费者模式?
- 进程间通信有哪些方式?各有什么优缺点?
- 协程比多线程有什么优势?
- 如何在 asyncio 中执行阻塞的同步代码?
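针对"线程安全的单例模式"这一问,常见参考答案是双重检查锁(double-checked locking),以下是一个示意实现:

import threading

class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:          # 第一次检查:避免每次调用都加锁
            with cls._lock:
                if cls._instance is None:  # 第二次检查:防止竞争条件下重复创建
                    cls._instance = super().__new__(cls)
        return cls._instance

# 验证:多个线程拿到的是同一个实例
instances = []
def create():
    instances.append(Singleton())

threads = [threading.Thread(target=create) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(all(obj is instances[0] for obj in instances))  # True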