在这里插入图片描述

👋 大家好,欢迎来到我的技术博客!
💻 作为一名热爱 Java 与软件开发的程序员,我始终相信:清晰的逻辑 + 持续的积累 = 稳健的成长
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕一个常见的开发话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!


Java 并发:CountDownLatch 与 CyclicBarrier 实战对比

在Java并发编程的广阔天地中,java.util.concurrent 包为我们提供了丰富的工具来协调多线程的执行。其中,CountDownLatchCyclicBarrier 是两个看似功能相似但设计意图和应用场景截然不同的同步辅助类。它们都能让一组线程等待某个条件达成,但其背后的哲学、生命周期和使用模式却大相径庭。

本文将通过大量贴近实际的代码示例、直观的图解以及性能对比,深入剖析 CountDownLatchCyclicBarrier 的核心机制、适用场景和最佳实践。无论你是想实现高效的并行计算、协调复杂的启动流程,还是设计高并发的测试框架,掌握这两者的区别与联系都将让你的并发代码更加健壮、高效且易于维护。


从一个故事开始:赛跑与会议

想象一下两种不同的场景:

  1. 赛跑(Race):10名运动员站在起跑线上,发令枪响后,所有人同时起跑。裁判需要等待所有运动员都冲过终点线后,才能宣布比赛结束并统计成绩。重点在于“等待所有任务完成”

  2. 会议(Meeting):10位团队成员约定在会议室开会。每个人都可能因交通状况不同而迟到。会议不能开始,直到最后一位成员到达。一旦会议结束,大家散会。过几天,他们又要开另一次会,流程重复。重点在于“等待所有参与者就位”

CountDownLatch 更像第一种场景——它是一个一次性倒计时门闩。而 CyclicBarrier 则像第二种场景——它是一个可循环使用的屏障。理解这个根本差异是掌握两者的关键。


CountDownLatch:一次性的倒计时门闩

CountDownLatch 是一个同步辅助类,它允许一个或多个线程等待,直到一组在其他线程中执行的操作完成。

核心机制

  • 计数器(Count):在创建 CountDownLatch 时,指定一个正整数作为计数器。
  • await():调用此方法的线程会被阻塞,直到计数器归零。
  • countDown():每次调用此方法,计数器减一。当计数器减到零时,所有因 await() 而阻塞的线程会被唤醒。

生命周期:一旦计数器归零,CountDownLatch不可再用。试图再次调用 await() 会立即返回,countDown() 也不会再有影响。

代码示例:并行计算

