目录

一:条件变量

二:同步概念与竞态条件

三:条件变量函数

3.1初始化

3.2销毁

3.3等待条件满足

3.4唤醒等待

3.5函数使用样例

四:生产者消费者模型

4.1为何要使用生产者消费者模型

4.2生产者消费者模型优点

五:基于BlockingQueue的生产者消费者模型

六:为什么 pthread_cond_wait 需要互斥量?

七:条件变量使用规范


一:条件变量

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。


二:同步概念与竞态条件

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免 饥饿问题,叫做同步

竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也 不难理解


三:条件变量函数

3.1初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t 
*restrict attr);
参数:
 cond:要初始化的条件变量
 attr:NULL

也可以使用全局变量初始化,和全局初始化锁一样,会自动销毁:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

3.2销毁

int pthread_cond_destroy(pthread_cond_t *cond)

3.3等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict 
mutex);
 参数:
 cond:要在这个条件变量上等待
 mutex:互斥量

3.4唤醒等待

//广播唤醒:一次唤醒多个线程
int pthread_cond_broadcast(pthread_cond_t *cond);

//一次唤醒一个线程
int pthread_cond_signal(pthread_cond_t *cond);

3.5函数使用样例

#include <iostream>
#include <pthread.h>
#include <string>
#include <cstdio>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

//使用全局变量初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *active(void *args)
{

    std::string name = static_cast<const char *>(args);

    while (true)
    {
        pthread_mutex_lock(&mutex);
        // 判断资源是否准备就绪
        pthread_cond_wait(&cond, &mutex); // 等待资源准备就绪
        printf("%s is active\n", name.c_str());
        pthread_mutex_unlock(&mutex);
    }

    return nullptr;
}

int main()
{
    pthread_t tid1, tid2, tid3;
    pthread_create(&tid1, nullptr, active, (void *)"thread-1");
    pthread_create(&tid2, nullptr, active, (void *)"thread-2");
    pthread_create(&tid3, nullptr, active, (void *)"thread-3");

    sleep(1);
    printf("Main thread ctrl begin...\n");

    while (true)
    {
        printf("main wakeup thread...\n");
        pthread_cond_signal(&cond);
        //pthread_cond_broadcast(&cond);
        sleep(1);
    }

    pthread_join(tid1, nullptr);
    pthread_join(tid2, nullptr);
    pthread_join(tid3, nullptr);

    return 0;
}

一次唤醒多个线程:

一次唤醒一个线程:


四:生产者消费者模型

4.1为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区, 平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

4.2生产者消费者模型优点

1.解耦  2.支持并发  3.支持忙闲不均

生产者:生产数据,向仓库中写入数据(写缓存)

消费者:消费数据,向仓库中拿出数据,使用数据(读缓存)

1.生产者消费者模型中的仓库是生产者和消费者都可以看到的,本质上是共享资源,即临界区,需要被保护

2.我们研究生产者消费者模型,就需要研究多少个生产者,多少个消费者

3.如果是多个生产者,同时想向仓库写入数据,此时多个生产者就要竞争向仓库写入数据的名额,向仓库写入数据时,一次只能让一个生产者写入,否则会发生数据错乱的情况,所以生产者之间是互斥关系

4.如果是多个消费者,和情况3一样,一次只能与有一个消费者消费仓库数据,所以消费者之间是互斥关系

5.那么生产者与消费者之间呢?是互斥和同步的关系

5.1互斥关系:仓库一次只能让一端访问,即消费者消费数据时,生产者不能同时生产数据,否则可能会发生数据错乱,反之同理,所以消费者和生产者之间也是互斥关系,一次只能一端访问仓库

5.2同步关系:同步关系体现在如果仓库数据满了之后,生产者就会发生写堵塞,生产者无法再向仓库写入数据,那什么时候才能写入数据呢?消费者消费一个数据之后,就可以再写入一个数据,所以消费之知道什么时候可以开始生产数据

同理,如果仓库空了,消费者没有数据可读就会发生读阻塞,此时什么消费者可以开始读数据,只有生产者开始向仓库放入数据的时候,所以生产者可以告诉消费者可以开始读数据了

所以生产和消费具有一定的顺序性,此时就是同步

综上,生产者消费者模型可以总结为 "321" 

3:三种关系:生产者与生产者,消费者与消费者,生产者与消费者

2:两种角色:生产者和消费者

1:一个交易场所


五:基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是⼀种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

上述代码适合单生产单消费,同时也适用于多生产多消费,生产者和消费者都看到同一个mutex,竞争同一个资源,竞争到锁之后,如果满足条件就执行操作,不满足就释放锁,然后在cond里面等待条件满足再竞争获得锁继续下面的操作,所以多生产多消费和单生产单消费的代码一样,就是竞争资源的线程数量多少的问题

