18fde01fee5e4278981004762ce48cc4.png

✨✨ 欢迎大家来到小伞的大讲堂✨✨

🎈🎈养成好习惯,先赞后看哦~🎈🎈

所属专栏:LInux_st
小伞的主页:xiaosan_blog

制作不易!点个赞吧!!谢谢喵!!

1. 线程互斥

1.1 进程线程间的互斥相关概念

  • 临界资源:多线程执行流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
  • 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

1.2 互斥量mutex

  • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来一些问题。
// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

int ticket = 100;//买一百张票
void *route(void *arg)
{
    char *id = (char *)arg;
    while (1)
    {
        if (ticket > 0)
        {
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;//最终会导致几人共同强一张票
        }
        else
        {
            break;
        }
    }
}

int main(void)
{
    pthread_t t1, t2, t3, t4;//四个共同抢票
    pthread_create(&t1, NULL, route, "thread 1");
    pthread_create(&t2, NULL, route, "thread 2");
    pthread_create(&t3, NULL, route, "thread 3");
    pthread_create(&t4, NULL, route, "thread 4");
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

操作系统会把

导致最后票数为-1的原因

  • if语句判断条件为真以后,代码可以并发的切换到其他线程
  • usleep这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
  • --ticket操作本身就不是一个原子操作

--操作并不是原子操作,而是对应三条汇编指令:

  • load:将共享变量ticket从内存加载到寄存器中
  • update:更新寄存器里面的值,执行-l操作
  • store:将新值,从寄存器写回共享变量ticket的内存地址

为了解决这样的问题

  • 代码必须要有互斥行为当代码进入临界区执行时,不允许其他线程进入该临界区
  • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。

1.2.1 互斥量的接口

初始化互斥量

初始化互斥量有两种方法:

方法1,静态分配:

thread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

方法2,动态分配:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const
pthread_mutexattr_t *restrict attr);

参数:
    mutex:要初始化的互斥量
    attr: NULL

销毁互斥量

销毁互斥量需要注意:

  • 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁
  • 不要销毁一个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);

互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex) ;

int pthread_mutex_unlock(pthread_mutex_t *mutex);

返回值:成功返回0,失败返回错误号

调用pthread_lock时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

改进上面的售票系统:

// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

int ticket = 100; // 买一百张票
//pthread_mutex_t mutex;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *route(void *arg)
{
    char *id = (char *)arg;
    while (1)
    {
        pthread_mutex_lock(&mutex);
        if (ticket > 0)
        {
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--; // 最终会导致几人共同强一张票
            pthread_mutex_unlock(&mutex);
        }
        else
        {
            pthread_mutex_unlock(&mutex);
            break;
        }
    }
}

int main(void)
{
    pthread_t t1, t2, t3, t4; // 四个共同抢票
    
    pthread_create(&t1, NULL, route, "thread 1");
    pthread_create(&t2, NULL, route, "thread 2");
    pthread_create(&t3, NULL, route, "thread 3");
    pthread_create(&t4, NULL, route, "thread 4");

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
    pthread_mutex_destroy(&mutex);
}

1.3 互斥量实现原理探究

  • 经过上面的例子,大家已经意识到单纯的i++或者++i都不是原子的,有可能会有数据一致性问题
  • 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。现在我们把lock和unlock的伪代码改一下

1.4 互斥量的封装

Lock.hpp

#pragma once

#include <iostream>
#include <string>
#include <pthread.h>

namespace LockModule
{
    class Mutex
    {
    public:
        // 删除拷贝与赋值
        Mutex(const Mutex &) = delete;
        const Mutex &operator=(const Mutex &) = delete;
        Mutex()
        {
            int n = pthread_mutex_init(&_mutex, nullptr);
            (void)n;
        }

        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex);
            (void)n;
        }

        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex);
            (void)n;
        }

        pthread_mutex_t *GetMutexOriginal() // 获取原始指针
        {
            return &_mutex;
        }

        ~Mutex()
        {
            int n = pthread_mutex_destroy(&_mutex);
            (void)n;
        }

    private:
        pthread_mutex_t _mutex;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex &mutex) : _mutex(mutex)
        {
            _mutex.Lock();
        }

        ~LockGuard()
        {
            _mutex.Unlock();
        }

    private:
        Mutex &_mutex;
    };
}
// 抢票的代码就可以更新成为
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "Lock.hpp"

using namespace LockModule;
int ticket = 1000;

