【Linux】线程之同步(五)
文章摘要: 本文主要讨论了多线程编程中的死锁问题和线程同步机制。死锁产生的四个必要条件包括互斥、请求保持、不剥夺和循环阻塞,可通过加锁顺序一致、避免锁未释放等方式避免。线程同步通过条件变量实现,pthread_cond_wait()会自动释放锁并等待唤醒。生产者消费者模型通过阻塞队列解耦生产消费过程,使用互斥锁和条件变量保证线程安全。文中给出了阻塞队列的具体实现代码,展示了生产者和消费者线程如何安
文章目录
死锁
有这样一种情况,两个线程,两把锁
线程1申请第一把锁,又申请第二把锁,
线程2申请第二把锁,又申请第一把锁,
肯定会出现,并发,两个线程都卡在第一步上,另一把锁无法申请,永久阻塞导致死锁
造成死锁的四个必要条件,
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流在申请资源时阻塞时,对已经申请的资源保持不放
- 不剥夺条件:一个执行流已经获得的资源,在未使用完之前,不能强行剥夺
- 循环阻塞条件:多个执行流在申请资源时,形成头尾相接的阻塞关系
避免死锁
破坏死锁的必要条件,第一条破环不了,剩下3条都可以破环
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
解决死锁的算法(了解)
银行家算法
死锁检测算法
🚩线程同步
同步概念:在保证数据安全的情况下,让线程按照一定顺序去访问临界资源, 从而避免线程饥饿问题
🚩条件变量
一个线程互斥的访问某个临界变量时,可能在变量状态改变之前,该线程什么也做不了,
比如说,一个线程在访问队列,队列为空,在其他线程将数据加入到队列之前,该线程什么也做不了,我们要让该线程等待,就用到了条件变量
初始化
静态:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
静态的不需要手动销毁
动态:
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond);
等待条件
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
cond:要在这个条件变量上等待
mutex:互斥量,
唤醒
int pthread_cond_broadcast(pthread_cond_t *cond); //唤醒所有线程
int pthread_cond_signal(pthread_cond_t *cond);//唤醒单个线程
🚩pthread_cond_wait()
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);让线程等待的时候,会自动释放锁,伪代码如下 ,
int pthread_cond_wait(cond, mutex) {
// 原子操作开始
unlock(mutex); // 释放保护数据的锁
add_to_wait_queue(); // 把自己加入条件变量的等待队列
// 原子操作结束
sleep(); // 等待被signal唤醒
// 被唤醒后
lock(mutex); // 重新获取锁
return;
}
条件变量实操
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <stdint.h>
using namespace std;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
int cnt=0;
void* Count(void* args)
{
cout<<"create success"<<endl;
pthread_detach(pthread_self());
uint64_t num=uint64_t(args);
while(1)
{
pthread_mutex_lock(&lock);
pthread_cond_wait(&cond,&lock);
cout<<num<<": "<<cnt++<<endl;
pthread_mutex_unlock(&lock);
}
return nullptr;
}
int main()
{
for(uint64_t i=0; i<3;i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,Count,(void*)i);
usleep(1000);
}
sleep(3);
while(1)
{
sleep(1);
pthread_cond_signal(&cond);
cout<<"signal success"<<endl;
}
return 0;
}
因为pthread_cond_wait(),自动释放锁,被其他线程申请再释放,很多线程会等待在这条代码中,
运行时,3个线程被创建,待主线程控制后,唤醒成功
##🚩 细节
pthread_mutex_lock(&lock);
pthread_cond_wait(&cond,&lock);
为什么先加锁,再等待?
:线程怎么知道要休眠?一定是判断临界资源不就绪,所以临界资源也是有状态的
我们唤醒线程就是更改临界资源状态,线程再次判断,既然判断就是访问临界资源,所以必须加锁再等待,不然就会像抢票那样出问题
🚩生产消费者模型
生产消费者模型主要是解决生产消费者强耦合问题,生产者和消费者不直接通讯,而是通过特定的内存块,生产者生产完数据后,不用等待消费者拿走,而是交给特定内存块,消费者拿数据直接从特定内存块拿数据,特定的内存块相当于缓冲区,给生产消费者解耦。例如生活中的超市
3个关系,
2个对象——生产和消费
1个交易场所——特定结构的内存空间

