目录

引入

临界资源/区、互斥、原子性

pthread_mutex - 互斥锁

pthread_mutex_init/destory: 初始化/销毁锁

pthread_mutex_lock/unlock - 加锁解锁

综合运用

互斥锁的实现原理

互斥锁的封装

可重入VS线程安全

死锁


互斥概念引入

定义一个全局变量 int ticket = 1000; 表示有 1000 张票,创建出 4 个线程,让这 4 个线程并发的抢票,当 ticket == 0 时,让线程退出

#include <iostream>
#include <cstdio>
#include <cstring>
#include <vector>
#include <unistd.h>
#include <pthread.h>

using namespace std;

#define NUM 4

class threadData
{
public:
    threadData(int number)
    {
        threadname = "thread-" + to_string(number);
    }

public:
    string threadname;
};

int tickets = 1000; // 用多线程,模拟一轮抢票

void *getTicket(void *args)
{
    threadData *td = static_cast<threadData *>(args);
    const char *name = td->threadname.c_str();
    while (true)
    {
        if(tickets > 0)
        {
            usleep(1000);
            printf("who=%s, get a ticket: %d\n", name, tickets); // ?
            tickets--;
        }
        else
            break;
    }
    printf("%s ... quit\n", name);
    return nullptr;
}

int main()
{
    vector<pthread_t> tids;
    vector<threadData *> thread_datas;
    for (int i = 1; i <= NUM; i++)
    {
        pthread_t tid;
        threadData *td = new threadData(i);
        thread_datas.push_back(td);
        pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
        tids.push_back(tid);
    }

    for (auto thread : tids)
    {
        pthread_join(thread, nullptr);
    }

    for (auto td : thread_datas)
    {
        delete td;
    }
    return 0;
}

在抢票的最后:

who=thread-4, get a ticket: 2
who=thread-2, get a ticket: 1
thread-2 ... quit
who=thread-3, get a ticket: 0
thread-3 ... quit
who=thread-4, get a ticket: -1
thread-4 ... quit
who=thread-1, get a ticket: -2
thread-1 ... quit

线程抢到了编号为 0、-1、-2 的票,不符合预期。

分析为什么会出现这种情况:ticket 是全局变量,当多个线程并发访问共享数据时,可能出现数据不一致问题。对一个全局变量进行多线程并发--/++的操作是不安全的

对 ticket-- 操作做比较详细的讲解:

上面的每一步中,都对应一条汇编语句(非原子性)。线程在执行的时候,将共享数据,加载到CPU寄存器的本质:把数据的内容,变成了自己的上下文 --- 以拷贝的方式,给自己单独拿了一份。

假设现在有两个线程 a 和 b,a 线程只进行到了上面的步骤的第二步,它的时间片就耗尽了,a 线程被切换前,保存自己的上下文中,记录的 ticket 是 999。现在轮到 b 线程了,b 线程先读取 ticket 的值(1000),假设它一直进行 ticket-- 操作,把 ticket 减到了 10,然后时间片就耗尽了。现在又轮到 a 线程了,a 线程要接着执行第三步,但在执行第三步之前,要先恢复上下文,a 线程上下文记录的 ticket 是 999,于是将寄存器的值改为 999 后执行第三步,内存的 ticket 由 10 变为了 999 !这就是数据不一致问题

临界资源/区、互斥、原子性

  • 临界资源:多线程执行流共享的资源就叫做临界资源

  • 临界区

    • 指访问共享资源的那段代码片段(例如:修改全局变量、操作硬件寄存器)。

    • 特性:临界区不允许被多个执行流(进程、中断、SMP(对称多处理)中的CPU核)同时进入。如果同时进入,就会发生资源竞争,导致数据混乱。

  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用

  • 原子性

    • 指一个操作在执行完毕前不会被任何其他任务中断或分割的特性。对于观察者而言,原子操作要么执行完了,要么没执行,绝不处于中间状态。

    • 在Linux语境下,原子性通常分为指令级原子性(由CPU保证)和逻辑级原子性(由锁机制保证)

