Linux 线程同步与互斥:从 “抢票乱象” 到 “井然有序” 的实战指南
文章摘要:本文深入探讨了Linux多线程编程中的同步与互斥问题。通过抢票系统的案例,分析了多线程并发访问共享资源导致的"负票"现象,并提出了解决方案:1)使用互斥量保护临界区;2)采用RAII风格封装锁机制避免死锁;3)引入条件变量实现线程同步。文章还展示了线程池的工业级实现,结合单例模式优化资源管理。最后总结了避免死锁、区分线程安全与可重入等关键点,强调多线程编程的核心在于有
你有没有遇到过这样的情况:多个人同时抢 100 张火车票,最后却出现了 “-1 号票”?或者多人同时编辑一个文档,内容变得混乱不堪?在 Linux 多线程编程中,这种 “并发乱象” 的根源是多个线程同时访问共享资源—— 而解决它的核心,就是 “线程同步与互斥”。
今天我们就用生活中的场景类比,从 “为什么出问题” 到 “怎么解决”,再到 “工业级实战(线程池)”,带你彻底搞懂线程同步与互斥。
一、先搞懂:为什么多线程会出问题?
1.1 一个 “抢票乱象” 的真实案例
先看一段看似没问题的 “多线程售票代码”:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100; // 共享票源
// 售票逻辑
void *sell_ticket(void *arg) {
char *thread_id = (char*)arg;
while (1) {
if (ticket > 0) {
usleep(1000); // 模拟用户付款的耗时操作
printf("%s 卖出第 %d 张票\n", thread_id, ticket);
ticket--; // 票减1
} else {
break;
}
}
return NULL;
}
int main() {
pthread_t t1, t2, t3, t4;
// 创建4个售票窗口(线程)
pthread_create(&t1, NULL, sell_ticket, "窗口1");
pthread_create(&t2, NULL, sell_ticket, "窗口2");
pthread_create(&t3, NULL, sell_ticket, "窗口3");
pthread_create(&t4, NULL, sell_ticket, "窗口4");
// 等待所有线程结束
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}
运行后你可能会看到这样的混乱结果:
窗口4 卖出第 100 张票
窗口2 卖出第 99 张票
窗口1 卖出第 0 张票
窗口3 卖出第 -1 张票
为什么会出现 “负票”?问题出在ticket--
这个操作 —— 它看似是 “一步”,实际对应三条 CPU 指令:
-
Load:把内存中的
ticket
读到 CPU 寄存器; -
Update:寄存器中的值减 1;
-
Store:把寄存器的值写回内存。
当 4 个线程同时执行这三步时,就会出现 “插队”:比如线程 1 刚把ticket=1
读到寄存器,线程 2 也读走ticket=1
,两者都减 1 后写回内存,最终ticket
变成 0,但两个线程都以为自己卖出了 “第 1 张票”,后续甚至会出现负数。
1.2 核心概念:临界资源与临界区
要解决这个问题,首先要明确两个概念:
-
临界资源:多个线程共享的资源(比如上面的
ticket
、文件、数据库连接); -
临界区:访问临界资源的代码段(比如
if (ticket>0)
到ticket--
的代码)。
问题的本质是:多个线程同时进入临界区,导致共享资源被破坏。解决方案就是 “互斥”—— 让临界区像 “公共厕所” 一样,同一时间只允许一个线程进入。
二、线程互斥:给临界区加一把 “锁”(互斥量)
Linux 提供的 “锁” 叫做互斥量(mutex),它的核心逻辑是:
-
线程进入临界区前,先 “加锁”;
-
加锁成功就进入临界区,失败则阻塞等待;
-
离开临界区时 “解锁”,唤醒等待的线程。
2.1 互斥量的核心接口
互斥量的使用分 3 步:初始化→加锁→解锁→销毁,常用接口如下:
接口 | 功能 | 关键参数 / 说明 |
---|---|---|
pthread_mutex_init |
动态初始化互斥量 | mutex :互斥量指针;attr :NULL(默认属性) |
PTHREAD_MUTEX_INITIALIZER |
静态初始化互斥量(全局 / 静态变量用) | 直接赋值给pthread_mutex_t 变量,无需销毁 |
pthread_mutex_lock |
加锁(阻塞) | 若互斥量已被锁,线程会挂起等待 |
pthread_mutex_unlock |
解锁 | 必须由加锁的线程解锁,否则会出错 |
pthread_mutex_destroy |
销毁互斥量 | 仅用于动态初始化的互斥量,静态初始化的无需销毁 |
2.2 用互斥量修复 “售票系统”
给上面的代码加互斥量,确保同一时间只有一个线程操作ticket
:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100;
pthread_mutex_t mutex; // 定义互斥量
void *sell_ticket(void *arg) {
char *thread_id = (char*)arg;
while (1) {
pthread_mutex_lock(&mutex); // 加锁:进入临界区前抢锁
if (ticket > 0) {
usleep(1000);
printf("%s 卖出第 %d 张票\n", thread_id, ticket);
ticket--;
pthread_mutex_unlock(&mutex); // 解锁:离开临界区释放锁
} else {
pthread_mutex_unlock(&mutex); // 即使没票,也要解锁!
break;
}
}
return NULL;
}
int main() {
pthread_mutex_init(&mutex, NULL); // 动态初始化互斥量
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, sell_ticket, "窗口1");
pthread_create(&t2, NULL, sell_ticket, "窗口2");
pthread_create(&t3, NULL, sell_ticket, "窗口3");
pthread_create(&t4, NULL, sell_ticket, "窗口4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
pthread_mutex_destroy(&mutex); // 销毁互斥量
return 0;
}
运行后会发现:票从 100 依次减到 1,没有负数,也没有重复售票 —— 互斥量成功保护了临界资源。
2.3 避坑:用 RAII 风格封装互斥量
上面的代码有个隐患:如果临界区里有return
或异常,可能会忘记解锁,导致 “死锁”(所有线程都等这把锁)。
解决方案是RAII(资源获取即初始化):用 C++ 类的构造函数加锁,析构函数解锁 —— 无论函数怎么退出,析构函数都会自动执行,确保解锁。
封装一个简单的LockGuard
:
// Lock.hpp
#include <pthread.h>
// 互斥量封装类
class Mutex {
public:
Mutex() { pthread_mutex_init(&_mutex, NULL); }
~Mutex() { pthread_mutex_destroy(&_mutex); }
// 加锁
void lock() { pthread_mutex_lock(&_mutex); }
// 解锁
void unlock() { pthread_mutex_unlock(&_mutex); }
// 获取原始互斥量(给条件变量用)
pthread_mutex_t* get_raw() { return &_mutex; }
private:
pthread_mutex_t _mutex;
// 禁止拷贝(避免多个对象操作同一把锁)
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
};
// RAII风格的锁守卫
class LockGuard {
public:
// 构造时加锁
explicit LockGuard(Mutex& mutex) : _mutex(mutex) { _mutex.lock(); }
// 析构时解锁
~LockGuard() { _mutex.unlock(); }
private:
Mutex& _mutex;
// 禁止拷贝
LockGuard(const LockGuard&) = delete;
LockGuard& operator=(const LockGuard&) = delete;
};
用LockGuard
简化售票代码,再也不用担心忘记解锁:
void *sell_ticket(void *arg) {
char *thread_id = (char*)arg;
while (1) {
LockGuard lock(mutex); // 构造时加锁
if (ticket > 0) {
usleep(1000);
printf("%s 卖出第 %d 张票\n", thread_id, ticket);
ticket--;
} else {
break; // 析构时自动解锁
}
}
return NULL; // 析构时自动解锁
}
三、线程同步:让线程 “按顺序” 执行(条件变量)
互斥解决了 “同一时间只有一个线程进临界区”,但没解决 “线程执行的顺序”。比如:
-
消费者线程想从队列取数据,却发现队列为空;
-
生产者线程想往队列存数据,却发现队列已满。
这时候需要条件变量—— 让线程在 “条件不满足” 时等待,条件满足时被唤醒,实现 “按顺序执行”。
3.1 条件变量的核心接口
条件变量通常和互斥量配合使用,核心接口如下:
接口 | 功能 | 关键说明 |
---|---|---|
pthread_cond_init |
初始化条件变量 | attr :NULL(默认属性) |
pthread_cond_destroy |
销毁条件变量 | |
pthread_cond_wait |
等待条件满足 | 1. 自动释放互斥量;2. 被唤醒后重新抢互斥量;3. 必须在循环中判断条件(防 “伪唤醒”) |
pthread_cond_signal |
唤醒一个等待的线程 | 随机唤醒一个,适合 “一对一” 场景 |
pthread_cond_broadcast |
唤醒所有等待的线程 | 适合 “一对多” 场景,比如 “通知所有消费者取数据” |
3.2 为什么pthread_cond_wait
需要互斥量?
你可能会疑惑:“等待的时候为什么要带互斥量?”
因为 “判断条件” 和 “等待” 必须是原子操作。比如消费者线程:
-
加锁后判断 “队列是否为空”;
-
如果为空,调用
pthread_cond_wait
—— 此时会自动释放互斥量,让生产者能加锁存数据; -
生产者存完数据后唤醒消费者,消费者重新抢互斥量,再次判断条件(避免 “伪唤醒”)。
如果没有互斥量,消费者刚判断完 “队列为空”,还没来得及等待,生产者就存了数据并唤醒 —— 但消费者已经错过了唤醒信号,会一直等下去(死等)。
3.3 实战:用 “条件变量 + 互斥量” 实现阻塞队列
阻塞队列(Blocking Queue)是 “生产者 - 消费者模型” 的核心组件:
-
队列为空时,消费者等待;
-
队列满时,生产者等待;
-
生产者存数据后唤醒消费者,消费者取数据后唤醒生产者。
实现一个简单的阻塞队列:
// BlockingQueue.hpp
#include <queue>
#include <pthread.h>
#include "Lock.hpp" // 复用之前的Mutex和LockGuard
template <typename T>
class BlockingQueue {
public:
BlockingQueue(int capacity) : _capacity(capacity) {
// 初始化条件变量
pthread_cond_init(&_not_empty, NULL); // 消费者等待的条件:队列非空
pthread_cond_init(&_not_full, NULL); // 生产者等待的条件:队列非满
}
~BlockingQueue() {
pthread_cond_destroy(&_not_empty);
pthread_cond_destroy(&_not_full);
}
// 生产者存数据
void push(const T& data) {
LockGuard lock(_mutex); // 加锁
// 队列满了,等待“非满”条件
while (size() >= _capacity) {
pthread_cond_wait(&_not_full, _mutex.get_raw());
}
// 存数据
_queue.push(data);
printf("生产者存数据:%d,队列大小:%d\n", data, size());
// 唤醒等待“非空”的消费者
pthread_cond_signal(&_not_empty);
}
// 消费者取数据
T pop() {
LockGuard lock(_mutex); // 加锁
// 队列空了,等待“非空”条件
while (size() == 0) {
pthread_cond_wait(&_not_empty, _mutex.get_raw());
}
// 取数据
T data = _queue.front();
_queue.pop();
printf("消费者取数据:%d,队列大小:%d\n", data, size());
// 唤醒等待“非满”的生产者
pthread_cond_signal(&_not_full);
return data;
}
// 获取队列大小
size_t size() const { return _queue.size(); }
private:
std::queue<T> _queue; // 实际存储数据的队列
int _capacity; // 队列最大容量
Mutex _mutex; // 保护队列的互斥量
pthread_cond_t _not_empty; // 条件:队列非空(给消费者用)
pthread_cond_t _not_full; // 条件:队列非满(给生产者用)
};
测试生产者 - 消费者模型:
#include <stdio.h>
#include <pthread.h>
#include "BlockingQueue.hpp"
BlockingQueue<int> bq(5); // 队列容量5
// 生产者逻辑:不断生成数据
void *producer(void *arg) {
int data = 0;
while (1) {
bq.push(data++);
usleep(500000); // 每0.5秒生产一个数据
}
return NULL;
}
// 消费者逻辑:不断取数据
void *consumer(void *arg) {
while (1) {
bq.pop();
usleep(1000000); // 每1秒消费一个数据
}
return NULL;
}
int main() {
pthread_t p1, c1;
pthread_create(&p1, NULL, producer, NULL);
pthread_create(&c1, NULL, consumer, NULL);
pthread_join(p1, NULL);
pthread_join(c1, NULL);
return 0;
}
运行后会看到有序的生产和消费:
生产者存数据:0,队列大小:1
消费者取数据:0,队列大小:0
生产者存数据:1,队列大小:1
生产者存数据:2,队列大小:2
消费者取数据:1,队列大小:1
生产者存数据:3,队列大小:2
...
四、工业级实战:线程池的设计与实现
在实际开发中,频繁创建 / 销毁线程会消耗大量资源(比如 Web 服务器每秒处理上千个请求,每次创建线程太耗时)。线程池就是 “预先创建一批线程,循环处理任务”,避免频繁创建线程的开销。
4.1 线程池的核心组成
一个基础的线程池包含 4 个部分:
-
线程数组:预先创建的线程,等待处理任务;
-
任务队列:存储待处理的任务(比如函数对象);
-
互斥量:保护任务队列的线程安全;
-
条件变量:任务队列空时,线程等待;有任务时,唤醒线程。
4.2 线程池的实现(结合单例模式)
为了保证全局只有一个线程池(避免资源浪费),我们用线程安全的单例模式实现:
// ThreadPool.hpp
#include <vector>
#include <queue>
#include <functional>
#include <pthread.h>
#include "Lock.hpp"
#include "BlockingQueue.hpp"
// 任务类型:函数对象
using Task = std::function<void()>;
class ThreadPool {
public:
// 单例模式:获取全局唯一线程池
static ThreadPool* get_instance(int thread_num = 5) {
if (_instance == NULL) {
LockGuard lock(_singleton_mutex); // 双重检查锁,保证线程安全
if (_instance == NULL) {
_instance = new ThreadPool(thread_num);
}
}
return _instance;
}
// 销毁单例(可选)
static void destroy_instance() {
if (_instance != NULL) {
delete _instance;
_instance = NULL;
}
}
// 提交任务到队列
void submit_task(const Task& task) {
_task_queue.push(task);
}
// 停止线程池
void stop() {
LockGuard lock(_mutex);
_is_running = false;
// 唤醒所有等待的线程
_cond_not_empty.notify_all();
}
private:
// 私有构造:禁止外部创建
ThreadPool(int thread_num) : _thread_num(thread_num), _is_running(true) {
// 初始化条件变量
pthread_cond_init(&_cond_not_empty, NULL);
// 创建线程
for (int i = 0; i < _thread_num; i++) {
pthread_t tid;
pthread_create(&tid, NULL, thread_work, this);
_threads.push_back(tid);
}
}
// 私有析构:禁止外部销毁
~ThreadPool() {
stop();
// 等待所有线程结束
for (pthread_t tid : _threads) {
pthread_join(tid, NULL);
}
// 销毁条件变量
pthread_cond_destroy(&_cond_not_empty);
}
// 线程工作函数:循环取任务执行
static void* thread_work(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (pool->_is_running) {
LockGuard lock(pool->_mutex);
// 任务队列为空,等待
while (pool->_task_queue.empty() && pool->_is_running) {
pthread_cond_wait(&pool->_cond_not_empty, pool->_mutex.get_raw());
}
// 如果线程池要停止,且任务队列为空,退出
if (!pool->_is_running && pool->_task_queue.empty()) {
break;
}
// 取任务执行
Task task = pool->_task_queue.front();
pool->_task_queue.pop();
lock.~LockGuard(); // 提前解锁,避免任务执行时占用锁
task(); // 执行任务
}
printf("线程 %lu 退出\n", pthread_self());
return NULL;
}
// 单例相关
static ThreadPool* _instance;
static Mutex _singleton_mutex;
int _thread_num; // 线程数量
std::vector<pthread_t> _threads; // 线程数组
std::queue<Task> _task_queue; // 任务队列
Mutex _mutex; // 保护任务队列的互斥量
pthread_cond_t _cond_not_empty; // 任务非空条件变量
bool _is_running; // 线程池运行状态
};
// 初始化静态成员
ThreadPool* ThreadPool::_instance = NULL;
Mutex ThreadPool::_singleton_mutex;
4.3 测试线程池
用 “打印日志” 作为任务,测试线程池:
#include <stdio.h>
#include <unistd.h>
#include "ThreadPool.hpp"
// 任务1:打印数字
void print_num(int num) {
printf("线程 %lu 处理任务:打印数字 %d\n", pthread_self(), num);
usleep(500000);
}
int main() {
// 获取线程池(5个线程)
ThreadPool* pool = ThreadPool::get_instance(5);
// 提交10个任务
for (int i = 0; i < 10; i++) {
// 用lambda封装任务(捕获i)
pool->submit_task([i]() { print_num(i); });
}
// 等待所有任务完成
sleep(5);
// 停止线程池
pool->stop();
// 销毁单例
ThreadPool::destroy_instance();
return 0;
}
运行后会看到 5 个线程轮流处理 10 个任务,避免了频繁创建线程的开销。
五、避坑指南:死锁与线程安全
5.1 死锁:线程的 “互相僵持”
死锁是多线程编程的 “噩梦”—— 比如线程 A 持有锁 1,等待锁 2;线程 B 持有锁 2,等待锁 1,两者互相僵持,永远无法继续。
死锁的 4 个必要条件(缺一不可):
-
互斥条件:资源只能被一个线程持有(比如锁);
-
请求与保持:线程持有一个资源,又请求另一个资源;
-
不剥夺条件:资源不能被强行夺走(只能主动释放);
-
循环等待:线程间形成 “你等我、我等你” 的循环。
如何避免死锁?
破坏任意一个必要条件即可,常用方法:
-
破坏循环等待:所有线程按 “固定顺序” 加锁(比如先锁 1 后锁 2);
-
破坏请求与保持:一次性申请所有资源(比如同时锁 1 和锁 2);
-
超时释放:加锁时设置超时时间,超时后释放已持有资源。
5.2 线程安全与可重入
很多人会混淆 “线程安全” 和 “可重入”,其实两者有明确区别:
概念 | 核心含义 | 例子 |
---|---|---|
线程安全 | 多个线程访问时,结果正确(通常靠锁实现) | 加锁的售票函数,多个线程调用结果正确 |
可重入 | 函数被多个执行流(线程 / 信号)重复调用,结果正确 | 纯数学函数(如int add(int a, int b) { return a+b; } ) |
关键结论:
-
可重入函数一定是线程安全的;
-
线程安全函数不一定是可重入的(比如加锁的函数,重入时会死锁);
-
避免在信号处理函数中调用不可重入函数(如
printf
、malloc
)。
六、总结:从 “乱象” 到 “有序” 的核心逻辑
Linux 线程同步与互斥的本质,是 “解决多线程并发访问共享资源的问题”:
-
互斥(锁):解决 “同一时间多个线程进临界区” 的问题,保证共享资源不被破坏;
-
同步(条件变量):解决 “线程执行顺序” 的问题,避免线程 “死等”;
-
线程池:解决 “频繁创建线程的开销” 问题,提升工业级程序的性能;
-
避坑:避免死锁(固定加锁顺序)、区分线程安全与可重入。
记住:好的多线程程序,不是 “越多线程越好”,而是 “有序、高效地利用线程”—— 这正是同步与互斥的核心价值。
更多推荐
所有评论(0)