目录

一、 ThreadPool.h

1. ThreadPool 的整体定位

2. 逐模块拆解核心实现

3. 关键细节深度解析

1. 核心设计模式:生产者 - 消费者模型

2. 核心成员的设计逻辑

3. 设计亮点与注意事项

设计亮点

注意事项

二、 ThreadPool.cc

1. 代码整体核心逻辑回顾

2. 逐函数拆解核心实现

1. 构造函数:初始化核心资源

2. 析构函数:保证优雅停止

3. start():启动工作线程(核心)

4. stop():优雅停止线程池

5. queueSize():线程安全获取队列长度

6. run():提交任务(生产者逻辑)

7. take():取任务(消费者逻辑)

8. isFull():判断队列是否满(私有辅助函数)

9. runInThread():工作线程核心循环(消费者核心)

3. 核心设计亮点总结

总结


一、 ThreadPool.h

先贴出完整代码,再逐部分解释:

// 本源代码的使用受 BSD 风格许可证约束
// 该许可证可在 License 文件中查阅。
//
// 作者:陈硕 (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_THREADPOOL_H
#define MUDUO_BASE_THREADPOOL_H

#include "muduo/base/Condition.h"  // 条件变量(配合互斥锁实现任务队列的等待/唤醒)
#include "muduo/base/Mutex.h"      // 互斥锁(保证任务队列和线程池状态的线程安全)
#include "muduo/base/Thread.h"     // 线程封装类(工作线程的创建/管理)
#include "muduo/base/Types.h"      // 基础类型定义(如 string)

#include <deque>                   // 双端队列:存储待执行的任务(支持高效的头删尾插)
#include <vector>                  // 向量:存储工作线程的智能指针

namespace muduo
{

// 线程池类(不可拷贝)
// 核心特性:1. 基于「生产者-消费者」模型,主线程提交任务,工作线程执行任务;
// 2. 支持有界任务队列(可配置最大容量),队列满时提交任务会阻塞;
// 3. 支持线程初始化回调,每个工作线程启动时执行;
// 4. 安全的启停机制,停止时等待所有任务执行完成。
class ThreadPool : noncopyable
{
 public:
  // 任务类型:无参数、无返回值的可调用对象(函数/函数对象/lambda/绑定器等)
  typedef std::function<void ()> Task;

  // 构造函数:创建线程池
  // @param nameArg 线程池名称(可选,用于日志/调试区分不同线程池)
  explicit ThreadPool(const string& nameArg = string("ThreadPool"));
  
  // 析构函数:销毁线程池(会自动调用 stop() 保证资源清理)
  ~ThreadPool();

  // 设置任务队列的最大容量(必须在 start() 之前调用)
  // @param maxSize 队列最大任务数,0 表示无界队列(默认)
  void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
  
  // 设置线程初始化回调函数(必须在 start() 之前调用)
  // @param cb 每个工作线程启动后,执行第一个任务前会调用该回调
  void setThreadInitCallback(const Task& cb)
  { threadInitCallback_ = cb; }

  // 启动线程池:创建指定数量的工作线程并开始运行
  // @param numThreads 工作线程数量(必须 >=1)
  void start(int numThreads);
  
  // 停止线程池:1. 标记 running_ 为 false,唤醒所有阻塞的工作线程;
  // 2. 不再接受新任务,等待所有已提交任务执行完成;3. 等待所有工作线程退出。
  void stop();

  // 获取线程池名称(只读)
  const string& name() const
  { return name_; }

  // 获取当前任务队列的任务数量(线程安全)
  size_t queueSize() const;

  // 提交任务到线程池
  // 特性:1. 若任务队列已满且 maxQueueSize_ > 0,会阻塞直到队列有空闲位置;
  // 2. 若已调用 stop(),则立即返回(不会提交任务);
  // 3. C++14 及之前的 std::function 不支持仅移动(move-only)类型,
  //    因此无需像 BlockingQueue 那样重载 const& 和 && 版本的接口;
  //    参考:https://stackoverflow.com/a/25408989
  // @param f 要提交的任务(Task 类型)
  void run(Task f);

 private:
  // 检查任务队列是否已满(仅在持有 mutex_ 时调用)
  // REQUIRES(mutex_):Clang 线程安全注解,标记该函数必须持有 mutex_ 才能调用
  bool isFull() const REQUIRES(mutex_);
  
  // 工作线程的核心执行函数:循环从任务队列取任务并执行
  void runInThread();
  
