1 概述

  • CyclicBarrierCountDownLatchSemaphore不同:

    • CountDownLatchSemaphore直接基于 AQS(AbstractQueuedSynchronizer) 实现;

    • CyclicBarrier基于ReentrantLock + ConditionObject实现,间接依赖 AQS(因为 ReentrantLock 底层基于 AQS);

  • 内部结构

    • Generation 静态内部类。它是 CyclicBarrier 实现“循环复用”和“中断/异常处理”的核心:

      • 持有布尔属性 broken,默认 false

      • 当调用 reset() 方法、执行出现异常中断时调用 breakBarrier()broken 会被设为 true

    • 核心属性

      • 计数器:用于记录等待“凑齐一组线程”的进度;

      • generation 属性:关联当前的 Generation 实例,控制屏障的“代”切换;

    • breakBarrier() 方法。当任务执行中断异常或调用 reset() 时触发:

      • 将当前 Generationbroken 设为 true

      • ConditionObjectWaiter 队列中等待的线程,转移到 AQS 队列;

      • 执行 unlock 后,唤醒 AQS 队列中挂起的线程,让它们感知“屏障已破”;

    • await() 方法。这是 CyclicBarrier核心方法

      • 调用时会对“计数器”进行递减处理,直到计数器归 0,所有等待的线程才会一起继续执行;

      • 过程中会处理线程的等待、唤醒,以及 Generation 的切换逻辑;

    在这里插入图片描述

2 构造函数

  • CyclicBarrier 有两个构造函数,最终都调用 CyclicBarrier(int parties, Runnable barrierAction) 这个核心构造函数:

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        // 参数合法性校验
        if (parties <= 0) throw new IllegalArgumentException();
        // final修饰,所有线程执行完成归为或重置时使用
        this.parties = parties;
        // 在await方法中计数值,表示还有多少线程待执行await
        this.count = parties;
        // 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程
        this.barrierCommand = barrierAction;
    }
    
    • parties:表示需要“凑齐”的线程数量,只有当这么多线程都调用 await() 后,屏障才会放行。若 parties ≤ 0,会直接抛出 IllegalArgumentException 异常;

    • count:是一个计数器,初始值等于 parties。每次有线程调用 await()count 就减 1,直到 count = 0 时触发屏障放行逻辑;

    • parties(实例属性):用 final 修饰,是 CyclicBarrier 复用(重置)时的基准线程数;

    • barrierAction:是一个 Runnable 任务,当所有线程凑齐(count 减到 0)时,会优先执行这个任务,然后再唤醒所有等待的线程。若不需要回调任务,可传 null

  • 它的核心作用是初始化“线程凑齐的门槛”和“凑齐后的回调逻辑”

    • 先校验 parties 的合法性,保证必须有正整数个线程参与;

    • 初始化 countparties,用于后续 await() 方法中跟踪线程凑齐的进度;

    • 初始化 barrierCommand(即 barrierAction),指定所有线程凑齐后要执行的回调任务。

3 await()

  • await() 有两个重载版本,用于支持“无限等待”和“超时等待”两种场景:

    • await():线程会一直等待,直到指定数量的线程都调用 await() 才继续执行;若等待过程中屏障被中断、异常破坏,会抛出 InterruptedExceptionBrokenBarrierException

      // 执行没有超时时间的await
      public int await() throws InterruptedException, BrokenBarrierException {
          try {
              // 执行dowait()
              return dowait(false, 0L);
          } catch (TimeoutException toe) {
              throw new Error(toe);
          }
      }
      
    • await(long timeout, TimeUnit unit):线程最多等待 timeout 时间,若指定时间内线程未凑齐,会抛出 TimeoutException;同时也会处理中断、屏障破坏的情况;

      // 执行有超时时间的await
      public int await(long timeout, TimeUnit unit)
          throws InterruptedException,
                 BrokenBarrierException,
                 TimeoutException {
          return dowait(true, unit.toNanos(timeout));
      }
      
  • await() 最终都调用 dowait() 方法,其流程可分为以下关键步骤:

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            // 获取generation对象(屏障的“代”)
            final Generation g = generation;
    
            // 若 Generation 的 broken 为 true(屏障已破坏),直接抛出 BrokenBarrierException
            // 屏障已破坏,即线程中在执行过程中是否异常、超时、中断、重置
            if (g.broken)
                throw new BrokenBarrierException();
    
            // 若当前线程被中断
            if (Thread.interrupted()) {
                breakBarrier(); // 调用breakBarrier()标记屏障为“破坏”状态,将等待线程转移到 AQS 队列
                throw new InterruptedException(); // 并抛出 InterruptedException
            }
    
            // 执行 --count,得到当前线程在“凑齐序列”中的索引 index
            int index = --count;
            // 若所有线程已凑齐
            if (index == 0) {
                // 执行结果标识
                boolean ranAction = false;
                try {
                    // 执行构造函数中传入的 barrierAction(若不为 null),barrierCommand 即 barrierAction
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    // 执行完成,将执行结果设置为true
                    ranAction = true;
                    nextGeneration(); // 调用nextGeneration()重置计数器和Generation,实现屏障的“循环复用”
                    return 0; // 返回 0,表示当前线程是最后一个凑齐的线程
                } finally {
                    // 执行过程中出现问题
                    if (!ranAction)
                        // 重置标识与计数值,将Waiter队列中的线程转移到AQS队列
                        breakBarrier();
                }
            }
    
            // 若线程未凑齐(index != 0),进入循环挂起逻辑(自旋):
            for (;;) {
                try {
                    // 无超时场景(timed = false)
                    if (!timed)
                        trip.await(); // 调用 Condition.await(),将线程挂起在 Condition 队列
                    // 有超时场景(timed = true)
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos); // 调用Condition.awaitNanos(nanos),线程最多挂起nanos纳秒,返回剩余等待时间
                } catch (InterruptedException ie) { // 过程中若被中断、屏障破坏或超时,会触发对应异常
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
    
                // 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // g 是线程进入 dowait() 时获取的 “当前代”(Generation 实例)
                // generation 是 CyclicBarrier 的全局属性,表示 “最新代”
                // 若“当前代”与“最新代”不一致,说明在该线程等待期间,屏障已经通过nextGeneration()切换到了新的 “代”(可能是因为之前的线程已凑齐并重置了屏障)
                // 此时当前线程的等待已经“过期”,无需继续处理,直接返回 index 即可
                if (g != generation)
                    return index;
    
                // 超时,抛出异常TimeoutException
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 无论等待是否成功,最终都会释放 ReentrantLock,保证锁资源的回收
            lock.unlock();
        }
    }
    

