public interface Queue<E> extends Collection<E> {
    /*
    * add方法,在不违背队列容量限制的情况,往队列中添加一个元素,如果添加成功则返回true,
    * 如果因为容量限制添加失败了,则抛出IllegalStateException异常    
    */
    boolean add(E e);

	/*
    * offer方法在不违背容量限制的情况,往队列中添加一个元素,如果添加元素成功,返回true,
    * 如果因为空间限制,无法添加元素则返回false。在有容量限制的队列中,offer方法优于add方法,
    * 因为add方法通过抛异常的方式表示容量已满,offer方法通过返回false的方式表示容量已满,
    * 抛异常处理更加耗时,offer直接返回false的方式更好
    */
    boolean offer(E e);

	/*
    * 删除并返回队列头的元素,该方法与poll的区别在于:若队列为空,则抛异常
    */
    E remove();

	/*
    * poll方法也是删除并返回队列的头元素,若队列为空,则返回null
    */
    E poll();

	/*
    * 返回队列头元素,但不删除,该方法与peek的区别在于:如果队列为空,将抛异常
    */
    E element();

	/*
    * 返回队列头元素,但不删除,如果队列为空,将返回null
    */
    E peek();
}

阻塞队列接口BlockingQueue继承自Queue接口:

public interface BlockingQueue<E> extends Queue<E> {
 
	//将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
	//在成功时返回 true,如果此队列已满,则抛IllegalStateException。 
	boolean add(E e); 
	 
	//将指定的元素插入此队列的尾部,如果队列已满,则等待指定的时间,
	//如果等待时间结束了还没有可用的空间,则返回false。该方法可被中断 
	boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 
	
	//将指定的元素插入此队列的尾部,如果该队列已满,则一直等待(阻塞)。 
	void put(E e) throws InterruptedException; 
	
	//获取并移除此队列的头部,如果没有元素则等待(阻塞),直到有元素将唤醒等待线程执行该操作
    //在ThreadPoolExecutor中若未设置核心线程存活时间,则使用该方法从阻塞队列中获取待执行的任务
	E take() throws InterruptedException; 
	
	//获取并移除此队列的头部,在指定的等待时间前一直等待获取元素,超过时间方法将结束
    //在ThreadPoolExecutor中若设置了核心线程存活时间,则使用该方法从阻塞队列中获取待执行的任务,
    //workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),超时时间是线程存活时间。
	E poll(long timeout, TimeUnit unit) throws InterruptedException; 
	
	//从此队列中移除指定元素的单个实例(如果存在)。 
	boolean remove(Object o); 
 
 
	//除了上述方法还有继承自Queue接口的方法 
	//获取但不移除此队列的头元素,没有则抛异常NoSuchElementException 
	E element(); 
	
	//获取但不移除此队列的头;如果此队列为空,则返回 null。 
	E peek(); 
	
	//获取并移除此队列的头,如果此队列为空,则返回 null。 
	E poll();
}

add:增加一个元素,如果队列已满,则抛出IllegalStateException异常

remove:移除并返回队列头部的元素,如果队列为空,则抛出NoSuchElementException异常

element:返回队列头部的元素,如果队列为空,则抛出NoSuchElementException异常

offer:添加一个元素并返回true,如果队列已满,则返回false

poll:移除并返问队列头部的元素,如果队列为空,则返回null

peek:返回队列头部的元素,但不删除,如果队列为空,则返回null

put:添加一个元素,如果队列已满,则阻塞。

take:移除并返回队列头部的元素,如果队列为空,则阻塞。

总结:

1. add和offer方法都是向队列中增加元素,区别在于:在队列满的情况下,add方法将选择抛异常来表示队列已经满了,而offer方法通过返回false表示队列已经满了;在有限队列的情况,使用offer方法优于add方法;

2. remove和poll方法都是删除并返回队列的头元素,区别在于:在队列为空的情况下,remove方法将抛异常,而poll方法将返回null;

