线程同步与互斥(上)https://blog.csdn.net/Small_entreprene/article/details/146884657?sharetype=blogdetail&sharerId=146884657&sharerefer=PC&sharesource=Small_entreprene&sharefrom=mp_from_link我们学习了互斥,紧接着,我们需要认识何为同步,同步相关的 API,同步的原理。

同步话题

引入新技术,必定引入新的问题,为了进一步解决问题,就必须要有新的技术被引入。

所以对于纯互斥的话,会引出一些问题,为了解决这些问题,也就引出了同步的概念。

互斥引出的新的问题是什么?知道了我们才能理解同步是什么,以及同步的必要性。

有这样的情况:对于超级自习室,我是锁的拥有者,但是我一直想去吃饭,但是又不想放弃这个锁,也就是这个钥匙,一直反复的徘徊,一直在申请锁和释放锁(这一直是我的行为),而且在高频的申请钥匙,没有做有效动作,这就导致其他人得不到钥匙(锁),这种情况我们称为其他线程饥饿问题

管理员:纯互斥?有错吗?是没错的,但是不高效,而且不公平,所以要求但凡将钥匙挂墙上了(释放锁)后,就不能立即申请第二次;所有外边的人,需要进行排队,释放锁的人想要再次拿到钥匙,就需要跑到队列的尾部,进行二次申请。

这样,就可以在保证超级自习室的安全的时候,让所有的执行流,访问临界资源,按照一定的顺序进行访问资源!!这就是线程同步!!!

  • 线程同步解决的是不公平,不高效的问题!
  • 线程互斥解决的是有没有错的问题!

条件变量(cond)

  • 当一个线程互斥地访问某个变量时,它可能发现在其他线程改变状态之前,它什么也做不了。

  • 例如一个线程访问队列时,发现队列为空,它只能等待,直到其他线程将一个节点添加到队列中。这种情况就需要用到条件变量。

  • 也是 pthread 库提供的,C++11也有条件变量。

条件变量的核心是实现线程等待 / 唤醒(让线程在条件不满足时 “休眠”,条件满足时被 “叫醒”)。

同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。

  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解。


下面,我们通过一个故事来了解条件变量:

故事背景:

有一个空盘子,有一批人要玩个游戏:其中,有一个人称为放苹果的人,这个盘子只能放一个苹果,放苹果的人的眼睛是蒙上的,也就是说自己并不清楚当前盘子当中是否有没有苹果,剩下拿苹果的人的眼睛也是蒙上的;

这个盘子是要被放苹果和拿苹果的人所访问的,所以该盘子是临界资源;也是因为是临界资源,所以拿放苹果的人都是要被加锁的,也就是不管是放苹果还是拿苹果的人,都是需要先申请这把锁;

对于放苹果的人,先申请锁,申请成功之后,对盘子进行检测有无苹果,没有就往盘子当中放入苹果,有就退出,再关闭锁,自此放苹果操作就成功了;

对于拿苹果的人,先申请锁,申请成功之后,对盘子进行检测有无苹果,有则拿,无则退,关闭锁,自此拿苹果的操作就成功了;

因为双方都不知道盘子里有没有苹果,所以双方就必须要非常高频的竞争这个锁!!!

因为看不见,但是还是听得见的,为了解决这个低效的问题,我们就引入新的东西 --- 铃铛和队列,队列也就看成是多个有序的凳子。

假设今天放苹果的人申请锁成功,检测到盘子当中没有苹果,放苹果,释放锁,但是,一旦将锁释放了,理论上放苹果的人可以继续申请锁,因为双方都看不到,自己并不清楚这个苹果有没有立马被拿走,所以放苹果的人就会做一件事:在释放锁后,将铃铛敲一下,此时放苹果的人就进入到等待模式中,对于拿苹果的人,去申请锁,假如盘子当中没有苹果,或者申请锁失败,那么这个人就不可以二次申请锁,这个人必须去队列当中排队,当听到铃铛响了,再醒来,再去拿苹果。

我们将铃铛+队列称为条件变量!!!(条件变量就是实现同步的一种做法!!!)

cond 接口

条件变量是多线程编程中用于线程间同步的一种机制,它允许一个或多个线程在某个条件不满足时挂起(等待),直到其他线程通过信号量通知它们条件已满足。

初始化条件变量
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

//可以定义一个全局或者静态的:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

功能:初始化一个条件变量。

参数

  • cond:指向条件变量的指针,该变量将被初始化。

  • attr:指向条件变量属性的对象指针,通常传递 NULL 以使用默认属性。

