4.1 等待事件或条件

想象一下你在夜间火车上旅行,需要在特定站点下车。你有几种选择:

  1. 整夜保持清醒,每停一站都检查(消耗精力)

  2. 设置闹钟提前醒来(可能太早或错过站点)

  3. 理想方式:有人在你到站时叫醒你

在多线程编程中,我们面临类似问题。当线程需要等待某个条件时(如数据准备好),有几种方法:

  1. 忙等待(Busy Waiting)​​:不断检查条件标志

    while(!flag) { /* 空循环 */ }
    

    这就像整夜保持清醒 - 浪费 CPU 资源

  2. 休眠等待(Sleep and Check)​​:周期性休眠后检查

    while(!flag) {
        std::this_thread::sleep_for(100ms);
    }
    

    这像设置闹钟 - 减少了资源消耗但难以确定最佳休眠时间

  3. 条件变量(Condition Variables)​​:最理想的方式

    std::condition_variable cv;
    // 等待线程
    cv.wait(lock, []{ return flag; });
    // 通知线程
    flag = true;
    cv.notify_one();
    

    这就像有人在你到站时叫醒你 - 高效且准确

C++ 条件变量详解

什么是条件变量?

条件变量是 C++ 多线程编程中用于线程间同步的重要工具,它允许一个或多个线程等待某个特定条件成立后再继续执行。条件变量总是与互斥量配合使用,共同解决复杂的线程同步问题。

核心组件

1. std::condition_variable

定义在 <condition_variable>头文件中,是最常用的条件变量类型。它必须与 std::mutex配合使用。

2. std::condition_variable_any

更通用的条件变量类型,可以与任何满足互斥量基本要求的锁类型配合使用,但性能开销通常更大。

3. std::mutex

互斥量,用于保护共享数据,防止多线程同时访问。

4. std::unique_lock

std::lock_guard更灵活的锁管理类,允许在需要时解锁和重新加锁。

关键成员函数

1. wait()- 等待条件成立

void wait(std::unique_lock<std::mutex>& lock);
  • 调用前必须持有锁

  • 调用时会自动释放锁,并将线程置于等待状态

  • 被唤醒后会重新获取锁

template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
  • 带条件的等待,等价于:

    while (!pred()) {
        wait(lock);
    }
    
  • 防止虚假唤醒(spurious wakeup)

2. notify_one()- 通知一个等待线程

void notify_one() noexcept;
  • 唤醒一个正在等待此条件变量的线程

  • 如果没有线程在等待,则什么也不做

3. notify_all()- 通知所有等待线程

void notify_all() noexcept;
  • 唤醒所有正在等待此条件变量的线程

  • 如果没有线程在等待,则什么也不做

辅助函数

std::this_thread::sleep_for()

template <class Rep, class Period>
void sleep_for(const std::chrono::duration<Rep, Period>& sleep_duration);
  • 使当前线程休眠指定时长

  • 在条件变量出现前常用于轮询场景

完整使用流程

等待线程(消费者)

std::unique_lock<std::mutex> lock(mutex);
condition_variable.wait(lock, [&]{ return condition_met; });
// 条件满足后继续执行

通知线程(生产者)

{
    std::lock_guard<std::mutex> lock(mutex);
    // 修改共享数据
    condition_met = true;
}
condition_variable.notify_one(); // 或 notify_all()

关键概念解释

1. 虚假唤醒(Spurious Wakeup)

  • 条件变量可能在没有收到通知的情况下自行唤醒线程

  • 使用带谓词的 wait()可以避免此问题

  • 谓词应该检查实际业务条件

2. 锁的管理

  • wait()调用前必须持有锁

  • wait()会在等待期间释放锁

  • 被唤醒后会自动重新获取锁

3. 通知机制

  • notify_one():唤醒一个等待线程(不确定哪个)

  • notify_all():唤醒所有等待线程

  • 通知应在锁外发出以提高性能

完整示例代码

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker_thread()
{
    // 等待主线程的信号
    std::unique_lock<std::mutex> lock(mtx);
    std::cout << "工作线程: 等待主线程信号..." << std::endl;
    cv.wait(lock, []{ return ready; });
    
    // 条件满足后继续工作
    std::cout << "工作线程: 收到信号,开始工作..." << std::endl;
}

int main()
{
    std::thread worker(worker_thread);
    
    // 模拟主线程准备工作
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    {
        std::lock_guard<std::mutex> lock(mtx);
        std::cout << "主线程: 准备工作完成..." << std::endl;
        ready = true;
    }
    
    cv.notify_one(); // 通知工作线程
    
    worker.join();
    std::cout << "主线程: 工作线程已完成任务" << std::endl;
    
    return 0;
}