3. element和peek方法都是返回队列的头元素,但是不删除头元素,区别在于:在队列为空的情况下,element方法将抛异常,而peek方法将返回null。

阻塞队列的成员(加锁的都是线程安全的队列):

在Java中,ArrayBlockingQueueLinkedBlockingQueue是两种常用的阻塞队列实现,它们都实现了BlockingQueue接口,且这两个阻塞队列都是线程安全的,它们通过锁机制保证多线程环境下的数据一致性。

ArrayBlockingQueue

ArrayBlockingQueue与ArrayList一样,底层是基于数组实现的,是基于数组的阻塞队列,数组是要指定长度的,所以使用ArrayBlockingQueue时必须指定长度,它是一个有界队列。

既然它在JUC包内,说明使用它是线程安全的,它内部使用ReentrantLock来保证线程安全。ArrayBlockingQueue支持对生产者线程和消费者线程进行公平的调度,默认情况下是不保证公平性的。公平性通常会降低吞吐量,但是减少了可变性和避免了线程饥饿问题。

通常,队列的实现方式有数组和链表两种方式。对于数组这种实现方式来说,可以通过维护一个队尾指针,使得在入队的时候可以在O(1)的时间内完成。但是对于出队操作,在删除队头元素之后,必须将数组中的所有元素都往前移动一个位置,这个操作的复杂度达到了O(n),效果并不是很好。

为了解决这个问题,可以使用另外一种逻辑结构来处理数组中各个位置之间的关系。假设现在有一个数组array[0……n-1],可以把它想象成一个环型结构,即array[n-1]之后是array[0],如下图所示,可以使用两个指针,分别维护队头和队尾两个位置,使入队和出队操作都可以在O(1)的时间内完成。当然,这个环形结构只是逻辑上的结构,实际的物理结构还是一个普通的数组。

因此ArrayBlockingQueue的实现是一个循环数组,使用takeIndex和putIndex来控制元素的出入队列,效率较高。

1、源码分析

1、ArrayBlockingQueue的属性

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
	final Object[] items;   //队列的底层结构
	int takeIndex;    //队头指针
	int putIndex;     //队尾指针
	int count;       //队列中的元素个数
	final ReentrantLock lock;
	//并发时的两种状态
	private final Condition notEmpty;
	private final Condition notFull;
}

items是一个数组,用来存放入队的数据,count表示队列中元素的个数,takeIndex和putIndex分别代表队头和队尾指针。
说明:Lock的作用是提供独占锁机制,来保护竞争的资源;而Condition是为了更精细的对锁进行控制,但是依赖于lock,通过某个条件对多线程进行控制。
notEmpty表示"队列非空的条件"。当某线程想从队列中获取数据的时候,而此时队列中的数据为空,则该线程通过notEmpty.await()方法进行等待;当其他线程向队列中插入元素之后,就调用notEmpty.signal()方法进行唤醒之前等待的线程。
同理,notFull表示"队列不满的条件"。当某个线程向队列中插入元素,而此时队列已满时,该线程等待,通过notFull.wait()方法进行阻塞;其他线程从队列中取出元素之后,就调用notFull.signal()方法唤醒等待的线程。

2、ArrayBlockingQueue的构造函数

public class ArrayBlockingQueue {
	public ArrayBlockingQueue(int capacity) {
	    this(capacity, false);
	}
	
	public ArrayBlockingQueue(int capacity, boolean fair) {
	    if (capacity <= 0)
	        throw new IllegalArgumentException();
	    this.items = new Object[capacity];
	    lock = new ReentrantLock(fair);
	    notEmpty = lock.newCondition();
	    notFull =  lock.newCondition();
	}
	