  // 从任务队列取出一个任务(队列为空时阻塞,直到有新任务或线程池停止)
  // @return 待执行的任务,若线程池停止则返回空任务
  Task take();

  mutable MutexLock mutex_;               // 互斥锁(mutable 允许 const 函数加锁)
  Condition notEmpty_ GUARDED_BY(mutex_); // 条件变量:任务队列非空时唤醒工作线程(受 mutex_ 保护)
  Condition notFull_ GUARDED_BY(mutex_);  // 条件变量:任务队列非满时唤醒提交任务的线程(受 mutex_ 保护)
  string name_;                           // 线程池名称(用于日志/调试)
  Task threadInitCallback_;               // 线程初始化回调函数(可选)
  // 工作线程列表:用 unique_ptr 管理 Thread 对象,自动释放资源
  std::vector<std::unique_ptr<muduo::Thread>> threads_;
  std::deque<Task> queue_ GUARDED_BY(mutex_); // 任务队列(受 mutex_ 保护,双端队列高效存取)
  size_t maxQueueSize_;                   // 任务队列最大容量(0 表示无界)
  bool running_;                          // 线程池运行状态(true=运行中,false=已停止)
};

}  // namespace muduo

#endif  // MUDUO_BASE_THREADPOOL_H

1. ThreadPool 的整体定位

  • 复用工作线程:避免频繁创建 / 销毁线程的开销(线程创建是系统级开销,复用可大幅提升性能);
  • 异步任务调度:生产者(主线程 / 其他线程)通过 run() 提交任务,消费者(工作线程)自动取任务执行,解耦任务提交与执行;
  • 可控的任务队列:支持设置队列最大长度,避免任务堆积导致内存溢出;
  • 优雅启停:支持 stop() 优雅停止线程池(等待所有任务执行完成 / 中断,不强制终止线程);
  • 灵活扩展:支持线程初始化回调(每个工作线程启动时执行自定义逻辑,如初始化日志器、内存池);
  • 线程安全:所有对共享资源(任务队列、线程状态)的访问都在互斥锁保护下,结合双条件变量(notEmpty_/notFull_)实现高效同步。

它是 Muduo 中异步 IO、定时器、日志异步输出等场景的核心依赖,也是 C++ 线程池实现的经典范例。

2. 逐模块拆解核心实现

class ThreadPool : noncopyable  // 禁止拷贝:线程池管理一组线程和任务队列,拷贝会导致资源混乱
{
 public:
  // 任务类型:封装任意可调用对象(函数、lambda、std::bind、成员函数),无参数无返回值
  typedef std::function<void ()> Task;

  // 构造函数:显式构造,可选指定线程池名称(调试友好)
  explicit ThreadPool(const string& nameArg = string("ThreadPool"));
  ~ThreadPool();  // 析构时自动调用stop(),优雅停止线程池

  // 配置接口(必须在start()前调用):
  // 设置任务队列最大长度(0表示无限制)
  void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
  // 设置线程初始化回调:每个工作线程启动时执行的自定义逻辑
  void setThreadInitCallback(const Task& cb) { threadInitCallback_ = cb; }

  // 核心接口1:启动线程池,创建numThreads个工作线程
  void start(int numThreads);
  // 核心接口2:优雅停止线程池(等待所有工作线程退出,清空任务队列)
  void stop();

  // 获取线程池名称(const接口,线程安全)
  const string& name() const { return name_; }

  // 获取当前任务队列长度(线程安全)
  size_t queueSize() const;

  // 核心接口3:提交任务(可能阻塞,若队列满且线程池运行中)
  // 注:C++14前std::function无移动版本,无需重载const&和&&
  void run(Task f);

 private:
  // 私有函数1:判断任务队列是否满(需持有mutex_,REQUIRES是Clang线程安全注解)
  bool isFull() const REQUIRES(mutex_);
  // 私有函数2:工作线程的核心执行逻辑(循环取任务、执行任务)
  void runInThread();
  // 私有函数3:从任务队列取任务(阻塞直到有任务/线程池停止)
  Task take();

