本节重点:

• 深刻理解线程互斥的原理和操作

• 深刻理解线程同步

• 掌握⽣产消费模型

• 设计⽇志和线程池

• 理解线程安全和可重⼊,掌握锁相关概念

1. 线程互斥

1.1 进程线程间的互斥相关背景概念

• 共享资源

• 临界资源:多线程执⾏流被保护的共享的资源就叫做临界资源

• 临界区:每个线程内部,访问临界资源的代码,就叫做临界区

• 互斥:任何时刻,互斥保证有且只有⼀个执⾏流进⼊临界区,访问临界资源,通常对临界资源起保护作⽤

• 原⼦性(后⾯讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

1.2 互斥量mutex

• ⼤部分情况,线程使⽤的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程⽆法获得这种变量。

• 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。

• 多个线程并发的操作共享变量,会带来⼀些问题。

// 操作共享变量会有问题的售票系统代码 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <unistd.h> 
#include <pthread.h> 

int ticket = 100; 
void *route(void *arg) 
{ 
    char *id = (char*)arg; 
    while ( 1 ) 
    { 
        if ( ticket > 0 ) 
        { 
            usleep(1000); 
            printf("%s sells ticket:%d\n", id, ticket); 
            ticket--; 
        } 
        else 
        { 
            break; 
        } 
    } 
} 

int main( void ) 
{ 
    pthread_t t1, t2, t3, t4; 
    pthread_create(&t1, NULL, route, (void*)"thread 1"); 
    pthread_create(&t2, NULL, route, (void*)"thread 2"); 
    pthread_create(&t3, NULL, route, (void*)"thread 3"); 
    pthread_create(&t4, NULL, route, (void*)"thread 4"); 
    pthread_join(t1, NULL); 
    pthread_join(t2, NULL); 
    pthread_join(t3, NULL); 
    pthread_join(t4, NULL); 
} 

⼀次执⾏结果: 
thread 4 sells ticket:100 
... 
thread 4 sells ticket:1 
thread 2 sells ticket:0 
thread 1 sells ticket:-1 
thread 3 sells ticket:-2 

为什么可能⽆法获得争取结果?

if 语句判断条件为真以后,代码可以并发的切换到其他线程

usleep 这个模拟漫⻓业务的过程,在这个漫⻓的业务过程中,可能有很多个线程会进⼊该代码段

--ticket 操作本⾝就不是⼀个原⼦操作

取出ticket--部分的汇编代码 
objdump -d a.out > test.objdump 
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 
600b34 <ticket> 
153 400651: 83 e8 01 sub $0x1,%eax 
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 
600b34 <ticket>

-- 操作并不是原⼦操作,⽽是对应三条汇编指令:

load :将共享变量ticket从内存加载到寄存器中

update : 更新寄存器⾥⾯的值,执⾏-1操作

store :将新值,从寄存器写回共享变量ticket的内存地址

要解决以上问题,需要做到三点:

• 代码必须要有互斥⾏为:当代码进⼊临界区执⾏时,不允许其他线程进⼊该临界区。

• 如果多个线程同时要求执⾏临界区的代码,并且临界区没有线程在执⾏,那么只能允许⼀个线程进⼊该临界区。

• 如果线程不在临界区中执⾏,那么该线程不能阻⽌其他线程进⼊临界区。

要做到这三点,本质上就是需要⼀把锁。Linux上提供的这把锁叫互斥量。

互斥量的接⼝

初始化互斥量

初始化互斥量有两种⽅法:

• ⽅法1,静态分配:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER 

• ⽅法2,动态分配:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const 
pthread_mutexattr_t *restrict attr); 
    参数: 
        mutex:要初始化的互斥量 
        attr:NULL 

销毁互斥量

销毁互斥量需要注意:

• 使⽤ PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁

• 不要销毁⼀个已经加锁的互斥量

• 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex); 

互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex); 
int pthread_mutex_unlock(pthread_mutex_t *mutex); 
返回值:成功返回0,失败返回错误号

