基于生产者-消费者模型下的线程同步综述
文章摘要: 本文深入探讨了线程同步与互斥的区别及实现方式。互斥解决资源竞争问题,保证数据安全;同步解决执行时序问题,避免线程饥饿。通过厕所坑位和奶茶店出杯的生动例子,解释了生产者-消费者模型的321原则(3种关系、2个角色、1个交易场所)。重点分析了条件变量的使用,包括wait等待和signal唤醒机制,并提供了基于阻塞队列的生产者-消费者模型实现代码。文章还强调wait必须原子性地释放锁并进入等
文章目录
1. 为什么有了互斥,我还需要同步?
利用厕所坑位抢占理解互斥和同步
对于互斥,相当于手里有了一把锁
而互斥的本质是:解决竞争问题,保证同一时刻,只有一个线程能访问临界资源
例如:厕所只有一个坑位,你进去之后锁上门,别人就进不来,这样加锁处理是为了数据安全
但是只有互斥是不够的
- 场景:还是厕所,如果没有同步,A进去了,B在门口等。A出来之后,B刚要抢,结果A动作快又抢到了锁进去了(单纯出来透口气)。B抢不到,一直在门口盲目空转或者频繁申请锁失败
- 更糟糕的情况:生产者-消费者
- 仓库满了,生产者抢到了锁,发现满了,释放锁
- 生产者又抢到了锁,发现仓库还是满的,释放锁
- 生产者陷入死循环,CPU飙升,而消费者却抢不到锁
- A和生产者有错吗?其实并没有,遵守了互斥的规则,但是这样不合理,所以引入同步解决上述问题
同步的本质:解决时序问题
- 让现场按照一定的顺序协同工作:“我干完了,通知你;你干完了,通知我”,同时解锁之后排队去,不能立刻再申请锁
- 核心:等待+唤醒
总结:什么是线程同步?和互斥的区别?
互斥为了保护共享资源不被并发破坏,保证数据安全
同步为了协调线程执行的先后顺序,避免线程饥饿
同步通常建立在互斥只是,检查条件时需要保护数据
核心工具->条件变量
为了实现同步,OS提供了条件变量,它不是锁,而是一个等待队列
主要做两件事:
- wait等待: 条件不满足,在等待队列中排队等待,释放CPU
- signal/notify唤醒: 条件满足,被别人唤醒
2. 生产者-消费者模型
321原则
- 3种关系:
- 生产者VS生产者:互斥(争夺写指针)
- 消费者VS消费者:互斥(争夺读指针)
- 生产者VS消费者:互斥(保护队列)&& 同步(满的时候等消费,空的时候等生产)
- 2个角色:生产者线程、消费者线程
- 1个交易场所:阻塞队列
怎么理解模型
举个奶茶店出杯的例子:
- 阻塞队列:出杯台,最多放3杯奶茶
- 生产者:做奶茶的店员,把杯子放到出杯台
- 消费者:取奶茶的配送员,从杯台拿走杯子
只要多个线程并发读写同一份共享资源,并且这个读写不是原子的,就必须互斥来维护不变量
队列中的不变量:
- 容量
- 任何一个槽位同一时刻只能被一个人修改占用状态
- 读/写的移动顺序不能乱、不能重复
生产者和生产者互斥:两个店员同时出杯,争夺槽位,必须互斥,否则可能会数据覆盖
消费者和消费者互斥:两个配送员同时取杯,争夺杯子,必须互斥,否则可能逻辑错误(一杯被拿走两次)
生产者和消费者互斥:保护队列这个共享结构,如果没有互斥,P在写入,C同时读取,读到了P写一半的数据
生产者和消费者同步:互斥只保证别同时改,不保证有货/有位置
- 假如杯台满,生产者不断过来检查,不断跑空—>应该等有空位再来
- 假如杯台空,消费者不断过来检查,不断跑空—>应该等有余量再来
所以需要条件变量来做同步
3. 基于阻塞队列实现生产者-消费者模型封装
version1
代码
#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
#include "Mutex.hpp"
#include "Cond.hpp"
// version1
template<typename T>
class BlockQueue_v1{
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _productor_cond; // 队列满,生产者在这里等
pthread_cond_t _consumer_cond; // 队列空,消费者在这里等
int _cwait_num;
int _pwait_num;
bool IsFull() { return _q.size() == _capacity; }
bool IsEmpty() { return _q.empty(); }
public:
BlockQueue_v1(int cap = 10)
:_capacity(cap)
,_cwait_num(0)
,_pwait_num(0)
{
// 初始化锁和条件变量
// nullptr表示默认属性
pthread_mutex_init(&_mutex, nullptr);
// 条件变量是等待队列 不是锁
pthread_cond_init(&_productor_cond, nullptr);
pthread_cond_init(&_consumer_cond, nullptr);
}
// 生产者,写数据入队列
// 输入型参数
void Equeue(const T& in){
// 1. 加锁
pthread_mutex_lock(&_mutex);
// 2. 检查条件IsFull
// 为什么用while?
// POSIX标准允许OS在无信号时意外唤醒线程,必须通过循环进行二次状态校验
// wait返回意味着"重新持有了锁",但不代表"条件依然满足"
// 在"收到通知"到"抢到锁"的【时间窗口】内,空位可能已被其他生产者抢先占满
// 若用 if,当前线程会无视满队列直接 push,导致数据覆盖或溢出
while(IsFull()){
std::cout << "队列满了,生产者开始等待...\n";
_pwait_num++;
// 3. 等待
// 函数底层做三件事:
// 1) 解锁unlock,允许别人进来,否则会死锁
// 2) 线程挂起->进入_productor_cond的等待队列
// 3) 被唤醒后->重新抢锁lock,抢不到就阻塞在这里
pthread_cond_wait(&_productor_cond, &_mutex);
_pwait_num--;
std::cout << "生产者被唤醒,准备生产...\n";
}
// 4. 生产
// 到这里,说明IsFull为0,且我持有锁
_q.push(in);
// 5. 唤醒消费者
// 只有当_cwait_num>0时,再去唤醒,否则浪费资源
if(_cwait_num > 0){
std::cout << "通知消费者来取货\n";
// siganl只唤醒一个
// broadcast全部唤醒
pthread_cond_signal(&_consumer_cond);
}
// 6. 解锁
pthread_mutex_unlock(&_mutex);
}
// 消费者,读数据出队列
// 输出型参数
void Dequeue(T* out){
pthread_mutex_lock(&_mutex);
while(IsEmpty()){
std::cout << "队列空了,消费者开始等待...\n";
_cwait_num++;
pthread_cond_wait(&_consumer_cond, &_mutex);
_cwait_num--;
std::cout << "消费者被唤醒...\n";
}
*out = _q.front();
_q.pop();
if(_pwait_num > 0){
std::cout << "通知生产者补货...\n";
pthread_cond_signal(&_productor_cond);
}
pthread_mutex_unlock(&_mutex);
}
~BlockQueue_v1(){
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_productor_cond);
pthread_cond_destroy(&_consumer_cond);
}
};
接口细节
pthread_cond_wait(cond, mutex)- 一定要传锁!!!
- 为什么?我先解锁再等待不行吗?
- 不行,如果我在
unlock和wait之间,OS 调度切换了线程,此时另一个线程发送signal。因为我已经解锁,信号发送成功,但我还没睡。等我回来执行wait时,就错过了那个信号,导致永久阻塞 - 内核态操作:
wait的本质是将当前线程放入等待队列,并原子性地释放锁。这两个动作必须在内核中一口气完成,无法由用户代码分开完成
- 调用后的状态:
- 调用前:持有锁
- 调用中(睡眠):释放锁,允许其他线程进入临界区操作
- 返回后:持有锁,函数内部自动帮忙抢回锁
- 一定要传锁!!!
pthread_cond_singal和pthread_cond_broadcastsignal:唤醒一个等待线程,适合一对一broadcast:唤醒所有等待线程,适合读写锁写完之后,所有人都能读- 坑:如果用
broadcast唤醒了10个消费者,但只有1个数据,10个人抢锁,一个人吃到了肉,剩下9个跑空,只能回去继续睡,浪费CPU
version2
利用RAII,自动释放资源
代码
封装锁:
#include <iostream>
#include <pthread.h>
class Mutex{
public:
Mutex() { pthread_mutex_init(&_lock, nullptr); }
void Lock() { pthread_mutex_lock(&_lock); }
void Unlock() { pthread_mutex_unlock(&_lock); }
~Mutex() { pthread_mutex_destroy(&_lock); }
// 关键:条件变量需要锁,要传指针
pthread_mutex_t* GetLock() { return &_lock; }
private:
pthread_mutex_t _lock;
};
class GuardLock{
public:
GuardLock(Mutex& m) :_mtx(m) { _mtx.Lock(); }
~GuardLock() { _mtx.Unlock(); }
private:
Mutex& _mtx;
};
封装条件变量:
#pragma once
#include <pthread.h>
#include "Mutex.hpp"
class Cond{
public:
Cond() { pthread_cond_init(&_cond, nullptr); }
~Cond() { pthread_cond_destroy(&_cond); }
// wait需要一把锁,传入封装的Mutex对象
void Wait(Mutex& m){
// 调用底层的pthread_cond_Wait
// 需要原生指针
pthread_cond_wait(&_cond, m.GetLock());
}
void Notify() { pthread_cond_signal(&_cond); }
void NotifyAll() { pthread_cond_broadcast(&_cond); }
private:
pthread_cond_t _cond;
};
实现阻塞队列:
template <typename T>
class BlockQueue_v2{
private:
int _capacity;
std::queue<T> _q;
Mutex _mutex;
Cond _productor_cond;
Cond _consumer_cond;
int _pwait_num;
int _cwait_num;
bool IsFull() { return _q.size() == _capacity; }
bool IsEmpty() { return _q.empty(); }
public:
// Mutex和Cond会自动调用构造初始化
BlockQueue_v2(int cap = 10)
:_capacity(cap)
,_pwait_num(0)
,_cwait_num(0)
{}
// 生产者
void Equeue(const T& in){
GuardLock guardLock(_mutex);
while(IsFull()){
std::cout << "生产者进入等待...\n";
_pwait_num++;
_productor_cond.Wait(_mutex);
_pwait_num--;
std::cout << "生产者被唤醒...\n";
}
_q.push(in);
if(_cwait_num > 0){
std::cout << "唤醒消费者\n";
_consumer_cond.Notify();
}
} // guardLock离开作用域自动析构,解锁
void Dequeue(T* out){
GuardLock guardLock(_mutex);
while(IsEmpty()){
std::cout << "消费者进入等待...\n";
_cwait_num++;
_consumer_cond.Wait(_mutex);
_cwait_num--;
std::cout << "消费者被唤醒...\n";
}
*out = _q.front();
_q.pop();
if(_pwait_num > 0){
std::cout << "唤醒生产者\n";
_productor_cond.Notify();
}
}
// 成员变量会自用调用析构销毁
~BlockQueue_v2() { }
};
4. 基于环形队列实现生产者-消费者模型
有了条件变量,为什么还要信号量?
-
条件变量:本质是同步机制,解决了时许问题,逻辑是条件不满足我就睡,条件满足你叫醒我,它不保存状态,不知道资源具体有多少,需要结合
mutex和外部变量来判断 -
信号量:本质是资源计数器,可以解决数量问题,逻辑是票还有没有,有我就拿走一张进门,没有我就在门口等,是对资源的一种预定机制
-
使用条件变量实现环形队列需要:
- 加锁
while(queue.full()) wait()检查满不满- 放数据
signal/broadcast通知消费者- 解锁
每次操作都要抢锁,生产者和消费者哪怕互不干扰(队列既没满也没空),也要竞争同意把锁,并发度低
-
信号量解法:把环形队列视为两种资源
- 空闲空间资源(
_spacesem):生产者关心,初始值为队列容量N - 数据资源(
_datasem):消费者关心,初始值为0
生产者只消耗空间,增加数据
消费者只消耗数据,增加空间
如果不满也不空,生产者和消费者位于不同位置,完全可以并行执行,不互斥!
只有在相同位置->环形队列相同位置为空or为满,只有这两种情况->互斥!
- 空闲空间资源(
所以环形队列有三大约束:
- 生产者VS消费者: 队列为空,消费者不能读;队列为满,生产者不能写入
- 生产者VS生产者: 多个生产者不能同时写入同一个位置
- 消费者VS消费者: 多个消费者不能同时读取同一个位置
信号量的封装
我们使用POSIX标准的信号量,在<semaphore.h>库中
常用接口:
初始化信号量
#include <semaphore.h> int sem_init(sem_t* sem, int pshared, unsigned int value); // 参数: // pshared:0表示线程间共享,非0表示进程间共享 // value:信号量初始值销毁
int sem_destroy(sem_t* sem);等待
// 等待信号量,将信号量的值-1 int sem_Wait(sem_t* sme); // P操作发布
// 发布信号量,表示资源使用完毕,可以归还资源了,信号量值+1 int sem_post(sem_t* sem); // Vc操作
#pragma once
#include <semaphore.h>
namespace SemModule{
int defalutSemVal = 1;
class Sem{
public:
Sem(int value = defalutSemVal)
:_init_value(value)
{
// 参数2为0,表示线程间共享,非0表示进程间共享
int n = ::sem_init(&_sem, 0, _init_value);
}
// P操作:proberen申请资源
// 如果计数器 > 0,原子-1,返回
// 如果计数器 == 0,阻塞等待,知道被唤醒
void P(){ int n = ::sem_wait(&_sem); }
// V操作:verhogen归还资源
// 计数器原子+1,如果有线程在等待该资源,唤醒它
void V(){ int n = ::sem_post(&_sem); }
~Sem(){ int n = ::sem_destroy(&_sem); }
private:
sem_t _sem;
int _init_value;
};
}
环形队列的封装
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "Sem.hpp"
#include "Mutex.hpp"
namespace RingBuffer{
using namespace SemModule;
template<typename T>
class RingBuff{
private:
std::vector<T> _ring;
int _capacity;
// 生产者和消费者的索引,临界资源,需要保护
int _p_idx;
int _c_idx;
// 同步机制:信号量
Sem _dataSem;
Sem _spaceSem;
// 互斥机制:锁
Mutex _p_lock;
Mutex _c_lock;
public:
RingBuff(int cap)
:_capacity(cap)
,_ring(cap)
,_p_idx(0)
,_c_idx(0)
,_dataSem(0)
,_spaceSem(cap)
{}
// 生产者
void Enqueue(const T& in){
// 必须先P再Lock
// 先申请资源。如果先锁再P,且资源不足,生产者抱着锁挂起
// 消费者向消费腾出空间,却拿不到锁-`>死锁
// 1. 申请空间资源(-1)
_spaceSem.P();
{
// 2. 只有申请到资源后,才竞争锁保护临界区
GuardLock guardLock(_p_lock);
_ring[_p_idx++] = in;
_p_idx %= _capacity;
}
// 3. 生产完毕,增加数据资源,唤醒消费者(+1)
_dataSem.V();
}
// 消费者
void Dequeue(T* out){
// 1. 申请资源(-1) 没数据就阻塞在这,不占锁
_dataSem.P();
{
// 2. 加锁访问
GuardLock guardLock(_c_lock);
*out = _ring[_c_idx++];
_c_idx %= _capacity;
}
// 3. 消费完毕,归还空间资源,唤醒生产者(+1)
_spaceSem.V();
}
~RingBuff(){}
};
}
5. 信号量 VS 条件变量
| 特性 | 信号量 (Semaphore) | 条件变量 (Cond Var) |
|---|---|---|
| 本质 | 资源计数器 (Stateful) | 通知机制 (Stateless) |
| 状态保存 | 保存状态。如果P操作前已经V了,信号量+1,P操作不会阻塞 | 不保存。如果 wait 前已经 signal 了,信号丢失,wait 会死等 |
| 互斥需求 | 可以在无锁(外部无Mutex)情况下使用(自身保证原子性) | 必须配合 Mutex 使用 |
| 适用场景 | 这里的环形队列(确定数量的资源)、控制并发数 | 复杂的同步逻辑(如:队列既不满也不空,或者需要判断特定状态) |
更多推荐


所有评论(0)