pthread_mutex - 互斥锁

要解决上面的问题,就要做到:对任何共享数据的访问,在任何时候,只有一个执行流在访问,即互斥

pthread_mutex_init/destory: 初始化/销毁锁

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *restrict mutex,
                       const pthread_mutexattr_t *restrict attr);
  • mutex:指向要初始化的互斥锁对象的指针 。

  • attr:指向互斥锁属性对象的指针。如果传入 NULL,则使用默认属性进行初始化(通常为“快速”互斥锁,非递归、非检错)。

  • 返回值:成功:返回 0 。失败:返回一个错误码以指示错误(例如 EAGAINENOMEMEINVAL 或 EPERM)。需要注意的是,在 Linux 上,此函数总是返回 0,因此对其返回值进行检查可能是多余的

  • 说明:

    • 要保证所有线程都获取到同一把锁

    • 初始化后的互斥锁处于未锁定状态 。

    • 除了动态分配,还可以使用宏 PTHREAD_MUTEX_INITIALIZER 对静态分配的互斥锁(mutex 是全局变量)进行赋值初始化,其效果等同于调用 pthread_mutex_init  且attrNULL,用这种方式定义的锁不用pthread_mutex_init/destory 来初始化和销毁

    • pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    • 尝试初始化一个已经初始化过的互斥锁会导致未定义的行为 

#include <pthread.h>

int pthread_mutex_destroy(pthread_mutex_t *mutex);
  • mutex:指向要销毁的互斥锁对象的指针
  • 返回值:成功:返回 0 。失败:返回一个错误码以指示错误。最常见的错误是 EBUSY,表示互斥锁当前已被锁定或正在被其他线程使用(例如,在 pthread_cond_wait() 调用中),因此无法销毁 。也可能返回 EINVAL,表示 mutex 参数无效
  • 说明

  • 销毁互斥锁时,该互斥锁必须处于未锁定且未被使用的状态 。
  • 被销毁的互斥锁对象可以被 pthread_mutex_init 重新初始化 。
  • 在某些实现中(如 LinuxThreads),该函数可能除了检查互斥锁是否已解锁外,并不执行实际操作

pthread_mutex_lock/trylock/unlock - 加锁解锁

pthread_mutex_lock 函数用于锁定一个互斥量(mutex)。它的核心行为非常清晰:

  • 如果互斥锁当前是解锁状态:调用线程会立即获得该锁,并继续执行后续的代码。此时,互斥锁变为锁定状态,且调用线程成为其拥有者 。

  • 如果互斥锁已经被其他线程锁定:调用线程会被阻塞(block),进入休眠状态,直到该互斥锁被解锁。一旦锁被释放,该线程会被唤醒并尝试重新获得锁 。

这个过程确保了在任一时刻,只有一个线程能执行被该互斥锁保护的“临界区”代码。

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

mutex:指向需要锁定/释放的互斥锁对象的指针 。

返回值

  • 对于 lock:如果成功申请锁,返回 0 并继续往下执行。如果锁被占用,不会返回。如果发生异常或配置错误时,会返回错误码。
  • 对于 unlock:如果成功释放锁,返回 0 并继续往下执行。如果发生异常或配置错误时,会返回错误码。
  • 对于 trylock:如果成功申请锁,返回 0 并继续往下执行,如果锁被占用或发生异常或配置错误时,会返回错误码

注意

  • 加锁的本质是牺牲时间来换取数据安全为了不破环线程的高并发性,要保证临界区的代码越少越好。
  • 要保证一个互斥锁在使用完后能够解锁,要防止线程退出时互斥锁未释放导致其他线程一直阻塞等待的情况。可以用类来封装锁,实现自动解锁。
  • 不同线程对锁的竞争能力可能不同。由于某个线程对锁的竞争能力太强导致其他线程一直等待的现象叫做饥饿问题。如何解决饥饿问题:1、线程必须排队等待,先来后到 2、刚解完锁的线程,不能立刻加锁,必须排到等待队列的末尾。并不是说只要有互斥就必有饥饿问题,饥饿问题可以被避免。
  • 锁本身就是临界资源,为了保护锁,申请锁和释放锁的操作被设计成具有原子性的。
  • 即使对临界区的代码加锁,正在执行临界区的代码的线程依然可以被切换,但该线程是持有锁被切换的,其他没有锁的线程即使被调度也依然不能执行临界区的代码。持有锁的线程对其他线程来讲,它只有两种状态:持有锁和释放锁,所以线程访问临界资源的过程,对其他线程是原子的。

