【C/C++】生产者消费者
生产者-消费者模式看似简单,但正确实现需要对条件变量、互斥锁、原子操作和信号处理都有清晰的理解。关键点包括:使用两个条件变量避免死锁并精确唤醒正确类型的线程,理解 wait 的原子性释放锁和进入等待队列的机制,根据实际需求选择合适的内存序,以及优雅退出时需要唤醒所有等待线程。
生产者-消费者模式详解
概述
生产者-消费者模式是多线程编程中最经典的同步模式之一。它描述了两类线程之间的协作关系:生产者负责生成数据并放入缓冲区,消费者从缓冲区取出数据进行处理。这个模式的核心挑战在于如何安全地协调这两类线程,避免竞态条件,同时保证高效的资源利用。
为什么需要有界队列
在实际应用中,我们通常使用有界队列作为缓冲区。这带来两个关键的同步点:当队列满时,生产者必须等待;当队列空时,消费者必须等待。这种双向的等待机制正是条件变量大显身手的地方。
完整实现
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
// 互斥锁,保护共享队列的访问
std::mutex mu;
// 两个条件变量分别用于不同的等待条件
// cv_prod: 生产者等待"队列未满"
// cv_cons: 消费者等待"队列非空"
std::condition_variable cv_prod, cv_cons;
// 共享的有界队列
std::queue<int> q;
constexpr int cap = 10;
// 原子变量,用于优雅退出
// 使用 atomic 是因为信号处理函数和其他线程都会访问它
std::atomic<bool> shutdown_flag{false};
// 信号处理函数
// 注意:信号处理函数中只能安全地使用 async-signal-safe 的操作
// 写入 atomic<bool> 和 write() 系统调用是安全的
void signal_handler(int sig) {
// 使用 write 而不是 cout,因为 cout 不是 async-signal-safe
const char msg[] = "\nReceived Ctrl+C, shutting down...\n";
write(STDOUT_FILENO, msg, sizeof(msg) - 1);
// 设置退出标志,使用 relaxed 内存序即可
shutdown_flag.store(true, std::memory_order_relaxed);
// 必须唤醒所有等待的线程,让它们检查 shutdown_flag
// 这里用 notify_all 而不是 notify_one
cv_prod.notify_all();
cv_cons.notify_all();
}
void producer() {
while (true) {
std::unique_lock<std::mutex> lk(mu);
// 等待条件:队列未满 或者 收到退出信号
// 必须把 shutdown_flag 加入条件,否则线程可能永远阻塞
cv_prod.wait(lk, [] {
return q.size() < cap || shutdown_flag.load(std::memory_order_relaxed);
});
// 检查是否需要退出
if (shutdown_flag.load(std::memory_order_relaxed)) {
std::cout << std::this_thread::get_id() << " Producer exiting" << std::endl;
return;
}
int x = rand();
std::cout << std::this_thread::get_id() << " Producing " << x << std::endl;
q.push(x);
// 通知一个等待的消费者:队列非空了
cv_cons.notify_one();
}
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(mu);
// 等待条件:队列非空 或者 收到退出信号
cv_cons.wait(lk, [] {
return !q.empty() || shutdown_flag.load(std::memory_order_relaxed);
});
// 检查是否需要退出
// 注意:即使收到退出信号,如果队列还有数据,可以选择继续消费
// 这里选择直接退出,根据业务需求可以调整
if (shutdown_flag.load(std::memory_order_relaxed)) {
std::cout << std::this_thread::get_id() << " Consumer exiting" << std::endl;
return;
}
int x = q.front();
q.pop();
std::cout << std::this_thread::get_id() << " Consuming " << x << std::endl;
// 通知一个等待的生产者:队列有空位了
cv_prod.notify_one();
}
}
int main() {
// 注册信号处理函数
std::signal(SIGINT, signal_handler);
std::vector<std::thread> ts;
int cnt = 5;
for (int i = 0; i < cnt; i++) {
ts.emplace_back(producer);
ts.emplace_back(consumer);
}
// 等待所有线程结束
for (auto &t : ts) {
t.join();
}
std::cout << "All threads finished, goodbye!" << std::endl;
return 0;
}
实现细节
为什么需要两个条件变量
代码中使用了两个条件变量:cv_prod 和 cv_cons。这不仅仅是效率问题,更重要的是避免死锁。
考虑只用一个条件变量的情况:
std::condition_variable cv;
void producer() {
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [] { return q.size() < cap; });
q.push(x);
cv.notify_one(); // 想唤醒消费者
}
void consumer() {
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [] { return !q.empty(); });
q.pop();
cv.notify_one(); // 想唤醒生产者
}
假设队列已满,有多个生产者在等待。此时一个消费者取走一个元素,调用 notify_one。问题来了:notify_one 唤醒的是等待队列中的任意一个线程,可能是生产者,也可能是另一个消费者。如果唤醒的是消费者,而队列此时为空,这个消费者检查条件失败,又回去睡觉。而那些等待"队列未满"的生产者永远不会被唤醒。
更极端的情况:所有生产者都在等待队列未满,所有消费者都在等待队列非空,每次 notify_one 都唤醒了错误类型的线程,导致所有线程都在睡觉,程序死锁。
使用两个条件变量彻底解决这个问题。生产者只在 cv_prod 上等待,消费者只在 cv_cons 上等待。生产者放入元素后通知 cv_cons,保证唤醒的一定是消费者;消费者取出元素后通知 cv_prod,保证唤醒的一定是生产者。
wait 的工作原理
cv.wait(lk, predicate) 这行代码背后隐藏了相当多的复杂性。它等价于:
while (!predicate()) {
cv.wait(lk);
}
当谓词返回 false 时,wait 会原子性地释放锁并将当前线程放入条件变量的等待队列。这个原子性至关重要:如果先释放锁再进入等待队列,另一个线程可能在这个间隙中修改条件并发出通知,而我们会错过这个通知,导致永久阻塞。
当线程被唤醒时,它首先要重新获取互斥锁。如果锁被其他线程持有,它会在锁的等待队列上阻塞。获取锁之后,wait 返回,线程重新检查谓词。这个循环检查是必要的,因为存在虚假唤醒的可能,而且在多生产者或多消费者的场景下,可能有其他线程抢先处理了数据。
notify_one vs notify_all
正常运行时代码使用 notify_one,因为每次只有一个元素被添加或移除,只需要唤醒一个等待的线程。使用 notify_all 会导致惊群效应:所有等待的线程被唤醒,但只有一个能成功操作,其余的检查条件失败后又回到等待状态,白白浪费了 CPU 时间。
但在信号处理函数中,我们必须使用 notify_all。因为我们需要唤醒所有等待的线程让它们检查 shutdown_flag 并优雅退出。如果只用 notify_one,可能只有一个线程被唤醒并退出,其他线程会永远阻塞。
内存序的选择
代码中对 shutdown_flag 的访问使用了 memory_order_relaxed,而不是默认的 memory_order_seq_cst。这是一个有意的选择。
对于这种简单的退出标志,我们只关心"最终能看到 true",而不关心"看到 true 的时候其他变量处于什么状态"。relaxed 保证原子性和最终可见性,这就足够了。我们不需要 shutdown_flag 和其他内存操作之间有任何顺序关系。
而且条件变量的 wait 和 notify_all 内部已经包含了足够的同步原语。当线程被唤醒并重新获取锁之后,它一定能看到 shutdown_flag 的最新值。
使用 seq_cst 不会导致错误,只是白白付出性能代价。在 x86 架构上差别不大,因为 x86 本身是强内存序模型。但在 ARM 等弱内存序架构上,seq_cst 需要插入额外的内存屏障指令,会有可观的性能开销。
什么时候需要更强的内存序?当你用原子变量来同步其他非原子数据的时候。比如经典的"发布-获取"模式:生产者先写入数据,再用 release 写入标志;消费者用 acquire 读取标志,再读取数据。这样可以保证消费者看到标志为 true 时,一定能看到完整的数据。但在我们的场景中,所有共享数据都由互斥锁保护,不需要原子变量来提供额外的同步。
优雅退出的实现
优雅退出需要解决几个问题:
第一,如何通知所有线程退出?我们使用 std::atomic<bool> 作为退出标志。原子类型保证了多线程环境下的安全访问,无需额外加锁。
第二,如何让阻塞在 wait 上的线程醒来?关键在于修改等待条件,把 shutdown_flag 加入谓词中。这样当 shutdown_flag 变为 true 时,谓词返回 true,线程不再阻塞。同时在信号处理函数中调用 notify_all 立即唤醒所有等待的线程。
第三,信号处理函数中能做什么?信号处理函数的执行环境很特殊,很多操作是不安全的。我们使用 write 系统调用而不是 std::cout 来输出信息,因为 write 是 async-signal-safe 的,而 std::cout 不是。写入 std::atomic<bool> 也是安全的。
关于锁和条件变量的销毁
C++ 标准规定,销毁一个还有线程在等待的条件变量是未定义行为。因此我们必须确保所有线程都已经退出等待状态并完成执行后,才能让条件变量被销毁。这就是为什么 main 函数中要 join 所有线程的原因之一。
总结
生产者-消费者模式看似简单,但正确实现需要对条件变量、互斥锁、原子操作和信号处理都有清晰的理解。关键点包括:使用两个条件变量避免死锁并精确唤醒正确类型的线程,理解 wait 的原子性释放锁和进入等待队列的机制,根据实际需求选择合适的内存序,以及优雅退出时需要唤醒所有等待线程。
更多推荐


所有评论(0)