🌟 各位看官好,我是egoist2023

🌍 Linux == Linux is not Unix !

🚀 今天来学习Linux的线程同步,有了线程互斥与同步,再来学习所谓的生产者消费者模型,基于该模型再提出阻塞队列。

👍 如果觉得这篇文章有帮助,欢迎您一键三连,分享更多人哦!

目录

线程同步

条件变量

同步概念与竞态条件

条件变量函数

初步理解

demo代码,验证条件变量

生产者消费者模型

为何使用生产者消费者模型

单单CP代码

生产数据

为什么pthread_cond_wait需要互斥量?

消费数据

基于BlockingQueue的生产者消费者模型

多多CP代码

生产和消费者的周边问题

总结

附源码

阻塞队列

条件变量封装


线程同步

继续拿图书馆的例子,如果你此时学完了语文,此时出了自习室把钥匙进行了归还.可是你突然想到你的数学还没复习啊!于是你又马上申请了钥匙(图书馆并没有所谓的排队规则).此时钥匙还是在你手上,其他人还在这拜拜等待,做着没有意义的事情.

这不就对应如果进程的某个线程抢占资源的能力很强,导致其他线程不能申请到锁,如果这个线程不进行break,当票为0时还拿着这把锁.而其他线程还在竞争着这把锁.但是票为0了啊,它们还在做着没有意义的事情,浪费CPU的资源.

void *routel(void* args)
{
    std::string name = static_cast<const char*>(args);
    while (true)
    {
        pthread_mutex_lock(&lock);
        if(tickets>0)
        {
            usleep(10000);
            printf("%s 抢占票号: %d\n", name.c_str(), tickets--);
            pthread_mutex_unlock(&lock);
        }
    }
    return nullptr;
}

需要明白的是,我这个线程只是抢占能力强罢了,有范错吗?不也是按照规则进行申请锁和释放吗?没错啊,只是这样做并不合理!这样可能会导致效率低下的问题!

其他线程因为长期申请不到锁,导致无法执行,进而导致饥饿问题!

为了解决这个问题,图书馆出了以下规矩:

  1. 互斥进入
  2. 凡是从自习室出来的人,归还钥匙之后,不能立即申请
  3. 外部的人,必须排好队,出来的人,必须排到队列尾部!

即在临界资源安全的情况下,让不同的线程,访问临界资源,具有一定的顺序性! -- 线程同步!

条件变量

  • 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如⼀个线程访问队列时,发现队列为空,它只能等待,只到其它线程将⼀个节点添加到队列中。这种情况就需要⽤到条件变量。

同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

条件变量函数

条件变量函数接口是和互斥锁非常相似的.

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

要在条件变量下等,那么就要让线程能看到条件变量啊!
PTHREAD_COND_INITIALZER做初始化,经过编译过后就是全局变量,就在地址空间的全局数据区,所以不需要init,因为已经初始化了,为什么不需要destroy,因为全局不需要,进程结束了全局变量自然被释放了.

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

参数:

  • cond:要初始化的条件变量
  • attr:NULL

int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

参数:

  • cond:要在这个条件变量上等待
  • mutex:互斥量,为什么要带这个互斥量呢?(下面解释)

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

初步理解

为了更好地理解条件变量,这里讲个小故事:

A准备放,B就来拿苹果了,放没放是不确定的,导致二义性.

把苹果放完了,知道下一次什么时候放吗?不晓得,只能申请锁,再次检测是否有苹果,释放锁.(B拿苹果特别慢,导致A一直频繁申请释放锁,做无用工作) --> 不高效(B也同样如此)

此时,加了一个铃铛:

  1. 放苹果的人开锁,放苹果,敲铃铛,告诉拿苹果的人可以来拿苹果啦,进行解锁.
  2. 拿苹果的人不道盘子上什么时候有苹果,进行开锁,发现盘子没有苹果,此时就不再进行解锁,而是在铃铛处等待.当放苹果的人放了苹果,敲下了铃铛,此时拿苹果的人就可以来拿了.
  3. 假设拿苹果的人有很多,典型的消费者模型.此时第一个拿苹果的人去开锁,发现没有苹果,在铃铛处等待,当第二个拿苹果的人去开锁,发现还是没有苹果,继续在铃铛处等待,直到放苹果的人开了锁,放了苹果,去敲了铃铛,此时他有两种方案:
  • 让排在队头的人去拿苹果;
  • 让所有人去竞争这个苹果.

demo代码,验证条件变量

在下面这段代码中要能观察下面三种情况:

  1. 如果唤醒一个线程,所有线程执行要按照排队方式执行;
  2. 所有线程要看到等的过程;
  3. 主线程能唤醒所有线程的过程.
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;

