1. 为什么有了互斥,我还需要同步?

利用厕所坑位抢占理解互斥和同步

对于互斥,相当于手里有了一把锁

而互斥的本质是:解决竞争问题,保证同一时刻,只有一个线程能访问临界资源

例如:厕所只有一个坑位,你进去之后锁上门,别人就进不来,这样加锁处理是为了数据安全

但是只有互斥是不够的

  • 场景:还是厕所,如果没有同步,A进去了,B在门口等。A出来之后,B刚要抢,结果A动作快又抢到了锁进去了(单纯出来透口气)。B抢不到,一直在门口盲目空转或者频繁申请锁失败
  • 更糟糕的情况:生产者-消费者
    • 仓库满了,生产者抢到了锁,发现满了,释放锁
    • 生产者又抢到了锁,发现仓库还是满的,释放锁
    • 生产者陷入死循环,CPU飙升,而消费者却抢不到锁
  • A和生产者有错吗?其实并没有,遵守了互斥的规则,但是这样不合理,所以引入同步解决上述问题

同步的本质:解决时序问题

  • 让现场按照一定的顺序协同工作:“我干完了,通知你;你干完了,通知我”,同时解锁之后排队去,不能立刻再申请锁
  • 核心:等待+唤醒

总结:什么是线程同步?和互斥的区别?

互斥为了保护共享资源不被并发破坏,保证数据安全

同步为了协调线程执行的先后顺序,避免线程饥饿

同步通常建立在互斥只是,检查条件时需要保护数据

核心工具->条件变量

为了实现同步,OS提供了条件变量,它不是锁,而是一个等待队列

主要做两件事:

  • wait等待: 条件不满足,在等待队列中排队等待,释放CPU
  • signal/notify唤醒: 条件满足,被别人唤醒

2. 生产者-消费者模型

321原则

  • 3种关系:
    • 生产者VS生产者:互斥(争夺写指针)
    • 消费者VS消费者:互斥(争夺读指针)
    • 生产者VS消费者:互斥(保护队列)&& 同步(满的时候等消费,空的时候等生产)
  • 2个角色:生产者线程、消费者线程
  • 1个交易场所:阻塞队列

怎么理解模型

举个奶茶店出杯的例子:

  • 阻塞队列:出杯台,最多放3杯奶茶
  • 生产者:做奶茶的店员,把杯子放到出杯台
  • 消费者:取奶茶的配送员,从杯台拿走杯子

只要多个线程并发读写同一份共享资源,并且这个读写不是原子的,就必须互斥来维护不变量

队列中的不变量:

  • 容量
  • 任何一个槽位同一时刻只能被一个人修改占用状态
  • 读/写的移动顺序不能乱、不能重复

生产者和生产者互斥:两个店员同时出杯,争夺槽位,必须互斥,否则可能会数据覆盖

消费者和消费者互斥:两个配送员同时取杯,争夺杯子,必须互斥,否则可能逻辑错误(一杯被拿走两次)

生产者和消费者互斥:保护队列这个共享结构,如果没有互斥,P在写入,C同时读取,读到了P写一半的数据

生产者和消费者同步:互斥只保证别同时改,不保证有货/有位置

  • 假如杯台满,生产者不断过来检查,不断跑空—>应该等有空位再来
  • 假如杯台空,消费者不断过来检查,不断跑空—>应该等有余量再来

所以需要条件变量来做同步

3. 基于阻塞队列实现生产者-消费者模型封装

version1

代码

#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
#include "Mutex.hpp"
#include "Cond.hpp"

// version1
template<typename T>
class BlockQueue_v1{
private:
    std::queue<T>              _q;
    int                     _capacity;
    pthread_mutex_t         _mutex; 
    pthread_cond_t          _productor_cond;        // 队列满,生产者在这里等
    pthread_cond_t          _consumer_cond;         // 队列空,消费者在这里等
    int                     _cwait_num;
    int                     _pwait_num;

    bool IsFull() { return _q.size() == _capacity; }
    bool IsEmpty() { return _q.empty(); }
public:
    BlockQueue_v1(int cap = 10)
        :_capacity(cap)
        ,_cwait_num(0)
        ,_pwait_num(0)
        {
            // 初始化锁和条件变量
            // nullptr表示默认属性
            pthread_mutex_init(&_mutex, nullptr);
            // 条件变量是等待队列 不是锁
            pthread_cond_init(&_productor_cond, nullptr);
            pthread_cond_init(&_consumer_cond, nullptr);
        }