  // 核心成员变量(线程安全设计是核心)
  mutable MutexLock mutex_;                // 保护所有共享资源,mutable支持const函数加锁
  Condition notEmpty_ GUARDED_BY(mutex_);  // 条件变量:队列非空时唤醒消费者(工作线程)
  Condition notFull_ GUARDED_BY(mutex_);   // 条件变量:队列非满时唤醒生产者(提交任务的线程)
  string name_;                            // 线程池名称(调试/日志用)
  Task threadInitCallback_;                // 线程初始化回调(可选)
  // 工作线程列表:用unique_ptr管理Thread对象,自动释放,避免内存泄漏
  std::vector<std::unique_ptr<muduo::Thread>> threads_;
  std::deque<Task> queue_ GUARDED_BY(mutex_);  // 任务队列:deque适合头尾操作(提交/取出)
  size_t maxQueueSize_;                       // 任务队列最大长度(0=无限制)
  bool running_;                              // 线程池运行状态:true=运行中,false=停止
};

3. 关键细节深度解析

1. 核心设计模式:生产者 - 消费者模型
角色 行为 同步条件
生产者(调用 run () 的线程) 提交 Task 到 queue_,若队列满则阻塞,直到 notFull_被唤醒 队列非满(!isFull ())+ running_=true
消费者(工作线程) 从 queue_取 Task 执行,若队列为空则阻塞,直到 notEmpty_被唤醒 队列非空(!queue_.empty ())+ running_=true
2. 核心成员的设计逻辑
成员变量 核心作用 设计原因
std::vector<std::unique_ptr<Thread>> threads_ 管理工作线程的生命周期 unique_ptr 自动释放 Thread 对象,避免手动管理内存;vector 方便批量创建 / 遍历线程
std::deque<Task> queue_ 存储待执行的任务 deque 的 pop_front/push_back 是 O (1) 操作,比 vector 更适合队列场景(vector 头部删除 O (n))
notEmpty_/notFull_ 双条件变量实现生产者 - 消费者同步 单条件变量会导致频繁唤醒(惊群),双条件变量精准唤醒,提升效率
running_ 控制线程池运行状态 用于优雅停止:设置为 false 后,唤醒所有阻塞的线程,避免工作线程永久阻塞
maxQueueSize_ 限制任务队列长度 防止任务无限堆积导致内存溢出,0 表示无限制(适用于任务量可控的场景)
3. 设计亮点与注意事项
设计亮点
  1. 优雅启停stop() 先置状态再唤醒所有线程,等待线程退出,避免强制终止导致的资源泄漏;
  2. 退化逻辑:无工作线程时同步执行任务,兼容单线程场景;
  3. 高效同步:双条件变量精准唤醒,避免惊群效应(单条件变量会唤醒所有线程,大部分线程无任务可执行);
  4. 内存安全unique_ptr 管理工作线程,自动释放,无需手动析构;
  5. 调试友好:线程池 / 工作线程命名,方便日志 / 工具定位问题;
  6. 灵活扩展:支持线程初始化回调,适配自定义线程上下文(如初始化日志、设置线程名)。
注意事项
  1. 任务提交时机stop() 后调用 run() 会直接返回,任务不会被执行;
  2. 队列长度限制maxQueueSize_=0 表示无限制,需确保任务量可控,避免内存溢出;
  3. 异常处理:工作线程捕获异常并打印栈回溯,避免单个任务崩溃导致整个线程池挂掉;
  4. 线程初始化回调:必须在 start() 前设置,否则不会生效;
  5. 禁止拷贝:继承 noncopyable,避免拷贝线程池导致多个对象管理同一组线程 / 队列。

二、 ThreadPool.cc

先贴出完整代码,再逐部分解释:

// 本源代码的使用受 BSD 风格许可证约束
// 该许可证可在 License 文件中查阅。
//
// 作者:陈硕 (chenshuo at chenshuo dot com)

#include "muduo/base/ThreadPool.h"

#include "muduo/base/Exception.h"  // muduo 自定义异常类(带调用栈)

#include <assert.h>  // 断言(调试期检查逻辑正确性)
#include <stdio.h>   // 标准 IO(fprintf/abort)

using namespace muduo;

// 构造函数:初始化线程池核心成员
// @param nameArg 线程池名称(用于日志/调试区分)
ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),               // 初始化互斥锁
    notEmpty_(mutex_),      // 初始化「队列非空」条件变量(绑定互斥锁)
    notFull_(mutex_),       // 初始化「队列非满」条件变量(绑定互斥锁)
    name_(nameArg),         // 初始化线程池名称
    maxQueueSize_(0),       // 任务队列默认无界(0 表示无最大容量)
    running_(false)         // 初始状态为未运行
{
}

