基础概念

进程与线程

  • 进程:程序的一次运行实例,是操作系统资源分配的基本单位,每个进程有独立的内存空间。
  • 线程:进程内的执行单元,是 CPU 调度的基本单位,多个线程共享进程的内存资源(如全局变量、堆内存),切换开销远小于进程。
  • 多线程:一个进程中并发运行多个线程

线程状态

线程的生命周期包含 5 种状态,理解状态切换有助于调试多线程程序:

  • 新建:创建线程对象但未启动。
  • 就绪:调用start()后,线程等待 CPU 调度。
  • 运行:CPU 执行线程的run()方法。
  • 阻塞:线程因 I/O、休眠、等待锁等暂停执行。
  • 终止:线程执行完毕或异常退出。

Python 多线程核心模块

Python 提供了多个实现多线程的模块,其中threading是主流选择,_thread为底层低级模块(不推荐直接使用),此外还有高级封装的concurrent.futures。这篇博客主要讲解threading模块和concurrent.futures模块的使用。

threading模块

  • threading是对_thread的高级封装,提供了线程、锁、事件、条件变量等丰富功能,是 Python 多线程的核心模块。
  • 无论在Python还是Java中,启动多线程本质上无非就是实例化线程对象,并启动多线程的相关方法来控制多线程而已。

启动多线程

threading模块模块的核心类是Thread类,用于启动多线程并提供了多个方法来控制多线程的执行过程。创建Thread对象有两种方式,一是自定义类继承Thread类,重写run方法,第二种方法是传入任务函数创建 Thread 对象。

  • 继承Thread类,重写run方法

    	import threading
    		import time
    		
    		class MyThread(threading.Thread):
    		    def __init__(self, name):
    		        super().__init__()
    		        self.name = name
    		
    		    def run(self):
    		        # 线程执行的任务
    		        for i in range(3):
    		            print(f"线程{self.name}执行:{i}")
    		            time.sleep(1)
    		
    		# 创建并启动线程
    		t1 = MyThread("A")
    		t2 = MyThread("B")
    		t1.start()
    		t2.start()
    
  • 传入任务函数创建 Thread 对象

    import threading
    		import time
    		
    		def task(name):
    		    for i in range(3):
    		        print(f"线程{name}执行:{i}")
    		        time.sleep(1)
    		
    		# 创建线程
    		t1 = threading.Thread(target=task, args=("A",))
    		t2 = threading.Thread(target=task, args=("B",))
    		# 或者使用kwargs传递字典形式的参数
    		#t1 = threading.Thread(target=task, kwargs={"name": "A"})
    		#t2 = threading.Thread(target=task, kwargs={"name": "B"})
    		
    		# 启动线程
    		t1.start()
    		t2.start()
    		
    		# 等待线程执行完毕(主线程阻塞)
    		t1.join()
    		t2.join()
    		print("所有线程执行完毕")
    

线程的关键操作

  1. 启动线程:start() vs run()

    • start():启动线程,将线程放入就绪队列,由操作系统调度执行 run() 方法(不可重复调用)
    • run():线程核心逻辑方法,直接调用会以单线程方式执行(不创建新线程)
  2. 等待线程:join(timeout=None)

    • 主线程调用 t.join() 后,会阻塞直到线程 t 终止,或等待 timeout 秒(超时后主线程继续执行)
    • 作用:确保主线程在子线程完成后再执行后续逻辑(如汇总结果)
  3. 线程状态判断

    • is_alive():返回线程是否处于运行 / 就绪状态(True 表示未终止)
    • getName()/setName():获取 / 设置线程名称(默认 Thread-1、Thread-2…)
    • ident:获取线程唯一标识符(终止后失效)
  4. 守护线程(Daemon Thread)

    • 守护线程是「后台线程」,主线程退出时,守护线程会被强制终止(无需等待)
    • 非守护线程(默认):主线程会等待所有非守护线程执行完毕后才退出
    • 设置方式:创建线程时指定 daemon=True,或调用 t.setDaemon(True)(必须在 start() 前设置)
    import threading
    import time
    
    def daemon_task():
        while True:
            print("守护线程运行中...")
            time.sleep(1)
    
    def normal_task():
        time.sleep(3)
        print("非守护线程执行完成")
    
    if __name__ == "__main__":
        # 守护线程
        t1 = threading.Thread(target=daemon_task, daemon=True)
        # 非守护线程
        t2 = threading.Thread(target=normal_task)
    
        t1.start()
        t2.start()
    
        t2.join()  # 等待非守护线程完成(3秒)
        print("主线程退出(守护线程被终止)")
    

线程同步:线程安全问题

  1. 什么是线程同步,线程同步就是多线程之间按照合理的顺序访问共享资源,避免由于多线程由于切换执行导致数据不一致和安全问题
  2. 解决线程同步的核心思想就是在某一段时间,只允许一个线程访问共享资源,保证线程访问的原子性操作。
  3. 解决线程同步的方式主要通过锁和线程间通信