输出示例

工作线程: 等待主线程信号...
主线程: 准备工作完成...
工作线程: 收到信号,开始工作...
主线程: 工作线程已完成任务

最佳实践

  1. ​**总是使用带谓词的 wait()**​ - 防止虚假唤醒

  2. 在锁外发送通知​ - 提高性能

  3. ​**优先使用 notify_one()**​ - 除非需要唤醒所有线程

  4. 避免长时间持有锁​ - 尤其是在调用外部代码时

  5. 使用 RAII 管理锁​ - 确保异常安全

条件变量是多线程编程中协调线程活动的强大工具,正确使用可以构建高效、响应性强的并发系统。

1 等待条件达成

C++ 提供了两种条件变量:

  • std::condition_variable(配合 std::mutex使用)

  • std::condition_variable_any(配合任何互斥量使用)

下面是完整的使用示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
#include <random>

// 模拟数据块
struct DataChunk {
    int id;
    explicit DataChunk(int id) : id(id) {}
};

std::mutex mut;
std::queue<DataChunk> data_queue;
std::condition_variable data_cond;

// 数据准备线程
void data_preparation_thread() {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(500, 2000);

    for (int i = 1; i <= 5; ++i) {
        // 模拟数据准备时间
        std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen)));
        
        DataChunk data(i);
        {
            std::lock_guard<std::mutex> lk(mut);
            std::cout << "准备数据: " << i << std::endl;
            data_queue.push(data);
        }
        data_cond.notify_one(); // 通知一个等待线程
    }
}

// 数据处理线程
void data_processing_thread() {
    int processed_count = 0;
    
    while (processed_count < 5) {
        std::unique_lock<std::mutex> lk(mut);
        
        // 等待条件:队列不为空
        data_cond.wait(lk, []{ 
            return !data_queue.empty(); 
        });
        
        // 条件满足,处理数据
        DataChunk data = data_queue.front();
        data_queue.pop();
        lk.unlock(); // 提前释放锁
        
        // 模拟数据处理
        std::cout << "处理数据: " << data.id << std::endl;
        ++processed_count;
    }
}

int main() {
    std::thread producer(data_preparation_thread);
    std::thread consumer(data_processing_thread);
    
    producer.join();
    consumer.join();
    
    std::cout << "所有数据处理完成!" << std::endl;
    return 0;
}

代码说明:​

  1. 数据准备线程随机休眠(模拟工作),然后准备数据并推入队列

  2. 数据处理线程等待队列不为空的条件

  3. data_cond.wait()会自动解锁互斥量并进入等待状态

  4. 当数据准备好时,准备线程调用 notify_one()唤醒处理线程

  5. 处理线程被唤醒后重新获取锁并检查条件

  6. 条件满足则处理数据,否则继续等待

2 构建线程安全队列

下面是完整的线程安全队列实现:

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class ThreadSafeQueue {
private:
    mutable std::mutex mut; // 可变互斥量(用于const方法)
    std::queue<T> data_queue;
    std::condition_variable data_cond;

public:
    ThreadSafeQueue() = default;

    // 禁止赋值操作
    ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

    // 推入数据
    void push(T value) {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(value));
        data_cond.notify_one(); // 通知一个等待线程
    }

    // 等待并弹出数据(引用版本)
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    // 等待并弹出数据(智能指针版本)
    std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    // 尝试弹出数据(引用版本)
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }

    // 尝试弹出数据(智能指针版本)
    std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) return nullptr;
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    // 检查队列是否为空
    bool empty() const {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }

    // 获取队列大小
    size_t size() const {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.size();
    }
};