// 析构函数:销毁线程池时自动停止(保证资源清理)
ThreadPool::~ThreadPool()
{
  if (running_)  // 若线程池正在运行,调用 stop() 安全停止
  {
    stop();
  }
}

// 启动线程池:创建指定数量的工作线程并启动
// @param numThreads 工作线程数量(>=0,0 表示无工作线程,提交任务时直接执行)
void ThreadPool::start(int numThreads)
{
  assert(threads_.empty()); // 断言:启动前工作线程列表为空(避免重复启动)
  running_ = true;          // 标记线程池为运行状态
  threads_.reserve(numThreads); // 预分配线程列表内存(避免频繁扩容)
  
  // 循环创建指定数量的工作线程
  for (int i = 0; i < numThreads; ++i)
  {
    char id[32];
    snprintf(id, sizeof id, "%d", i+1); // 生成线程编号(1,2,...numThreads)
    // 构造 Thread 对象:绑定 runInThread 为执行函数,线程名 = 线程池名 + 编号
    // emplace_back 直接在容器内构造对象,避免拷贝
    threads_.emplace_back(new muduo::Thread(
          std::bind(&ThreadPool::runInThread, this), name_+id));
    threads_[i]->start(); // 启动当前工作线程
  }
  
  // 特殊情况:无工作线程且设置了初始化回调 → 主线程直接执行回调
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();
  }
}

// 停止线程池:安全终止所有工作线程,等待任务执行完成
void ThreadPool::stop()
{
  {
  MutexLockGuard lock(mutex_); // 加锁修改运行状态
  running_ = false;            // 标记线程池为停止状态
  // 唤醒所有阻塞的线程:
  // 1. notEmpty_:唤醒等待任务的工作线程(使其退出循环)
  // 2. notFull_:唤醒等待提交任务的生产者线程(使其退出阻塞)
  notEmpty_.notifyAll();
  notFull_.notifyAll();
  } // 解锁(RAII 自动释放)
  
  // 等待所有工作线程退出(join),确保资源正确回收
  for (auto& thr : threads_)
  {
    thr->join();
  }
}

// 获取当前任务队列的任务数量(线程安全)
size_t ThreadPool::queueSize() const
{
  MutexLockGuard lock(mutex_); // 加锁保证读取队列大小的线程安全
  return queue_.size();        // 返回队列当前元素数量
}

// 提交任务到线程池
// 逻辑:1. 无工作线程 → 主线程直接执行任务;
//      2. 有工作线程 → 加锁后检查队列是否满,满则阻塞,否则入队并唤醒工作线程
// @param task 待提交的任务(Task 类型)
void ThreadPool::run(Task task)
{
  if (threads_.empty())  // 无工作线程:直接在当前线程执行任务
  {
    task();
  }
  else
  {
    MutexLockGuard lock(mutex_); // 加锁保护任务队列操作
    
    // 队列满且线程池运行中 → 阻塞等待「队列非满」条件
    // 用 while 循环处理虚假唤醒:即使被唤醒,仍需检查队列是否真的非满
    while (isFull() && running_)
    {
      notFull_.wait(); // 释放锁并阻塞,直到被 notify/notifyAll 唤醒
    }
    
    if (!running_) return; // 线程池已停止 → 直接返回,不提交任务
    assert(!isFull());     // 断言:队列此时非满(保证入队安全)

    // 移动语义入队:避免任务拷贝,提升性能
    queue_.push_back(std::move(task));
    notEmpty_.notify();    // 唤醒一个等待任务的工作线程(notify 而非 notifyAll,减少竞争)
  }
}

// 从任务队列取出一个任务(工作线程调用)
// 逻辑:队列为空且运行中 → 阻塞等待;队列有任务 → 取出并唤醒生产者;停止时返回空任务
// @return 待执行的任务(空任务表示线程池已停止)
ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_); // 加锁保护队列操作
  
  // 队列为空且线程池运行中 → 阻塞等待「队列非空」条件
  // 必须用 while 循环:处理条件变量的虚假唤醒(避免无任务时退出)
  while (queue_.empty() && running_)
  {
    notEmpty_.wait(); // 释放锁并阻塞,直到被唤醒
  }

  Task task; // 初始化空任务
  if (!queue_.empty()) // 队列有任务 → 取出队首任务
  {
    task = queue_.front();  // 获取队首任务
    queue_.pop_front();     // 移除队首任务
    // 队列有界且任务出队后 → 唤醒一个等待提交任务的生产者线程
    if (maxQueueSize_ > 0)
    {
      notFull_.notify();
    }
  }
  return task; // 返回任务(空任务表示线程池已停止)
}

