一、核心背景:为什么需要阻塞队列?

在多线程场景中,如果直接使用普通队列作为线程间数据传递的载体,会面临两个致命问题:

  1. 线程安全问题:多个线程同时读写队列(临界资源)会导致数据竞争,出现队列元素丢失、重复消费等诡异现象;
  2. 忙等问题:生产者不停检查队列是否满、消费者不停检查队列是否空,会浪费大量 CPU 资源(典型的 “空转”)。

阻塞队列的核心价值就在于:

  • ✅ 内置互斥锁,保证队列操作的原子性
  • ✅ 结合条件变量,实现 “队列满则生产者阻塞、队列空则消费者阻塞” 的优雅等待
  • 自动唤醒机制,队列状态变化时主动唤醒等待的线程,避免 CPU 空转。

什么是原子性?
原子性是保证多线程操作安全的三大核心特性(原子性、可见性、有序性)之首,也是理解 “数据竞争”“线程安全” 的基础。简单来说,原子性就是指一个操作(或一系列操作)要么完整执行完毕,要么完全不执行,中间不会被任何其他线程打断,就像原子一样不可分割、不可中断。

什么是自动唤醒机制?
自动唤醒机制 是解决 “生产者 - 消费者模型忙等问题” 的核心,也是条件变量(pthread_cond_t)最核心的价值所在。简单来说,自动唤醒机制是指当某个条件满足时(比如队列从满变有空位、从空变有元素),系统自动唤醒等待该条件的线程,让其从阻塞状态转为可运行状态,无需线程轮询检查条件—— 这彻底告别了 “线程死循环检查条件” 的低效模式,是多线程同步的 “优雅解法”。

二、基础封装:RAII 风格的锁与条件变量

在 Linux 下,原生的 pthread_mutex_t(互斥锁)和 pthread_cond_t(条件变量)需要手动初始化和销毁,稍有不慎就会出现 “锁未释放”“条件变量未销毁” 等资源泄漏问题。我们先用RAII(资源获取即初始化) 思想封装这两个核心组件,让资源管理自动化。

2.1 互斥锁(Mutex)的 RAII 封装

 
 #pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>

namespace MutexModule
{
    class Mutex
    {
    public:
        // 构造时初始化锁
        Mutex()
        {
            pthread_mutex_init(&_mutex, nullptr);
        }

        // 加锁
        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex);
            if (n != 0)
            {
                std::cerr << "pthread_mutex_lock failed! error code: " << n
                          << ", error info: " << strerror(n) << std::endl;
                // 加锁失败根据业务场景处理:
                // 1. 致命错误(如锁已销毁):终止程序
                // 2. 非致命错误(如EINTR被信号中断):可重试
                exit(EXIT_FAILURE);
            }
        }

        // 解锁
        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex);
            if (n != 0)
            {
                std::cerr << "pthread_mutex_unlock failed! error code: " << n
                          << ", error info: " << strerror(n) << std::endl;
                // 解锁失败常见原因:锁未被当前线程持有、锁已销毁等,均为致命错误
                exit(EXIT_FAILURE);
            }
        }

        // 析构时销毁锁,自动释放资源
        ~Mutex()
        {
            pthread_mutex_destroy(&_mutex);
        }

        // 获取原生锁对象(给条件变量使用)
        pthread_mutex_t *Get() { return &_mutex; }

    private:
        pthread_mutex_t _mutex;
    };

    // 锁守卫:构造加锁,析构解锁,杜绝忘记解锁的问题
    class LockGuard
    {
    public:
        LockGuard(Mutex & mutex) : _mutex(mutex) { _mutex.Lock(); }
        ~LockGuard() { _mutex.Unlock(); }

    private:
        Mutex & _mutex; // 引用而非拷贝,保证操作的是同一个锁
    };
}

关键解读:

  • Mutex类封装了原生互斥锁的创建、加锁、解锁、销毁,生命周期与对象绑定;
  • LockGuard是 “锁守卫”,利用栈对象的析构特性,即使函数异常退出也能自动解锁(解决了手动解锁的最大痛点)。

2.2 条件变量(Cond)的 RAII 封装

条件变量用于线程间的 “等待 - 唤醒” 通信,必须配合互斥锁使用

 #pragma once
#include <pthread.h>
#include "mutex.hpp"

namespace CondModule
{
    class Cond
    {
    public:
        Cond()
        {
            pthread_cond_init(&_cond, nullptr);
        }

        // 等待条件变量(需传入互斥锁,底层会自动解锁+等待,被唤醒后重新加锁)
        void Wait(MutexModule::Mutex & mutex)
        {
            pthread_cond_wait(&_cond, mutex.Get());
        }

        // 唤醒一个等待该条件变量的线程
        void Singal()
        {
            pthread_cond_signal(&_cond);
        }

        // 唤醒所有等待该条件变量的线程(广播)
        void Broadcast()
        {
            pthread_cond_broadcast(&_cond);
        }