// 使用示例
int main() {
    ThreadSafeQueue<int> queue;

    // 生产者线程
    std::thread producer([&]{
        for (int i = 0; i < 10; ++i) {
            queue.push(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    // 消费者线程
    std::thread consumer([&]{
        for (int i = 0; i < 10; ++i) {
            int value;
            queue.wait_and_pop(value);
            std::cout << "处理值: " << value << std::endl;
        }
    });

    producer.join();
    consumer.join();

    return 0;
}

关键设计点:​

  1. 使用 std::unique_lock配合条件变量,允许在等待时解锁

  2. 提供两种弹出方式:

    • wait_and_pop():阻塞直到数据可用

    • try_pop():立即返回(成功或失败)

  3. 两种返回值形式:

    • 通过引用参数返回

    • 通过智能指针返回(避免拷贝开销)

  4. 互斥量声明为 mutable,允许在 const方法中加锁

  5. 禁止赋值操作,防止意外共享状态

使用场景:​

  • 生产者-消费者模式

  • 线程池任务队列

  • 任何需要线程间安全传递数据的场景

条件变量是多线程编程中协调线程活动的强大工具,能有效避免忙等待和休眠时间难以确定的问题,是实现高效并发系统的关键组件。

3.更加完善的生产者消费者模型 代码实现

上面的代码没有关注队列满了的情况,所以生产者就没有考虑 队列满了的情况,也就不需要条件等待,下面是一个更加完善的代码例子

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <iostream>
#include <thread>
#include <sstream>

// 获取当前时间的字符串表示
#include <chrono>
#include <iomanip>
#include <sstream>
#include <mutex>

std::string current_time() {
    auto now = std::chrono::system_clock::now();
    auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
    auto epoch = now_ms.time_since_epoch();
    auto value = std::chrono::duration_cast<std::chrono::milliseconds>(epoch);
    auto time = std::chrono::system_clock::to_time_t(now);

    // 线程安全的本地时间转换
    static std::mutex mtx;
    std::tm tm_time;
    {
        std::lock_guard<std::mutex> lock(mtx);
        tm_time = *std::localtime(&time);
    }

    std::stringstream ss;
    ss << std::put_time(&tm_time, "%H:%M:%S")
        << '.' << std::setfill('0') << std::setw(3) << (value.count() % 1000);
    return ss.str();
}

// 获取线程ID的字符串表示
std::string thread_id() {
    std::stringstream ss;
    ss << std::this_thread::get_id();
    return ss.str();
}

template<typename T>
class ThreadSafeQueue {
private:
    mutable std::mutex mut; // 可变互斥量(用于const方法)
    std::queue<T> data_queue;
    std::condition_variable data_cond; // 消费者条件变量:等待队列非空
    std::condition_variable space_cond; // 生产者条件变量:等待队列非满
    size_t capacity; // 队列容量
    std::string name; // 队列名称,用于输出识别

public:
    // 构造函数,必须指定容量
    explicit ThreadSafeQueue(size_t cap, const std::string& queue_name = "Queue")
        : capacity(cap > 0 ? cap : 1), name(queue_name) {
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "创建队列 '" << name << "',容量: " << capacity << std::endl;
    }

    // 禁止拷贝构造和赋值操作
    ThreadSafeQueue(const ThreadSafeQueue&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

    // 推入数据(阻塞直到有空间)
    void push(T value) {
        std::unique_lock<std::mutex> lk(mut);
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "生产者尝试推送值: " << value << " 到队列 '" << name << "'" << std::endl;

        space_cond.wait(lk, [this] {
            bool has_space = data_queue.size() < capacity;
            if (!has_space) {
                std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                    << "队列 '" << name << "' 已满,生产者等待空间..." << std::endl;
            }
            return has_space;
            });

        data_queue.push(std::move(value));
        size_t new_size = data_queue.size();
        lk.unlock();

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "生产者成功推送值: " << value << " 到队列 '" << name
            << "',当前大小: " << new_size << "/" << capacity << std::endl;

        data_cond.notify_one(); // 通知一个等待的消费者
    }

    // 尝试推入数据(非阻塞)
    bool try_push(T value) {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.size() >= capacity) {
            std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                << "尝试推送值: " << value << " 到队列 '" << name
                << "' 失败(队列已满)" << std::endl;
            return false;
        }

        data_queue.push(std::move(value));
        size_t new_size = data_queue.size();
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "尝试推送值: " << value << " 到队列 '" << name
            << "' 成功,当前大小: " << new_size << "/" << capacity << std::endl;

        data_cond.notify_one();
        return true;
    }

    // 带超时的推入数据
    template<typename Rep, typename Period>
    bool push_timeout(T value, const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lk(mut);
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "生产者尝试带超时推送值: " << value << " 到队列 '" << name << "'" << std::endl;

        if (!space_cond.wait_for(lk, timeout, [this] {
            bool has_space = data_queue.size() < capacity;
            if (!has_space) {
                std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                    << "队列 '" << name << "' 已满,生产者等待空间..." << std::endl;
            }
            return has_space;
            })) {
            std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                << "推送值: " << value << " 到队列 '" << name
                << "' 超时" << std::endl;
            return false; // 超时
        }

        data_queue.push(std::move(value));
        size_t new_size = data_queue.size();
        lk.unlock();

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "生产者成功带超时推送值: " << value << " 到队列 '" << name
            << "',当前大小: " << new_size << "/" << capacity << std::endl;

        data_cond.notify_one();
        return true;
    }

    // 等待并弹出数据(引用版本)
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lk(mut);
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "消费者等待从队列 '" << name << "' 弹出数据" << std::endl;

        data_cond.wait(lk, [this] {
            bool has_data = !data_queue.empty();
            if (!has_data) {
                std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                    << "队列 '" << name << "' 为空,消费者等待数据..." << std::endl;
            }
            return has_data;
            });

        value = std::move(data_queue.front());
        data_queue.pop();
        size_t new_size = data_queue.size();
        lk.unlock();

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "消费者成功从队列 '" << name << "' 弹出值: " << value
            << ",当前大小: " << new_size << "/" << capacity << std::endl;

        space_cond.notify_one(); // 通知可能有空间了
    }

    // 等待并弹出数据(智能指针版本)
    std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lk(mut);
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "消费者等待从队列 '" << name << "' 弹出数据" << std::endl;

        data_cond.wait(lk, [this] {
            bool has_data = !data_queue.empty();
            if (!has_data) {
                std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                    << "队列 '" << name << "' 为空,消费者等待数据..." << std::endl;
            }
            return has_data;
            });

        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        size_t new_size = data_queue.size();
        lk.unlock();

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "消费者成功从队列 '" << name << "' 弹出值: " << *res
            << ",当前大小: " << new_size << "/" << capacity << std::endl;

        space_cond.notify_one(); // 通知可能有空间了
        return res;
    }

    // 尝试弹出数据(引用版本)
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) {
            std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                << "尝试从队列 '" << name << "' 弹出数据失败(队列为空)" << std::endl;
            return false;
        }

        value = std::move(data_queue.front());
        data_queue.pop();
        size_t new_size = data_queue.size();

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "尝试从队列 '" << name << "' 弹出值: " << value
            << " 成功,当前大小: " << new_size << "/" << capacity << std::endl;

        space_cond.notify_one(); // 通知可能有空间了
        return true;
    }

    // 尝试弹出数据(智能指针版本)
    std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) {
            std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                << "尝试从队列 '" << name << "' 弹出数据失败(队列为空)" << std::endl;
            return nullptr;
        }

        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        size_t new_size = data_queue.size();

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "尝试从队列 '" << name << "' 弹出值: " << *res
            << " 成功,当前大小: " << new_size << "/" << capacity << std::endl;

        space_cond.notify_one(); // 通知可能有空间了
        return res;
    }

    // 带超时的等待并弹出数据(引用版本)
    template<typename Rep, typename Period>
    bool wait_and_pop_timeout(T& value, const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lk(mut);
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "消费者带超时等待从队列 '" << name << "' 弹出数据" << std::endl;

        if (!data_cond.wait_for(lk, timeout, [this] {
            bool has_data = !data_queue.empty();
            if (!has_data) {
                std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                    << "队列 '" << name << "' 为空,消费者等待数据..." << std::endl;
            }
            return has_data;
            })) {
            std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                << "从队列 '" << name << "' 弹出数据超时" << std::endl;
            return false; // 超时
        }

        value = std::move(data_queue.front());
        data_queue.pop();
        size_t new_size = data_queue.size();
        lk.unlock();

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "消费者成功带超时从队列 '" << name << "' 弹出值: " << value
            << ",当前大小: " << new_size << "/" << capacity << std::endl;

        space_cond.notify_one(); // 通知可能有空间了
        return true;
    }

    // 带超时的等待并弹出数据(智能指针版本)
    template<typename Rep, typename Period>
    std::shared_ptr<T> wait_and_pop_timeout(const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lk(mut);
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "消费者带超时等待从队列 '" << name << "' 弹出数据" << std::endl;

        if (!data_cond.wait_for(lk, timeout, [this] {
            bool has_data = !data_queue.empty();
            if (!has_data) {
                std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                    << "队列 '" << name << "' 为空,消费者等待数据..." << std::endl;
            }
            return has_data;
            })) {
            std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                << "从队列 '" << name << "' 弹出数据超时" << std::endl;
            return nullptr; // 超时
        }

        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        size_t new_size = data_queue.size();
        lk.unlock();

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "消费者成功带超时从队列 '" << name << "' 弹出值: " << *res
            << ",当前大小: " << new_size << "/" << capacity << std::endl;

        space_cond.notify_one(); // 通知可能有空间了
        return res;
    }

    // 检查队列是否为空
    bool empty() const {
        std::lock_guard<std::mutex> lk(mut);
        bool is_empty = data_queue.empty();
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "检查队列 '" << name << "' 是否为空: " << (is_empty ? "是" : "否") << std::endl;
        return is_empty;
    }

    // 检查队列是否已满
    bool full() const {
        std::lock_guard<std::mutex> lk(mut);
        bool is_full = data_queue.size() >= capacity;
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "检查队列 '" << name << "' 是否已满: " << (is_full ? "是" : "否") << std::endl;
        return is_full;
    }

    // 获取队列大小
    size_t size() const {
        std::lock_guard<std::mutex> lk(mut);
        size_t current_size = data_queue.size();
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "获取队列 '" << name << "' 大小: " << current_size << "/" << capacity << std::endl;
        return current_size;
    }

    // 获取队列容量
    size_t get_capacity() const {
        std::lock_guard<std::mutex> lk(mut);
        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "获取队列 '" << name << "' 容量: " << capacity << std::endl;
        return capacity;
    }

    // 设置队列容量
    void set_capacity(size_t new_capacity) {
        std::lock_guard<std::mutex> lk(mut);
        size_t old_capacity = capacity;
        capacity = new_capacity > 0 ? new_capacity : 1;

        std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
            << "设置队列 '" << name << "' 容量: " << old_capacity << " -> " << capacity << std::endl;

        // 如果新容量大于当前大小,可能需要通知生产者
        if (data_queue.size() < capacity) {
            space_cond.notify_all(); // 通知所有等待的生产者
            std::cout << "[" << current_time() << "][Thread " << thread_id() << "] "
                << "队列 '" << name << "' 有新空间可用,通知所有生产者" << std::endl;
        }
    }
};