	public ArrayBlockingQueue(int capacity, boolean fair,
	                          Collection<? extends E> c) {
	    this(capacity, fair);
	
	    final ReentrantLock lock = this.lock;
	    lock.lock(); // Lock only for visibility, not mutual exclusion
	    try {
	        int i = 0;
	        try {
	            for (E e : c) {
	                checkNotNull(e);
	                items[i++] = e;
	            }
	        } catch (ArrayIndexOutOfBoundsException ex) {
	            throw new IllegalArgumentException();
	        }
	        count = i;
	        putIndex = (i == capacity) ? 0 : i;
	    } finally {
	        lock.unlock();
	    }
	}
}
  • 第一个构造函数只需要指定队列大小,默认为非公平锁。
  • 第二个构造函数可以手动指定公平性和队列大小。
  • 第三个构造函数里面使用了ReentrantLock来加锁,然后把传入的集合元素按顺序一个个放入items中。这里加锁目的不是使用它的互斥性,而是让items中的元素对其他线程可见(用的是AQS里的state的volatile可见性)。

3、入队方法

ArrayBlockingQueue 提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:
boolean add(E e);
void put(E e); //阻塞,其余非阻塞
boolean offer(E e);
boolean offer(E e, long timeout, TimeUnit unit)。

boolean add(E e)方法:

public boolean add(E e) {   // ArrayBlockingQueue.java
    return super.add(e);
}
}

//super.add(e)
public boolean add(E e) {    // AbstractQueue.java
    if (offer(e))            //复用offer方法
        return true;
    else
        throw new IllegalStateException("Queue full");   //抛出异常
}

可以看到add方法调用的是父类AbstractQueue的add方法,实际上调用的是offer方法,并进行封装,针对返回值false情况抛出异常。

boolean offer(E e)方法:

public boolean offer(E e) {    //ArrayBlockingQueue.java
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)    //如果相等,则说明队列满
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

offer方法在队列满了的时候返回false,否则调用enqueue方法插入元素,并返回true。

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;   //存放当前元素
    // 圆环的index操作
    if (++putIndex == items.length)
        putIndex = 0;    //putIndex 标记队尾,下一个元素可以存放的位置
    count++;             //数组内实际元素个数+1,count用来判断满队列或空队列
    notEmpty.signal();   //唤醒等待获取元素的线程
}

enqueue方法首先把元素放在items的putIndex位置,接着判断在putIndex+1等于队列的长度时把putIndex设置为0,也就是上面提到的圆环的index操作。最后唤醒等待获取元素的线程。从enqueue方法,可以得知采用了圆环操作。圆环操作的精髓就是当添加元素到队列最后一个位置时,重新从队列头开始循环,通过内部的putIndex指针实现。

boolean offer(E e, long timeout, TimeUnit unit)方法:

offer(E e, long timeout, TimeUnit unit)方法只是在offer(E e)的基础上增加了超时时间的概念。

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    checkNotNull(e);
    // 把超时时间转换成纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 获取一个可中断的互斥锁
    lock.lockInterruptibly();
    try {
        // while循环的目的是防止在中断后没有到达传入的timeout时间,继续重试
        while (count == items.length) {
            if (nanos <= 0)
                return false;    //超时
            // 等待nanos纳秒,返回剩余的等待时间(可被中断)
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);   //没有超时,继续插入数据
        return true;
    } finally {
        lock.unlock();
    }
}

该方法利用了ConditionawaitNanos方法,等待指定时间,因为该方法可中断,所以这里利用while循环来处理中断后还有剩余时间的问题,等待时间到了以后调用enqueue方法放入队列。

void put(E e)

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();   //队列满了,阻塞等待
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

put方法在count等于items长度时,一直等待,直到被其他线程唤醒,唤醒后调用enqueue方法放入队列。

4、出队方法

ArrayBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:
E poll();
E poll(long timeout, TimeUnit unit);
E take()。

E poll()方法:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();  //队列为空,返回null
    } finally {
        lock.unlock();
    }
}

poll方法是非阻塞方法,若队列没有元素,则返回null,否则调用dequeue把队首的元素出队列。

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;     //圆环原理,循环
    count--;         //队列中的实际元素个数-1
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

