一、BlockingQueue的基本用法

1-1、什么是 BlockingQueue?

想象你在一个饭店排队买饭。BlockingQueue 就像这个队列一样,但它有个特殊功能:如果队伍满了,后来的人会自动等待;如果队伍空了,取餐的人会自动等待。这就是 BlockingQueue 的核心思想。

1-2、为什么需要它?

在多线程程序中,经常需要一个线程生产数据,另一个线程消费数据。BlockingQueue 就是为了让这两个线程安全、方便地交互。

比如:

  • 线程A不断生产任务,放入队列
  • 线程B从队列取任务,然后处理
  • 如果队列满了,线程A自动等待
  • 如果队列空了,线程B自动等待

这样两个线程就协调得很好。

1-3、常用操作

BlockingQueue 有四种主要的操作方式:

1. 添加元素:

  • add(E) - 队列满了会报错
  • put(E) - 队列满了会等待
  • offer(E) - 队列满了会返回 false

2. 移除元素:

  • remove() - 队列空了会报错
  • take() - 队列空了会等待
  • poll() - 队列空了会返回 null

1-4、简单例子

BlockingQueue<String> queue = new LinkedBlockingQueue<>(3); // 容量为3

// 线程1:生产者
new Thread(() -> {
    try {
        queue.put("任务1");
        queue.put("任务2");
        queue.put("任务3");
        System.out.println("任务都放入了");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

// 线程2:消费者
new Thread(() -> {
    try {
        String task = queue.take();
        System.out.println("处理: " + task);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

1-5、实际场景

生产者-消费者问题: 假如你有一条流水线,工人A负责制造零件,工人B负责组装。BlockingQueue 就像中间的储存架子,A 把零件放上去,B 从上面取。如果架子满了,A 就等待;如果架子空了,B 就等待。

1-6、常见类型

  • LinkedBlockingQueue - 用链表实现,性能一般
  • ArrayBlockingQueue - 用数组实现,性能好
  • PriorityBlockingQueue - 带优先级的队列

简单来说,BlockingQueue = 线程安全的队列 + 自动等待机制。它让多线程协作变得简单安全。

1-7、测试代码

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CountDownLatch;

public class BlockingQueueTest {
    
    public static void main(String[] args) throws InterruptedException {
        // 创建一个容量为10的队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        
        // 用于同步,等待所有线程完成
        CountDownLatch latch = new CountDownLatch(6); // 5个生产者 + 1个消费者
        
        System.out.println("===== 开始测试 BlockingQueue 线程安全性 =====\n");
        
        // 创建5个生产者线程
        for (int i = 1; i <= 5; i++) {
            final int producerId = i;
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 20; j++) {
                        String item = "生产者" + producerId + "-任务" + j;
                        queue.put(item);
                        System.out.println("[生产] " + item + " | 队列大小: " + queue.size());
                        Thread.sleep(10); // 模拟生产耗时
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            }, "Producer-" + i).start();
        }
        
        // 创建1个消费者线程
        new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) { // 消费100个任务
                    String item = queue.take();
                    System.out.println("[消费] " + item + " | 队列大小: " + queue.size());
                    Thread.sleep(50); // 模拟消费耗时
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                latch.countDown();
            }
        }, "Consumer-1").start();
        
        // 等待所有线程完成
        latch.await();
        
        System.out.println("\n===== 测试完成 =====");
        System.out.println("最终队列大小: " + queue.size());
        System.out.println("如果没有异常,说明 BlockingQueue 是线程安全的!");
    }
}

这段代码测试了什么?

  • 5个生产者线程 - 同时生产任务,每个生产 20 个
  • 1个消费者线程 - 消费所有的任务
  • CountDownLatch - 等待所有线程完成

二、解释:Thread.currentThread().interrupt();

以上测试代码中有: Thread.currentThread().interrupt() 这行代码。这是什么意思呢?

2-1、它是干什么的?

Thread.currentThread().interrupt() 的作用是向当前线程发送一个中断信号

2-2、中断是什么?

中断不是强制杀死线程,而是一种礼貌的通知机制。它告诉线程:"嘿,有人希望你停止工作了"。线程可以选择听或不听这个通知。

2-3、代码中的场景

在我之前的测试代码中,这行代码出现在 catch 块里:

try {
    // ... 代码执行
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // 这里
}

2-4、为什么要这样做?

当你调用 queue.take()queue.put() 时,如果线程被中断了,会抛出 InterruptedException 异常。这时有三种处理方式:

方式1:忽略异常(不推荐)

try {
    queue.take();
} catch (InterruptedException e) {
    // 什么都不做,异常就丢失了
}

问题:中断信号丢失了,外层代码不知道发生过中断。

方式2:重新中断线程(推荐)

try {
    queue.take();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // 恢复中断状态
}

这样做是为了"保留"中断信息,让外层代码知道这个线程曾经被中断过。

方式3:直接抛出异常(也可以)

try {
    queue.take();
} catch (InterruptedException e) {
    throw new RuntimeException(e);  // 让外层处理
}

2-5、一个生动的比喻

想象你在专心工作,老板过来轻轻拍你肩膀说"我需要你停下来"。这就是中断。

  • 如果你直接说"好的,我停",这是正常响应中断
  • 如果你继续工作,装作没听到,这是忽略中断
  • 如果你之后主动告诉别人"老板曾经中断过我",这是保留中断状态

2-6、实际例子

public class InterruptExample {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                System.out.println("线程开始睡眠...");
                Thread.sleep(10000);  // 睡眠10秒
            } catch (InterruptedException e) {
                System.out.println("我被中断了!");
                Thread.currentThread().interrupt();  // 恢复中断状态
            }
            
            // 检查中断状态
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("中断状态为真,线程即将退出");
            }
        });
        
        thread.start();
        Thread.sleep(1000);  // 等待1秒
        thread.interrupt();  // 中断线程
    }
}

