一、CountDownLatch 核心概述

CountDownLatch 是 Java 5 引入的并发工具类,隶属于 java.util.concurrent 包,其核心作用是让一个或多个线程等待其他线程完成操作,涉及两组线程组。

其工作原理核心是基于递减计数器工作(AQS),流程为 “初始化计数器→任务线程递减计数器→等待线程阻塞与唤醒”。

二、核心方法

构造方法

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完 成,这里就传入N。

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

由于countDown方法可以用在任何地方,所以这里说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可。

CountDownLatch使用了AQS来做实现,countDown()就是对AQS的state值减一,await()就是去验证state是否等于0

countDown():计数器递减方法

核心功能:将 CountDownLatch 的内部计数器值原子性地减 1。

    public void countDown() {
        sync.releaseShared(1);
    }

关键特性:

  • 无返回值,执行效率极高,不会阻塞当前线程,调用后线程可继续执行后续操作;
  • 当计数器值已为 0 时,调用该方法不会产生任何变化,也不会抛出异常;
  • 可在任意线程中调用,包括任务线程自身,或其他监控线程。

await():无超时等待方法

核心功能:让当前调用线程进入阻塞状态,直到 CountDownLatch 的计数器值递减至 0。
关键特性:

  • 无返回值,阻塞期间会释放线程持有的锁资源(若有);
  • 当计数器值变为 0 时,所有调用该方法的阻塞线程会被同时唤醒,继续执行;
  • 响应线程中断:阻塞期间若当前线程被中断(调用 interrupt() 方法),会抛出 InterruptedException 异常,且线程会退出阻塞状态。

await(long timeout, TimeUnit unit):带超时等待方法

核心功能:让当前调用线程进入阻塞状态,等待计数器值递减至 0,同时设置超时时间。
关键特性:

  • 有返回值:返回 boolean 类型,若计数器值在超时时间内变为 0,返回 true;若超时时间已到但计数器值仍未为 0,返回 false;
  • 超时自动退出:即使计数器未归 0,超时时间到达后,线程也会自动退出阻塞状态,避免无限等待;
  • 同样响应线程中断,阻塞期间被中断会抛出 InterruptedException 异常。

三、多线程任务汇总示例(主线程等待所有子任务完成)

启动一个主线程,需要等到其他子线程初始化完毕

package com.xiaolyuh;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 5个初始化线程,6个扣除点,初始化完成后业务线程和住线程才能执行
 *
 * @author yuhao.wang3
 * @since 2019/6/26 15:24
 */
public class CountDownLatchTest {
    static final CountDownLatch countDownLatch = new CountDownLatch(6);

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + "主线程开始......");
        new Thread(new InitJob()).start();
        new Thread(new BusinessWoerk()).start();
        new Thread(new InitJob()).start();
        new Thread(new InitWoerk()).start();
        new Thread(new InitJob()).start();
        new Thread(new InitJob()).start();

        try {
            System.out.println(Thread.currentThread().getName() + "主线程等待初始化线程初始化完成......");
            countDownLatch.await(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        sleep(5);
        System.out.println(Thread.currentThread().getName() + "主线程结束......");
    }

    /**
     * 一个线程一个扣减点
     */
    static class InitJob implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "初始化任务开始。。。。。。");
            try {
                sleep(5);
            } finally {
                countDownLatch.countDown();
            }
            sleep(5);
            System.out.println(Thread.currentThread().getName() + "初始化任务完毕后,处理业务逻辑。。。。。。");
        }
    }

    /**
     * 一个线程两个扣减点
     */
    static class InitWoerk implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "初始化工作开始第一步》》》》》");
            try {
                sleep(5);
            } finally {
                countDownLatch.countDown();
            }
            System.out.println(Thread.currentThread().getName() + "初始化工作开始第二步》》》》》");
            sleep(5);
            countDownLatch.countDown();
            System.out.println(Thread.currentThread().getName() + "初始化工作处理业务逻辑》》》》》");
        }
    }


    /**
     * 业务线程
     */
    static class BusinessWoerk implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "初始化线程还未完成,业务线程阻塞----------");
                countDownLatch.await(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "初始化工作完成业务线程开始工作----------");
            System.out.println(Thread.currentThread().getName() + "初始化工作完成业务线程开始工作----------");
            System.out.println(Thread.currentThread().getName() + "初始化工作完成业务线程开始工作----------");
            System.out.println(Thread.currentThread().getName() + "初始化工作完成业务线程开始工作----------");
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

输出结果:

main主线程开始......
main主线程等待初始化线程初始化完成......
Thread-0初始化任务开始。。。。。。
Thread-1初始化线程还未完成,业务线程阻塞----------
Thread-2初始化任务开始。。。。。。
Thread-3初始化工作开始第一步》》》》》
Thread-4初始化任务开始。。。。。。
Thread-5初始化任务开始。。。。。。
Thread-3初始化工作开始第二步》》》》》
Thread-0初始化任务完毕后,处理业务逻辑。。。。。。
Thread-3初始化工作处理业务逻辑》》》》》
Thread-2初始化任务完毕后,处理业务逻辑。。。。。。
Thread-4初始化任务完毕后,处理业务逻辑。。。。。。
Thread-5初始化任务完毕后,处理业务逻辑。。。。。。
Thread-1初始化工作完成业务线程开始工作----------
Thread-1初始化工作完成业务线程开始工作----------
Thread-1初始化工作完成业务线程开始工作----------
Thread-1初始化工作完成业务线程开始工作----------
main主线程结束......

四、注意事项

  • 合理设置计数器初始值:初始值需与待完成的任务数(或操作数)一致,若设置过小,会导致等待线程提前唤醒;若设置过大,会导致等待线程无限阻塞(除非使用带超时的 await() 方法)。
  • 注意异常场景下的计数器递减:任务线程若在执行过程中抛出异常,可能导致 countDown() 方法无法被调用,进而使计数器无法归 0。建议在任务线程的 finally 代码块中调用 countDown(),确保无论任务正常完成还是异常终止,计数器都会被递减。
  • CountDownLatch 不可复用:计数器归 0 后,无法再次重置计数器值,若需要重复实现线程同步逻辑,可使用 CyclicBarrier 替代。
  • 妥善处理中断异常await() 方法会响应线程中断,使用时需捕获 InterruptedException 异常,并在异常处理逻辑中做相应的资源释放或业务补偿操作,避免程序出现未知问题。
  • 避免长时间阻塞:若无特殊业务需求,建议优先使用带超时的 await(long timeout, TimeUnit unit) 方法,设置合理的超时时间,防止因任务线程异常导致等待线程永久阻塞。

总结

  • CountDownLatch 是 Java 并发包中的 “一次性” 同步工具,用于实现 “一个 / 多个线程等待其他线程完成操作” 的场景;
  • 核心基于递减计数器工作,流程为 “初始化计数器→任务线程递减计数器→等待线程阻塞与唤醒”;
  • 核心方法包括 countDown()(递减计数器,不阻塞)、await()(无超时等待)、await(long, TimeUnit)(带超时等待);
  • 典型应用于多任务汇总、并发测试、服务启动校验,使用时需注意计数器初始值设置、异常场景处理及避免无限阻塞。

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-concurrent 工程

layering-cache

为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