// 检查任务队列是否已满(仅在持有互斥锁时调用)
// @return true=队列满,false=队列未满/无界
bool ThreadPool::isFull() const
{
  mutex_.assertLocked(); // 断言:当前线程已持有互斥锁(保证线程安全)
  // 队列有界(maxQueueSize_>0)且队列大小 >= 最大容量 → 满
  return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}

// 工作线程的核心执行函数:循环取任务、执行任务
void ThreadPool::runInThread()
{
  try
  {
    // 若设置了线程初始化回调 → 执行回调(仅在线程启动时执行一次)
    if (threadInitCallback_)
    {
      threadInitCallback_();
    }

    // 线程池运行中 → 循环取任务执行
    while (running_)
    {
      Task task(take()); // 取出任务(阻塞直到有任务或线程池停止)
      if (task)          // 任务非空 → 执行任务
      {
        task();
      }
    }
  }
  catch (const Exception& ex) // 捕获 muduo 自定义异常(带调用栈)
  {
    fprintf(stderr, "线程池 %s 中捕获到异常\n", name_.c_str());
    fprintf(stderr, "原因:%s\n", ex.what());
    fprintf(stderr, "调用栈:%s\n", ex.stackTrace());
    abort(); // 终止程序(异常未处理,避免线程池异常运行)
  }
  catch (const std::exception& ex) // 捕获标准库异常
  {
    fprintf(stderr, "线程池 %s 中捕获到异常\n", name_.c_str());
    fprintf(stderr, "原因:%s\n", ex.what());
    abort();
  }
  catch (...) // 捕获所有未处理异常
  {
    fprintf(stderr, "线程池 %s 中捕获到未知异常\n", name_.c_str());
    throw; // 重新抛出(若上层无处理,最终终止程序)
  }
}

1. 代码整体核心逻辑回顾

这个实现是 ThreadPool.h 接口的具体落地,核心遵循 “生产者 - 消费者模型”

  • 生产者:调用 run() 提交任务的线程(主线程 / 其他线程),负责将任务放入队列;
  • 消费者:线程池的工作线程,负责从队列取任务执行;
  • 同步保障:用 MutexLock 保护所有共享资源(任务队列、运行状态),双条件变量 notEmpty_/notFull_ 实现精准唤醒,避免 “惊群效应”;
  • 边缘场景处理:无工作线程时同步执行任务、优雅停止、全量异常捕获、防虚假唤醒等,确保鲁棒性。

2. 逐函数拆解核心实现

1. 构造函数:初始化核心资源
ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),               // 初始化互斥锁
    notEmpty_(mutex_),      // 条件变量绑定互斥锁(必须配对)
    notFull_(mutex_),       // 双条件变量均绑定同一个mutex_
    name_(nameArg),         // 线程池名称(调试/日志用)
    maxQueueSize_(0),       // 默认队列无长度限制(0=不限制)
    running_(false)         // 初始状态:未运行
{
}

关键设计点

  • 条件变量 notEmpty_/notFull_ 必须绑定同一个 mutex_——POSIX 规范要求条件变量的 wait()/notify() 必须在同一个互斥锁保护下,否则会导致未定义行为;
  • 默认参数合理:maxQueueSize_=0 表示任务队列无长度限制,适配大多数场景;running_=false 确保线程池未 start() 前无法提交 / 执行任务。
2. 析构函数:保证优雅停止
ThreadPool::~ThreadPool()
{
  if (running_)
  {
    stop(); // 析构时若线程池仍运行,自动调用stop(),避免资源泄漏
  }
}

核心价值:即使用户忘记手动调用 stop(),析构函数也会自动触发优雅停止,确保工作线程全部退出,任务队列资源释放,避免 “僵尸线程” 或内存泄漏。

