用信号量机制实现生产者消费者模型
主要原因就是环形队列的特性和信号量机制非常的契合,信号量的计数机制与环形队列的满空状态完全通步,引入信号量之后,你甚至不需要在环形队列的数据结构中额外添加判断队空和队满的函数。在上一篇通过条件变量实现的阻塞队列中,我们是定义了一个类内的成员变量cap,实时记录超市中的货物的数量,当仓库空了的时候,我们就调用pthread_cond_wait让消费者进程陷入阻塞,当仓库满的时候就调用pthread_
数据容器的选取
设计思路
在上一篇通过条件变量实现的阻塞队列中,我们是定义了一个类内的成员变量cap,实时记录超市中的货物的数量,当仓库空了的时候,我们就调用pthread_cond_wait让消费者进程陷入阻塞,当仓库满的时候就调用pthread_cond_wait让生产者进程陷入阻塞。
而引入信号量机制之后,我们想要设计一个环形队列来存储超市仓库中的货物。为什么这一次不用阻塞队列而改用环形队列了呢?
主要原因就是环形队列的特性和信号量机制非常的契合,信号量的计数机制与环形队列的满空状态完全通步,引入信号量之后,你甚至不需要在环形队列的数据结构中额外添加判断队空和队满的函数。同时信号量的 sem_data 和 sem_space 也能Yeah直接映射队列的 “数据数” 和 “空闲数”,无需复杂的动态扩容 / 缩容逻辑,代码更简洁。
而条件变量通过 pthread_cond_wait(等待 “队列非空 / 非满”)和 pthread_cond_signal(通知 “队列状态变化”),柔性处理 “满 / 空” 边界,更有利于阻塞队列的动态扩容
阻塞队列和环形队列都能用来实现生产者消费者模型,只是他们的适用场景不同:
- 条件变量 +阻塞队列的实现方式适合资源相对宽松、需动态扩容的场景(如通用中间件、业务系统)。条件变量的 “灵活通知”,搭配阻塞队列的 “动态存储、可扩展”,能做到逻辑灵活、适配性强。
- 而信号量 + 环形队列实现方式适合资源严格受限、追求极致性能的场景(如嵌入式、高并发服务器)。信号量的 “精准计数”,搭配环形队列的 “固定容量、O (1) 读写”,能做到内存可控、效率极高。
引入信号量机制之后,我们就可以将超市中的货物数量实时记录在 sem_t 类型的信号量中。在初学操作系统时,书里面就给出了生产者进程内部的简单处理逻辑。
生产者(){
P(剩余空间);
lock(mutex);
将货物放入超市;
unlock(mutex);
V(剩余空间);
}
PV原语的伪代码
P(sem){
lock(&lock);
if(sem>0) sem--;
else 释放锁并挂起
unlock(&lock);
}
V(sem){
lock(&lock)
sem++;
unlock(&lock);
}
int sem = 5; pthread_mutex_t lock;
int arr[5]; //临界资源
P(sem)
//访问临界资源!
V(sem)
为什么在进入函数之后第一件事情是P(剩余空间)
呢?
这主要是为了实现我们的设计要求:只有仓库没有满时,生产者才能继续生产。P(剩余空间)
执行时会将剩余空间的数量减1,如果这时候信号量的值小于零,那么p操作所处的线程就会陷入阻塞。
为什么将货物放入超市之前还需要加锁呢?
这主要是为了实现生产者线程往超市仓库放东西操作的原子性,设想下边儿一种情况,当前信号量的值是5。此时有5个线程都对这一个信号量执行了p操作,那么这5个线程都能够成功的进入临界区。但是我们对于临界区的修改操作并不是原子的。如果不加锁很容易导致数据不一致的问题。
为什么加锁的操作在p操作的后面,他们顺序可不可以颠倒一下?
可以换,但是整个系统的效率会大大降低。一进来就加锁这一操作会导致多线程只能串行化执行。程序运行效率会大大降低,因此我们在设计同步互斥机制的时候有一个核心原则:确保锁的持有时间应该尽可能短,本着这个原则,我们就应该尽量晚加锁,尽量早解锁
这时候有人就可能会问。用条件变量实现了那个阻塞队列里面不就是一进来就加锁了吗?那他不是会有同样的问题吗?
是这样,但是条件变量的函数有一个非常特别的功能就是一旦陷入阻塞,它就会自动将它持有的所释放。当他被唤醒的时候才会再次去申请那把锁。这样就可以避免我们前面说的,线程带锁进入阻塞导致的串行化问题
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
#include <cassert>
template <class T>
class RingQueue {
private:
std::vector<T> ring_; // 环形队列存储容器
int cap_; // 队列容量
// 信号量:数据计数(已生产)、空闲位置计数(可生产)
sem_t sem_data_;
sem_t sem_space_;
// 互斥锁:保护生产/消费索引(多生产者/多消费者场景必须)
pthread_mutex_t p_lock_;
pthread_mutex_t c_lock_;
// 生产/消费索引
int p_index_ = 0; // 生产者下一个写入位置
int c_index_ = 0; // 消费者下一个读取位置
public:
RingQueue(int cap = 10)
: cap_(cap)
, ring_(cap) {
// 初始化信号量:数据计数、空闲位置计数
sem_init(&sem_data_, 0, 0);
sem_init(&sem_space_, 0, cap);
// 初始化互斥锁:生产/消费索引保护
pthread_mutex_init(&p_lock_, nullptr);
pthread_mutex_init(&c_lock_, nullptr);
}
~RingQueue() {
sem_destroy(&sem_data_);
sem_destroy(&sem_space_);
pthread_mutex_destroy(&p_lock_);
pthread_mutex_destroy(&c_lock_);
}
// 生产者:放入数据
void Push(const T& in) {
// 1. P(sem_space_):获取空闲位置(队列不满才能生产)
sem_wait(&sem_space_);
// 2. 加锁保护生产索引 p_index
pthread_mutex_lock(&p_lock_);
ring_[p_index_] = in;
p_index_ = (p_index_ + 1) % cap_;
pthread_mutex_unlock(&p_lock_);
// 3. V(sem_data_):通知消费者有新数据
sem_post(&sem_data_);
}
// 消费者:取出数据
void Pop(T& out) {
// 1. P(sem_data_):获取已生产数据(队列不空才能消费)
sem_wait(&sem_data_);
// 2. 加锁保护消费索引 c_index
pthread_mutex_lock(&c_lock_);
out = ring_[c_index_];
c_index_ = (c_index_ + 1) % cap_;
pthread_mutex_unlock(&c_lock_);
// 3. V(sem_space_):通知生产者释放空闲位置
sem_post(&sem_space_);
}
};
更多推荐
所有评论(0)