【Linux】信号量
信号量是一种用于控制多个线程/进程对共享资源访问的同步机制,本质上是一个计数器(记录了资源的剩余数量),配合两个原子操作:P 操作(等待/获取):sem_waitV 操作(发布/释放):sem_post我们申请信号量的本质其实就是预定资源,申请信号量、释放信号量的前提是要看到同一个信号量,那么信号量本身共享资源,所以PV操作必须有原子性,信号量其实就是相当于资源的锁。
目录
1. 什么是信号量?
信号量是一种用于控制多个线程/进程对共享资源访问的同步机制,本质上是一个计数器(记录了资源的剩余数量),配合两个原子操作:
P 操作(等待/获取):sem_wait
V 操作(发布/释放):sem_post
我们申请信号量的本质其实就是预定资源,
申请信号量、释放信号量的前提是要看到同一个信号量,那么信号量本身共享资源,所以PV操作必须有原子性,信号量其实就是相当于资源的锁
2. 信号量接口
初始化/销毁信号量
//初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem:指向要初始化的信号量
pshared:
0:线程间共享非
0:进程间共享(需要放在共享内存中)
value:信号量的初始值//销毁信号量
int sem_destroy(sem_t *sem);
//等待信号量(P操作)
int sem_wait(sem_t *sem);
功能: 信号量值减1,如果值为0则阻塞
//发布信号量(V操作)
int sem_post(sem_t *sem);
功能: 信号量值加1,如果有线程在等待则唤醒一个
//获取信号量当前值
int sem_getvalue(sem_t *sem, int *sval);
这几个函数的返回值都是: 成功返回0,失败返回-1
3. CP场景
3.1 原理讲解
上一篇文章中我们使用的是一个可以动态增长的数组来充当生产消费模型中的缓冲区(消费场景),今天我们来用一个定长的环型队列来重新写一个生产消费模型。
环形队列是一种通过取模运算来模拟环状特性的线性数据结构。

如图所示,当生产者和消费者都在同一个位置时只有队列为空或为满的时候。
那么我们如何判断队列为空还是为满呢?
可以通过加计数器或者 标记位来判断满或者空。另外也可以预留⼀个空的位置,作为满的状态。但是现在我们可以通过把信号量当做计数器来进行线程间通信了。
若队列为空(信号量为满):生产者必须先生产数据,消费者才可以消费;
若队列为满(信号量为0):消费者必须先消费数据,生产者才可以生产。

当生产者和消费者不在同一个位置时,生产和消费可以并发进行。
4. 代码实例
4.1 封装信号量
//Sem.hpp
#pragma once
#include<iostream>
#include<semaphore.h>
//封装信号量
class Sem
{
public:
Sem(int num)
:_initnum(num)
{
sem_init(&_sem,0,_initnum);
}
//P操作
void P()
{
sem_wait(&_sem);
}
//V操作
void V()
{
sem_post(&_sem);
}
~Sem()
{
sem_destroy(&_sem);
}
private:
sem_t _sem;
int _initnum;//信号量初始值
};
4.2 RingQueue
我们将RingQueue作为生产消费的场所(缓冲区),单生产者-消费者场景和单生产者-消费者场景的情况稍有不同
在单生产者-消费者场景中,由于只有一个生产者和一个消费者线程,我们只需要确保两者对共享缓冲区的访问是互斥的。此时,二进制信号量可以作为一种高效的互斥机制:
//RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include "Sem.hpp"
const int gcap = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = gcap)
: _ringqueue(cap), _cap(cap), _space_sem(cap), _data_sem(0)
, _c_step(0), _p_step(0)
{
}
// 生产数据
void Push(const T &data)
{
_space_sem.P();
_ringqueue[_p_step++] = data;
_p_step %= _cap; // 维持环型特点
_data_sem.V();
}
// 消费数据
void Pop(T *data)
{
_data_sem.P();
*data = _ringqueue[_c_step++];
_c_step %= _cap;
_space_sem.V();
}
~RingQueue() {}
private:
std::vector<T> _ringqueue; // 临界资源
int _cap; // 容量
Sem _space_sem;
Sem _data_sem;
int _c_step;
int _p_step;
};
在多生产者-消费者场景中,我们就需要考虑生产者之间以及消费者之间的互斥,所以这个时候就必须用锁来维护生产和消费的过程,我们直接复用前面封装过的互斥锁:
#pragma once
#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"
const int gcap = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = gcap)
: _ringqueue(cap), _cap(cap), _space_sem(cap), _data_sem(0)
, _c_step(0), _p_step(0)
{
}
// 生产数据
void Push(const T &data)
{
// 先等待用来生产数据的空间
_space_sem.P();
{
MutexGuard MG(&_p_mutex);
// 生产数据
_ringqueue[_p_step++] = data;
_p_step %= _cap; // 维持环型特点
}
// 将数据信号量++
_data_sem.V();
}
// 消费数据
void Pop(T *data)
{
_data_sem.P();
{
MutexGuard MG(&_c_mutex);
*data = _ringqueue[_c_step++];
_c_step %= _cap;
}
_space_sem.V();
}
~RingQueue() {}
private:
std::vector<T> _ringqueue; // 临界资源
int _cap; // 容量
Sem _space_sem;
Sem _data_sem;
int _c_step;
int _p_step;
Mutex _c_mutex;
Mutex _p_mutex;
};
4.3 重写生成消费模型
4.3.1 单生产者-消费者模型
#include"RingQueue.hpp"
#include<unistd.h>
#define Type int
void* consumer(void* args)
{
RingQueue<Type>* rq = static_cast<RingQueue<Type>*>(args);
while(true)
{
sleep(1);
Type data;
rq->Pop(&data);
std::cout<<"消费数据:"<<data<<std::endl;
}
}
void* producer(void* args)
{
RingQueue<Type>* rq = static_cast<RingQueue<Type>*>(args);
Type data = 1;
while(true)
{
//sleep(1);
rq->Push(data);
std::cout<<"生产数据:"<<data<<std::endl;
data++;
}
}
int main()
{
//单线程
pthread_t c,p;
RingQueue<Type>* rq = new RingQueue<int>();
pthread_create(&c,nullptr,consumer,(void*)rq);
pthread_create(&p,nullptr,producer,(void*)rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete rq;
return 0;
}
4.3.2 多生产者-消费者模型
只需要修改一下main函数即可,由于多线程并发进行所以打印可能会有点乱
int main()
{
int cnum = 3,pnum = 5;
std::vector<pthread_t> cs(cnum),ps(pnum);
RingQueue<Type>*rq = new RingQueue<int>();
for(int i = 0;i<cnum;i++)
{
pthread_create(&cs[i],nullptr,consumer,(void*)rq);
}
for(int i = 0;i<pnum;i++)
{
pthread_create(&ps[i],nullptr,producer,(void*)rq);
}
for(int i = 0;i<cnum;i++)
{
pthread_join(cs[i],nullptr);
}
for(int i = 0;i<pnum;i++)
{
pthread_join(ps[i],nullptr);
}
delete rq;
return 0;
}
更多推荐

所有评论(0)