你有没有遇到过这样的情况:多个人同时抢 100 张火车票,最后却出现了 “-1 号票”?或者多人同时编辑一个文档,内容变得混乱不堪?在 Linux 多线程编程中,这种 “并发乱象” 的根源是多个线程同时访问共享资源—— 而解决它的核心,就是 “线程同步与互斥”。

今天我们就用生活中的场景类比,从 “为什么出问题” 到 “怎么解决”,再到 “工业级实战(线程池)”,带你彻底搞懂线程同步与互斥。

一、先搞懂:为什么多线程会出问题?

1.1 一个 “抢票乱象” 的真实案例

先看一段看似没问题的 “多线程售票代码”:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

int ticket = 100; // 共享票源

// 售票逻辑
void *sell_ticket(void *arg) {
    char *thread_id = (char*)arg;
    while (1) {
        if (ticket > 0) {
            usleep(1000); // 模拟用户付款的耗时操作
            printf("%s 卖出第 %d 张票\n", thread_id, ticket);
            ticket--; // 票减1
        } else {
            break;
        }
    }
    return NULL;
}

int main() {
    pthread_t t1, t2, t3, t4;
    // 创建4个售票窗口(线程)
    pthread_create(&t1, NULL, sell_ticket, "窗口1");
    pthread_create(&t2, NULL, sell_ticket, "窗口2");
    pthread_create(&t3, NULL, sell_ticket, "窗口3");
    pthread_create(&t4, NULL, sell_ticket, "窗口4");
    
    // 等待所有线程结束
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
    return 0;
}

运行后你可能会看到这样的混乱结果:

窗口4 卖出第 100 张票

窗口2 卖出第 99 张票

窗口1 卖出第 0 张票

窗口3 卖出第 -1 张票

为什么会出现 “负票”?问题出在ticket--这个操作 —— 它看似是 “一步”,实际对应三条 CPU 指令:

  1. Load:把内存中的ticket读到 CPU 寄存器;

  2. Update:寄存器中的值减 1;

  3. Store:把寄存器的值写回内存。

当 4 个线程同时执行这三步时,就会出现 “插队”:比如线程 1 刚把ticket=1读到寄存器,线程 2 也读走ticket=1,两者都减 1 后写回内存,最终ticket变成 0,但两个线程都以为自己卖出了 “第 1 张票”,后续甚至会出现负数。

1.2 核心概念:临界资源与临界区

要解决这个问题,首先要明确两个概念:

  • 临界资源:多个线程共享的资源(比如上面的ticket、文件、数据库连接);

  • 临界区:访问临界资源的代码段(比如if (ticket>0)ticket--的代码)。

问题的本质是:多个线程同时进入临界区,导致共享资源被破坏。解决方案就是 “互斥”—— 让临界区像 “公共厕所” 一样,同一时间只允许一个线程进入。

二、线程互斥:给临界区加一把 “锁”(互斥量)

Linux 提供的 “锁” 叫做互斥量(mutex),它的核心逻辑是:

  • 线程进入临界区前,先 “加锁”;

  • 加锁成功就进入临界区,失败则阻塞等待;

  • 离开临界区时 “解锁”,唤醒等待的线程。

2.1 互斥量的核心接口

互斥量的使用分 3 步:初始化→加锁→解锁→销毁,常用接口如下:

接口 功能 关键参数 / 说明
pthread_mutex_init 动态初始化互斥量 mutex:互斥量指针;attr:NULL(默认属性)
PTHREAD_MUTEX_INITIALIZER 静态初始化互斥量(全局 / 静态变量用) 直接赋值给pthread_mutex_t变量,无需销毁
pthread_mutex_lock 加锁(阻塞) 若互斥量已被锁,线程会挂起等待
pthread_mutex_unlock 解锁 必须由加锁的线程解锁,否则会出错
pthread_mutex_destroy 销毁互斥量 仅用于动态初始化的互斥量,静态初始化的无需销毁

2.2 用互斥量修复 “售票系统”

