1 概述

  • CountDownLatch 是 Java 并发包中用于线程同步的工具类,核心逻辑是:

    • 初始化时指定一个计数count

    • 线程执行完任务后调用 countDown(),使计数count减1;

    • 其他线程调用 await() 会被阻塞,直到count减到0时,阻塞的线程才会被唤醒并继续执行;

  • 流程步骤:

    在这里插入图片描述

    • 初始化:创建 CountDownLatch 时指定count=4(即图中的state=4),代表需要等待4个线程完成任务;

    • 主线程阻塞main线程调用 await(),进入阻塞状态(图中粉色“main 阻塞”块),直到count减为0;

    • 子线程执行并计数递减

      • Thread1 执行完任务,调用 countDown()count从4→3(图中state=3);
      • Thread2 调用 countDown()count从3→2(图中state=2);
      • Thread3 调用 countDown()count从2→1(图中state=1);
      • Thread4 调用 countDown()count从1→0(图中state=0);
  • 主线程唤醒并继续:当count=0时,main线程的await()不再阻塞,继续执行后续逻辑(图中绿色“main Done”块);

  • 源码方法总览:

    在这里插入图片描述

2 构造函数

public CountDownLatch(int count) {
    // 确保传入的计数 count 不能为负数,否则直接抛出异常,保证了 CountDownLatch 初始化的合理性
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 初始化sync属性
    this.sync = new Sync(count);
}
  • SyncCountDownLatch 内部基于 AQS(抽象队列同步器) 实现的同步组件,它将传入的 count 作为 AQS 的同步状态(state),为后续的 countDown() 计数递减和 await() 阻塞唤醒逻辑提供了底层支持。

3 Sync-队列同步器

// Sync 是 AQS 的子类,用于实现 CountDownLatch 的同步逻辑
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 构造函数通过 setState(count) 将 CountDownLatch 的初始化计数赋值给 AQS 的 state 属性(state 是 AQS 用于维护同步状态的核心变量)
    Sync(int count) {
        setState(count);
    }
    
    // 获取当前 AQS 中 state 的值,即 CountDownLatch 剩余的未完成线程数
    int getCount() {
        return getState();
    }
    
    // 判断是否可以“获取共享资源”(即 CountDownLatch 的计数是否已减到 0)
    // 若 state == 0,返回 1(表示可以获取,await() 方法不会阻塞)
    // 若 state != 0,返回 -1(表示无法获取,await() 方法会阻塞线程)
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    // 释放锁
    protected boolean tryReleaseShared(int releases) {
        // 自旋
        for (;;) {
            int c = getState(); // 获取 AQS 的 state
            if (c == 0) // 计数已为0,无需再释放
                return false;
            int nextc = c-1; // 计数减1
            if (compareAndSetState(c, nextc)) // CAS原子更新state
                return nextc == 0; // 若减到0,返回true(触发唤醒所有阻塞线程)
        }
    }
}

4 await()-阻塞等待

  • CountDownLatch#await():入口方法

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    • 作用:使当前线程阻塞等待,直到 CountDownLatch 的计数减到 0;

    • 实现:委托给 sync(基于 AQS 的同步组件)的 acquireSharedInterruptibly(1) 方法执行,1 是 AQS 共享式获取的参数(此处无实际数值意义,仅为兼容方法签名);

  • AQS#acquireSharedInterruptibly(int arg):共享式可中断获取逻辑

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 检查线程中断,若已中断则抛异常
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg); // 若获取失败,进入阻塞队列逻辑
    }
    
    • 中断检查:先判断线程是否被中断,若已中断则立即抛出 InterruptedException

    • 尝试获取资源:调用 tryAcquireShared(arg)(由 CountDownLatchSync 实现),若返回值 < 0(表示计数未到 0),则进入 doAcquireSharedInterruptibly(arg) 处理阻塞逻辑;

  • CountDownLatch#Sync#tryAcquireShared(int acquires):共享式获取的状态判断

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    • 作用:判断 CountDownLatch 的计数是否已减到 0
      • state == 0,返回 1(表示可以获取,await() 不阻塞);
      • state != 0,返回 -1(表示无法获取,await() 会阻塞)。
  • AQS#doAcquireSharedInterruptibly(int arg):共享式阻塞获取(可中断)

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); // 将当前线程包装为“共享模式”节点,加入等待队列
        boolean failed = true;
        try {
            for (;;) { // 自旋
                final Node p = node.predecessor(); // 获取前驱节点
                if (p == head) { // 若前驱是头节点,说明当前节点有资格尝试获取资源
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // 计数已到0,获取成功
                        setHeadAndPropagate(node, r); // 设置头节点并传播唤醒(共享模式特有)
                        p.next = null; // 断开原头节点引用,帮助GC
                        failed = false;
                        return;
                    }
                }
                // 若获取失败,判断是否需要挂起线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 挂起线程并检查中断
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node); // 若获取过程中失败,取消节点的获取请求
        }
    }
    
    • 加入等待队列addWaiter(Node.SHARED) 将当前线程包装为“共享模式”的节点,加入 AQS 的等待队列尾部;

    • 自旋尝试获取:循环中先判断“前驱是否为头节点”(若为头节点,说明当前节点是队列中最有资格获取资源的线程),然后再次尝试 tryAcquireShared(arg)

    • 线程挂起与中断:若获取失败,通过 shouldParkAfterFailedAcquire 判断是否需要挂起线程;若线程在挂起期间被中断,parkAndCheckInterrupt() 会返回 true,进而抛出 InterruptedException