3. start():启动工作线程(核心)
void ThreadPool::start(int numThreads)
{
  assert(threads_.empty()); // 断言:避免重复启动(threads_非空说明已启动)
  running_ = true;          // 标记线程池为运行状态
  threads_.reserve(numThreads); // 预分配vector空间,避免扩容开销

  // 创建numThreads个工作线程
  for (int i = 0; i < numThreads; ++i)
  {
    char id[32];
    snprintf(id, sizeof id, "%d", i+1); // 生成线程编号(1/2/3...)
    // 创建Thread对象,绑定runInThread为执行函数,线程名=线程池名+编号(如ThreadPool1)
    threads_.emplace_back(new muduo::Thread(
          std::bind(&ThreadPool::runInThread, this), name_+id));
    threads_[i]->start(); // 启动工作线程
  }

  // 特殊场景:0个工作线程时,直接执行初始化回调(退化到单线程)
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();
  }
}

关键设计点

  • std::bind(&ThreadPool::runInThread, this):将线程池的成员函数 runInThread 绑定为工作线程的执行函数,this 保证每个工作线程能访问当前线程池的任务队列、状态等;
  • emplace_back + unique_ptr:直接在 vector 中构造 Thread 对象,unique_ptr 自动管理线程生命周期,避免手动 delete
  • 线程命名:拼接线程池名 + 编号,方便 top/pstree 等工具调试,快速定位工作线程。
4. stop():优雅停止线程池
void ThreadPool::stop()
{
  {
  MutexLockGuard lock(mutex_); // 加锁修改状态,保证线程安全
  running_ = false;            // 标记线程池停止,工作线程会退出循环
  notEmpty_.notifyAll();       // 唤醒所有阻塞在“取任务”的消费者
  notFull_.notifyAll();        // 唤醒所有阻塞在“提交任务”的生产者
  } // 解锁:避免join时持有锁,导致死锁

  // 等待所有工作线程退出
  for (auto& thr : threads_)
  {
    thr->join();
  }
}

核心逻辑拆解

  1. 加锁修改状态running_=false 是共享变量,必须在锁保护下修改,确保所有线程能立即看到状态变化;
  2. 唤醒所有阻塞线程
    • notEmpty_.notifyAll():唤醒所有阻塞在 take() 中 “等待任务” 的工作线程,让它们检查 running_ 状态并退出;
    • notFull_.notifyAll():唤醒所有阻塞在 run() 中 “等待队列非满” 的生产者,让它们检查 running_ 状态并返回;
  3. join 所有线程:等待每个工作线程完成当前任务(或退出循环)后,释放线程资源,确保 “优雅停止” 而非强制终止。
5. queueSize():线程安全获取队列长度
size_t ThreadPool::queueSize() const
{
  MutexLockGuard lock(mutex_); // mutable mutex_ 支持const函数加锁
  return queue_.size();        // 锁保护下读取,避免数据竞争
}

关键设计点

  • const 函数 + mutable mutex_:既保证接口的 “只读” 语义,又通过加锁确保多线程下读取 queue_.size() 的原子性和可见性。
6. run():提交任务(生产者逻辑)
void ThreadPool::run(Task task)
{
  if (threads_.empty())
  {
    task(); // 无工作线程:退化到同步执行,避免任务丢失
  }
  else
  {
    MutexLockGuard lock(mutex_);
    // while循环防虚假唤醒:确保队列真的非满且线程池运行中,才提交任务
    while (isFull() && running_)
    {
      notFull_.wait(); // 队列满:阻塞,直到被消费者唤醒(取出任务后)
    }
    if (!running_) return; // 线程池已停止:直接返回,不提交任务
    assert(!isFull());     // 断言:确保队列非满,避免逻辑错误

    queue_.push_back(std::move(task)); // 任务入队(move减少拷贝)
    notEmpty_.notify();                // 唤醒消费者:队列非空,可取任务
  }
}

核心设计点

  • 退化执行:无工作线程时直接在当前线程执行任务,兼容单线程场景,避免任务堆积;
  • while 循环防虚假唤醒notFull_.wait() 可能被虚假唤醒,必须重新检查 isFull(),确保队列真的非满;
  • std::move(task)Taskstd::function,支持移动语义,避免拷贝开销(尤其大函数对象如 lambda);
  • notEmpty_.notify():仅唤醒一个消费者(notify() 而非 notifyAll()),减少惊群效应,提升效率。
7. take():取任务(消费者逻辑)
ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // 注释明确:必须用while循环,防虚假唤醒
  while (queue_.empty() && running_)
  {
    notEmpty_.wait(); // 队列空:阻塞,直到被生产者唤醒(提交任务后)
  }
  Task task;
  if (!queue_.empty())
  {
    task = queue_.front();    // 取队首任务
    queue_.pop_front();       // 从队列移除任务
    // 若队列有长度限制:取出任务后队列非满,唤醒生产者
    if (maxQueueSize_ > 0)
    {
      notFull_.notify();
    }
  }
  return task; // 队列为空/线程池停止:返回空Task
}

