深入浅出Java Condition 的await和signal机制(一)
Condition接口提供了比Object类更细粒度的线程同步控制,包含7个方法实现等待/通知机制。作为AQS内部类ConditionObject的实现,通过维护等待队列支持多条件变量。相比Object只能有一个等待队列,ReentrantLock通过newCondition()可创建多个等待队列,如典型应用BoundedBuffer中使用notEmpty和notFull两个条件分别控制生产者和消
·
每个对象都可以调用 Object 的 wait/notify 方法来实现等待/通知机制。而 Condition 接口也提供了类似的方法。
Condition 接口一共提供了以下 7 个方法:
- await():线程等待直到被通知或者中断。类似于 Object.wait()。
- awaitUninterruptibly():线程等待直到被通知,即使在等待时被中断也不会返回。没有与之对应的 Object 方法。
- await(long time, TimeUnit unit):线程等待指定的时间,或被通知,或被中断。类似于 Object.wait(long timeout),但提供了更灵活的时间单位。
- awaitNanos(long nanosTimeout):线程等待指定的纳秒时间,或被通知,或被中断。没有与之对应的 Object 方法。
- awaitUntil(Date deadline):线程等待直到指定的截止日期,或被通知,或被中断。没有与之对应的 Object 方法。
- signal():唤醒一个等待的线程。类似于 Object.notify()。
- signalAll():唤醒所有等待的线程。类似于 Object.notifyAll()。
以下是Object 类的主要方法,我们来做一下对比:
- wait():线程等待直到被通知或者中断。
- wait(long timeout):线程等待指定的时间,或被通知,或被中断。
- wait(long timeout, int nanos):线程等待指定的时间,或被通知,或被中断。
- notify():唤醒一个等待的线程。
- notifyAll():唤醒所有等待的线程。
Condition 源码分析
要想深入理解 Condition 的实现原理,就需要挖掘一下 Condiiton 的源码。
创建一个 Condition 对象可以通过lock.newCondition() 来创建,这个方法实际上会 new 一个 ConditionObject 的对象,ConditionObject 是 AQS 的一个内部类,我们就拿 ReentrantLock 来举例说明吧。
public class ReentrantLock implements Lock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
public Condition newCondition() {
return sync.newCondition();
}
}
AQS 内部维护了一个先进先出(FIFO)的双端队列,并使用了两个引用 head 和 tail 用于标识队列的头部和尾部。
Condition 内部也使用了同样的方式,内部维护了一个先进先出(FIFO)的单向队列,我们把它称为等待队列。
所有调用 await 方法的线程都会加入到等待队列中,并且线程状态均为等待状态。firstWaiter 指向首节点,lastWaiter 指向尾节点,源码如下:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
Node 中的 nextWaiter 指向队列中的下一个节点。并且进入到等待队列的 Node 节点状态都会被设置为 CONDITION。
同时还有一点需要注意:我们可以多次调用newCondition()方法创建多个 Condition 对象,也就是一个 lock 可以持有多个等待队列。
而如果是 Object 方式的话,就只能有一个同步队列和一个等待队列。
因此,ReentrantLock 等 AQS 是可以持有一个同步队列和多个等待队列的,new 多个 Condition 就行了。示意图如下:
AQS持有多个Condition
持有多个等待队列的好处是什么呢?我们可以通过下面这个例子来说明:
public class BoundedBuffer<T> {
private final LinkedList<T> buffer; // 使用 LinkedList 作为缓冲区
private final int capacity; // 缓冲区最大容量
private final ReentrantLock lock; // 互斥锁
private final Condition notEmpty; // 缓冲区非空条件
private final Condition notFull; // 缓冲区非满条件
public BoundedBuffer(int capacity) {
this.capacity = capacity;
this.buffer = new LinkedList<>();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
}
// 放入一个元素
public void put(T item) throws InterruptedException {
lock.lock();
try {
// 如果缓冲区满,等待
while (buffer.size() == capacity) {
notFull.await();
}
buffer.add(item);
// 通知可能正在等待的消费者
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 取出一个元素
public T take() throws InterruptedException {
lock.lock();
try {
// 如果缓冲区空,等待
while (buffer.isEmpty()) {
notEmpty.await();
}
T item = buffer.removeFirst();
// 通知可能正在等待的生产者
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
考虑这个简单的有界缓冲区 BoundedBuffer,其中生产者放入元素,消费者取出元素。我们将使用两个 Condition:一个表示缓冲区不为空(用于消费者等待),另一个表示缓冲区不满(用于生产者等待)。
生产者调用 put 方法放入元素,如果缓冲区已满,则等待 notFull 条件。消费者调用 take 方法取出元素,如果缓冲区为空,则等待 notEmpty 条件。当一个元素被放入或取出时,相应的条件会发出信号,唤醒等待的线程。
使用多个 Condition 对象的主要优点是为锁提供了更细粒度的控制,可以实现更复杂的同步场景,比如上面提到的有界缓冲区。
更多推荐

所有评论(0)