运行结果:

线程开始睡眠...
我被中断了!
中断状态为真,线程即将退出

2-7、总结

  • Thread.currentThread().interrupt() = 恢复线程的中断状态
  • 它用在 catch(InterruptedException) 里,防止中断信息丢失
  • 这是处理线程中断的最佳实践

三、CountDownLatch工具

3-1、它是干什么的?

CountDownLatch 是一个倒计时器。它让一个线程等待其他多个线程完成后,再继续执行。

3-2、生动比喻

想象你是一个班级的班长,要组织全班去春游:

  • 班长说:"我们等所有人都到达集合点后,再一起出发"
  • 班长创建一个倒计时器,数字设为 50(班级有 50 人)
  • 每当一个学生到达,倒计时器就减 1
  • 当倒计时器变成 0 时,班长就知道所有人都到了,可以出发

3-3、代码中的含义

CountDownLatch latch = new CountDownLatch(6);

这行代码的意思是:创建一个倒计时器,初始值为 6

为什么是 6?因为你有:

  • 5 个生产者线程
  • 1 个消费者线程
  • 总共 6 个线程

3-4、三个关键方法

1. countDown() - 倒计时减 1

latch.countDown();  // 倒计时从 6 变成 5

2. await() - 等待倒计时到 0

latch.await();  // 阻塞当前线程,直到倒计时变成 0

3. getCount() - 查看当前倒计时值

System.out.println(latch.getCount());  // 打印剩余的数字

3-5、代码执行流程

CountDownLatch latch = new CountDownLatch(6);  // 倒计时: 6

// 启动6个线程...

latch.await();  // main线程阻塞,等待这里
               // 等等等等...

// 当6个线程都执行了 latch.countDown() 时
// 倒计时变成 0,main线程才会继续

System.out.println("所有线程都完成了!");

3-6、测试代码

// 创建的6个线程
for (int i = 1; i <= 5; i++) {
    new Thread(() -> {
        try {
            // ... 生产任务
        } finally {
            latch.countDown();  // 线程完成时,倒计时减1
        }
    }).start();
}

new Thread(() -> {
    try {
        // ... 消费任务
    } finally {
        latch.countDown();  // 线程完成时,倒计时减1
    }
}).start();

latch.await();  // main线程在这里等待,直到倒计时变成0

System.out.println("===== 测试完成 =====");  // 所有线程都完成后才执行

3-7、没有 CountDownLatch 会怎样?

// 不用 CountDownLatch
for (int i = 0; i < 6; i++) {
    new Thread(() -> {
        // ... 做工作
    }).start();
}

System.out.println("===== 测试完成 =====");  // 👈 这会立即执行!
// 此时工作线程可能还没完成

这样的话,工作线程还在忙的时候,main 线程就已经打印"测试完成"了。很不安全。

3-8、实际例子

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("班长:请所有同学到集合点集合...\n");
        
        CountDownLatch latch = new CountDownLatch(5);  // 5个学生
        
        // 5个学生线程
        for (int i = 1; i <= 5; i++) {
            final int studentId = i;
            new Thread(() -> {
                try {
                    // 学生去不同的地方,耗时不同
                    long time = (long)(Math.random() * 3000) + 1000;
                    System.out.println("学生" + studentId + ":我在外面,需要" + time + "ms才能到达");
                    Thread.sleep(time);
                    
                    System.out.println("学生" + studentId + ":我到达了!");
                    latch.countDown();  // 倒计时减1
                    System.out.println("【还剩" + latch.getCount() + "个学生未到达】");
                    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "Student-" + i).start();
        }
        
        System.out.println("班长:我在集合点等待...\n");
        latch.await();  // 班长在这里等待,阻塞
        
        System.out.println("\n班长:太好了!所有同学都到了,我们出发去春游!");
    }
}

运行这个代码,你会看到:

班长:请所有同学到集合点集合...

学生1:我在外面,需要1234ms才能到达
学生2:我在外面,需要2567ms才能到达
...
班长:我在集合点等待...

学生1:我到达了!
【还剩4个学生未到达】
学生3:我到达了!
【还剩3个学生未到达】
...
班长:太好了!所有同学都到了,我们出发去春游!

3-9、总结

  • CountDownLatch = 倒计时器
  • 创建时指定初始值(等待的线程个数)
  • 每个工作线程完成后调用 countDown()
  • 主线程调用 await() 阻塞等待
  • 当倒计时变成 0 时,主线程被唤醒

这样可以保证主线程一定要等所有工作线程完成后才继续。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