java-BlockingQueue、CountDownLatch讲解
恢复线程的中断状态它用在里,防止中断信息丢失这是处理线程中断的最佳实践CountDownLatch = 倒计时器创建时指定初始值(等待的线程个数)每个工作线程完成后调用主线程调用await()阻塞等待当倒计时变成 0 时,主线程被唤醒这样可以保证主线程一定要等所有工作线程完成后才继续。
一、BlockingQueue的基本用法
1-1、什么是 BlockingQueue?
想象你在一个饭店排队买饭。BlockingQueue 就像这个队列一样,但它有个特殊功能:如果队伍满了,后来的人会自动等待;如果队伍空了,取餐的人会自动等待。这就是 BlockingQueue 的核心思想。
1-2、为什么需要它?
在多线程程序中,经常需要一个线程生产数据,另一个线程消费数据。BlockingQueue 就是为了让这两个线程安全、方便地交互。
比如:
- 线程A不断生产任务,放入队列
- 线程B从队列取任务,然后处理
- 如果队列满了,线程A自动等待
- 如果队列空了,线程B自动等待
这样两个线程就协调得很好。
1-3、常用操作
BlockingQueue 有四种主要的操作方式:
1. 添加元素:
add(E)- 队列满了会报错put(E)- 队列满了会等待offer(E)- 队列满了会返回 false
2. 移除元素:
remove()- 队列空了会报错take()- 队列空了会等待poll()- 队列空了会返回 null
1-4、简单例子
BlockingQueue<String> queue = new LinkedBlockingQueue<>(3); // 容量为3
// 线程1:生产者
new Thread(() -> {
try {
queue.put("任务1");
queue.put("任务2");
queue.put("任务3");
System.out.println("任务都放入了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 线程2:消费者
new Thread(() -> {
try {
String task = queue.take();
System.out.println("处理: " + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
1-5、实际场景
生产者-消费者问题: 假如你有一条流水线,工人A负责制造零件,工人B负责组装。BlockingQueue 就像中间的储存架子,A 把零件放上去,B 从上面取。如果架子满了,A 就等待;如果架子空了,B 就等待。
1-6、常见类型
LinkedBlockingQueue- 用链表实现,性能一般ArrayBlockingQueue- 用数组实现,性能好PriorityBlockingQueue- 带优先级的队列
简单来说,BlockingQueue = 线程安全的队列 + 自动等待机制。它让多线程协作变得简单安全。
1-7、测试代码
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CountDownLatch;
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 创建一个容量为10的队列
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// 用于同步,等待所有线程完成
CountDownLatch latch = new CountDownLatch(6); // 5个生产者 + 1个消费者
System.out.println("===== 开始测试 BlockingQueue 线程安全性 =====\n");
// 创建5个生产者线程
for (int i = 1; i <= 5; i++) {
final int producerId = i;
new Thread(() -> {
try {
for (int j = 1; j <= 20; j++) {
String item = "生产者" + producerId + "-任务" + j;
queue.put(item);
System.out.println("[生产] " + item + " | 队列大小: " + queue.size());
Thread.sleep(10); // 模拟生产耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}, "Producer-" + i).start();
}
// 创建1个消费者线程
new Thread(() -> {
try {
for (int i = 0; i < 100; i++) { // 消费100个任务
String item = queue.take();
System.out.println("[消费] " + item + " | 队列大小: " + queue.size());
Thread.sleep(50); // 模拟消费耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}, "Consumer-1").start();
// 等待所有线程完成
latch.await();
System.out.println("\n===== 测试完成 =====");
System.out.println("最终队列大小: " + queue.size());
System.out.println("如果没有异常,说明 BlockingQueue 是线程安全的!");
}
}
这段代码测试了什么?
- 5个生产者线程 - 同时生产任务,每个生产 20 个
- 1个消费者线程 - 消费所有的任务
- CountDownLatch - 等待所有线程完成
二、解释:Thread.currentThread().interrupt();
以上测试代码中有: Thread.currentThread().interrupt() 这行代码。这是什么意思呢?
2-1、它是干什么的?
Thread.currentThread().interrupt() 的作用是向当前线程发送一个中断信号。
2-2、中断是什么?
中断不是强制杀死线程,而是一种礼貌的通知机制。它告诉线程:"嘿,有人希望你停止工作了"。线程可以选择听或不听这个通知。
2-3、代码中的场景
在我之前的测试代码中,这行代码出现在 catch 块里:
try {
// ... 代码执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 这里
}
2-4、为什么要这样做?
当你调用 queue.take() 或 queue.put() 时,如果线程被中断了,会抛出 InterruptedException 异常。这时有三种处理方式:
方式1:忽略异常(不推荐)
try {
queue.take();
} catch (InterruptedException e) {
// 什么都不做,异常就丢失了
}
问题:中断信号丢失了,外层代码不知道发生过中断。
方式2:重新中断线程(推荐)
try {
queue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
}
这样做是为了"保留"中断信息,让外层代码知道这个线程曾经被中断过。
方式3:直接抛出异常(也可以)
try {
queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e); // 让外层处理
}
2-5、一个生动的比喻
想象你在专心工作,老板过来轻轻拍你肩膀说"我需要你停下来"。这就是中断。
- 如果你直接说"好的,我停",这是正常响应中断
- 如果你继续工作,装作没听到,这是忽略中断
- 如果你之后主动告诉别人"老板曾经中断过我",这是保留中断状态
2-6、实际例子
public class InterruptExample {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
System.out.println("线程开始睡眠...");
Thread.sleep(10000); // 睡眠10秒
} catch (InterruptedException e) {
System.out.println("我被中断了!");
Thread.currentThread().interrupt(); // 恢复中断状态
}
// 检查中断状态
if (Thread.currentThread().isInterrupted()) {
System.out.println("中断状态为真,线程即将退出");
}
});
thread.start();
Thread.sleep(1000); // 等待1秒
thread.interrupt(); // 中断线程
}
}
运行结果:
线程开始睡眠...
我被中断了!
中断状态为真,线程即将退出
2-7、总结
Thread.currentThread().interrupt()= 恢复线程的中断状态- 它用在
catch(InterruptedException)里,防止中断信息丢失 - 这是处理线程中断的最佳实践
三、CountDownLatch工具
3-1、它是干什么的?
CountDownLatch 是一个倒计时器。它让一个线程等待其他多个线程完成后,再继续执行。
3-2、生动比喻
想象你是一个班级的班长,要组织全班去春游:
- 班长说:"我们等所有人都到达集合点后,再一起出发"
- 班长创建一个倒计时器,数字设为 50(班级有 50 人)
- 每当一个学生到达,倒计时器就减 1
- 当倒计时器变成 0 时,班长就知道所有人都到了,可以出发
3-3、代码中的含义
CountDownLatch latch = new CountDownLatch(6);
这行代码的意思是:创建一个倒计时器,初始值为 6
为什么是 6?因为你有:
- 5 个生产者线程
- 1 个消费者线程
- 总共 6 个线程
3-4、三个关键方法
1. countDown() - 倒计时减 1
latch.countDown(); // 倒计时从 6 变成 5
2. await() - 等待倒计时到 0
latch.await(); // 阻塞当前线程,直到倒计时变成 0
3. getCount() - 查看当前倒计时值
System.out.println(latch.getCount()); // 打印剩余的数字
3-5、代码执行流程
CountDownLatch latch = new CountDownLatch(6); // 倒计时: 6
// 启动6个线程...
latch.await(); // main线程阻塞,等待这里
// 等等等等...
// 当6个线程都执行了 latch.countDown() 时
// 倒计时变成 0,main线程才会继续
System.out.println("所有线程都完成了!");
3-6、测试代码
// 创建的6个线程
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
// ... 生产任务
} finally {
latch.countDown(); // 线程完成时,倒计时减1
}
}).start();
}
new Thread(() -> {
try {
// ... 消费任务
} finally {
latch.countDown(); // 线程完成时,倒计时减1
}
}).start();
latch.await(); // main线程在这里等待,直到倒计时变成0
System.out.println("===== 测试完成 ====="); // 所有线程都完成后才执行
3-7、没有 CountDownLatch 会怎样?
// 不用 CountDownLatch
for (int i = 0; i < 6; i++) {
new Thread(() -> {
// ... 做工作
}).start();
}
System.out.println("===== 测试完成 ====="); // 👈 这会立即执行!
// 此时工作线程可能还没完成
这样的话,工作线程还在忙的时候,main 线程就已经打印"测试完成"了。很不安全。
3-8、实际例子
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("班长:请所有同学到集合点集合...\n");
CountDownLatch latch = new CountDownLatch(5); // 5个学生
// 5个学生线程
for (int i = 1; i <= 5; i++) {
final int studentId = i;
new Thread(() -> {
try {
// 学生去不同的地方,耗时不同
long time = (long)(Math.random() * 3000) + 1000;
System.out.println("学生" + studentId + ":我在外面,需要" + time + "ms才能到达");
Thread.sleep(time);
System.out.println("学生" + studentId + ":我到达了!");
latch.countDown(); // 倒计时减1
System.out.println("【还剩" + latch.getCount() + "个学生未到达】");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Student-" + i).start();
}
System.out.println("班长:我在集合点等待...\n");
latch.await(); // 班长在这里等待,阻塞
System.out.println("\n班长:太好了!所有同学都到了,我们出发去春游!");
}
}
运行这个代码,你会看到:
班长:请所有同学到集合点集合...
学生1:我在外面,需要1234ms才能到达
学生2:我在外面,需要2567ms才能到达
...
班长:我在集合点等待...
学生1:我到达了!
【还剩4个学生未到达】
学生3:我到达了!
【还剩3个学生未到达】
...
班长:太好了!所有同学都到了,我们出发去春游!
3-9、总结
- CountDownLatch = 倒计时器
- 创建时指定初始值(等待的线程个数)
- 每个工作线程完成后调用
countDown() - 主线程调用
await()阻塞等待 - 当倒计时变成 0 时,主线程被唤醒
这样可以保证主线程一定要等所有工作线程完成后才继续。
更多推荐
所有评论(0)