    // 生产者,写数据入队列
    // 输入型参数
    void Equeue(const T& in){
        // 1. 加锁
        pthread_mutex_lock(&_mutex);

        // 2. 检查条件IsFull
        // 为什么用while? 
        // POSIX标准允许OS在无信号时意外唤醒线程,必须通过循环进行二次状态校验
     	// wait返回意味着"重新持有了锁",但不代表"条件依然满足"
        // 在"收到通知"到"抢到锁"的【时间窗口】内,空位可能已被其他生产者抢先占满
        // 若用 if,当前线程会无视满队列直接 push,导致数据覆盖或溢出
        while(IsFull()){
            std::cout << "队列满了,生产者开始等待...\n";
            _pwait_num++;

            // 3. 等待
            // 函数底层做三件事:
            // 1) 解锁unlock,允许别人进来,否则会死锁
            // 2) 线程挂起->进入_productor_cond的等待队列
            // 3) 被唤醒后->重新抢锁lock,抢不到就阻塞在这里
            pthread_cond_wait(&_productor_cond, &_mutex);
            _pwait_num--;
            std::cout << "生产者被唤醒,准备生产...\n";
        }

        // 4. 生产
        // 到这里,说明IsFull为0,且我持有锁
        _q.push(in);

        // 5. 唤醒消费者
        // 只有当_cwait_num>0时,再去唤醒,否则浪费资源
        if(_cwait_num > 0){
            std::cout << "通知消费者来取货\n";
            // siganl只唤醒一个
            // broadcast全部唤醒
            pthread_cond_signal(&_consumer_cond);
        }

        // 6. 解锁
        pthread_mutex_unlock(&_mutex);
    }

    // 消费者,读数据出队列
    // 输出型参数
    void Dequeue(T* out){
        pthread_mutex_lock(&_mutex);

        while(IsEmpty()){
            std::cout << "队列空了,消费者开始等待...\n";
            _cwait_num++;

            pthread_cond_wait(&_consumer_cond, &_mutex);
            _cwait_num--;
            std::cout << "消费者被唤醒...\n";
        }
        
        *out = _q.front();
        _q.pop();

        if(_pwait_num > 0){
            std::cout << "通知生产者补货...\n";
            pthread_cond_signal(&_productor_cond);
        }

        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue_v1(){
        pthread_mutex_destroy(&_mutex); 
        pthread_cond_destroy(&_productor_cond);
        pthread_cond_destroy(&_consumer_cond);
    }
};

接口细节

  • pthread_cond_wait(cond, mutex)
    • 一定要传锁!!!
      • 为什么?我先解锁再等待不行吗?
      • 不行,如果我在 unlockwait 之间,OS 调度切换了线程,此时另一个线程发送signal。因为我已经解锁,信号发送成功,但我还没睡。等我回来执行 wait 时,就错过了那个信号,导致永久阻塞
      • 内核态操作wait 的本质是将当前线程放入等待队列,并原子性地释放锁。这两个动作必须在内核中一口气完成,无法由用户代码分开完成
    • 调用后的状态:
      • 调用前:持有锁
      • 调用中(睡眠):释放锁,允许其他线程进入临界区操作
      • 返回后:持有锁,函数内部自动帮忙抢回锁
  • pthread_cond_singalpthread_cond_broadcast
    • signal:唤醒一个等待线程,适合一对一
    • broadcast:唤醒所有等待线程,适合读写锁写完之后,所有人都能读
    • 坑:如果用broadcast唤醒了10个消费者,但只有1个数据,10个人抢锁,一个人吃到了肉,剩下9个跑空,只能回去继续睡,浪费CPU

version2

利用RAII,自动释放资源

代码

封装锁:

#include <iostream>
#include <pthread.h>

class Mutex{
public:
    Mutex() { pthread_mutex_init(&_lock, nullptr); }
    void Lock() { pthread_mutex_lock(&_lock); }
    void Unlock() { pthread_mutex_unlock(&_lock); }
    ~Mutex() { pthread_mutex_destroy(&_lock); }
    // 关键:条件变量需要锁,要传指针
    pthread_mutex_t* GetLock() { return &_lock; }
private:
    pthread_mutex_t _lock;
};

class GuardLock{
public:
    GuardLock(Mutex& m) :_mtx(m) { _mtx.Lock(); }
    ~GuardLock() { _mtx.Unlock(); }
private:
    Mutex& _mtx;
};

封装条件变量:

#pragma once
#include <pthread.h>
#include "Mutex.hpp"

class Cond{
public:
    Cond() { pthread_cond_init(&_cond, nullptr); }
    ~Cond() { pthread_cond_destroy(&_cond); }

