案例1: 最基础的生产者消费者模型

#include <stdio.h>      // 标准输入输出(printf)
#include <pthread.h>    // POSIX线程库(线程创建/互斥锁/条件变量)
#include <unistd.h>     // 睡眠函数(sleep)
#include <stdlib.h>     // 标准库(内存分配/退出等,此处暂未用到)
 
#define BUF_SIZE 3  // 缓冲区大小:最多存放3个数据
int g_buf[BUF_SIZE]; // 共享缓冲区:存储生产/消费的数据
int g_idx = 0;      // 缓冲区当前已存元素个数(共享资源,需互斥保护)
 
// 同步原语:互斥锁+两个条件变量(分别协调生产者/消费者)
pthread_mutex_t g_mutex;          // 互斥锁:保护共享资源(g_buf/g_idx)的排他访问
pthread_cond_t g_cond_prod;       // 生产者条件变量:缓冲区有空位时唤醒生产者
pthread_cond_t g_cond_cons;       // 消费者条件变量:缓冲区有数据时唤醒消费者
 
// 生产者线程函数:循环往缓冲区放入5个数据
// 参数:arg - 线程入参(此处未使用),返回值:线程退出状态(此处返回NULL)
void* producer(void* arg) {
    // 生产5个数据(循环5次)
    for (int i = 0; i < 5; i++) {
        // 1. 加互斥锁:保护共享资源(g_buf/g_idx),防止多线程同时修改
        //    加锁后,其他线程调用pthread_mutex_lock会阻塞,直到当前线程解锁
        pthread_mutex_lock(&g_mutex);
 
        // 2. 检查缓冲区是否满:如果满了,生产者需要等待消费者取数据
        //    注意:必须用while而非if!避免「虚假唤醒」(条件变量被唤醒后,条件可能已不满足)
        //    比如:多个生产者被唤醒时,缓冲区可能已被其他生产者占满
        while (g_idx == BUF_SIZE) {
            // pthread_cond_wait做两件事:
            //   a. 原子操作:解锁g_mutex + 阻塞当前线程,等待被唤醒
            //   b. 被唤醒后:继续从这个代码开始执行, 重新加锁g_mutex,然后退出wait继续执行
            pthread_cond_wait(&g_cond_prod, &g_mutex);
        }
 
        // 3. 生产数据:往缓冲区放入当前数值i
        g_buf[g_idx++] = i; // g_idx先赋值,再自增(等价于g_buf[g_idx]=i; g_idx++;)
        // 打印生产信息:pthread_self()获取当前线程ID,g_idx是当前缓冲区数据量
        printf("生产者[%ld]:放入%d,缓冲区数量=%d\n", pthread_self(), i, g_idx);
 
        // 4. 唤醒消费者:缓冲区已有数据,通知消费者可以取数据了
        //    pthread_cond_signal:唤醒等待该条件变量的任意一个线程(此处只有1个消费者)
        //    若有多个消费者,可用pthread_cond_broadcast唤醒所有
        pthread_cond_signal(&g_cond_cons);
 
        // 5. 解锁互斥锁:释放共享资源,让其他线程(消费者)可以访问
        pthread_mutex_unlock(&g_mutex);
        
        // 模拟生产耗时(比如生产一个数据需要1秒),让出CPU,让消费者有机会执行
        sleep(1);
    }
    // 线程正常退出,返回NULL
    return NULL;
}
 