调⽤ pthread_lock 时,可能会遇到以下情况:

• 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功

• 发起函数调⽤时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调⽤会陷⼊阻塞(执⾏流被挂起),等待互斥量解锁。

改进上⾯的售票系统:

#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <unistd.h> 
#include <pthread.h> 
#include <sched.h> 

int ticket = 100; 
pthread_mutex_t mutex; 

void *route(void *arg) 
{ 
    char *id = (char*)arg; 
    while ( 1 )
    { 
        pthread_mutex_lock(&mutex); 
        if ( ticket > 0 ) 
        { 
            usleep(1000); 
            printf("%s sells ticket:%d\n", id, ticket); 
            ticket--; 
            pthread_mutex_unlock(&mutex); 
        } 
        
        else 
        { 
            pthread_mutex_unlock(&mutex); 
            break; 
        } 
    } 
    return nullptr; 
} 

int main( void ) 
{ 
    pthread_t t1, t2, t3, t4; 
    pthread_mutex_init(&mutex, NULL); 
    pthread_create(&t1, NULL, route, (void*)"thread 1"); 
    pthread_create(&t2, NULL, route, (void*)"thread 2"); 
    pthread_create(&t3, NULL, route, (void*)"thread 3"); 
    pthread_create(&t4, NULL, route, (void*)"thread 4"); 
    pthread_join(t1, NULL); 
    pthread_join(t2, NULL); 
    pthread_join(t3, NULL); 
    pthread_join(t4, NULL); 
    pthread_mutex_destroy(&mutex); 
} 

1.3 互斥量实现原理探究

• 经过上⾯的例⼦,⼤家已经意识到单纯的 i++ 或者 ++i 都不是原⼦的,有可能会有数据⼀致性问题

• 为了实现互斥锁操作,⼤多数体系结构都提供了swap或exchange指令,该指令的作⽤是把寄存器和内存单元的数据相交换,由于只有⼀条指令,保证了原⼦性,即使是多处理器平台,访问内存的 总线周期也有先后,⼀个处理器上的交换指令执⾏时另⼀个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改⼀下

1.4 互斥量的封装

Lock.hpp

#pragma once 
#include <iostream> 
#include <string> 
#include <pthread.h> 
namespace LockModule 
{ 
    // 对锁进⾏封装,可以独⽴使⽤ 
    class Mutex 
    { 
    public: 
        // 删除不要的拷⻉和赋值 
        Mutex(const Mutex &) = delete; 
        const Mutex &operator =(const Mutex &) = delete; 
        
        Mutex() 
        { 
            int n = pthread_mutex_init(&_mutex, nullptr); 
            (void)n; 
        } 
        
        void Lock() 
        { 
            int n = pthread_mutex_lock(&_mutex); 
            (void)n; 
        }     
        
        void Unlock() 
        { 
            int n = pthread_mutex_unlock(&_mutex); 
            (void)n; 
        } 
        
        pthread_mutex_t *GetMutexOriginal() // 获取原始指针 
        { 
            return &_mutex; 
        } 
        
        ~Mutex() 
        { 
            int n = pthread_mutex_destroy(&_mutex); 
            (void)n; 
        } 
    private: 
        pthread_mutex_t _mutex; 
    }; 
    
    // 采⽤RAII⻛格,进⾏锁管理 
    class LockGuard 
    { 
    public: 
        LockGuard(Mutex &mutex):_mutex(mutex) 
        { 
            _mutex.Lock(); 
        } 
        
        ~LockGuard() 
        { 
            _mutex.Unlock(); 
        } 
    
    private: 
        Mutex &_mutex; 
    }; 
} 

// 抢票的代码就可以更新成为 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <unistd.h> 
#include <pthread.h> 
#include "Lock.hpp" 

using namespace LockModule; 

int ticket = 1000; 
Mutex mutex; 