dequeue会根据takeIndex获取到该位置的元素,并把该位置置为null,接着利用圆环原理,在takeIndex到达列表长度时设置为0,最后唤醒等待将元素放入队列的线程。

E poll(long timeout, TimeUnit unit)方法:
该方法是poll()的可配置超时等待方法,和上面的offer一样,使用while循环和Condition的awaitNanos来进行等待,等待时间到后执行dequeue获取元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {    //超时策略
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);   
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

E take()方法:

队列为空时就一直阻塞。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();  //阻塞
        return dequeue();
    } finally {
        lock.unlock();
    }
}

5、获取元素

E peek()方法获取队首元素,但不删除。

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

6、删除元素

remove(Object o)删除指定对象,注意与remove()无参的区别,前者是删除指定的元素,后者是删除队首的元素。

从队列中删除某一个元素时,要从队首遍历整个队列找到该元素,并把该元素后的所有元素往前移一位,该方法的时间复杂度是O(n)。

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;      //下标
             // 从takeIndex一直遍历到putIndex,直到找到和元素o相同的元素,调用removeAt进行删除
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;    // 环形结构,队尾需要重置到队头
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

remove方法比较简单,它从takeIndex一直遍历到putIndex,直到找到和元素o相同的元素,调用removeAt进行删除。重点看一下removeAt方法。

void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {        //刚好是队头元素,直接删,不需要移位
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove

        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;   //队尾,需要把待删除元素后至队尾的所有元素前移
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;     //环形原理,需要找到队头
            if (next != putIndex) {
                items[i] = items[next];  //非队尾元素,前移一位
                i = next;
            } else {
                items[i] = null;   //原来队尾元素置空,并标记为下次插入元素的下标
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

2、总结

ArrayBlockingQueue是一个阻塞队列,内部由ReentrantLock来实现线程安全,由Condition的await和signal控制等待唤醒从而实现阻塞队列的功能。它的数据结构是数组,准确的说是一个循环数组(可以类比一个圆环),所有的下标在到达最大长度时自动从0继续开始。

LinkedBlockingQueue

LinkedBlockingQueue和LinkedList一样,内部是基于链表来存放元素的。

LinkedBlockingQueue实现了BlockingQueue接口。LinkedBlockingQueue不同于ArrayBlockingQueue,如果不指定容量,默认为Integer.MAX_VALUE(2的31次方-1=2147483647),也就是无界队列。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,在使用的时候建议手动传一个队列的大小。

1、源码分析

1、属性

/**
 * 节点类,用于存储数据
 */
static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) { item = x; }
}

/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;

/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();

/**
 * 阻塞队列的头结点
 */
transient Node<E> head;

/**
 * 阻塞队列的尾节点
 */
private transient Node<E> last;

/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();

/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();

从上面的属性可知,每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加到链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock和 putLock对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。
另外,LinkedBlockingQueue对每一个lock锁都提供了一个Condition用来挂起和唤醒其他线程。

2、构造函数

