解析muduo源码之 ThreadPool.h & ThreadPool.cc
本文详细介绍了Muduo库中的线程池实现ThreadPool。该线程池基于生产者-消费者模型,采用互斥锁保护共享资源,通过双条件变量(notEmpty_/notFull_)实现高效同步。核心特性包括:支持有界任务队列、线程初始化回调、优雅启停机制等。实现上使用RAII管理锁和线程资源,while循环防止虚假唤醒,全量异常捕获确保稳定性。设计亮点包括:退化执行机制(无工作线程时同步执行任务)、精准唤
·
目录
9. runInThread():工作线程核心循环(消费者核心)
一、 ThreadPool.h
先贴出完整代码,再逐部分解释:
// 本源代码的使用受 BSD 风格许可证约束
// 该许可证可在 License 文件中查阅。
//
// 作者:陈硕 (chenshuo at chenshuo dot com)
#ifndef MUDUO_BASE_THREADPOOL_H
#define MUDUO_BASE_THREADPOOL_H
#include "muduo/base/Condition.h" // 条件变量(配合互斥锁实现任务队列的等待/唤醒)
#include "muduo/base/Mutex.h" // 互斥锁(保证任务队列和线程池状态的线程安全)
#include "muduo/base/Thread.h" // 线程封装类(工作线程的创建/管理)
#include "muduo/base/Types.h" // 基础类型定义(如 string)
#include <deque> // 双端队列:存储待执行的任务(支持高效的头删尾插)
#include <vector> // 向量:存储工作线程的智能指针
namespace muduo
{
// 线程池类(不可拷贝)
// 核心特性:1. 基于「生产者-消费者」模型,主线程提交任务,工作线程执行任务;
// 2. 支持有界任务队列(可配置最大容量),队列满时提交任务会阻塞;
// 3. 支持线程初始化回调,每个工作线程启动时执行;
// 4. 安全的启停机制,停止时等待所有任务执行完成。
class ThreadPool : noncopyable
{
public:
// 任务类型:无参数、无返回值的可调用对象(函数/函数对象/lambda/绑定器等)
typedef std::function<void ()> Task;
// 构造函数:创建线程池
// @param nameArg 线程池名称(可选,用于日志/调试区分不同线程池)
explicit ThreadPool(const string& nameArg = string("ThreadPool"));
// 析构函数:销毁线程池(会自动调用 stop() 保证资源清理)
~ThreadPool();
// 设置任务队列的最大容量(必须在 start() 之前调用)
// @param maxSize 队列最大任务数,0 表示无界队列(默认)
void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
// 设置线程初始化回调函数(必须在 start() 之前调用)
// @param cb 每个工作线程启动后,执行第一个任务前会调用该回调
void setThreadInitCallback(const Task& cb)
{ threadInitCallback_ = cb; }
// 启动线程池:创建指定数量的工作线程并开始运行
// @param numThreads 工作线程数量(必须 >=1)
void start(int numThreads);
// 停止线程池:1. 标记 running_ 为 false,唤醒所有阻塞的工作线程;
// 2. 不再接受新任务,等待所有已提交任务执行完成;3. 等待所有工作线程退出。
void stop();
// 获取线程池名称(只读)
const string& name() const
{ return name_; }
// 获取当前任务队列的任务数量(线程安全)
size_t queueSize() const;
// 提交任务到线程池
// 特性:1. 若任务队列已满且 maxQueueSize_ > 0,会阻塞直到队列有空闲位置;
// 2. 若已调用 stop(),则立即返回(不会提交任务);
// 3. C++14 及之前的 std::function 不支持仅移动(move-only)类型,
// 因此无需像 BlockingQueue 那样重载 const& 和 && 版本的接口;
// 参考:https://stackoverflow.com/a/25408989
// @param f 要提交的任务(Task 类型)
void run(Task f);
private:
// 检查任务队列是否已满(仅在持有 mutex_ 时调用)
// REQUIRES(mutex_):Clang 线程安全注解,标记该函数必须持有 mutex_ 才能调用
bool isFull() const REQUIRES(mutex_);
// 工作线程的核心执行函数:循环从任务队列取任务并执行
void runInThread();
// 从任务队列取出一个任务(队列为空时阻塞,直到有新任务或线程池停止)
// @return 待执行的任务,若线程池停止则返回空任务
Task take();
mutable MutexLock mutex_; // 互斥锁(mutable 允许 const 函数加锁)
Condition notEmpty_ GUARDED_BY(mutex_); // 条件变量:任务队列非空时唤醒工作线程(受 mutex_ 保护)
Condition notFull_ GUARDED_BY(mutex_); // 条件变量:任务队列非满时唤醒提交任务的线程(受 mutex_ 保护)
string name_; // 线程池名称(用于日志/调试)
Task threadInitCallback_; // 线程初始化回调函数(可选)
// 工作线程列表:用 unique_ptr 管理 Thread 对象,自动释放资源
std::vector<std::unique_ptr<muduo::Thread>> threads_;
std::deque<Task> queue_ GUARDED_BY(mutex_); // 任务队列(受 mutex_ 保护,双端队列高效存取)
size_t maxQueueSize_; // 任务队列最大容量(0 表示无界)
bool running_; // 线程池运行状态(true=运行中,false=已停止)
};
} // namespace muduo
#endif // MUDUO_BASE_THREADPOOL_H
1. ThreadPool 的整体定位
- 复用工作线程:避免频繁创建 / 销毁线程的开销(线程创建是系统级开销,复用可大幅提升性能);
- 异步任务调度:生产者(主线程 / 其他线程)通过
run()提交任务,消费者(工作线程)自动取任务执行,解耦任务提交与执行; - 可控的任务队列:支持设置队列最大长度,避免任务堆积导致内存溢出;
- 优雅启停:支持
stop()优雅停止线程池(等待所有任务执行完成 / 中断,不强制终止线程); - 灵活扩展:支持线程初始化回调(每个工作线程启动时执行自定义逻辑,如初始化日志器、内存池);
- 线程安全:所有对共享资源(任务队列、线程状态)的访问都在互斥锁保护下,结合双条件变量(
notEmpty_/notFull_)实现高效同步。
它是 Muduo 中异步 IO、定时器、日志异步输出等场景的核心依赖,也是 C++ 线程池实现的经典范例。
2. 逐模块拆解核心实现
class ThreadPool : noncopyable // 禁止拷贝:线程池管理一组线程和任务队列,拷贝会导致资源混乱
{
public:
// 任务类型:封装任意可调用对象(函数、lambda、std::bind、成员函数),无参数无返回值
typedef std::function<void ()> Task;
// 构造函数:显式构造,可选指定线程池名称(调试友好)
explicit ThreadPool(const string& nameArg = string("ThreadPool"));
~ThreadPool(); // 析构时自动调用stop(),优雅停止线程池
// 配置接口(必须在start()前调用):
// 设置任务队列最大长度(0表示无限制)
void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
// 设置线程初始化回调:每个工作线程启动时执行的自定义逻辑
void setThreadInitCallback(const Task& cb) { threadInitCallback_ = cb; }
// 核心接口1:启动线程池,创建numThreads个工作线程
void start(int numThreads);
// 核心接口2:优雅停止线程池(等待所有工作线程退出,清空任务队列)
void stop();
// 获取线程池名称(const接口,线程安全)
const string& name() const { return name_; }
// 获取当前任务队列长度(线程安全)
size_t queueSize() const;
// 核心接口3:提交任务(可能阻塞,若队列满且线程池运行中)
// 注:C++14前std::function无移动版本,无需重载const&和&&
void run(Task f);
private:
// 私有函数1:判断任务队列是否满(需持有mutex_,REQUIRES是Clang线程安全注解)
bool isFull() const REQUIRES(mutex_);
// 私有函数2:工作线程的核心执行逻辑(循环取任务、执行任务)
void runInThread();
// 私有函数3:从任务队列取任务(阻塞直到有任务/线程池停止)
Task take();
// 核心成员变量(线程安全设计是核心)
mutable MutexLock mutex_; // 保护所有共享资源,mutable支持const函数加锁
Condition notEmpty_ GUARDED_BY(mutex_); // 条件变量:队列非空时唤醒消费者(工作线程)
Condition notFull_ GUARDED_BY(mutex_); // 条件变量:队列非满时唤醒生产者(提交任务的线程)
string name_; // 线程池名称(调试/日志用)
Task threadInitCallback_; // 线程初始化回调(可选)
// 工作线程列表:用unique_ptr管理Thread对象,自动释放,避免内存泄漏
std::vector<std::unique_ptr<muduo::Thread>> threads_;
std::deque<Task> queue_ GUARDED_BY(mutex_); // 任务队列:deque适合头尾操作(提交/取出)
size_t maxQueueSize_; // 任务队列最大长度(0=无限制)
bool running_; // 线程池运行状态:true=运行中,false=停止
};
3. 关键细节深度解析
1. 核心设计模式:生产者 - 消费者模型
| 角色 | 行为 | 同步条件 |
|---|---|---|
| 生产者(调用 run () 的线程) | 提交 Task 到 queue_,若队列满则阻塞,直到 notFull_被唤醒 | 队列非满(!isFull ())+ running_=true |
| 消费者(工作线程) | 从 queue_取 Task 执行,若队列为空则阻塞,直到 notEmpty_被唤醒 | 队列非空(!queue_.empty ())+ running_=true |
2. 核心成员的设计逻辑
| 成员变量 | 核心作用 | 设计原因 |
|---|---|---|
std::vector<std::unique_ptr<Thread>> threads_ |
管理工作线程的生命周期 | unique_ptr 自动释放 Thread 对象,避免手动管理内存;vector 方便批量创建 / 遍历线程 |
std::deque<Task> queue_ |
存储待执行的任务 | deque 的 pop_front/push_back 是 O (1) 操作,比 vector 更适合队列场景(vector 头部删除 O (n)) |
notEmpty_/notFull_ |
双条件变量实现生产者 - 消费者同步 | 单条件变量会导致频繁唤醒(惊群),双条件变量精准唤醒,提升效率 |
running_ |
控制线程池运行状态 | 用于优雅停止:设置为 false 后,唤醒所有阻塞的线程,避免工作线程永久阻塞 |
maxQueueSize_ |
限制任务队列长度 | 防止任务无限堆积导致内存溢出,0 表示无限制(适用于任务量可控的场景) |
3. 设计亮点与注意事项
设计亮点
- 优雅启停:
stop()先置状态再唤醒所有线程,等待线程退出,避免强制终止导致的资源泄漏; - 退化逻辑:无工作线程时同步执行任务,兼容单线程场景;
- 高效同步:双条件变量精准唤醒,避免惊群效应(单条件变量会唤醒所有线程,大部分线程无任务可执行);
- 内存安全:
unique_ptr管理工作线程,自动释放,无需手动析构; - 调试友好:线程池 / 工作线程命名,方便日志 / 工具定位问题;
- 灵活扩展:支持线程初始化回调,适配自定义线程上下文(如初始化日志、设置线程名)。
注意事项
- 任务提交时机:
stop()后调用run()会直接返回,任务不会被执行; - 队列长度限制:
maxQueueSize_=0表示无限制,需确保任务量可控,避免内存溢出; - 异常处理:工作线程捕获异常并打印栈回溯,避免单个任务崩溃导致整个线程池挂掉;
- 线程初始化回调:必须在
start()前设置,否则不会生效; - 禁止拷贝:继承
noncopyable,避免拷贝线程池导致多个对象管理同一组线程 / 队列。
二、 ThreadPool.cc
先贴出完整代码,再逐部分解释:
// 本源代码的使用受 BSD 风格许可证约束
// 该许可证可在 License 文件中查阅。
//
// 作者:陈硕 (chenshuo at chenshuo dot com)
#include "muduo/base/ThreadPool.h"
#include "muduo/base/Exception.h" // muduo 自定义异常类(带调用栈)
#include <assert.h> // 断言(调试期检查逻辑正确性)
#include <stdio.h> // 标准 IO(fprintf/abort)
using namespace muduo;
// 构造函数:初始化线程池核心成员
// @param nameArg 线程池名称(用于日志/调试区分)
ThreadPool::ThreadPool(const string& nameArg)
: mutex_(), // 初始化互斥锁
notEmpty_(mutex_), // 初始化「队列非空」条件变量(绑定互斥锁)
notFull_(mutex_), // 初始化「队列非满」条件变量(绑定互斥锁)
name_(nameArg), // 初始化线程池名称
maxQueueSize_(0), // 任务队列默认无界(0 表示无最大容量)
running_(false) // 初始状态为未运行
{
}
// 析构函数:销毁线程池时自动停止(保证资源清理)
ThreadPool::~ThreadPool()
{
if (running_) // 若线程池正在运行,调用 stop() 安全停止
{
stop();
}
}
// 启动线程池:创建指定数量的工作线程并启动
// @param numThreads 工作线程数量(>=0,0 表示无工作线程,提交任务时直接执行)
void ThreadPool::start(int numThreads)
{
assert(threads_.empty()); // 断言:启动前工作线程列表为空(避免重复启动)
running_ = true; // 标记线程池为运行状态
threads_.reserve(numThreads); // 预分配线程列表内存(避免频繁扩容)
// 循环创建指定数量的工作线程
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1); // 生成线程编号(1,2,...numThreads)
// 构造 Thread 对象:绑定 runInThread 为执行函数,线程名 = 线程池名 + 编号
// emplace_back 直接在容器内构造对象,避免拷贝
threads_.emplace_back(new muduo::Thread(
std::bind(&ThreadPool::runInThread, this), name_+id));
threads_[i]->start(); // 启动当前工作线程
}
// 特殊情况:无工作线程且设置了初始化回调 → 主线程直接执行回调
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
// 停止线程池:安全终止所有工作线程,等待任务执行完成
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_); // 加锁修改运行状态
running_ = false; // 标记线程池为停止状态
// 唤醒所有阻塞的线程:
// 1. notEmpty_:唤醒等待任务的工作线程(使其退出循环)
// 2. notFull_:唤醒等待提交任务的生产者线程(使其退出阻塞)
notEmpty_.notifyAll();
notFull_.notifyAll();
} // 解锁(RAII 自动释放)
// 等待所有工作线程退出(join),确保资源正确回收
for (auto& thr : threads_)
{
thr->join();
}
}
// 获取当前任务队列的任务数量(线程安全)
size_t ThreadPool::queueSize() const
{
MutexLockGuard lock(mutex_); // 加锁保证读取队列大小的线程安全
return queue_.size(); // 返回队列当前元素数量
}
// 提交任务到线程池
// 逻辑:1. 无工作线程 → 主线程直接执行任务;
// 2. 有工作线程 → 加锁后检查队列是否满,满则阻塞,否则入队并唤醒工作线程
// @param task 待提交的任务(Task 类型)
void ThreadPool::run(Task task)
{
if (threads_.empty()) // 无工作线程:直接在当前线程执行任务
{
task();
}
else
{
MutexLockGuard lock(mutex_); // 加锁保护任务队列操作
// 队列满且线程池运行中 → 阻塞等待「队列非满」条件
// 用 while 循环处理虚假唤醒:即使被唤醒,仍需检查队列是否真的非满
while (isFull() && running_)
{
notFull_.wait(); // 释放锁并阻塞,直到被 notify/notifyAll 唤醒
}
if (!running_) return; // 线程池已停止 → 直接返回,不提交任务
assert(!isFull()); // 断言:队列此时非满(保证入队安全)
// 移动语义入队:避免任务拷贝,提升性能
queue_.push_back(std::move(task));
notEmpty_.notify(); // 唤醒一个等待任务的工作线程(notify 而非 notifyAll,减少竞争)
}
}
// 从任务队列取出一个任务(工作线程调用)
// 逻辑:队列为空且运行中 → 阻塞等待;队列有任务 → 取出并唤醒生产者;停止时返回空任务
// @return 待执行的任务(空任务表示线程池已停止)
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_); // 加锁保护队列操作
// 队列为空且线程池运行中 → 阻塞等待「队列非空」条件
// 必须用 while 循环:处理条件变量的虚假唤醒(避免无任务时退出)
while (queue_.empty() && running_)
{
notEmpty_.wait(); // 释放锁并阻塞,直到被唤醒
}
Task task; // 初始化空任务
if (!queue_.empty()) // 队列有任务 → 取出队首任务
{
task = queue_.front(); // 获取队首任务
queue_.pop_front(); // 移除队首任务
// 队列有界且任务出队后 → 唤醒一个等待提交任务的生产者线程
if (maxQueueSize_ > 0)
{
notFull_.notify();
}
}
return task; // 返回任务(空任务表示线程池已停止)
}
// 检查任务队列是否已满(仅在持有互斥锁时调用)
// @return true=队列满,false=队列未满/无界
bool ThreadPool::isFull() const
{
mutex_.assertLocked(); // 断言:当前线程已持有互斥锁(保证线程安全)
// 队列有界(maxQueueSize_>0)且队列大小 >= 最大容量 → 满
return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}
// 工作线程的核心执行函数:循环取任务、执行任务
void ThreadPool::runInThread()
{
try
{
// 若设置了线程初始化回调 → 执行回调(仅在线程启动时执行一次)
if (threadInitCallback_)
{
threadInitCallback_();
}
// 线程池运行中 → 循环取任务执行
while (running_)
{
Task task(take()); // 取出任务(阻塞直到有任务或线程池停止)
if (task) // 任务非空 → 执行任务
{
task();
}
}
}
catch (const Exception& ex) // 捕获 muduo 自定义异常(带调用栈)
{
fprintf(stderr, "线程池 %s 中捕获到异常\n", name_.c_str());
fprintf(stderr, "原因:%s\n", ex.what());
fprintf(stderr, "调用栈:%s\n", ex.stackTrace());
abort(); // 终止程序(异常未处理,避免线程池异常运行)
}
catch (const std::exception& ex) // 捕获标准库异常
{
fprintf(stderr, "线程池 %s 中捕获到异常\n", name_.c_str());
fprintf(stderr, "原因:%s\n", ex.what());
abort();
}
catch (...) // 捕获所有未处理异常
{
fprintf(stderr, "线程池 %s 中捕获到未知异常\n", name_.c_str());
throw; // 重新抛出(若上层无处理,最终终止程序)
}
}
1. 代码整体核心逻辑回顾
这个实现是 ThreadPool.h 接口的具体落地,核心遵循 “生产者 - 消费者模型”:
- 生产者:调用
run()提交任务的线程(主线程 / 其他线程),负责将任务放入队列; - 消费者:线程池的工作线程,负责从队列取任务执行;
- 同步保障:用
MutexLock保护所有共享资源(任务队列、运行状态),双条件变量notEmpty_/notFull_实现精准唤醒,避免 “惊群效应”; - 边缘场景处理:无工作线程时同步执行任务、优雅停止、全量异常捕获、防虚假唤醒等,确保鲁棒性。
2. 逐函数拆解核心实现
1. 构造函数:初始化核心资源
ThreadPool::ThreadPool(const string& nameArg)
: mutex_(), // 初始化互斥锁
notEmpty_(mutex_), // 条件变量绑定互斥锁(必须配对)
notFull_(mutex_), // 双条件变量均绑定同一个mutex_
name_(nameArg), // 线程池名称(调试/日志用)
maxQueueSize_(0), // 默认队列无长度限制(0=不限制)
running_(false) // 初始状态:未运行
{
}
关键设计点:
- 条件变量
notEmpty_/notFull_必须绑定同一个mutex_——POSIX 规范要求条件变量的wait()/notify()必须在同一个互斥锁保护下,否则会导致未定义行为; - 默认参数合理:
maxQueueSize_=0表示任务队列无长度限制,适配大多数场景;running_=false确保线程池未start()前无法提交 / 执行任务。
2. 析构函数:保证优雅停止
ThreadPool::~ThreadPool()
{
if (running_)
{
stop(); // 析构时若线程池仍运行,自动调用stop(),避免资源泄漏
}
}
核心价值:即使用户忘记手动调用 stop(),析构函数也会自动触发优雅停止,确保工作线程全部退出,任务队列资源释放,避免 “僵尸线程” 或内存泄漏。
3. start():启动工作线程(核心)
void ThreadPool::start(int numThreads)
{
assert(threads_.empty()); // 断言:避免重复启动(threads_非空说明已启动)
running_ = true; // 标记线程池为运行状态
threads_.reserve(numThreads); // 预分配vector空间,避免扩容开销
// 创建numThreads个工作线程
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1); // 生成线程编号(1/2/3...)
// 创建Thread对象,绑定runInThread为执行函数,线程名=线程池名+编号(如ThreadPool1)
threads_.emplace_back(new muduo::Thread(
std::bind(&ThreadPool::runInThread, this), name_+id));
threads_[i]->start(); // 启动工作线程
}
// 特殊场景:0个工作线程时,直接执行初始化回调(退化到单线程)
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
关键设计点:
std::bind(&ThreadPool::runInThread, this):将线程池的成员函数runInThread绑定为工作线程的执行函数,this保证每个工作线程能访问当前线程池的任务队列、状态等;emplace_back+unique_ptr:直接在 vector 中构造Thread对象,unique_ptr自动管理线程生命周期,避免手动delete;- 线程命名:拼接线程池名 + 编号,方便
top/pstree等工具调试,快速定位工作线程。
4. stop():优雅停止线程池
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_); // 加锁修改状态,保证线程安全
running_ = false; // 标记线程池停止,工作线程会退出循环
notEmpty_.notifyAll(); // 唤醒所有阻塞在“取任务”的消费者
notFull_.notifyAll(); // 唤醒所有阻塞在“提交任务”的生产者
} // 解锁:避免join时持有锁,导致死锁
// 等待所有工作线程退出
for (auto& thr : threads_)
{
thr->join();
}
}
核心逻辑拆解:
- 加锁修改状态:
running_=false是共享变量,必须在锁保护下修改,确保所有线程能立即看到状态变化; - 唤醒所有阻塞线程:
notEmpty_.notifyAll():唤醒所有阻塞在take()中 “等待任务” 的工作线程,让它们检查running_状态并退出;notFull_.notifyAll():唤醒所有阻塞在run()中 “等待队列非满” 的生产者,让它们检查running_状态并返回;
- join 所有线程:等待每个工作线程完成当前任务(或退出循环)后,释放线程资源,确保 “优雅停止” 而非强制终止。
5. queueSize():线程安全获取队列长度
size_t ThreadPool::queueSize() const
{
MutexLockGuard lock(mutex_); // mutable mutex_ 支持const函数加锁
return queue_.size(); // 锁保护下读取,避免数据竞争
}
关键设计点:
const函数 +mutable mutex_:既保证接口的 “只读” 语义,又通过加锁确保多线程下读取queue_.size()的原子性和可见性。
6. run():提交任务(生产者逻辑)
void ThreadPool::run(Task task)
{
if (threads_.empty())
{
task(); // 无工作线程:退化到同步执行,避免任务丢失
}
else
{
MutexLockGuard lock(mutex_);
// while循环防虚假唤醒:确保队列真的非满且线程池运行中,才提交任务
while (isFull() && running_)
{
notFull_.wait(); // 队列满:阻塞,直到被消费者唤醒(取出任务后)
}
if (!running_) return; // 线程池已停止:直接返回,不提交任务
assert(!isFull()); // 断言:确保队列非满,避免逻辑错误
queue_.push_back(std::move(task)); // 任务入队(move减少拷贝)
notEmpty_.notify(); // 唤醒消费者:队列非空,可取任务
}
}
核心设计点:
- 退化执行:无工作线程时直接在当前线程执行任务,兼容单线程场景,避免任务堆积;
- while 循环防虚假唤醒:
notFull_.wait()可能被虚假唤醒,必须重新检查isFull(),确保队列真的非满; std::move(task):Task是std::function,支持移动语义,避免拷贝开销(尤其大函数对象如 lambda);notEmpty_.notify():仅唤醒一个消费者(notify()而非notifyAll()),减少惊群效应,提升效率。
7. take():取任务(消费者逻辑)
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// 注释明确:必须用while循环,防虚假唤醒
while (queue_.empty() && running_)
{
notEmpty_.wait(); // 队列空:阻塞,直到被生产者唤醒(提交任务后)
}
Task task;
if (!queue_.empty())
{
task = queue_.front(); // 取队首任务
queue_.pop_front(); // 从队列移除任务
// 若队列有长度限制:取出任务后队列非满,唤醒生产者
if (maxQueueSize_ > 0)
{
notFull_.notify();
}
}
return task; // 队列为空/线程池停止:返回空Task
}
关键设计点:
- while 循环防虚假唤醒:和生产者逻辑一致,避免虚假唤醒导致工作线程取到空任务;
notFull_.notify():仅当队列有长度限制时才唤醒生产者 —— 若maxQueueSize_=0(无限制),无需唤醒,生产者可一直提交任务;- 返回空 Task:当线程池停止(
running_=false)或队列为空时,返回空 Task,工作线程会退出循环。
8. isFull():判断队列是否满(私有辅助函数)
bool ThreadPool::isFull() const
{
mutex_.assertLocked(); // 断言:调用该函数时必须已持有mutex_(防逻辑错误)
// 仅当maxQueueSize_>0时,才判断队列长度是否超过限制
return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}
核心价值:
mutex_.assertLocked():是 MuduoMutexLock提供的断言,确保调用者已加锁,提前发现 “未加锁访问共享变量” 的逻辑错误(仅在调试模式生效,不影响性能);- 逻辑合理:
maxQueueSize_=0时返回false(无长度限制),只有设置了最大长度才判断队列是否满。
9. runInThread():工作线程核心循环(消费者核心)
void ThreadPool::runInThread()
{
try
{
// 执行线程初始化回调(如初始化日志器、内存池)
if (threadInitCallback_)
{
threadInitCallback_();
}
// 循环取任务执行,直到线程池停止
while (running_)
{
Task task(take()); // 取任务(可能阻塞)
if (task) // 任务非空:执行
{
task();
}
}
}
// 全量异常捕获:避免单个任务崩溃导致整个工作线程挂掉
catch (const Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort(); // 终止进程:工作线程崩溃属于严重错误,避免异常扩散
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // 重抛:让系统处理未知异常,生成核心转储
}
}
关键设计点:
- 任务循环逻辑:只要
running_=true,就持续调用take()取任务执行 ——take()会阻塞直到有任务或线程池停止; - 全量异常捕获:
- 捕获自定义
Exception(带栈回溯)、标准库异常、未知异常; - 打印详细错误信息(线程池名、异常原因、栈回溯),方便调试;
- 工作线程崩溃属于严重错误,调用
abort()终止进程(避免线程池部分失效导致业务异常);
- 捕获自定义
- 初始化回调:线程启动后先执行自定义回调,再进入任务循环,适配 “每个线程初始化独立资源” 的场景(如每个线程一个日志器)。
3. 核心设计亮点总结
| 设计点 | 作用 | 为什么重要 |
|---|---|---|
| 双条件变量(notEmpty_/notFull_) | 精准唤醒生产者 / 消费者 | 避免 “惊群效应”(单条件变量会唤醒所有线程,大部分无任务可执行),提升效率 |
| RAII 锁管理(MutexLockGuard) | 自动加锁 / 解锁 | 杜绝锁泄漏(忘记 unlock、异常跳过 unlock),避免死锁 |
| 优雅停止(stop ()) | 等待所有线程退出,不强制终止 | 确保工作线程完成当前任务,避免任务丢失或资源泄漏 |
| 退化执行(无线程时同步执行) | 兼容单线程场景 | 避免用户忘记启动线程池导致任务无法执行 |
| 全量异常捕获 | 避免工作线程静默崩溃 | 打印详细错误信息,定位问题更高效,防止异常扩散 |
| unique_ptr 管理工作线程 | 自动释放 Thread 对象 | 无需手动 delete,避免内存泄漏 |
| 线程命名(ThreadPool1/2) | 调试友好 | 快速定位工作线程,区分不同线程池的线程 |
总结
- 核心模型:基于生产者 - 消费者模型,用互斥锁保护任务队列 / 运行状态,双条件变量实现精准同步;
- 线程安全:所有共享资源访问都在锁保护下,while 循环防虚假唤醒,RAII 锁杜绝泄漏;
- 优雅启停:析构自动调用 stop (),stop () 先置状态再唤醒所有线程,最后 join 确保线程退出;
- 鲁棒性:退化执行、全量异常捕获、断言检查,覆盖边缘场景,避免崩溃;
- 效率优化:vector 预分配空间、std::move 减少拷贝、双条件变量减少惊群。
更多推荐

所有评论(0)