// 使用示例
int main() {
    std::cout << "[" << current_time() << "][Main Thread] "
        << "开始线程安全队列演示程序" << std::endl;

    // 创建容量为3的线程安全队列
    ThreadSafeQueue<int> queue(3, "DemoQueue");

    std::cout << "[" << current_time() << "][Main Thread] "
        << "创建生产者和消费者线程" << std::endl;

    // 生产者线程
    std::thread producer([&] {
        std::cout << "[" << current_time() << "][Producer Thread] "
            << "生产者线程启动" << std::endl;

        for (int i = 0; i < 6; ++i) {
            // 使用阻塞推送
            queue.push(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        std::cout << "[" << current_time() << "][Producer Thread] "
            << "生产者线程完成" << std::endl;
        });

    // 消费者线程
    std::thread consumer([&] {
        std::cout << "[" << current_time() << "][Consumer Thread] "
            << "消费者线程启动" << std::endl;

        for (int i = 0; i < 6; ++i) {
            int value;
            queue.wait_and_pop(value);
            std::cout << "[" << current_time() << "][Consumer Thread] "
                << "处理值: " << value << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }

        std::cout << "[" << current_time() << "][Consumer Thread] "
            << "消费者线程完成" << std::endl;
        });

    // 等待线程完成
    producer.join();
    consumer.join();

    std::cout << "[" << current_time() << "][Main Thread] "
        << "演示程序完成" << std::endl;

    return 0;
}

线程安全队列代码流程分析

我将详细分析这个有三个角色(队列、生产者、消费者)的代码执行流程,帮助您理解多线程环境下的交互过程。

整体架构

+-------------+    生产数据     +----------+    消费数据     +-------------+
|  生产者线程  | -------------> | 线程安全队列 | -------------> |  消费者线程  |
+-------------+                +----------+                +-------------+
        阻塞当队列满                              阻塞当队列空

详细执行流程

1. 初始化阶段

  • 主线程创建容量为3的线程安全队列
  • 主线程创建生产者线程和消费者线程
  • 两个线程开始并行执行

2. 生产者线程流程

  1. 生产者线程启动,开始循环推送数据(0-5)
  2. 前3次推送(0,1,2)会立即成功,因为队列有空间
  3. 当尝试推送第4个值(3)时,队列已满,生产者阻塞
  4. 等待消费者消费数据后,生产者被唤醒并继续推送
  5. 重复步骤3-4直到所有数据推送完成

3. 消费者线程流程

  1. 消费者线程启动,尝试从队列弹出数据
  2. 最初队列为空,消费者阻塞等待数据
  3. 当生产者推送数据后,消费者被唤醒并消费数据
  4. 消费后队列空间释放,通知可能阻塞的生产者
  5. 重复步骤2-4直到消费完所有数据

4. 队列状态变化

时间线: 0-----1-----2-----3-----4-----5-----6-----7-----8-----9-----10---->
生产者: P0---P1---P2---(阻塞)------------------P3---P4---(阻塞)---P5--->
消费者: (阻塞)-----------C0---(阻塞)---C1---(阻塞)---C2---C3---C4---C5--->
队列状态: 0 -> 0,1 -> 0,1,2 -> 1,2 -> 1,2,3 -> 2,3 -> 2,3,4 -> 3,4 -> 4 -> 4,5 -> 5

5. 关键交互点

  1. 生产者阻塞点: 当队列满时(大小=容量),生产者调用space_cond.wait()进入等待状态
  2. 消费者阻塞点: 当队列空时,消费者调用data_cond.wait()进入等待状态
  3. 唤醒生产者: 消费者消费数据后调用space_cond.notify_one()唤醒可能阻塞的生产者
  4. 唤醒消费者: 生产者生产数据后调用data_cond.notify_one()唤醒可能阻塞的消费者

控制台输出分析示例

以下是程序可能产生的输出片段及其解释:

[14:30:25.123][Thread 140245678912256] 创建队列 'DemoQueue',容量: 3
  • 主线程创建容量为3的队列
[14:30:25.124][Thread 140245678912256] 创建生产者和消费者线程
[14:30:25.125][Thread 140245670519552] 生产者线程启动
[14:30:25.126][Thread 140245662126848] 消费者线程启动
  • 主线程创建生产者和消费者线程
  • 两个线程开始执行
[14:30:25.127][Thread 140245670519552] 生产者尝试推送值: 0 到队列 'DemoQueue'
[14:30:25.128][Thread 140245670519552] 生产者成功推送值: 0 到队列 'DemoQueue',当前大小: 1/3
  • 生产者成功推送第一个值
[14:30:25.228][Thread 140245670519552] 生产者尝试推送值: 1 到队列 'DemoQueue'
[14:30:25.229][Thread 140245670519552] 生产者成功推送值: 1 到队列 'DemoQueue',当前大小: 2/3
[14:30:25.329][Thread 140245670519552] 生产者尝试推送值: 2 到队列 'DemoQueue'
[14:30:25.330][Thread 140245670519552] 生产者成功推送值: 2 到队列 'DemoQueue',当前大小: 3/3
  • 生产者继续推送,队列逐渐填满
[14:30:25.430][Thread 140245670519552] 生产者尝试推送值: 3 到队列 'DemoQueue'
[14:30:25.431][Thread 140245670519552] 队列 'DemoQueue' 已满,生产者等待空间...
  • 队列已满,生产者开始阻塞
[14:30:25.432][Thread 140245662126848] 消费者等待从队列 'DemoQueue' 弹出数据
[14:30:25.433][Thread 140245662126848] 消费者成功从队列 'DemoQueue' 弹出值: 0,当前大小: 2/3
[14:30:25.434][Thread 140245670519552] 生产者成功推送值: 3 到队列 'DemoQueue',当前大小: 3/3
  • 消费者消费数据,释放空间
  • 生产者被唤醒并成功推送数据
[14:30:25.435][Thread 140245662126848] 处理值: 0
[14:30:25.635][Thread 140245662126848] 消费者等待从队列 'DemoQueue' 弹出数据
[14:30:25.636][Thread 140245662126848] 消费者成功从队列 'DemoQueue' 弹出值: 1,当前大小: 2/3
  • 消费者处理数据并尝试下一次消费

关键同步机制

  1. 互斥锁(mutex): 保护队列数据的并发访问
  2. 条件变量(condition_variable):
    • data_cond: 消费者等待队列非空
    • space_cond: 生产者等待队列非满
  3. 通知机制(notify):
    • 生产者推送数据后通知data_cond
    • 消费者弹出数据后通知space_cond

性能特点

  1. 生产者受限: 当队列满时,生产者速度受消费者速度限制
  2. 消费者受限: 当队列空时,消费者速度受生产者速度限制
  3. 平衡点: 当生产者和消费者速度匹配时,队列大小保持稳定
  4. 内存使用: 队列大小不会超过指定容量,避免内存无限增长

这个实现提供了一个高效、线程安全的生产者-消费者模式,适用于需要控制资源使用的并发场景。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