执行流在做通信,保证了安全高效的通信
🚩并发问题:
- 生产vs生产:互斥
- 消费vs消费:互斥
- 生产vs消费:互斥,同步,(互斥,不能一边放饮料,一边拿饮料,同步,不能我还没放你就拿)
优点
- 支持忙闲不均
- 给生产消费者解耦
代码实现生产消费模型
//阻塞队列实现
#include <iostream>
#include <queue>
using namespace std;
template<class T>
class BlockQueue{
static const int defaultnum=20;
public:
BlockQueue(int maxcap=defaultnum)
:_maxcap(maxcap)
{
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_c,nullptr);
pthread_cond_init(&_p,nullptr);
}
T pop()
{
pthread_mutex_lock(&_lock);
while (_queue.empty())
{
pthread_cond_wait(&_c,&_lock);
}
T tp = _queue.front();
_queue.pop();
pthread_cond_signal(&_p);
pthread_mutex_unlock(&_lock);
return tp;
}
void push(const T &in)
{
pthread_mutex_lock(&_lock);
while (_queue.size()==_maxcap)
{
pthread_cond_wait(&_p,&_lock);
}
_queue.push(in);
pthread_cond_signal(&_c);
pthread_mutex_unlock(&_lock);
}
~BlockQueue()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_c);
pthread_cond_destroy(&_p);
}
private:
queue<T> _queue;
pthread_mutex_t _lock;
pthread_cond_t _c;
pthread_cond_t _p;
int _maxcap;
};
#pragma
#include <iostream>
#include <pthread.h>
#include "BlockQueue.hpp"
#include <time.h>
#include <unistd.h>
using namespace std;
void* Consumer(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(1)
{
int t=bq->pop();
cout<<"消费成功"<<":"<<t<<endl;
sleep(1);
}
}
void* Producter(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(1)
{
int t=rand()%10;
bq->push(t);
cout<<"生产成功"<<":"<<t<<endl;
sleep(1);
}
}
int main()
{
BlockQueue<int>* bq=new BlockQueue<int>;
srand(time(nullptr));
//for(int i=0;i<3;i++)
//{
pthread_t ptid;
pthread_create(&ptid,nullptr,Producter,bq);
//}
//for(int i=0;i<3;i++)
//{
pthread_t ctid;
pthread_create(&ctid,nullptr,Consumer,bq);
// }
//for(int i=0;i<3;i++)
//{
pthread_join(ptid,nullptr);
pthread_join(ctid,nullptr);
delete bq;
//}
return 0;
}

有些显示器竞争问题,正常
🚩伪唤醒
while (_queue.empty())
{
pthread_cond_wait(&_c,&_lock);
}
这里为什么用while而不是if?
想一下如果是if,队列为空,3个消费者线程都进入等待了,这时生产者生产了一个数据,我们唤醒所有进程,其中1个线程抢到了锁,消费释放后,另外的线程也会竞争锁,因为是if,所以竞争到锁继续向下消费了,但是我们的队列是空的
所以用while,即使拿到锁了,还是要再判断一次,队列是空,继续等待
水平线版本
定义水平线,低于某个水平线,就唤醒生产
高于某个水平线,唤醒消费
template<class T>
class BlockQueue{
static const int defaultnum=20;
public:
BlockQueue(int maxcap=defaultnum)
:_maxcap(maxcap)
{
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_c,nullptr);
pthread_cond_init(&_p,nullptr);
_low_water =_maxcap/3;
_high_water=_maxcap*2/3;
}
T pop()
{
pthread_mutex_lock(&_lock);
while (_queue.size()==0)
{
pthread_cond_wait(&_c,&_lock);
}
T tp = _queue.front();
_queue.pop();
if(_queue.size()<_high_water)pthread_cond_signal(&_p);
pthread_mutex_unlock(&_lock);
return tp;
}
void push(const T &in)
{
pthread_mutex_lock(&_lock);
while (_queue.size()==_maxcap)
{
pthread_cond_wait(&_p,&_lock);
}
_queue.push(in);
if(_queue.size()>_low_water)pthread_cond_signal(&_c);
pthread_mutex_unlock(&_lock);
}
~BlockQueue()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_c);
pthread_cond_destroy(&_p);
}
private:
queue<T> _queue;
pthread_mutex_t _lock;
pthread_cond_t _c;
pthread_cond_t _p;
int _maxcap;
int _low_water;
int _high_water;
};

