生产者-消费者模式详解

概述

生产者-消费者模式是多线程编程中最经典的同步模式之一。它描述了两类线程之间的协作关系:生产者负责生成数据并放入缓冲区,消费者从缓冲区取出数据进行处理。这个模式的核心挑战在于如何安全地协调这两类线程,避免竞态条件,同时保证高效的资源利用。

为什么需要有界队列

在实际应用中,我们通常使用有界队列作为缓冲区。这带来两个关键的同步点:当队列满时,生产者必须等待;当队列空时,消费者必须等待。这种双向的等待机制正是条件变量大显身手的地方。

完整实现

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// 互斥锁,保护共享队列的访问
std::mutex mu;

// 两个条件变量分别用于不同的等待条件
// cv_prod: 生产者等待"队列未满"
// cv_cons: 消费者等待"队列非空"
std::condition_variable cv_prod, cv_cons;

// 共享的有界队列
std::queue<int> q;
constexpr int cap = 10;

// 原子变量,用于优雅退出
// 使用 atomic 是因为信号处理函数和其他线程都会访问它
std::atomic<bool> shutdown_flag{false};

// 信号处理函数
// 注意:信号处理函数中只能安全地使用 async-signal-safe 的操作
// 写入 atomic<bool> 和 write() 系统调用是安全的
void signal_handler(int sig) {
    // 使用 write 而不是 cout,因为 cout 不是 async-signal-safe
    const char msg[] = "\nReceived Ctrl+C, shutting down...\n";
    write(STDOUT_FILENO, msg, sizeof(msg) - 1);
    
    // 设置退出标志,使用 relaxed 内存序即可
    shutdown_flag.store(true, std::memory_order_relaxed);
    
    // 必须唤醒所有等待的线程,让它们检查 shutdown_flag
    // 这里用 notify_all 而不是 notify_one
    cv_prod.notify_all();
    cv_cons.notify_all();
}

void producer() {
    while (true) {
        std::unique_lock<std::mutex> lk(mu);
        
        // 等待条件:队列未满 或者 收到退出信号
        // 必须把 shutdown_flag 加入条件,否则线程可能永远阻塞
        cv_prod.wait(lk, [] { 
            return q.size() < cap || shutdown_flag.load(std::memory_order_relaxed); 
        });
        
        // 检查是否需要退出
        if (shutdown_flag.load(std::memory_order_relaxed)) {
            std::cout << std::this_thread::get_id() << " Producer exiting" << std::endl;
            return;
        }
        
        int x = rand();
        std::cout << std::this_thread::get_id() << " Producing " << x << std::endl;
        q.push(x);
        
        // 通知一个等待的消费者:队列非空了
        cv_cons.notify_one();
    }
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lk(mu);
        
        // 等待条件:队列非空 或者 收到退出信号
        cv_cons.wait(lk, [] { 
            return !q.empty() || shutdown_flag.load(std::memory_order_relaxed); 
        });
        
        // 检查是否需要退出
        // 注意:即使收到退出信号,如果队列还有数据,可以选择继续消费
        // 这里选择直接退出,根据业务需求可以调整
        if (shutdown_flag.load(std::memory_order_relaxed)) {
            std::cout << std::this_thread::get_id() << " Consumer exiting" << std::endl;
            return;
        }
        
        int x = q.front();
        q.pop();
        std::cout << std::this_thread::get_id() << " Consuming " << x << std::endl;
        
        // 通知一个等待的生产者:队列有空位了
        cv_prod.notify_one();
    }
}

int main() {
    // 注册信号处理函数
    std::signal(SIGINT, signal_handler);
    
    std::vector<std::thread> ts;
    int cnt = 5;
    
    for (int i = 0; i < cnt; i++) {
        ts.emplace_back(producer);
        ts.emplace_back(consumer);
    }

    // 等待所有线程结束
    for (auto &t : ts) {
        t.join();
    }
    
    std::cout << "All threads finished, goodbye!" << std::endl;
    return 0;
}

实现细节

为什么需要两个条件变量

代码中使用了两个条件变量:cv_prodcv_cons。这不仅仅是效率问题,更重要的是避免死锁。

考虑只用一个条件变量的情况:

std::condition_variable cv;

void producer() {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [] { return q.size() < cap; });
    q.push(x);
    cv.notify_one();  // 想唤醒消费者
}

void consumer() {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [] { return !q.empty(); });
    q.pop();
    cv.notify_one();  // 想唤醒生产者
}

假设队列已满,有多个生产者在等待。此时一个消费者取走一个元素,调用 notify_one。问题来了:notify_one 唤醒的是等待队列中的任意一个线程,可能是生产者,也可能是另一个消费者。如果唤醒的是消费者,而队列此时为空,这个消费者检查条件失败,又回去睡觉。而那些等待"队列未满"的生产者永远不会被唤醒。