Mutex mutex;
void *route(void *arg)
{
    char *id = (char *)arg;
    while (1)
    {
        LockGuard lockguard(mutex); // 使⽤RAII⻛格的锁
        if (ticket > 0)
        {
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
        }
        else
        {
            break;
        }
    }
    return nullptr;
}
int main(void)
{
    pthread_t t1, t2, t3, t4;
    pthread_create(&t1, NULL, route, (void *)"thread 1");
    pthread_create(&t2, NULL, route, (void *)"thread 2");
    pthread_create(&t3, NULL, route, (void *)"thread 3");
    pthread_create(&t4, NULL, route, (void *)"thread 4");
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

RAII风格的互斥锁,C++11也有,比如:

std::mutex mtx

std::lock_guard<std::mutex> guard(mtx) ;

2. 线程同步

2.1 条件变量

  • 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

2.2 同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

2.3 条件变量函数

初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t
*restrict attr);
参数:
    cond:要初始化的条件变量
    attr: NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict
mutex);
参数:
    cond:要在这个条件变量上等待
    mutex:互斥量

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond) ;
int pthread_cond_signal(pthread_cond_t *cond) ;
  • 我们先使用PTHREAD_COND/MUTEX_INITIALIZER进行测试
  • 然后将接口更改成为使用pthread_cond_init/pthread_cond_destroy的方式,方便后续进行封装
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *active(void *arg)
{
    std::string name = static_cast<const char *>(arg);
    while (true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);//挂起等待唤醒执行
        std::cout << name << "活动..." << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t t1, t2;
    pthread_create(&t1, NULL, active, (void *)"thread-1");
    pthread_create(&t2, NULL, active, (void *)"thread-2");

    sleep(3);

    while (true)
    {
        pthread_cond_signal(&cond);//唤醒单个线程
        //pthread_cond_broadcast(&cond);//唤醒所有线程
        sleep(1);
    }
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return 0;
}

唤醒单个线程

唤醒多个线程

2.4 生产消费者模型

2.4.1 321原则
2种角色
  • 自然是生产者和消费者
  • 在计算机中可以看作是两个线程 / 多个线程(因为可能会存在多个生产者/消费者)
1个交易场所
  • 也就是超市充当的角色
  • 在计算机中是一块共享的缓冲区 / 一种数据结构,是两个角色都可以访问到的一块空间一个生成数据或任务,将其放入共享的缓冲区中 ; 一个从共享缓冲区中取出数据或任务,并进行相应的处理
3种关系

生产者之间

首先,因为可能存在多个生产者,自然需要维护生产者和生产者的关系

  • 也就是互斥关系/竞争关系
  • 同一个位置,只能有一个生产者的货物放上去
  • 是不是和之前说的[只能有一个线程访问临界资源]很相似?
  • 所以对应的,我们需要对生产的过程加锁保护

消费者之间和生产者之间的关系类似

  • 如果有多个消费者,他们之间也是互斥/竞争关系
  • 虽然我们平时没有见到在超市里有争夺的现象,但那是因为超市里的资源多
  • 如果全超市只剩下一包泡面,就会抢起来了(因为只有一个人可以拿到那包泡面)
  • 在现实生活中,这样的前提是自然存在的 ; 但计算机中,需要用代码去实现
  • 所以,为了保证一个货物只有一个消费者去消费,就要进行加锁保护噜

生产者和消费者关系

这是最重要的关系了,毕竟可能存在只有一个消费者,一个生产者的情况,那上面两种关系就不存在了

  • 生产者和消费者之间会存在互斥关系
  • 毕竟,你不能在生产者正在生产时,去试图拿取东西吧 (你无法确定此时是否生产完成,自然拿取行为也就是不确定的)反过来也是一样的
  • 所以我们要保证生产和消费的过程是原子的,也就是进行加锁保护(当然,这个已经在上面提到过了)
2.4.2 为何使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

2.4.3 生产者消费者模型优点
  • 解耦
  • 支持并发
  • 支持忙闲不均

2.5 基于BlockingQueue的生产者消费者模型

2.5.1 BlockQueue

