一、前言

        在基础并发编程中,我们常用 Object 类的 wait() 、 notify() 、 notifyAll() 方法实现线程间的等待 / 唤醒通信,但这套机制存在明显局限性 —— 仅支持单一等待队列,无法针对不同条件进行精准唤醒,容易出现 “唤醒错线程” 的无效调度。

        ReentrantLock 搭配 Condition 接口则完美解决了这一问题,它允许为一把锁绑定多个条件队列,实现线程的多条件精准唤醒。本文将从 Condition 的核心特性、源码原理、实战案例三个维度,全面解析这一进阶特性,帮大家掌握更灵活的并发通信方式。

 

二、Condition 接口核心特性

1. 与 Object 等待 / 唤醒机制的对比

先通过表格明确 Condition 与 Object 方法的核心差异,理解其设计优势:

特性

Object 等待 / 唤醒机制

Condition 接口

等待队列数量

单一队列(一个锁对应一个)

多队列(一个锁可绑定多个)

唤醒粒度

随机唤醒(notify ())/ 全唤醒(notifyAll ())

精准唤醒(signal ()/signalAll () 指定条件队列)

使用依赖

必须在 synchronized 块中

必须在 ReentrantLock 保护下

方法中断性

wait () 可中断,但需捕获异常

await () 支持可中断 / 不可中断等多种模式

2. Condition 核心特性

  • 多条件队列:一个 ReentrantLock 可通过 newCondition() 方法创建多个 Condition 实例,每个实例对应一个独立的条件等待队列,线程可根据不同业务条件进入不同队列等待;

  • 精准唤醒:通过 signal() 唤醒对应条件队列的首线程, signalAll() 唤醒对应队列的所有线程,避免无关线程被误唤醒,提升并发调度效率;

  • 灵活的等待模式:除基础的 await() 外,还支持 awaitUninterruptibly() (不可中断等待)、 await(long time, TimeUnit unit) (超时等待)、 awaitUntil(Date deadline) (截止时间等待)等丰富方法;

  • 与锁的强绑定:Condition 由 ReentrantLock 直接创建,其底层依赖锁的同步状态,只有获取锁的线程才能调用 Condition 的等待 / 唤醒方法。

 

三、Condition 接口基础使用

1. 核心方法介绍

Condition 接口的核心方法可分为 “等待” 和 “唤醒” 两类,使用时需严格遵循锁保护的前提:

等待方法

  • void await() throws InterruptedException:当前线程释放锁并进入条件队列等待,可被中断,唤醒后需重新竞争锁;

  • void awaitUninterruptibly():与 await() 类似,但不响应线程中断;

  • boolean await(long time, TimeUnit unit) throws InterruptedException:超时等待,超时未唤醒则返回 false ,否则返回 true ;

唤醒方法

  • void signal():唤醒对应条件队列的首线程,使其从 await () 中恢复并竞争锁;

  • void signalAll():唤醒对应条件队列的所有等待线程。

2. 基础使用范式

Condition 必须与 ReentrantLock 配合使用,且所有方法调用必须在 lock() 和 unlock() 之间,示例如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionBasicDemo {
    private static final ReentrantLock lock = new ReentrantLock();
    // 创建两个独立的Condition实例,对应不同条件
    private static final Condition conditionA = lock.newCondition();
    private static final Condition conditionB = lock.newCondition();
    public static void awaitConditionA() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 进入conditionA等待");
            conditionA.await(); // 释放锁,进入conditionA队列等待
            System.out.println(Thread.currentThread().getName() + " 被conditionA唤醒,继续执行");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
    public static void signalConditionA() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 唤醒conditionA队列的线程");
            conditionA.signal(); // 唤醒conditionA队列首线程
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(ConditionBasicDemo::awaitConditionA, "线程1");
        t1.start();
        Thread.sleep(1000);
        // 主线程唤醒conditionA的等待线程
        signalConditionA();
    }
}

核心注意点 :调用 await() 时,线程会自动释放持有的 ReentrantLock 锁,否则其他线程无法获取锁并执行唤醒操作;唤醒后线程需重新竞争锁,获取锁后才能继续执行 await() 之后的逻辑。

 

四、Condition 底层原理(基于 AQS 源码)

        Condition 的底层实现是 AQS 的内部类 ConditionObject ,其核心是条件队列同步队列的联动,我们从数据结构和核心方法两方面拆解原理。

1. 核心数据结构:条件队列

每个 ConditionObject 实例对应一个独立的 单向条件队列 (与 AQS 的双向同步队列区分),队列中的节点复用 AQS 的 Node 类,核心属性为 nextWaiter (维护单向链表)。

public class ConditionObject implements Condition, java.io.Serializable {
    // 条件队列的首节点
    private transient Node firstWaiter;
    // 条件队列的尾节点
    private transient Node lastWaiter;
    // 省略其他代码...
}
  • 当线程调用 await() 时,会被封装为 Node 节点加入条件队列尾部;

  • 当调用 signal() 时,会将条件队列的首节点转移到 AQS 的同步队列,等待竞争锁。

2. await () 方法核心流程

await() 是 Condition 的核心等待方法,其底层逻辑可概括为 “释放锁→加入条件队列→阻塞线程→被唤醒后转移到同步队列”,简化流程如下:

public final void await() throws InterruptedException {
    // 1. 响应中断,若线程已中断则抛异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 2. 将当前线程封装为Node,加入Condition的条件队列尾部
    Node node = addConditionWaiter();
    // 3. 释放当前线程持有的锁(同步状态state归零)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 4. 判断节点是否在同步队列中,不在则阻塞线程
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 线程阻塞
        // 检查中断状态,处理中断逻辑
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 5. 被唤醒后,进入同步队列竞争锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 6. 清理条件队列中的无效节点
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    // 7. 处理中断(抛异常或重置中断状态)
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

关键步骤解析 :

  • addConditionWaiter():将线程封装为 Node 加入条件队列,同时清理队列中已取消的节点;

  • fullyRelease(node):彻底释放锁(无论重入次数多少,直接将 state 置 0),保证其他线程能获取锁;

  • isOnSyncQueue(node):判断节点是否已转移到 AQS 同步队列,未转移则持续阻塞;

  • 线程被唤醒后,会通过 acquireQueued() 参与同步队列的锁竞争,获取锁后才能继续执行。

3. signal () 方法核心流程

signal() 是精准唤醒的核心方法,其底层逻辑为 “转移条件队列首节点到同步队列→唤醒线程”,简化流程如下:

public final void signal() {
    // 1. 校验:只有锁持有者才能调用唤醒方法
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 2. 获取条件队列首节点
    Node first = firstWaiter;
    if (first != null)
        // 3. 转移节点到同步队列并唤醒
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        // 4. 移除条件队列首节点
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 5. 转移节点到AQS同步队列,失败则尝试下一个节点
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    // 6. 标记节点状态,取消条件等待
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    // 7. 将节点加入AQS同步队列尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    // 8. 唤醒节点对应的线程
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

关键步骤解析 :

  • isHeldExclusively():通过 AQS 方法校验当前线程是否为锁持有者,保证唤醒操作的安全性;

  • transferForSignal(node):核心转移方法,先修改节点状态(从条件等待态转为同步队列态),再将节点加入同步队列,最后唤醒线程;

  • 线程被唤醒后,会从 await() 方法的 LockSupport.park() 处恢复,进入同步队列竞争锁。

 

五、实战案例:多条件生产者消费者模型

        我们以 “有界队列的生产者消费者” 为例,对比 Object 方法与 Condition 接口的实现差异,体现 Condition 精准唤醒的优势。

1. 业务场景

实现一个容量为 5 的有界队列:

  • 当队列满时,生产者线程等待,唤醒消费者线程;

  • 当队列空时,消费者线程等待,唤醒生产者线程。

2. 基于 Condition 的实现

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionProducerConsumer {
    // 有界队列,容量5
    private static final Queue<Integer> queue = new LinkedList<>();
    private static final int CAPACITY = 5;
    private static final ReentrantLock lock = new ReentrantLock();
    // 两个条件:队列空(消费者等待)、队列满(生产者等待)
    private static final Condition emptyCondition = lock.newCondition();
    private static final Condition fullCondition = lock.newCondition();
    // 生产者方法
    public static void produce(int num) {
        lock.lock();
        try {
            // 队列满则等待
            while (queue.size() == CAPACITY) {
                System.out.println(Thread.currentThread().getName() + " 队列已满,等待消费");
                fullCondition.await();
            }
            // 生产数据
            queue.offer(num);
            System.out.println(Thread.currentThread().getName() + " 生产数据:" + num + ",队列大小:" + queue.size());
            // 唤醒等待的消费者
            emptyCondition.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
    // 消费者方法
    public static void consume() {
        lock.lock();
        try {
            // 队列空则等待
            while (queue.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " 队列已空,等待生产");
                emptyCondition.await();
            }
            // 消费数据
            int num = queue.poll();
            System.out.println(Thread.currentThread().getName() + " 消费数据:" + num + ",队列大小:" + queue.size());
            // 唤醒等待的生产者
            fullCondition.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        // 启动3个生产者线程
        for (int i = 0; i < 3; i++) {
            int finalI = i;
            new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    produce(finalI * 10 + j);
                }
            }, "生产者" + i).start();
        }
        // 启动2个消费者线程
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    consume();
                }
            }, "消费者" + i).start();
        }
    }
}

3. 实现优势分析

  • 精准唤醒:生产者只唤醒等待 “队列空” 的消费者,消费者只唤醒等待 “队列满” 的生产者,无无效唤醒,比 Object 的 notifyAll() 更高效;

  • 逻辑清晰:两个条件队列分别对应 “队列满” 和 “队列空” 的业务场景,代码可读性与可维护性远超单一等待队列的实现;

  • 避免虚假唤醒:通过 while 循环判断队列状态(而非 if ),即使出现虚假唤醒,也会重新校验条件,保证业务逻辑正确性。

 

六、Condition 避坑指南

  1. 必须在锁保护下调用方法:未获取 ReentrantLock 锁时调用 await() / signal() ,会直接抛出 IllegalMonitorStateException ;

  2. 使用 while 循环校验条件:不能用 if 判断等待条件,因为线程可能被虚假唤醒(即使未调用 signal (),也可能因系统原因唤醒), while 循环可确保唤醒后重新校验条件;

  3. await () 会释放锁:调用 await() 后线程会彻底释放锁,重入次数会被重置,唤醒后需重新竞争锁;

  4. signal () 不释放锁:调用 signal() 仅转移节点并唤醒线程,锁需手动调用 unlock() 释放,否则被唤醒线程无法获取锁。

七、总结

本文详解了 Condition 接口的核心特性、底层原理与实战用法,核心要点总结如下:

  1. Condition 基于 AQS 实现,通过 “条件队列 + 同步队列” 的联动,实现多条件精准唤醒;

  2. 一个 ReentrantLock 可绑定多个 Condition,解决了 Object 单一等待队列的局限性;

  3. await () 会释放锁并进入条件队列,signal () 会将条件队列节点转移到同步队列,唤醒后线程需重新竞争锁。

 

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