Java高级25-Java 并发:CountDownLatch 与 CyclicBarrier 实战对比
Java并发工具:CountDownLatch与CyclicBarrier对比 摘要 Java并发编程中的CountDownLatch和CyclicBarrier是两种常用的线程同步工具,尽管功能相似但设计理念和应用场景不同。CountDownLatch是一次性的倒计时门闩,适用于主线程等待多个子线程完成任务(如并行计算汇总),通过countDown()递减计数器,await()阻塞直到归零。Cy
👋 大家好,欢迎来到我的技术博客!
💻 作为一名热爱 Java 与软件开发的程序员,我始终相信:清晰的逻辑 + 持续的积累 = 稳健的成长。
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕一个常见的开发话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!
Java 并发:CountDownLatch 与 CyclicBarrier 实战对比
在Java并发编程的广阔天地中,java.util.concurrent
包为我们提供了丰富的工具来协调多线程的执行。其中,CountDownLatch
和 CyclicBarrier
是两个看似功能相似但设计意图和应用场景截然不同的同步辅助类。它们都能让一组线程等待某个条件达成,但其背后的哲学、生命周期和使用模式却大相径庭。
本文将通过大量贴近实际的代码示例、直观的图解以及性能对比,深入剖析 CountDownLatch
与 CyclicBarrier
的核心机制、适用场景和最佳实践。无论你是想实现高效的并行计算、协调复杂的启动流程,还是设计高并发的测试框架,掌握这两者的区别与联系都将让你的并发代码更加健壮、高效且易于维护。
从一个故事开始:赛跑与会议
想象一下两种不同的场景:
-
赛跑(Race):10名运动员站在起跑线上,发令枪响后,所有人同时起跑。裁判需要等待所有运动员都冲过终点线后,才能宣布比赛结束并统计成绩。重点在于“等待所有任务完成”。
-
会议(Meeting):10位团队成员约定在会议室开会。每个人都可能因交通状况不同而迟到。会议不能开始,直到最后一位成员到达。一旦会议结束,大家散会。过几天,他们又要开另一次会,流程重复。重点在于“等待所有参与者就位”。
CountDownLatch
更像第一种场景——它是一个一次性倒计时门闩。而 CyclicBarrier
则像第二种场景——它是一个可循环使用的屏障。理解这个根本差异是掌握两者的关键。
CountDownLatch:一次性的倒计时门闩
CountDownLatch
是一个同步辅助类,它允许一个或多个线程等待,直到一组在其他线程中执行的操作完成。
核心机制
- 计数器(Count):在创建
CountDownLatch
时,指定一个正整数作为计数器。 - await():调用此方法的线程会被阻塞,直到计数器归零。
- countDown():每次调用此方法,计数器减一。当计数器减到零时,所有因
await()
而阻塞的线程会被唤醒。
生命周期:一旦计数器归零,CountDownLatch
就不可再用。试图再次调用 await()
会立即返回,countDown()
也不会再有影响。
代码示例:并行计算
假设我们需要计算一个大型数组的总和,可以将数组分片,由多个线程并行计算各部分的和,最后汇总。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ParallelSumWithCountDownLatch {
private static final int ARRAY_SIZE = 10_000_000;
private static final int NUM_THREADS = 4;
private static final int[] array = new int[ARRAY_SIZE];
private static final long[] partialSums = new long[NUM_THREADS];
static {
// 初始化数组
for (int i = 0; i < ARRAY_SIZE; i++) {
array[i] = i + 1;
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(NUM_THREADS);
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
int chunkSize = ARRAY_SIZE / NUM_THREADS;
long startTime = System.currentTimeMillis();
for (int i = 0; i < NUM_THREADS; i++) {
final int threadId = i;
final int start = i * chunkSize;
final int end = (i == NUM_THREADS - 1) ? ARRAY_SIZE : start + chunkSize;
executor.submit(() -> {
try {
long sum = 0;
for (int j = start; j < end; j++) {
sum += array[j];
}
partialSums[threadId] = sum;
System.out.println("Thread " + threadId + " finished, partial sum: " + sum);
} finally {
latch.countDown(); // 任务完成,计数器减一
}
});
}
// 主线程等待所有计算线程完成
latch.await();
executor.shutdown();
// 汇总结果
long totalSum = 0;
for (long partialSum : partialSums) {
totalSum += partialSum;
}
long endTime = System.currentTimeMillis();
System.out.println("Total sum: " + totalSum);
System.out.println("Calculation took: " + (endTime - startTime) + " ms");
}
}
在这个例子中:
CountDownLatch latch = new CountDownLatch(4)
创建了一个计数器为4的门闩。- 每个计算线程在完成任务后调用
latch.countDown()
。 - 主线程调用
latch.await()
被阻塞,直到所有4个线程都调用了countDown()
,计数器归零,主线程被唤醒,继续执行汇总操作。
图解 CountDownLatch
主线程:
|
|--- await() [阻塞]
| latch.countDown()
Worker 1: ---|==============> (计数器: 4->3)
|
| latch.countDown()
Worker 2: ---|==============> (计数器: 3->2)
|
| latch.countDown()
Worker 3: ---|==============> (计数器: 2->1)
|
| latch.countDown()
Worker 4: ---|==============> (计数器: 1->0)
|
|--- await() [唤醒,继续执行]
CyclicBarrier:可循环的同步屏障
CyclicBarrier
也是一个同步辅助类,它允许一组线程相互等待,直到所有线程都到达某个公共的屏障点(barrier point)。
核心机制
- 屏障点(Barrier Point):所有线程必须调用
await()
方法到达这个点。 - 屏障阈值(Parties):创建
CyclicBarrier
时指定需要等待的线程数量。 - await():线程调用此方法后会被阻塞,直到所有线程都调用了
await()
。 - 可循环性:一旦所有线程都到达屏障点,屏障就会自动重置,可以被重复使用。
生命周期:CyclicBarrier
是可重复使用的。每次所有线程都到达屏障点后,它可以被重置并再次使用。
代码示例:多阶段并行任务
设想一个科学计算任务,分为三个阶段:数据加载、数据处理、结果汇总。每个阶段都需要所有工作线程协同完成,且阶段之间有严格的顺序依赖。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiStageTaskWithCyclicBarrier {
private static final int NUM_THREADS = 3;
private static final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
for (int i = 0; i < NUM_THREADS; i++) {
final int workerId = i;
executor.submit(() -> {
try {
// 阶段1:数据加载
System.out.println("Worker " + workerId + " loading data...");
Thread.sleep((long) (Math.random() * 2000)); // 模拟耗时
System.out.println("Worker " + workerId + " finished loading.");
barrier.await(); // 等待所有线程完成加载
// 阶段2:数据处理
System.out.println("Worker " + workerId + " processing data...");
Thread.sleep((long) (Math.random() * 1500));
System.out.println("Worker " + workerId + " finished processing.");
barrier.await(); // 等待所有线程完成处理
// 阶段3:结果汇总(假设只有主线程做)
if (workerId == 0) {
System.out.println("Master worker aggregating results...");
Thread.sleep(1000);
System.out.println("Results aggregated.");
}
barrier.await(); // 等待汇总完成(其他线程也需到达)
System.out.println("Worker " + workerId + " task completed.");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
System.err.println("Worker " + workerId + " interrupted: " + e.getMessage());
}
});
}
executor.shutdown();
}
}
在这个例子中:
CyclicBarrier barrier = new CyclicBarrier(3)
创建了一个需要3个线程共同到达的屏障。- 在每个阶段结束时,所有线程调用
barrier.await()
。 - 只有当3个线程都调用了
await()
,它们才会同时从该方法返回,继续执行下一阶段。 - 屏障在每个
await()
后自动重置,因此可以用于多个阶段。
图解 CyclicBarrier
Worker 1: ---|=====> await() [阻塞] ---|=====> await() [阻塞] ---|=====> ...
| | |
Worker 2: ---|=====> await() [阻塞] ---|=====> await() [阻塞] ---|=====> ...
| | |
Worker 3: ---|=====> await() [阻塞] ---|=====> await() [阻塞] ---|=====> ...
| | |
|<-- All arrived, |<-- All arrived, |<-- All arrived,
| continue | continue | continue
关键差异对比
尽管 CountDownLatch
和 CyclicBarrier
都用于线程同步,但它们在设计和使用上存在根本性差异。
特性 | CountDownLatch | CyclicBarrier |
---|---|---|
计数方向 | 倒计时(递减) | 等待固定数量到达 |
可重用性 | 不可重用,计数归零后失效 | 可重用,屏障可自动重置 |
等待主体 | 一个或多个线程等待其他线程完成任务 | 一组线程相互等待 |
典型场景 | 主线程等待多个工作线程完成 | 多个线程协同完成多阶段任务 |
异常处理 | await() 可能抛出 InterruptedException |
await() 可能抛出 InterruptedException 和 BrokenBarrierException |
构造函数 | CountDownLatch(int count) |
CyclicBarrier(int parties) 或 CyclicBarrier(int parties, Runnable barrierAction) |
一个重要的构造函数:CyclicBarrier
的 barrierAction
CyclicBarrier
提供了一个强大的构造函数,允许在所有线程到达屏障点后,自动执行一个指定的 Runnable
。这个动作在最后一个到达的线程的上下文中执行。
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All workers have reached the barrier. Performing cleanup...");
// 执行一些清理或汇总操作
});
这个特性在 CountDownLatch
中是无法直接实现的。
性能考量与最佳实践
1. 选择合适的工具
-
使用
CountDownLatch
当:- 你需要一个线程(通常是主线程)等待多个后台任务完成。
- 任务是一次性的,不需要重复。
- 例如:启动服务等待多个组件初始化完成;并行计算后汇总结果。
-
使用
CyclicBarrier
当:- 多个线程需要相互协调,分阶段执行任务。
- 同步点需要重复使用多次。
- 你需要在所有线程到达后执行一个特定动作(利用
barrierAction
)。 - 例如:多线程模拟(所有线程同时开始);分阶段批处理任务。
2. 异常处理
两者都可能抛出 InterruptedException
。在 CyclicBarrier
中,如果某个线程在等待时被中断,或者屏障被破坏(broken
),后续所有调用 await()
的线程都会抛出 BrokenBarrierException
。
try {
barrier.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
// 处理中断
} catch (BrokenBarrierException e) {
// 屏障已破坏,可能需要清理并退出
break;
}
3. 避免死锁
确保 countDown()
或 await()
被正确调用。如果某个线程忘记调用 countDown()
,CountDownLatch
将永远无法归零,导致死锁。
4. 性能对比
在大多数场景下,两者的性能差异微乎其微,因为它们都是基于 AbstractQueuedSynchronizer
(AQS) 实现的。选择哪个主要取决于语义和功能需求,而非性能。
实际应用:并发测试框架
我们可以利用 CyclicBarrier
来构建一个简单的并发测试框架,确保所有测试线程在同一时刻开始执行,以获得更准确的性能数据。
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrentTestFramework {
private static final int NUM_THREADS = 10;
private static final CyclicBarrier startBarrier = new CyclicBarrier(NUM_THREADS);
private static final CyclicBarrier endBarrier = new CyclicBarrier(NUM_THREADS + 1); // +1 for main thread
private static final AtomicInteger successCount = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
long startTime = System.currentTimeMillis();
for (int i = 0; i < NUM_THREADS; i++) {
executor.submit(() -> {
try {
startBarrier.await(); // 所有线程在此等待,确保同时开始
// 模拟并发操作,如调用API、访问数据库等
boolean success = performTask();
if (success) {
successCount.incrementAndGet();
}
endBarrier.await(); // 等待所有线程完成
} catch (Exception e) {
e.printStackTrace();
}
});
}
endBarrier.await(); // 主线程也等待,直到所有工作线程完成
long endTime = System.currentTimeMillis();
executor.shutdown();
System.out.println("Test completed in " + (endTime - startTime) + " ms");
System.out.println("Success count: " + successCount.get());
}
private static boolean performTask() {
// 模拟一个可能成功或失败的操作
try {
Thread.sleep(100);
return Math.random() > 0.1; // 90% 成功率
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}
在这个框架中:
startBarrier
确保所有线程同时开始performTask()
。endBarrier
让主线程等待所有测试线程完成,以便准确计算总耗时。- 这种精确的同步对于性能测试至关重要。
替代方案与演进
随着Java并发库的演进,一些新的工具可以替代或补充 CountDownLatch
和 CyclicBarrier
。
1. CompletableFuture
对于复杂的异步任务编排,CompletableFuture
提供了更强大的函数式编程接口。
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> task1());
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> task2());
// 等待所有任务完成
CompletableFuture.allOf(future1, future2)
.thenRun(() -> System.out.println("All tasks completed"));
2. Phaser
Phaser
是一个更灵活的同步屏障,可以动态地注册和注销参与者,支持分层屏障,是 CyclicBarrier
和 CountDownLatch
的超集。
Phaser phaser = new Phaser(3); // 初始3个参与者
// phaser.register() / phaser.arriveAndDeregister() 动态调整
phaser.arriveAndAwaitAdvance(); // 等价于 await()
总结
CountDownLatch
和 CyclicBarrier
是Java并发编程中不可或缺的工具。它们的区别可以简单概括为:
CountDownLatch
:“我等你们做完” —— 一个或多个线程等待一组操作完成,一次性使用。CyclicBarrier
:“我们一起走” —— 一组线程相互等待,到达共同点后一起继续,可循环使用。
通过本文的深入探讨和丰富示例,你应该已经掌握了如何根据具体需求选择合适的工具。记住,正确的工具选择不仅能解决同步问题,还能让代码更加清晰、健壮和易于维护。
在实际开发中,结合 ExecutorService
、Future
、CompletableFuture
等工具,你可以构建出高效、可靠的并发应用程序。
参考资料
- Oracle Java Documentation: CountDownLatch
- Oracle Java Documentation: CyclicBarrier
- Java Concurrency in Practice by Brian Goetz
- Baeldung: CountDownLatch vs CyclicBarrier
- Jenkov Java Concurrency Tutorials
🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨
更多推荐
所有评论(0)