解析muduo源码之 BoundedBlockingQueue.h
本文介绍了一个有界阻塞队列(BoundedBlockingQueue)的实现,该队列具有固定容量,适用于需要限流的高并发场景。主要特点包括:1) 使用双条件变量(notEmpty_和notFull_)实现双向阻塞,生产者队列满时阻塞,消费者队列空时阻塞;2) 采用boost::circular_buffer作为底层容器,提供固定容量的环形存储结构;3) 通过RAII锁和移动语义保证线程安全和高效操
·
目录
一、 BoundedBlockingQueue.h
先贴出完整代码,再逐部分解释:
// 本源代码的使用受 BSD 风格许可证约束
// 该许可证可在 License 文件中查阅。
//
// 作者:陈硕 (chenshuo at chenshuo dot com)
#ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#include "muduo/base/Condition.h" // 条件变量(配合互斥锁实现线程等待/唤醒)
#include "muduo/base/Mutex.h" // 互斥锁(保证线程安全)
#include <boost/circular_buffer.hpp> // Boost 循环缓冲区(固定容量,高效的环形存储)
#include <assert.h> // 断言(调试期检查逻辑正确性)
namespace muduo
{
// 有界阻塞队列(不可拷贝)
// 核心特性:1. 固定容量,队列满时生产者阻塞;2. 队列为空时消费者阻塞;
// 3. 双条件变量分别控制「队列非空」和「队列未满」;4. 所有操作线程安全
template<typename T>
class BoundedBlockingQueue : noncopyable // 继承 noncopyable,禁用拷贝构造/赋值
{
public:
// 构造函数:初始化有界队列(指定最大容量)
// @param maxSize 队列最大容量(固定,不可修改)
explicit BoundedBlockingQueue(int maxSize)
: mutex_(), // 互斥锁(默认构造)
notEmpty_(mutex_), // 条件变量:等待队列非空(给消费者用)
notFull_(mutex_), // 条件变量:等待队列未满(给生产者用)
queue_(maxSize) // 循环缓冲区:初始化固定容量为 maxSize
{
}
// 插入元素(左值版本):队列满时阻塞,直到有空闲空间
// @param x 要插入的左值对象(会拷贝到队列中)
void put(const T& x)
{
MutexLockGuard lock(mutex_); // RAII 加锁:作用域结束自动解锁
// 必须使用 while 循环:处理条件变量的「虚假唤醒」
while (queue_.full()) // 队列满时,生产者阻塞
{
notFull_.wait(); // 释放锁并阻塞,直到被 notify 且重新获取锁
}
assert(!queue_.full()); // 调试期断言:队列未满(确保逻辑正确)
queue_.push_back(x); // 将元素插入队列尾部
notEmpty_.notify(); // 唤醒一个等待队列非空的消费者线程
}
// 插入元素(右值版本):队列满时阻塞,移动语义减少拷贝
// @param x 要插入的右值对象(如临时对象、std::move 后的对象)
void put(T&& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(std::move(x)); // 移动构造元素,避免拷贝
notEmpty_.notify();
}
// 取出元素:队列为空时阻塞,直到有元素可用
// @return 队列头部的元素(移动返回,减少拷贝)
T take()
{
MutexLockGuard lock(mutex_);
// 必须使用 while 循环:处理条件变量的「虚假唤醒」
while (queue_.empty()) // 队列为空时,消费者阻塞
{
notEmpty_.wait(); // 释放锁并阻塞,直到被 notify 且重新获取锁
}
assert(!queue_.empty()); // 调试期断言:队列非空
T front(std::move(queue_.front())); // 移动队列头部元素到临时对象
queue_.pop_front(); // 移除队列头部元素
notFull_.notify(); // 唤醒一个等待队列未满的生产者线程
return front; // 移动返回临时对象(C++11 移动语义)
}
// 判断队列是否为空(线程安全)
bool empty() const
{
MutexLockGuard lock(mutex_); // 加锁后读取,避免并发修改导致的竞态
return queue_.empty();
}
// 判断队列是否已满(线程安全)
bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full();
}
// 获取队列当前元素数量(线程安全)
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
// 获取队列最大容量(线程安全)
size_t capacity() const
{
MutexLockGuard lock(mutex_);
return queue_.capacity();
}
private:
mutable MutexLock mutex_; // 互斥锁(mutable 允许 const 成员函数加锁)
Condition notEmpty_ GUARDED_BY(mutex_); // 队列非空条件变量(受 mutex_ 保护)
Condition notFull_ GUARDED_BY(mutex_); // 队列未满条件变量(受 mutex_ 保护)
boost::circular_buffer<T> queue_ GUARDED_BY(mutex_); // 底层循环缓冲区(受 mutex_ 保护)
};
} // namespace muduo
#endif // MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
1. BoundedBlockingQueue 的整体定位
与之前的无界 BlockingQueue 相比,BoundedBlockingQueue 是固定容量的线程安全阻塞队列,核心设计目标是:
- 解决无界队列「内存无限增长导致溢出」的问题,适配需要限流的场景(如线程池任务队列、网络数据接收队列);
- 实现双向阻塞:
- 生产者:队列满时阻塞,直到消费者取走数据(有空闲空间);
- 消费者:队列空时阻塞,直到生产者放入数据;
- 底层用
boost::circular_buffer适配固定容量场景,兼顾效率和内存可控性。
2. 核心设计拆解(对比无界 BlockingQueue)
1. 基础框架与核心成员
template<typename T>
class BoundedBlockingQueue : noncopyable // 禁止拷贝,同无界队列
{
public:
// 显式构造:必须指定最大容量(有界的核心)
explicit BoundedBlockingQueue(int maxSize)
: mutex_(),
notEmpty_(mutex_), // 同无界:通知消费者“队列非空”
notFull_(mutex_), // 新增:通知生产者“队列未满”(双向阻塞的核心)
queue_(maxSize) // 底层:固定容量的循环缓冲区
{
}
private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_); // 新增条件变量
boost::circular_buffer<T> queue_ GUARDED_BY(mutex_); // 替换std::deque为循环缓冲区
};
核心成员差异解析:
| 组件 | 无界 BlockingQueue |
有界 BoundedBlockingQueue |
设计原因 |
|---|---|---|---|
| 条件变量 | 仅 notEmpty_ |
notEmpty_ + notFull_ |
有界队列需要双向阻塞:生产者等 “未满”,消费者等 “非空” |
| 底层容器 | std::deque<T> |
boost::circular_buffer<T> |
循环缓冲区天然适配固定容量,内存连续(缓存友好),无碎片,头尾操作 O (1) |
| 构造参数 | 无参数(无界) | 必须指定 maxSize(有界) |
明确队列容量上限,避免内存溢出 |
boost::circular_buffer 的核心优势:
- 固定容量:构造时指定大小,满了之后不会自动扩容(配合
notFull_阻塞生产者); - 环形内存:元素存储在连续(或分段连续)的内存块中,缓存命中率远高于
std::deque(分散内存块); - 高效操作:
push_back/pop_front无需移动元素(环形覆盖),效率与std::deque持平但内存更可控; - 原生支持
full()/empty():直接判断队列状态,无需手动维护容量计数器。
2. 生产者接口 put():队列满时阻塞
// 左值版本
void put(const T& x)
{
MutexLockGuard lock(mutex_);
// while循环防虚假唤醒:队列满时,生产者阻塞等待notFull_
while (queue_.full())
{
notFull_.wait(); // 原子操作:释放锁 + 阻塞,直到被notFull_通知
}
assert(!queue_.full());
queue_.push_back(x); // 放入数据
notEmpty_.notify(); // 通知消费者:队列非空,可取数据
}
// 右值版本(移动语义,同无界队列)
void put(T&& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(std::move(x));
notEmpty_.notify();
}
关键逻辑:
- 生产者不再 “无限制放数据”:队列满时,
notFull_.wait()会释放锁并阻塞,直到消费者取走数据后调用notFull_.notify(); while循环防虚假唤醒:即使被虚假唤醒,仍会检查队列是否满,避免生产者在队列满时放入数据导致溢出;- 放入数据后仅通知
notEmpty_:精准唤醒消费者,而非所有线程,减少上下文切换。
3. 消费者接口 take():队列空时阻塞
T take()
{
MutexLockGuard lock(mutex_);
// 同无界:队列空时,消费者阻塞等待notEmpty_
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(std::move(queue_.front()));
queue_.pop_front(); // 取出数据
notFull_.notify(); // 新增:通知生产者“队列未满,可放数据”
return front;
}
关键新增逻辑:
- 消费者取走数据后,必须通知
notFull_:此时队列有了空闲空间,唤醒阻塞的生产者; - 移动语义:同无界队列,减少大对象拷贝开销。
4. 辅助接口:精准判断队列状态
// 判断队列是否为空(加锁保证原子性)
bool empty() const
{
MutexLockGuard lock(mutex_);
return queue_.empty();
}
// 判断队列是否满(加锁保证原子性)
bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full();
}
// 当前元素数量(加锁)
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
// 队列总容量(加锁,circular_buffer的capacity()是固定值)
size_t capacity() const
{
MutexLockGuard lock(mutex_);
return queue_.capacity();
}
关键说明:
- 所有状态判断都加锁:
circular_buffer的empty()/full()/size()/capacity()非原子操作,多线程下必须加锁保证准确性; capacity()是有界队列的特有接口:返回构造时指定的最大容量,用于业务层监控队列使用率(如限流告警)。
3. 核心差异对比(无界 vs 有界)
| 特性 | BlockingQueue(无界) |
BoundedBlockingQueue(有界) |
|---|---|---|
| 容量限制 | 无上限(可能内存溢出) | 固定容量(构造时指定) |
| 条件变量 | 仅 notEmpty_(消费者阻塞) |
notEmpty_ + notFull_(双向阻塞) |
| 生产者行为 | 永不阻塞 | 队列满时阻塞,直到有空闲空间 |
| 底层容器 | std::deque(无界) |
boost::circular_buffer(固定容量) |
| 内存风险 | 高(无限增长) | 低(容量可控) |
| 适用场景 | 数据量可控的异步场景(如日志队列) | 需限流的高并发场景(如线程池任务队列、网络接收队列) |
4. 核心使用示例(限流的生产者 - 消费者)
#include "muduo/base/BoundedBlockingQueue.h"
#include <thread>
#include <iostream>
#include <string>
// 有界队列:最大容量3
muduo::BoundedBlockingQueue<std::string> g_queue(3);
// 生产者线程:尝试放5条消息(队列满时阻塞)
void producer()
{
for (int i = 1; i <= 5; ++i)
{
std::string msg = "任务" + std::to_string(i);
std::cout << "生产者尝试放入:" << msg << "(当前队列大小:" << g_queue.size() << ")" << std::endl;
g_queue.put(std::move(msg)); // 队列满时阻塞
std::cout << "生产者成功放入:" << msg << std::endl;
}
}
// 消费者线程:每秒取1条消息
void consumer()
{
for (int i = 1; i <= 5; ++i)
{
std::string msg = g_queue.take(); // 队列为空时阻塞
std::cout << "消费者取出:" << msg << "(当前队列大小:" << g_queue.size() << ")" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main()
{
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
输出结果(关键看生产者阻塞逻辑):
生产者尝试放入:任务1(当前队列大小:0)
生产者成功放入:任务1
生产者尝试放入:任务2(当前队列大小:1)
生产者成功放入:任务2
生产者尝试放入:任务3(当前队列大小:2)
生产者成功放入:任务3
生产者尝试放入:任务4(当前队列大小:3)
// 队列满,生产者阻塞,直到消费者取走数据
消费者取出:任务1(当前队列大小:3)
生产者成功放入:任务4
生产者尝试放入:任务5(当前队列大小:3)
// 再次阻塞,直到消费者取走数据
消费者取出:任务2(当前队列大小:3)
生产者成功放入:任务5
消费者取出:任务3(当前队列大小:2)
消费者取出:任务4(当前队列大小:1)
消费者取出:任务5(当前队列大小:0)
5. 核心设计亮点
- 双向精准阻塞:双条件变量分别管理「队列非空」和「队列未满」,仅唤醒需要的线程(生产者 / 消费者),避免无意义的上下文切换;
- 内存可控:固定容量 + 循环缓冲区,彻底解决无界队列的内存溢出问题,适配高并发限流场景;
- 性能优化:
- 移动语义减少大对象拷贝;
boost::circular_buffer环形内存布局提升缓存命中率,效率高于手动限制容量的std::deque;
- 鲁棒性保障:
while循环防虚假唤醒,RAII 锁避免死锁,不可拷贝设计避免多线程数据混乱。
更多推荐


所有评论(0)