一个典型的使用模式如下,它必须与 pthread_mutex_unlock 配对出现

pthread_mutex_t mutex;

// ... 初始化互斥锁 ...

// 在访问共享资源前加锁
pthread_mutex_lock(&mutex);

// 临界区:在此处安全地操作共享资源
// ...

// 访问完成后立即解锁
pthread_mutex_unlock(&mutex);

综合运用

运用上面介绍的互斥锁,给引入部分的抢票代码加互斥锁,保证线程不会抢到负数的票。代码注释部分为新增有关互斥锁的代码

#include <iostream>
#include <cstdio>
#include <cstring>
#include <vector>
#include <unistd.h>
#include <pthread.h>
using namespace std;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tickets = 3000; // 用多线程,模拟一轮抢票
const int NUM = 10;

class threadData
{
public:
    threadData(int number /*, pthread_mutex_t *mutex*/)
    {
        threadname = "thread-" + to_string(number);
        // lock = mutex;
    }

public:
    string threadname;
    // pthread_mutex_t *lock;
};

void* getTicket(void* args)
{
    threadData* td = static_cast<threadData*>(args);
    const char* name = td->threadname.c_str();
    while (true)
    {
        
        // pthread_mutex_lock(td->lock); 
        // 申请锁成功,才能往后执行,不成功,阻塞等待。
       
        if (tickets > 0)
        {
            usleep(1000);
            printf("who=%s, get a ticket: %d\n", name, tickets); // ?
            tickets--;
            // pthread_mutex_unlock(&lock);
        }
        else {
            // pthread_mutex_unlock(&lock);
            break;
        }
        usleep(13); 
        // 我们抢到了票,我们会立马抢下一张吗?
        // 其实多线程还要执行得到票之后的后续动作。usleep模拟
        // 如果没有usleep,unlock之后线程立刻又lock,其他线程就没有机会抢票了
        // 饥饿问题
    }
    printf("%s ... quit\n", name);
    return nullptr;
}

int main()
{
    // pthread_mutex_init(&lock, nullptr);

    vector<pthread_t> tids;
    vector<threadData*> thread_datas;
    for (int i = 1; i <= NUM; i++)
    {
        pthread_t tid;
        threadData* td = new threadData(i /*, &lock*/);
        thread_datas.push_back(td);
        pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
        tids.push_back(tid);
    }

    for (auto thread : tids)
    {
        pthread_join(thread, nullptr);
    }

    for (auto td : thread_datas)
    {
        delete td;
    }

    // pthread_mutex_destroy(&lock);
    return 0;
}

互斥锁的实现原理

锁本身就是临界资源为了保护锁,申请锁和释放锁的操作被设计成具有原子性的。那么具体是怎么实现的呢?为了实现互斥锁操作,大多数 CPU 架构都提供了 swap 或 exchange 指令,该指令的作用是把寄存器和内存单元的数据相交换,由于完成交换只需一条指令,保证了原子性

pthread_mutex_lock/unlock 的伪代码:

lock:
    movb $1, %al      ; 将 1(表示"我要锁")存入 al 寄存器
    xchgb %al, mutex  ; 原子性地交换 al 和内存 mutex 的内容
    cmp $0, %al   ; 测试拿回来的旧值(al)是否为 0
    jz lock_acquired  ; 如果旧值是 0(锁原来是空闲的),跳转到成功获取
    ; 如果旧值是1,说明锁已被占用
    ; 挂起等待(通常通过 futex 系统调用)
    jmp lock          ; 被唤醒后重试