假设我们需要计算一个大型数组的总和,可以将数组分片,由多个线程并行计算各部分的和,最后汇总。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelSumWithCountDownLatch {
    private static final int ARRAY_SIZE = 10_000_000;
    private static final int NUM_THREADS = 4;
    private static final int[] array = new int[ARRAY_SIZE];
    private static final long[] partialSums = new long[NUM_THREADS];

    static {
        // 初始化数组
        for (int i = 0; i < ARRAY_SIZE; i++) {
            array[i] = i + 1;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(NUM_THREADS);
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

        int chunkSize = ARRAY_SIZE / NUM_THREADS;
        long startTime = System.currentTimeMillis();

        for (int i = 0; i < NUM_THREADS; i++) {
            final int threadId = i;
            final int start = i * chunkSize;
            final int end = (i == NUM_THREADS - 1) ? ARRAY_SIZE : start + chunkSize;

            executor.submit(() -> {
                try {
                    long sum = 0;
                    for (int j = start; j < end; j++) {
                        sum += array[j];
                    }
                    partialSums[threadId] = sum;
                    System.out.println("Thread " + threadId + " finished, partial sum: " + sum);
                } finally {
                    latch.countDown(); // 任务完成,计数器减一
                }
            });
        }

        // 主线程等待所有计算线程完成
        latch.await();
        executor.shutdown();

        // 汇总结果
        long totalSum = 0;
        for (long partialSum : partialSums) {
            totalSum += partialSum;
        }

        long endTime = System.currentTimeMillis();
        System.out.println("Total sum: " + totalSum);
        System.out.println("Calculation took: " + (endTime - startTime) + " ms");
    }
}

在这个例子中:

  • CountDownLatch latch = new CountDownLatch(4) 创建了一个计数器为4的门闩。
  • 每个计算线程在完成任务后调用 latch.countDown()
  • 主线程调用 latch.await() 被阻塞,直到所有4个线程都调用了 countDown(),计数器归零,主线程被唤醒,继续执行汇总操作。

图解 CountDownLatch

主线程:
    |
    |--- await() [阻塞]
    |                       latch.countDown()
Worker 1: ---|==============> (计数器: 4->3)
             |
             |                       latch.countDown()
Worker 2: ---|==============> (计数器: 3->2)
             |
             |                       latch.countDown()
Worker 3: ---|==============> (计数器: 2->1)
             |
             |                       latch.countDown()
Worker 4: ---|==============> (计数器: 1->0)
             |
             |--- await() [唤醒,继续执行]

CyclicBarrier:可循环的同步屏障

CyclicBarrier 也是一个同步辅助类,它允许一组线程相互等待,直到所有线程都到达某个公共的屏障点(barrier point)。

核心机制

  • 屏障点(Barrier Point):所有线程必须调用 await() 方法到达这个点。
  • 屏障阈值(Parties):创建 CyclicBarrier 时指定需要等待的线程数量。
  • await():线程调用此方法后会被阻塞,直到所有线程都调用了 await()
  • 可循环性:一旦所有线程都到达屏障点,屏障就会自动重置,可以被重复使用。

生命周期CyclicBarrier可重复使用的。每次所有线程都到达屏障点后,它可以被重置并再次使用。

代码示例:多阶段并行任务

设想一个科学计算任务,分为三个阶段:数据加载、数据处理、结果汇总。每个阶段都需要所有工作线程协同完成,且阶段之间有严格的顺序依赖。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiStageTaskWithCyclicBarrier {
    private static final int NUM_THREADS = 3;
    private static final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

        for (int i = 0; i < NUM_THREADS; i++) {
            final int workerId = i;
            executor.submit(() -> {
                try {
                    // 阶段1:数据加载
                    System.out.println("Worker " + workerId + " loading data...");
                    Thread.sleep((long) (Math.random() * 2000)); // 模拟耗时
                    System.out.println("Worker " + workerId + " finished loading.");
                    barrier.await(); // 等待所有线程完成加载

                    // 阶段2:数据处理
                    System.out.println("Worker " + workerId + " processing data...");
                    Thread.sleep((long) (Math.random() * 1500));
                    System.out.println("Worker " + workerId + " finished processing.");
                    barrier.await(); // 等待所有线程完成处理

                    // 阶段3:结果汇总(假设只有主线程做)
                    if (workerId == 0) {
                        System.out.println("Master worker aggregating results...");
                        Thread.sleep(1000);
                        System.out.println("Results aggregated.");
                    }
                    barrier.await(); // 等待汇总完成(其他线程也需到达)

                    System.out.println("Worker " + workerId + " task completed.");

                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("Worker " + workerId + " interrupted: " + e.getMessage());
                }
            });
        }

        executor.shutdown();
    }
}

在这个例子中:

  • CyclicBarrier barrier = new CyclicBarrier(3) 创建了一个需要3个线程共同到达的屏障。
  • 在每个阶段结束时,所有线程调用 barrier.await()
  • 只有当3个线程都调用了 await(),它们才会同时从该方法返回,继续执行下一阶段。
  • 屏障在每个 await() 后自动重置,因此可以用于多个阶段。

图解 CyclicBarrier

Worker 1: ---|=====> await() [阻塞] ---|=====> await() [阻塞] ---|=====> ...
            |                        |                        |
Worker 2: ---|=====> await() [阻塞] ---|=====> await() [阻塞] ---|=====> ...
            |                        |                        |
Worker 3: ---|=====> await() [阻塞] ---|=====> await() [阻塞] ---|=====> ...
            |                        |                        |
            |<-- All arrived,       |<-- All arrived,       |<-- All arrived,
            |    continue           |    continue           |    continue

关键差异对比

尽管 CountDownLatchCyclicBarrier 都用于线程同步,但它们在设计和使用上存在根本性差异。

特性 CountDownLatch CyclicBarrier
计数方向 倒计时(递减) 等待固定数量到达
可重用性 不可重用,计数归零后失效 可重用,屏障可自动重置
等待主体 一个或多个线程等待其他线程完成任务 一组线程相互等待
典型场景 主线程等待多个工作线程完成 多个线程协同完成多阶段任务
异常处理 await() 可能抛出 InterruptedException await() 可能抛出 InterruptedExceptionBrokenBarrierException
构造函数 CountDownLatch(int count) CyclicBarrier(int parties)CyclicBarrier(int parties, Runnable barrierAction)

一个重要的构造函数:CyclicBarrierbarrierAction

CyclicBarrier 提供了一个强大的构造函数,允许在所有线程到达屏障点后,自动执行一个指定的 Runnable。这个动作在最后一个到达的线程的上下文中执行。

CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("All workers have reached the barrier. Performing cleanup...");
    // 执行一些清理或汇总操作
});

这个特性在 CountDownLatch 中是无法直接实现的。


性能考量与最佳实践

1. 选择合适的工具

  • 使用 CountDownLatch

    • 你需要一个线程(通常是主线程)等待多个后台任务完成。
    • 任务是一次性的,不需要重复。
    • 例如:启动服务等待多个组件初始化完成;并行计算后汇总结果。
  • 使用 CyclicBarrier

    • 多个线程需要相互协调,分阶段执行任务。
    • 同步点需要重复使用多次。
    • 你需要在所有线程到达后执行一个特定动作(利用 barrierAction)。
    • 例如:多线程模拟(所有线程同时开始);分阶段批处理任务。

2. 异常处理

两者都可能抛出 InterruptedException。在 CyclicBarrier 中,如果某个线程在等待时被中断,或者屏障被破坏(broken),后续所有调用 await() 的线程都会抛出 BrokenBarrierException

try {
    barrier.await();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // 恢复中断状态
    // 处理中断
} catch (BrokenBarrierException e) {
    // 屏障已破坏,可能需要清理并退出
    break;
}

3. 避免死锁

确保 countDown()await() 被正确调用。如果某个线程忘记调用 countDown()CountDownLatch 将永远无法归零,导致死锁。

4. 性能对比

在大多数场景下,两者的性能差异微乎其微,因为它们都是基于 AbstractQueuedSynchronizer (AQS) 实现的。选择哪个主要取决于语义和功能需求,而非性能。


实际应用:并发测试框架

我们可以利用 CyclicBarrier 来构建一个简单的并发测试框架,确保所有测试线程在同一时刻开始执行,以获得更准确的性能数据。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentTestFramework {
    private static final int NUM_THREADS = 10;
    private static final CyclicBarrier startBarrier = new CyclicBarrier(NUM_THREADS);
    private static final CyclicBarrier endBarrier = new CyclicBarrier(NUM_THREADS + 1); // +1 for main thread
    private static final AtomicInteger successCount = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < NUM_THREADS; i++) {
            executor.submit(() -> {
                try {
                    startBarrier.await(); // 所有线程在此等待,确保同时开始

                    // 模拟并发操作,如调用API、访问数据库等
                    boolean success = performTask();
                    if (success) {
                        successCount.incrementAndGet();
                    }

                    endBarrier.await(); // 等待所有线程完成

                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        endBarrier.await(); // 主线程也等待,直到所有工作线程完成
        long endTime = System.currentTimeMillis();

        executor.shutdown();

        System.out.println("Test completed in " + (endTime - startTime) + " ms");
        System.out.println("Success count: " + successCount.get());
    }

    private static boolean performTask() {
        // 模拟一个可能成功或失败的操作
        try {
            Thread.sleep(100);
            return Math.random() > 0.1; // 90% 成功率
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

在这个框架中:

  • startBarrier 确保所有线程同时开始 performTask()
  • endBarrier 让主线程等待所有测试线程完成,以便准确计算总耗时。
  • 这种精确的同步对于性能测试至关重要。

替代方案与演进

随着Java并发库的演进,一些新的工具可以替代或补充 CountDownLatchCyclicBarrier

1. CompletableFuture

对于复杂的异步任务编排,CompletableFuture 提供了更强大的函数式编程接口。

CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> task1());
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> task2());

// 等待所有任务完成
CompletableFuture.allOf(future1, future2)
                 .thenRun(() -> System.out.println("All tasks completed"));

2. Phaser

Phaser 是一个更灵活的同步屏障,可以动态地注册和注销参与者,支持分层屏障,是 CyclicBarrierCountDownLatch 的超集。

Phaser phaser = new Phaser(3); // 初始3个参与者
// phaser.register() / phaser.arriveAndDeregister() 动态调整
phaser.arriveAndAwaitAdvance(); // 等价于 await()

总结

CountDownLatchCyclicBarrier 是Java并发编程中不可或缺的工具。它们的区别可以简单概括为:

  • CountDownLatch“我等你们做完” —— 一个或多个线程等待一组操作完成,一次性使用。
  • CyclicBarrier“我们一起走” —— 一组线程相互等待,到达共同点后一起继续,可循环使用。

通过本文的深入探讨和丰富示例,你应该已经掌握了如何根据具体需求选择合适的工具。记住,正确的工具选择不仅能解决同步问题,还能让代码更加清晰、健壮和易于维护。

在实际开发中,结合 ExecutorServiceFutureCompletableFuture 等工具,你可以构建出高效、可靠的并发应用程序。

参考资料


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