// 消费者线程函数:循环从缓冲区取出5个数据
// 参数:arg - 线程入参(此处未使用),返回值:线程退出状态(此处返回NULL)
void* consumer(void* arg) {
    // 消费5个数据(与生产者生产数量一致)
    for (int i = 0; i < 5; i++) {
        // 1. 加互斥锁:保护共享资源,防止与生产者同时修改g_buf/g_idx
        pthread_mutex_lock(&g_mutex);
 
        // 2. 检查缓冲区是否空:如果空了,消费者需要等待生产者放数据
        //    同样用while避免虚假唤醒(比如多个消费者被唤醒时,缓冲区已空)
        while (g_idx == 0) {
            // 阻塞当前线程,等待生产者的唤醒信号(g_cond_cons)
            // 等待期间会自动解锁g_mutex,被唤醒后重新加锁
            pthread_cond_wait(&g_cond_cons, &g_mutex);
        }
 
        // 3. 消费数据:从缓冲区取出最后一个数据
        int val = g_buf[--g_idx]; // g_idx先自减,再赋值(等价于g_idx--; val=g_buf[g_idx];)
        // 打印消费信息:当前线程ID + 取出的数据 + 剩余缓冲区数量
        printf("消费者[%ld]:取出%d,缓冲区数量=%d\n", pthread_self(), val, g_idx);
 
        // 4. 唤醒生产者:缓冲区有空位了,通知生产者可以继续生产
        pthread_cond_signal(&g_cond_prod);
 
        // 5. 解锁互斥锁:释放共享资源,让生产者可以访问
        pthread_mutex_unlock(&g_mutex);
        
        // 模拟消费耗时(比如消费一个数据需要1秒),让出CPU,让生产者有机会执行
        sleep(1);
    }
    // 线程正常退出,返回NULL
    return NULL;
}
 
int main() {
    // 定义线程ID:分别存储生产者/消费者线程的ID
    pthread_t tid_prod, tid_cons;
 
    // 初始化同步原语:必须先初始化,才能使用
    // 第二个参数为NULL:使用默认属性
    pthread_mutex_init(&g_mutex, NULL);          // 初始化互斥锁
    pthread_cond_init(&g_cond_prod, NULL);       // 初始化生产者条件变量
    pthread_cond_init(&g_cond_cons, NULL);       // 初始化消费者条件变量
 
    // 创建线程
    // 参数:线程ID指针、线程属性(NULL=默认)、线程函数、函数入参(NULL)
    pthread_create(&tid_prod, NULL, producer, NULL); // 创建生产者线程
    pthread_create(&tid_cons, NULL, consumer, NULL); // 创建消费者线程
 
    // 等待线程结束:主线程阻塞,直到生产者/消费者线程执行完毕
    // 第二个参数为NULL:不接收线程的返回值
    pthread_join(tid_prod, NULL); // 等待生产者线程退出
    pthread_join(tid_cons, NULL); // 等待消费者线程退出
 
    // 销毁同步原语:释放系统资源,避免内存泄漏
    pthread_mutex_destroy(&g_mutex);          // 销毁互斥锁
    pthread_cond_destroy(&g_cond_prod);       // 销毁生产者条件变量
    pthread_cond_destroy(&g_cond_cons);       // 销毁消费者条件变量
 
    return 0; // 主线程正常退出
}

案例2: 信号量

我就用信号量最经典的**“限流”**场景来举例:只有 3 个窗口的银行,来了 10 个客户办理业务。

在这个例子中,你会看到:

  1. 信号量 充当了“叫号机”和“柜员空闲数”。

  2. 只有 3 个线程能同时运行(办理业务)。

  3. 其他 7 个线程必须阻塞等待,直到有人办完离开。

信号量(Semaphore)-CSDN博客https://blog.csdn.net/Howrun777/article/details/157183697?sharetype=blogdetail&sharerId=157183697&sharerefer=PC&sharesource=Howrun777&spm=1011.2480.3001.8118代码实现

这个代码不需要 pthread_cond,也不需要 while 循环检查,逻辑非常直线条。

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <semaphore.h> // 必须引入信号量头文件

#define NUM_WINDOWS 3   // 只有3个柜台窗口
#define NUM_CUSTOMERS 10 // 来了10个客户

sem_t sem_windows; // 定义信号量

// 客户线程函数
void* customer(void* arg) {
    int id = *(int*)arg;
    free(arg); // 释放传进来的内存

    printf("--- 客户 %d 到达银行,正在排队...\n", id);

    // [P操作] 申请资源
    // 信号量 > 0 则减1并进入;
    // 信号量 = 0 则阻塞在这里睡觉
    sem_wait(&sem_windows);

    // --- 临界区(能走到这里说明拿到窗口了) ---
    printf("+++ 客户 %d 抢到了窗口,正在办理业务...\n", id);
    
    sleep(2); // 模拟办理业务耗时 2秒

    printf("<<< 客户 %d 办理完毕,离开窗口\n", id);
    
    // [V操作] 释放资源
    // 信号量 + 1,如果有人在排队,会唤醒其中一个
    sem_post(&sem_windows);

    return NULL;
}

