解析muduo源码之 BlockingQueue.h
本文详细分析了Muduo网络库中的BlockingQueue实现,这是一个基于std::deque的线程安全阻塞队列。该队列通过互斥锁(MutexLock)和条件变量(Condition)实现线程安全,支持生产者-消费者模型。核心特性包括:1) put()方法支持左右值插入,利用移动语义减少拷贝;2) take()方法采用while循环防止虚假唤醒;3) 提供drain()方法批量获取数据;4)
·
目录
一、 BlockingQueue.h
先贴出完整代码,再逐部分解释:
// 本源代码的使用受 BSD 风格许可证约束
// 该许可证可在 License 文件中查阅。
//
// 作者:陈硕 (chenshuo at chenshuo dot com)
#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H
#include "muduo/base/Condition.h" // 条件变量(配合互斥锁实现线程等待/唤醒)
#include "muduo/base/Mutex.h" // 互斥锁(保证线程安全)
#include <deque> // 双端队列(作为底层存储容器)
#include <assert.h> // 断言(调试期检查逻辑正确性)
namespace muduo
{
// 线程安全的阻塞队列(不可拷贝)
// 核心特性:1. 取数据时若队列为空则阻塞;2. 放数据时唤醒等待的取数据线程;3. 所有操作线程安全
template<typename T>
class BlockingQueue : noncopyable // 继承 noncopyable,禁用拷贝构造/赋值
{
public:
// 类型别名:底层存储容器的类型(std::deque<T>)
using queue_type = std::deque<T>;
// 构造函数:初始化互斥锁、条件变量、空队列
BlockingQueue()
: mutex_(), // 互斥锁(默认构造)
notEmpty_(mutex_), // 条件变量(关联到 mutex_,用于等待队列非空)
queue_() // 空队列
{
}
// 插入元素(左值版本):向队列尾部插入左值对象
// @param x 要插入的左值对象(会拷贝到队列中)
void put(const T& x)
{
MutexLockGuard lock(mutex_); // RAII 加锁:作用域结束自动解锁
queue_.push_back(x); // 将元素插入队列尾部
notEmpty_.notify(); // 唤醒一个等待队列非空的线程(取数据线程)
// wait morphing 优化:此处解锁前唤醒线程,可减少上下文切换
// 参考:http://www.domaigne.com/blog/computing/condvars-signal-with-mutex-locked-or-not/
}
// 插入元素(右值版本):向队列尾部插入右值对象(移动语义,减少拷贝)
// @param x 要插入的右值对象(如临时对象、std::move 后的对象)
void put(T&& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(std::move(x)); // 移动构造元素,避免拷贝
notEmpty_.notify();
}
// 取出元素:队列为空时阻塞,直到有元素可用
// @return 队列头部的元素(移动返回,减少拷贝)
T take()
{
MutexLockGuard lock(mutex_);
// 必须使用 while 循环而非 if:处理「虚假唤醒」(条件变量可能无原因唤醒)
// 即使被唤醒,也要再次检查队列是否真的非空
while (queue_.empty())
{
notEmpty_.wait(); // 释放锁并阻塞,直到被 notify 且重新获取锁
}
assert(!queue_.empty()); // 调试期断言:队列非空(确保逻辑正确)
T front(std::move(queue_.front())); // 移动队列头部元素到临时对象
queue_.pop_front(); // 移除队列头部元素
return front; // 移动返回临时对象(C++11 移动语义)
}
// 取出所有元素:原子性地取出队列中所有元素(清空队列)
// @return 包含所有元素的新队列(移动语义,无拷贝)
queue_type drain()
{
std::deque<T> queue; // 临时队列,用于存储取出的元素
{
MutexLockGuard lock(mutex_); // 加锁后原子性操作
queue = std::move(queue_); // 移动队列所有元素到临时队列(原队列变为空)
assert(queue_.empty()); // 调试期断言:原队列已清空
} // 作用域结束,自动解锁
return queue; // 移动返回临时队列
}
// 获取队列当前大小(线程安全)
size_t size() const
{
MutexLockGuard lock(mutex_); // 加锁后读取,避免并发修改导致的竞态
return queue_.size();
}
private:
mutable MutexLock mutex_; // 互斥锁(mutable 允许 const 成员函数加锁)
Condition notEmpty_ GUARDED_BY(mutex_); // 条件变量(GUARDED_BY:注解,表示受 mutex_ 保护)
queue_type queue_ GUARDED_BY(mutex_); // 底层队列(GUARDED_BY:注解,表示访问需持有 mutex_)
}; // __attribute__ ((aligned (64))); // 可选:64字节对齐,避免缓存行伪共享(注释掉不影响功能)
} // namespace muduo
#endif // MUDUO_BASE_BLOCKINGQUEUE_H
1. BlockingQueue 的整体定位
BlockingQueue 是 Muduo 中基于 std::deque 实现的线程安全阻塞队列,核心适配「生产者 - 消费者模型」,解决多线程间异步数据传递的问题:
- 生产者:调用
put()向队列放数据,无需阻塞(队列无上限,Muduo 场景下通常配合业务限流使用); - 消费者:调用
take()从队列取数据,队列为空时阻塞等待,直到有数据放入; - 核心特性:线程安全(互斥锁 + 条件变量)、支持移动语义(减少拷贝)、不可拷贝(避免多线程数据混乱)、泛化支持任意类型元素。
2. 逐模块拆解核心实现
1. 基础框架与核心成员
template<typename T>
class BlockingQueue : noncopyable // 禁止拷贝:多线程场景下拷贝队列会导致数据混乱
{
public:
using queue_type = std::deque<T>; // 底层容器:双端队列,头尾操作效率O(1)
// 构造函数:初始化互斥锁、条件变量、队列
BlockingQueue()
: mutex_(),
notEmpty_(mutex_), // 条件变量必须关联互斥锁(Muduo的Condition设计)
queue_()
{
}
private:
mutable MutexLock mutex_; // mutable:const成员函数(如size())也能加锁
Condition notEmpty_ GUARDED_BY(mutex_); // 条件变量:通知“队列非空”,GUARDED_BY是注解(提示受mutex_保护)
queue_type queue_ GUARDED_BY(mutex_); // 底层队列:所有操作必须加锁
};
核心细节:
noncopyable继承:禁止拷贝构造和赋值运算符,避免多线程场景下队列拷贝导致的竞态条件;std::deque选择:相比std::vector,deque 头尾插入 / 删除无需移动元素,效率更高;相比std::queue(适配器),直接用 deque 更灵活(如drain()批量取数据);mutable MutexLock:size()是 const 成员函数,但需要加锁读取队列大小,mutable 允许 const 函数修改 mutex_(锁的状态变化不影响队列逻辑上的 “只读”)。
2. 生产者接口:put()(支持移动语义)
// 重载1:接收左值(如已存在的对象)
void put(const T& x)
{
MutexLockGuard lock(mutex_); // RAII加锁,作用域结束自动解锁
queue_.push_back(x); // 左值拷贝入队
notEmpty_.notify(); // 通知消费者:队列已有数据,可唤醒wait()
}
// 重载2:接收右值(如临时对象),减少拷贝
void put(T&& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(std::move(x)); // 移动构造入队,避免拷贝(大对象场景效率提升明显)
notEmpty_.notify();
}
关键设计:
- RAII 锁(MutexLockGuard):保证锁的正确释放 —— 即使 push_back 抛出异常,作用域结束时 lock 析构会自动解锁,避免死锁;
- 移动语义:右值重载用
std::move,比如传递大对象(如日志字符串、任务函数)时,直接移动而非拷贝,大幅降低开销; - 条件变量通知:
notify()唤醒至少一个等待在notEmpty_上的消费者线程(take()中的wait()),无需唤醒所有线程,减少上下文切换。
3. 消费者接口:take()(阻塞取数据,防虚假唤醒)
T take()
{
MutexLockGuard lock(mutex_);
// 必须用while而非if:防止“虚假唤醒(spurious wakeup)”
while (queue_.empty())
{
notEmpty_.wait(); // 原子操作:释放mutex_ + 阻塞,直到被notify()唤醒,唤醒后重新加锁
}
assert(!queue_.empty()); // 确保队列非空,避免逻辑错误
T front(std::move(queue_.front())); // 移动构造返回值,减少拷贝
queue_.pop_front(); // 移除队首元素
return front; // 返回移动后的对象
}
核心重点:为什么用 while 而非 if?
- 条件变量存在「虚假唤醒」:操作系统可能无原因唤醒
wait()的线程(POSIX 标准允许),此时队列仍为空; - 若用
if (queue_.empty()):虚假唤醒后直接执行后续逻辑,访问空队列的front()会崩溃; - 用
while:唤醒后重新检查队列是否为空,为空则继续阻塞,彻底避免虚假唤醒导致的错误。
4. 批量取数据:drain()
queue_type drain()
{
std::deque<T> queue;
{ // 缩小锁的作用域:仅在移动数据时加锁,返回时已解锁
MutexLockGuard lock(mutex_);
queue = std::move(queue_); // 移动队列:原queue_变为空,无拷贝开销
assert(queue_.empty());
}
return queue; // 返回移动后的队列,编译器优化为移动构造
}
设计目的:
- 用于批量处理数据(如异步日志线程批量取日志消息),减少加锁次数(一次取所有数据,而非多次
take()); - 锁的作用域仅覆盖数据移动,返回时已解锁,避免长时间持有锁导致生产者阻塞。
5. 队列大小:size()
size_t size() const
{
MutexLockGuard lock(mutex_); // queue_.size()非原子操作,必须加锁保证准确
return queue_.size();
}
关键说明:
std::deque::size()不是原子操作,多线程下若不加锁,可能读取到中间状态(如生产者正在 push_back,size 计算错误);- const 成员函数加锁依赖
mutex_是mutable—— 锁的状态变化不影响队列 “逻辑只读” 的语义。
3. 核心使用示例(生产者 - 消费者模型)
#include "muduo/base/BlockingQueue.h"
#include <thread>
#include <iostream>
#include <string>
// 全局阻塞队列:传递字符串消息
muduo::BlockingQueue<std::string> g_queue;
// 生产者线程:每秒放一条消息
void producer()
{
int count = 0;
while (count < 5)
{
std::string msg = "消息" + std::to_string(++count);
g_queue.put(std::move(msg)); // 移动语义,减少拷贝
std::cout << "生产者放入:" << msg << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
// 消费者线程:阻塞取消息,取完5条退出
void consumer()
{
int count = 0;
while (count < 5)
{
std::string msg = g_queue.take(); // 队列为空时阻塞
std::cout << "消费者取出:" << msg << std::endl;
++count;
}
}
int main()
{
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
输出结果(顺序可能因线程调度略有差异):
生产者放入:消息1
消费者取出:消息1
生产者放入:消息2
消费者取出:消息2
生产者放入:消息3
消费者取出:消息3
生产者放入:消息4
消费者取出:消息4
生产者放入:消息5
消费者取出:消息5
4. 核心设计亮点
- 性能优化:全链路移动语义(put 右值、take 移动返回、drain 移动队列),大幅减少大对象的拷贝开销;
- 线程安全保障:
- RAII 锁避免死锁;
- while 循环防虚假唤醒;
- 不可拷贝设计避免多线程数据混乱;
- 精准同步:仅用一个
notEmpty_条件变量(Muduo 场景下队列无上限,无需notFull_),减少条件变量数量,降低上下文切换; - 泛化设计:模板类支持任意类型元素(如任务函数、日志消息、网络数据包),适配所有生产者 - 消费者场景。
更多推荐

所有评论(0)