Part 4: 并发编程(多线程/多进程/协程)

一、并发 vs 并行 基础概念

1.1 核心概念区分

"""
并发 (Concurrency):
    - 同一时间段内处理多个任务
    - 任务之间交替执行(时间片轮转)
    - 单核 CPU 也能实现
    - 关注:任务调度、资源共享

并行 (Parallelism):
    - 同一时刻同时执行多个任务
    - 需要多核 CPU 或多台机器
    - 关注:任务分解、负载均衡

I/O 密集型 vs CPU 密集型:
    - I/O 密集型:大部分时间等待 I/O(网络、磁盘)
      → 适合多线程/协程
    - CPU 密集型:大部分时间进行计算
      → 适合多进程
"""

# ============ Python 并发模型选择 ============
"""
场景                    推荐方案              原因
─────────────────────────────────────────────────────
Web 爬虫               asyncio + aiohttp     I/O 密集,协程最高效
Web 服务器             asyncio / 多线程      处理大量并发连接
图像/视频处理          多进程                CPU 密集,绕过 GIL
数据计算               多进程 + NumPy        CPU 密集
文件批量处理           多线程 / 多进程       取决于是否 I/O 密集
API 调用聚合           asyncio               I/O 密集
"""

1.2 GIL 深度解析(必考!)

"""
GIL (Global Interpreter Lock) - 全局解释器锁

什么是 GIL?
    - CPython 解释器的一个互斥锁
    - 同一时刻只允许一个线程执行 Python 字节码
    - 即使在多核 CPU 上,多线程也无法并行执行 Python 代码

为什么需要 GIL?
    - CPython 的内存管理(引用计数)不是线程安全的
    - GIL 简化了 CPython 的实现
    - 保护内置数据结构的线程安全

GIL 的影响:
    - CPU 密集型任务:多线程几乎无法提升性能
    - I/O 密集型任务:GIL 会在 I/O 等待时释放,影响较小

GIL 何时释放?
    1. I/O 操作(文件读写、网络请求)
    2. time.sleep()
    3. 达到切换间隔时强制切换(Python 2 为每执行 100 个 tick,Python 3.2+ 改为默认 5ms 的时间片)
    4. 调用 C 扩展(如 NumPy)时可以主动释放
"""

import sys
# 查看 GIL 切换间隔(Python 3.2+)
print(sys.getswitchinterval())  # 0.005 秒 (5ms)

# 可以修改切换间隔
# sys.setswitchinterval(0.001)

# ============ GIL 对性能的影响演示 ============
import threading
import time

def cpu_bound(n: int) -> int:
    """CPU 密集型任务"""
    count = 0
    for i in range(n):
        count += i
    return count

# 单线程
start = time.perf_counter()
cpu_bound(10**7)
cpu_bound(10**7)
print(f"单线程:  {time.perf_counter() - start:.2f}s")

# 多线程(因为 GIL,不会更快)
start = time.perf_counter()
t1 = threading.Thread(target=cpu_bound, args=(10**7,))
t2 = threading.Thread(target=cpu_bound, args=(10**7,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多线程: {time.perf_counter() - start:.2f}s")
# 结果:多线程可能比单线程还慢!(线程切换开销)
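
前面提到,调用 C 扩展时可以主动释放 GIL。下面用 NumPy 做一个简单示意(假设已安装 NumPy;加速幅度取决于具体运算与机器环境,仅供参考):

# ============ C 扩展释放 GIL 的示意(需要安装 NumPy) ============
import numpy as np

data = np.random.rand(2000, 2000)

def numpy_task() -> None:
    # 大数组上的 ufunc 运算在 C 层执行,通常会释放 GIL
    for _ in range(20):
        np.sqrt(data).sum()

start = time.perf_counter()
numpy_task()
numpy_task()
print(f"NumPy 单线程: {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
np_threads = [threading.Thread(target=numpy_task) for _ in range(2)]
for t in np_threads:
    t.start()
for t in np_threads:
    t.join()
print(f"NumPy 多线程: {time.perf_counter() - start:.2f}s")
# 与纯 Python 计算不同,这里多线程通常能获得明显加速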

二、多线程(threading)

2.1 线程基础

import threading
import time
from typing import List
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ============ 方式1:直接创建 Thread ============
def worker(name: str, delay: float) -> None:
    """工作线程函数"""
    logger.info(f"Worker {name} starting")
    time.sleep(delay)
    logger.info(f"Worker {name} finished")

# 创建线程
t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1), name="Thread-B")

# 启动线程
t1.start()
t2.start()

# 等待线程完成
t1.join()  # 阻塞直到 t1 完成
t2.join(timeout=5)  # 最多等待 5 秒

# ============ 方式2:继承 Thread 类 ============
class DownloadThread(threading.Thread):
    def __init__(self, url: str, filename: str):
        super().__init__()
        self.url = url
        self.filename = filename
        self.result = None
    
    def run(self):
        """重写 run 方法"""
        logger.info(f"Downloading {self.url}")
        time.sleep(1)  # 模拟下载
        self.result = f"Content from {self.url}"
        logger.info(f"Saved to {self.filename}")

thread = DownloadThread("http://example.com", "file.txt")
thread.start()
thread.join()
print(thread.result)

# ============ 守护线程 ============
def background_task():
    while True:
        logger.info("Background task running...")
        time.sleep(1)

daemon = threading.Thread(target=background_task, daemon=True)
daemon.start()
# 主线程结束时,守护线程自动终止
time.sleep(3)
logger.info("Main thread exiting")

# ============ 线程相关信息 ============
print(f"当前线程:  {threading.current_thread().name}")
print(f"主线程: {threading.main_thread().name}")
print(f"活跃线程数: {threading.active_count()}")
print(f"所有线程:  {threading.enumerate()}")

2.2 线程同步原语

Lock(互斥锁)
import threading
from typing import List

# ============ 没有锁的问题 ============
class UnsafeCounter: 
    def __init__(self):
        self.count = 0
    
    def increment(self):
        # 这不是原子操作!
        # 实际是:  temp = self.count; temp += 1; self.count = temp
        self.count += 1

unsafe = UnsafeCounter()

def unsafe_worker():
    for _ in range(100000):
        unsafe.increment()

threads = [threading.Thread(target=unsafe_worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads: 
    t.join()

print(f"期望:  1000000, 实际: {unsafe.count}")  # 通常小于 1000000!

# ============ 使用 Lock 保护 ============
class SafeCounter: 
    def __init__(self):
        self.count = 0
        self._lock = threading.Lock()
    
    def increment(self):
        with self._lock:  # 自动获取和释放锁
            self.count += 1
    
    # 等价于:
    def increment_manual(self):
        self._lock.acquire()
        try: 
            self.count += 1
        finally:
            self._lock.release()

safe = SafeCounter()

def safe_worker():
    for _ in range(100000):
        safe.increment()

threads = [threading.Thread(target=safe_worker) for _ in range(10)]
for t in threads: 
    t.start()
for t in threads:
    t.join()

print(f"期望: 1000000, 实际:  {safe.count}")  # 正好 1000000 ✅

# ============ 死锁示例与避免 ============
lock_a = threading.Lock()
lock_b = threading.Lock()

def worker_1():
    with lock_a:
        time.sleep(0.1)
        with lock_b:  # 等待 lock_b
            print("Worker 1 done")

def worker_2():
    with lock_b:
        time.sleep(0.1)
        with lock_a:  # 等待 lock_a → 死锁!
            print("Worker 2 done")

# 避免死锁的方法:
# 1. 按固定顺序获取锁
# 2. 使用 lock.acquire(timeout=...)
# 3. 使用 RLock(可重入锁)
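
前两种避免方式的写法示意如下(沿用上面的 lock_a / lock_b,仅作演示):

def worker_ordered():
    # 方法 1:所有线程都按 lock_a -> lock_b 的固定顺序加锁,消除循环等待
    with lock_a:
        with lock_b:
            print("Ordered worker done")

def worker_with_timeout():
    # 方法 2:带超时地获取锁,拿不到就放弃(或稍后重试),而不是无限等待
    if lock_a.acquire(timeout=1):
        try:
            if lock_b.acquire(timeout=1):
                try:
                    print("Timeout worker done")
                finally:
                    lock_b.release()
            else:
                print("未能获取 lock_b,放弃本次操作")
        finally:
            lock_a.release()
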
RLock(可重入锁)
import threading

# 普通 Lock 不能重复获取
lock = threading.Lock()
lock.acquire()
# lock.acquire()  # 死锁!

# RLock 可以被同一线程多次获取
rlock = threading.RLock()
rlock.acquire()
rlock.acquire()  # OK,但必须释放相同次数
rlock.release()
rlock.release()

# ============ 实际应用:递归函数 ============
class RecursiveClass:
    def __init__(self):
        self._lock = threading.RLock()
        self.data = []
    
    def add(self, item):
        with self._lock:
            self.data.append(item)
    
    def add_many(self, items):
        with self._lock:
            for item in items: 
                self.add(item)  # 再次获取同一把锁
Condition(条件变量)
import threading
import time
from collections import deque
from typing import Any

class BlockingQueue:
    """阻塞队列:生产者-消费者模式"""
    
    def __init__(self, maxsize: int = 10):
        self._queue: deque = deque()
        self._maxsize = maxsize
        self._condition = threading.Condition()
    
    def put(self, item: Any) -> None:
        """放入元素,队列满时阻塞"""
        with self._condition:
            while len(self._queue) >= self._maxsize:
                self._condition.wait()  # 释放锁并等待通知
            self._queue.append(item)
            self._condition.notify()  # 通知一个等待的消费者
    
    def get(self) -> Any:
        """取出元素,队列空时阻塞"""
        with self._condition:
            while len(self._queue) == 0:
                self._condition.wait()
            item = self._queue.popleft()
            self._condition.notify()  # 通知一个等待的生产者
            return item

# 使用示例
queue = BlockingQueue(maxsize=5)

def producer():
    for i in range(10):
        queue.put(i)
        print(f"Produced: {i}")
        time.sleep(0.1)

def consumer():
    for _ in range(10):
        item = queue.get()
        print(f"Consumed: {item}")
        time.sleep(0.2)

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
Semaphore(信号量)
import threading
import time

# 信号量:限制同时访问资源的线程数
semaphore = threading.Semaphore(3)  # 最多 3 个线程同时访问

def limited_resource(thread_id: int):
    with semaphore: 
        print(f"Thread {thread_id} acquired semaphore")
        time.sleep(2)
        print(f"Thread {thread_id} released semaphore")

# 启动 10 个线程,但最多 3 个能同时运行
threads = [threading.Thread(target=limited_resource, args=(i,)) for i in range(10)]
for t in threads:
    t.start()

# ============ BoundedSemaphore ============
# 防止 release() 调用次数超过 acquire()
bounded = threading.BoundedSemaphore(3)
# bounded.release()  # ValueError:  Semaphore released too many times
Event(事件)
import threading
import time

# Event:线程间的简单通信机制
event = threading.Event()

def waiter(name: str):
    print(f"{name} waiting for event...")
    event.wait()  # 阻塞直到 event 被 set
    print(f"{name} received event!")

def setter():
    time.sleep(2)
    print("Setting event!")
    event.set()  # 唤醒所有等待的线程

threading.Thread(target=waiter, args=("Thread-1",)).start()
threading.Thread(target=waiter, args=("Thread-2",)).start()
threading.Thread(target=setter).start()

# event.clear()  # 重置事件
# event.is_set()  # 检查状态
Barrier(屏障)
import threading
import time
import random

# Barrier:让多个线程在某一点同步
barrier = threading.Barrier(3)  # 等待 3 个线程

def worker(thread_id: int):
    # 第一阶段
    work_time = random.uniform(0.5, 2)
    print(f"Thread {thread_id} working for {work_time:.1f}s")
    time.sleep(work_time)
    
    print(f"Thread {thread_id} waiting at barrier")
    barrier.wait()  # 等待其他线程
    
    # 第二阶段:所有线程同时开始
    print(f"Thread {thread_id} passed barrier")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

2.3 线程池(ThreadPoolExecutor)

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, TimeoutError  # future.result(timeout=...) 抛出的是 concurrent.futures.TimeoutError(3.11+ 与内置 TimeoutError 等同)
from concurrent.futures import Future
import time
from typing import List
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def download(url: str) -> str:
    """模拟下载任务"""
    logger.info(f"Downloading {url}")
    time.sleep(1)
    return f"Content from {url}"

# ============ 基础用法 ============
urls = [f"http://example.com/page{i}" for i in range(5)]

# 方式1:使用 submit
with ThreadPoolExecutor(max_workers=3) as executor:
    # submit 返回 Future 对象
    futures: List[Future] = [executor.submit(download, url) for url in urls]
    
    # 获取结果
    for future in futures:
        result = future.result()  # 阻塞等待结果
        print(result)

# 方式2:使用 map(保持顺序)
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(download, urls)
    for result in results:  # 按提交顺序返回
        print(result)

# 方式3:使用 as_completed(先完成先返回)
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(download, url): url for url in urls}
    
    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(f"{url}:  {result}")
        except Exception as e: 
            print(f"{url} generated an exception: {e}")

# ============ 超时控制 ============
with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(download, "http://slow.com")
    try:
        result = future.result(timeout=0.5)  # 最多等待 0.5 秒
    except TimeoutError:
        print("Task timed out")
        future.cancel()  # 尝试取消(不保证成功)

# ============ 等待策略 ============
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(download, url) for url in urls]
    
    # 等待所有完成
    done, not_done = wait(futures)
    
    # 等待第一个完成
    # done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    
    # 等待任一异常
    # done, not_done = wait(futures, return_when=FIRST_EXCEPTION)

# ============ 回调函数 ============
def callback(future: Future):
    result = future.result()
    logger.info(f"Callback received: {result}")

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(download, "http://example.com")
    future.add_done_callback(callback)

2.4 线程局部数据(Thread-Local)

import threading
from typing import Optional

# 每个线程独享的数据
thread_local = threading.local()

def process_request(request_id: int):
    # 设置线程局部变量
    thread_local.request_id = request_id
    thread_local.user = f"user_{request_id}"
    
    # 在其他函数中访问
    do_something()
    do_another_thing()

def do_something():
    # 不需要参数传递,直接访问
    print(f"Processing request {thread_local.request_id}")

def do_another_thing():
    print(f"User: {thread_local.user}")

# 每个线程有自己的 request_id 和 user
threads = [
    threading.Thread(target=process_request, args=(i,))
    for i in range(3)
]
for t in threads: 
    t.start()
for t in threads: 
    t.join()

# ============ 实际应用:数据库连接 ============
class ConnectionManager:
    def __init__(self):
        self._local = threading.local()
    
    def get_connection(self):
        if not hasattr(self._local, 'connection'):
            self._local.connection = self._create_connection()
        return self._local.connection
    
    def _create_connection(self):
        # 创建数据库连接
        return f"Connection-{threading.current_thread().name}"

三、多进程(multiprocessing)

3.1 进程基础

import multiprocessing as mp
import os
import time
from typing import List

def worker(name: str) -> None:
    """工作进程函数"""
    print(f"Worker {name} starting, PID: {os.getpid()}")
    time.sleep(2)
    print(f"Worker {name} finished")

# ============ 方式1:直接创建 Process ============
if __name__ == "__main__":   # Windows 上必须有这个保护!
    p1 = mp.Process(target=worker, args=("A",))
    p2 = mp.Process(target=worker, args=("B",))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

# ============ 方式2:继承 Process 类 ============
class ComputeProcess(mp.Process):
    def __init__(self, data: List[int]):
        super().__init__()
        self.data = data
        self.result = None  # 注意:这个不能直接共享!
    
    def run(self):
        print(f"Computing in PID: {os.getpid()}")
        self.result = sum(x ** 2 for x in self.data)
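
由于子进程里对 self.result 的赋值发生在另一个进程的内存空间,父进程读到的仍然是 None。一个常见做法是通过 mp.Queue 把结果传回来,下面是一个简单示意:

class ComputeProcessWithQueue(mp.Process):
    """把计算结果通过 Queue 传回父进程"""
    def __init__(self, data: List[int], result_queue):
        super().__init__()
        self.data = data
        self.result_queue = result_queue

    def run(self):
        self.result_queue.put(sum(x ** 2 for x in self.data))

if __name__ == "__main__":
    result_queue = mp.Queue()
    p = ComputeProcessWithQueue([1, 2, 3, 4], result_queue)
    p.start()
    print(f"Result: {result_queue.get()}")  # 30
    p.join()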

# ============ 获取进程信息 ============
print(f"当前进程 PID: {os.getpid()}")
print(f"父进程 PID: {os.getppid()}")
print(f"CPU 核心数:  {mp.cpu_count()}")

3.2 进程间通信(IPC)

Queue(队列)
import multiprocessing as mp
import time
from typing import Any

def producer(queue: mp.Queue, items: list) -> None:
    """生产者"""
    for item in items:
        print(f"Producing: {item}")
        queue.put(item)
        time.sleep(0.5)
    queue.put(None)  # 结束信号

def consumer(queue: mp.Queue) -> None:
    """消费者"""
    while True: 
        item = queue.get()
        if item is None:
            break
        print(f"Consuming: {item}")
        time.sleep(1)

if __name__ == "__main__":
    queue = mp.Queue()
    
    p1 = mp.Process(target=producer, args=(queue, [1, 2, 3, 4, 5]))
    p2 = mp.Process(target=consumer, args=(queue,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

# ============ JoinableQueue ============
# 支持 task_done() 和 join(),可以等待所有任务完成
def worker(queue: mp.JoinableQueue):
    while True:
        item = queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"Processing: {item}")
        queue.task_done()  # 标记任务完成
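
配合 join() 的使用示意:主进程 put 完任务后调用 queue.join(),阻塞到所有任务都被 task_done() 标记完成。

if __name__ == "__main__":
    jqueue = mp.JoinableQueue()
    p = mp.Process(target=worker, args=(jqueue,))
    p.start()

    for i in range(5):
        jqueue.put(i)
    jqueue.join()     # 阻塞直到队列中所有任务都被 task_done()

    jqueue.put(None)  # 发送结束信号
    p.join()
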
Pipe(管道)
import multiprocessing as mp

def sender(conn):
    """发送端"""
    for i in range(5):
        conn.send(f"Message {i}")
    conn.send(None)  # 结束信号
    conn.close()

def receiver(conn):
    """接收端"""
    while True:
        msg = conn.recv()
        if msg is None:
            break
        print(f"Received: {msg}")
    conn.close()

if __name__ == "__main__": 
    # 创建双向管道
    parent_conn, child_conn = mp.Pipe()
    
    p1 = mp.Process(target=sender, args=(parent_conn,))
    p2 = mp.Process(target=receiver, args=(child_conn,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()
共享内存(Value / Array)
import multiprocessing as mp
from multiprocessing import Value, Array
import ctypes

def increment_counter(counter: Value, lock: mp.Lock):
    """使用共享内存的计数器"""
    for _ in range(10000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    # 共享变量
    # 类型代码:'i'=int, 'd'=double, 'c'=char
    counter = Value('i', 0)  # 共享整数
    lock = mp.Lock()
    
    processes = [
        mp.Process(target=increment_counter, args=(counter, lock))
        for _ in range(4)
    ]
    
    for p in processes:
        p.start()
    for p in processes: 
        p.join()
    
    print(f"Counter:  {counter.value}")  # 40000

    # 共享数组
    arr = Array('d', [0.0, 1.0, 2.0, 3.0])

# ============ 使用 shared_memory(Python 3.8+) ============
from multiprocessing import shared_memory
import numpy as np

def worker_shared(shm_name: str, shape: tuple, dtype):
    """使用共享内存处理 NumPy 数组"""
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    arr[:] = arr[:] * 2  # 原地修改
    existing_shm.close()

if __name__ == "__main__":
    # 创建原始数组
    original = np.array([1, 2, 3, 4, 5], dtype=np.float64)
    
    # 创建共享内存
    shm = shared_memory.SharedMemory(create=True, size=original.nbytes)
    shared_arr = np.ndarray(original.shape, dtype=original.dtype, buffer=shm.buf)
    shared_arr[:] = original[:]  # 复制数据
    
    # 在子进程中处理
    p = mp.Process(target=worker_shared, args=(shm.name, original.shape, original.dtype))
    p.start()
    p.join()
    
    print(f"Result: {shared_arr}")  # [2. 4. 6. 8. 10.]
    
    # 清理
    shm.close()
    shm.unlink()
Manager(管理器)
import multiprocessing as mp
from multiprocessing import Manager

def worker(shared_dict: dict, shared_list: list, key: str):
    """使用 Manager 共享复杂数据结构"""
    shared_dict[key] = f"value_{key}"
    shared_list.append(key)

if __name__ == "__main__":
    with Manager() as manager:
        # 可以共享复杂数据结构
        shared_dict = manager.dict()
        shared_list = manager.list()
        
        processes = [
            mp.Process(target=worker, args=(shared_dict, shared_list, f"key_{i}"))
            for i in range(5)
        ]
        
        for p in processes:
            p.start()
        for p in processes: 
            p.join()
        
        print(f"Dict: {dict(shared_dict)}")
        print(f"List: {list(shared_list)}")

3.3 进程池(Pool / ProcessPoolExecutor)

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
from typing import List

def cpu_intensive(n: int) -> int:
    """CPU 密集型任务"""
    return sum(i ** 2 for i in range(n))

# ============ 方式1:multiprocessing.Pool ============
if __name__ == "__main__":
    # 创建进程池
    with mp.Pool(processes=4) as pool:
        # map:阻塞,保持顺序
        results = pool.map(cpu_intensive, [10**6] * 8)
        print(f"Map results: {len(results)} items")
        
        # imap:惰性求值,保持顺序
        for result in pool.imap(cpu_intensive, [10**6] * 8):
            print(f"imap result: {result}")
        
        # imap_unordered:惰性求值,不保持顺序(更快)
        for result in pool.imap_unordered(cpu_intensive, [10**6] * 8):
            print(f"imap_unordered result: {result}")
        
        # apply_async:异步提交单个任务
        result = pool.apply_async(cpu_intensive, args=(10**6,))
        print(f"Async result: {result.get()}")
        
        # starmap:支持多参数
        # 注意:传给进程池的函数必须定义在模块顶层才能被 pickle,
        # 不能定义在 if __name__ 块或函数内部;这里用内置函数 pow 演示
        results = pool.starmap(pow, [(2, 3), (3, 2), (4, 2)])
        print(f"Starmap results: {results}")  # [8, 9, 16]

# ============ 方式2:ProcessPoolExecutor(推荐) ============
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit
        futures = [executor.submit(cpu_intensive, 10**6) for _ in range(8)]
        
        for future in as_completed(futures):
            print(f"Result: {future.result()}")
        
        # map
        results = executor.map(cpu_intensive, [10**6] * 8)
        for result in results: 
            print(f"Map result: {result}")

# ============ 进程池 vs 线程池对比 ============
"""
ProcessPoolExecutor:
    - 适合 CPU 密集型任务
    - 绕过 GIL
    - 进程创建开销大
    - 需要序列化数据(pickle)

ThreadPoolExecutor:
    - 适合 I/O 密集型任务
    - 受 GIL 限制
    - 线程创建开销小
    - 直接共享内存
"""

3.4 多进程最佳实践

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import logging

# ============ 1. Windows 兼容性 ============
# Windows 使用 spawn 方式创建进程,必须有 __main__ 保护
if __name__ == "__main__":
    mp.set_start_method('spawn')  # 'fork'(Linux 默认)、'spawn'(Windows/macOS 默认)、'forkserver'

# ============ 2. 日志处理 ============
def worker_with_logging(x):
    logger = logging.getLogger(__name__)
    logger.info(f"Processing {x}")
    return x ** 2
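
子进程(尤其是 spawn 方式)不会继承主进程的 logging 配置。一个简单的做法是用进程池的 initializer 在每个工作进程里各自配置一次,下面是一个示意(日志格式等参数仅为示例):

def init_logging():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(processName)s - %(message)s'
    )

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2, initializer=init_logging) as executor:
        print(list(executor.map(worker_with_logging, range(4))))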

# ============ 3. 异常处理 ============
def safe_worker(x):
    try: 
        return x ** 2
    except Exception as e:
        return {"error": str(e), "input": x}

# ============ 4. 初始化函数 ============
def init_worker():
    """进程初始化函数"""
    import signal
    signal.signal(signal.SIGINT, signal.SIG_IGN)  # 忽略 Ctrl+C

if __name__ == "__main__": 
    with mp.Pool(4, initializer=init_worker) as pool:
        results = pool.map(safe_worker, range(10))

# ============ 5. 大数据传输优化 ============
# 使用共享内存而非 pickle
from multiprocessing import shared_memory
import numpy as np

def process_shared_data(shm_name, shape, dtype):
    shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    # 处理数据... 
    shm.close()
    return arr.sum()

四、协程与异步编程(asyncio)

4.1 协程基础

import asyncio
from typing import List

# ============ 定义协程 ============
async def hello(name: str) -> str:
    """async 定义协程函数"""
    print(f"Hello {name}, starting...")
    await asyncio.sleep(1)  # 非阻塞等待
    print(f"Hello {name}, finished!")
    return f"Result from {name}"

# ============ 运行协程 ============
# 方式1:asyncio.run()(推荐,Python 3.7+)
async def main():
    result = await hello("World")
    print(result)

asyncio.run(main())

# 方式2:获取事件循环(低级 API)
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())

# ============ 并发执行多个协程 ============
async def main_concurrent():
    # 创建任务
    task1 = asyncio.create_task(hello("Alice"))
    task2 = asyncio.create_task(hello("Bob"))
    
    # 等待所有任务完成
    result1 = await task1
    result2 = await task2
    print(f"Results:  {result1}, {result2}")

# 更简洁的方式
async def main_gather():
    results = await asyncio.gather(
        hello("Alice"),
        hello("Bob"),
        hello("Charlie"),
    )
    print(f"All results: {results}")

asyncio.run(main_gather())
# 总耗时约 1 秒(并发执行),而非 3 秒(串行执行)

4.2 异步任务控制

import asyncio
from typing import List, Any

# ============ asyncio.gather:并发执行,收集所有结果 ============
async def fetch(url: str) -> str:
    await asyncio.sleep(1)
    return f"Content from {url}"

async def main_gather():
    urls = ["url1", "url2", "url3"]
    
    # 收集所有结果(保持顺序)
    results = await asyncio.gather(*[fetch(url) for url in urls])
    
    # 允许部分失败
    results = await asyncio.gather(
        *[fetch(url) for url in urls],
        return_exceptions=True  # 异常作为返回值,不抛出
    )

# ============ asyncio.wait:更灵活的等待控制 ============
async def main_wait():
    tasks = [asyncio.create_task(fetch(f"url{i}")) for i in range(5)]
    
    # 等待所有完成
    done, pending = await asyncio.wait(tasks)
    
    # 等待第一个完成
    done, pending = await asyncio.wait(
        tasks, 
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # 带超时
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    
    # 取消未完成的任务
    for task in pending:
        task.cancel()

# ============ asyncio.as_completed:先完成先处理 ============
async def main_as_completed():
    tasks = [fetch(f"url{i}") for i in range(5)]
    
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Got:  {result}")

# ============ asyncio.wait_for:单任务超时 ============
async def main_timeout():
    try:
        result = await asyncio.wait_for(
            fetch("slow_url"),
            timeout=0.5
        )
    except asyncio.TimeoutError:
        print("Task timed out!")

# ============ asyncio.shield:保护任务不被取消 ============
async def main_shield():
    task = asyncio.create_task(fetch("important_url"))
    
    try:
        result = await asyncio.wait_for(
            asyncio.shield(task),  # 即使超时,任务也会继续执行
            timeout=0.5
        )
    except asyncio.TimeoutError:
        # task 仍在运行
        result = await task  # 可以继续等待

4.3 异步同步原语

import asyncio

# ============ asyncio.Lock ============
async def protected_operation(lock: asyncio.Lock, name: str):
    async with lock:
        print(f"{name} acquired lock")
        await asyncio.sleep(1)
        print(f"{name} releasing lock")

async def main_lock():
    lock = asyncio.Lock()
    await asyncio.gather(
        protected_operation(lock, "Task1"),
        protected_operation(lock, "Task2"),
    )

# ============ asyncio.Semaphore ============
async def limited_operation(sem: asyncio.Semaphore, n: int):
    async with sem: 
        print(f"Task {n} started")
        await asyncio.sleep(1)
        print(f"Task {n} finished")

async def main_semaphore():
    sem = asyncio.Semaphore(3)  # 最多 3 个并发
    await asyncio.gather(*[
        limited_operation(sem, i) for i in range(10)
    ])

# ============ asyncio.Event ============
async def waiter(event: asyncio.Event, name: str):
    print(f"{name} waiting...")
    await event.wait()
    print(f"{name} got event!")

async def main_event():
    event = asyncio.Event()
    
    asyncio.create_task(waiter(event, "Task1"))
    asyncio.create_task(waiter(event, "Task2"))
    
    await asyncio.sleep(1)
    event.set()  # 通知所有等待者
    await asyncio.sleep(0.1)

# ============ asyncio.Queue ============
async def producer(queue: asyncio.Queue):
    for i in range(5):
        await queue.put(i)
        print(f"Produced: {i}")
        await asyncio.sleep(0.5)
    await queue.put(None)  # 结束信号

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None: 
            break
        print(f"Consumed: {item}")
        queue.task_done()

async def main_queue():
    queue = asyncio.Queue(maxsize=3)
    
    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

4.4 实战:异步 HTTP 客户端

import asyncio
import aiohttp
from typing import List, Dict, Any
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Response:
    url: str
    status: int
    data: Any

class AsyncHTTPClient:
    """异步 HTTP 客户端"""
    
    def __init__(self, max_concurrent: int = 10, timeout: float = 30):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: aiohttp.ClientSession | None = None
    
    async def __aenter__(self):
        self._session = aiohttp.ClientSession(timeout=self._timeout)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
    
    async def fetch(self, url: str) -> Response:
        """获取单个 URL"""
        async with self._semaphore:
            try:
                async with self._session.get(url) as response:
                    data = await response.json()
                    return Response(url=url, status=response.status, data=data)
            except Exception as e:
                logger.error(f"Error fetching {url}: {e}")
                return Response(url=url, status=-1, data={"error": str(e)})
    
    async def fetch_all(self, urls: List[str]) -> List[Response]: 
        """并发获取多个 URL"""
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

# 使用示例
async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    ]
    
    async with AsyncHTTPClient(max_concurrent=5) as client:
        responses = await client.fetch_all(urls)
        for resp in responses:
            logger.info(f"{resp.url}: {resp.status}")

# asyncio.run(main())

4.5 异步上下文管理器与迭代器

import asyncio
from typing import AsyncIterator

# ============ 异步上下文管理器 ============
class AsyncDatabase:
    """异步数据库连接"""
    
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.5)  # 模拟连接
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.1)
    
    async def query(self, sql: str):
        await asyncio.sleep(0.1)
        return [{"id": 1, "name":  "Alice"}]

async def main_context():
    async with AsyncDatabase() as db:
        result = await db.query("SELECT * FROM users")
        print(result)

# 使用 contextlib
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer():
    import time
    start = time.perf_counter()
    try:
        yield
    finally: 
        elapsed = time.perf_counter() - start
        print(f"Elapsed:  {elapsed:.4f}s")

async def main_timer():
    async with async_timer():
        await asyncio.sleep(1)

# ============ 异步迭代器 ============
class AsyncCounter:
    """异步计数器"""
    
    def __init__(self, stop: int):
        self.current = 0
        self.stop = stop
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # 模拟异步操作
        self.current += 1
        return self.current

async def main_aiter():
    async for num in AsyncCounter(5):
        print(num)

# ============ 异步生成器 ============
async def async_range(start: int, stop: int) -> AsyncIterator[int]:
    """异步生成器"""
    for i in range(start, stop):
        await asyncio.sleep(0.1)
        yield i

async def main_agen():
    async for num in async_range(0, 5):
        print(num)

4.6 asyncio 实战模式

import asyncio
from typing import Callable, Any
import functools

# ============ 1. 同步函数转异步 ============
def blocking_io(x: int) -> int:
    """阻塞的 I/O 操作"""
    import time
    time.sleep(1)
    return x ** 2

async def main_executor():
    loop = asyncio.get_running_loop()  # 协程内推荐使用 get_running_loop()
    
    # 在线程池中执行阻塞函数
    result = await loop.run_in_executor(None, blocking_io, 5)
    print(result)
    
    # 指定线程池
    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, blocking_io, 10)

# 装饰器版本
def async_wrap(func: Callable) -> Callable:
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, 
            functools.partial(func, *args, **kwargs)
        )
    return wrapper

@async_wrap
def sync_function(x: int) -> int:
    import time
    time.sleep(1)
    return x ** 2
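
一个使用示意:被 async_wrap 装饰后,原本阻塞的函数就可以直接 await(实际在默认线程池中执行)。

async def main_async_wrap():
    result = await sync_function(4)
    print(result)  # 16

# asyncio.run(main_async_wrap())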

# ============ 2. 重试装饰器 ============
def async_retry(max_attempts: int = 3, delay: float = 1.0):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try: 
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(delay * (attempt + 1))
            raise last_exception
        return wrapper
    return decorator

@async_retry(max_attempts=3, delay=1.0)
async def unreliable_operation():
    import random
    if random.random() < 0.7:
        raise ValueError("Random failure")
    return "Success"

# ============ 3. 限速器 ============
class RateLimiter:
    """令牌桶限速器"""
    
    def __init__(self, rate: float, max_tokens: int = 10):
        self.rate = rate  # 每秒产生的令牌数
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.last_update = asyncio.get_event_loop().time()
        self._lock = asyncio.Lock()
    
    async def acquire(self):
        async with self._lock:
            now = asyncio.get_event_loop().time()
            elapsed = now - self.last_update
            self.tokens = min(self.max_tokens, self.tokens + elapsed * self.rate)
            self.last_update = now
            
            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1
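
使用示意:每次调用前先 acquire(),超过速率的调用会被推迟(下面的 rate、请求数量等参数仅为演示)。

async def main_rate_limit():
    limiter = RateLimiter(rate=5, max_tokens=5)

    async def send_request(i: int):
        await limiter.acquire()
        print(f"Request {i} sent")

    await asyncio.gather(*[send_request(i) for i in range(20)])

# asyncio.run(main_rate_limit())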

# ============ 4. 优雅关闭 ============
async def graceful_shutdown(loop: asyncio.AbstractEventLoop):
    """优雅关闭所有任务"""
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    
    for task in tasks: 
        task.cancel()
    
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()

def main_with_shutdown():
    loop = asyncio.new_event_loop()
    
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt: 
        loop.run_until_complete(graceful_shutdown(loop))
    finally:
        loop.close()

五、并发编程对比总结

5.1 三种方式对比

特性          多线程               多进程                协程
──────────────────────────────────────────────────────────────────
适用场景      I/O 密集型           CPU 密集型            I/O 密集型
GIL 影响      受限制               不受限制              不受限制
内存占用      中等(~1MB/线程)      高(~10MB/进程)        低(~KB/协程)
创建开销      中等                 大                    小
通信方式      共享内存             IPC (Queue/Pipe)      直接 await
切换开销      系统调度             系统调度              用户态切换
编程难度      中等                 中等                  较高
调试难度      中等                 中等                  较难

5.2 选择决策树

需要并发处理?
│
├─ CPU 密集型任务?
│   ├─ 是 → 多进程 (ProcessPoolExecutor)
│   └─ 否 → I/O 密集型
│           │
│           ├─ 高并发 (>1000)?
│           │   └─ 是 → asyncio
│           │
│           ├─ 需要调用阻塞库?
│           │   └─ 是 → 多线程 (ThreadPoolExecutor)
│           │
│           └─ 其他 → asyncio (推荐)
│
└─ 混合场景?
    └─ asyncio + run_in_executor

5.3 性能对比代码

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp

# ============ I/O 密集型测试 ============
def io_task(_: int = 0) -> int:  # 占位参数,便于 executor.map 传入序号调用
    time.sleep(0.1)
    return 1

async def async_io_task():
    await asyncio.sleep(0.1)
    return 1

def test_io_bound(n: int = 100):
    # 串行
    start = time.perf_counter()
    for _ in range(n):
        io_task()
    print(f"串行: {time.perf_counter() - start:.2f}s")
    
    # 多线程
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=20) as executor:
        list(executor.map(io_task, range(n)))
    print(f"多线程: {time.perf_counter() - start:.2f}s")
    
    # 协程
    async def run_async():
        tasks = [async_io_task() for _ in range(n)]
        await asyncio.gather(*tasks)
    
    start = time.perf_counter()
    asyncio.run(run_async())
    print(f"协程: {time.perf_counter() - start:.2f}s")

# ============ CPU 密集型测试 ============
def cpu_task(n: int) -> int:
    return sum(i ** 2 for i in range(n))

def test_cpu_bound(n: int = 8, size: int = 10**6):
    # 串行
    start = time.perf_counter()
    for _ in range(n):
        cpu_task(size)
    print(f"串行:  {time.perf_counter() - start:.2f}s")
    
    # 多线程(受 GIL 限制)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_task, [size] * n))
    print(f"多线程:  {time.perf_counter() - start:.2f}s")
    
    # 多进程
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_task, [size] * n))
    print(f"多进程:  {time.perf_counter() - start:.2f}s")

if __name__ == "__main__":
    print("=== I/O 密集型 ===")
    test_io_bound(100)
    
    print("\n=== CPU 密集型 ===")
    test_cpu_bound(8)

典型输出:

=== I/O 密集型 ===
串行:  10.05s
多线程: 0.52s
协程:  0.11s  ← 协程最快

=== CPU 密集型 ===
串行: 3.20s
多线程: 3.35s  ← GIL 限制,比串行还慢
多进程: 0.95s  ← 多进程最快

📝 Part 4 总结

面试高频考点

知识点 面试频率 难度
GIL 是什么?有什么影响? ⭐⭐⭐⭐⭐ ⭐⭐⭐
多线程 vs 多进程 vs 协程区别 ⭐⭐⭐⭐⭐ ⭐⭐⭐
什么场景用什么并发模型? ⭐⭐⭐⭐⭐ ⭐⭐⭐
线程安全与锁 ⭐⭐⭐⭐ ⭐⭐⭐
ThreadPoolExecutor 用法 ⭐⭐⭐⭐ ⭐⭐
asyncio 基础用法 ⭐⭐⭐⭐ ⭐⭐⭐
async/await 原理 ⭐⭐⭐ ⭐⭐⭐⭐
进程间通信方式 ⭐⭐⭐ ⭐⭐⭐
生产者-消费者模式 ⭐⭐⭐⭐ ⭐⭐⭐
死锁的原因与避免 ⭐⭐⭐ ⭐⭐⭐

面试常见问题

  1. 什么是 GIL?它对多线程有什么影响?
  2. Python 的多线程是真正的并行吗?
  3. 什么情况下用多线程,什么情况下用多进程?
  4. asyncio 的工作原理是什么?和多线程有什么区别?
  5. 如何实现一个线程安全的单例模式?
  6. 什么是死锁?如何避免?
  7. 如何实现生产者-消费者模式?
  8. 进程间通信有哪些方式?各有什么优缺点?
  9. 协程比多线程有什么优势?
  10. 如何在 asyncio 中执行阻塞的同步代码?