lock_acquired:
    ; 成功获得锁,进入临界区
    ret

unlock:
    movb $0, mutex    ; 将 mutex 设置为0(释放锁)
    ; 唤醒等待该 mutex 的线程
    ret

互斥锁的封装

上面直接使用互斥锁的相关系统调用,有点不太方便,并且还有可能忘记解锁导致饥饿问题的麻烦。可以使用类来封装锁解决问题,即 RAII 风格的锁。

#pragma once

#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *lock):lock_(lock)
    {}
    void Lock()
    {
        pthread_mutex_lock(lock_);
    }
    void Unlock()
    {
        pthread_mutex_unlock(lock_);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *lock_;
};

// 封装锁和封装锁的系统调用解耦合

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *lock):mutex_(lock)
    {
        mutex_.Lock();
    }
    ~LockGuard()
    {
        mutex_.Unlock();
    }
private:
    Mutex mutex_;
};

封装锁后,上面的抢票函数就可以写成:

void* getTicket(void* args)
{
    threadData *td = static_cast<threadData *>(args);
    const char *name = td->threadname.c_str();
    while (true)
    {
        // 临界区单独一个代码块
        {
            LockGuard lockguard(&lock); // 临时的LockGuard对象, RAII风格的锁
            if (tickets > 0)
            {
                usleep(1000);
                printf("who=%s, get a ticket: %d\n", name, tickets); // ?
                tickets--;
            }
            else
                break;
        }
        // 出代码块后,LockGuard自动析构解锁
        usleep(13);
    }
    printf("%s ... quit\n", name);
    return nullptr;
}

可重入VS线程安全

  • 线程安全:多个线程并发执行同一段代码时,不会出现预料之外的结果,此时称为线程安全。否则称为线程不安全。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,可能会出现线程不安全。
  • 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,执行完毕不会出现预料之外的结果,则该函数被称为可重入函数,否则,是不可重入函数。

它们之间的关系:多线程代码中只要包含不可重入函数,可能出现线程不安全问题。多线程代码中只包含可重入函数,一定不会出现线程不安全问题。

常见的线程不安全的情况

  • 调用不保护共享变量的函数
  • 函数状态随着被调用,状态发生变化的函数(比如函数内部包含 static 变量用来统计该函数被调用了多少次)
  • 返回指向静态变量指针的函数
  • 调用线程不安全函数的函数

常见的线程安全的情况

  • 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
  • 类或者接口对于线程来说都是原子操作
  • 多个线程之间的切换不会导致该接口的执行结果存在二义性

常见不可重入的情况

  • 调用了 malloc/free 函数,因为 malloc 函数是用全局链表来管理堆的
  • 调用了标准 I/O 库函数,标准 I/O 库的很多实现都以不可重入的方式使用全局数据结构
  • 可重入函数体内使用了静态、全局的数据结构

常见可重入的情况

  • 不存在上面常见不可重入的情况的任意一种的函数
  • 不返回静态或全局数据,所有数据都由函数的调用者提供
  • 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

死锁

死锁是指两个或多个线程(或进程)在执行过程中,因争夺资源而造成的一种互相等待的状态,导致它们都无法继续执行

在一个线程内部也可能产生死锁,即该线程连续多次申请同一把普通锁,如果第一次申请成功,在第二次申请时,由于 al 寄存器的交换后的值是 1,说明锁被占用,该线程阻塞等待自己释放锁,可是 pthread_mutex_unlock 在后面。相当于线程将自己反锁在门外了。

在两个或多个线程产生死锁时,必须先同时满足下面四个条件:

前提:使用锁以满足互斥

  • 互斥条件:一个资源每次只能被一个执行流使用

线程遵循下面两条原则:

  • 请求与保持条件:一个执行流因请求其他线程资源而阻塞时,对自己已获得的资源保持不释放。并且所有线程都是这样做的。
  • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

线程的等待关系必须满足:

  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系(A等待B,B等待C,C等待A)