返回值:成功时返回 0,失败时返回相应的错误码。

全局 or 静态的就直接一行代码就可以了。而且不需要 destroy,但是局部定义的,即使用 pthread_cond_init() 初始换的需要手动调用 pthread_cond_destroy()。


销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);

功能:销毁一个已经初始化的条件变量,释放相关资源。

参数

  • cond:指向要销毁的条件变量的指针。

返回值:成功时返回 0,失败时返回相应的错误码。


等待条件满足(条件变量最核心的函数)

对于上面的故事,因为拿苹果的人申请锁失败,或者盘子当中没有苹果的话,就需要进入到等待队列,这些人,也就是这些线程本质就是在条件变量下等,也就是说,条件变量需要为我们提供相应的 API:

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                           pthread_mutex_t *restrict mutex,
                           const struct timespec *restrict abstime);

功能:使当前线程等待直到条件变量被另一个线程唤醒。

参数

  • cond:要等待的条件变量。

  • mutex:一个已经锁定的互斥锁,等待期间将被释放,唤醒后重新获取。

  • abstime:指向 timespec 结构的指针,该结构指定了绝对超时时间。

返回值:成功时返回 0,失败时返回相应的错误码。


唤醒等待的线程

放苹果的人,放好苹果,需要敲响铃铛这个条件变量,唤醒等待该条件变量的线程中的一个或者全部。

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

功能

  • pthread_cond_signal:唤醒一个等待该条件变量的线程。

  • pthread_cond_broadcast:唤醒所有等待该条件变量的线程。

参数

cond:要发送信号的条件变量。

返回值:成功时返回 0,失败时返回相应的错误码。


使用条件变量的步骤
  1. 初始化条件变量:使用 pthread_cond_init 初始化条件变量。

  2. 锁定互斥锁:在等待条件变量之前,必须先锁定一个互斥锁,以保护条件变量和相关数据。

  3. 等待条件变量:如果条件不满足,调用 pthread_cond_wait 使线程等待。

  4. 检查条件:在 pthread_cond_wait 返回后,重新检查条件是否满足,因为可能有假唤醒(spurious wakeup)。

  5. 唤醒等待的线程:当条件满足时,可以调用 pthread_cond_signalpthread_cond_broadcast 唤醒等待的线程。

  6. 销毁条件变量:在程序结束时,使用 pthread_cond_destroy 销毁条件变量,释放资源。

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

// 共享数据:消息是否就绪
int msg_ready = 0; 
// 互斥锁和条件变量
pthread_mutex_t mutex;
pthread_cond_t cond;

// 消费者线程:等待消息就绪
void* consumer(void* arg) {
    pthread_mutex_lock(&mutex);  // 1. 先锁互斥锁

    // 2. 条件不满足则等待(防止假唤醒用while)
    while (msg_ready == 0) {
        printf("消费者:等待消息...\n");
        pthread_cond_wait(&cond, &mutex);  // 释放锁并等待
    }

    // 3. 被唤醒后,确认条件满足,处理数据
    printf("消费者:收到消息!\n");
    msg_ready = 0;  // 重置条件

    pthread_mutex_unlock(&mutex);  // 解锁
    return NULL;
}

// 生产者线程:生产消息后唤醒消费者
void* producer(void* arg) {
    sleep(1);  // 模拟生产耗时
    pthread_mutex_lock(&mutex);  // 锁互斥锁

    // 4. 满足条件,唤醒等待线程
    msg_ready = 1;  // 更新条件
    printf("生产者:消息已就绪,唤醒消费者\n");
    pthread_cond_signal(&cond);  // 唤醒一个等待线程

    pthread_mutex_unlock(&mutex);  // 解锁
    return NULL;
}

int main() {
    // 初始化锁和条件变量
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond, NULL);

    pthread_t ctid, ptid;
    pthread_create(&ctid, NULL, consumer, NULL);
    pthread_create(&ptid, NULL, producer, NULL);

    // 等待线程结束
    pthread_join(ctid, NULL);
    pthread_join(ptid, NULL);

    // 销毁资源
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    return 0;
}
  1. 消费者先上锁,发现 msg_ready=0(条件不满足),调用 pthread_cond_wait 释放锁并休眠。
  2. 生产者生产完成后,上锁并将 msg_ready 设为 1(条件满足),调用 pthread_cond_signal 唤醒消费者。
  3. 消费者被唤醒后,自动重新上锁,通过 while 再次检查条件(防假唤醒),确认满足后处理数据。

