阻塞队列特点

  1. 阻塞队列是线程安全
  2. 阻塞队列要有阻塞功能
    • 当队列为空时,出队列就会出现阻塞,阻塞到其他线程入队列为止.
    • 当队列为满时,入队列就会出现阻塞,阻塞到其他线程出队列为止.

生产者消费者模型

为了更好理解阻塞队列的作用,介绍一下生产者消费者模型,
在这里插入图片描述

大娘擀饺子皮给老王和老李包饺子,擀饺子皮就是生产者模型,包饺子就是消费者模型中间的桌子相当于阻塞队列.

  • 若擀饺子皮的大娘速度很快,那么桌子很快就放不下了,那大娘就得停下来,等老李或老王包饺子再擀饺子皮.
  • 若老王和老李包饺子的速度非常快,桌子空了,那老王和老李就得停下来,等大娘擀饺子皮再包饺子.

生产者消费者模型的意义

  1. 解耦合 通过阻塞队列降低耦合度
  2. 削峰填谷 由阻塞队列来承担数据峰值等情况.
    在这里插入图片描述
    在这里插入图片描述
  • 用户通过前端页面向后端服务器A发送请求,服务器A通过阻塞队列分配到相应的服务器B或服务器C当中进行处理.

  • 假设没有阻塞队列,服务器A直接连通B或C.当服务器B在某种情况工作不过来了,就会对服务器A甚至其他服务器造成重大影响,所以解耦合是极其重要的.

  • 有了阻塞队列,就算是加入了新的服务器D连接A也可以几乎不改变服务器A的代码.

  • 当遇上高峰期时,数据存储在阻塞队列中,由阻塞队列抗压

多线程环境使用阻塞队列

阻塞队列BlockingQueue是一个接口, 由以下几个类实现.

类型 描述
ArrayBlockingQueue 基于数组实现的阻塞队列
LinkedBlockingQueue 基于链表实现的阻塞队列
PriorityBlockingQueue 基于实现的带优先级的阻塞队列
TransferQueue 最多只包含一个元素的阻塞队列

阻塞队列的使用并不复杂:

  1. 实例化对象 ,传入参数为初始容量大小
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(100);

2.入队列和出队列

  • BlockingQueue的入队和出队为put()take();具有阻塞特性
    • put() 没有返回值
    • take() 返回出队列的数据
  • Queue的入队列和出队列是offer()poll();
  • BlockingQueue也有offer()poll();但是,offer()poll()没有阻塞特性的!

自己实现一个简单的阻塞队列

1. 先创建一个基本的阻塞队列类型

class MyBlockingQueue<E> {    //阻塞队列泛型类
    private E[] elems = null;
    private int head = 0;     //队头
    private int tail = 0;		 //队尾
    private int size = 0;		 //数据量

    public MyBlockingQueue(int capacity) {  //构造方法
        elems = (E[])new Object[capacity]; 
    }

    public void put(E elem) throws InterruptedException {
            if (size >= elems.length) {
                // 队列满了.
                // 后续需要让这个代码能够阻塞.              
            }
            // 新的元素要放到 tail 指向的位置上
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length) {
                tail = 0;
            }
            size++;
    }

    public E take() throws InterruptedException {
        E elem = null;
            if(size == 0) {
                // 队列空了.
                // 后续也需要让这个代码阻塞
            }
            // 取出 head 位置的元素并返回
            elem = elems[head];
            head++;
            if (head >= elems.length) {
                head = 0;
            }
            size--;
        return elem;
    }
}

2. 考虑线程安全问题

* 入队和出队都对数据进行更改,需要加锁
class MyBlockingQueue<E> {    //阻塞队列泛型类
    private E[] elems = null;
    private int head = 0;     //队头
    private int tail = 0;		 //队尾
    private int size = 0;		 //数据量
    
   // 准备锁对象, 如果使用 this 也可以.
    private final Object locker = new Object();
    
    public MyBlockingQueue(int capacity) {  //构造方法
        elems = (E[])new Object[capacity]; 
    }

    public void put(E elem) throws InterruptedException {
            if (size >= elems.length) {
                // 队列满了.
                // 后续需要让这个代码能够阻塞.              
            }
            synchronize(locker){
	            // 新的元素要放到 tail 指向的位置上
	            elems[tail] = elem;
	            tail++;
	            if (tail >= elems.length) {
	                tail = 0;
	            }
	            size++;
            }
    }

    public E take() throws InterruptedException {
        E elem = null;
            if(size == 0) {
                // 队列空了.
                // 后续也需要让这个代码阻塞
            }
            synchronize(locker){
	            // 取出 head 位置的元素并返回
	            elem = elems[head];
	            head++;
	            if (head >= elems.length) {
	                head = 0;
	            }
	            size--;
            }
        return elem;
    }
}
  • if判断句中如果出现两个put在多线程刚好差一个满的情况也会出现线程安全问题.
    在这里插入图片描述
    所以,if也需要加上锁