    // wait需要一把锁,传入封装的Mutex对象
    void Wait(Mutex& m){
        // 调用底层的pthread_cond_Wait
        // 需要原生指针
        pthread_cond_wait(&_cond, m.GetLock());
    }
    void Notify() { pthread_cond_signal(&_cond); }
    void NotifyAll() { pthread_cond_broadcast(&_cond); }
private:
    pthread_cond_t _cond;
};

实现阻塞队列:

template <typename T>
class BlockQueue_v2{
private:
    int              _capacity;
    std::queue<T>       _q;
    Mutex            _mutex;
    Cond             _productor_cond;
    Cond             _consumer_cond;
    int              _pwait_num;
    int              _cwait_num;

    bool IsFull() { return _q.size() == _capacity; }
    bool IsEmpty() { return _q.empty(); }
public:
    // Mutex和Cond会自动调用构造初始化
    BlockQueue_v2(int cap = 10)
        :_capacity(cap)
        ,_pwait_num(0)
        ,_cwait_num(0)
        {}

    // 生产者
    void Equeue(const T& in){
        GuardLock guardLock(_mutex);

        while(IsFull()){
            std::cout << "生产者进入等待...\n";
            _pwait_num++;
            _productor_cond.Wait(_mutex);
            _pwait_num--;
            std::cout << "生产者被唤醒...\n";
        }
    
        _q.push(in);
        if(_cwait_num > 0){
            std::cout << "唤醒消费者\n";
            _consumer_cond.Notify();
        }
    }   // guardLock离开作用域自动析构,解锁

    void Dequeue(T* out){
        GuardLock guardLock(_mutex);
        while(IsEmpty()){
            std::cout << "消费者进入等待...\n";
            _cwait_num++;
            _consumer_cond.Wait(_mutex);
            _cwait_num--;
            std::cout << "消费者被唤醒...\n";
        }

        *out = _q.front();
        _q.pop();

        if(_pwait_num > 0){
            std::cout << "唤醒生产者\n";
            _productor_cond.Notify();
        }
    }

    // 成员变量会自用调用析构销毁
    ~BlockQueue_v2() { }
};

4. 基于环形队列实现生产者-消费者模型

有了条件变量,为什么还要信号量?

  • 条件变量:本质是同步机制,解决了时许问题,逻辑是条件不满足我就睡,条件满足你叫醒我,它不保存状态,不知道资源具体有多少,需要结合mutex和外部变量来判断

  • 信号量:本质是资源计数器,可以解决数量问题,逻辑是票还有没有,有我就拿走一张进门,没有我就在门口等,是对资源的一种预定机制

  • 使用条件变量实现环形队列需要:

    1. 加锁
    2. while(queue.full()) wait()检查满不满
    3. 放数据
    4. signal/broadcast通知消费者
    5. 解锁

    每次操作都要抢锁,生产者和消费者哪怕互不干扰(队列既没满也没空),也要竞争同意把锁,并发度低

  • 信号量解法:把环形队列视为两种资源

    • 空闲空间资源(_spacesem):生产者关心,初始值为队列容量N
    • 数据资源(_datasem):消费者关心,初始值为0

    生产者只消耗空间,增加数据

    消费者只消耗数据,增加空间

    如果不满也不空,生产者和消费者位于不同位置,完全可以并行执行,不互斥!

    只有在相同位置->环形队列相同位置为空or为满,只有这两种情况->互斥!

所以环形队列有三大约束:

