C++ 线程实战案例解析
这是 C++ 多线程最核心的安全机制。:真正的 SPSC 优化方案是不使用任何阻塞工具(锁或信号量),而是利用 std::atomic 配合 CPU 缓存一致性协议,实现真正的“零拷贝、零阻塞”。使用者不需要手动调用 sem_init 或 sem_destroy,这些都在 Semaphore 和 RingQueue 的构造函数和析构函数中自动完成。使用者不需要知道内部用的是 sem_wait 还是
案例1: 最基础的生产者消费者模型
#include <stdio.h> // 标准输入输出(printf)
#include <pthread.h> // POSIX线程库(线程创建/互斥锁/条件变量)
#include <unistd.h> // 睡眠函数(sleep)
#include <stdlib.h> // 标准库(内存分配/退出等,此处暂未用到)
#define BUF_SIZE 3 // 缓冲区大小:最多存放3个数据
int g_buf[BUF_SIZE]; // 共享缓冲区:存储生产/消费的数据
int g_idx = 0; // 缓冲区当前已存元素个数(共享资源,需互斥保护)
// 同步原语:互斥锁+两个条件变量(分别协调生产者/消费者)
pthread_mutex_t g_mutex; // 互斥锁:保护共享资源(g_buf/g_idx)的排他访问
pthread_cond_t g_cond_prod; // 生产者条件变量:缓冲区有空位时唤醒生产者
pthread_cond_t g_cond_cons; // 消费者条件变量:缓冲区有数据时唤醒消费者
// 生产者线程函数:循环往缓冲区放入5个数据
// 参数:arg - 线程入参(此处未使用),返回值:线程退出状态(此处返回NULL)
void* producer(void* arg) {
// 生产5个数据(循环5次)
for (int i = 0; i < 5; i++) {
// 1. 加互斥锁:保护共享资源(g_buf/g_idx),防止多线程同时修改
// 加锁后,其他线程调用pthread_mutex_lock会阻塞,直到当前线程解锁
pthread_mutex_lock(&g_mutex);
// 2. 检查缓冲区是否满:如果满了,生产者需要等待消费者取数据
// 注意:必须用while而非if!避免「虚假唤醒」(条件变量被唤醒后,条件可能已不满足)
// 比如:多个生产者被唤醒时,缓冲区可能已被其他生产者占满
while (g_idx == BUF_SIZE) {
// pthread_cond_wait做两件事:
// a. 原子操作:解锁g_mutex + 阻塞当前线程,等待被唤醒
// b. 被唤醒后:继续从这个代码开始执行, 重新加锁g_mutex,然后退出wait继续执行
pthread_cond_wait(&g_cond_prod, &g_mutex);
}
// 3. 生产数据:往缓冲区放入当前数值i
g_buf[g_idx++] = i; // g_idx先赋值,再自增(等价于g_buf[g_idx]=i; g_idx++;)
// 打印生产信息:pthread_self()获取当前线程ID,g_idx是当前缓冲区数据量
printf("生产者[%ld]:放入%d,缓冲区数量=%d\n", pthread_self(), i, g_idx);
// 4. 唤醒消费者:缓冲区已有数据,通知消费者可以取数据了
// pthread_cond_signal:唤醒等待该条件变量的任意一个线程(此处只有1个消费者)
// 若有多个消费者,可用pthread_cond_broadcast唤醒所有
pthread_cond_signal(&g_cond_cons);
// 5. 解锁互斥锁:释放共享资源,让其他线程(消费者)可以访问
pthread_mutex_unlock(&g_mutex);
// 模拟生产耗时(比如生产一个数据需要1秒),让出CPU,让消费者有机会执行
sleep(1);
}
// 线程正常退出,返回NULL
return NULL;
}
// 消费者线程函数:循环从缓冲区取出5个数据
// 参数:arg - 线程入参(此处未使用),返回值:线程退出状态(此处返回NULL)
void* consumer(void* arg) {
// 消费5个数据(与生产者生产数量一致)
for (int i = 0; i < 5; i++) {
// 1. 加互斥锁:保护共享资源,防止与生产者同时修改g_buf/g_idx
pthread_mutex_lock(&g_mutex);
// 2. 检查缓冲区是否空:如果空了,消费者需要等待生产者放数据
// 同样用while避免虚假唤醒(比如多个消费者被唤醒时,缓冲区已空)
while (g_idx == 0) {
// 阻塞当前线程,等待生产者的唤醒信号(g_cond_cons)
// 等待期间会自动解锁g_mutex,被唤醒后重新加锁
pthread_cond_wait(&g_cond_cons, &g_mutex);
}
// 3. 消费数据:从缓冲区取出最后一个数据
int val = g_buf[--g_idx]; // g_idx先自减,再赋值(等价于g_idx--; val=g_buf[g_idx];)
// 打印消费信息:当前线程ID + 取出的数据 + 剩余缓冲区数量
printf("消费者[%ld]:取出%d,缓冲区数量=%d\n", pthread_self(), val, g_idx);
// 4. 唤醒生产者:缓冲区有空位了,通知生产者可以继续生产
pthread_cond_signal(&g_cond_prod);
// 5. 解锁互斥锁:释放共享资源,让生产者可以访问
pthread_mutex_unlock(&g_mutex);
// 模拟消费耗时(比如消费一个数据需要1秒),让出CPU,让生产者有机会执行
sleep(1);
}
// 线程正常退出,返回NULL
return NULL;
}
int main() {
// 定义线程ID:分别存储生产者/消费者线程的ID
pthread_t tid_prod, tid_cons;
// 初始化同步原语:必须先初始化,才能使用
// 第二个参数为NULL:使用默认属性
pthread_mutex_init(&g_mutex, NULL); // 初始化互斥锁
pthread_cond_init(&g_cond_prod, NULL); // 初始化生产者条件变量
pthread_cond_init(&g_cond_cons, NULL); // 初始化消费者条件变量
// 创建线程
// 参数:线程ID指针、线程属性(NULL=默认)、线程函数、函数入参(NULL)
pthread_create(&tid_prod, NULL, producer, NULL); // 创建生产者线程
pthread_create(&tid_cons, NULL, consumer, NULL); // 创建消费者线程
// 等待线程结束:主线程阻塞,直到生产者/消费者线程执行完毕
// 第二个参数为NULL:不接收线程的返回值
pthread_join(tid_prod, NULL); // 等待生产者线程退出
pthread_join(tid_cons, NULL); // 等待消费者线程退出
// 销毁同步原语:释放系统资源,避免内存泄漏
pthread_mutex_destroy(&g_mutex); // 销毁互斥锁
pthread_cond_destroy(&g_cond_prod); // 销毁生产者条件变量
pthread_cond_destroy(&g_cond_cons); // 销毁消费者条件变量
return 0; // 主线程正常退出
}
案例2: 信号量