给上面的代码加互斥量,确保同一时间只有一个线程操作ticket

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

int ticket = 100;
pthread_mutex_t mutex; // 定义互斥量

void *sell_ticket(void *arg) {
    char *thread_id = (char*)arg;
    while (1) {
        pthread_mutex_lock(&mutex); // 加锁:进入临界区前抢锁
        if (ticket > 0) {
            usleep(1000);
            printf("%s 卖出第 %d 张票\n", thread_id, ticket);
            ticket--;
            pthread_mutex_unlock(&mutex); // 解锁:离开临界区释放锁
        } else {
            pthread_mutex_unlock(&mutex); // 即使没票,也要解锁!
            break;
        }
    }
    return NULL;
}

int main() {
    pthread_mutex_init(&mutex, NULL); // 动态初始化互斥量
    
    pthread_t t1, t2, t3, t4;
    pthread_create(&t1, NULL, sell_ticket, "窗口1");
    pthread_create(&t2, NULL, sell_ticket, "窗口2");
    pthread_create(&t3, NULL, sell_ticket, "窗口3");
    pthread_create(&t4, NULL, sell_ticket, "窗口4");
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
    
    pthread_mutex_destroy(&mutex); // 销毁互斥量
    return 0;
}

运行后会发现:票从 100 依次减到 1,没有负数,也没有重复售票 —— 互斥量成功保护了临界资源。

2.3 避坑:用 RAII 风格封装互斥量

上面的代码有个隐患:如果临界区里有return或异常,可能会忘记解锁,导致 “死锁”(所有线程都等这把锁)。

解决方案是RAII(资源获取即初始化):用 C++ 类的构造函数加锁,析构函数解锁 —— 无论函数怎么退出,析构函数都会自动执行,确保解锁。

封装一个简单的LockGuard

// Lock.hpp
#include <pthread.h>

// 互斥量封装类
class Mutex {
public:
    Mutex() { pthread_mutex_init(&_mutex, NULL); }
    ~Mutex() { pthread_mutex_destroy(&_mutex); }
    
    // 加锁
    void lock() { pthread_mutex_lock(&_mutex); }
    // 解锁
    void unlock() { pthread_mutex_unlock(&_mutex); }
    // 获取原始互斥量(给条件变量用)
    pthread_mutex_t* get_raw() { return &_mutex; }

private:
    pthread_mutex_t _mutex;
    // 禁止拷贝(避免多个对象操作同一把锁)
    Mutex(const Mutex&) = delete;
    Mutex& operator=(const Mutex&) = delete;
};

// RAII风格的锁守卫
class LockGuard {
public:
    // 构造时加锁
    explicit LockGuard(Mutex& mutex) : _mutex(mutex) { _mutex.lock(); }
    // 析构时解锁
    ~LockGuard() { _mutex.unlock(); }

private:
    Mutex& _mutex;
    // 禁止拷贝
    LockGuard(const LockGuard&) = delete;
    LockGuard& operator=(const LockGuard&) = delete;
};

LockGuard简化售票代码,再也不用担心忘记解锁:

void *sell_ticket(void *arg) {
    char *thread_id = (char*)arg;
    while (1) {
        LockGuard lock(mutex); // 构造时加锁
        if (ticket > 0) {
            usleep(1000);
            printf("%s 卖出第 %d 张票\n", thread_id, ticket);
            ticket--;
        } else {
            break; // 析构时自动解锁
        }
    }
    return NULL; // 析构时自动解锁
}

三、线程同步:让线程 “按顺序” 执行(条件变量)

互斥解决了 “同一时间只有一个线程进临界区”,但没解决 “线程执行的顺序”。比如:

  • 消费者线程想从队列取数据,却发现队列为空;

  • 生产者线程想往队列存数据,却发现队列已满。

这时候需要条件变量—— 让线程在 “条件不满足” 时等待,条件满足时被唤醒,实现 “按顺序执行”。

3.1 条件变量的核心接口

条件变量通常和互斥量配合使用,核心接口如下:

接口 功能 关键说明
pthread_cond_init 初始化条件变量 attr:NULL(默认属性)
pthread_cond_destroy 销毁条件变量
pthread_cond_wait 等待条件满足 1. 自动释放互斥量;2. 被唤醒后重新抢互斥量;3. 必须在循环中判断条件(防 “伪唤醒”)
pthread_cond_signal 唤醒一个等待的线程 随机唤醒一个,适合 “一对一” 场景
pthread_cond_broadcast 唤醒所有等待的线程 适合 “一对多” 场景,比如 “通知所有消费者取数据”

3.2 为什么pthread_cond_wait需要互斥量?

你可能会疑惑:“等待的时候为什么要带互斥量?”

因为 “判断条件” 和 “等待” 必须是原子操作。比如消费者线程:

  1. 加锁后判断 “队列是否为空”;

  2. 如果为空,调用pthread_cond_wait—— 此时会自动释放互斥量,让生产者能加锁存数据;

  3. 生产者存完数据后唤醒消费者,消费者重新抢互斥量,再次判断条件(避免 “伪唤醒”)。

如果没有互斥量,消费者刚判断完 “队列为空”,还没来得及等待,生产者就存了数据并唤醒 —— 但消费者已经错过了唤醒信号,会一直等下去(死等)。

3.3 实战:用 “条件变量 + 互斥量” 实现阻塞队列

阻塞队列(Blocking Queue)是 “生产者 - 消费者模型” 的核心组件:

  • 队列为空时,消费者等待;

  • 队列满时,生产者等待;

  • 生产者存数据后唤醒消费者,消费者取数据后唤醒生产者。

实现一个简单的阻塞队列:

// BlockingQueue.hpp
#include <queue>
#include <pthread.h>
#include "Lock.hpp" // 复用之前的Mutex和LockGuard

template <typename T>
class BlockingQueue {
public:
    BlockingQueue(int capacity) : _capacity(capacity) {
        // 初始化条件变量
        pthread_cond_init(&_not_empty, NULL); // 消费者等待的条件:队列非空
        pthread_cond_init(&_not_full, NULL);  // 生产者等待的条件:队列非满
    }

    ~BlockingQueue() {
        pthread_cond_destroy(&_not_empty);
        pthread_cond_destroy(&_not_full);
    }

    // 生产者存数据
    void push(const T& data) {
        LockGuard lock(_mutex); // 加锁
        // 队列满了,等待“非满”条件
        while (size() >= _capacity) {
            pthread_cond_wait(&_not_full, _mutex.get_raw());
        }
        // 存数据
        _queue.push(data);
        printf("生产者存数据:%d,队列大小:%d\n", data, size());
        // 唤醒等待“非空”的消费者
        pthread_cond_signal(&_not_empty);
    }

    // 消费者取数据
    T pop() {
        LockGuard lock(_mutex); // 加锁
        // 队列空了,等待“非空”条件
        while (size() == 0) {
            pthread_cond_wait(&_not_empty, _mutex.get_raw());
        }
        // 取数据
        T data = _queue.front();
        _queue.pop();
        printf("消费者取数据:%d,队列大小:%d\n", data, size());
        // 唤醒等待“非满”的生产者
        pthread_cond_signal(&_not_full);
        return data;
    }

    // 获取队列大小
    size_t size() const { return _queue.size(); }

private:
    std::queue<T> _queue;       // 实际存储数据的队列
    int _capacity;              // 队列最大容量
    Mutex _mutex;               // 保护队列的互斥量
    pthread_cond_t _not_empty;  // 条件:队列非空(给消费者用)
    pthread_cond_t _not_full;   // 条件:队列非满(给生产者用)
};

测试生产者 - 消费者模型:

#include <stdio.h>
#include <pthread.h>
#include "BlockingQueue.hpp"

BlockingQueue<int> bq(5); // 队列容量5

// 生产者逻辑:不断生成数据
void *producer(void *arg) {
    int data = 0;
    while (1) {
        bq.push(data++);
        usleep(500000); // 每0.5秒生产一个数据
    }
    return NULL;
}