更极端的情况:所有生产者都在等待队列未满,所有消费者都在等待队列非空,每次 notify_one 都唤醒了错误类型的线程,导致所有线程都在睡觉,程序死锁。

使用两个条件变量彻底解决这个问题。生产者只在 cv_prod 上等待,消费者只在 cv_cons 上等待。生产者放入元素后通知 cv_cons,保证唤醒的一定是消费者;消费者取出元素后通知 cv_prod,保证唤醒的一定是生产者。

wait 的工作原理

cv.wait(lk, predicate) 这行代码背后隐藏了相当多的复杂性。它等价于:

while (!predicate()) {
    cv.wait(lk);
}

当谓词返回 false 时,wait 会原子性地释放锁并将当前线程放入条件变量的等待队列。这个原子性至关重要:如果先释放锁再进入等待队列,另一个线程可能在这个间隙中修改条件并发出通知,而我们会错过这个通知,导致永久阻塞。

当线程被唤醒时,它首先要重新获取互斥锁。如果锁被其他线程持有,它会在锁的等待队列上阻塞。获取锁之后,wait 返回,线程重新检查谓词。这个循环检查是必要的,因为存在虚假唤醒的可能,而且在多生产者或多消费者的场景下,可能有其他线程抢先处理了数据。

notify_one vs notify_all

正常运行时代码使用 notify_one,因为每次只有一个元素被添加或移除,只需要唤醒一个等待的线程。使用 notify_all 会导致惊群效应:所有等待的线程被唤醒,但只有一个能成功操作,其余的检查条件失败后又回到等待状态,白白浪费了 CPU 时间。

但在信号处理函数中,我们必须使用 notify_all。因为我们需要唤醒所有等待的线程让它们检查 shutdown_flag 并优雅退出。如果只用 notify_one,可能只有一个线程被唤醒并退出,其他线程会永远阻塞。

内存序的选择

代码中对 shutdown_flag 的访问使用了 memory_order_relaxed,而不是默认的 memory_order_seq_cst。这是一个有意的选择。

对于这种简单的退出标志,我们只关心"最终能看到 true",而不关心"看到 true 的时候其他变量处于什么状态"。relaxed 保证原子性和最终可见性,这就足够了。我们不需要 shutdown_flag 和其他内存操作之间有任何顺序关系。

而且条件变量的 waitnotify_all 内部已经包含了足够的同步原语。当线程被唤醒并重新获取锁之后,它一定能看到 shutdown_flag 的最新值。

使用 seq_cst 不会导致错误,只是白白付出性能代价。在 x86 架构上差别不大,因为 x86 本身是强内存序模型。但在 ARM 等弱内存序架构上,seq_cst 需要插入额外的内存屏障指令,会有可观的性能开销。

什么时候需要更强的内存序?当你用原子变量来同步其他非原子数据的时候。比如经典的"发布-获取"模式:生产者先写入数据,再用 release 写入标志;消费者用 acquire 读取标志,再读取数据。这样可以保证消费者看到标志为 true 时,一定能看到完整的数据。但在我们的场景中,所有共享数据都由互斥锁保护,不需要原子变量来提供额外的同步。

优雅退出的实现

优雅退出需要解决几个问题:

第一,如何通知所有线程退出?我们使用 std::atomic<bool> 作为退出标志。原子类型保证了多线程环境下的安全访问,无需额外加锁。

第二,如何让阻塞在 wait 上的线程醒来?关键在于修改等待条件,把 shutdown_flag 加入谓词中。这样当 shutdown_flag 变为 true 时,谓词返回 true,线程不再阻塞。同时在信号处理函数中调用 notify_all 立即唤醒所有等待的线程。

第三,信号处理函数中能做什么?信号处理函数的执行环境很特殊,很多操作是不安全的。我们使用 write 系统调用而不是 std::cout 来输出信息,因为 write 是 async-signal-safe 的,而 std::cout 不是。写入 std::atomic<bool> 也是安全的。

关于锁和条件变量的销毁

C++ 标准规定,销毁一个还有线程在等待的条件变量是未定义行为。因此我们必须确保所有线程都已经退出等待状态并完成执行后,才能让条件变量被销毁。这就是为什么 main 函数中要 join 所有线程的原因之一。

总结

生产者-消费者模式看似简单,但正确实现需要对条件变量、互斥锁、原子操作和信号处理都有清晰的理解。关键点包括:使用两个条件变量避免死锁并精确唤醒正确类型的线程,理解 wait 的原子性释放锁和进入等待队列的机制,根据实际需求选择合适的内存序,以及优雅退出时需要唤醒所有等待线程。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