public LinkedBlockingQueue() {
    // 默认大小为Integer.MAX_VALUE
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {    //利用给定的集合构造
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();   //Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

默认的构造函数和最后一个构造函数创建的队列大小都为Integer.MAX_VALUE,只有第二个构造函数用户可以指定队列的大小。第二个构造函数最后初始化了last和head节点,让它们都指向了一个元素为null的节点。

3、入队方法

LinkedBlockingQueue提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:
void put(E e);
boolean offer(E e);
boolean offer(E e, long timeout, TimeUnit unit)。

void put(E e)方法:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获取锁中断
    putLock.lockInterruptibly();
    try {
        //判断队列是否已满,如果已满阻塞等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 把node放入队列中
        enqueue(node);
        c = count.getAndIncrement();  //元素个数计数器+1
        // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果队列中有一条数据,唤醒消费线程进行消费
    if (c == 0)
        signalNotEmpty();
}

若队列已满,则阻塞等待。若队列未满,则创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没有元素,放完以后要唤醒消费线程进行消费。

来看看该方法中用到的几个其他方法,先来看看enqueue(Node node)方法:

private void enqueue(Node<E> node) {
    last = last.next = node;
}

在构造一个链表时,内部会先初始化一个空的node节点,并赋值给head和last

last = head = new Node<E>(null); //值为null,空节点

接下来看看signalNotEmpty和signalNotFull方法。

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

调用signal方法时,要获取到该signal对应的Condition对象的锁才可以。

boolean offer(E e)方法:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;  //队列满,直接返回false
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 队列有可用空间,放入node节点,判断放入元素后是否还有可用空间,
        // 如果有,唤醒下一个添加线程进行添加操作。
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

可以看到offer仅仅对put方法改动了一点点,当队列满的时候,不同于put方法的阻塞等待,offer方法直接返回false。

boolean offer(E e, long timeout, TimeUnit unit)方法:

该方法只是对offer方法进行了阻塞超时处理,使用了Condition的awaitNanos来进行超时等待。

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 等待超时时间nanos,超时时间到了返回false
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

4、出队方法

LinkedBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:

E take();
E poll();
E poll(long timeout, TimeUnit unit);

E take()方法:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 队列为空,阻塞等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();  //元素计数器-1
        // 队列中还有元素,唤醒下一个消费线程进行消费
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 移除元素之前队列是满的,唤醒生产线程进行添加元素
    if (c == capacity)
        signalNotFull();
    return x;
}

若队列为空,则阻塞等待。若队列不为空,则从队首获取并移除一个元素,如果消费后还有元素在队列中,继续唤醒下一个消费线程进行元素移除。如果移除之前队列是满元素的情况,移除完后要唤醒生产线程进行添加元素。

下面来看看dequeue方法:

private E dequeue() {
    // 获取到head节点
    Node<E> h = head;
    // 获取到head节点指向的下一个节点
    Node<E> first = h.next;
    // head节点原来指向的节点的next指向自己,等待下次gc回收
    h.next = h; // help GC
    // head节点指向新的节点
    head = first;
    // 获取到新的head节点的item值
    E x = first.item;
    // 新head节点的item值设置为null
    first.item = null;
    return x;
}

E poll()方法:

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

poll(long timeout, TimeUnit unit)也和offer(E e, long timeout, TimeUnit unit)一样,利用了Condition的awaitNanos方法来进行阻塞等待直至超时,这里就不列出来说了。

5、获取元素

E peek()方法:获取队首元素,但不删除队首元素。加锁后获取到head节点的next节点,如果为空,则返回null,如果不为空,则返回next节点的item值。

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

6、删除元素

boolean remove(Object o)删除指定元素。

public boolean remove(Object o) {
    if (o == null) return false;
    // 两个lock全部上锁
    fullyLock();
    try {
        // 从head开始遍历元素,直到最后一个元素
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果找到相等的元素,调用unlink方法删除元素
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 两个lock全部解锁
        fullyUnlock();
    }
}

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

因为remove方法使用两个锁全部上锁,所以其他操作都需要等待它完成,而该方法需要从head节点遍历到尾节点,所以时间复杂度为O(n)。

void unlink(Node<E> p, Node<E> trail) {
    // p的元素置为null
    p.item = null;
    // p的前一个节点的next指向p的next,也就是把p从链表中去除了
    trail.next = p.next;
    // 如果last指向p,删除p后让last指向trail
    if (last == p)
        last = trail;
    // 如果删除之前元素是满的,删除之后就有空间了,唤醒生产线程放入元素
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

2、总结

LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来控制等待和唤醒从而实现阻塞队列的功能。


LinkedBlockingQueue和ArrayBlockingQueue的不同点在于:

  • 1.队列大小有所不同

ArrayBlockingQueue是有界队列,初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。

  • 2.数据存储容器不同

ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的是以Node节点作为连接对象的链表。
由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据时,对于GC可能存在较大影响。

  • 3.两者实现阻塞队列时添加和移除的锁不一样

ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的是同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