        ~Cond()
        {
            pthread_cond_destroy(&_cond);
        }

    private:
        pthread_cond_t _cond;
    };
}

关键解读:

  • pthread_cond_wait 是核心:调用时会先释放传入的互斥锁,然后阻塞等待;被唤醒后会重新获取该锁,保证从等待到执行的原子性;
  • 区分Singal()和Broadcast():前者唤醒一个线程,后者唤醒所有线程;生产者 - 消费者模型中用Singal()更高效(避免 “惊群效应”)。

三、核心实现:线程安全的阻塞队列

3.1 完整代码实现

#pragma once
#include <iostream>
#include <unistd.h>
#include <queue>
#include "mutex.hpp"
#include "cond.hpp"

using namespace CondModule;
using namespace MutexModule;

// 默认队列容量
const int DEFAULT_CAPACITY = 5;

template <class T>
class BlockQueue
{
    // 判断队列是否满(私有辅助函数)
    bool IsFull() { return _q.size() >= _capa; }

    // 判断队列是否空(私有辅助函数)
    bool IsEmpty() { return _q.empty(); }

public:
    // 构造函数:初始化容量和休眠计数
    BlockQueue(int capa = DEFAULT_CAPACITY)
        : _capa(capa), _csleep_num(0), _psleep_num(0) {}

    // 生产者入队:队列满则阻塞
    void Push(const T &in)
    {
        // 自动加锁,析构时解锁(RAII)
        LockGuard lockGuard(_Mutex);

        // 注意:必须用while而非if!防止“虚假唤醒”
        while (IsFull())
        {
            _psleep_num++; // 记录休眠的生产者数量
            std::cout << "[生产者] 队列满,进入休眠 | 休眠数:" << _psleep_num << std::endl;
            _Full_cond.Wait(_Mutex); // 阻塞等待队列有空位
            _psleep_num--; // 被唤醒后,休眠数减1
        }

        // 队列有空间,入队
        _q.push(in);
        std::cout << "[生产者] 生产元素,队列当前大小:" << _q.size() << std::endl;

        // 如果有消费者休眠,唤醒一个消费者
        if (_csleep_num > 0)
        {
            _Empty_cond.Singal();
            std::cout << "[生产者] 唤醒消费者 | 消费者休眠数:" << _csleep_num << std::endl;
        }
    }

    // 消费者出队:队列空则阻塞
    T Pop()
    {
        // 自动加锁
        LockGuard lockguard(_Mutex);

        // 同样用while防止虚假唤醒
        while (IsEmpty())
        {
            _csleep_num++; // 记录休眠的消费者数量
            std::cout << "[消费者] 队列空,进入休眠 | 休眠数:" << _csleep_num << std::endl;
            _Empty_cond.Wait(_Mutex); // 阻塞等待队列有元素
            _csleep_num--; // 被唤醒后,休眠数减1
        }

        // 队列有元素,出队
        T data = _q.front();
        _q.pop();
        std::cout << "[消费者] 消费元素,队列当前大小:" << _q.size() << std::endl;

        // 如果有生产者休眠,唤醒一个生产者
        if (_psleep_num > 0)
        {
            _Full_cond.Singal();
            std::cout << "[消费者] 唤醒生产者 | 生产者休眠数:" << _psleep_num << std::endl;
        }

        return data;
    }

    // 析构函数:资源自动销毁(锁和条件变量的析构由自身类完成)
    ~BlockQueue() = default;

private:
    std::queue<T> _q;          // 底层存储容器(临界资源)
    int _capa;                 // 队列最大容量
    Mutex _Mutex;              // 保护队列的互斥锁
    Cond _Full_cond;           // 队列满时的等待条件
    Cond _Empty_cond;          // 队列空时的等待条件
    int _csleep_num;           // 休眠的消费者数量(仅用于日志)
    int _psleep_num;           // 休眠的生产者数量(仅用于日志)
};

3.2 核心关键点深度解析

(1)为什么用 while 判断队列状态,而非 if

这是阻塞队列实现的重中之重!pthread_cond_wait 存在 “虚假唤醒” 问题:即使没有调用Singal()/Broadcast(),等待的线程也可能被唤醒(操作系统层面的机制)。

如果用if (IsFull()):

  • 线程被虚假唤醒后,直接跳过判断,执行_q.push(in),此时队列可能依然是满的,导致队列溢出

用while (IsFull()):

  • 被唤醒后会重新检查队列状态,如果依然满 / 空,就继续等待,彻底规避虚假唤醒的风险

(2)LockGuard 的妙用
在Push()和Pop()中,我们没有手动调用Lock()/Unlock(),而是直接创建LockGuard对象:

  • 构造LockGuard时自动加锁;

  • 函数执行完毕(或异常退出)时,LockGuard析构,自动解锁;

  • 彻底杜绝 “忘记解锁”“异常导致锁无法释放” 的死锁问题