// 消费者逻辑:不断取数据
void *consumer(void *arg) {
    while (1) {
        bq.pop();
        usleep(1000000); // 每1秒消费一个数据
    }
    return NULL;
}

int main() {
    pthread_t p1, c1;
    pthread_create(&p1, NULL, producer, NULL);
    pthread_create(&c1, NULL, consumer, NULL);
    
    pthread_join(p1, NULL);
    pthread_join(c1, NULL);
    return 0;
}

运行后会看到有序的生产和消费:

生产者存数据:0,队列大小:1

消费者取数据:0,队列大小:0

生产者存数据:1,队列大小:1

生产者存数据:2,队列大小:2

消费者取数据:1,队列大小:1

生产者存数据:3,队列大小:2

...

四、工业级实战:线程池的设计与实现

在实际开发中,频繁创建 / 销毁线程会消耗大量资源(比如 Web 服务器每秒处理上千个请求,每次创建线程太耗时)。线程池就是 “预先创建一批线程,循环处理任务”,避免频繁创建线程的开销。

4.1 线程池的核心组成

一个基础的线程池包含 4 个部分:

  1. 线程数组:预先创建的线程,等待处理任务;

  2. 任务队列:存储待处理的任务(比如函数对象);

  3. 互斥量:保护任务队列的线程安全;

  4. 条件变量:任务队列空时,线程等待;有任务时,唤醒线程。

4.2 线程池的实现(结合单例模式)

为了保证全局只有一个线程池(避免资源浪费),我们用线程安全的单例模式实现:

// ThreadPool.hpp
#include <vector>
#include <queue>
#include <functional>
#include <pthread.h>
#include "Lock.hpp"
#include "BlockingQueue.hpp"

// 任务类型:函数对象
using Task = std::function<void()>;

class ThreadPool {
public:
    // 单例模式:获取全局唯一线程池
    static ThreadPool* get_instance(int thread_num = 5) {
        if (_instance == NULL) {
            LockGuard lock(_singleton_mutex); // 双重检查锁,保证线程安全
            if (_instance == NULL) {
                _instance = new ThreadPool(thread_num);
            }
        }
        return _instance;
    }

    // 销毁单例(可选)
    static void destroy_instance() {
        if (_instance != NULL) {
            delete _instance;
            _instance = NULL;
        }
    }

    // 提交任务到队列
    void submit_task(const Task& task) {
        _task_queue.push(task);
    }

    // 停止线程池
    void stop() {
        LockGuard lock(_mutex);
        _is_running = false;
        // 唤醒所有等待的线程
        _cond_not_empty.notify_all();
    }

private:
    // 私有构造:禁止外部创建
    ThreadPool(int thread_num) : _thread_num(thread_num), _is_running(true) {
        // 初始化条件变量
        pthread_cond_init(&_cond_not_empty, NULL);
        // 创建线程
        for (int i = 0; i < _thread_num; i++) {
            pthread_t tid;
            pthread_create(&tid, NULL, thread_work, this);
            _threads.push_back(tid);
        }
    }

    // 私有析构:禁止外部销毁
    ~ThreadPool() {
        stop();
        // 等待所有线程结束
        for (pthread_t tid : _threads) {
            pthread_join(tid, NULL);
        }
        // 销毁条件变量
        pthread_cond_destroy(&_cond_not_empty);
    }

    // 线程工作函数:循环取任务执行
    static void* thread_work(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        while (pool->_is_running) {
            LockGuard lock(pool->_mutex);
            // 任务队列为空,等待
            while (pool->_task_queue.empty() && pool->_is_running) {
                pthread_cond_wait(&pool->_cond_not_empty, pool->_mutex.get_raw());
            }
            // 如果线程池要停止,且任务队列为空,退出
            if (!pool->_is_running && pool->_task_queue.empty()) {
                break;
            }
            // 取任务执行
            Task task = pool->_task_queue.front();
            pool->_task_queue.pop();
            lock.~LockGuard(); // 提前解锁,避免任务执行时占用锁
            task(); // 执行任务
        }
        printf("线程 %lu 退出\n", pthread_self());
        return NULL;
    }

