目录

一、 BlockingQueue.h

1. BlockingQueue 的整体定位

2. 逐模块拆解核心实现

1. 基础框架与核心成员

2. 生产者接口:put()(支持移动语义)

3. 消费者接口:take()(阻塞取数据,防虚假唤醒)

4. 批量取数据:drain()

5. 队列大小:size()

3. 核心使用示例(生产者 - 消费者模型)

4. 核心设计亮点


一、 BlockingQueue.h

先贴出完整代码,再逐部分解释:

// 本源代码的使用受 BSD 风格许可证约束
// 该许可证可在 License 文件中查阅。
//
// 作者:陈硕 (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H

#include "muduo/base/Condition.h"  // 条件变量(配合互斥锁实现线程等待/唤醒)
#include "muduo/base/Mutex.h"      // 互斥锁(保证线程安全)

#include <deque>    // 双端队列(作为底层存储容器)
#include <assert.h> // 断言(调试期检查逻辑正确性)

namespace muduo
{

// 线程安全的阻塞队列(不可拷贝)
// 核心特性:1. 取数据时若队列为空则阻塞;2. 放数据时唤醒等待的取数据线程;3. 所有操作线程安全
template<typename T>
class BlockingQueue : noncopyable  // 继承 noncopyable,禁用拷贝构造/赋值
{
 public:
  // 类型别名:底层存储容器的类型(std::deque<T>)
  using queue_type = std::deque<T>;

  // 构造函数:初始化互斥锁、条件变量、空队列
  BlockingQueue()
    : mutex_(),               // 互斥锁(默认构造)
      notEmpty_(mutex_),      // 条件变量(关联到 mutex_,用于等待队列非空)
      queue_()                // 空队列
  {
  }

  // 插入元素(左值版本):向队列尾部插入左值对象
  // @param x 要插入的左值对象(会拷贝到队列中)
  void put(const T& x)
  {
    MutexLockGuard lock(mutex_); // RAII 加锁:作用域结束自动解锁
    queue_.push_back(x);         // 将元素插入队列尾部
    notEmpty_.notify();          // 唤醒一个等待队列非空的线程(取数据线程)
    // wait morphing 优化:此处解锁前唤醒线程,可减少上下文切换
    // 参考:http://www.domaigne.com/blog/computing/condvars-signal-with-mutex-locked-or-not/
  }

  // 插入元素(右值版本):向队列尾部插入右值对象(移动语义,减少拷贝)
  // @param x 要插入的右值对象(如临时对象、std::move 后的对象)
  void put(T&& x)
  {
    MutexLockGuard lock(mutex_);
    queue_.push_back(std::move(x)); // 移动构造元素,避免拷贝
    notEmpty_.notify();
  }

  // 取出元素:队列为空时阻塞,直到有元素可用
  // @return 队列头部的元素(移动返回,减少拷贝)
  T take()
  {
    MutexLockGuard lock(mutex_);
    // 必须使用 while 循环而非 if:处理「虚假唤醒」(条件变量可能无原因唤醒)
    // 即使被唤醒,也要再次检查队列是否真的非空
    while (queue_.empty())
    {
      notEmpty_.wait(); // 释放锁并阻塞,直到被 notify 且重新获取锁
    }
    assert(!queue_.empty()); // 调试期断言:队列非空(确保逻辑正确)
    T front(std::move(queue_.front())); // 移动队列头部元素到临时对象
    queue_.pop_front();                // 移除队列头部元素
    return front;                      // 移动返回临时对象(C++11 移动语义)
  }

  // 取出所有元素:原子性地取出队列中所有元素(清空队列)
  // @return 包含所有元素的新队列(移动语义,无拷贝)
  queue_type drain()
  {
    std::deque<T> queue; // 临时队列,用于存储取出的元素
    {
      MutexLockGuard lock(mutex_); // 加锁后原子性操作
      queue = std::move(queue_);   // 移动队列所有元素到临时队列(原队列变为空)
      assert(queue_.empty());      // 调试期断言:原队列已清空
    } // 作用域结束,自动解锁
    return queue; // 移动返回临时队列
  }

