C++并发编程指南12 等待事件或条件
本文摘要:C++条件变量是多线程编程中实现线程同步的重要工具。文章通过火车旅行的类比,对比了忙等待、休眠等待和条件变量三种等待方式的优劣。详细介绍了条件变量的核心组件(std::condition_variable、std::mutex等)和关键成员函数(wait、notify_one、notify_all),并提供了完整的使用示例代码。重点讲解了虚假唤醒、锁管理等关键概念,最后给出了条件变量的最
文章目录
4.1 等待事件或条件
想象一下你在夜间火车上旅行,需要在特定站点下车。你有几种选择:
-
整夜保持清醒,每停一站都检查(消耗精力)
-
设置闹钟提前醒来(可能太早或错过站点)
-
理想方式:有人在你到站时叫醒你
在多线程编程中,我们面临类似问题。当线程需要等待某个条件时(如数据准备好),有几种方法:
-
忙等待(Busy Waiting):不断检查条件标志
while(!flag) { /* 空循环 */ }
这就像整夜保持清醒 - 浪费 CPU 资源
-
休眠等待(Sleep and Check):周期性休眠后检查
while(!flag) { std::this_thread::sleep_for(100ms); }
这像设置闹钟 - 减少了资源消耗但难以确定最佳休眠时间
-
条件变量(Condition Variables):最理想的方式
std::condition_variable cv; // 等待线程 cv.wait(lock, []{ return flag; }); // 通知线程 flag = true; cv.notify_one();
这就像有人在你到站时叫醒你 - 高效且准确
C++ 条件变量详解
什么是条件变量?
条件变量是 C++ 多线程编程中用于线程间同步的重要工具,它允许一个或多个线程等待某个特定条件成立后再继续执行。条件变量总是与互斥量配合使用,共同解决复杂的线程同步问题。
核心组件
1. std::condition_variable
定义在 <condition_variable>
头文件中,是最常用的条件变量类型。它必须与 std::mutex
配合使用。
2. std::condition_variable_any
更通用的条件变量类型,可以与任何满足互斥量基本要求的锁类型配合使用,但性能开销通常更大。
3. std::mutex
互斥量,用于保护共享数据,防止多线程同时访问。
4. std::unique_lock
比 std::lock_guard
更灵活的锁管理类,允许在需要时解锁和重新加锁。
关键成员函数
1. wait()
- 等待条件成立
void wait(std::unique_lock<std::mutex>& lock);
-
调用前必须持有锁
-
调用时会自动释放锁,并将线程置于等待状态
-
被唤醒后会重新获取锁
template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
-
带条件的等待,等价于:
while (!pred()) { wait(lock); }
-
防止虚假唤醒(spurious wakeup)
2. notify_one()
- 通知一个等待线程
void notify_one() noexcept;
-
唤醒一个正在等待此条件变量的线程
-
如果没有线程在等待,则什么也不做
3. notify_all()
- 通知所有等待线程
void notify_all() noexcept;
-
唤醒所有正在等待此条件变量的线程
-
如果没有线程在等待,则什么也不做
辅助函数
std::this_thread::sleep_for()
template <class Rep, class Period>
void sleep_for(const std::chrono::duration<Rep, Period>& sleep_duration);
-
使当前线程休眠指定时长
-
在条件变量出现前常用于轮询场景
完整使用流程
等待线程(消费者)
std::unique_lock<std::mutex> lock(mutex);
condition_variable.wait(lock, [&]{ return condition_met; });
// 条件满足后继续执行
通知线程(生产者)
{
std::lock_guard<std::mutex> lock(mutex);
// 修改共享数据
condition_met = true;
}
condition_variable.notify_one(); // 或 notify_all()
关键概念解释
1. 虚假唤醒(Spurious Wakeup)
-
条件变量可能在没有收到通知的情况下自行唤醒线程
-
使用带谓词的
wait()
可以避免此问题 -
谓词应该检查实际业务条件
2. 锁的管理
-
wait()
调用前必须持有锁 -
wait()
会在等待期间释放锁 -
被唤醒后会自动重新获取锁
3. 通知机制
-
notify_one()
:唤醒一个等待线程(不确定哪个) -
notify_all()
:唤醒所有等待线程 -
通知应在锁外发出以提高性能
完整示例代码
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void worker_thread()
{
// 等待主线程的信号
std::unique_lock<std::mutex> lock(mtx);
std::cout << "工作线程: 等待主线程信号..." << std::endl;
cv.wait(lock, []{ return ready; });
// 条件满足后继续工作
std::cout << "工作线程: 收到信号,开始工作..." << std::endl;
}
int main()
{
std::thread worker(worker_thread);
// 模拟主线程准备工作
std::this_thread::sleep_for(std::chrono::seconds(2));
{
std::lock_guard<std::mutex> lock(mtx);
std::cout << "主线程: 准备工作完成..." << std::endl;
ready = true;
}
cv.notify_one(); // 通知工作线程
worker.join();
std::cout << "主线程: 工作线程已完成任务" << std::endl;
return 0;
}
输出示例
工作线程: 等待主线程信号...
主线程: 准备工作完成...
工作线程: 收到信号,开始工作...
主线程: 工作线程已完成任务
最佳实践
-
**总是使用带谓词的
wait()
** - 防止虚假唤醒 -
在锁外发送通知 - 提高性能
-
**优先使用
notify_one()
** - 除非需要唤醒所有线程 -
避免长时间持有锁 - 尤其是在调用外部代码时
-
使用 RAII 管理锁 - 确保异常安全
条件变量是多线程编程中协调线程活动的强大工具,正确使用可以构建高效、响应性强的并发系统。
1 等待条件达成
C++ 提供了两种条件变量:
-
std::condition_variable
(配合std::mutex
使用) -
std::condition_variable_any
(配合任何互斥量使用)
下面是完整的使用示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include <random>
// 模拟数据块
struct DataChunk {
int id;
explicit DataChunk(int id) : id(id) {}
};
std::mutex mut;
std::queue<DataChunk> data_queue;
std::condition_variable data_cond;
// 数据准备线程
void data_preparation_thread() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(500, 2000);
for (int i = 1; i <= 5; ++i) {
// 模拟数据准备时间
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen)));
DataChunk data(i);
{
std::lock_guard<std::mutex> lk(mut);
std::cout << "准备数据: " << i << std::endl;
data_queue.push(data);
}
data_cond.notify_one(); // 通知一个等待线程
}
}
// 数据处理线程
void data_processing_thread() {
int processed_count = 0;
while (processed_count < 5) {
std::unique_lock<std::mutex> lk(mut);
// 等待条件:队列不为空
data_cond.wait(lk, []{
return !data_queue.empty();
});
// 条件满足,处理数据
DataChunk data = data_queue.front();
data_queue.pop();
lk.unlock(); // 提前释放锁
// 模拟数据处理
std::cout << "处理数据: " << data.id << std::endl;
++processed_count;
}
}
int main() {
std::thread producer(data_preparation_thread);
std::thread consumer(data_processing_thread);
producer.join();
consumer.join();
std::cout << "所有数据处理完成!" << std::endl;
return 0;
}
代码说明:
-
数据准备线程随机休眠(模拟工作),然后准备数据并推入队列
-
数据处理线程等待队列不为空的条件
-
data_cond.wait()
会自动解锁互斥量并进入等待状态 -
当数据准备好时,准备线程调用
notify_one()
唤醒处理线程 -
处理线程被唤醒后重新获取锁并检查条件
-
条件满足则处理数据,否则继续等待
2 构建线程安全队列
下面是完整的线程安全队列实现:
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class ThreadSafeQueue {
private:
mutable std::mutex mut; // 可变互斥量(用于const方法)
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
ThreadSafeQueue() = default;
// 禁止赋值操作
ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
// 推入数据
void push(T value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(value));
data_cond.notify_one(); // 通知一个等待线程
}
// 等待并弹出数据(引用版本)
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
value = std::move(data_queue.front());
data_queue.pop();
}
// 等待并弹出数据(智能指针版本)
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
// 尝试弹出数据(引用版本)
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) return false;
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
// 尝试弹出数据(智能指针版本)
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) return nullptr;
std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
// 检查队列是否为空
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
// 获取队列大小
size_t size() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.size();
}
};
// 使用示例
int main() {
ThreadSafeQueue<int> queue;
// 生产者线程
std::thread producer([&]{
for (int i = 0; i < 10; ++i) {
queue.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
// 消费者线程
std::thread consumer([&]{
for (int i = 0; i < 10; ++i) {
int value;
queue.wait_and_pop(value);
std::cout << "处理值: " << value << std::endl;
}
});
producer.join();
consumer.join();
return 0;
}
关键设计点:
-
使用
std::unique_lock
配合条件变量,允许在等待时解锁 -
提供两种弹出方式:
-
wait_and_pop()
:阻塞直到数据可用 -
try_pop()
:立即返回(成功或失败)
-
-
两种返回值形式:
-
通过引用参数返回
-
通过智能指针返回(避免拷贝开销)
-
-
互斥量声明为
mutable
,允许在const
方法中加锁 -
禁止赋值操作,防止意外共享状态
使用场景:
-
生产者-消费者模式
-
线程池任务队列
-
任何需要线程间安全传递数据的场景
条件变量是多线程编程中协调线程活动的强大工具,能有效避免忙等待和休眠时间难以确定的问题,是实现高效并发系统的关键组件。
3.更加完善的生产者消费者模型 代码实现
上面的代码没有关注队列满了的情况,所以生产者就没有考虑 队列满了的情况,也就不需要条件等待,下面是一个更加完善的代码例子
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <iostream>
#include <thread>
#include <sstream>
// 获取当前时间的字符串表示
#include <chrono>
#include <iomanip>
#include <sstream>
#include <mutex>
std::string current_time() {
auto now = std::chrono::system_clock::now();
auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
auto epoch = now_ms.time_since_epoch();
auto value = std::chrono::duration_cast<std::chrono::milliseconds>(epoch);
auto time = std::chrono::system_clock::to_time_t(now);
// 线程安全的本地时间转换
static std::mutex mtx;
std::tm tm_time;
{
std::lock_guard<std::mutex> lock(mtx);
tm_time = *std::localtime(&time);
}
std::stringstream ss;
ss << std::put_time(&tm_time, "%H:%M:%S")
<< '.' << std::setfill('0') << std::setw(3) << (value.count() % 1000);
return ss.str();
}
// 获取线程ID的字符串表示
std::string thread_id() {
std::stringstream ss;
ss << std::this_thread::get_id();
return ss.str();
}
template<typename T>
class ThreadSafeQueue {
private:
mutable std::mutex mut; // 可变互斥量(用于const方法)
std::queue<T> data_queue;
std::condition_variable data_cond; // 消费者条件变量:等待队列非空
std::condition_variable space_cond; // 生产者条件变量:等待队列非满
size_t capacity; // 队列容量
std::string name; // 队列名称,用于输出识别
public:
// 构造函数,必须指定容量
explicit ThreadSafeQueue(size_t cap, const std::string& queue_name = "Queue")
: capacity(cap > 0 ? cap : 1), name(queue_name) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "创建队列 '" << name << "',容量: " << capacity << std::endl;
}
// 禁止拷贝构造和赋值操作
ThreadSafeQueue(const ThreadSafeQueue&) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
// 推入数据(阻塞直到有空间)
void push(T value) {
std::unique_lock<std::mutex> lk(mut);
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "生产者尝试推送值: " << value << " 到队列 '" << name << "'" << std::endl;
space_cond.wait(lk, [this] {
bool has_space = data_queue.size() < capacity;
if (!has_space) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "队列 '" << name << "' 已满,生产者等待空间..." << std::endl;
}
return has_space;
});
data_queue.push(std::move(value));
size_t new_size = data_queue.size();
lk.unlock();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "生产者成功推送值: " << value << " 到队列 '" << name
<< "',当前大小: " << new_size << "/" << capacity << std::endl;
data_cond.notify_one(); // 通知一个等待的消费者
}
// 尝试推入数据(非阻塞)
bool try_push(T value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.size() >= capacity) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "尝试推送值: " << value << " 到队列 '" << name
<< "' 失败(队列已满)" << std::endl;
return false;
}
data_queue.push(std::move(value));
size_t new_size = data_queue.size();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "尝试推送值: " << value << " 到队列 '" << name
<< "' 成功,当前大小: " << new_size << "/" << capacity << std::endl;
data_cond.notify_one();
return true;
}
// 带超时的推入数据
template<typename Rep, typename Period>
bool push_timeout(T value, const std::chrono::duration<Rep, Period>& timeout) {
std::unique_lock<std::mutex> lk(mut);
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "生产者尝试带超时推送值: " << value << " 到队列 '" << name << "'" << std::endl;
if (!space_cond.wait_for(lk, timeout, [this] {
bool has_space = data_queue.size() < capacity;
if (!has_space) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "队列 '" << name << "' 已满,生产者等待空间..." << std::endl;
}
return has_space;
})) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "推送值: " << value << " 到队列 '" << name
<< "' 超时" << std::endl;
return false; // 超时
}
data_queue.push(std::move(value));
size_t new_size = data_queue.size();
lk.unlock();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "生产者成功带超时推送值: " << value << " 到队列 '" << name
<< "',当前大小: " << new_size << "/" << capacity << std::endl;
data_cond.notify_one();
return true;
}
// 等待并弹出数据(引用版本)
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "消费者等待从队列 '" << name << "' 弹出数据" << std::endl;
data_cond.wait(lk, [this] {
bool has_data = !data_queue.empty();
if (!has_data) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "队列 '" << name << "' 为空,消费者等待数据..." << std::endl;
}
return has_data;
});
value = std::move(data_queue.front());
data_queue.pop();
size_t new_size = data_queue.size();
lk.unlock();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "消费者成功从队列 '" << name << "' 弹出值: " << value
<< ",当前大小: " << new_size << "/" << capacity << std::endl;
space_cond.notify_one(); // 通知可能有空间了
}
// 等待并弹出数据(智能指针版本)
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "消费者等待从队列 '" << name << "' 弹出数据" << std::endl;
data_cond.wait(lk, [this] {
bool has_data = !data_queue.empty();
if (!has_data) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "队列 '" << name << "' 为空,消费者等待数据..." << std::endl;
}
return has_data;
});
std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
size_t new_size = data_queue.size();
lk.unlock();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "消费者成功从队列 '" << name << "' 弹出值: " << *res
<< ",当前大小: " << new_size << "/" << capacity << std::endl;
space_cond.notify_one(); // 通知可能有空间了
return res;
}
// 尝试弹出数据(引用版本)
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "尝试从队列 '" << name << "' 弹出数据失败(队列为空)" << std::endl;
return false;
}
value = std::move(data_queue.front());
data_queue.pop();
size_t new_size = data_queue.size();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "尝试从队列 '" << name << "' 弹出值: " << value
<< " 成功,当前大小: " << new_size << "/" << capacity << std::endl;
space_cond.notify_one(); // 通知可能有空间了
return true;
}
// 尝试弹出数据(智能指针版本)
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "尝试从队列 '" << name << "' 弹出数据失败(队列为空)" << std::endl;
return nullptr;
}
std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
size_t new_size = data_queue.size();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "尝试从队列 '" << name << "' 弹出值: " << *res
<< " 成功,当前大小: " << new_size << "/" << capacity << std::endl;
space_cond.notify_one(); // 通知可能有空间了
return res;
}
// 带超时的等待并弹出数据(引用版本)
template<typename Rep, typename Period>
bool wait_and_pop_timeout(T& value, const std::chrono::duration<Rep, Period>& timeout) {
std::unique_lock<std::mutex> lk(mut);
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "消费者带超时等待从队列 '" << name << "' 弹出数据" << std::endl;
if (!data_cond.wait_for(lk, timeout, [this] {
bool has_data = !data_queue.empty();
if (!has_data) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "队列 '" << name << "' 为空,消费者等待数据..." << std::endl;
}
return has_data;
})) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "从队列 '" << name << "' 弹出数据超时" << std::endl;
return false; // 超时
}
value = std::move(data_queue.front());
data_queue.pop();
size_t new_size = data_queue.size();
lk.unlock();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "消费者成功带超时从队列 '" << name << "' 弹出值: " << value
<< ",当前大小: " << new_size << "/" << capacity << std::endl;
space_cond.notify_one(); // 通知可能有空间了
return true;
}
// 带超时的等待并弹出数据(智能指针版本)
template<typename Rep, typename Period>
std::shared_ptr<T> wait_and_pop_timeout(const std::chrono::duration<Rep, Period>& timeout) {
std::unique_lock<std::mutex> lk(mut);
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "消费者带超时等待从队列 '" << name << "' 弹出数据" << std::endl;
if (!data_cond.wait_for(lk, timeout, [this] {
bool has_data = !data_queue.empty();
if (!has_data) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "队列 '" << name << "' 为空,消费者等待数据..." << std::endl;
}
return has_data;
})) {
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "从队列 '" << name << "' 弹出数据超时" << std::endl;
return nullptr; // 超时
}
std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
size_t new_size = data_queue.size();
lk.unlock();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "消费者成功带超时从队列 '" << name << "' 弹出值: " << *res
<< ",当前大小: " << new_size << "/" << capacity << std::endl;
space_cond.notify_one(); // 通知可能有空间了
return res;
}
// 检查队列是否为空
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
bool is_empty = data_queue.empty();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "检查队列 '" << name << "' 是否为空: " << (is_empty ? "是" : "否") << std::endl;
return is_empty;
}
// 检查队列是否已满
bool full() const {
std::lock_guard<std::mutex> lk(mut);
bool is_full = data_queue.size() >= capacity;
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "检查队列 '" << name << "' 是否已满: " << (is_full ? "是" : "否") << std::endl;
return is_full;
}
// 获取队列大小
size_t size() const {
std::lock_guard<std::mutex> lk(mut);
size_t current_size = data_queue.size();
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "获取队列 '" << name << "' 大小: " << current_size << "/" << capacity << std::endl;
return current_size;
}
// 获取队列容量
size_t get_capacity() const {
std::lock_guard<std::mutex> lk(mut);
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "获取队列 '" << name << "' 容量: " << capacity << std::endl;
return capacity;
}
// 设置队列容量
void set_capacity(size_t new_capacity) {
std::lock_guard<std::mutex> lk(mut);
size_t old_capacity = capacity;
capacity = new_capacity > 0 ? new_capacity : 1;
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "设置队列 '" << name << "' 容量: " << old_capacity << " -> " << capacity << std::endl;
// 如果新容量大于当前大小,可能需要通知生产者
if (data_queue.size() < capacity) {
space_cond.notify_all(); // 通知所有等待的生产者
std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
<< "队列 '" << name << "' 有新空间可用,通知所有生产者" << std::endl;
}
}
};
// 使用示例
int main() {
std::cout << "[" << current_time() << "][Main Thread] "
<< "开始线程安全队列演示程序" << std::endl;
// 创建容量为3的线程安全队列
ThreadSafeQueue<int> queue(3, "DemoQueue");
std::cout << "[" << current_time() << "][Main Thread] "
<< "创建生产者和消费者线程" << std::endl;
// 生产者线程
std::thread producer([&] {
std::cout << "[" << current_time() << "][Producer Thread] "
<< "生产者线程启动" << std::endl;
for (int i = 0; i < 6; ++i) {
// 使用阻塞推送
queue.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "[" << current_time() << "][Producer Thread] "
<< "生产者线程完成" << std::endl;
});
// 消费者线程
std::thread consumer([&] {
std::cout << "[" << current_time() << "][Consumer Thread] "
<< "消费者线程启动" << std::endl;
for (int i = 0; i < 6; ++i) {
int value;
queue.wait_and_pop(value);
std::cout << "[" << current_time() << "][Consumer Thread] "
<< "处理值: " << value << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::cout << "[" << current_time() << "][Consumer Thread] "
<< "消费者线程完成" << std::endl;
});
// 等待线程完成
producer.join();
consumer.join();
std::cout << "[" << current_time() << "][Main Thread] "
<< "演示程序完成" << std::endl;
return 0;
}
线程安全队列代码流程分析
我将详细分析这个有三个角色(队列、生产者、消费者)的代码执行流程,帮助您理解多线程环境下的交互过程。
整体架构
+-------------+ 生产数据 +----------+ 消费数据 +-------------+
| 生产者线程 | -------------> | 线程安全队列 | -------------> | 消费者线程 |
+-------------+ +----------+ +-------------+
阻塞当队列满 阻塞当队列空
详细执行流程
1. 初始化阶段
- 主线程创建容量为3的线程安全队列
- 主线程创建生产者线程和消费者线程
- 两个线程开始并行执行
2. 生产者线程流程
- 生产者线程启动,开始循环推送数据(0-5)
- 前3次推送(0,1,2)会立即成功,因为队列有空间
- 当尝试推送第4个值(3)时,队列已满,生产者阻塞
- 等待消费者消费数据后,生产者被唤醒并继续推送
- 重复步骤3-4直到所有数据推送完成
3. 消费者线程流程
- 消费者线程启动,尝试从队列弹出数据
- 最初队列为空,消费者阻塞等待数据
- 当生产者推送数据后,消费者被唤醒并消费数据
- 消费后队列空间释放,通知可能阻塞的生产者
- 重复步骤2-4直到消费完所有数据
4. 队列状态变化
时间线: 0-----1-----2-----3-----4-----5-----6-----7-----8-----9-----10---->
生产者: P0---P1---P2---(阻塞)------------------P3---P4---(阻塞)---P5--->
消费者: (阻塞)-----------C0---(阻塞)---C1---(阻塞)---C2---C3---C4---C5--->
队列状态: 0 -> 0,1 -> 0,1,2 -> 1,2 -> 1,2,3 -> 2,3 -> 2,3,4 -> 3,4 -> 4 -> 4,5 -> 5
5. 关键交互点
- 生产者阻塞点: 当队列满时(大小=容量),生产者调用
space_cond.wait()
进入等待状态 - 消费者阻塞点: 当队列空时,消费者调用
data_cond.wait()
进入等待状态 - 唤醒生产者: 消费者消费数据后调用
space_cond.notify_one()
唤醒可能阻塞的生产者 - 唤醒消费者: 生产者生产数据后调用
data_cond.notify_one()
唤醒可能阻塞的消费者
控制台输出分析示例
以下是程序可能产生的输出片段及其解释:
[14:30:25.123][Thread 140245678912256] 创建队列 'DemoQueue',容量: 3
- 主线程创建容量为3的队列
[14:30:25.124][Thread 140245678912256] 创建生产者和消费者线程
[14:30:25.125][Thread 140245670519552] 生产者线程启动
[14:30:25.126][Thread 140245662126848] 消费者线程启动
- 主线程创建生产者和消费者线程
- 两个线程开始执行
[14:30:25.127][Thread 140245670519552] 生产者尝试推送值: 0 到队列 'DemoQueue'
[14:30:25.128][Thread 140245670519552] 生产者成功推送值: 0 到队列 'DemoQueue',当前大小: 1/3
- 生产者成功推送第一个值
[14:30:25.228][Thread 140245670519552] 生产者尝试推送值: 1 到队列 'DemoQueue'
[14:30:25.229][Thread 140245670519552] 生产者成功推送值: 1 到队列 'DemoQueue',当前大小: 2/3
[14:30:25.329][Thread 140245670519552] 生产者尝试推送值: 2 到队列 'DemoQueue'
[14:30:25.330][Thread 140245670519552] 生产者成功推送值: 2 到队列 'DemoQueue',当前大小: 3/3
- 生产者继续推送,队列逐渐填满
[14:30:25.430][Thread 140245670519552] 生产者尝试推送值: 3 到队列 'DemoQueue'
[14:30:25.431][Thread 140245670519552] 队列 'DemoQueue' 已满,生产者等待空间...
- 队列已满,生产者开始阻塞
[14:30:25.432][Thread 140245662126848] 消费者等待从队列 'DemoQueue' 弹出数据
[14:30:25.433][Thread 140245662126848] 消费者成功从队列 'DemoQueue' 弹出值: 0,当前大小: 2/3
[14:30:25.434][Thread 140245670519552] 生产者成功推送值: 3 到队列 'DemoQueue',当前大小: 3/3
- 消费者消费数据,释放空间
- 生产者被唤醒并成功推送数据
[14:30:25.435][Thread 140245662126848] 处理值: 0
[14:30:25.635][Thread 140245662126848] 消费者等待从队列 'DemoQueue' 弹出数据
[14:30:25.636][Thread 140245662126848] 消费者成功从队列 'DemoQueue' 弹出值: 1,当前大小: 2/3
- 消费者处理数据并尝试下一次消费
关键同步机制
- 互斥锁(mutex): 保护队列数据的并发访问
- 条件变量(condition_variable):
data_cond
: 消费者等待队列非空space_cond
: 生产者等待队列非满
- 通知机制(notify):
- 生产者推送数据后通知
data_cond
- 消费者弹出数据后通知
space_cond
- 生产者推送数据后通知
性能特点
- 生产者受限: 当队列满时,生产者速度受消费者速度限制
- 消费者受限: 当队列空时,消费者速度受生产者速度限制
- 平衡点: 当生产者和消费者速度匹配时,队列大小保持稳定
- 内存使用: 队列大小不会超过指定容量,避免内存无限增长
这个实现提供了一个高效、线程安全的生产者-消费者模式,适用于需要控制资源使用的并发场景。
更多推荐
所有评论(0)