4 breakBarrier()

  • breakBarrier()CyclicBarrier 中用于主动“破坏屏障”的核心方法。当线程执行出现中断异常或调用 reset() 时,会触发该方法,让所有等待的线程退出等待状态;

    // 结束CyclicBarrier的执行
    private void breakBarrier() {
        // 将当前 Generation 的 broken 属性设为 true,标识该屏障已被破坏。后续线程检查到这个标记时,会抛出 BrokenBarrierException
        generation.broken = true;
        // 把计数器 count 重置为构造函数中指定的 parties(即需要凑齐的线程总数),为下一次屏障复用做准备
        count = parties;
        // trip 是 ConditionObject 实例,调用 signalAll() 会将 Condition 队列中所有等待的线程转移到 AQS 队列
        // 当锁释放(unlock())后,这些线程会被唤醒,从而感知到 “屏障已破坏” 并退出等待
        trip.signalAll();
    }
    

5 reset()

  • reset()CyclicBarrier 用于主动重置屏障状态的方法,让 CyclicBarrier 可以“循环复用”,回到初始状态以支持新的一轮线程凑齐逻辑;

    // 重置CyclicBarrier
    public void reset() {
        // 获取 ReentrantLock 并加锁,确保 reset() 操作的原子性,避免多线程并发修改导致状态混乱
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            // 调用 breakBarrier() 方法,标记当前 Generation 为“破坏”状态,重置计数器,并唤醒所有等待的线程,让它们退出当前屏障的等待逻辑
            breakBarrier();
            // 生成新的 Generation 实例,重置计数器 count 为 parties(构造函数中指定的线程总数),让 CyclicBarrier 进入“新的一轮”,可以接收新的线程凑齐请求
            nextGeneration();
        } finally {
            // 无论重置过程是否出现异常,最终都会释放锁,保证锁资源的正确回收
            lock.unlock();
        }
    }
    

6 nextGeneration()

  • CyclicBarrier 实现**“循环复用”**的核心方法,用于在一组线程凑齐后,将屏障状态“归位”,以便支持下一轮线程的凑齐逻辑;
private void nextGeneration() {
    // trip 是 ConditionObject 实例,调用 signalAll() 会将 Condition 队列中所有等待的线程转移到 AQS 队列。当锁释放后,这些线程会被唤醒,继续执行后续逻辑
    trip.signalAll();
    // 将计数器 count 重置为构造函数中指定的 parties(即需要凑齐的线程总数),为下一轮凑齐逻辑准备
    count = parties;
    // 创建新的 Generation 实例,标记为 “新的一代” 屏障,与上一轮的屏障状态隔离
    generation = new Generation();
}

7 总结

  • CyclicBarrier 基于 ReentrantLock + ConditionObject 实现。构造函数必须指定 parties(初始待执行线程数),内部通过 generation 这个带有布尔属性的结构,标记当前屏障执行过程中是否出现超时、异常、中断等情况;

  • 构造函数会把 parties 赋值给计数值 count,每当有一个线程执行 await() 方法,count 就会减 1;

  • 线程凑齐后的执行流程

    • count 减到 0 时,代表所有线程都准备就绪。此时会先判断是否初始化了 barrierCommand(构造函数中传入的 Runnable 任务),如果有则优先执行该任务
    • barrierCommand 执行完成后,会把 Condition 队列(Waiter 队列)中的线程转移到 AQS 队列,执行 unlock 操作后唤醒这些线程;同时将计数值 countgeneration 归位,为下一轮屏障复用做准备。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