我就用信号量最经典的**“限流”**场景来举例:只有 3 个窗口的银行,来了 10 个客户办理业务。
在这个例子中,你会看到:
-
信号量 充当了“叫号机”和“柜员空闲数”。
-
只有 3 个线程能同时运行(办理业务)。
-
其他 7 个线程必须阻塞等待,直到有人办完离开。
这个代码不需要 pthread_cond,也不需要 while 循环检查,逻辑非常直线条。
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <semaphore.h> // 必须引入信号量头文件
#define NUM_WINDOWS 3 // 只有3个柜台窗口
#define NUM_CUSTOMERS 10 // 来了10个客户
sem_t sem_windows; // 定义信号量
// 客户线程函数
void* customer(void* arg) {
int id = *(int*)arg;
free(arg); // 释放传进来的内存
printf("--- 客户 %d 到达银行,正在排队...\n", id);
// [P操作] 申请资源
// 信号量 > 0 则减1并进入;
// 信号量 = 0 则阻塞在这里睡觉
sem_wait(&sem_windows);
// --- 临界区(能走到这里说明拿到窗口了) ---
printf("+++ 客户 %d 抢到了窗口,正在办理业务...\n", id);
sleep(2); // 模拟办理业务耗时 2秒
printf("<<< 客户 %d 办理完毕,离开窗口\n", id);
// [V操作] 释放资源
// 信号量 + 1,如果有人在排队,会唤醒其中一个
sem_post(&sem_windows);
return NULL;
}
int main() {
pthread_t tids[NUM_CUSTOMERS];
// 初始化信号量
// 第二个参数 0 表示线程间共享
// 第三个参数 NUM_WINDOWS (3) 是信号量的初始值!表示有3个空位
sem_init(&sem_windows, 0, NUM_WINDOWS);
// 创建 10 个客户线程
for (int i = 0; i < NUM_CUSTOMERS; i++) {
int* p_id = malloc(sizeof(int)); // 动态分配内存传参
*p_id = i;
pthread_create(&tids[i], NULL, customer, p_id);
// 稍微间隔一下让打印好看点,不是必须的
usleep(100000);
}
// 等待所有线程结束
for (int i = 0; i < NUM_CUSTOMERS; i++) {
pthread_join(tids[i], NULL);
}
// 销毁信号量
sem_destroy(&sem_windows);
return 0;
}
案例3: 环形队列
生产者消费者模型,环形队列 ,信号量, 且单独实现环形队列类
这是一个非常好的练习,能够帮助你理解如何从 面向过程 (C语言) 转向 面向对象 (C++) 的思维方式。在 C++ 中,我们通常会利用 RAII (资源获取即初始化) 技术来管理锁和信号量,利用 模板 (Template) 来让队列支持任意数据类型。
以下是完整的 C++ 实现,包含三个部分:
Semaphore 类:将 POSIX sem_t 封装成 C++ 类(为了 RAII 管理)。
RingQueue 类:线程安全的泛型环形队列。
主程序:生产者与消费者逻辑。
完整代码 (C++ Standard 11/14/17/20 兼容)
为了保证代码在 Linux/Unix 环境下与之前的 C 代码逻辑一致,底层依然使用 <semaphore.h>,但用 C++ 类进行了封装。
#include <iostream> // 标准输入输出(cout/endl)
#include <vector> // 动态数组,用于实现环形队列的缓冲区
#include <thread> // C++11线程库(线程创建/线程ID获取)
#include <mutex> // C++11互斥锁(lock_guard)
#include <semaphore.h> // POSIX信号量(sem_init/sem_wait/sem_post)
#include <unistd.h> // sleep函数(模拟生产/消费耗时)
// ---------------------------------------------------------
// 1. 信号量封装类 (RAII风格)
// 核心设计:利用C++的构造/析构自动管理信号量生命周期,避免手动销毁导致的资源泄漏
// ---------------------------------------------------------
class Semaphore {
private:
sem_t _sem; // POSIX信号量原生对象
public:
// 构造函数:初始化信号量
// 参数value:信号量初始值(>=0)
// sem_init参数说明:
// &_sem:要初始化的信号量对象
// 0:信号量作用域(0=进程内线程共享,非0=进程间共享)
// value:信号量初始值(可用资源数)
Semaphore(int value) {
sem_init(&_sem, 0, value);
}
// 析构函数:销毁信号量(RAII核心:对象销毁时自动释放资源)
// 避免手动调用sem_destroy遗漏导致的系统资源泄漏
~Semaphore() {
sem_destroy(&_sem);
}
// P操作 (Wait/减操作):申请资源,若无可用则阻塞
// 作用:信号量值-1;若值<0,当前线程阻塞,直到有其他线程调用signal()
void wait() {
sem_wait(&_sem);
}
// V操作 (Post/Signal/加操作):释放资源,唤醒阻塞线程
// 作用:信号量值+1;若有线程阻塞在wait(),则唤醒其中一个
void signal() {
sem_post(&_sem);
}
};
// ---------------------------------------------------------
// 2. 线程安全的环形队列类 (模板类)
// 核心设计:
// - 环形队列:通过head/tail取模实现缓冲区循环复用
// - 同步机制:信号量(控制空/数据数量)+ 互斥锁(保护临界区)
// - 线程安全:所有对head/tail/buffer的操作都加锁保护
// ---------------------------------------------------------
template <typename T> // 模板类:支持任意类型的数据存储
class RingQueue {
private:
std::vector<T> _buffer; // 环形缓冲区:替代原生数组,自动管理内存(无需手动new/delete)
int _capacity; // 队列总容量(缓冲区最大可存储元素数)
int _head; // 读索引(消费者使用):指向下一个要取出的元素位置
int _tail; // 写索引(生产者使用):指向下一个要写入的元素位置
// 同步工具:保证多线程访问的安全性和顺序性
std::mutex _mtx; // 互斥锁:保护head/tail/_buffer的原子操作(临界区)
Semaphore _sem_empty; // 信号量:表示缓冲区的「空闲位置数」(生产者可用资源)
Semaphore _sem_data; // 信号量:表示缓冲区的「已存储数据数」(消费者可用资源)
public:
// 构造函数:初始化环形队列的容量和同步工具
// 参数cap:队列容量(必须>0)
RingQueue(int cap)
: _capacity(cap), // 初始化队列总容量
_buffer(cap), // 初始化缓冲区大小为cap
_head(0), // 读索引初始化为0
_tail(0), // 写索引初始化为0
_sem_empty(cap), // 初始空闲位置数 = 总容量(队列空)
_sem_data(0) // 初始数据数 = 0(队列空)
{}
// 禁止拷贝构造和赋值运算符重载
// 原因:队列包含互斥锁、信号量等不可拷贝的同步对象,拷贝会导致未定义行为
RingQueue(const RingQueue&) = delete;
RingQueue& operator=(const RingQueue&) = delete;
// 入队操作 (生产者调用):将数据写入环形队列
// 参数data:要写入队列的数据(const& 避免拷贝,提高效率)
void push(const T& data) {
// 1. P操作:申请「空闲位置」资源
// 若队列已满(_sem_empty值=0),生产者线程阻塞,直到消费者取出数据释放空位
_sem_empty.wait();
// 2. 加互斥锁:应对多个生产者, 保护临界区(_tail/_buffer的修改)
// lock_guard是RAII风格锁:作用域结束时自动解锁,避免手动解锁遗漏导致死锁
{ // 局部作用域:限制lock_guard的生命周期,解锁时机更精准
std::lock_guard<std::mutex> lock(_mtx);
// --- 临界区开始:仅允许一个线程执行 ---
_buffer[_tail] = data; // 将数据写入当前写索引位置
_tail = (_tail + 1) % _capacity; // 写索引后移,取模实现「环形」(到末尾则回到0)
// --- 临界区结束 ---
}
// 出作用域,lock_guard析构,_mtx自动解锁
// 3. V操作:释放「数据」资源(通知消费者有新数据可消费)
// _sem_data值+1,若有消费者阻塞在pop()的wait(),则唤醒其中一个
_sem_data.signal();
}
// 出队操作 (消费者调用):从环形队列取出数据
// 参数out_data:输出参数,用于接收取出的数据(指针避免返回值拷贝)
void pop(T* out_data) {
// 1. P操作:申请「数据」资源
// 若队列为空(_sem_data值=0),消费者线程阻塞,直到生产者写入数据
_sem_data.wait();
// 2. 加互斥锁:应对多个消费者, 保护临界区(_head/_buffer的读取)
{
std::lock_guard<std::mutex> lock(_mtx);
// --- 临界区开始 ---
*out_data = _buffer[_head]; // 从当前读索引位置取出数据
_head = (_head + 1) % _capacity; // 读索引后移,取模实现环形
// --- 临界区结束 ---
}
// 3. V操作:释放「空闲位置」资源(通知生产者有新空位可写入)
// _sem_empty值+1,若有生产者阻塞在push()的wait(),则唤醒其中一个
_sem_empty.signal();
}
};
// ---------------------------------------------------------
// 3. 主程序逻辑:生产者-消费者模型测试
// ---------------------------------------------------------
// 全局的环形队列实例:容量为5的整数队列(生产者和消费者共享)
RingQueue<int> g_queue(5);
// 生产者线程函数:无限循环生产数据并写入队列
void producer() {
int i = 0; // 生产的数据值(从0开始递增)
while (true) { // 死循环持续生产
// 打印生产信息:输出线程ID和生产的数据值
std::cout << "生产者 [" << std::this_thread::get_id() << "] 生产: " << i << std::endl;
// 将数据推入队列:若队列满则自动阻塞,无需手动判断
g_queue.push(i);
i++; // 下一个要生产的数据值
sleep(1); // 模拟生产耗时(1秒生产一个数据)
}
}
// 消费者线程函数:无限循环从队列取出数据并消费
void consumer() {
while (true) { // 死循环持续消费
int data; // 存储从队列取出的数据
// 从队列取出数据:若队列为空则自动阻塞,无需手动判断
g_queue.pop(&data);
// 打印消费信息:输出线程ID和消费的数据值(>>> 区分消费日志)
std::cout << " >>> 消费者 [" << std::this_thread::get_id() << "] 消费: " << data << std::endl;
sleep(2); // 模拟消费耗时(2秒消费一个数据,比生产慢,队列会逐渐填满)
}
}
int main() {
// 创建生产者线程和消费者线程
// 线程创建后自动执行对应的线程函数(producer/consumer)
std::thread t_prod(producer);
std::thread t_cons(consumer);
// 等待线程结束:阻塞主线程,直到生产者/消费者线程退出
// 注:由于线程函数是死循环,此处join()永远不会返回,程序会一直运行
t_prod.join();
t_cons.join();
return 0;
}
注意: 如果是单生产者-单消费者模型, 不需要加锁, 可以只使用信号量完成
C++ 写法的关键点
类封装 (Encapsulation):
-
使用者不需要手动调用 sem_init 或 sem_destroy,这些都在 Semaphore 和 RingQueue 的构造函数和析构函数中自动完成。
-
使用者不需要知道内部用的是 sem_wait 还是 pthread_mutex_lock,接口只有简单的 push 和 pop。
RAII 锁管理 (std::lock_guard):
-
在 C 中,你必须记得手动 unlock。如果在解锁前函数 return 了或者抛出了错误,就会造成死锁。
-
在 C++ 中,std::lock_guard<std::mutex> lock(_mtx); 这行代码创建了一个对象。当这个对象超出作用域(也就是遇到右大括号 })时,它的析构函数会自动释放锁。这是 C++ 多线程最核心的安全机制。
C++ 线程互斥锁 lock_guard
https://blog.csdn.net/Howrun777/article/details/157184216?spm=1001.2014.3001.5501
案例4: SPSC 无锁队列实现 (工业级写法)
这才是“不需要锁”的终极形态。我们利用 std::atomic 的 acquire / release 语义来替代锁和信号量。
#include <vector>
#include <atomic>
#include <thread>
#include <iostream>
template <typename T>
class SPSCLockFreeQueue {
private:
std::vector<T> _buffer;
int _capacity;
// 使用 atomic 替代普通的 int,并移除 Mutex 和 Semaphore
std::atomic<int> _head;
std::atomic<int> _tail;
public:
SPSCLockFreeQueue(int cap) : _capacity(cap + 1), _buffer(cap + 1), _head(0), _tail(0) {}
// 注意:实际容量是 cap,多留一个空位用于判断队满(head == (tail + 1) % size)
// 生产者调用
bool push(const T& data) {
int current_tail = _tail.load(std::memory_order_relaxed);
int next_tail = (current_tail + 1) % _capacity;
// 检查是否已满:需要读取 _head
// memory_order_acquire: 保证看到消费者对 _head 的最新修改
if (next_tail == _head.load(std::memory_order_acquire)) {
return false; // 队列满
}
_buffer[current_tail] = data;
// 更新 _tail
// memory_order_release: 保证在此之前的写入(buffer data)对读取 tail 的线程可见
_tail.store(next_tail, std::memory_order_release);
return true;
}
// 消费者调用
bool pop(T* out_data) {
int current_head = _head.load(std::memory_order_relaxed);
// 检查是否为空:需要读取 _tail
// memory_order_acquire: 保证看到生产者对 _tail 和 buffer 的最新修改
if (current_head == _tail.load(std::memory_order_acquire)) {
return false; // 队列空
}
*out_data = _buffer[current_head];
// 更新 _head
// memory_order_release: 通知生产者这个位置已经空出来了
_head.store((current_head + 1) % _capacity, std::memory_order_release);
return true;
}
};
-
针对你之前的代码:在单生产者-单消费者模式下,可以删掉 mutex。因为 semaphore 的屏障作用已经足够保证安全。你是对的。
-
更深层的逻辑:如果你觉得 mutex 慢,那么 semaphore 也很慢。
-
最佳实践:真正的 SPSC 优化方案是不使用任何阻塞工具(锁或信号量),而是利用 std::atomic 配合 CPU 缓存一致性协议,实现真正的“零拷贝、零阻塞”。
更多推荐


所有评论(0)