注意:

pthread_cond_wait 中的 “释放锁” 本质上就是临时解锁的过程,但它是一个 “原子操作”,与手动调用 pthread_mutex_unlock 既有联系又有区别。

核心关系:“释放锁” 是 pthread_cond_wait 内部的临时解锁

当线程调用 pthread_cond_wait(&cond, &mutex) 时,会自动完成两个关键操作(这两个操作是原子的,中间不会被其他线程打断):

  1. 临时释放持有的 mutex 锁(相当于执行了一次 pthread_mutex_unlock),让其他线程可以获取锁并修改共享条件。

  2. 将当前线程放入条件变量 cond 的等待队列,进入休眠状态,等待被唤醒。

与手动解锁的区别

场景

手动调用 pthread_mutex_unlock

pthread_cond_wait 内部的 “释放锁”

目的

主动释放锁,让其他线程执行

临时释放锁,同时让线程进入等待状态(必须结合条件变量)

后续操作

释放后线程继续执行自己的逻辑

释放后线程直接休眠,不再执行任何代码,直到被唤醒

原子性

单独的解锁操作,可能与 “等待” 步骤拆分(导致竞态条件)

“释放锁 + 进入等待” 是原子操作,避免竞态条件

为什么需要这种 “内部释放锁”?

假设不用 pthread_cond_wait 的内部释放,而是手动解锁后再等待,会出现致命问题:

// 错误示例:手动解锁后等待,存在竞态条件
pthread_mutex_unlock(&mutex);  // 手动解锁
// 此时其他线程可能已经修改条件并唤醒,但当前线程还没开始等待,导致唤醒信号丢失!
pthread_cond_wait(&cond, &mutex);  // 此时等待已经晚了

而 pthread_cond_wait 的 “释放锁 + 等待” 是原子操作,能确保:线程释放锁的同时立刻进入等待队列,不会错过其他线程的唤醒信号。

唤醒后的锁状态:

当线程被 pthread_cond_signal 或 pthread_cond_broadcast 唤醒后,pthread_cond_wait 会自动做一件事:重新获取 mutex 锁(相当于执行 pthread_mutex_lock),然后才返回。

这意味着:线程从 pthread_cond_wait 返回时,一定是重新持有了锁的,后续可以安全地检查和修改共享条件。

所以:

pthread_cond_wait 中的 “释放锁” 就是临时解锁,但它是与 “进入等待队列” 绑定的原子操作,目的是:

  1. 让其他线程有机会修改条件(必须释放锁);

  2. 避免线程在释放锁后、进入等待前的间隙错过唤醒信号(保证原子性)。

手动解锁是独立的操作,而 pthread_cond_wait 的释放锁是为了配合等待 / 唤醒机制的 “特殊解锁”,二者本质都是释放锁,但应用场景和安全性不同。

注意事项

  • 互斥锁保护:条件变量必须始终与互斥锁一起使用,以保护共享数据的一致性。

  • 假唤醒:有时线程可能会在没有收到信号的情况下被唤醒(假唤醒),因此需要在 pthread_cond_wait 返回后重新检查条件。

  • 资源管理:确保在不再需要条件变量时调用 pthread_cond_destroy 以释放资源。

通过以上步骤和注意事项,可以有效地使用条件变量来实现线程间的同步和协调。

下面,我们利用上面的接口,来简单实现一个demo:

#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <pthread.h>

#define NUM 5
int cnt = 1000;

pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER; // 定义锁, 为什么一定要有锁??
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;   // 定义条件变量

// 等待是需要等,什么条件才会等呢?票数为0,等待之前,就要对资源的数量进行判定。
// 判定本身就是访问临界资源!,判断一定是在临界区内部的.
// 判定结果,也一定在临界资源内部。所以,条件不满足要休眠,一定是在临界区内休眠的!
// 证明一件事情:条件变量,可以允许线程等待
// 可以允许一个线程唤醒在cond等待的其他线程, 实现同步过程
void *threadrun(void *args)
{
    std::string name = static_cast<const char *>(args);
    while (true)
    {
        pthread_mutex_lock(&glock);
        // 直接让对用的线程进行等待?? 临界资源不满足导致我们等待的!
        pthread_cond_wait(&gcond, &glock); // glock在pthread_cond_wait之前,会被自动释放掉
        std::cout << name << " 计算: " << cnt << std::endl;
        cnt++;
        pthread_mutex_unlock(&glock);
    }
}

