任意一个Java对象,都拥有一组监视器方法(定义在 java.lang.Object 上),主要包括 wait()notify()notifyAll() 等方法,这些方法与 synchronized 同步关键字配合,可以实现等待/通知模式,具体详见《Java线程协作:wait/notify》

Condition 是 java.util.concurrent.locks 包中的一个接口,它也提供了类似 Object 的监视器方法,与 Lock 配合可以实现等待/通知模式,替代 Object.wait()/notify()。与 Object.wait()/notify() 相比,Condition 提供了更高级的功能,能够创建多个条件变量,每个条件变量都有自己的等待队列,使得线程管理更加灵活和精确。

Condition 使用介绍

Condition 对象是由 Lock 对象创建出来的(调用 Lock 对象的 newCondition() 方法)。当前线程调用 Condition 的等待/通知方法时,需要提前获取到 Condition 对象关联的锁。当调用 await() 方法后,当前线程会释放锁并在此等待;而其他线程调用 Condition 对象的 signal() 方法通知当前线程,并且当前线程重新竞争成功获取到同步锁后,当前线程才会从 await()方法返回。

生产者-消费者模型为例,使用 Condition 实现如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerExample {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // 缓冲区非空条件
    private final Condition notFull = lock.newCondition();  // 缓冲区未满条件

    private final int[] buffer = new int[5];
    private int count = 0; // 当前元素数量
    private int putIndex = 0;
    private int takeIndex = 0;

    // 生产者方法
    public void produce(int item) throws InterruptedException {
        lock.lock();
        try {
            // 使用 while 而不是 if 判断条件,防止虚假唤醒
            while (count == buffer.length) {
                // 缓冲区满,等待 notFull 条件
                // await() 会自动释放锁,并在被唤醒后重新获取锁
                notFull.await();
            }
            buffer[putIndex] = item;
            putIndex = (putIndex + 1) % buffer.length;
            count++;
            System.out.println("Produced: " + item + ", count=" + count);

            // 通知消费者:缓冲区非空
            // signal() 只唤醒一个等待线程,signalAll() 则唤醒所有
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 消费者方法
    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                // 缓冲区空,等待 notEmpty 条件
                // await() 会自动释放锁,并在被唤醒后重新获取锁
                notEmpty.await();
            }
            int item = buffer[takeIndex];
            takeIndex = (takeIndex + 1) % buffer.length;
            count--;
            System.out.println("Consumed: " + item + ", count=" + count);

            // 通知生产者:缓冲区未满
            notFull.signal();
            return item;
        } finally {
            // 在finally块中释放锁,确保异常情况下也能解锁
            lock.unlock();
        }
    }

    // 测试
    public static void main(String[] args) {
        ProducerConsumerExample pc = new ProducerConsumerExample();

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    pc.produce(i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    pc.consume();
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producer.start();
        consumer.start();
    }
}

Condition 原理

在 JUC (java.util.concurrent)并发包中,Condition 并不是一个孤立的组件,而是深度集成于 Lock 接口和 AQSAbstractQueuedSynchronizer)框架中的关键协作机制。

ConditionObject实现了 Condition 接口,是同步器 AbstractQueuedSynchronizer 的内部类。每个 Condition 对象都包含一个单向等待队列,等待队列是一个 FIFO 的单向队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程:

在 Object的监视器模型上,一个对象拥有一个同步队列和一个等待队列(详见Java并发:synchronized原理详解)。而 JUC 并发包中的 Lock AQS 同步器则拥有一个同步队列和多个等待队列,同步队列和等待队列中的节点类型都是同步器的静态内部类 AbstractQueuedSynchronizer.Node

await() 等待过程

当前线程(已获取到同步锁)调用 Condition 的 await() 方法(或者以 await 开头的方法),会使当前线程构造新节点加入等待队列尾部,并释放同步锁,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。

从队列(同步队列和等待队列)的角度看 await() 方法,相当于同步队列的首节点(获取了锁的节点)移动到Condition 的等待队列尾部。