void *route(void *arg) 
{ 
    char *id = (char *)arg; 
    while (1) 
    { 
        LockGuard lockguard(mutex); // 使⽤RAII⻛格的锁 
        if (ticket > 0) 
        { 
            usleep(1000); 
            printf("%s sells ticket:%d\n", id, ticket); 
            ticket--; 
        } 
        
        else 
        { 
            break; 
        } 
    } 
    return nullptr; 
} 

int main(void) 
{ 
    pthread_t t1, t2, t3, t4; 
    pthread_create(&t1, NULL, route, (void*)"thread 1"); 
    pthread_create(&t2, NULL, route, (void*)"thread 2"); 
    pthread_create(&t3, NULL, route, (void*)"thread 3"); 
    pthread_create(&t4, NULL, route, (void*)"thread 4"); 
    pthread_join(t1, NULL); 
    pthread_join(t2, NULL); 
    pthread_join(t3, NULL); 
    pthread_join(t4, NULL); 
} 

📌 RAII⻛格的互斥锁, C++11也有,⽐如:

std::mutex mtx;

std::lock_guard<std::mutex> guard(mtx);

此处我们仅做封装,⽅便后续使⽤,详情⻅C++课程

另外,如果课堂有时间,也可以把我们封装的线程加⼊进来。

2. 线程同步

2.1 条件变量

• 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

• 例如⼀个线程访问队列时,发现队列为空,它只能等待,直到其它线程将⼀个节点添加到队列中。这种情况就需要⽤到条件变量。

2.2 同步概念与竞态条件

• 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步

• 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

2.3 条件变量函数

初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t* restrict attr); 
参数: 
    cond:要初始化的条件变量 
    attr:NULL 

销毁

int pthread_cond_destroy(pthread_cond_t *cond) 

等待条件满⾜

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 
参数: 
    cond:要在这个条件变量上等待 
    mutex:互斥量,后⾯详细解释

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond); 
int pthread_cond_signal(pthread_cond_t *cond); 

简单案例:

• 我们先使⽤PTHREAD_COND/MUTEX_INITIALIZER进⾏测试,对其他细节暂不追究

• 然后将接⼝更改成为使⽤ pthread_cond_init/pthread_cond_destroy 的⽅式,⽅便后续进⾏封装

#include <iostream> 
#include <string.h> 
#include <unistd.h> 
#include <pthread.h> 

pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 

void *active( void *arg ) 
{ 
    std::string name = static_cast<const char*>(arg); 
    while (true)
    { 
        pthread_mutex_lock(&mutex); 
        pthread_cond_wait(&cond, &mutex); 
        std::cout << name << " 活动..." << std::endl; 
        pthread_mutex_unlock(&mutex); 
    } 
} 

int main( void ) 
{ 
    pthread_t t1, t2; 
    pthread_create(&t1, NULL, active, (void*)"thread-1"); 
    pthread_create(&t2, NULL, active, (void*)"thread-2"); 
    sleep(3); 
    // 可有可⽆,这⾥确保两个线程已经在运⾏ 
    while(true) 
    { 
        // 对⽐测试 
        // pthread_cond_signal(&cond); // 唤醒⼀个线程 
        pthread_cond_broadcast(&cond); // 唤醒所有线程 
        sleep(1); 
    } 
    
    pthread_join(t1, NULL); 
    pthread_join(t2, NULL); 
} 

$ ./cond 
thread-1 活动... 
thread-2 活动... 
thread-1 活动... 
thread-1 活动... 
thread-2 活动... 

2.4 ⽣产者消费者模型

• 321原则(便于记忆)

2.4.1 为何要使⽤⽣产者消费者模型

⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的。

2.4.2 ⽣产者消费者模型优点

• 解耦

• ⽀持并发

• ⽀持忙闲不均

2.5 基于BlockingQueue的⽣产者消费者模型

2.5.1 BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

2.5.2 C++ queue模拟阻塞队列的⽣产消费模型

代码:

• 为了便于同学们理解,我们以单⽣产者,单消费者,来进⾏讲解。

• 刚开始写,我们采⽤原始接⼝。

