C++多线程(二):并发竞争控制--锁与条件变量
本文系统介绍了C++多线程编程中的核心同步机制。主要内容包括:1)基础互斥量mutex及其RAII封装lock_guard和unique_lock的使用;2)线程安全栈的实现及竞态条件处理;3)死锁成因分析及解决方案(解耦合、同时加锁、层级锁);4)高级同步机制如共享锁、递归锁的应用场景;5)线程安全单例模式的多种实现方法;6)条件变量的正确使用方式及线程安全队列的实现。文章通过大量代码示例详细说
·
目录
mutex(基础互斥量)
核心概念
- 互斥量(std::mutex):C++11 提供的最基础的互斥同步原语,用于保护临界区(多线程共享的资源 / 代码段),保证同一时刻只有一个线程能进入临界区,避免竞态条件。
- lock()/unlock():手动加锁 / 解锁接口,必须成对调用 ——
lock()会阻塞直到获取锁,unlock()释放锁;若忘记unlock()会导致死锁,若重复lock()也会导致死锁。 - 临界区:代码中访问共享资源的片段(如示例中的
shared_data++/--),必须用互斥量保护,否则多线程同时操作会导致数据错乱。
std::mutex mtx1;
int shared_data = 100;
void use_lock(){
while(true){
mtx1.lock(); // 手动加锁,进入临界区
shared_data++; // 临界区:操作共享数据
mtx1.unlock(); // 手动解锁,退出临界区
this_thread::sleep_for(chrono::microseconds(10));
}
}
void test_lock(){
std::thread t1(use_lock);
std::thread t2([](){
while(true){
mtx1.lock();
shared_data--;
std::cout << "shared data: " << shared_data << std::endl;
mtx1.unlock();
this_thread::sleep_for(chrono::microseconds(10));
}
});
t1.join();
t2.join();
}
lock_guard(RAII封装的互斥量)
核心概念
- RAII(资源获取即初始化):
lock_guard是对std::mutex的 RAII 封装 —— 构造时自动调用lock()加锁,析构时自动调用unlock()解锁,无需手动管理锁的生命周期。 - 作用域绑定:
lock_guard的锁范围与作用域绑定,出作用域(如代码块结束、函数返回)时自动解锁,彻底避免 “忘记解锁” 导致的死锁。 - 不可手动控制:
lock_guard设计为 “极简”,不支持手动解锁 / 加锁,仅用于简单的、全程锁定的场景
std::mutex m;
int n = 0;
void use_lock(){
while(true){
std::lock_guard<std::mutex> lock(m);// 构造自动加锁,析构自动解锁
n++; // 临界区:受锁保护
std::cout << "n = " << n << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
} // 出作用域,lock析构,自动解锁
}
实例:线程安全的栈(基于锁实现):
核心概念
- 竞态条件(Race Condition):多线程执行时序不确定导致的错误,典型场景是 “检查 - 然后 - 执行”(如先判断
empty(),再pop())—— 两个操作看似原子,实则中间可能被其他线程打断。 - 复合操作 vs 原子操作:
empty()和pop()各自是原子操作(内部加锁),但组合起来是复合操作,仍有线程安全问题;需将 “检查 + 执行” 封装为一个原子操作。 - 异常安全:直接返回值的
pop()可能因内存不足(如复杂类型拷贝失败)导致异常,用智能指针(std::shared_ptr)可减少拷贝,提升异常安全性。 - 空栈处理:避免返回 “非法值”(复杂类型难以定义),优先用异常或空指针表示空栈,而非让调用者自行判断。
// 如何保证数据安全
// 对于读取类型的操作,即使读取函数是线程安全的,但是返回值抛给外边使用,存在不安全性
// 比如一个栈对象,我们要保证其在多线程访问的时候是安全的,可以在判断栈是否为空,判断操作内部我们可以加锁,但是判断结束后返回值就不在加锁了,就会存在线程安全问题
// 比如我定义了如下栈, 对于多线程访问时判断栈是否为空,此后两个线程同时出栈,可能会造成崩溃。
template<typename T>
class threadsafe_stack1
{
private:
std::stack<T> data;
mutable std::mutex m; // mutable:const成员函数也能加锁
public:
threadsafe_stack1() {}
threadsafe_stack1(const threadsafe_stack1& other)
{
std::lock_guard<std::mutex> lock(other.m);
// 在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack1& operator=(const threadsafe_stack1&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value)); // 移动语义减少拷贝
}
//问题代码:检查-然后-执行,存在竞态条件
T pop()
{
std::lock_guard<std::mutex> lock(m);
auto element = data.top();
data.pop();
return element;
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
//如下, 线程1和线程2先后判断栈都不为空,之后执行出战操作,会造成崩溃
void test_threadsafe_stack1() {
threadsafe_stack1<int> safe_stack;
safe_stack.push(1);
std::thread t1([&safe_stack]() {
if (!safe_stack.empty()) { // 检查:此时栈非空
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop(); // 执行:可能栈已被线程2清空
}
});
std::thread t2([&safe_stack]() {
if (!safe_stack.empty()) { // 检查:此时栈仍非空
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop(); // 执行:栈为空,调用top()崩溃
}
});
t1.join();
t2.join();
}
//解决这个问题我们可以用抛出异常的方式,比如定义一个空栈的异常
struct empty_stack : std::exception
{
const char* what() const throw();
};
// 然后实现我们的出栈函数
T pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack(); // 检查+执行封装为原子操作
auto element = data.top();
data.pop();
return element;
}
// 但是可以在函数pop内部再次判断栈是否为空,若为空则返回一个非法数据,这样比抛出异常好一些。
// 但是如果T是一个复杂类型,我们很难定义一个非法值给外界知晓,这一点可以通过智能指针进行优化。
// 之后我们再介绍更优化的方案,因为现在这个pop函数仍存在问题,比如T是一个vector<int>类型,那么在pop函数内部element就是vector<int>类型
// 函数执行pop操作, 假设此时程序内存暴增,导致当程序使用的内存足够大时,可用的有效空间不够, 函数返回element时,就会就会存在vector做拷贝赋值时造成失败
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
// 试图弹出前检查是否为空栈(原子操作)
if (data.empty()) throw empty_stack();
// 改动栈容器前设置返回值:智能指针减少拷贝
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top(); // 引用传参,避免返回值拷贝
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
// 我们提供了两个版本的pop操作,一个是带引用类型的参数的,一个是直接pop出智能指针类型,这样在pop函数内部减少了数据的拷贝,防止内存溢出
// 其实这两种做法确实是相比之前直接pop固定类型的值更节省内存,运行效率也好很多。我们也完全可以基于之前的思想,在pop时如果队列为空则返回空指针,这样比抛出异常更有好一些
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
// 试图弹出前检查是否为空栈
if (data.empty()) return nullptr; // 空栈返回空指针,替代异常
// 改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
死锁
核心概念(死锁成因)
- 死锁的四个必要条件(缺一不可):
- 互斥:资源只能被一个线程持有(如 mutex 是互斥的);
- 持有并等待:线程持有一个资源,同时等待另一个资源;
- 不可剥夺:资源不能被强制从持有线程中夺走;
- 循环等待:线程 A 等待线程 B 持有的资源,线程 B 等待线程 A 持有的资源。
- 循环加锁:死锁的典型场景 —— 线程 1 先锁 A 再锁 B,线程 2 先锁 B 再锁 A,最终互相等待对方释放锁。
std::mutex t_lock1;
std::mutex t_lock2;
int m_1 = 0;
int m_2 = 1;
void dead_lock1() {
while (true) {
std::cout << "dead_lock1 begin " << std::endl;
t_lock1.lock(); // 持有t_lock1
m_1 = 1024;
t_lock2.lock(); // 等待t_lock2(可能被线程2持有)
m_2 = 2048;
t_lock2.unlock();
t_lock1.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}
void dead_lock2() {
while (true) {
std::cout << "dead_lock2 begin " << std::endl;
t_lock2.lock(); // 持有t_lock2
m_2 = 2048;
t_lock1.lock(); // 等待t_lock1(可能被线程1持有)
m_1 = 1024;
t_lock1.unlock();
t_lock2.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}
// 然后我们启动两个线程
void test_dead_lock() {
std::thread t1(dead_lock1);
std::thread t2(dead_lock2);
t1.join();
t2.join();
}
// 这样运行之后在某一个时刻一定会导致死锁。
// 实际工作中避免死锁的一个方式就是将加锁和解锁的功能封装为独立的函数,
// 这样能保证在独立的函数里执行完操作后就解锁,不会导致一个函数里使用多个锁的情况
核心概念(死锁解决方案)
- 解耦合(打破 “持有并等待”):将多锁操作拆分为单锁原子操作,每个函数只操作一个锁,执行完立即释放,避免 “持有一个锁等待另一个锁”。
- 同时加锁(打破 “循环等待”):用
std::lock()同时锁定多个互斥量,底层通过算法保证加锁顺序一致,避免循环等待;配合std::lock_guard的std::adopt_lock(领养锁)管理解锁。 - std::scoped_lock(C++17):替代
std::lock + lock_guard,直接支持多锁同时加锁,自动解锁,语法更简洁。
解决办法一:解耦合
//加锁和解锁作为原子操作解耦合,各自只管理自己的功能
void atomic_lock1() {
std::cout << "lock1 begin lock" << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock(); // 立即释放,不持有锁等待
std::cout << "lock1 end lock" << std::endl;
}
void atomic_lock2() {
std::cout << "lock2 begin lock" << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock(); // 立即释放,不持有锁等待
std::cout << "lock2 end lock" << std::endl;
}
void safe_lock1() {
while (true) {
atomic_lock1();
atomic_lock2();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void safe_lock2() {
while (true) {
atomic_lock2();
atomic_lock1();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void test_safe_lock() {
std::thread t1(safe_lock1);
std::thread t2(safe_lock2);
t1.join();
t2.join();
}
解决办法二:同时加锁
//当我们无法避免在一个函数内部使用两个互斥量,并且都要解锁的情况,那我们可以采取同时加锁的方式。我们先定义一个类,假设这个类不推荐拷贝构造,但我们也提供了这个类的拷贝构造和移动构造
class som_big_object {
public:
som_big_object(int data) :_data(data) {}
//拷贝构造
som_big_object(const som_big_object& b2) :_data(b2._data) {
_data = b2._data;
}
//移动构造
som_big_object(som_big_object&& b2) :_data(std::move(b2._data)) {
}
//重载输出运算符
friend std::ostream& operator << (std::ostream& os, const som_big_object& big_obj) {
os << big_obj._data;
return os;
}
//重载赋值运算符
som_big_object& operator = (const som_big_object& b2) {
if (this == &b2) {
return *this;
}
_data = b2._data;
return *this;
}
//交换数据
friend void swap(som_big_object& b1, som_big_object& b2) {
som_big_object temp = std::move(b1);
b1 = std::move(b2);
b2 = std::move(temp);
}
private:
int _data;
};
// 接下来我们定义一个类对上面的类做管理,为防止多线程情况下数据混乱, 包含了一个互斥量。
class big_object_mgr {
public:
big_object_mgr(int data = 0) :_obj(data) {}
void printinfo() {
std::cout << "current obj data is " << _obj << std::endl;
}
friend void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2);
private:
std::mutex _mtx;
som_big_object _obj;
};
// 为了方便演示哪些交换是安全的,哪些是危险的,所以写了三个函数
void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::lock_guard <std::mutex> gurad1(objm1._mtx); // 持有objm1._mtx
//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> guard2(objm2._mtx); // 等待objm2._mtx(循环等待)
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}
// danger_swap是危险的交换方式。比如如下调用
void test_danger_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);
std::thread t1(danger_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(danger_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();
objm1.printinfo();
objm2.printinfo();
}
// 这种调用方式存在隐患,因为danger_swap函数在两个线程中使用会造成互相竞争加锁的情况。
// 那就需要用锁同时锁住两个锁。
void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::lock(objm1._mtx, objm2._mtx); // 同时加锁,底层保证顺序,打破循环等待
//领养锁:接管已加锁的mutex,析构时自动解锁
std::lock_guard <std::mutex> gurad1(objm1._mtx, std::adopt_lock);
//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard <std::mutex> gurad2(objm2._mtx, std::adopt_lock);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}
// 比如下面的调用就是合理的
void test_safe_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);
std::thread t1(safe_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(safe_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();
objm1.printinfo();
objm2.printinfo();
}
// 当然上面加锁的方式可以简化,C++17 scope_lock可以对多个互斥量同时加锁,并且自动释放
// 上述代码可以简化为以下方式
void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::scoped_lock guard(objm1._mtx, objm2._mtx); // C++17:多锁RAII,自动加锁/解锁
// 等价于
// std::scoped_lock<std::mutex, std::mutex> guard(objm1._mtx, objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}
层级锁
核心概念
- 层级锁设计思想:强制线程按 “层级高低” 加锁 —— 高优先级锁先加,低优先级锁后加,释放时相反,从根本上打破 “循环等待” 条件。
- thread_local(线程局部存储):每个线程有独立的变量副本,用于记录当前线程持有的最高层级锁,确保加锁顺序符合层级规则。
- 违规检测:加锁时检查当前线程的层级是否高于待加锁的层级,若违反则抛出异常,提前暴露死锁风险。
class hierarchical_mutex {
public:
explicit hierarchical_mutex(unsigned long value) :_hierarchy_value(value),
_previous_hierarchy_value(0) {}
hierarchical_mutex(const hierarchical_mutex&) = delete;
hierarchical_mutex& operator=(const hierarchical_mutex&) = delete;
void lock() {
check_for_hierarchy_violation(); // 检查加锁顺序是否违规
_internal_mutex.lock();
update_hierarchy_value(); // 更新当前线程的层级
}
void unlock() {
if (_this_thread_hierarchy_value != _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
_this_thread_hierarchy_value = _previous_hierarchy_value; // 恢复层级
_internal_mutex.unlock();
}
bool try_lock() {
check_for_hierarchy_violation();
if (!_internal_mutex.try_lock()) {
return false;
}
update_hierarchy_value();
return true;
}
private:
std::mutex _internal_mutex;
// 当前层级值(数值越大,层级越高)
unsigned long const _hierarchy_value;
// 上一次层级值(解锁时恢复)
unsigned long _previous_hierarchy_value;
// 本线程记录的层级值:thread_local,每个线程独立
static thread_local unsigned long _this_thread_hierarchy_value;
void check_for_hierarchy_violation() {
// 只能加层级更低的锁(当前线程层级 > 待加锁层级)
if (_this_thread_hierarchy_value <= _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value() {
_previous_hierarchy_value = _this_thread_hierarchy_value;
_this_thread_hierarchy_value = _hierarchy_value;
}
};
// 初始化为最大值(ULONG_MAX),表示初始时线程无锁,可加任意高层级锁
thread_local unsigned long hierarchical_mutex::_this_thread_hierarchy_value(ULONG_MAX);
void test_hierarchy_lock() {
hierarchical_mutex hmtx1(1000); // 高层级
hierarchical_mutex hmtx2(500); // 低层级
std::thread t1([&hmtx1, &hmtx2]() {
hmtx1.lock(); // 合法:当前层级ULONG_MAX > 1000
hmtx2.lock(); // 合法:当前层级1000 > 500
hmtx2.unlock();
hmtx1.unlock();
});
std::thread t2([&hmtx1, &hmtx2]() {
hmtx2.lock(); // 合法:当前层级ULONG_MAX > 500
hmtx1.lock(); // 违规:当前层级500 <= 1000,抛出异常
hmtx1.unlock();
hmtx2.unlock();
});
t1.join();
t2.join();
}
// 层级锁能保证我们每个线程加锁时,一定是先加权重高的锁。
// 并且释放时也保证了顺序。
// 主要原理就是将当前锁的权重保存在线程变量中,这样该线程再次加锁时判断线程变量的权重和锁的权重是否大于,如果满足条件则继续加锁
unique_lock(灵活的互斥量封装)
核心概念
- 灵活的锁管理:
unique_lock是比lock_guard更灵活的 RAII 封装,支持:- 手动解锁(
unlock()):控制锁粒度,非共享操作提前解锁提升性能; - 延迟加锁(
std::defer_lock):构造时不加锁,后续手动lock(); - 领养锁(
std::adopt_lock):接管已加锁的 mutex,析构时自动解锁; - 移动语义:
unique_lock可移动(不可拷贝),支持函数返回锁对象。
- 手动解锁(
- 锁粒度:加锁范围的大小 —— 粒度太大(锁住无关代码)会降低并发,粒度太小(漏锁)会导致线程安全问题;
unique_lock可手动解锁,精准控制粒度。 - owns_lock():判断当前
unique_lock是否持有锁,用于调试 / 条件判断。
// unique_lock 基本用法
std::mutex mtx;
int shared_data = 0;
void use_unique() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx); // 构造自动加锁
std::cout << "lock success" << std::endl;
shared_data++; // 临界区
lock.unlock(); // 手动解锁,提前释放锁(提升并发)
}
// 我们可以通过unique_lock的owns_lock判断是否持有锁
// 可判断是否占有锁
void owns_lock() {
// lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
if (lock.owns_lock()) { // 持有锁,返回true
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}
lock.unlock(); // 手动解锁
if (lock.owns_lock()) { // 未持有锁,返回false
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}
}
// 上述代码输出
// owns lock
// doesn't own lock
// unique_lock可以延迟加锁
void defer_lock() {
// 延迟加锁:构造时不加锁,需手动lock()
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
// 可以加锁
lock.lock();
// 可以自动析构解锁,也可以手动解锁
lock.unlock();
}
// 那我们写一段代码综合运用owns_lock和defer_lock
// 同时使用owns和defer
void use_own_defer() {
std::unique_lock<std::mutex> lock(mtx); // 主线程加锁
// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Main thread has the lock." << std::endl;
}
else
{
std::cout << "Main thread does not have the lock." << std::endl;
}
std::thread t([]() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁
// 判断是否拥有锁:初始未加锁,返回false
if (lock.owns_lock())
{
std::cout << "Thread has the lock." << std::endl;
}
else
{
std::cout << "Thread does not have the lock." << std::endl;
}
// 加锁:阻塞,因为主线程未释放锁
lock.lock();
// 判断是否拥有锁:持有锁,返回true
if (lock.owns_lock())
{
std::cout << "Thread has the lock." << std::endl;
}
else
{
std::cout << "Thread does not have the lock." << std::endl;
}
// 解锁
lock.unlock();
});
t.join(); // 主线程等待子线程,但子线程卡在lock(),程序死锁
}
// 上述代码回依次输出, 但是程序会阻塞,因为子线程会卡在加锁的逻辑上,因为主线程未释放锁,而主线程又等待子线程退出,导致整个程序卡住。
// Main thread has the lock.
// Thread does not have the lock.
// 和lock_guard一样,unique_lock也支持领养锁
// 同样支持领养操作
void use_own_adopt() {
mtx.lock(); // 手动加锁
// 领养锁:接管已加锁的mutex,析构时自动解锁
std::unique_lock<std::mutex> lock(mtx, std::adopt_lock);
if (lock.owns_lock()) { // 持有锁,返回true
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "does not have the lock" << std::endl;
}
lock.unlock(); // 手动解锁
}
// 尽管是领养的,但是打印还是会出现owns lock,因为不管如何锁被加上,就会输出owns lock。
// 既然unique_lock支持领养操作也支持延迟加锁,那么可以用两种方式实现前文lock_guard实现的swap操作。
// 之前的交换代码可以可以用如下方式等价实现
int a = 10;
int b = 99;
std::mutex mtx1;
std::mutex mtx2;
void safe_swap() {
std::lock(mtx1, mtx2); // 同时加锁
std::unique_lock<std::mutex> lock1(mtx1, std::adopt_lock); // 领养锁1
std::unique_lock<std::mutex> lock2(mtx2, std::adopt_lock); // 领养锁2
std::swap(a, b);
//错误用法:mutex已被unique_lock管理,不能手动unlock
//mtx1.unlock();
//mtx2.unlock();
}
void safe_swap2() {
std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock); // 延迟加锁1
std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock); // 延迟加锁2
//需用lock1,lock2加锁(传入unique_lock对象)
std::lock(lock1, lock2);
//错误用法:不能直接lock mutex,需通过unique_lock
//std::lock(mtx1, mtx2);
std::swap(a, b);
}
// 大家注意一旦mutex被unique_lock管理,加锁和释放的操作就交给unique_lock,不能调用mutex加锁和解锁,因为锁的使用权已经交给unique_lock了。
// 我们知道mutex是不支持移动和拷贝的,但是unique_lock支持移动,当一个mutex被转移给unique_lock后,可以通过unique_ptr转移其归属权
// 转移互斥量所有权
// 互斥量本身不支持move操作,但是unique_lock支持
std::unique_lock <std::mutex> get_lock() {
std::unique_lock<std::mutex> lock(mtx); // 加锁
shared_data++;
return lock; // 移动返回,所有权转移
}
void use_return() {
std::unique_lock<std::mutex> lock(get_lock()); // 接收移动的锁
shared_data++;
}
// 锁的粒度表示加锁的精细程度,一个锁的粒度要足够大,保证可以锁住要访问的共享数据。
// 同时一个锁的粒度要足够小,保证非共享数据不被锁住影响性能。
// 而unique_ptr则很好的支持手动解锁。
void precision_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++; // 临界区:加锁
lock.unlock(); // 提前解锁
// 不设计共享数据的耗时操作不要放在锁内执行(提升并发)
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock(); // 再次加锁
shared_data++; // 临界区:加锁
}
共享锁(读写锁)
核心概念
- 读者 - 写者问题:典型的并发场景 —— 读操作可并发(多个线程同时读),写操作需独占(同一时刻只有一个线程写,且写时不能读)。
- 共享互斥量(std::shared_mutex):C++17 引入,支持两种锁模式:
- 共享锁(读锁):
std::shared_lock,多个线程可同时持有,用于读操作; - 独占锁(写锁):
std::lock_guard/std::unique_lock,仅一个线程持有,用于写操作。
- 共享锁(读锁):
- 版本兼容:C++14 可用
std::shared_timed_mutex(支持超时加锁),C++11 可使用 Boost 库的boost::shared_mutex。 - 适用场景:读多写少的场景(如 DNS 缓存、配置读取),大幅提升并发效率。
class DNService {
public:
DNService() {}
//读操作采用共享锁:多个线程可同时读
std::string QueryDNS(std::string dnsname) {
std::shared_lock<std::shared_mutex> shared_locks(_shared_mtx); // 共享锁
auto iter = _dns_info.find(dnsname);
if (iter != _dns_info.end()) {
return iter->second;
}
return "";
}
//写操作采用独占锁:同一时刻只有一个线程写
void AddDNSInfo(std::string dnsname, std::string dnsentry) {
std::lock_guard<std::shared_mutex> guard_locks(_shared_mtx); // 独占锁
_dns_info.insert(std::make_pair(dnsname, dnsentry));
}
private:
std::map<std::string, std::string> _dns_info; // 共享数据
mutable std::shared_mutex _shared_mtx; // mutable:const成员函数可加锁
};
// QueryDNS 用来查询dns信息,多个线程可同时访问。
// AddDNSInfo 用来添加dns信息,属独占锁,同一时刻只有一个线程在修改。
递归锁
核心概念
- 递归互斥量(std::recursive_mutex):允许同一线程多次加锁(嵌套加锁),解锁次数需与加锁次数一致,避免普通 mutex 嵌套加锁导致的死锁。
- 设计缺陷信号:需要递归锁通常意味着代码设计不合理(如函数嵌套调用加锁),优先通过 “拆分逻辑、统一加锁” 规避,而非依赖递归锁。
- 性能损耗:递归锁比普通 mutex 有额外开销(记录加锁次数),非必要不使用。
class RecursiveDemo {
public:
RecursiveDemo() {}
bool QueryStudent(std::string name) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx); // 加锁
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
return false;
}
return true;
}
void AddScore(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx); // 加锁(嵌套)
if (!QueryStudent(name)) { // 调用QueryStudent,再次加锁(递归锁允许)
_students_info.insert(std::make_pair(name, score));
return;
}
_students_info[name] = _students_info[name] + score;
}
// 不推荐采用递归锁,使用递归锁说明设计思路并不理想,需优化设计
// 推荐拆分逻辑,将共有逻辑拆分为统一接口
void AddScoreAtomic(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx); // 单次加锁
auto iter_find = _students_info.find(name); // 拆分QueryStudent的逻辑
if (iter_find == _students_info.end()) {
_students_info.insert(std::make_pair(name, score));
return;
}
_students_info[name] = _students_info[name] + score;
return;
}
private:
std::map<std::string, int> _students_info;
std::recursive_mutex _recursive_mtx; // 递归锁
};
// 我们可以看到AddScore函数内部调用了QueryStudent, 所以采用了递归锁。
// 但是我们同样可以改变设计,将两者公有的部分抽离出来生成一个新的接口AddScoreAtomic.
// AddScoreAtomic可以不适用递归锁,照样能完成线程安全操作的目的
线程安全的单例模式
核心概念
- 局部静态变量(C++11):C++11 规定局部静态变量的初始化是线程安全的(仅初始化一次),是最简单的线程安全单例实现方式。
- 饿汉式 vs 懒汉式:
- 饿汉式:程序启动时(主线程)初始化单例,避免多线程竞争,但可能提前占用资源;
- 懒汉式:首次调用时初始化,需加锁保护,避免重复初始化。
- 双重检查锁定(DCLP)问题:早期懒汉式的 “检查 - 加锁 - 再检查” 存在内存模型问题(指令重排),C++11 后可通过
std::atomic修复,或直接用std::call_once。 - std::call_once/once_flag:C++11 引入,保证多个线程中只有一个执行初始化函数,彻底解决懒汉式的线程安全问题。
- 智能指针管理资源:避免裸指针的内存泄漏,辅助删除类可规避手动
delete导致的崩溃。
1.局部静态变量
class Single2 {
private:
Single2()
{
}
Single2(const Single2&) = delete; // 禁用拷贝
Single2& operator=(const Single2&) = delete; // 禁用赋值
public:
static Single2& GetInst()
{
static Single2 single; // C++11 后线程安全初始化
return single;
}
};
// 上述版本的单例模式在C++11 以前存在多线程不安全的情况,编译器可能会初始化多个静态变量。
// 但是C++11推出以后,各厂商优化编译器,能保证线程安全。所以为了保证运行安全请确保使用C++11以上的标准。
2.饿汉式初始化
// 饿汉式:程序启动时初始化,避免多线程竞争
class Single2Hungry
{
private:
Single2Hungry()
{
}
Single2Hungry(const Single2Hungry&) = delete;
Single2Hungry& operator=(const Single2Hungry&) = delete;
public:
static Single2Hungry* GetInst()
{
if (single == nullptr)
{
single = new Single2Hungry();
}
return single;
}
private:
static Single2Hungry* single;
};
// 调用如下
// 饿汉式初始化:全局作用域初始化,主线程启动前完成
Single2Hungry* Single2Hungry::single = Single2Hungry::GetInst();
void thread_func_s2(int i)
{
std::cout << "this is thread " << i << std::endl;
std::cout << "inst is " << Single2Hungry::GetInst() << std::endl;
}
void test_single2hungry()
{
std::cout << "s1 addr is " << Single2Hungry::GetInst() << std::endl;
std::cout << "s2 addr is " << Single2Hungry::GetInst() << std::endl;
for (int i = 0; i < 3; i++)
{
std::thread tid(thread_func_s2, i);
tid.join();
}
}
// 饿汉式是从使用角度规避多线程的安全问题,很多情况下我们很难从规则角度限制开发人员,所以这种方式不是很推荐。
3.懒汉式初始化
// 懒汉式:首次调用时初始化,需加锁保护
class SinglePointer
{
private:
SinglePointer()
{
}
SinglePointer(const SinglePointer&) = delete;
SinglePointer& operator=(const SinglePointer&) = delete;
public:
static SinglePointer* GetInst()
{
if (single != nullptr) // 第一次检查:避免每次加锁
{
return single;
}
s_mutex.lock(); // 加锁
if (single != nullptr) // 第二次检查:避免重复初始化
{
s_mutex.unlock();
return single;
}
single = new SinglePointer();
s_mutex.unlock();
return single;
}
private:
static SinglePointer* single;
static std::mutex s_mutex;
};
// 调用如下
SinglePointer* SinglePointer::single = nullptr;
std::mutex SinglePointer::s_mutex;
void thread_func_lazy(int i)
{
std::cout << "this is lazy thread " << i << std::endl;
std::cout << "inst is " << SinglePointer::GetInst() << std::endl;
}
void test_singlelazy()
{
for (int i = 0; i < 3; i++)
{
std::thread tid(thread_func_lazy, i);
tid.join();
}
// 何时释放new的对象?造成内存泄漏
}
// 这种方式存在一个很严重的问题,就是当多个线程都调用单例函数时,我们不确定资源是被哪个线程初始化的。
// 回收指针存在问题,存在多重释放或者不知道哪个指针释放的问题。
// 智能指针
// 我们能想到一个自动初始化资源并且自动释放的方式就是智能指针。利用智能指针自动回收资源。
// 可以利用智能指针完成自动回收
class SingleAuto
{
private:
SingleAuto()
{
}
SingleAuto(const SingleAuto&) = delete;
SingleAuto& operator=(const SingleAuto&) = delete;
public:
~SingleAuto()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleAuto> GetInst()
{
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
single = std::shared_ptr<SingleAuto>(new SingleAuto); // 智能指针管理资源
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAuto> single;
static std::mutex s_mutex;
};
// 调用方式如下
std::shared_ptr<SingleAuto> SingleAuto::single = nullptr;
std::mutex SingleAuto::s_mutex;
void test_singleauto()
{
auto sp1 = SingleAuto::GetInst();
auto sp2 = SingleAuto::GetInst();
std::cout << "sp1 is " << sp1 << std::endl;
std::cout << "sp2 is " << sp2 << std::endl;
// 此时存在隐患,可以手动删除裸指针,造成崩溃
// delete sp1.get();
}
// 这样开辟的资源交给智能指针管理免去了回收资源的麻烦。
// 但是有些人觉得虽然智能指针能自动回收内存,如果有开发人员手动delete指针怎么办?
// 所以有人提出了利用辅助类帮助智能指针释放资源,将智能指针的析构设置为私有。
//为了规避用户手动释放内存,可以提供一个辅助类帮忙回收内存
//并将单例类的析构函数写为私有
class SingleAutoSafe;
class SafeDeletor
{
public:
void operator()(SingleAutoSafe* sf) // 自定义删除器
{
std::cout << "this is safe deleter operator()" << std::endl;
delete sf;
}
};
class SingleAutoSafe
{
private:
SingleAutoSafe() {}
~SingleAutoSafe() // 私有析构,避免手动delete
{
std::cout << "this is single auto safe deletor" << std::endl;
}
SingleAutoSafe(const SingleAutoSafe&) = delete;
SingleAutoSafe& operator=(const SingleAutoSafe&) = delete;
//定义友元类,通过友元类调用该类析构函数
friend class SafeDeletor;
public:
static std::shared_ptr<SingleAutoSafe> GetInst()
{
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
//额外指定删除器:只有SafeDeletor能调用析构
single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDeletor());
//也可以指定删除函数
// single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDelFunc);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAutoSafe> single;
static std::mutex s_mutex;
};
// SafeDeletor就是删除的辅助类,实现了仿函数。构造智能指针时指定了SafeDeletor对象,这样就能帮助智能指针释放了。
// 但是上面的代码存在危险,比如懒汉式的使用方式,当多个线程调用单例时,有一个线程加锁进入3处的逻辑。
// 其他的线程有的在1处,判断指针非空则跳过初始化直接使用单例的内存会存在问题。
// 主要原因在于SingleAutoSafe * temp = new SingleAutoSafe() 这个操作是由三部分组成的
//1 调用allocate开辟内存
//2 调用construct执行SingleAutoSafe的构造函数
//3 调用赋值操作将地址赋值给temp
// 而现实中2和3的步骤可能颠倒,所以有可能在一些编译器中通过优化是1,3,2的调用顺序,
// 其他线程取到的指针就是非空,还没来的及调用构造函数就交给外部使用造成不可预知错误。
// 为解决这个问题,C++11 推出了std::call_once函数保证多个线程只执行一次(见下)
4.call_once(C++11 线程安全初始化)
class SingletonOnce {
private:
SingletonOnce() = default;
SingletonOnce(const SingletonOnce&) = delete;
SingletonOnce& operator = (const SingletonOnce& st) = delete;
static std::shared_ptr<SingletonOnce> _instance;
public :
static std::shared_ptr<SingletonOnce> GetInstance() {
static std::once_flag s_flag; // 局部静态once_flag,仅初始化一次
// call_once:多个线程仅一个执行初始化逻辑
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<SingletonOnce>(new SingletonOnce);
});
return _instance;
}
void PrintAddress() {
std::cout << _instance.get() << std::endl;
}
~SingletonOnce() {
std::cout << "this is singleton destruct" << std::endl;
}
};
std::shared_ptr<SingletonOnce> SingletonOnce::_instance = nullptr;
// 调用方式如下
void TestSingle() {
std::thread t1([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
SingletonOnce::GetInstance()->PrintAddress();
});
std::thread t2([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
SingletonOnce::GetInstance()->PrintAddress();
});
t1.join();
t2.join();
}
// 为了使用单例类更通用,比如项目中使用多个单例类,可以通过继承实现多个单例类
// 为了让单例更加通用,可以做成模板类
template <typename T>
class Singleton {
protected:
Singleton() = default;
Singleton(const Singleton<T>&) = delete;
Singleton& operator=(const Singleton<T>& st) = delete;
static std::shared_ptr<T> _instance;
public:
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<T>(new T);
});
return _instance;
}
void PrintAddress() {
std::cout << _instance.get() << std::endl;
}
~Singleton() {
std::cout << "this is singleton destruct" << std::endl;
}
};
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;
// 比如我们想实现单例类,就像我们之前在网络编程中介绍的那样,可以通过继承实现单例模式
// 想使用单例类,可以继承上面的模板,我们在网络编程中逻辑单例类用的就是这种方式
class LogicSystem :public Singleton<LogicSystem>
{
friend class Singleton<LogicSystem>; // 允许Singleton调用构造
public:
~LogicSystem(){}
private:
LogicSystem(){} // 私有构造,保证单例
};
条件变量
核心概念
- 条件变量(std::condition_variable):用于线程间的 “通知 - 等待” 机制,让线程在条件不满足时挂起(释放锁),条件满足时被唤醒(重新加锁),避免忙等(如 sleep 轮询)。
- wait () 机制:
wait(lock):释放锁,挂起线程,被唤醒后重新加锁;wait(lock, pred):带谓词的 wait,唤醒后检查谓词,不满足则继续挂起(避免伪唤醒)。
- 伪唤醒:条件变量可能被无原因唤醒(操作系统层面),必须配合谓词检查条件,否则会执行错误逻辑。
- notify_one()/notify_all():
notify_one():唤醒一个等待的线程;notify_all():唤醒所有等待的线程。
- unique_lock 必要性:
wait()需要临时释放锁,lock_guard不支持手动解锁,因此必须用unique_lock。
1. 不良实现(忙等)
int num = 1;
std::mutex mtx_num;
void PoorImpleman() {
std::thread t1([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 1) {
std::cout << "thread A print 1....." << std::endl;
num++;
continue;
}
} // 设置一个块,降低锁的颗粒度
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 忙等,浪费资源
}
});
std::thread t2([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 2) {
std::cout << "thread B print 2....." << std::endl;
num--;
continue;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 忙等
}
});
t1.join();
t2.join();
}
// PoorImpleman虽然能实现我们交替打印的功能,会造成消息处理的不及时,因为线程A要循环检测num值,如果num不为1,则线程A就睡眠了,
// 在线程A睡眠这段时间很可能B已经处理完打印了,此时A还在睡眠,是对资源的浪费,也错过了最佳的处理时机。所以我们提出了用条件变量来通知线程的机制,
// 当线程A发现条件不满足时可以挂起,等待线程B通知,线程B通知A后,A被唤醒继续处理。
2. 条件变量用法(高效通知 - 等待)
std::condition_variable cvA;
std::condition_variable cvB;
void ResonableImplemention() {
std::thread t1([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num); // 必须用unique_lock
// 带谓词的wait:num==1时继续,否则释放锁挂起
cvA.wait(lock, []() {
return num == 1;
}); // 也可以写成while(num != 1) cvA.wait(lock);
num++;
std::cout << "thread A print 1....." << std::endl;
cvB.notify_one(); // 通知线程B
}
});
std::thread t2([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
cvB.wait(lock, []() { // 带谓词,避免伪唤醒
return num == 2;
});
num--;
std::cout << "thread B print 2....." << std::endl;
cvA.notify_one(); // 通知线程A
}
});
t1.join();
t2.join();
}
// 当条件不满足时(num 不等于1 时)cvA.wait就会挂起,等待线程B通知通知线程A唤醒,线程B采用的是cvA.notifyone。
// 这么做的好处就是线程交替处理非常及时。比起sleep的方式,我们可以从控制台上看出差异效果,sleep的方式看出日志基本是每隔1秒才打印一次,效率不高。
3. 线程安全队列(基于条件变量)
template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond; // 条件变量:通知消费者
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one(); // 通知一个消费者
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
// 带谓词的wait:队列非空时继续,否则挂起
data_cond.wait(lk,[this]{return !data_queue.empty();});
value = data_queue.front();
data_queue.pop();
}
};
// 假设data_chunk是自定义数据类型(此处省略定义)
struct data_chunk {};
bool more_data_to_prepare() { return false; }
data_chunk prepare_data() { return {}; }
void process(data_chunk) {}
bool is_last_chunk(data_chunk) { return true; }
threadsafe_queue<data_chunk> data_queue;
void data_preparation_thread() // 生产者线程
{
while(more_data_to_prepare())
{
data_chunk const data= prepare_data();
data_queue.push(data); // 生产数据,通知消费者
}
}
void data_processing_thread() // 消费者线程
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data); // 等待数据,无数据则挂起
process(data);
if(is_last_chunk(data))
break;
}
}
// 我们可以启动三个线程,一个producer线程用来向队列中放入数据。一个consumer1线程用来阻塞等待pop队列中的元素。
// 另一个consumer2尝试从队列中pop元素,如果队列为空则直接返回,如果非空则pop元素。
// 打印时为了保证线程输出在屏幕上不会乱掉,所以加了锁保证互斥输出
// 测试代码如下
void test_safe_que() {
threadsafe_queue<int> safe_que;
std::mutex mtx_print; // 保证打印互斥
std::thread producer(
[&]() {
for (int i = 0; ;i++) {
safe_que.push(i); // 生产数据
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "producer push data is " << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
);
std::thread consumer1(
[&]() {
for (;;) {
int data;
safe_que.wait_and_pop(data); // 阻塞等待数据
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer1 wait and pop data is " << data << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);
std::thread consumer2(
[&]() {
for (;;) {
int data;
safe_que.wait_and_pop(data); // 阻塞等待数据
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer2 try_pop data is " << data << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);
producer.join();
consumer1.join();
consumer2.join();
}
总结
- 基础锁:
std::mutex是核心,lock_guard是极简 RAII 封装,unique_lock是灵活封装(支持手动解锁、延迟加锁)。 - 死锁规避:打破死锁四个必要条件(解耦合、强制加锁顺序、同时加锁、层级锁),优先用
std::scoped_lock(C++17)管理多锁。 - 高级锁:共享锁(
shared_mutex)适用于读多写少场景,递归锁尽量规避(设计层面优化)。 - 线程同步:条件变量(
condition_variable)解决 “通知 - 等待” 问题,避免忙等,需配合unique_lock和谓词(防伪唤醒)。 - 单例安全:C++11 后优先用 “局部静态变量” 或
std::call_once,智能指针管理资源避免内存泄漏。
更多推荐


所有评论(0)