(3)休眠计数的意义
_csleep_num和_psleep_num并非功能必需,但能帮我们直观监控线程状态:

  • 避免 “盲目唤醒”:只有存在休眠线程时才调用Singal(),减少不必要的系统调用;
  • 方便调试:通过日志清晰看到有多少生产者 / 消费者在等待,快速定位问题。

(4)唤醒操作的时机
代码中唤醒操作(Singal())放在 “修改队列后 + 解锁前”,是最优选择:

  • 先修改队列状态,再唤醒线程,保证被唤醒的线程能直接操作队列(无需再次等待);
  • 解锁前唤醒,线程被唤醒后可直接获取锁,减少上下文切换。

四、实战落地:生产者 - 消费者模型的两种玩法

阻塞队列的核心价值是承载 “任务”,我们演示两种典型的任务形式:结构化任务(类对象)、函数任务(std::function)。

本套阻塞队列实现基于模板化设计,具备良好的类型通用性与扩展性。其底层存储容器支持任意数据类型的适配,既兼容int等基础数据类型的直接入队 / 出队操作,也可无缝承载自定义结构化任务(如加法计算任务类)。该设计充分满足了后续任务化场景的落地需求,可灵活适配不同形态的任务载体,为生产者 - 消费者模型的多样化任务处理提供了统一的底层支撑

4.1 玩法 1:结构化任务(加法计算)

因为代码是采用了模板,并且

#include "BlockQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>

// 结构化任务类:封装加法计算
class Task
{
public:
    Task() = default;
    Task(int x, int y) : _x(x), _y(y) {}
    
    // 任务执行逻辑
    void Execute() { _result = _x + _y; }
    
    // 获取任务参数/结果
    int X() { return _x; }
    int Y() { return _y; }
    int Result() { return _result; }

private:
    int _x, _y;    // 任务参数
    int _result;   // 任务结果
};

// 消费者线程函数:处理加法任务
void *consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        sleep(1); // 模拟消费耗时
        Task t = bq->Pop();
        t.Execute(); // 执行任务
        std::cout << "[消费者] 计算结果:" << t.X() << "+" << t.Y() << "=" << t.Result() << std::endl;
    }
}

// 生产者线程函数:生产加法任务
void *productor(void *args)
{
    int x = 1, y = 1;
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        std::cout << "[生产者] 生产任务:" << x << "+" << y << "=?" << std::endl;
        Task t(x, y);
        bq->Push(t); // 任务入队
        x++, y++;
        usleep(500000); // 控制生产速度
    }
}

int main()
{
    BlockQueue<Task> *bq = new BlockQueue<Task>();

    // 创建生产者/消费者线程
    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    // 等待线程退出(实际场景可做优雅退出逻辑)
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bq;
    return 0;
}

4.2 玩法 2:函数任务(下载模拟)

#include "BlockQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <functional>

// 定义任务类型:无参无返回值的函数
using task_t = std::function<void()>;

// 模拟下载任务
void Download()
{
    std::cout << "[任务执行] 开始下载文件...(耗时3秒)" << std::endl;
    sleep(3);
    std::cout << "[任务执行] 文件下载完成!" << std::endl;
}

// 消费者线程:执行函数任务
void *consumer(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
    while (true)
    {
        task_t t = bq->Pop();
        t(); // 执行任务(调用Download函数)
    }
}

// 生产者线程:生产函数任务
void *productor(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
    while (true)
    {
        std::cout << "[生产者] 生产下载任务" << std::endl;
        bq->Push(Download); // 函数入队
        usleep(1000000); // 1秒生产一个任务
    }
}

int main()
{
    BlockQueue<task_t> *bq = new BlockQueue<task_t>();

    // 创建1个消费者、1个生产者
    pthread_t c[1], p[1];
    pthread_create(c, nullptr, consumer, bq);
    pthread_create(p, nullptr, productor, bq);

    // 等待线程
    pthread_join(c[0], nullptr);
    pthread_join(p[0], nullptr);

    delete bq;
    return 0;
}

编译运行后:

[生产者] 生产下载任务
[消费者] 队列空,进入休眠 | 休眠数:1
[生产者] 生产元素,队列当前大小:1
[生产者] 唤醒消费者 | 消费者休眠数:1
[任务执行] 开始下载文件...(耗时3秒)
[任务执行] 文件下载完成!
[生产者] 生产下载任务
[生产者] 生产元素,队列当前大小:1
...

总结

本文从 “解决多线程同步问题” 的核心需求出发,完成了一套工业级的阻塞队列实现:

  1. 用 RAII 封装锁和条件变量,杜绝资源泄漏和死锁;
  2. 基于 while 循环规避条件变量的虚假唤醒,保证队列安全;
  3. 实现通用的阻塞队列模板,支持任意类型的任务;
  4. 落地两种典型的生产者 - 消费者模型,覆盖结构化任务和函数任务场景。

阻塞队列是线程池、消息队列等高级并发组件的基础,掌握其实现原理,能让你对多线程同步的理解更上一层楼。核心记住:锁保证原子性,条件变量保证同步,RAII 保证资源安全。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