如何解决死锁问题

  • 破坏死锁的四个必要条件

互斥条件一般不能破坏,可以破坏请求与保持条件:使用 pthead_mutex_trylock,即非阻塞申请锁(如果申请锁失败,便出错返回)。破坏不剥夺条件:释放对方的锁。破坏循环等待条件:所有线程按照顺序申请锁(按照锁1、锁2、锁3的顺序申请锁,不要一个线程申请锁2,另一个线程申请锁3)。

  • 避免锁未释放的场景
  • 资源一次性分配(要么一次获得所有资源,要么一个都不拿)

其他常见的锁

悲观锁

核心思想: 保守派。假设发生冲突的概率很高,因此在访问共享资源之前,先锁住资源,阻止其他任何进程的访问,直到当前持有者释放锁。操作完成之前,其他人只能等待。

特点:

  • 适用场景: 写操作较多、临界区执行时间较长、锁竞争非常激烈的场景。

  • 开销: 锁本身的获取和释放有固定的上下文开销(如系统调用、线程挂起/唤醒)。

  • 实现形式:

    • 互斥锁: 最常用的悲观锁。如果锁被占用,线程会进入睡眠状态,让出 CPU。适用于临界区较长、可能会睡眠的场景。

    • 读写锁: 读模式共享,写模式互斥。

    • 自旋锁: 特殊的悲观锁。如果锁被占用,当前线程不会睡眠,而是原地忙等(一直循环检测锁状态)。适用于临界区极短(如修改一个变量)、且不允许睡眠的上下文(如中断处理程序)。可以使用一个 while 死循环和 pthread_mutex_trylock 实现简单的自旋锁。也可以使用系统实现的 spin 自旋锁:

#include <pthread.h>

int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);

乐观锁

核心思想: 乐观派。假设发生冲突的概率很低,因此不主动加锁。在修改共享资源时,先读取数据,进行计算,然后在最后提交更新时,检查在此期间数据是否被其他人修改过。

特点:

  • 适用场景: 读操作远多于写操作、锁竞争非常低的场景。

  • 开销: 如果没有冲突,开销非常小。如果有冲突,则需要回滚重试,浪费 CPU 周期。

  • 实现形式:

    • 无锁编程: 依赖 CPU 提供的原子指令,如 Compare And Swap。

      • 流程:读取值 -> 计算新值 -> 使用 CAS 指令将旧值与当前内存值比较,如果相等则写入新值;如果不相等,说明被修改过,则循环重试。

    • 序列锁: 一种特殊的乐观锁。写者从不等待读者,读者在读数据前后读取一个序列号。如果序列号发生变化,说明写者修改了数据,读者需要重试。它适用于读操作频繁且对数据一致性要求不那么严格(允许写者优先)的场景。

 读者写者问题

在读者写者问题中,写者与写者之间是互斥关系,写者与读者之间是同步+互斥关系,读者与读者之间通常没有关系(或称为共享共享)。那么为什么 cp 模型的消费者和消费者之间的关系是互斥的呢?区别在于数据是否会被拿走:读者不会拿走数据,而消费者会。

在该问题中,大多数时候是读者多,写者少,读者竞争锁的能力太强导致写者饥饿问题。但这是正常的现象,我们称为读者优先。读者优先:读者和写者同时竞争锁,读者竞争成功。当然也有写者优先:读者和写者同时竞争锁,写者竞争成功。

读写锁

注意:写独占,读共享,读锁优先级高

// 读写锁相关接口

int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 读者加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 写者加锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); // 读者写者解锁

读者优先的伪代码,理解 rwlock 的实现原理

// 读者加锁/解锁

lock(&rlock); 
reader_count++; 
if(reader_count == 1) lock(&wlock); 
unlock(&rlock);

//进行读取

lock(&rlock); 
reader_count--; 
if(reader_count == 0) unlock(&wlock); 
unlock(&rlock);



// 写者加锁/解锁

lock(&wlock);

// 写入操作

unlock(&wlock);

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