int main()
{
    std::vector<pthread_t> threads;
    for (int i = 0; i < NUM; i++)
    {
        pthread_t tid;
        char *name = new char[64];
        snprintf(name, 64, "thread-%d", i);
        int n = pthread_create(&tid, nullptr, threadrun, name);
        if (n != 0)
            continue;
        threads.push_back(tid);
        sleep(1);
    }

    for (auto &id : threads)
    {
        int m = pthread_join(id, nullptr);
        (void)m;
    }

    return 0;
}

pthread_cond_wait(&gcond, &glock);写在了加锁和解锁的内部?

我们举之前抢票的例子,假设我们不让线程抢完退出,而是抢完后进行等待的话,就是抢完了,等待放票,再继续抢,这时候,就需要让线程等。

不过if判断也是临界区资源,等待是需要等,什么条件才会等呢?票数为0,等待之前,就要对资源的数量进行判定。判定本身就是访问临界资源!,判断一定是在临界区内部的,判定结果,也一定在临界资源内部。所以,条件不满足要休眠,一定是在临界区内休眠的!

pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;定义锁, 为什么一定要有锁??  

第一,我们知道 pthread_cond_wait(&gcond, &glock); 这个API是需要传入锁的,所以我们需要定义锁,这是我们的表象。线程在进行等待的时候,而且线程是加了锁的(申请了锁),去cond_wait等待了,锁还被该线程占用,如果单纯是这样,那么其他线程不就没有机会获取到锁,访问临界区资源了嘛?所以在 pthread_cond_wait(&gcond, &glock); 时,我们需要传进来一把锁glock,该锁其实就是该线程申请到的锁,所以应该是会要释放这把glock锁,该线程就在cond条件变量下进行等待,释放锁了,其他线程就可以申请锁了。虽然这还不是为什么一定要有锁的关键,但是,我们知道了一个细节:glock在pthread_cond_wait之前,会被自动释放掉!!!

当线程被唤醒的时候,也是重wait出来继续向后运行的,这就默认了就在临界区当中唤醒,但是之前锁不是被释放了吗?所以该线程要从pthread_cond_wait中成功返回,就需要当前线程重新申请锁,申请成功才算真正的成功,返回真正的唤醒。

如果阻塞的线程被唤醒,但是申请锁失败了,就会在锁上阻塞等待!!! “在锁上阻塞等待”描述的是线程试图获取一个已经被占用的锁时的行为。线程会暂停执行,进入等待状态,直到锁被释放并且它能够成功获取锁。如果线程被唤醒后仍然无法获取锁,它会再次进入阻塞状态,继续等待。这种机制确保了对共享资源的访问是有序的,避免了数据竞争和不一致的问题。

所以:

  • 重点1: 在临界区内休眠,可别将锁一起带去休眠了
  • 重点2: 当线程被唤醒的时候,也是重wait出来继续向后运行的,这就默认了就在临界区当中唤醒,但是之前锁不是被释放了吗?所以该线程要从pthread_cond_wait中成功返回,就需要当前线程重新申请锁
  • 重点3: 如果阻塞的线程被唤醒,但是申请锁失败了,就会在锁上阻塞等待!!!

接下来,我们运行代码就应该会:所有线程等待,后续打印代码不执行:

接下来,我们来唤醒进程(打印对应消息):

int main()
{

    //......
    //......
    
    sleep(3);

    // 每隔1s唤醒一个线程
    while (true)
    {
        std::cout << "唤醒所有线程... " << std::endl;
        pthread_cond_broadcast(&gcond);

        // std::cout << "唤醒一个线程... " << std::endl;
        // pthread_cond_signal(&gcond);
        sleep(1);
    }

    //for(auto....)
    //......

}

 现象:唤醒线程:

证明一件事:条件变量,可以允许线程等待,可以允许一个线程在cond下等待的其他线程。依此,我们后面就可以利用条件变量来实现同步过程。

为了更好的理解条件变量,同步与互斥间的功能,我们需要依附于一个生产者消费者模型。

生产者消费者模型

生产者消费者模型是一种多线程协作的模式。


简单举个生活中的例子:

想象一下超市的运营场景:

  • 生产者:供应商。他们负责向超市提供各种商品,如食品、日用品等。这些供应商就是生产者,他们不断地“生产”商品并将其送入超市的仓库。(超市不是生产者)

  • 消费者:顾客。他们来到超市,挑选并购买商品。这些顾客就是消费者,他们从超市的货架上“消费”商品。

  • 共享缓冲区:超市的货架和仓库(空间)。货架上的商品(内容)是顾客可以直接购买的,而仓库中的商品则是暂时存储,等待被摆放到货架上或直接被顾客购买。