• 我们先写单⽣产,单消费。然后改成多⽣产,多消费(这⾥代码其实不变)。

BlockQueue.hpp

#ifndef __BLOCK_QUEUE_HPP__ 
#define __BLOCK_QUEUE_HPP__ 
#include <iostream> 
#include <string> 
#include <queue> 
#include <pthread.h> 

template <typename T> 
class BlockQueue 
{ 
private: 
    bool IsFull() 
    { 
        return _block_queue.size() == _cap; 
    } 
    bool IsEmpty() 
    { 
        return _block_queue.empty(); 
    } 
    
public: 
    BlockQueue(int cap) : _cap(cap) 
    { 
        _productor_wait_num = 0; 
        _consumer_wait_num = 0; 
        pthread_mutex_init(&_mutex, nullptr); 
        pthread_cond_init(&_product_cond, nullptr); 
        pthread_cond_init(&_consum_cond, nullptr); 
    } 
    
    void Enqueue(T &in) // ⽣产者⽤的接⼝ 
    { 
        pthread_mutex_lock(&_mutex); 
        while(IsFull()) // 保证代码的健壮性 
        { 
            // ⽣产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!! 
            // 1. pthread_cond_wait调⽤是: a. 让调⽤线程等待 b. ⾃动释放曾经持有的 
            _mutex锁 c. 当条件满⾜,线程唤醒,pthread_cond_wait要求线性 
            // 必须重新竞争_mutex锁,竞争成功,⽅可返回!!! 
            // 之前:安全 
            _productor_wait_num++; 
            pthread_cond_wait(&_product_cond, &_mutex); // 只要等待,必定会有 
            唤醒,唤醒的时候,就要继续从这个位置向下运⾏!! 
            _productor_wait_num--; 
            // 之后:安全 
        } 
        // 进⾏⽣产 
        // _block_queue.push(std::move(in)); 
        // std::cout << in << std::endl; 
        _block_queue.push(in); 
        // 通知消费者来消费 
        if(_consumer_wait_num > 0) 
        pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast 
        pthread_mutex_unlock(&_mutex); 
    } 
    
    void Pop(T *out) // 消费者⽤的接⼝ --- 5个消费者 
    { 
        pthread_mutex_lock(&_mutex); 
        while(IsEmpty()) // 保证代码的健壮性 
        { 
            // 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!! 
            // 1. pthread_cond_wait调⽤是: a. 让调⽤进程等待 b. ⾃动释放曾经持有的 
            _mutex锁 
            _consumer_wait_num++; 
            pthread_cond_wait(&_consum_cond, &_mutex); // 伪唤醒 
            _consumer_wait_num--; 
        } 
    
        // 进⾏消费 
        *out = _block_queue.front(); 
        _block_queue.pop(); 
        // 通知⽣产者来⽣产 
        if(_productor_wait_num > 0) pthread_cond_signal(&_product_cond); 
        pthread_mutex_unlock(&_mutex); 
        // pthread_cond_signal(&_product_cond); 
    } 
    
    ~BlockQueue() 
    { 
        pthread_mutex_destroy(&_mutex); 
        pthread_cond_destroy(&_product_cond); 
        pthread_cond_destroy(&_consum_cond); 
    } 
    
private: 
    std::queue<T> _block_queue; // 阻塞队列,是被整体使⽤的!!! 
    int _cap; // 总上限 
    pthread_mutex_t _mutex; // 保护_block_queue的锁 
    pthread_cond_t _product_cond; // 专⻔给⽣产者提供的条件变量 
    pthread_cond_t _consum_cond; // 专⻔给消费者提供的条件变量 
    int _productor_wait_num; 
    int _consumer_wait_num; 
}; 

注意:这⾥采⽤模版,是想告诉我们,队列中不仅仅可以防⽌内置类型,⽐如int, 对象也可

以作为任务来参与⽣产消费的过程哦.

