目录

一、 BoundedBlockingQueue.h

1. BoundedBlockingQueue 的整体定位

2. 核心设计拆解(对比无界 BlockingQueue)

1. 基础框架与核心成员

2. 生产者接口 put():队列满时阻塞

3. 消费者接口 take():队列空时阻塞

4. 辅助接口:精准判断队列状态

3. 核心差异对比(无界 vs 有界)

4. 核心使用示例(限流的生产者 - 消费者)

5. 核心设计亮点


一、 BoundedBlockingQueue.h

先贴出完整代码,再逐部分解释:

// 本源代码的使用受 BSD 风格许可证约束
// 该许可证可在 License 文件中查阅。
//
// 作者:陈硕 (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H

#include "muduo/base/Condition.h"  // 条件变量(配合互斥锁实现线程等待/唤醒)
#include "muduo/base/Mutex.h"      // 互斥锁(保证线程安全)

#include <boost/circular_buffer.hpp> // Boost 循环缓冲区(固定容量,高效的环形存储)
#include <assert.h>                 // 断言(调试期检查逻辑正确性)

namespace muduo
{

// 有界阻塞队列(不可拷贝)
// 核心特性:1. 固定容量,队列满时生产者阻塞;2. 队列为空时消费者阻塞;
// 3. 双条件变量分别控制「队列非空」和「队列未满」;4. 所有操作线程安全
template<typename T>
class BoundedBlockingQueue : noncopyable  // 继承 noncopyable,禁用拷贝构造/赋值
{
 public:
  // 构造函数:初始化有界队列(指定最大容量)
  // @param maxSize 队列最大容量(固定,不可修改)
  explicit BoundedBlockingQueue(int maxSize)
    : mutex_(),               // 互斥锁(默认构造)
      notEmpty_(mutex_),      // 条件变量:等待队列非空(给消费者用)
      notFull_(mutex_),       // 条件变量:等待队列未满(给生产者用)
      queue_(maxSize)         // 循环缓冲区:初始化固定容量为 maxSize
  {
  }

  // 插入元素(左值版本):队列满时阻塞,直到有空闲空间
  // @param x 要插入的左值对象(会拷贝到队列中)
  void put(const T& x)
  {
    MutexLockGuard lock(mutex_); // RAII 加锁:作用域结束自动解锁
    // 必须使用 while 循环:处理条件变量的「虚假唤醒」
    while (queue_.full())        // 队列满时,生产者阻塞
    {
      notFull_.wait();           // 释放锁并阻塞,直到被 notify 且重新获取锁
    }
    assert(!queue_.full());      // 调试期断言:队列未满(确保逻辑正确)
    queue_.push_back(x);         // 将元素插入队列尾部
    notEmpty_.notify();          // 唤醒一个等待队列非空的消费者线程
  }

  // 插入元素(右值版本):队列满时阻塞,移动语义减少拷贝
  // @param x 要插入的右值对象(如临时对象、std::move 后的对象)
  void put(T&& x)
  {
    MutexLockGuard lock(mutex_);
    while (queue_.full())
    {
      notFull_.wait();
    }
    assert(!queue_.full());
    queue_.push_back(std::move(x)); // 移动构造元素,避免拷贝
    notEmpty_.notify();
  }

  // 取出元素:队列为空时阻塞,直到有元素可用
  // @return 队列头部的元素(移动返回,减少拷贝)
  T take()
  {
    MutexLockGuard lock(mutex_);
    // 必须使用 while 循环:处理条件变量的「虚假唤醒」
    while (queue_.empty())       // 队列为空时,消费者阻塞
    {
      notEmpty_.wait();          // 释放锁并阻塞,直到被 notify 且重新获取锁
    }
    assert(!queue_.empty());     // 调试期断言:队列非空
    T front(std::move(queue_.front())); // 移动队列头部元素到临时对象
    queue_.pop_front();                // 移除队列头部元素
    notFull_.notify();                 // 唤醒一个等待队列未满的生产者线程
    return front;                      // 移动返回临时对象(C++11 移动语义)
  }

  // 判断队列是否为空(线程安全)
  bool empty() const
  {
    MutexLockGuard lock(mutex_); // 加锁后读取,避免并发修改导致的竞态
    return queue_.empty();
  }

  // 判断队列是否已满(线程安全)
  bool full() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.full();
  }