  // 获取队列当前大小(线程安全)
  size_t size() const
  {
    MutexLockGuard lock(mutex_); // 加锁后读取,避免并发修改导致的竞态
    return queue_.size();
  }

 private:
  mutable MutexLock mutex_;          // 互斥锁(mutable 允许 const 成员函数加锁)
  Condition         notEmpty_ GUARDED_BY(mutex_); // 条件变量(GUARDED_BY:注解,表示受 mutex_ 保护)
  queue_type        queue_ GUARDED_BY(mutex_);    // 底层队列(GUARDED_BY:注解,表示访问需持有 mutex_)
};  // __attribute__ ((aligned (64))); // 可选:64字节对齐,避免缓存行伪共享(注释掉不影响功能)

}  // namespace muduo

#endif  // MUDUO_BASE_BLOCKINGQUEUE_H
1. BlockingQueue 的整体定位

BlockingQueue 是 Muduo 中基于 std::deque 实现的线程安全阻塞队列,核心适配「生产者 - 消费者模型」,解决多线程间异步数据传递的问题:

  • 生产者:调用 put() 向队列放数据,无需阻塞(队列无上限,Muduo 场景下通常配合业务限流使用);
  • 消费者:调用 take() 从队列取数据,队列为空时阻塞等待,直到有数据放入;
  • 核心特性:线程安全(互斥锁 + 条件变量)、支持移动语义(减少拷贝)、不可拷贝(避免多线程数据混乱)、泛化支持任意类型元素。
2. 逐模块拆解核心实现
1. 基础框架与核心成员
template<typename T>
class BlockingQueue : noncopyable  // 禁止拷贝:多线程场景下拷贝队列会导致数据混乱
{
 public:
  using queue_type = std::deque<T>; // 底层容器:双端队列,头尾操作效率O(1)

  // 构造函数:初始化互斥锁、条件变量、队列
  BlockingQueue()
    : mutex_(),
      notEmpty_(mutex_),  // 条件变量必须关联互斥锁(Muduo的Condition设计)
      queue_()
  {
  }

 private:
  mutable MutexLock mutex_;          // mutable:const成员函数(如size())也能加锁
  Condition         notEmpty_ GUARDED_BY(mutex_); // 条件变量:通知“队列非空”,GUARDED_BY是注解(提示受mutex_保护)
  queue_type        queue_ GUARDED_BY(mutex_);    // 底层队列:所有操作必须加锁
};

核心细节

  • noncopyable 继承:禁止拷贝构造和赋值运算符,避免多线程场景下队列拷贝导致的竞态条件;
  • std::deque 选择:相比 std::vector,deque 头尾插入 / 删除无需移动元素,效率更高;相比 std::queue(适配器),直接用 deque 更灵活(如 drain() 批量取数据);
  • mutable MutexLocksize() 是 const 成员函数,但需要加锁读取队列大小,mutable 允许 const 函数修改 mutex_(锁的状态变化不影响队列逻辑上的 “只读”)。
2. 生产者接口:put()(支持移动语义)
// 重载1:接收左值(如已存在的对象)
void put(const T& x)
{
  MutexLockGuard lock(mutex_); // RAII加锁,作用域结束自动解锁
  queue_.push_back(x);         // 左值拷贝入队
  notEmpty_.notify();          // 通知消费者:队列已有数据,可唤醒wait()
}

// 重载2:接收右值(如临时对象),减少拷贝
void put(T&& x)
{
  MutexLockGuard lock(mutex_);
  queue_.push_back(std::move(x)); // 移动构造入队,避免拷贝(大对象场景效率提升明显)
  notEmpty_.notify();
}

关键设计

  • RAII 锁(MutexLockGuard):保证锁的正确释放 —— 即使 push_back 抛出异常,作用域结束时 lock 析构会自动解锁,避免死锁;
  • 移动语义:右值重载用 std::move,比如传递大对象(如日志字符串、任务函数)时,直接移动而非拷贝,大幅降低开销;
  • 条件变量通知notify() 唤醒至少一个等待在 notEmpty_ 上的消费者线程(take() 中的 wait()),无需唤醒所有线程,减少上下文切换。