#pragma once
#include <iostream>
#include <string>
#include <functional>
// 任务类型1
class Task
{
public:
    Task() {}
    Task(int a, int b) : _a(a), _b(b), _result(0)
    {
    }
    void Excute()
    {
        _result = _a + _b;
    }
    std::string ResultToString()
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=" +
               std::to_string(_result);
    }
    std::string DebugToString()
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
    }

private:
    int _a;
    int _b;
    int _result;
};
//任务类型2
using Task = std::function<void()>;

2.6 为什么 pthread_cond_wait 需要互斥量?

• 条件等待是线程间同步的⼀种⼿段,如果只有⼀个线程,条件不满⾜,⼀直等下去都不会满⾜,所以必须要有⼀个线程通过某些操作,改变共享变量,使原先不满⾜的条件变得满⾜,并且友好的通知等待在条件变量上的线程。

• 条件不会⽆缘⽆故的突然变得满⾜了,必然会牵扯到共享数据的变化。所以⼀定要⽤互斥锁来保护。没有互斥锁就⽆法安全的获取和修改共享数据。

• 按照上⾯的说法,我们设计出如下的代码:先上锁,发现条件不满⾜,解锁,然后等待在条件变量上不就⾏了,如下代码:

// 错误的设计 
pthread_mutex_lock(&mutex); 
while (condition_is_false) 
{ 
    pthread_mutex_unlock(&mutex); 
    //解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过 
    pthread_cond_wait(&cond, &mutex); 
    pthread_mutex_lock(&mutex); 
} 

pthread_mutex_unlock(&mutex); 

• 由于解锁和等待不是原⼦操作。调⽤解锁之后, pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满⾜,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是⼀个原⼦操作。

• int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进⼊该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_ wait返回,把条件量改成1,把互斥量恢复成原样。

2.7 条件变量使⽤规范

• 等待条件代码

pthread_mutex_lock(&mutex); 
while (条件为假) //if?? 
    pthread_cond_wait(cond, mutex); 
修改条件 
pthread_mutex_unlock(&mutex);

• 给条件发送信号代码

pthread_mutex_lock(&mutex); 
设置条件为真 
pthread_cond_signal(cond); 
pthread_mutex_unlock(&mutex); 

2.8 条件变量的封装

• 基于上⾯的基本认识,我们已经知道条件变量如何使⽤,虽然细节需要后⾯再来进⾏解释,但这⾥可以做⼀下基本的封装,以备后⽤.

Cond.hpp

#pragma once 
#include <iostream> 
#include <string> 
#include <pthread.h> 
#include "Lock.hpp" 
namespace CondModule 
{ 
    using namespace LockModule; 
    class Cond 
    { 
    public: 
        Cond() 
        { 
            int n = pthread_cond_init(&_cond, nullptr); 
            (void)n; // 酌情加⽇志,加判断 
        } 
        
        void Wait(Mutex &mutex) 
        { 
            int n = pthread_cond_wait(&_cond, mutex.GetMutexOriginal()); 
            (void)n; 
        } 
        
        void Notify() 
        { 
            int n = pthread_cond_signal(&_cond); 
            (void)n; 
        } 
        
        void NotifyAll() 
        { 
            int n = pthread_cond_broadcast(&_cond); 
            (void)n; 
        } 
        
        ~Cond() 
        { 
            int n = pthread_cond_destroy(&_cond); 
            (void)n; // 酌情加⽇志,加判断 
        } 
    
    private: 
        pthread_cond_t _cond; 
    }; 
} 

注意:

为了让条件变量更具有通⽤性,建议封装的时候,不要在Cond类内部引⽤对应的封装互斥

量,要不然后⾯组合的时候,会因为代码耦合的问题难以初始化,因为⼀般⽽⾔Mutex和

Cond基本是⼀起创建的。

2.9 POSIX信号量

POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。

初始化信号量

#include <semaphore.h> 
int sem_init(sem_t *sem, int pshared, unsigned int value); 
参数: 
    pshared:0表⽰线程间共享,⾮零表⽰进程间共享 
    value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem); 

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem); //P() 