同时,生产者的线程和消费者的线程谁先被调用不知道,OS自主调度。但是无论谁先调度,最后肯定是生产者先生产,因为开始的时候,队列为空,即使是消费者先调度,也会因为不满足条件,然后释放锁进入条件队列里面等待,等生产者被调度,竞争到锁开始生产之后,才会唤醒消费者消费资源。

此时,会有一个问题,那就是消费者和消费者,生产者和生产者都是互斥关系,在多个存在的情况下,一次只能一个访问资源,而且生产和消费之间也有互斥关系,那么不就是在访问资源的时候,不就是串行吗?如何提高效率呢?

如果只看生产者和消费者访问仓库,那么效率确实不高,但是我们不只是考虑访问仓库这一小部分,还要想生产者怎么生成的数据,消费者拿到数据要干什么?在一个生产者拿到锁向仓库放数据时,其他生产者都可以生产自己那一部分的数据,互不干扰,是并行的;消费者拿到数据之后,就可以释放锁,处理自己的数据,只有一个消费者拿锁访问仓库,其他消费者同步可以处理之前拿到的数据,此时是并行的。

而在生产消费模型中,生产数据和消费数据是主要任务,占比多,是并行工作;在交集场所交易是次要任务,占比少,是串行的,所以综上生产消费模型的效率是很高的。


六:为什么 pthread_cond_wait 需要互斥量?

条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足, 所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好 的通知等待在条件变量上的线程。

条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保 护。没有互斥锁就无法安全的获取和修改共享数据。

按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变 量上不就行了,如下代码:

// 错误的设计 
pthread_mutex_lock(&mutex);
while (condition_is_false) {
 pthread_mutex_unlock(&mutex);
 //解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过 
 pthread_cond_wait(&cond, &mutex);
 pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);

由于解锁和等待不是原子操作。调用解锁之后, pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是⼀个原子操作。

int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_wait返 回,把条件量改成1,把互斥量恢复成原样。


七:条件变量使用规范

等待条件代码:

pthread_mutex_lock(&mutex);

while (条件为假)
     pthread_cond_wait(cond, mutex);

修改条件
pthread_mutex_unlock(&mutex);

给条件发送信号代码:

pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

条件变量的封装:

namespace CondModule
{
    using namespace MutexModule;
    class Cond
    {
    public:
        Cond()
        {
            int n = ::pthread_cond_init(&_cond,nullptr);
            (void)n;
        }

        void Wait(Mutex &mutex)
        {
            int n= ::pthread_cond_wait(&_cond,mutex.LockPtr());
        }

        void Notify()
        {
            int n = ::pthread_cond_signal(&_cond);
            (void)n;
        }

        void NotifyAll()
        {
            int n = ::pthread_cond_broadcast(&_cond);
            (void)n;
        }

        ~Cond()
        {
            int n = pthread_cond_destroy(&_cond);
        }
    private:
        pthread_cond_t _cond;
    };
}

八:生产消费模型实现代码:

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"

static int gcap = 10;

namespace BlockQueueModule
{
    using namespace MutexModule;
    using namespace CondModule;

    template <typename T>
    class BlockQueue
    {
        bool IsFull() { return _q.size() == _cap; }
        bool IsEmpty() { return _q.empty(); }

    public:
        BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)
        {}

        // 生产者生产数据
        void Equeue(const T &in)
        {
            LockGuard lock(_mutex);//函数作用域范围内的临时变量
            
            while (IsFull()) // 对条件进行判断,为了防止发生伪判断,进行循环判断
            {
                std::cout << "生产者进行等待" << std::endl;
                _pwait_num++;

                _productor_cond.Wait(_mutex); 

                _pwait_num--;
                
                std::cout << "生产者醒来了" << std::endl;
            }

            // IsFull不满足或者线程醒过来了
            _q.push(in); // 生产

            if (_cwait_num) // 此时肯定有数据,可以唤醒等待队列里面的消费者
            {
                std::cout << "唤醒消费者" << std::endl;
                _consumer_cond.Notify();
            }

            // 出了函数作用域,销毁:释放锁
        }

        // 消费者消费数据
        void Pop(T *out)
        {
            LockGuard lock(_mutex);

            while (IsEmpty())
            {
                std::cout << "消费者进行等待" << std::endl;

                _cwait_num++;
                _consumer_cond.Wait(_mutex); // wait的时候是一定是持有锁的,wait的时候,会释放锁
                _cwait_num--;

                std::cout << "消费者醒来了" << std::endl;
            }

            *out = _q.front();
            _q.pop();

            if (_pwait_num) // 生产者消费数据了,可以让生产者生产了
            {
                std::cout << "唤醒生产者" << std::endl;
               _productor_cond.Notify();
            }
        }

        ~BlockQueue()
        {}

    private:
        std::queue<T> _q;     // 保存数据的容器:临界资源
        int _cap;             // 最大容量
        Mutex _mutex;         // 互斥量
        Cond _productor_cond; // 消费者条件变量
        Cond _consumer_cond;  // 生产者条件变量

        int _cwait_num;
        int _pwait_num;
    };
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