关键设计点

  • while 循环防虚假唤醒:和生产者逻辑一致,避免虚假唤醒导致工作线程取到空任务;
  • notFull_.notify():仅当队列有长度限制时才唤醒生产者 —— 若 maxQueueSize_=0(无限制),无需唤醒,生产者可一直提交任务;
  • 返回空 Task:当线程池停止(running_=false)或队列为空时,返回空 Task,工作线程会退出循环。
8. isFull():判断队列是否满(私有辅助函数)
bool ThreadPool::isFull() const
{
  mutex_.assertLocked(); // 断言:调用该函数时必须已持有mutex_(防逻辑错误)
  // 仅当maxQueueSize_>0时,才判断队列长度是否超过限制
  return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}

核心价值

  • mutex_.assertLocked():是 Muduo MutexLock 提供的断言,确保调用者已加锁,提前发现 “未加锁访问共享变量” 的逻辑错误(仅在调试模式生效,不影响性能);
  • 逻辑合理:maxQueueSize_=0 时返回 false(无长度限制),只有设置了最大长度才判断队列是否满。
9. runInThread():工作线程核心循环(消费者核心)
void ThreadPool::runInThread()
{
  try
  {
    // 执行线程初始化回调(如初始化日志器、内存池)
    if (threadInitCallback_)
    {
      threadInitCallback_();
    }

    // 循环取任务执行,直到线程池停止
    while (running_)
    {
      Task task(take()); // 取任务(可能阻塞)
      if (task)          // 任务非空:执行
      {
        task();
      }
    }
  }
  // 全量异常捕获:避免单个任务崩溃导致整个工作线程挂掉
  catch (const Exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
    abort(); // 终止进程:工作线程崩溃属于严重错误,避免异常扩散
  }
  catch (const std::exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    abort();
  }
  catch (...)
  {
    fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
    throw; // 重抛:让系统处理未知异常,生成核心转储
  }
}

关键设计点

  • 任务循环逻辑:只要 running_=true,就持续调用 take() 取任务执行 ——take() 会阻塞直到有任务或线程池停止;
  • 全量异常捕获
    • 捕获自定义 Exception(带栈回溯)、标准库异常、未知异常;
    • 打印详细错误信息(线程池名、异常原因、栈回溯),方便调试;
    • 工作线程崩溃属于严重错误,调用 abort() 终止进程(避免线程池部分失效导致业务异常);
  • 初始化回调:线程启动后先执行自定义回调,再进入任务循环,适配 “每个线程初始化独立资源” 的场景(如每个线程一个日志器)。

3. 核心设计亮点总结

设计点 作用 为什么重要
双条件变量(notEmpty_/notFull_) 精准唤醒生产者 / 消费者 避免 “惊群效应”(单条件变量会唤醒所有线程,大部分无任务可执行),提升效率
RAII 锁管理(MutexLockGuard) 自动加锁 / 解锁 杜绝锁泄漏(忘记 unlock、异常跳过 unlock),避免死锁
优雅停止(stop ()) 等待所有线程退出,不强制终止 确保工作线程完成当前任务,避免任务丢失或资源泄漏
退化执行(无线程时同步执行) 兼容单线程场景 避免用户忘记启动线程池导致任务无法执行
全量异常捕获 避免工作线程静默崩溃 打印详细错误信息,定位问题更高效,防止异常扩散
unique_ptr 管理工作线程 自动释放 Thread 对象 无需手动 delete,避免内存泄漏
线程命名(ThreadPool1/2) 调试友好 快速定位工作线程,区分不同线程池的线程

总结

  1. 核心模型:基于生产者 - 消费者模型,用互斥锁保护任务队列 / 运行状态,双条件变量实现精准同步;
  2. 线程安全:所有共享资源访问都在锁保护下,while 循环防虚假唤醒,RAII 锁杜绝泄漏;
  3. 优雅启停:析构自动调用 stop (),stop () 先置状态再唤醒所有线程,最后 join 确保线程退出;
  4. 鲁棒性:退化执行、全量异常捕获、断言检查,覆盖边缘场景,避免崩溃;
  5. 效率优化:vector 预分配空间、std::move 减少拷贝、双条件变量减少惊群。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