发布信号量

功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。 
int sem_post(sem_t *sem);//V() 

上⼀节⽣产者-消费者的例⼦是基于queue的,其空间可以动态分配,现在基于固定⼤⼩的环形队列重写这个程序(POSIX信号量):

2.9.1 基于环形队列的⽣产消费模型

• 环形队列采⽤数组模拟,⽤模运算来模拟环状特性

• 环形结构起始状态和结束状态都是⼀样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留⼀个空的位置,作为满的状态

• 但是我们现在有信号量这个计数器,就很简单的进⾏多线程间的同步过程。

#pragma once 
#include <iostream> 
#include <semaphore.h> 
// 随⼿做⼀下封装 
class Sem 
{ 
public: 
    Sem(int n) 
    { 
        sem_init(&_sem, 0, n); 
    } 
    
    void P() 
    { 
        sem_wait(&_sem); 
    } 
    
    void V() 
    { 
        sem_post(&_sem); 
    } 
    
    ~Sem() 
    { 
        sem_destroy(&_sem); 
    } 
private: 
    sem_t _sem; 
};

注意:

• 这⾥我们还是忍住,先进⾏原始接⼝的使⽤

• 先单⽣产,单消费,然后改成多⽣产,多消费。

• 关于任务,cond处已经介绍,这⾥就不再重复了。

#pragma once 
#include <iostream> 
#include <string> 
#include <vector> 
#include <semaphore.h> 
#include <pthread.h> 
// 单⽣产,单消费 
// 多⽣产,多消费 
// "321": 
// 3: 三种关系 
// a: ⽣产和消费互斥和同步 
// b: ⽣产者之间: 
// c: 消费者之间: 
// 解决⽅案:加锁 
// 1. 需要⼏把锁?2把 
// 2. 如何加锁? 

template<typename T> 
class RingQueue 
{ 
private: 
    void Lock(pthread_mutex_t &mutex) 
    { 
        pthread_mutex_lock(&mutex); 
    } 
    void Unlock(pthread_mutex_t &mutex) 
    { 
        pthread_mutex_unlock(&mutex); 
    } 

public: 
    RingQueue(int cap) 
        : _ring_queue(cap), 
        _cap(cap), 
        _room_sem(cap), 
        _data_sem(0), 
        _productor_step(0), 
        _consumer_step(0) 
    { 
        pthread_mutex_init(&_productor_mutex, nullptr); 
        pthread_mutex_init(&_consumer_mutex, nullptr); 
    } 
    
    void Enqueue(const T &in) 
    { 
        // ⽣产⾏为 
        _room_sem.P(); 
        Lock(_productor_mutex); 
        
        // ⼀定有空间!!! 
        _ring_queue[_productor_step++] = in; // ⽣产 
        _productor_step %= _cap; 
        Unlock(_productor_mutex); 
        _data_sem.V(); 
    } 
    
    void Pop(T *out) 
    { 
        // 消费⾏为 
        _data_sem.P(); 
        Lock(_consumer_mutex); 
        *out = _ring_queue[_consumer_step++]; 
        _consumer_step %= _cap; 
        Unlock(_consumer_mutex); 
        _room_sem.V(); 
    } 
    
    ~RingQueue() 
    { 
        pthread_mutex_destroy(&_productor_mutex); 
        pthread_mutex_destroy(&_consumer_mutex); 
    } 
    
private: 
    // 1. 环形队列 
    std::vector<T> _ring_queue; 
    int _cap; // 环形队列的容量上限 
    
    // 2. ⽣产和消费的下标 
    int _productor_step; 
    int _consumer_step; 
    
    // 3. 定义信号量 
    Sem _room_sem; // ⽣产者关⼼ 
    Sem _data_sem; // 消费者关⼼ 
    
    // 4. 定义锁,维护多⽣产多消费之间的互斥关系 
    pthread_mutex_t _productor_mutex; 
    pthread_mutex_t _consumer_mutex; 
}; 

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