在这个场景中,超市的运营者需要确保货架上的商品充足,以满足顾客的需求;同时,也需要及时补充仓库中的商品,以避免供应商的货物积压。这就需要一个有效的协调机制,确保商品的供应和消费能够平衡进行,这就是生产者消费者模型的应用。(消费者直接找生产者的话,就是效率低,成本高的问题了)

优点

  1. 解耦:生产者和消费者之间不需要直接通信,降低了系统的耦合度。

  2. 支持并发:可以同时有多个生产者和消费者,提高了系统的并发处理能力。

  3. 支持忙闲不均:生产者和消费者的处理速度可以不同,缓冲区可以平衡两者的处理能力。

生产者消费者模型的实现

在计算机科学中,生产者消费者模型通常通过队列来实现。队列作为一种先进先出(FIFO)的数据结构,非常适合用作生产者和消费者之间的缓冲区。生产者输入数据,消费者获取数据,都是要访问这个缓冲区,所以该队列是一个临界资源,因此,在多线程环境中,为了防止多个线程同时访问队列导致数据不一致,通常需要使用互斥锁(mutex)来保护队列的访问。

队列就是一个内存空间;

生产者之间的关系就是竞争关系,也就是互斥关系;(还有空间时,卖!我卖给你,我要挣钱!!!!)

消费者之间的关系就是竞争关系,也就是互斥关系;(只剩下一个面包时,抢!!!我饿啦!!!)

那么生产者和消费者之间又是什么关系?

生产者负责将商品(比如方便面)放到货架上,而消费者则从货架上拿取商品。当生产者正在往货架上放置商品时,消费者不能同时从货架上拿取商品,反之亦然。这是因为货架上的商品数量是有限的,而且在任何时刻,货架上的商品数量必须保持一致,不能出现生产者和消费者同时操作导致商品数量混乱的情况。

例如,你希望一次性购买10包方便面,但货架上只有2包。此时,你只能选择购买这2包,或者等待货架上补充足够的商品。而生产者在补货时,也需要考虑消费者的需求和货架的容量。如果生产者一次性放100包,可能会导致货架空间不够,或者商品积压;但如果生产者只放2包,又可能无法满足消费者的购买需求。

所以生产者往货架上放东西,然后消费者来拿的话,就很容易造成数据不一致问题!!

超市销售员就像两边的桥梁,没有商品了,就联系厂家,没有顾客来了,就联系顾客(推销)。(这样就可以实现高效率,不然顾客来超市发现没货了,但是过几天来也是没货,一直跑一直跑,厂家也不知道还有没有货物,一直不管一直不管,这时候就很低效了!!!)

所以:生产者和消费者之间的关系是互斥关系,同时也是同步关系!!! 

所以生产者消费者模型其实就是同步与互斥的!!! 我们实现对应代码的话,就是要维护上面这三个条件,还有两个角色:生产者和消费者来承担(线程承担),还有一个交易场所:以特定结构构成的“内存”空间。(" 321 "原则!!!)

我们上面的拿苹果的例子就是生产者消费者模型。 我们之前学习的管道通信也是一种生产者消费者模型,只是由一个生产者(一个读端),一个消费者(一个写端)构成。(对于管道通信,管道通信接口的互斥关系不需要维护,操作系统自己就做好了维护,仅此而已)。


在多线程编程中,生产者消费者模型是一个非常经典的同步问题。它描述了两类线程(生产者和消费者)如何通过共享缓冲区(队列)进行通信和数据交换。本文将详细讲解生产者消费者模型的原理、优点以及基于 BlockingQueue 的实现。

为什么要使用生产者消费者模型

生产者消费者模型通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列进行通信。生产者将生产的数据放入队列中,消费者从队列中取出数据进行处理。阻塞队列起到了缓冲作用,平衡了生产者和消费者的处理能力。

生产者消费者模型优点

  • 解耦:生产者和消费者之间不需要直接通信,降低了系统的耦合度。(生产过程和消费过程是解耦的)

  • 支持并发:可以同时有多个生产者和消费者,提高了系统的并发处理能力。(提高效率)

  • 支持忙闲不均:生产者和消费者的处理速度可以不同,阻塞队列可以平衡两者的处理能力。

基于 BlockingQueue 的生产者消费者模型