  // 获取队列当前元素数量(线程安全)
  size_t size() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.size();
  }

  // 获取队列最大容量(线程安全)
  size_t capacity() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.capacity();
  }

 private:
  mutable MutexLock               mutex_;          // 互斥锁(mutable 允许 const 成员函数加锁)
  Condition                       notEmpty_ GUARDED_BY(mutex_); // 队列非空条件变量(受 mutex_ 保护)
  Condition                       notFull_ GUARDED_BY(mutex_);  // 队列未满条件变量(受 mutex_ 保护)
  boost::circular_buffer<T>       queue_ GUARDED_BY(mutex_);   // 底层循环缓冲区(受 mutex_ 保护)
};

}  // namespace muduo

#endif  // MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
1. BoundedBlockingQueue 的整体定位

与之前的无界 BlockingQueue 相比,BoundedBlockingQueue固定容量的线程安全阻塞队列,核心设计目标是:

  • 解决无界队列「内存无限增长导致溢出」的问题,适配需要限流的场景(如线程池任务队列、网络数据接收队列);
  • 实现双向阻塞
    • 生产者:队列满时阻塞,直到消费者取走数据(有空闲空间);
    • 消费者:队列空时阻塞,直到生产者放入数据;
  • 底层用 boost::circular_buffer 适配固定容量场景,兼顾效率和内存可控性。