  • 生产者VS消费者: 队列为空,消费者不能读;队列为满,生产者不能写入
  • 生产者VS生产者: 多个生产者不能同时写入同一个位置
  • 消费者VS消费者: 多个消费者不能同时读取同一个位置

信号量的封装

我们使用POSIX标准的信号量,在<semaphore.h>库中

常用接口:

  • 初始化信号量

    #include <semaphore.h>
    int sem_init(sem_t* sem, int pshared, unsigned int value);
    // 参数:
    // pshared:0表示线程间共享,非0表示进程间共享
    // value:信号量初始值
    
  • 销毁

    int sem_destroy(sem_t* sem);
    
  • 等待

    // 等待信号量,将信号量的值-1
    int sem_Wait(sem_t* sme);	// P操作
    
  • 发布

    // 发布信号量,表示资源使用完毕,可以归还资源了,信号量值+1
    int sem_post(sem_t* sem);	// Vc操作
    
#pragma once
#include <semaphore.h>

namespace SemModule{
    int defalutSemVal = 1;
    class Sem{
    public:
        Sem(int value = defalutSemVal)
            :_init_value(value)
            {
                // 参数2为0,表示线程间共享,非0表示进程间共享
                int n = ::sem_init(&_sem, 0, _init_value);
            }
        
        // P操作:proberen申请资源
        // 如果计数器 > 0,原子-1,返回
        // 如果计数器 == 0,阻塞等待,知道被唤醒
        void P(){ int n = ::sem_wait(&_sem); }

        // V操作:verhogen归还资源
        // 计数器原子+1,如果有线程在等待该资源,唤醒它
        void V(){ int n = ::sem_post(&_sem); }

        ~Sem(){ int n = ::sem_destroy(&_sem); }
    private:
        sem_t _sem;
        int _init_value;
    };
}

环形队列的封装

#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "Sem.hpp"
#include "Mutex.hpp"

namespace RingBuffer{
    using namespace SemModule;

    template<typename T>
    class RingBuff{
    private:
        std::vector<T> _ring;
        int            _capacity;
        // 生产者和消费者的索引,临界资源,需要保护
        int            _p_idx;          
        int            _c_idx;
        // 同步机制:信号量
        Sem            _dataSem;
        Sem            _spaceSem;
        // 互斥机制:锁
        Mutex          _p_lock;
        Mutex          _c_lock;
    public:
        RingBuff(int cap)
            :_capacity(cap)
            ,_ring(cap)
            ,_p_idx(0)
            ,_c_idx(0)
            ,_dataSem(0)
            ,_spaceSem(cap)
            {}

        // 生产者
        void Enqueue(const T& in){
            // 必须先P再Lock
            // 先申请资源。如果先锁再P,且资源不足,生产者抱着锁挂起
            // 消费者向消费腾出空间,却拿不到锁-`>死锁

            // 1. 申请空间资源(-1)
            _spaceSem.P();

            {
                // 2. 只有申请到资源后,才竞争锁保护临界区
                GuardLock guardLock(_p_lock);

                _ring[_p_idx++] = in;
                _p_idx %= _capacity;
            }
            // 3. 生产完毕,增加数据资源,唤醒消费者(+1)
            _dataSem.V();
        }

        // 消费者
        void Dequeue(T* out){
            // 1. 申请资源(-1) 没数据就阻塞在这,不占锁
            _dataSem.P();

            {
                // 2. 加锁访问
                GuardLock guardLock(_c_lock);

                *out = _ring[_c_idx++];
                _c_idx %= _capacity;
            }
            // 3. 消费完毕,归还空间资源,唤醒生产者(+1)
            _spaceSem.V();
        }

        ~RingBuff(){}
    };
}

5. 信号量 VS 条件变量

特性 信号量 (Semaphore) 条件变量 (Cond Var)
本质 资源计数器 (Stateful) 通知机制 (Stateless)
状态保存 保存状态。如果P操作前已经V了,信号量+1,P操作不会阻塞 不保存。如果 wait 前已经 signal 了,信号丢失,wait 会死等
互斥需求 可以在无锁(外部无Mutex)情况下使用(自身保证原子性) 必须配合 Mutex 使用
适用场景 这里的环形队列(确定数量的资源)、控制并发数 复杂的同步逻辑(如:队列既不满也不空,或者需要判断特定状态)
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