多生产多消费
int main()
{
BlockQueue<int>* bq=new BlockQueue<int>;
srand(time(nullptr));
pthread_t p[3],c[5];
for(int i=0;i<3;i++)
{
pthread_t ptid;
pthread_create(p+i,nullptr,Producter,bq);
}
for(int i=0;i<3;i++)
{
pthread_t ctid;
pthread_create(c+i,nullptr,Consumer,bq);
}
for(int i=0;i<3;i++)
{
pthread_join(p[i],nullptr);
pthread_join(c[i],nullptr);
delete bq;
}
return 0;
}
🚩添加任务版
循环队列不仅可以放int,还可以放任务
#include <iostream>
using namespace std;
string len="+-*/%";
enum{
Divzero=1,
ModZero,
Unkown
};
class Task{
public:
Task(int date1,int date2,char ch)
:_date1(date1),_date2(date2),_ch(ch)
{}
int run()
{
switch (_ch)
{
case '+':
_result = _date1 + _date2;
break;
case '-':
_result = _date1 - _date2;
break;
case '*':
_result = _date1 * _date2;
break;
case '/':
{
if (_date2 == 0)
{
_exitcode = Divzero;
}
else
_result = _date1 / _date2;
}
break;
case '%':
{
if (_date2 == 0)
{
_exitcode = ModZero;
}
else
_result = _date1 % _date2;
}
break;
default:
_exitcode = Unkown;
break;
}
}
int operator ()()
{
run();
}
string Getresult()
{
string r;
r+=to_string(_date1);
r+=_ch;
r+=to_string(_date2);
r+='=';
r+=to_string(_result);
r+="[code]:";
r+=to_string(_exitcode);
return r;
}
string Gettask()
{
string r;
r+=to_string(_date1);
r+=_ch;
r+=to_string(_date2);
r+="=?";
return r;
}
~Task()
{}
private:
int _date1;
int _date2;
int _result;
char _ch;
int _exitcode;
};
#pragma
#include <iostream>
#include <pthread.h>
#include "BlockQueue.hpp"
#include <time.h>
#include <unistd.h>
#include "Task.hpp"
using namespace std;
void* Consumer(void* args)
{
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
while(1)
{
Task t=bq->pop();
t();
cout<<"消费成功"<<":"<<t.Getresult()<<endl;
sleep(2);
}
}
void* Producter(void* args)
{
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
while(1)
{
int date1=rand()%10;
int date2=rand()%10;
int ch=len[rand()%len.size()];
Task t(date1,date2,ch);
bq->push(t);
cout<<"生产成功"<<":"<<t.Gettask()<<endl;
sleep(2);
}
}
int main()
{
BlockQueue<Task>* bq=new BlockQueue<Task>;
srand(time(nullptr));
pthread_t p[3],c[5];
//for(int i=0;i<3;i++)
//{
pthread_t ptid;
pthread_create(&ptid,nullptr,Producter,bq);
//}
//for(int i=0;i<3;i++)
//{
pthread_t ctid;
pthread_create(&ctid,nullptr,Consumer,bq);
// }
//for(int i=0;i<3;i++)
//{
pthread_join(ptid,nullptr);
pthread_join(ctid,nullptr);
//}
delete bq;
return 0;
}

更多推荐


所有评论(0)