每个对象都可以调用 Object 的 wait/notify 方法来实现等待/通知机制。而 Condition 接口也提供了类似的方法。
Condition 接口一共提供了以下 7 个方法:
  • await():线程等待直到被通知或者中断。类似于 Object.wait()
  • awaitUninterruptibly():线程等待直到被通知,即使在等待时被中断也不会返回。没有与之对应的 Object 方法。
  • await(long time, TimeUnit unit):线程等待指定的时间,或被通知,或被中断。类似于 Object.wait(long timeout),但提供了更灵活的时间单位。
  • awaitNanos(long nanosTimeout):线程等待指定的纳秒时间,或被通知,或被中断。没有与之对应的 Object 方法。
  • awaitUntil(Date deadline):线程等待直到指定的截止日期,或被通知,或被中断。没有与之对应的 Object 方法。
  • signal():唤醒一个等待的线程。类似于 Object.notify()
  • signalAll():唤醒所有等待的线程。类似于 Object.notifyAll()
以下是Object 类的主要方法,我们来做一下对比:
  • wait():线程等待直到被通知或者中断。
  • wait(long timeout):线程等待指定的时间,或被通知,或被中断。
  • wait(long timeout, int nanos):线程等待指定的时间,或被通知,或被中断。
  • notify():唤醒一个等待的线程。
  • notifyAll():唤醒所有等待的线程。

Condition 源码分析

要想深入理解 Condition 的实现原理,就需要挖掘一下 Condiiton 的源码。
创建一个 Condition 对象可以通过lock.newCondition() 来创建,这个方法实际上会 new 一个 ConditionObject 的对象,ConditionObject 是 AQS 的一个内部类,我们就拿 ReentrantLock 来举例说明吧。
public class ReentrantLock implements Lock, java.io.Serializable {
    abstract static class Sync extends AbstractQueuedSynchronizer {
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
}
AQS 内部维护了一个先进先出(FIFO)的双端队列,并使用了两个引用 head 和 tail 用于标识队列的头部和尾部。
Condition 内部也使用了同样的方式,内部维护了一个先进先出(FIFO)的单向队列,我们把它称为等待队列。
所有调用 await 方法的线程都会加入到等待队列中,并且线程状态均为等待状态。firstWaiter 指向首节点,lastWaiter 指向尾节点,源码如下:
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}
Node 中的 nextWaiter 指向队列中的下一个节点。并且进入到等待队列的 Node 节点状态都会被设置为 CONDITION。
同时还有一点需要注意:我们可以多次调用newCondition()方法创建多个 Condition 对象,也就是一个 lock 可以持有多个等待队列。
而如果是 Object 方式的话,就只能有一个同步队列和一个等待队列。
因此,ReentrantLock 等 AQS 是可以持有一个同步队列和多个等待队列的,new 多个 Condition 就行了。示意图如下:
AQS持有多个Condition
持有多个等待队列的好处是什么呢?我们可以通过下面这个例子来说明:
public class BoundedBuffer<T> {
    private final LinkedList<T> buffer;  // 使用 LinkedList 作为缓冲区
    private final int capacity;          // 缓冲区最大容量
    private final ReentrantLock lock;    // 互斥锁
    private final Condition notEmpty;    // 缓冲区非空条件
    private final Condition notFull;     // 缓冲区非满条件

    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
        this.buffer = new LinkedList<>();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
    }

    // 放入一个元素
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            // 如果缓冲区满,等待
            while (buffer.size() == capacity) {
                notFull.await();
            }
            buffer.add(item);
            // 通知可能正在等待的消费者
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 取出一个元素
    public T take() throws InterruptedException {
        lock.lock();
        try {
            // 如果缓冲区空,等待
            while (buffer.isEmpty()) {
                notEmpty.await();
            }
            T item = buffer.removeFirst();
            // 通知可能正在等待的生产者
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}
考虑这个简单的有界缓冲区 BoundedBuffer,其中生产者放入元素,消费者取出元素。我们将使用两个 Condition:一个表示缓冲区不为空(用于消费者等待),另一个表示缓冲区不满(用于生产者等待)。
生产者调用 put 方法放入元素,如果缓冲区已满,则等待 notFull 条件。消费者调用 take 方法取出元素,如果缓冲区为空,则等待 notEmpty 条件。当一个元素被放入或取出时,相应的条件会发出信号,唤醒等待的线程。
使用多个 Condition 对象的主要优点是为锁提供了更细粒度的控制,可以实现更复杂的同步场景,比如上面提到的有界缓冲区。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