多线程代码案例2-阻塞队列
假设没有阻塞队列,服务器A直接连通B或C.当服务器B在某种情况工作不过来了,就会对服务器A甚至其他服务器造成重大影响,所以。用户通过前端页面向后端服务器A发送请求,服务器A通过阻塞队列分配到相应的服务器B或服务器C当中进行处理.当遇上高峰期时,数据存储在阻塞队列中,由阻塞队列。,使wait能够多次判断,再次进入阻塞.为了更好理解阻塞队列的作用,介绍一下。大娘擀饺子皮给老王和老李包饺子,有了阻塞队列
·
文章目录
阻塞队列特点
- 阻塞队列是线程安全的
- 阻塞队列要有阻塞功能
- 当队列为空时,出队列就会出现阻塞,阻塞到其他线程入队列为止.
- 当队列为满时,入队列就会出现阻塞,阻塞到其他线程出队列为止.
生产者消费者模型
为了更好理解阻塞队列的作用,介绍一下生产者消费者模型,
大娘擀饺子皮给老王和老李包饺子,擀饺子皮就是生产者模型,包饺子就是消费者模型中间的桌子相当于阻塞队列.
- 若擀饺子皮的大娘速度很快,那么桌子很快就放不下了,那大娘就得停下来,等老李或老王包饺子再擀饺子皮.
- 若老王和老李包饺子的速度非常快,桌子空了,那老王和老李就得停下来,等大娘擀饺子皮再包饺子.
生产者消费者模型的意义
- 解耦合 通过阻塞队列降低耦合度
- 削峰填谷 由阻塞队列来承担数据峰值等情况.