void *active(void *args)
{
    std::string name = static_cast<const char*>(args);
    while(true)
    {
        pthread_mutex_lock(&gmutex);
        pthread_cond_wait(&gcond, &gmutex);
        std::cout << name << " active !" << std::endl;
        pthread_mutex_unlock(&gmutex);
    }

    return (void*)0;
}

int main()
{
    pthread_t t1, t2, t3, t4;
    pthread_create(&t1, nullptr, active, (void *)"thread-1");
    pthread_create(&t2, nullptr, active, (void *)"thread-2");
    pthread_create(&t3, nullptr, active, (void *)"thread-3");
    pthread_create(&t4, nullptr, active, (void *)"thread-4");

    sleep(5);
    while(true)
    {
        //pthread_cond_signal(&gcond);
        pthread_cond_broadcast(&gcond);
        sleep(1);
    }
    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    pthread_join(t3, nullptr);
    pthread_join(t4, nullptr);
}

生产者消费者模型

为何使用生产者消费者模型

⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的。


生活中,哪些场景,是符合生产和消费场景?最典型的就是超市啊!(需要明确超市并不生产产品)

我们清楚超市需要进货,那么就需要有对应的供货商,而货最后要交给消费者进行消费.

可是,为什么要有超市啊?消费者直接去对应的供货商购买不行吗?

  • 消费者自身零零散散的要求,可能在一个供货商有进行提供,另一个没有;
  • 供货商一般都离市区较远,消费者不可能打个车特意去买货.
  • 消费者过去后只跟供货商要一根火腿肠呢?这不是对双方都不利么?

结论一:减少生产和消费过程中产生的成本!

  • 过年前,生产者特别忙,把东西缓存到超市里;
  • 过年中,生产者特别闲,消费者不受影响,消费超市内缓存的大量火腿肠

结论二:支持生产和消费的忙闲不均

如果是消费者需要的时候去找生产者生产,不需要的时候生产者不生产,这不是一种强迫吗?不是一种强耦合的关系吗?

结论三:维护松耦合关系(解耦)

实际上维护松耦合关系可以支撑让生产和消费的忙闲不均,从而做到减少生成和消费过程中产生的成本!

既然引入了超市,当生产者正在向超市进火腿肠时,消费者就来消费呢? --> 此时超市不就是所谓的公共资源,既然是公共资源,那么就要进行保护啊!因此需要加锁啊,让二者进行竞争锁资源.

如何正确的进行生产和消费?

本质:维护生产者和消费者之间的关系!

生产者之间是什么关系?互斥关系

  • 我们清楚供应商之间是会有对应的竞争的,当一个供货商向超市进货时,其他供货商不能来打扰.

消费者之间是什么关系?互斥关系

  • 不对啊?消费者之间能是互斥关系?当只剩一根火腿肠,消费者之间都需要这个火腿肠,抢不抢?

生产者和消费者之间是什么关系?互斥 && 同步

  • 生产者正在进火腿肠,消费者就来消费?
  • 生产者或消费者高频访问超市?让超市来通知,二者就不需要竞争锁,达到唤醒双方的过程

为了方便记忆,这里引出'321'原则:

3种关系 --> 消费者之间、生产者之间的互斥,消费者和生产者之间的互斥与同步;

2种角色 --> 生产者和消费者

1个交易场所(不就是内存块) --> 场所(交易场所的变化,会引起二者的变化)

这也是自底向上的啊!

这里引出一个问题:既然说生成消费模型高效,而我们发现生产和消费的过程是串行的啊!高效又体现在哪呢?

单单CP代码

在生产消费者模型中,为了保证'321'原则,需要一把锁保证互斥,那么是生产者、消费者共用一把铃铛(条件变量)还是生产者、消费者各自拥有一个铃铛呢?

如果选择一个条件变量:

  • 当一个生产者放入数据后,它调用 pthread_cond_signal(&_cond)。此时如果队列未满,而一个正在等待的生产者被唤醒了,它会继续生产。而真正嗷嗷待哺的消费者可能还在沉睡。这就导致了生产过剩,而消费没有及时跟上。

  • 反之,消费者消费完一个数据后,队列从“满”变为“不满”,它本意是想唤醒生产者。但如果唤醒的是另一个消费者,这个被唤醒的消费者线程去检查队列,发现队列可能是空的,于是又得继续睡眠。这就是无效唤醒。