3. 消费者接口:take()(阻塞取数据,防虚假唤醒)
T take()
{
  MutexLockGuard lock(mutex_);
  // 必须用while而非if:防止“虚假唤醒(spurious wakeup)”
  while (queue_.empty())
  {
    notEmpty_.wait(); // 原子操作:释放mutex_ + 阻塞,直到被notify()唤醒,唤醒后重新加锁
  }
  assert(!queue_.empty()); // 确保队列非空,避免逻辑错误
  T front(std::move(queue_.front())); // 移动构造返回值,减少拷贝
  queue_.pop_front();                 // 移除队首元素
  return front;                       // 返回移动后的对象
}

核心重点:为什么用 while 而非 if?

  • 条件变量存在「虚假唤醒」:操作系统可能无原因唤醒 wait() 的线程(POSIX 标准允许),此时队列仍为空;
  • 若用 if (queue_.empty()):虚假唤醒后直接执行后续逻辑,访问空队列的 front() 会崩溃;
  • while:唤醒后重新检查队列是否为空,为空则继续阻塞,彻底避免虚假唤醒导致的错误。
4. 批量取数据:drain()
queue_type drain()
{
  std::deque<T> queue;
  { // 缩小锁的作用域:仅在移动数据时加锁,返回时已解锁
    MutexLockGuard lock(mutex_);
    queue = std::move(queue_); // 移动队列:原queue_变为空,无拷贝开销
    assert(queue_.empty());
  }
  return queue; // 返回移动后的队列,编译器优化为移动构造
}

设计目的

  • 用于批量处理数据(如异步日志线程批量取日志消息),减少加锁次数(一次取所有数据,而非多次 take());
  • 锁的作用域仅覆盖数据移动,返回时已解锁,避免长时间持有锁导致生产者阻塞。
5. 队列大小:size()
size_t size() const
{
  MutexLockGuard lock(mutex_); // queue_.size()非原子操作,必须加锁保证准确
  return queue_.size();
}

关键说明

  • std::deque::size() 不是原子操作,多线程下若不加锁,可能读取到中间状态(如生产者正在 push_back,size 计算错误);
  • const 成员函数加锁依赖 mutex_mutable—— 锁的状态变化不影响队列 “逻辑只读” 的语义。
3. 核心使用示例(生产者 - 消费者模型)
#include "muduo/base/BlockingQueue.h"
#include <thread>
#include <iostream>
#include <string>

// 全局阻塞队列:传递字符串消息
muduo::BlockingQueue<std::string> g_queue;

// 生产者线程:每秒放一条消息
void producer()
{
  int count = 0;
  while (count < 5)
  {
    std::string msg = "消息" + std::to_string(++count);
    g_queue.put(std::move(msg)); // 移动语义,减少拷贝
    std::cout << "生产者放入:" << msg << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}

// 消费者线程:阻塞取消息,取完5条退出
void consumer()
{
  int count = 0;
  while (count < 5)
  {
    std::string msg = g_queue.take(); // 队列为空时阻塞
    std::cout << "消费者取出:" << msg << std::endl;
    ++count;
  }
}

int main()
{
  std::thread t1(producer);
  std::thread t2(consumer);

  t1.join();
  t2.join();

  return 0;
}

输出结果(顺序可能因线程调度略有差异):

生产者放入:消息1
消费者取出:消息1
生产者放入:消息2
消费者取出:消息2
生产者放入:消息3
消费者取出:消息3
生产者放入:消息4
消费者取出:消息4
生产者放入:消息5
消费者取出:消息5
4. 核心设计亮点
  • 性能优化:全链路移动语义(put 右值、take 移动返回、drain 移动队列),大幅减少大对象的拷贝开销;
  • 线程安全保障
    • RAII 锁避免死锁;
    • while 循环防虚假唤醒;
    • 不可拷贝设计避免多线程数据混乱;
  • 精准同步:仅用一个 notEmpty_ 条件变量(Muduo 场景下队列无上限,无需 notFull_),减少条件变量数量,降低上下文切换;
  • 泛化设计:模板类支持任意类型元素(如任务函数、日志消息、网络数据包),适配所有生产者 - 消费者场景。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