在多线程编程中阻塞队列(BlockingQueue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

2.5.2 C++queue模拟阻塞队列的生产消费模型

我们以单生产者,单消费者

BlockQueue.hpp

// 阻塞队列的实现
#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>

const int defaultcap = 5; // for test

template <typename T>
class BlockQueue
{
private:
    bool IsFull() { return _q.size() >= _cap; }
    bool IsEmpty() { return _q.empty(); }

public:
    BlockQueue(int cap = defaultcap)
        : _cap(cap), _csleep_num(0), _psleep_num(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_full_cond, nullptr);
        pthread_cond_init(&_empty_cond, nullptr);
    }
    void Equeue(const T &in)
    {
        pthread_mutex_lock(&_mutex);
        // 生产者调用
        while (IsFull())
        {
            // 应该让生产者线程进行等待
            // 重点1:pthread_cond_wait调用成功,挂起当前线程之前,要先自动释放锁!!
            // 重点2:当线程被唤醒的时候,默认就在临界区内唤醒!要从pthread_cond_wait
            // 成功返回,需要当前线程,重新申请_mutex锁!!!
            // 重点3:如果我被唤醒,但是申请锁失败了??我就会在锁上阻塞等待!!!
            _psleep_num++;
            std::cout << "生产者,进入休眠了: _psleep_num" <<  _psleep_num << std::endl;
            // 问题1: pthread_cond_wait是函数吗?有没有可能失败?pthread_cond_wait立即返回了
            // 问题2:pthread_cond_wait可能会因为,条件其实不满足,pthread_cond_wait 伪唤醒
            pthread_cond_wait(&_full_cond, &_mutex);
            _psleep_num--;
        }
        // 100%确定:队列有空间
        _q.push(in);

        // 临时方案
        // v2
        if(_csleep_num>0)
        {
            pthread_cond_signal(&_empty_cond);
            std::cout << "唤醒消费者..." << std::endl;
        }

        // pthread_cond_signal(&_empty_cond); // 可以
        pthread_mutex_unlock(&_mutex); // TODO
        // pthread_cond_signal(&_empty_cond); // 可以
    }
    T Pop()
    {
        // 消费者调用
        pthread_mutex_lock(&_mutex);
        while (IsEmpty())
        {
            _csleep_num++;
            pthread_cond_wait(&_empty_cond, &_mutex);
            _csleep_num--;
        }
        T data = _q.front();
        _q.pop();

        if(_psleep_num > 0)
        {
            pthread_cond_signal(&_full_cond);
            std::cout << "唤醒消费者" << std::endl;
        }

        // pthread_cond_signal(&_full_cond);
        pthread_mutex_unlock(&_mutex);
        return data;
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_full_cond);
        pthread_cond_destroy(&_empty_cond);
    }

private:
    std::queue<T> _q; // 临界资源!!!
    int _cap;         // 容量大小

    pthread_mutex_t _mutex;
    pthread_cond_t _full_cond;
    pthread_cond_t _empty_cond;

    int _csleep_num; // 消费者休眠的个数
    int _psleep_num; // 生产者休眠的个数
};

Task.hpp

#pragma once
#include <iostream>
#include <unistd.h>
#include <functional>

// 任务形式2
// 我们定义了一个任务类型,返回值void,参数为空
using task_t = std::function<void()>;

void Download()
{
    std::cout << "我是一个下载任务..." << std::endl;
    sleep(3); // 假设处理任务比较耗时
}
// 任务形式1
class Task
{
public:
    Task(){}
    Task(int x, int y):_x(x), _y(y)
    {
    }
    void Execute()
    {
        _result = _x + _y;
    }
    int X() { return _x; }
    int Y() { return _y; }
    int Result()
    {
        return _result;
    }
private:
    int _x;
    int _y;
    int _result;
};

Main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>

void *consumer(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);

    while (true)
    {
        sleep(10);
        // 1. 消费任务
        task_t t = bq->Pop();

        // 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了
        t();
    }
}

void *productor(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
    while (true)
    {
        // 1. 获得任务
        // std::cout << "生产了一个任务: " << x << "+" << y << "=?" << std::endl;
        std::cout << "生产了一个任务: " << std::endl;

        // 2. 生产任务
        bq->Equeue(Download);
    }
}

int main()
{
    // 扩展认识: 阻塞队列: 可以放任务吗?
    // 申请阻塞队列
    BlockQueue<task_t> *bq = new BlockQueue<task_t>();

    // 构建生产和消费者
    pthread_t c[2], p[3];

    pthread_create(c, nullptr, consumer, bq);
    pthread_create(c + 1, nullptr, consumer, bq);
    pthread_create(p, nullptr, productor, bq);
    pthread_create(p + 1, nullptr, productor, bq);
    pthread_create(p + 2, nullptr, productor, bq);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);

    return 0;
}

// #include "BlockQueue.hpp"
// #include "Task.hpp"
// #include <iostream>
// #include <pthread.h>
// #include <unistd.h>

// void *consumer(void *args)
// {
//     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);

//     while (true)
//     {
//         sleep(1);
//         Task t = bq->Pop();

//         t.Execute();

//         std::cout << "消费了一个任务: " << t.X() << "+" << t.Y() << "=" << t.Result()  << std::endl;
//     }
// }

// void *productor(void *args)
// {
//     int x = 1;
//     int y = 1;
//     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
//     while (true)
//     {
//         // sleep(1);
//         std::cout << "生产了一个任务: " << x << "+" << y << "=?" << std::endl;
//         Task t(x, y);
//         bq->Equeue(t);

//         x++, y++;
//     }
// }

// int main()
// {
//     // 扩展认识: 阻塞队列: 可以放任务吗?
//     // 申请阻塞队列
//     BlockQueue<Task> *bq = new BlockQueue<Task>();

//     // 构建生产和消费者
//     pthread_t c, p;