在多线程编程中,阻塞队列(BlockingQueue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)。

也就是说阻塞队列是一个容量具有上限的队列,不满足读写条件的时候,就要进行阻塞对应的线程。  

C++ queue模拟阻塞队列的生产消费模型代码(单生产-单消费)

为了便于理解,我们以单生产者,单消费者,来进行讲解。

单生产 --- 单消费(三个要维护的关系退化成了最后一个:生产者和消费者之间的同步和互斥关系)

// 阻塞队列的实现
#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>

const int defaultcap = 5; // for test

template <class T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _q.size() >= _cap;
    }

    bool IsEmpty()
    {
        return !_q.size();
    }

public:
    BlockQueue(int cap = defaultcap)
        : _cap(cap), _csleep_num(0), _psleep_num(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_full_cond, nullptr);
        pthread_cond_init(&_empty_cond, nullptr);
    }

    void Equeue(const T &in)
    {
        // 下面判断和push都是访问临界资源:因为可能一个线程在入队列的时候,其他的线程在出队列,我们应该加锁
        pthread_mutex_lock(&_mutex); // 加锁
        // 生产者调用
        // if (IsFull())//BUG!!!!!
        while (IsFull()) // 增加代码的健壮性
        {
            // 商品满了,需要等待,不然都没有位置放了
            // 重点1: 在临界区内休眠,可别将锁一起带去休眠了
            // 重点2: 当线程被唤醒的时候,也是重wait出来继续向后运行的,这就默认了就在临界区当中唤醒,但是之前锁不是被释放了吗?所以该线程要从pthread_cond_wait中成功返回,就需要当前线程重新申请锁
            // 重点3: 如果阻塞的线程被唤醒,但是申请锁失败了,就会在锁上阻塞等待!!!
            _psleep_num++;
            pthread_cond_wait(&_full_cond, &_mutex);
            _psleep_num--;
        }
        // 100%是队列有空间了
        _q.push(in);
        // 到这里就一定有数据,这就可以唤醒消费者来消费了

        // 临时方案:后续优化
        if (_csleep_num > 0)
        {
            // 别睡了,快来消费
            pthread_cond_signal(&_empty_cond); // 1: 唤醒是放在解锁之前
            std::cout << "唤醒消费者..." << std::endl;
        }

        pthread_mutex_unlock(&_mutex); // 解锁    TODO

        // pthread_cond_signal(&_empty_cond); // 2: 唤醒是放在解锁之后
    }

    T Pop()
    {
        // 消费者调用
        pthread_mutex_lock(&_mutex);
        while (IsEmpty())
        {
            _csleep_num++;
            pthread_cond_wait(&_empty_cond, &_mutex);
            _csleep_num--;
        }
        T data = _q.front();
        _q.pop();
        // 消费者到这说明已经消费了,就一定有空间,所以可以唤醒生产者进行生产了

        if (_psleep_num > 0)
        {
            // 别睡了,快来生产
            pthread_cond_signal(&_full_cond);
            std::cout << "唤醒生产者..." << std::endl;
        }

        pthread_mutex_unlock(&_mutex);
        return data;
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_full_cond);
        pthread_cond_destroy(&_empty_cond);
    }

private:
    std::queue<T> _q;           // 临界资源
    int _cap;                   // 容量大小
    pthread_mutex_t _mutex;     // 锁
    pthread_cond_t _full_cond;  // 生产者生产满了,就把自己放在条件变量下
    pthread_cond_t _empty_cond; // 消费者消费完了,就把自己放在条件变量下
    int _csleep_num;            // 消费者休眠的个数
    int _psleep_num;            // 生产者休眠的个数
};

实现代码过程中的几个问题:

唤醒代码是在解锁前,还是在解锁后?

对于唤醒放在解锁之前:我们上述图片对应的是唤醒消费者(因为队列有空间了),那么消费者能立即醒来吗?

可以醒来,但是当该线程醒来的时候,申请锁一定会失败,因为下一步的 unlock 就说明了当前在访问临界资源的线程还是占有锁的,导致消费者线程在对应的 pthread_cond_wait(&_empty_cond, &mutex); 中无法成功返回,不过该消费者线程已经不是在 _empty_cond 条件变量下等待了,而是被唤醒,申请锁失败,在锁上阻塞,当锁被解锁的时候,该消费者线程就重新持有锁了。所以,放在前面是可以的!

对于唤醒放在解锁之后?

