Python 多线程核心知识点解析
Python 提供了多个实现多线程的模块,其中threading是主流选择,_thread为底层低级模块(不推荐直接使用),此外还有高级封装的concurrent.futures。线程间的简单通信工具,通过 set()/clear() 控制事件状态,其他线程通过 wait() 等待事件触发。线程安全的先进先出(FIFO)队列,支持 put()(存入数据)、get()(取出数据),内置锁机制。核心方
目录
基础概念
进程与线程
- 进程:程序的一次运行实例,是操作系统资源分配的基本单位,每个进程有独立的内存空间。
- 线程:进程内的执行单元,是 CPU 调度的基本单位,多个线程共享进程的内存资源(如全局变量、堆内存),切换开销远小于进程。
- 多线程:一个进程中并发运行多个线程
线程状态
线程的生命周期包含 5 种状态,理解状态切换有助于调试多线程程序:
- 新建:创建线程对象但未启动。
- 就绪:调用start()后,线程等待 CPU 调度。
- 运行:CPU 执行线程的run()方法。
- 阻塞:线程因 I/O、休眠、等待锁等暂停执行。
- 终止:线程执行完毕或异常退出。
Python 多线程核心模块
Python 提供了多个实现多线程的模块,其中threading是主流选择,_thread为底层低级模块(不推荐直接使用),此外还有高级封装的concurrent.futures。这篇博客主要讲解threading模块和concurrent.futures模块的使用。
threading模块
- threading是对_thread的高级封装,提供了线程、锁、事件、条件变量等丰富功能,是 Python 多线程的核心模块。
- 无论在Python还是Java中,启动多线程本质上无非就是实例化线程对象,并启动多线程的相关方法来控制多线程而已。
启动多线程
threading模块模块的核心类是Thread类,用于启动多线程并提供了多个方法来控制多线程的执行过程。创建Thread对象有两种方式,一是自定义类继承Thread类,重写run方法,第二种方法是传入任务函数创建 Thread 对象。
-
继承Thread类,重写run方法
import threading import time class MyThread(threading.Thread): def __init__(self, name): super().__init__() self.name = name def run(self): # 线程执行的任务 for i in range(3): print(f"线程{self.name}执行:{i}") time.sleep(1) # 创建并启动线程 t1 = MyThread("A") t2 = MyThread("B") t1.start() t2.start() -
传入任务函数创建 Thread 对象
import threading import time def task(name): for i in range(3): print(f"线程{name}执行:{i}") time.sleep(1) # 创建线程 t1 = threading.Thread(target=task, args=("A",)) t2 = threading.Thread(target=task, args=("B",)) # 或者使用kwargs传递字典形式的参数 #t1 = threading.Thread(target=task, kwargs={"name": "A"}) #t2 = threading.Thread(target=task, kwargs={"name": "B"}) # 启动线程 t1.start() t2.start() # 等待线程执行完毕(主线程阻塞) t1.join() t2.join() print("所有线程执行完毕")
线程的关键操作
-
启动线程:start() vs run()
- start():启动线程,将线程放入就绪队列,由操作系统调度执行 run() 方法(不可重复调用)
- run():线程核心逻辑方法,直接调用会以单线程方式执行(不创建新线程)
-
等待线程:join(timeout=None)
- 主线程调用 t.join() 后,会阻塞直到线程 t 终止,或等待 timeout 秒(超时后主线程继续执行)
- 作用:确保主线程在子线程完成后再执行后续逻辑(如汇总结果)
-
线程状态判断
- is_alive():返回线程是否处于运行 / 就绪状态(True 表示未终止)
- getName()/setName():获取 / 设置线程名称(默认 Thread-1、Thread-2…)
- ident:获取线程唯一标识符(终止后失效)
-
守护线程(Daemon Thread)
- 守护线程是「后台线程」,主线程退出时,守护线程会被强制终止(无需等待)
- 非守护线程(默认):主线程会等待所有非守护线程执行完毕后才退出
- 设置方式:创建线程时指定 daemon=True,或调用 t.setDaemon(True)(必须在 start() 前设置)
import threading import time def daemon_task(): while True: print("守护线程运行中...") time.sleep(1) def normal_task(): time.sleep(3) print("非守护线程执行完成") if __name__ == "__main__": # 守护线程 t1 = threading.Thread(target=daemon_task, daemon=True) # 非守护线程 t2 = threading.Thread(target=normal_task) t1.start() t2.start() t2.join() # 等待非守护线程完成(3秒) print("主线程退出(守护线程被终止)")
线程同步:线程安全问题
- 什么是线程同步,线程同步就是多线程之间按照合理的顺序访问共享资源,避免由于多线程由于切换执行导致数据不一致和安全问题
- 解决线程同步的核心思想就是在某一段时间,只允许一个线程访问共享资源,保证线程访问的原子性操作。
- 解决线程同步的方式主要通过锁和线程间通信
互斥锁(threading.Lock)
-
最常用的同步工具,通过「加锁 - 操作 - 解锁」保证临界区代码的原子性。
-
特点:同一时刻只能有一个线程持有锁,其他线程会阻塞直到锁释放。
import threading count = 0 # 共享资源 lock = threading.Lock() # 创建互斥锁 def increment(): global count for _ in range(100000): lock.acquire() # 加锁(阻塞直到获取锁) try: # 临界区:操作共享资源(必须保证原子性) count += 1 finally: lock.release() # 解锁(避免异常导致锁未释放) if __name__ == "__main__": t1 = threading.Thread(target=increment) t2 = threading.Thread(target=increment) t1.start() t2.start() t1.join() t2.join() print(f"最终计数:{count}") # 正确结果:200000(不加锁可能小于200000)
可重入锁(threading.RLock)
-
允许同一线程多次获取锁(解决嵌套锁场景),解锁次数需与加锁次数一致。
-
对比 Lock:Lock 是不可重入的,同一线程多次 acquire() 会死锁
rlock = threading.RLock() def nested_task(): rlock.acquire() print("第一次获取锁") rlock.acquire() print("第二次获取锁(可重入)") rlock.release() rlock.release() threading.Thread(target=nested_task).start()
信号量(threading.Semaphore)
-
控制并发线程数量(如限制最大连接数、最大并发任务数)。
-
特点:维护一个计数器,acquire() 减 1(计数器为 0 时阻塞),release() 加 1。
import threading import time sem = threading.Semaphore(3) # 限制最大并发数为3 def task(name): sem.acquire() # 获取许可(计数器减1) print(f"线程 {name} 开始执行(当前并发数:{3 - sem._value})") time.sleep(2) # 模拟任务执行 print(f"线程 {name} 执行完成") sem.release() # 释放许可(计数器加1) if __name__ == "__main__": for i in range(5): threading.Thread(target=task, args=(f"A{i}",)).start()
事件(threading.Event)
-
线程间的简单通信工具,通过 set()/clear() 控制事件状态,其他线程通过 wait() 等待事件触发。
-
特点:事件是「一次性」的(set() 后状态保持为 True,需手动 clear() 重置)。
import threading import time event = threading.Event() # 初始状态为 False def wait_task(): print("等待事件触发...") event.wait() # 阻塞直到事件被 set() print("事件触发,继续执行") def trigger_task(): time.sleep(3) print("触发事件!") event.set() # 设置事件状态为 True if __name__ == "__main__": t1 = threading.Thread(target=wait_task) t2 = threading.Thread(target=trigger_task) t1.start() t2.start() t1.join() t2.join()
线程通信
线程间需传递数据时,推荐使用线程安全的数据结构,避免手动加锁的繁琐。
队列(queue.Queue)
-
线程安全的先进先出(FIFO)队列,支持 put()(存入数据)、get()(取出数据),内置锁机制
-
常用场景:生产者 - 消费者模型(生产者线程存入任务,消费者线程取出执行)
import threading import queue import time # 线程安全队列(最大容量为3) q = queue.Queue(maxsize=3) def producer(): """生产者线程:向队列存入数据""" for i in range(5): q.put(f"任务{i}") print(f"生产:任务{i}(队列剩余:{q.qsize()})") time.sleep(0.5) def consumer(): """消费者线程:从队列取出数据""" while True: task = q.get() # 队列空时阻塞 print(f"消费:{task}(队列剩余:{q.qsize()})") q.task_done() # 标记任务完成(配合 join() 等待所有任务执行) time.sleep(1) if __name__ == "__main__": t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer, daemon=True) # 守护线程,随主线程退出 t1.start() t2.start() t1.join() # 等待生产者完成 q.join() # 等待队列中所有任务被消费 print("所有任务生产消费完成")
线程池
-
手动创建大量线程会导致线程切换开销增大,线程池通过「复用线程」减少创建 / 销毁成本,同时限制最大并发数。
-
concurrent.futures.ThreadPoolExecutor是比较推荐使用的线程池类,支持自动管理线程数量,简化任务提交和结果获取。 -
核心方法:submit()(提交单个任务)、map()(批量提交任务,返回结果顺序与任务一致)。
from concurrent.futures import ThreadPoolExecutor import time def task(name, delay): print(f"线程 {name} 启动,延迟 {delay}s 执行") time.sleep(delay) return f"线程 {name} 完成" if __name__ == "__main__": # 创建线程池,最大线程数为3 with ThreadPoolExecutor(max_workers=3) as executor: # 方式1:submit() 提交单个任务(返回 Future 对象,可获取结果) futures = [ executor.submit(task, f"A{i}", i) for i in range(1, 5) # 4个任务,池内3个线程并发 ] # 遍历获取任务结果(按任务完成顺序) for future in futures: print(future.result()) # 方式2:map() 批量提交任务(简化代码,适合批量相同任务) names = ["B1", "B2", "B3"] delays = [1, 2, 1.5] # map 会自动将 names[i] 和 delays[i] 作为参数传递给 task 函数 results = executor.map(task, names, delays) # 遍历获取结果 for res in results: print(res)
更多推荐


所有评论(0)