互斥锁(threading.Lock)

  • 最常用的同步工具,通过「加锁 - 操作 - 解锁」保证临界区代码的原子性。

  • 特点:同一时刻只能有一个线程持有锁,其他线程会阻塞直到锁释放。

    import threading
    
    count = 0  # 共享资源
    lock = threading.Lock()  # 创建互斥锁
    
    def increment():
        global count
        for _ in range(100000):
            lock.acquire()  # 加锁(阻塞直到获取锁)
            try:
                # 临界区:操作共享资源(必须保证原子性)
                count += 1
            finally:
                lock.release()  # 解锁(避免异常导致锁未释放)
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=increment)
        t2 = threading.Thread(target=increment)
    
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
        print(f"最终计数:{count}")  # 正确结果:200000(不加锁可能小于200000)
    

可重入锁(threading.RLock)

  • 允许同一线程多次获取锁(解决嵌套锁场景),解锁次数需与加锁次数一致。

  • 对比 Lock:Lock 是不可重入的,同一线程多次 acquire() 会死锁

    rlock = threading.RLock()
    
    def nested_task():
        rlock.acquire()
        print("第一次获取锁")
        rlock.acquire()
        print("第二次获取锁(可重入)")
        rlock.release()
        rlock.release()
    
    threading.Thread(target=nested_task).start()
    

信号量(threading.Semaphore)

  • 控制并发线程数量(如限制最大连接数、最大并发任务数)。

  • 特点:维护一个计数器,acquire() 减 1(计数器为 0 时阻塞),release() 加 1。

    import threading
    import time
    
    sem = threading.Semaphore(3)  # 限制最大并发数为3
    
    def task(name):
        sem.acquire()  # 获取许可(计数器减1)
        print(f"线程 {name} 开始执行(当前并发数:{3 - sem._value})")
        time.sleep(2)  # 模拟任务执行
        print(f"线程 {name} 执行完成")
        sem.release()  # 释放许可(计数器加1)
    
    if __name__ == "__main__":
        for i in range(5):
            threading.Thread(target=task, args=(f"A{i}",)).start()
    

事件(threading.Event)

  • 线程间的简单通信工具,通过 set()/clear() 控制事件状态,其他线程通过 wait() 等待事件触发。

  • 特点:事件是「一次性」的(set() 后状态保持为 True,需手动 clear() 重置)。

    import threading
    import time
    
    event = threading.Event()  # 初始状态为 False
    
    def wait_task():
        print("等待事件触发...")
        event.wait()  # 阻塞直到事件被 set()
        print("事件触发,继续执行")
    
    def trigger_task():
        time.sleep(3)
        print("触发事件!")
        event.set()  # 设置事件状态为 True
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=wait_task)
        t2 = threading.Thread(target=trigger_task)
    
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    

线程通信

线程间需传递数据时,推荐使用线程安全的数据结构,避免手动加锁的繁琐。

队列(queue.Queue)

  • 线程安全的先进先出(FIFO)队列,支持 put()(存入数据)、get()(取出数据),内置锁机制

  • 常用场景:生产者 - 消费者模型(生产者线程存入任务,消费者线程取出执行)

    import threading
    import queue
    import time
    
    # 线程安全队列(最大容量为3)
    q = queue.Queue(maxsize=3)
    
    def producer():
        """生产者线程:向队列存入数据"""
        for i in range(5):
            q.put(f"任务{i}")
            print(f"生产:任务{i}(队列剩余:{q.qsize()})")
            time.sleep(0.5)
    
    def consumer():
        """消费者线程:从队列取出数据"""
        while True:
            task = q.get()  # 队列空时阻塞
            print(f"消费:{task}(队列剩余:{q.qsize()})")
            q.task_done()  # 标记任务完成(配合 join() 等待所有任务执行)
            time.sleep(1)
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer, daemon=True)  # 守护线程,随主线程退出
    
        t1.start()
        t2.start()
    
        t1.join()  # 等待生产者完成
        q.join()   # 等待队列中所有任务被消费
        print("所有任务生产消费完成")
    

线程池

  1. 手动创建大量线程会导致线程切换开销增大,线程池通过「复用线程」减少创建 / 销毁成本,同时限制最大并发数。

  2. concurrent.futures.ThreadPoolExecutor是比较推荐使用的线程池类,支持自动管理线程数量,简化任务提交和结果获取。

  3. 核心方法:submit()(提交单个任务)、map()(批量提交任务,返回结果顺序与任务一致)。

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def task(name, delay):
        print(f"线程 {name} 启动,延迟 {delay}s 执行")
        time.sleep(delay)
        return f"线程 {name} 完成"
    
    if __name__ == "__main__":
        # 创建线程池,最大线程数为3
        with ThreadPoolExecutor(max_workers=3) as executor:
            # 方式1:submit() 提交单个任务(返回 Future 对象,可获取结果)
            futures = [
                executor.submit(task, f"A{i}", i) for i in range(1, 5)  # 4个任务,池内3个线程并发
            ]
            # 遍历获取任务结果(按任务完成顺序)
            for future in futures:
                print(future.result())
    
            # 方式2:map() 批量提交任务(简化代码,适合批量相同任务)
    			 names = ["B1", "B2", "B3"]
            delays = [1, 2, 1.5]
    
            # map 会自动将 names[i] 和 delays[i] 作为参数传递给 task 函数
            results = executor.map(task, names, delays)
    
            # 遍历获取结果
            for res in results:
                print(res)
    
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