那就是直接线 unlock 进行解锁后,再唤醒在 _empty_cond 条件变量下的消费者线程,但是万一锁一释放,就别其他线程拿走了呢?这是不需要担心的,因为对应线程已经是被唤醒了的,一样的要么就 wait 返回成功,要么就是 wait 返回不成功,但是,在锁上进行阻塞。所以,放在后面也是可以的!

我是习惯放在前面。


pthread_cond_wait(); 是一个函数,那么该函数调用有没有可能会调用失败?

pthread_cond_wait() 函数调用可能会失败。这个函数的返回值是一个整数,通常在成功时返回 0,失败时返回错误码。调用失败可能是由于多种原因,比如互斥锁或条件变量无效、线程被取消、系统资源不足等。

所以我们对于pthread_cond_wait() 函数调用是需要判断的,否则在类似 Equeue 的接口中,就会线程在队列满的了的情况下,可能出现 wait 失败调用,导致还继续 push 进入到等待队列当中。

上面我们是从函数调用角度谈论的,而且当前的模型是单生产单消费的,那么如果是多生产单消费的话:会有情况:当所有的生产者都在休眠,队列是满的了,当消费者消费了几个字节的空间,且生产者不是被 signal 唤醒,而是通过 broadcast 唤醒的话,就可能会出现:第一个获取到锁的生产者线程 push 了相应的字节,这时候队列又满了,但是其他的线程因为 broadcast,导致全部生产者线程被唤醒,就会继续去获取锁,导致满了的队列继续 push 数据进去。

解决:

//if (IsFull())//BUG!!!!!
while(IsFull)

上面这第二种情况,是被称为pthread_cond_wait() 伪唤醒现象!

在多生产单消费且用 broadcast 唤醒的场景下,while(IsFull()) 能解决队列溢出问题的核心是:被 broadcast 唤醒的所有生产者会竞争同一把互斥锁,先抢到锁的生产者会填满消费者释放的有限空闲空间(队列可能再次变满),而 while 会强制后续被唤醒的生产者重新检查队列是否真的非满 —— 若队列已被前一个生产者填充满,这些线程会再次休眠,不会盲目执行 PushData;反观 if(IsFull()) 是一次性检查,后续被唤醒的生产者不会重新验证条件,会误以为队列仍有空间而强行入队导致溢出,同时 while 还能应对条件变量的虚假唤醒,确保只有队列真的非满时才执行入队操作。


接下来,我们测试一下代码:

Main.cc

#include "BlockQueue.hpp"

void *consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        int data = bq->Pop();
        std::cout << "消费了一个数据: " << data << std::endl;
    }
}

void *productor(void *args)
{
    int data = 1;
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        sleep(1);
        std::cout << "生产了一个数据: " << data << std::endl;
        bq->Equeue(data);
        data++;
    }
}

int main()
{
    // 申请阻塞队列
    BlockQueue<int> *bq = new BlockQueue<int>();

    // 构建生产者和消费者
    pthread_t c, p;

    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

生成一个,唤醒消费者,消费一个:(这里生产者是 sleep,也就是生产的比较慢)

我们让消费者慢一点,只需要在对应位置增减 sleep(1):会直接生产5个,接下来消费一个,生产一个:

阻塞队列的扩展

接下来,我们先来做阻塞队列的扩展:

我们阻塞队列当中只能放入整数吗?可以放入任务吗?(包可以的呀)

阻塞队列不仅仅局限于存储整数,它还可以存储各种类型的数据,包括任务对象。这种灵活性使得阻塞队列在多线程编程中具有广泛的应用场景,例如生产者-消费者模型。通过将任务对象放入阻塞队列,可以实现任务的异步处理和线程间的协作。

Task.hpp

#pragma once
#include <iostream>
#include <unistd.h>
#include <functional>

// 任务形式2
// 定义了一个任务类型,返回值为void,参数为空
using task_t = std::function<void()>;

// 定义一个简单的下载任务
void Download()
{
    std::cout << "我是一个下载任务..." << std::endl;
    sleep(3); // 假设处理任务比较耗时
}

// 任务形式1
// 定义一个任务类,包含任务的输入参数和执行逻辑
class Task
{
public:
    Task() {}
    Task(int x, int y) : _x(x), _y(y)
    {
    }
    // 执行任务,计算结果
    void Execute()
    {
        _result = _x + _y;
    }
    // 获取任务的输入参数
    int X() { return _x; }
    int Y() { return _y; }
    // 获取任务的执行结果
    int Result()
    {
        return _result;
    }

private:
    int _x;
    int _y;
    int _result;
};

任务形式 1 的测试:

#include "BlockQueue.hpp"
#include "Task.hpp"

// 消费者线程函数
void *consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        sleep(1); // 模拟消费者处理任务的时间
        Task t = bq->Pop(); // 从阻塞队列中取出任务
        t.Execute(); // 执行任务
        std::cout << "消费了一个任务: " << t.X() << "+" << t.Y() << "=" << t.Result() << std::endl;
    }
}