2. 核心设计拆解(对比无界 BlockingQueue
1. 基础框架与核心成员
template<typename T>
class BoundedBlockingQueue : noncopyable  // 禁止拷贝,同无界队列
{
 public:
  // 显式构造:必须指定最大容量(有界的核心)
  explicit BoundedBlockingQueue(int maxSize)
    : mutex_(),
      notEmpty_(mutex_),    // 同无界:通知消费者“队列非空”
      notFull_(mutex_),     // 新增:通知生产者“队列未满”(双向阻塞的核心)
      queue_(maxSize)       // 底层:固定容量的循环缓冲区
  {
  }

 private:
  mutable MutexLock          mutex_;
  Condition                  notEmpty_ GUARDED_BY(mutex_);
  Condition                  notFull_ GUARDED_BY(mutex_);  // 新增条件变量
  boost::circular_buffer<T>  queue_ GUARDED_BY(mutex_);    // 替换std::deque为循环缓冲区
};

核心成员差异解析:

组件 无界 BlockingQueue 有界 BoundedBlockingQueue 设计原因
条件变量 notEmpty_ notEmpty_ + notFull_ 有界队列需要双向阻塞:生产者等 “未满”,消费者等 “非空”
底层容器 std::deque<T> boost::circular_buffer<T> 循环缓冲区天然适配固定容量,内存连续(缓存友好),无碎片,头尾操作 O (1)
构造参数 无参数(无界) 必须指定 maxSize(有界) 明确队列容量上限,避免内存溢出

boost::circular_buffer 的核心优势:

  • 固定容量:构造时指定大小,满了之后不会自动扩容(配合 notFull_ 阻塞生产者);
  • 环形内存:元素存储在连续(或分段连续)的内存块中,缓存命中率远高于 std::deque(分散内存块);
  • 高效操作push_back/pop_front 无需移动元素(环形覆盖),效率与 std::deque 持平但内存更可控;
  • 原生支持 full()/empty():直接判断队列状态,无需手动维护容量计数器。
2. 生产者接口 put():队列满时阻塞
// 左值版本
void put(const T& x)
{
  MutexLockGuard lock(mutex_);
  // while循环防虚假唤醒:队列满时,生产者阻塞等待notFull_
  while (queue_.full())
  {
    notFull_.wait(); // 原子操作:释放锁 + 阻塞,直到被notFull_通知
  }
  assert(!queue_.full());
  queue_.push_back(x);       // 放入数据
  notEmpty_.notify();        // 通知消费者:队列非空,可取数据
}

// 右值版本(移动语义,同无界队列)
void put(T&& x)
{
  MutexLockGuard lock(mutex_);
  while (queue_.full())
  {
    notFull_.wait();
  }
  assert(!queue_.full());
  queue_.push_back(std::move(x));
  notEmpty_.notify();
}

关键逻辑

  • 生产者不再 “无限制放数据”:队列满时,notFull_.wait() 会释放锁并阻塞,直到消费者取走数据后调用 notFull_.notify()
  • while 循环防虚假唤醒:即使被虚假唤醒,仍会检查队列是否满,避免生产者在队列满时放入数据导致溢出;
  • 放入数据后仅通知 notEmpty_:精准唤醒消费者,而非所有线程,减少上下文切换。
3. 消费者接口 take():队列空时阻塞
T take()
{
  MutexLockGuard lock(mutex_);
  // 同无界:队列空时,消费者阻塞等待notEmpty_
  while (queue_.empty())
  {
    notEmpty_.wait();
  }
  assert(!queue_.empty());
  T front(std::move(queue_.front()));
  queue_.pop_front();        // 取出数据
  notFull_.notify();         // 新增:通知生产者“队列未满,可放数据”
  return front;
}

关键新增逻辑

  • 消费者取走数据后,必须通知 notFull_:此时队列有了空闲空间,唤醒阻塞的生产者;
  • 移动语义:同无界队列,减少大对象拷贝开销。
4. 辅助接口:精准判断队列状态
// 判断队列是否为空(加锁保证原子性)
bool empty() const
{
  MutexLockGuard lock(mutex_);
  return queue_.empty();
}

// 判断队列是否满(加锁保证原子性)
bool full() const
{
  MutexLockGuard lock(mutex_);
  return queue_.full();
}

// 当前元素数量(加锁)
size_t size() const
{
  MutexLockGuard lock(mutex_);
  return queue_.size();
}

// 队列总容量(加锁,circular_buffer的capacity()是固定值)
size_t capacity() const
{
  MutexLockGuard lock(mutex_);
  return queue_.capacity();
}

关键说明

  • 所有状态判断都加锁:circular_bufferempty()/full()/size()/capacity() 非原子操作,多线程下必须加锁保证准确性;
  • capacity() 是有界队列的特有接口:返回构造时指定的最大容量,用于业务层监控队列使用率(如限流告警)。
3. 核心差异对比(无界 vs 有界)
特性 BlockingQueue(无界) BoundedBlockingQueue(有界)
容量限制 无上限(可能内存溢出) 固定容量(构造时指定)
条件变量 notEmpty_(消费者阻塞) notEmpty_ + notFull_(双向阻塞)
生产者行为 永不阻塞 队列满时阻塞,直到有空闲空间
底层容器 std::deque(无界) boost::circular_buffer(固定容量)
内存风险 高(无限增长) 低(容量可控)
适用场景 数据量可控的异步场景(如日志队列) 需限流的高并发场景(如线程池任务队列、网络接收队列)
4. 核心使用示例(限流的生产者 - 消费者)
#include "muduo/base/BoundedBlockingQueue.h"
#include <thread>
#include <iostream>
#include <string>

// 有界队列:最大容量3
muduo::BoundedBlockingQueue<std::string> g_queue(3);

// 生产者线程:尝试放5条消息(队列满时阻塞)
void producer()
{
  for (int i = 1; i <= 5; ++i)
  {
    std::string msg = "任务" + std::to_string(i);
    std::cout << "生产者尝试放入:" << msg << "(当前队列大小:" << g_queue.size() << ")" << std::endl;
    g_queue.put(std::move(msg)); // 队列满时阻塞
    std::cout << "生产者成功放入:" << msg << std::endl;
  }
}

// 消费者线程:每秒取1条消息
void consumer()
{
  for (int i = 1; i <= 5; ++i)
  {
    std::string msg = g_queue.take(); // 队列为空时阻塞
    std::cout << "消费者取出:" << msg << "(当前队列大小:" << g_queue.size() << ")" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}

int main()
{
  std::thread t1(producer);
  std::thread t2(consumer);

  t1.join();
  t2.join();

  return 0;
}

输出结果(关键看生产者阻塞逻辑):

生产者尝试放入:任务1(当前队列大小:0)
生产者成功放入:任务1
生产者尝试放入:任务2(当前队列大小:1)
生产者成功放入:任务2
生产者尝试放入:任务3(当前队列大小:2)
生产者成功放入:任务3
生产者尝试放入:任务4(当前队列大小:3)
// 队列满,生产者阻塞,直到消费者取走数据
消费者取出:任务1(当前队列大小:3)
生产者成功放入:任务4
生产者尝试放入:任务5(当前队列大小:3)
// 再次阻塞,直到消费者取走数据
消费者取出:任务2(当前队列大小:3)
生产者成功放入:任务5
消费者取出:任务3(当前队列大小:2)
消费者取出:任务4(当前队列大小:1)
消费者取出:任务5(当前队列大小:0)
5. 核心设计亮点
  • 双向精准阻塞:双条件变量分别管理「队列非空」和「队列未满」,仅唤醒需要的线程(生产者 / 消费者),避免无意义的上下文切换;
  • 内存可控:固定容量 + 循环缓冲区,彻底解决无界队列的内存溢出问题,适配高并发限流场景;
  • 性能优化
    • 移动语义减少大对象拷贝;
    • boost::circular_buffer 环形内存布局提升缓存命中率,效率高于手动限制容量的 std::deque
  • 鲁棒性保障while 循环防虚假唤醒,RAII 锁避免死锁,不可拷贝设计避免多线程数据混乱。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