int main() {
    pthread_t tids[NUM_CUSTOMERS];

    // 初始化信号量
    // 第二个参数 0 表示线程间共享
    // 第三个参数 NUM_WINDOWS (3) 是信号量的初始值!表示有3个空位
    sem_init(&sem_windows, 0, NUM_WINDOWS);

    // 创建 10 个客户线程
    for (int i = 0; i < NUM_CUSTOMERS; i++) {
        int* p_id = malloc(sizeof(int)); // 动态分配内存传参
        *p_id = i;
        pthread_create(&tids[i], NULL, customer, p_id);
        
        // 稍微间隔一下让打印好看点,不是必须的
        usleep(100000); 
    }

    // 等待所有线程结束
    for (int i = 0; i < NUM_CUSTOMERS; i++) {
        pthread_join(tids[i], NULL);
    }

    // 销毁信号量
    sem_destroy(&sem_windows);

    return 0;
}

案例3: 环形队列

生产者消费者模型,环形队列 ,信号量, 且单独实现环形队列类

这是一个非常好的练习,能够帮助你理解如何从 面向过程 (C语言) 转向 面向对象 (C++) 的思维方式。在 C++ 中,我们通常会利用 RAII (资源获取即初始化) 技术来管理锁和信号量,利用 模板 (Template) 来让队列支持任意数据类型。

以下是完整的 C++ 实现,包含三个部分:

  1. Semaphore 类:将 POSIX sem_t 封装成 C++ 类(为了 RAII 管理)。

  2. RingQueue 类:线程安全的泛型环形队列。

  3. 主程序:生产者与消费者逻辑。

完整代码 (C++ Standard 11/14/17/20 兼容)

为了保证代码在 Linux/Unix 环境下与之前的 C 代码逻辑一致,底层依然使用 <semaphore.h>,但用 C++ 类进行了封装。

#include <iostream>     // 标准输入输出(cout/endl)
#include <vector>       // 动态数组,用于实现环形队列的缓冲区
#include <thread>       // C++11线程库(线程创建/线程ID获取)
#include <mutex>        // C++11互斥锁(lock_guard)
#include <semaphore.h>  // POSIX信号量(sem_init/sem_wait/sem_post)
#include <unistd.h>     // sleep函数(模拟生产/消费耗时)

// ---------------------------------------------------------
// 1. 信号量封装类 (RAII风格)
// 核心设计:利用C++的构造/析构自动管理信号量生命周期,避免手动销毁导致的资源泄漏
// ---------------------------------------------------------
class Semaphore {
private:
    sem_t _sem;  // POSIX信号量原生对象

public:
    // 构造函数:初始化信号量
    // 参数value:信号量初始值(>=0)
    // sem_init参数说明:
    //   &_sem:要初始化的信号量对象
    //   0:信号量作用域(0=进程内线程共享,非0=进程间共享)
    //   value:信号量初始值(可用资源数)
    Semaphore(int value) {
        sem_init(&_sem, 0, value);
    }

    // 析构函数:销毁信号量(RAII核心:对象销毁时自动释放资源)
    // 避免手动调用sem_destroy遗漏导致的系统资源泄漏
    ~Semaphore() {
        sem_destroy(&_sem);
    }

    // P操作 (Wait/减操作):申请资源,若无可用则阻塞
    // 作用:信号量值-1;若值<0,当前线程阻塞,直到有其他线程调用signal()
    void wait() {
        sem_wait(&_sem);
    }

    // V操作 (Post/Signal/加操作):释放资源,唤醒阻塞线程
    // 作用:信号量值+1;若有线程阻塞在wait(),则唤醒其中一个
    void signal() {
        sem_post(&_sem);
    }
};

// ---------------------------------------------------------
// 2. 线程安全的环形队列类 (模板类)
// 核心设计:
//   - 环形队列:通过head/tail取模实现缓冲区循环复用
//   - 同步机制:信号量(控制空/数据数量)+ 互斥锁(保护临界区)
//   - 线程安全:所有对head/tail/buffer的操作都加锁保护
// ---------------------------------------------------------
template <typename T>  // 模板类:支持任意类型的数据存储
class RingQueue {
private:
    std::vector<T> _buffer; // 环形缓冲区:替代原生数组,自动管理内存(无需手动new/delete)
    int _capacity;          // 队列总容量(缓冲区最大可存储元素数)
    int _head;              // 读索引(消费者使用):指向下一个要取出的元素位置
    int _tail;              // 写索引(生产者使用):指向下一个要写入的元素位置

