BlockingQueue接口七大类实现
队列是一种常见的数据结构,Java中以Queue的形式存在,继承Collection。而BlockingQueue又继承Queue,是一种具有阻塞线程功能的特殊队列。BlockingQueue的实现是基于ReentrantLock,最常用的场景是:生产者/消费者模式,不需要额外的实现线程的同步和唤醒。ArrayBlockingQueue:由数组组成的有界阻塞队列LinkedBlockingQueu
队列是一种常见的数据结构,Java中以Queue的形式存在,继承Collection。而BlockingQueue又继承Queue,是一种具有阻塞线程功能的特殊队列。
BlockingQueue的实现是基于ReentrantLock,最常用的场景是:生产者/消费者模式,不需要额外的实现线程的同步和唤醒。
- ArrayBlockingQueue:由数组组成的有界阻塞队列
- LinkedBlockingQueue:由链表组成的有界阻塞队列
- LinkedTransferQueue:由链表组成的无界队列
- PriorityBlockingQueue:优先级排序的无界阻塞队列
- DelayQueue:优先级排序的无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列
- LinkedBlockingDeque:由链表组成的双端阻塞队列
ArrayBlockingQueue
由数组结构实现,队列的容量是固定的。存、取数据默认使用一把非公平锁,无法实现真正意义上的存、取数据的并发执行。
- 由于是数组实现,容量固定不变,因此不容易出现内存占用率过高等问题,但如果容量过小,取数据比存数据的数据慢,这样会造成很多线程进入阻塞状态,可以使用offer( )方法达到不阻塞线程,在高并发、吞吐量高的情况下,由于存、取共用一把锁,不推荐使用。
- 使用场景:更多是放在项目的次级业务上,比如:人事系统中员工离职、变更后,其它依赖应用进行数据同步。在一些项目中,可能公司的其他部门的应用服务会要求同步人事系统的部分组织架构数据,但是人事系统数据发生变更后,应用的依赖方需要进行数据的同步,这种场景下,由于员工离职、变更操作不是非常频繁,因此可以有效防止线程阻塞,基本没有并发量和吞吐量的要求。
LinkedBlockingQueue
由链表结构实现,队列容量默认Integer.MAX_VALUE,存、取数据的操作分别用于独立的锁,可以实现存、取的并发执行。
- 基于链表实现,数据的新增和移除速度比数组快,但是每次存、取数据都会有Node对象的新建和移除,因此也会存在GC影响性能的可能。
- 默认容量很大,因此存储数据的线程基本不会阻塞,但是取数据的速度过低,内存占用可能会飙升。
- 存、取操作锁分离,所以使用有并发和吞吐量要求的场景。
- 使用场景:在项目的核心业务,且生产和消费速度相似的场景,比如:订单完成的邮件、短信提醒。在订单系统中,当用户下单成功后,将信息放入ArrayBlockingQueue中,由消息推送系统取出数据进行消息推送提醒用户下单成功。如果订单的成交量非常大,那么使用ArrayBlockingQueue就会出现一些问题,固定数组很容易被使用完,此时调用的线程会进入阻塞,那么可能无法及时将消息推送出去,所以使用LinkedBlockingQueue比较合适,但是要注意消费速度不能太低,不然容易内存被使用完
PriorityBlockingQueue
基于数组实现,队列容量最大值为Integer.MAX_VALUE-8(-8是因为数组的对象头)。根据传入的优先级进行排序,保证优先级来消费。
- 优先级阻塞队列中存在一次排序,根据优先级来将数据放入到头部或者尾部;
- 排序带来的损耗因素,由二叉树最小堆排序算法来降低。
- 使用场景:在项目上存在优先级的业务,比如:VIP队列购票。用户在购票的时候,根据用户不同的等级,优先放到队伍的前面,当存在票源的时候,根据优先级分配。
package Learning.java.BlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
/*
* @ClassName: PriorityTest
* @Author: XiaoChen
* @Date: 2021/12/10 星期五 15:42
*/
public class PriorityTest {
static class Ticket implements Comparable<Ticket>{
private final int level;
public Ticket(int level){
this.level=level;
}
@Override
public int compareTo(Ticket o) {
if(this.level>o.level) return -1;
return 1;
}
public static void main(String[] args) {
//VIP客户优先登机,抢票
BlockingQueue<Ticket> queue=new PriorityBlockingQueue<>();
Ticket ticket1=new Ticket(0);
Ticket ticket2=new Ticket(1);
Ticket ticket3=new Ticket(2);
Ticket ticket4=new Ticket(-1);
queue.add(ticket1);
queue.add(ticket2);
queue.add(ticket3);
queue.add(ticket4);
for(;;){
try{
System.out.println(queue.take().level);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
DelayQueue
延迟队列,基于优先级队列来实现存储元素,必须实现Delayed接口(Delayed接口继承了Comparable接口)
- 由于基于优先级队列实现,这个优先级是根据时间排序的,比如:订单超时取消功能。用户订单未支付开始倒计时。
package Learning.java.BlockingQueue;
import java.util.concurrent.*;
/*
* @ClassName: DelayTest
* @Author: XiaoChen
* @Date: 2021/12/10 星期五 16:12
*/
public class DelayTest {
static class Work implements Delayed{
private String name;//名称
private long time;//时长
public Work(String name,long time,TimeUnit unit){
this.name=name;
this.time=System.currentTimeMillis()+(time>0?unit.toMillis(time):0);
}
@Override
public long getDelay(TimeUnit unit) {
return time-System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Work work=(Work)o;
long diff=this.time-work.time;
if(diff<=0) return -1;
return 1;
}
}
public static void main(String[] args) {
BlockingQueue<Work> queue=new DelayQueue<>();
try{
Work work1=new Work("张三",30,TimeUnit.SECONDS);
Work work2=new Work("李四",15,TimeUnit.SECONDS);
Work work3=new Work("王五",25,TimeUnit.SECONDS);
queue.add(work1);
queue.add(work2);
queue.add(work3);
for(;;){
Work work=queue.take();
System.out.println(work.name+","+work.time);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
SynchronousQueue
利用双栈双队列算法的无空间队列或栈任何一个对SynchronousQueue写需要等到一个对SynchronousQueue的读操作,任何一个个读操作需要等待一个写操作。没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者。可以理解成交换通道,不存储任何元素,提供者和消费者是需要组队完成工作,缺少任何一个将会阻塞线程,直到等到配对为止。
- 使用场景:newCachedThreadPool( )、轻量级的任务转交
如果我们不确定每一个来自生产者请求数量但是需要很快的处理掉,那么配合SynchronousQueue为每一个生产者请求分配一个消费者线程是最简洁的方法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来)创建新的线程,如果有空闲线程则会重复使用,线程默认空闲了60秒回被回收。此外还有会话转交,通常坐席需要进行会话转交,如果坐席在线那么会为我们分配一个客服,但是如果没有,那么阻塞请求线程,一段时间会超时或者提示坐席已满。
更多推荐
所有评论(0)