    // 单例相关
    static ThreadPool* _instance;
    static Mutex _singleton_mutex;

    int _thread_num;          // 线程数量
    std::vector<pthread_t> _threads; // 线程数组
    std::queue<Task> _task_queue;    // 任务队列
    Mutex _mutex;             // 保护任务队列的互斥量
    pthread_cond_t _cond_not_empty;  // 任务非空条件变量
    bool _is_running;         // 线程池运行状态
};

// 初始化静态成员
ThreadPool* ThreadPool::_instance = NULL;
Mutex ThreadPool::_singleton_mutex;

4.3 测试线程池

用 “打印日志” 作为任务,测试线程池:

#include <stdio.h>
#include <unistd.h>
#include "ThreadPool.hpp"

// 任务1:打印数字
void print_num(int num) {
    printf("线程 %lu 处理任务:打印数字 %d\n", pthread_self(), num);
    usleep(500000);
}

int main() {
    // 获取线程池(5个线程)
    ThreadPool* pool = ThreadPool::get_instance(5);
    
    // 提交10个任务
    for (int i = 0; i < 10; i++) {
        // 用lambda封装任务(捕获i)
        pool->submit_task([i]() { print_num(i); });
    }
    
    // 等待所有任务完成
    sleep(5);
    // 停止线程池
    pool->stop();
    // 销毁单例
    ThreadPool::destroy_instance();
    return 0;
}

运行后会看到 5 个线程轮流处理 10 个任务,避免了频繁创建线程的开销。

五、避坑指南:死锁与线程安全

5.1 死锁:线程的 “互相僵持”

死锁是多线程编程的 “噩梦”—— 比如线程 A 持有锁 1,等待锁 2;线程 B 持有锁 2,等待锁 1,两者互相僵持,永远无法继续。

死锁的 4 个必要条件(缺一不可):
  1. 互斥条件:资源只能被一个线程持有(比如锁);

  2. 请求与保持:线程持有一个资源,又请求另一个资源;

  3. 不剥夺条件:资源不能被强行夺走(只能主动释放);

  4. 循环等待:线程间形成 “你等我、我等你” 的循环。

如何避免死锁?

破坏任意一个必要条件即可,常用方法:

  • 破坏循环等待:所有线程按 “固定顺序” 加锁(比如先锁 1 后锁 2);

  • 破坏请求与保持:一次性申请所有资源(比如同时锁 1 和锁 2);

  • 超时释放:加锁时设置超时时间,超时后释放已持有资源。

5.2 线程安全与可重入

很多人会混淆 “线程安全” 和 “可重入”,其实两者有明确区别:

概念 核心含义 例子
线程安全 多个线程访问时,结果正确(通常靠锁实现) 加锁的售票函数,多个线程调用结果正确
可重入 函数被多个执行流(线程 / 信号)重复调用,结果正确 纯数学函数(如int add(int a, int b) { return a+b; }
关键结论:
  • 可重入函数一定是线程安全的

  • 线程安全函数不一定是可重入的(比如加锁的函数,重入时会死锁);

  • 避免在信号处理函数中调用不可重入函数(如printfmalloc)。

六、总结:从 “乱象” 到 “有序” 的核心逻辑

Linux 线程同步与互斥的本质,是 “解决多线程并发访问共享资源的问题”:

  1. 互斥(锁):解决 “同一时间多个线程进临界区” 的问题,保证共享资源不被破坏;

  2. 同步(条件变量):解决 “线程执行顺序” 的问题,避免线程 “死等”;

  3. 线程池:解决 “频繁创建线程的开销” 问题,提升工业级程序的性能;

  4. 避坑:避免死锁(固定加锁顺序)、区分线程安全与可重入。

记住:好的多线程程序,不是 “越多线程越好”,而是 “有序、高效地利用线程”—— 这正是同步与互斥的核心价值。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