各自拥有一个条件变量:

  • _p_cond (生产者条件变量) 只由消费者(Consumer)触发,用来通知生产者:“队列不满了,你们可以开始生产了!”

  • _c_cond (消费者条件变量) 只由生产者(Producer)触发,用来通知消费者:“队列不空了,你们可以开始消费了!”

因此应该选择两个条件变量,_c_wait_num表示在_c_cond条件变量等待的个数,_p_wait_num表示在条件变量_p_cond等待的个数

template<typename T>
class BlockQueue
{
private:
    // 临界资源
    std::queue<T> _bq; // blockqueue
    u_int32_t _cap;     // 容量

    pthread_mutex_t _lock;
    pthread_cond_t _c_cond; // 消费者用的cond
    pthread_cond_t _p_cond; // 生产者用的cond

    int _c_wait_num; // 当前消费者等待的个数
    int _p_wait_num; // 当前生产者等待的个数
}; 

构造函数和析构函数:

    const static u_int32_t gcap = 5; // For Deubug

    BlockQueue(u_int32_t cap = gcap):_cap(cap), _c_wait_num(0), _p_wait_num(0)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }

生产数据

private:
    bool IsFull()
    {
        return _bq.size() >= _cap;
    }

public:
    void Enqueue(const T &in)
    {
        pthread_mutex_lock(&_lock);
        // 要进行生产, 就一定能够进行生产吗?满足生产条件!!
        if(IsFull())
        {
            _p_wait_num++;
            pthread_cond_wait(&_p_cond, &_lock);  // 特征1:自动释放锁! 特征2: 自动重新竞争并持有锁 
            _p_wait_num--;
            // 当我们被唤醒的时候,就一定又从这个位置唤醒了!
            // 是在临界区内被唤醒的!!!
        }
        // 不满的
        _bq.push(in); // 完成生产
        if(_c_wait_num > 0)
            pthread_cond_signal(&_c_cond); // 唤醒消费者
        pthread_mutex_unlock(&_lock);
    }

为什么pthread_cond_wait需要互斥量?

问题1: 我在进行等待的时候,我可是在临界区里等啊!我可是持有锁的!!!其他线程还是需要竞争锁啊?

因此pthread_cond_wait的作用:

  • 特征1:自动释放锁!
  • 特征2: 自动重新竞争并持有锁

问题2: 我为什么把自己弄得需要再临界区内部等??先判断队列是否为满(生产条件是否满足)

  • 判断队列是否为满,本身就是访问临界资源!!
  • 判断队列是否为满,必须在临界区内部判断
  • 生产者必须先申请锁,在临界区内部判断
  • 判断为满的结果,需要等待的结构,也一定在临界区内部!
  • 所以,等待的时候,在内部释放锁是必然的!
  • 所以,锁被做到的pthread_cond_wait的参数中!!!

实际上面这份代码是有bug,重点就出在if判断条件下.为什么呢?这里涉及到两种情况:

  • 如果pthread_cond_wait调用失败了呢?那不是会出去if判断进行生产数据了啊!但此时共享资源是满的;

  • 存在为唤醒的情况,那么什么是伪唤醒呢?在极少数情况下,线程可能会在没有任何 pthread_cond_signalpthread_cond_broadcast 调用的情况下被唤醒。这听起来很奇怪,但它是 POSIX 线程标准所允许的。这可能是由于操作系统内核的实现细节、信号处理或其他复杂原因导致的。一个健壮的多线程程序必须能够正确处理这种情况。

为了解决这个问题,因此就不能使用if判断了,应该为while循环

while(IsFull())
{
    ...
}

消费数据

    void Pop(T *out)
    {
        pthread_mutex_lock(&_lock);
        while(IsEmpty())
        {
            _c_wait_num++;
            pthread_cond_wait(&_c_cond, &_lock);
            _c_wait_num--;
        }

        *out = _bq.front();
        _bq.pop();
        if(_p_wait_num>0)
            pthread_cond_signal(&_p_cond); // 唤醒生产者
        pthread_mutex_unlock(&_lock);
    }

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

管道不就是进程间的生产者消费者模型!!!
管道本身:就是一个阻塞队列 -- 队列的元素是字节!

多多CP代码

上面的单单CP代码就解决了多多CP代码的问题,为什么呢?

多多CP问题需要处理的有,生产者间的互斥、消费者间的互斥、生产者与消费者间的互斥同步.

通过“一把锁管互斥,两组信号管同步”的设计哲学,清晰、高效地解决了多线程并发中的所有核心难题。

生产和消费者的周边问题

生产消费模型高效,而我们发现生产和消费的过程是串行的啊!高效又体现在哪呢?