    // 同步工具:保证多线程访问的安全性和顺序性
    std::mutex _mtx;        // 互斥锁:保护head/tail/_buffer的原子操作(临界区)
    Semaphore _sem_empty;   // 信号量:表示缓冲区的「空闲位置数」(生产者可用资源)
    Semaphore _sem_data;    // 信号量:表示缓冲区的「已存储数据数」(消费者可用资源)

public:
    // 构造函数:初始化环形队列的容量和同步工具
    // 参数cap:队列容量(必须>0)
    RingQueue(int cap) 
        : _capacity(cap),        // 初始化队列总容量
          _buffer(cap),          // 初始化缓冲区大小为cap
          _head(0),              // 读索引初始化为0
          _tail(0),              // 写索引初始化为0
          _sem_empty(cap),       // 初始空闲位置数 = 总容量(队列空)
          _sem_data(0)           // 初始数据数 = 0(队列空)
    {}

    // 禁止拷贝构造和赋值运算符重载
    // 原因:队列包含互斥锁、信号量等不可拷贝的同步对象,拷贝会导致未定义行为
    RingQueue(const RingQueue&) = delete;
    RingQueue& operator=(const RingQueue&) = delete;

    // 入队操作 (生产者调用):将数据写入环形队列
    // 参数data:要写入队列的数据(const& 避免拷贝,提高效率)
    void push(const T& data) {
        // 1. P操作:申请「空闲位置」资源
        //    若队列已满(_sem_empty值=0),生产者线程阻塞,直到消费者取出数据释放空位
        _sem_empty.wait();

        // 2. 加互斥锁:应对多个生产者, 保护临界区(_tail/_buffer的修改)
        //    lock_guard是RAII风格锁:作用域结束时自动解锁,避免手动解锁遗漏导致死锁
        {   // 局部作用域:限制lock_guard的生命周期,解锁时机更精准
            std::lock_guard<std::mutex> lock(_mtx);
            
            // --- 临界区开始:仅允许一个线程执行 ---
            _buffer[_tail] = data;                // 将数据写入当前写索引位置
            _tail = (_tail + 1) % _capacity;      // 写索引后移,取模实现「环形」(到末尾则回到0)
            // --- 临界区结束 ---
        } 
        // 出作用域,lock_guard析构,_mtx自动解锁

        // 3. V操作:释放「数据」资源(通知消费者有新数据可消费)
        //    _sem_data值+1,若有消费者阻塞在pop()的wait(),则唤醒其中一个
        _sem_data.signal();
    }

    // 出队操作 (消费者调用):从环形队列取出数据
    // 参数out_data:输出参数,用于接收取出的数据(指针避免返回值拷贝)
    void pop(T* out_data) {
        // 1. P操作:申请「数据」资源
        //    若队列为空(_sem_data值=0),消费者线程阻塞,直到生产者写入数据
        _sem_data.wait();

        // 2. 加互斥锁:应对多个消费者, 保护临界区(_head/_buffer的读取)
        {
            std::lock_guard<std::mutex> lock(_mtx);

            // --- 临界区开始 ---
            *out_data = _buffer[_head];           // 从当前读索引位置取出数据
            _head = (_head + 1) % _capacity;      // 读索引后移,取模实现环形
            // --- 临界区结束 ---
        }

        // 3. V操作:释放「空闲位置」资源(通知生产者有新空位可写入)
        //    _sem_empty值+1,若有生产者阻塞在push()的wait(),则唤醒其中一个
        _sem_empty.signal();
    }
};

// ---------------------------------------------------------
// 3. 主程序逻辑:生产者-消费者模型测试
// ---------------------------------------------------------

// 全局的环形队列实例:容量为5的整数队列(生产者和消费者共享)
RingQueue<int> g_queue(5);

// 生产者线程函数:无限循环生产数据并写入队列
void producer() {
    int i = 0; // 生产的数据值(从0开始递增)
    while (true) { // 死循环持续生产
        // 打印生产信息:输出线程ID和生产的数据值
        std::cout << "生产者 [" << std::this_thread::get_id() << "] 生产: " << i << std::endl;
        
        // 将数据推入队列:若队列满则自动阻塞,无需手动判断
        g_queue.push(i);
        
        i++; // 下一个要生产的数据值
        sleep(1); // 模拟生产耗时(1秒生产一个数据)
    }
}