class MyBlockingQueue<E> {
    private E[] elems = null;
    private int head = 0;
    private int tail = 0;
    private int size = 0;

    // 准备锁对象, 如果使用 this 也可以.
    private final Object locker = new Object();

    public MyBlockingQueue(int capacity) {
        elems = (E[])new Object[capacity];
    }

    public void put(E elem) throws InterruptedException {
        // 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.
        synchronized (locker) {
            if (size >= elems.length) {
                // 队列满了.
                // 后续需要让这个代码能够阻塞.
                
            }
            // 新的元素要放到 tail 指向的位置上
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length) {
                tail = 0;
            }
            size++;

            // 入队列成功之后唤醒
            locker.notify();
        }
    }

    public E take() throws InterruptedException {
        E elem = null;
        synchronized (locker) {
            if (size == 0) {
                // 队列空了.
                // 后续也需要让这个代码阻塞
   
            }
            // 取出 head 位置的元素并返回
            elem = elems[head];
            head++;
            if (head >= elems.length) {
                head = 0;
            }
            // 这个代码不要遗漏.
            size--;
        }
        return elem;
    }
}

3. wait 和 notify 实现阻塞

  • 当队列满时,put()进入阻塞wait();当执行take()之后,通过notify()唤醒put()
  • 当队列空时,take()进入阻塞wait();当执行put()之后,通过notify()唤醒take()
  • 如果没有线程处于wait阻塞,调用notify就不会产生任何结果
class MyBlockingQueue<E> {
    private E[] elems = null;
    private int head = 0;
    private int tail = 0;
    private int size = 0;

    // 准备锁对象, 如果使用 this 也可以.
    private final Object locker = new Object();

    public MyBlockingQueue(int capacity) {
        elems = (E[])new Object[capacity];
    }

    public void put(E elem) throws InterruptedException {
        // 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.
        synchronized (locker) {
            if (size >= elems.length) {
                // 队列满了.
                // 后续需要让这个代码能够阻塞.
                locker.wait();
            }
            // 新的元素要放到 tail 指向的位置上
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length) {
                tail = 0;
            }
            size++;

            // 入队列成功之后唤醒
            locker.notify();
        }
    }

    public E take() throws InterruptedException {
        E elem = null;
        synchronized (locker) {
            if (size == 0) {
                // 队列空了.
                // 后续也需要让这个代码阻塞
                locker.wait();
            }
            // 取出 head 位置的元素并返回
            elem = elems[head];
            head++;
            if (head >= elems.length) {
                head = 0;
            }
            // 这个代码不要遗漏.
            size--;

            // 元素出队列成功之后, 加上唤醒
            locker.notify();
        }
        return elem;
    }
}

4.if 改为while

此时还有问题:
在这里插入图片描述
对此改进代码if更改成while,使wait能够多次判断,再次进入阻塞.

5.最终阻塞队列

class MyBlockingQueue<E> {
    private E[] elems = null;
    private int head = 0;
    private int tail = 0;
    private int size = 0;

    // 准备锁对象, 如果使用 this 也可以.
    private final Object locker = new Object();

    public MyBlockingQueue(int capacity) {
        elems = (E[])new Object[capacity];
    }

    public void put(E elem) throws InterruptedException {
        // 锁加到这里和加到方法上本质一样的. 加到方法上是给 this 加锁. 此处是给 locker 加锁.
        synchronized (locker) {
            while (size >= elems.length) {
                // 队列满了.
                // 后续需要让这个代码能够阻塞.
                locker.wait();
            }
            // 新的元素要放到 tail 指向的位置上
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length) {
                tail = 0;
            }
            size++;

            // 入队列成功之后唤醒
            locker.notify();
        }
    }

    public E take() throws InterruptedException {
        E elem = null;
        synchronized (locker) {
            while (size == 0) {
                // 队列空了.
                // 后续也需要让这个代码阻塞
                locker.wait();
            }
            // 取出 head 位置的元素并返回
            elem = elems[head];
            head++;
            if (head >= elems.length) {
                head = 0;
            }
            // 这个代码不要遗漏.
            size--;

            // 元素出队列成功之后, 加上唤醒
            locker.notify();
        }
        return elem;
    }
}

6. 测试类

public class ThreadDemo29 {
    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue<String> queue = new MyBlockingQueue<>(1000);
        
        // 生产者
        Thread t1 = new Thread(() -> {
            int n = 1;
            while (true) {
                try {
                    queue.put(n + "");
                    System.out.println("生产元素 " + n);
                    n++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        // 消费者
        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    String n = queue.take();
                    System.out.println("消费元素 " + n);

                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();
    }
}
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