数据来源哪来?数据来源这里火腿肠供应商在生产火腿

怎么才叫消费?消费者从交易场所拿到了数据,还需要处理数据或做任务啊!

有没有可能生产者和消费者会并发执行呢?指的是生产之前和消费之后的并发 --> 高效

生产和消费,单纯仅是传递数据吗?

那肯定不是啊! 消费者从超市拿到了火腿肠、泡面,他的下一步应该怎么做?把火腿肠、泡面吃掉啊!所以传递数据不是目的,而是要消费数据!

总结

本文深入探讨了Linux线程同步中的生产者消费者模型。首先通过图书馆自习室钥匙的比喻,解释了线程抢占资源导致的饥饿问题,进而引入条件变量作为解决方案。详细解析了条件变量的初始化、等待、通知等接口函数,并通过"铃铛"类比说明其工作原理。重点阐述了生产者消费者模型的"321原则"(3种关系、2种角色、1个交易场所),强调使用两个独立条件变量分别管理生产者和消费者的必要性。文章提供了阻塞队列的实现代码,包含线程安全的入队出队操作,并解释了while循环检查条件的重要性以避免伪唤醒问题。最后指出该模型通过解耦生产消费过程、平衡忙闲不均来提升效率,是解决多线程并发问题的有效方案。

附源码

阻塞队列

#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>

const static u_int32_t gcap = 5; // For Deubug

template<typename T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _bq.size() >= _cap;
    }
    bool IsEmpty()
    {
        return _bq.empty();
    }

public:
    BlockQueue(u_int32_t cap = gcap):_cap(cap), _c_wait_num(0), _p_wait_num(0)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }
    void Enqueue(const T &in)
    {
        pthread_mutex_lock(&_lock);
        // 要进行生产, 就一定能够进行生产吗?满足生产条件!!
        while(IsFull())
        {
            // 问题1: 我在进行等待的时候,我可是在临界区里等啊!我可是持有锁的!!!所以,需要自动让线程
            // 释放锁!!!
            // 问题2: 我为什么把自己弄得需要再临界区内部等??先判断队列是否为满(生产条件是否满足)
            // 判断队列是否为满,本身就是访问临界资源!!
            // 判断队列是否为满,必须在临界区内部判断
            // 生产者必须先申请锁,在临界区内部判断
            // 判断为满的结果,需要等待的结构,也一定在临界区内部!
            // 所以,等待的时候,在内部释放锁是必然的!
            // 所以,锁被做到的pthread_cond_wait的参数中!!!
            // pthread_cond_wait函数:
            // 1. 调用失败
            // 2. 伪唤醒情况
            _p_wait_num++;
            pthread_cond_wait(&_p_cond, &_lock);  // 特征1:自动释放锁! 特征2: 自动重新竞争并持有锁 
            _p_wait_num--;
            // 当我们被唤醒的时候,就一定又从这个位置唤醒了!
            // 是在临界区内被唤醒的!!!
        }
        // 不满的
        _bq.push(in); // 完成生产
        if(_c_wait_num > 0)
            pthread_cond_signal(&_c_cond); // 唤醒消费者
        pthread_mutex_unlock(&_lock);
    }
    void Pop(T *out)
    {
        pthread_mutex_lock(&_lock);
        while(IsEmpty())
        {
            _c_wait_num++;
            pthread_cond_wait(&_c_cond, &_lock);
            _c_wait_num--;
        }

        *out = _bq.front();
        _bq.pop();
        if(_p_wait_num>0)
            pthread_cond_signal(&_p_cond); // 唤醒生产者
        pthread_mutex_unlock(&_lock);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }
private:
    // 临界资源
    std::queue<T> _bq; // blockqueue
    u_int32_t _cap;     // 容量

    pthread_mutex_t _lock;
    pthread_cond_t _c_cond; // 消费者用的cond
    pthread_cond_t _p_cond; // 生产者用的cond

    int _c_wait_num; // 当前消费者等待的个数
    int _p_wait_num; // 当前生产者等待的个数
}; 

条件变量封装

class Cond
{
public:
    Cond()
    {
        pthread_cond_init(&_cond,nullptr);
    }

    void Wait(Mutex &mutex)
    {
        int n = pthread_cond_wait(&_cond,mutex.Get());
    }

    void NotifyOne()
    {
        int n = pthread_cond_signal(&_cond);
        (void)n;
    }

    void NotifyAll()
    {
        int n = pthread_cond_broadcast(&_cond);
        (void)n;
    }

    ~Cond()
    {
        pthread_cond_destroy(&_cond);
    }
private:
    pthread_cond_t _cond;
};

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