-
用户通过前端页面向后端服务器A发送请求,服务器A通过阻塞队列分配到相应的服务器B或服务器C当中进行处理.
-
假设没有阻塞队列,服务器A直接连通B或C.当服务器B在某种情况工作不过来了,就会对服务器A甚至其他服务器造成重大影响,所以解耦合是极其重要的.
-
有了阻塞队列,就算是加入了新的服务器D连接A也可以几乎不改变服务器A的代码.
-
当遇上高峰期时,数据存储在阻塞队列中,由阻塞队列抗压
多线程环境使用阻塞队列
阻塞队列BlockingQueue是一个接口, 由以下几个类实现.
| 类型 | 描述 |
|---|---|
ArrayBlockingQueue |
基于数组实现的阻塞队列 |
LinkedBlockingQueue |
基于链表实现的阻塞队列 |
PriorityBlockingQueue |
基于堆实现的带优先级的阻塞队列 |
TransferQueue |
最多只包含一个元素的阻塞队列 |
阻塞队列的使用并不复杂:
- 实例化对象 ,传入参数为初始容量大小
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(100);
2.入队列和出队列
BlockingQueue的入队和出队为put()和take();具有阻塞特性put()没有返回值take()返回出队列的数据
Queue的入队列和出队列是offer()和poll();BlockingQueue也有offer()和poll();但是,offer()和poll()是没有阻塞特性的!
自己实现一个简单的阻塞队列
1. 先创建一个基本的阻塞队列类型
class MyBlockingQueue<E> { //阻塞队列泛型类
private E[] elems = null;
private int head = 0; //队头
private int tail = 0; //队尾
private int size = 0; //数据量
public MyBlockingQueue(int capacity) { //构造方法
elems = (E[])new Object[capacity];
}
public void put(E elem) throws InterruptedException {
if (size >= elems.length) {
// 队列满了.
// 后续需要让这个代码能够阻塞.
}
// 新的元素要放到 tail 指向的位置上
elems[tail] = elem;
tail++;
if (tail >= elems.length) {
tail = 0;
}
size++;
}
public E take() throws InterruptedException {
E elem = null;
if(size == 0) {
// 队列空了.
// 后续也需要让这个代码阻塞
}
// 取出 head 位置的元素并返回
elem = elems[head];
head++;
if (head >= elems.length) {
head = 0;
}
size--;
return elem;
}
}
2. 考虑线程安全问题
* 入队和出队都对数据进行更改,需要加锁
class MyBlockingQueue<E> { //阻塞队列泛型类
private E[] elems = null;
private int head = 0; //队头
private int tail = 0; //队尾
private int size = 0; //数据量
// 准备锁对象, 如果使用 this 也可以.
private final Object locker = new Object();
public MyBlockingQueue(int capacity) { //构造方法
elems = (E[])new Object[capacity];
}
public void put(E elem) throws InterruptedException {
if (size >= elems.length) {
// 队列满了.
// 后续需要让这个代码能够阻塞.
}
synchronize(locker){
// 新的元素要放到 tail 指向的位置上
elems[tail] = elem;
tail++;
if (tail >= elems.length) {
tail = 0;
}
size++;
}
}
public E take() throws InterruptedException {
E elem = null;
if(size == 0) {
// 队列空了.
// 后续也需要让这个代码阻塞
}
synchronize(locker){
// 取出 head 位置的元素并返回
elem = elems[head];
head++;
if (head >= elems.length) {
head = 0;
}
size--;
}
return elem;
}
}
if判断句中如果出现两个put在多线程刚好差一个满的情况也会出现线程安全问题.
所以,if也需要加上锁
class MyBlockingQueue<E> {
private E[] elems = null;
private int head = 0;
private int tail = 0;
private int size = 0;
// 准备锁对象, 如果使用 this 也可以.
private final Object locker = new Object();
public MyBlockingQueue(int capacity) {
elems = (E[])new Object[capacity];
}
public void put(E elem) throws InterruptedException {
// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.
synchronized (locker) {
if (size >= elems.length) {
// 队列满了.
// 后续需要让这个代码能够阻塞.
}
// 新的元素要放到 tail 指向的位置上
elems[tail] = elem;
tail++;
if (tail >= elems.length) {
tail = 0;
}
size++;
// 入队列成功之后唤醒
locker.notify();
}
}
public E take() throws InterruptedException {
E elem = null;
synchronized (locker) {
if (size == 0) {
// 队列空了.
// 后续也需要让这个代码阻塞
}
// 取出 head 位置的元素并返回
elem = elems[head];
head++;
if (head >= elems.length) {
head = 0;
}
// 这个代码不要遗漏.
size--;
}
return elem;
}
}
3. wait 和 notify 实现阻塞
- 当队列满时,
put()进入阻塞wait();当执行take()之后,通过notify()唤醒put() - 当队列空时,
take()进入阻塞wait();当执行put()之后,通过notify()唤醒take() - 如果没有线程处于wait阻塞,调用notify就不会产生任何结果
class MyBlockingQueue<E> {
private E[] elems = null;
private int head = 0;
private int tail = 0;
private int size = 0;
// 准备锁对象, 如果使用 this 也可以.
private final Object locker = new Object();
public MyBlockingQueue(int capacity) {
elems = (E[])new Object[capacity];
}
public void put(E elem) throws InterruptedException {
// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.
synchronized (locker) {
if (size >= elems.length) {
// 队列满了.
// 后续需要让这个代码能够阻塞.
locker.wait();
}
// 新的元素要放到 tail 指向的位置上
elems[tail] = elem;
tail++;
if (tail >= elems.length) {
tail = 0;
}
size++;
// 入队列成功之后唤醒
locker.notify();
}
}
public E take() throws InterruptedException {
E elem = null;
synchronized (locker) {
if (size == 0) {
// 队列空了.
// 后续也需要让这个代码阻塞
locker.wait();
}
// 取出 head 位置的元素并返回
elem = elems[head];
head++;
if (head >= elems.length) {
head = 0;
}
// 这个代码不要遗漏.
size--;
// 元素出队列成功之后, 加上唤醒
locker.notify();
}
return elem;
}
}
4.if 改为while
此时还有问题:
对此改进代码if更改成while,使wait能够多次判断,再次进入阻塞.
5.最终阻塞队列
class MyBlockingQueue<E> {
private E[] elems = null;
private int head = 0;
private int tail = 0;
private int size = 0;
// 准备锁对象, 如果使用 this 也可以.
private final Object locker = new Object();
public MyBlockingQueue(int capacity) {
elems = (E[])new Object[capacity];
}
public void put(E elem) throws InterruptedException {
// 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.
synchronized (locker) {
while (size >= elems.length) {
// 队列满了.
// 后续需要让这个代码能够阻塞.
locker.wait();
}
// 新的元素要放到 tail 指向的位置上
elems[tail] = elem;
tail++;
if (tail >= elems.length) {
tail = 0;
}
size++;
// 入队列成功之后唤醒
locker.notify();
}
}
public E take() throws InterruptedException {
E elem = null;
synchronized (locker) {
while (size == 0) {
// 队列空了.
// 后续也需要让这个代码阻塞
locker.wait();
}
// 取出 head 位置的元素并返回
elem = elems[head];
head++;
if (head >= elems.length) {
head = 0;
}
// 这个代码不要遗漏.
size--;
// 元素出队列成功之后, 加上唤醒
locker.notify();
}
return elem;
}
}
6. 测试类
public class ThreadDemo29 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue<String> queue = new MyBlockingQueue<>(1000);
// 生产者
Thread t1 = new Thread(() -> {
int n = 1;
while (true) {
try {
queue.put(n + "");
System.out.println("生产元素 " + n);
n++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
// 消费者
Thread t2 = new Thread(() -> {
while (true) {
try {
String n = queue.take();
System.out.println("消费元素 " + n);
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
更多推荐



所有评论(0)