// 生产者线程函数
void *productor(void *args)
{
    int x = 1;
    int y = 1;
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        // sleep(1); // 可以取消注释来模拟生产者生产任务的速度
        std::cout << "生产了一个任务: " << x << "+" << y << "=" << std::endl;
        Task t(x, y); // 创建一个任务
        bq->Equeue(t); // 将任务放入阻塞队列
        x++;
        y++;
    }
}

int main()
{
    // 申请阻塞队列
    BlockQueue<Task> *bq = new BlockQueue<Task>();

    // 构建生产者和消费者线程
    pthread_t c, p;

    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

测试结果:

任务形式 2 的测试:

#include "BlockQueue.hpp"
#include "Task.hpp"

// 消费者线程函数
void *consumer(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
    while (true)
    {
        // sleep(1); // 可以取消注释来模拟消费者处理任务的速度
        task_t t = bq->Pop(); // 从阻塞队列中取出任务
        t(); // 执行任务
    }
}

// 生产者线程函数
void *productor(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
    while (true)
    {
        sleep(1); // 模拟生产者生产任务的速度
        std::cout << "生产了一个任务" << std::endl;
        bq->Equeue(Download); // 将任务放入阻塞队列
    }
}

int main()
{
    // 申请阻塞队列
    BlockQueue<task_t> *bq = new BlockQueue<task_t>();

    // 构建生产者和消费者线程
    pthread_t c, p;

    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

 测试结果:

  • 任务形式1的测试结果:生产者线程会不断地生产任务并将其放入阻塞队列,消费者线程会从队列中取出任务并执行,计算两个整数的和,并输出结果。

  • 任务形式2的测试结果:生产者线程会不断地将Download任务放入阻塞队列,消费者线程会从队列中取出任务并执行,输出“我是一个下载任务...”,并模拟耗时操作。

为什么生产者消费者模型效率比较高?

我们的代码实现中,也就是对于生产者消费者模型中,消费和生产是通过加锁互斥的,是串行的(生产就一个在生产,不能它的也在生产或消费....),怎么说他效率高呢?

其实消费就是从任务队列当中,将任务取出来,这个过程,我们称为消费,但是取出来的任务到自己的线程内部(上下文当中),是还需要对任务进行处理的!假如获取任务是1毫秒(消费),处理任务需要1秒。

所以未来的多消费者来说,我们可以让每一个线程串行的向任务队列进行消费,获取对用的任务,但是每一个消费者线程都可以并发的处理所消费获取的任务,生产者依旧是类似。

我们生产者上产的任务其实是我们自己定义的,未来的生产者的任务其实是从另一个模块来的,比如说网络(上面我们的Download其实是自己随便写的)

C++ queue 模拟阻塞队列的生产消费模型代码(多生产 - 多消费)

基于上面 单生产 - 单消费 的代码,其实我们只需要多维护生产者之间的关系和消费者之间的关系就好了。那么我们上面的代码根本就是不需要更改,因为锁是对所有线程共享的,临界资源在一时间段内自能允许一个线程访问!

测试代码:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>

void *consumer(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);

    while (true)
    {
        sleep(10);
        // 1. 消费任务
        task_t t = bq->Pop();

        // 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了
        t();
    }
}

void *productor(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);
    while (true)
    {
        // 1. 获得任务
        std::cout << "生产了一个任务: " << std::endl;

        // 2. 生产任务
        bq->Equeue(Download);
    }
}

int main()
{
    // 扩展认识: 阻塞队列: 可以放任务吗?
    // 申请阻塞队列
    BlockQueue<task_t> *bq = new BlockQueue<task_t>();

    // 构建生产和消费者
    pthread_t c[2], p[3];

    pthread_create(c, nullptr, consumer, bq);
    pthread_create(c + 1, nullptr, consumer, bq);
    pthread_create(p, nullptr, productor, bq);
    pthread_create(p + 1, nullptr, productor, bq);
    pthread_create(p + 2, nullptr, productor, bq);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);

    return 0;
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