// 消费者线程函数:无限循环从队列取出数据并消费
void consumer() {
    while (true) { // 死循环持续消费
        int data; // 存储从队列取出的数据
        
        // 从队列取出数据:若队列为空则自动阻塞,无需手动判断
        g_queue.pop(&data);
        
        // 打印消费信息:输出线程ID和消费的数据值(>>> 区分消费日志)
        std::cout << "  >>> 消费者 [" << std::this_thread::get_id() << "] 消费: " << data << std::endl;
        
        sleep(2); // 模拟消费耗时(2秒消费一个数据,比生产慢,队列会逐渐填满)
    }
}

int main() {
    // 创建生产者线程和消费者线程
    // 线程创建后自动执行对应的线程函数(producer/consumer)
    std::thread t_prod(producer);
    std::thread t_cons(consumer);

    // 等待线程结束:阻塞主线程,直到生产者/消费者线程退出
    // 注:由于线程函数是死循环,此处join()永远不会返回,程序会一直运行
    t_prod.join();
    t_cons.join();

    return 0;
}

注意: 如果是单生产者-单消费者模型, 不需要加锁, 可以只使用信号量完成

C++ 写法的关键点

类封装 (Encapsulation)

  • 使用者不需要手动调用 sem_init 或 sem_destroy,这些都在 Semaphore 和 RingQueue 的构造函数和析构函数中自动完成。

  • 使用者不需要知道内部用的是 sem_wait 还是 pthread_mutex_lock,接口只有简单的 push 和 pop。

RAII 锁管理 (std::lock_guard)

案例4: SPSC 无锁队列实现 (工业级写法)

这才是“不需要锁”的终极形态。我们利用 std::atomic 的 acquire / release 语义来替代锁和信号量。

#include <vector>
#include <atomic>
#include <thread>
#include <iostream>

template <typename T>
class SPSCLockFreeQueue {
private:
    std::vector<T> _buffer;
    int _capacity;
    // 使用 atomic 替代普通的 int,并移除 Mutex 和 Semaphore
    std::atomic<int> _head; 
    std::atomic<int> _tail; 

public:
    SPSCLockFreeQueue(int cap) : _capacity(cap + 1), _buffer(cap + 1), _head(0), _tail(0) {}
    // 注意:实际容量是 cap,多留一个空位用于判断队满(head == (tail + 1) % size)

    // 生产者调用
    bool push(const T& data) {
        int current_tail = _tail.load(std::memory_order_relaxed);
        int next_tail = (current_tail + 1) % _capacity;

        // 检查是否已满:需要读取 _head
        // memory_order_acquire: 保证看到消费者对 _head 的最新修改
        if (next_tail == _head.load(std::memory_order_acquire)) {
            return false; // 队列满
        }

        _buffer[current_tail] = data;

        // 更新 _tail
        // memory_order_release: 保证在此之前的写入(buffer data)对读取 tail 的线程可见
        _tail.store(next_tail, std::memory_order_release);
        return true;
    }

    // 消费者调用
    bool pop(T* out_data) {
        int current_head = _head.load(std::memory_order_relaxed);

        // 检查是否为空:需要读取 _tail
        // memory_order_acquire: 保证看到生产者对 _tail 和 buffer 的最新修改
        if (current_head == _tail.load(std::memory_order_acquire)) {
            return false; // 队列空
        }

        *out_data = _buffer[current_head];

        // 更新 _head
        // memory_order_release: 通知生产者这个位置已经空出来了
        _head.store((current_head + 1) % _capacity, std::memory_order_release);
        return true;
    }
};
  1. 针对你之前的代码:在单生产者-单消费者模式下,可以删掉 mutex。因为 semaphore 的屏障作用已经足够保证安全。你是对的。

  2. 更深层的逻辑:如果你觉得 mutex 慢,那么 semaphore 也很慢。

  3. 最佳实践:真正的 SPSC 优化方案是不使用任何阻塞工具(锁或信号量),而是利用 std::atomic 配合 CPU 缓存一致性协议,实现真正的“零拷贝、零阻塞”。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