await() 源码如下(JDK 8):

public final void await() throws InterruptedException {
    // 入口检查中断,若已中断,立即抛异常。
    if (Thread.interrupted()) throw new InterruptedException();

    // 1、将当前线程加入 Condition 等待队列
    AbstractQueuedLongSynchronizer.Node node = addConditionWaiter();
    // 2、完全释放锁,并保存锁状态
    long savedState = fullyRelease(node);

    int interruptMode = 0;
    // 3、循环阻塞,直到被 signal(节点移入同步队列)或中断。
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4、在同步队列中重新获取锁。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清理取消节点(防内存泄漏)
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal() 通知过程

当前线程(必须已获取同步锁)调用 Condition 的 signal() 方法,将会获取在等待队列中等待时间最长的节点(首节点),将其移到同步队列中,并使用 LockSupport 唤醒节点中的线程,开始尝试竞争获取同步状态。成功竞争获取到同步状态(同步锁)之后,被唤醒的线程将从先前调用的 await() 方法返回,此时该线程已经成功地获取了同步锁。如果不是通过其他线程调用 Condition.signal() 方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。

Condition 的 signalAll() 方法,相当于对等待队列中的每个节点均执行一次 signal() 方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程去重新竞争获取同步锁。

从队列(同步队列和等待队列)的角度看 signal() 方法,相当于从等待队列取出头节点,重新加入同步队列竞争锁。

signal() 源码如下(JDK 8):

public final void signal() {
    // 检查调用者是否持有锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取等待队列的头节点
    // firstWaiter 是 ConditionObject 内部字段,指向 Condition 等待队列的第一个节点
    AbstractQueuedLongSynchronizer.Node first = firstWaiter;
    // 若等待队列非空,则执行唤醒
    if (first != null)
        doSignal(first);
}

private void doSignal(AbstractQueuedLongSynchronizer.Node first) {
    do {
        // 从等待队列中移除头节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 尝试将 first 节点从等待队列转移到AQS的同步队列
        // 被转移的线程将在同步队列中重新竞争锁,并在获取锁后才从 await() 返回
        // 一旦成功转移一个有效节点,循环结束
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

Object Monitor 与 Condition 对比

对比维度 Object 监视器(synchronized + wait/notify) Condition(ReentrantLock + Condition)
所属机制 Java 内置监视器(JVM 层原生支持) java.util.concurrent.locks 包提供的显式锁机制
锁获取方式 隐式:通过 synchronized 关键字自动加锁/释放 显式:需手动调用 lock() / unlock()
条件等待方法 wait(), wait(long), wait(long, int) await(), awaitNanos(), awaitUntil(), awaitUninterruptibly()
唤醒方法 notify(), notifyAll() signal(), signalAll()
条件队列数量 每个对象只有 1 个 等待队列 每个 Lock 可创建 多个独立的 Condition,每个对应一个等待队列
中断响应 wait() 可被中断,抛出 InterruptedException 提供多种选择:
• await():可中断
• awaitUninterruptibly():不可中断
• 支持超时+中断组合
锁释放行为 调用 wait() 时自动释放锁,被唤醒后自动重新获取 调用 await() 时完全释放锁(包括重入次数),返回前重新竞争获取
使用前提 必须在 synchronized 块内调用 wait/notify 必须在持有 Lock 的情况下调用 await/signal
异常安全性 自动释放锁,无需担心忘记释放 必须在 finally 块中调用 unlock(),否则可能死锁
性能与可扩展性 JVM 优化较好,但功能受限 更灵活,适合复杂并发场景;AQS 底层高效,支持自定义同步器
公平性支持 不支持公平锁 ReentrantLock 可配置为公平锁(FairSync)
典型应用场景 简单线程协作(如基础生产者-消费者) 复杂多条件协作(如阻塞队列、读写分离条件等)
底层实现 JVM 的 ObjectMonitor(C++ 实现) 基于 AQS 的 ConditionObject(Java 实现)
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