5 countDown()-释放锁资源

  • CountDownLatch#countDown():入口方法

    public void countDown() {
        sync.releaseShared(1);
    }
    
    • 作用:将 CountDownLatch 的计数减 1,若计数减到 0,则唤醒所有等待的线程;

    • 实现:委托给sync(基于 AQS 的同步组件)的releaseShared(1)方法执行,1表示要递减的计数(此处固定为 1,因为 countDown() 每次只减 1);

  • AQS#releaseShared(int arg):共享式释放逻辑

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 尝试递减计数
            doReleaseShared(); // 唤醒等待队列中的线程
            return true;
        }
        return false;
    }
    
    • 尝试释放资源:调用 tryReleaseShared(arg)(由 CountDownLatchSync 实现),若递减成功则进入下一步;

    • 唤醒等待线程:调用 doReleaseShared() 唤醒 AQS 等待队列中阻塞的线程(即调用 await() 的线程);

  • CountDownLatch#Sync#tryReleaseShared(int releases):计数递减的核心逻辑

    protected boolean tryReleaseShared(int releases) {
        for (;;) { // 自旋保证原子性
            int c = getState(); // 获取当前计数(AQS的state)
            if (c == 0) return false; // 计数已为0,无需再递减
            int nextc = c - 1; // 计数减1
            if (compareAndSetState(c, nextc)) { // CAS原子更新计数
                return nextc == 0; // 若减到0,返回true(触发唤醒所有阻塞线程)
            }
        }
    }
    
    • 自旋 + CAS 保证原子性:通过循环尝试 CAS 操作,保证多线程同时调用 countDown() 时计数递减的原子性;

    • 唤醒触发条件:当 nextc == 0 时返回 true,AQS 会感知到这个状态变化,进而调用 doReleaseShared() 唤醒所有等待的线程;

  • AQS#doReleaseShared():唤醒等待队列的共享线程

    private void doReleaseShared() {
        for (;;) { // 自旋保证唤醒可靠性
            Node h = head; // 获取等待队列的头节点
            if (h != null && h != tail) { // 队列非空且有等待线程
                int ws = h.waitStatus; // 获取AQS等待队列中头节点的等待状态
                if (ws == Node.SIGNAL) { // 头节点状态为SIGNAL(表示后续节点需要唤醒)
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
                        continue; // CAS更新状态失败,继续自旋
                    unparkSuccessor(h); // 唤醒头节点的后继线程
                } 
                // 处理共享模式下的状态传播(保证多个线程依次被唤醒)
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head) // 头节点未变化,说明唤醒操作完成
                break;
        }
    }
    
    • 自旋循环:不断尝试唤醒操作,确保所有需要唤醒的线程都能被处理

    • 状态判断与唤醒:若头节点状态为 SIGNAL,则通过 unparkSuccessor(h) 唤醒其后继线程;同时设置状态为 PROPAGATE,保证共享模式下唤醒的“传播性”(多个等待线程依次被唤醒)。

6 总结

  • CountDownLatch 基于 AQS(抽象队列同步器)CAS(比较并交换) 实现:

    • AQS 提供了同步状态管理(state 属性)和等待队列机制,是 CountDownLatch 实现“线程阻塞/唤醒”的基础;

    • CAS 保证了“计数递减”操作的原子性(如 countDown() 中通过 CAS 原子更新 state);

  • CountDownLatch 的构造函数必须指定 count(需等待的线程数),并通过内部类 Sync(继承自 AQS)将 count 赋值给 AQS 的 state 属性。这一步为后续的“计数递减”和“阻塞等待”逻辑奠定了状态基础;

  • 调用 countDown() 时,本质是将 AQS 的 state 减 1(通过 CAS 保证原子性);

  • 当所有线程执行完毕,state 会被减到 0,此时 countDown() 会触发 AQS 唤醒等待队列中所有挂起的线程(即调用 await() 的线程);

  • 调用 await() 时,本质是判断 AQS 的 state 是否为 0:

    • state > 0,说明还有线程未执行完毕,await() 会阻塞当前线程,将其加入 AQS 等待队列;
    • state == 0(最后一个线程执行 countDown() 后),await() 会停止阻塞,当前线程继续执行。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