//     pthread_create(&c, nullptr, consumer, bq);
//     pthread_create(&p, nullptr, productor, bq);

//     pthread_join(c, nullptr);
//     pthread_join(p, nullptr);

//     return 0;
// }

Makefile

cp:Main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f cp

2.6 为什么 pthread_cond_wait需要互斥量?

  • 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
  • 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。

  • 按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
    while (condition_is_false) {
        pthread_mutex_unlock(&mutex);
        //解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
        pthread_cond_wait(&cond);
        pthread_mutex_lock(&mutex);
    } 
pthread_mutex_unlock(&mutex);
  • 由于解锁和等待不是原子操作。调用解锁之后,pthread_cond_wait之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么pthread_cond_wait将错过这个信号,可能会导致线程永远阻塞在这个pthread_cond_wait。所以解锁和等待必须是一个原子操作。
  • int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t *mutex);进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_wait返回,把条件量改成1,把互斥量恢复成原样。

2.7 条件变量使用规范

等待条件代码

pthread_mutex_lock(&mutex)
while (条件为假)
    pthread_cond_wait(cond, mutex) ;
修改条件
pthread_mutex_unlock(&mutex)

给条件发送信号代码

pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

2.8 条件变量的封装

Cond.hpp

#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
#include "Lock.hpp"

namespace CondModule
{
    using namespace LockModule;

    class Cond
    {
    public:
        Cond()
        {
            int n = pthread_cond_init(&_cond, nullptr);
            (void)n;
        }

        void Wait(Mutex &mutex)
        {
            int n = pthread_cond_wait(&_cond, mutex.GetMutexOriginal());
            (void)n;
        }

        void Notify()//唤醒单个
        {
            int n = pthread_cond_signal(&_cond);
            (void)n;
        }

        void NotifyAll()//全部唤醒
        {
            int n = pthread_cond_broadcast(&_cond);
            (void)n;
        }

        ~Cond()
        {
            int n = pthread_cond_destroy(&_cond);
            (void)n;
        }

    private:
        pthread_cond_t _cond;
    };
}

Lock.hpp

#pragma once

#include <iostream>
#include <string>
#include <pthread.h>

namespace LockModule
{
    class Mutex
    {
    public:
        // 删除拷贝与赋值
        Mutex(const Mutex &) = delete;
        const Mutex &operator=(const Mutex &) = delete;
        Mutex()
        {
            int n = pthread_mutex_init(&_mutex, nullptr);
            (void)n;
        }

        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex);
            (void)n;
        }

        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex);
            (void)n;
        }

        pthread_mutex_t *GetMutexOriginal() // 获取原始指针
        {
            return &_mutex;
        }

        ~Mutex()
        {
            int n = pthread_mutex_destroy(&_mutex);
            (void)n;
        }

    private:
        pthread_mutex_t _mutex;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex &mutex) : _mutex(mutex)
        {
            _mutex.Lock();
        }

        ~LockGuard()
        {
            _mutex.Unlock();
        }

    private:
        Mutex &_mutex;
    };
}

2.9 POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
    int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:
    pshared:0表⽰线程间共享,⾮零表⽰进程间共享
    value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem)

等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量):

2.9.1 基于环形队列的生产消费模型

  • 环形队列采用数组模拟,用模运算来模拟环状特性

  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态

  • 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
#pragma once

#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"

static const int gcap = 5; // for debug

using namespace SemModule;
using namespace MutexModule;

template <typename T>
class RingQueue
{
public:
    RingQueue(int cap = gcap)
        : _cap(cap),
          _rq(cap),
          _blank_sem(cap),
          _p_step(0),
          _data_sem(0),
          _c_step(0)
    {
    }
    void Equeue(const T &in)
    {
        // 生产者
        // 1. 申请信号量,空位置信号量
        _blank_sem.P();
        {
            LockGuard lockguard(_pmutex);
            // 2. 生产
            _rq[_p_step] = in;
            // 3. 更新下标
            ++_p_step;
            // 4. 维持环形特性
            _p_step %= _cap;
        }
        _data_sem.V();
    }
    void Pop(T *out)
    {
        // 消费者
        // 1. 申请信号量,数据信号量
        _data_sem.P();
        {
            LockGuard lockguard(_cmutex);
            // 2. 消费
            *out = _rq[_c_step];
            // 3. 更新下标
            ++_c_step;
            // 4. 维持环形特性
            _c_step %= _cap;
        }
        _blank_sem.V();
    }

private:
    std::vector<T> _rq;
    int _cap;

    // 生产者
    Sem _blank_sem; // 空位置
    int _p_step;
    // 消费者
    Sem _data_sem; // 数据
    int _c_step;

    // 维护多生产,多消费, 2把锁
    Mutex _cmutex;
    Mutex _pmutex;
};
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