线程通信:wait/notify、Condition、CountDownLatch
在多线程并发编程中,线程并非孤立执行,常常需要协同工作完成复杂任务。例如生产者线程生产数据,消费者线程消费数据,二者需要通过通信协调生产和消费的节奏,避免数据积压或消费空数据。Java 提供了多种线程通信工具,本文将从基础的 wait/notify 机制,到进阶的 Condition 接口,再到实用的并发工具类( CountDownLatch 、 CyclicBarrier 等),全面解析线程通信
一、前言
在多线程并发编程中,线程并非孤立执行,常常需要协同工作完成复杂任务。例如生产者线程生产数据,消费者线程消费数据,二者需要通过通信协调生产和消费的节奏,避免数据积压或消费空数据。Java 提供了多种线程通信工具,
本文将从基础的 wait/notify 机制,到进阶的 Condition 接口,再到实用的并发工具类( CountDownLatch 、 CyclicBarrier 等),全面解析线程通信的实现原理与实战场景。
二、线程通信
线程通信的本质是线程间的状态传递,常见需求包括:
-
等待 / 唤醒:线程 A 执行到某一阶段后,需要等待线程 B 完成任务才能继续执行,线程 B 完成后唤醒线程 A。
-
生产 / 消费:生产者线程生产数据后,唤醒消费者线程消费;消费者线程消费完数据后,唤醒生产者线程生产。
-
多线程同步:多个线程需要等待所有线程都完成某一阶段任务后,再一起执行后续操作。
没有通信的多线程,只能通过轮询的方式检测状态,这会浪费大量 CPU 资源。而合理的通信机制,能让线程在需要等待时释放资源,被唤醒时恢复执行,提升程序性能。
三、基础通信机制
wait() 、 notify() 、 notifyAll() 是 Object 类的成员方法,所有 Java 对象都自带这些方法,它们必须在 synchronized 同步代码块 / 方法中使用,否则会抛出 IllegalMonitorStateException 异常。
1. 核心原理
-
wait():线程调用对象的 wait() 方法时,会释放该对象的锁,并进入该对象的等待队列,直到被其他线程唤醒。
-
notify():线程调用对象的 notify() 方法时,会唤醒该对象等待队列中的一个随机线程,被唤醒的线程需要重新竞争锁才能继续执行。
-
notifyAll():唤醒该对象等待队列中的所有线程,所有被唤醒的线程会竞争锁,只有获取到锁的线程能继续执行。
2. 经典案例:wait/notify 实现生产者 - 消费者模型
import java.util.LinkedList;
import java.util.Queue;
/**
* 生产者-消费者模型:仓库容量为3,生产满时生产者等待,消费空时消费者等待
*/
public class ProducerConsumerWaitNotify {
// 仓库:存储产品
private static final Queue<String> WAREHOUSE = new LinkedList<>();
// 仓库最大容量
private static final int MAX_CAPACITY = 3;
// 锁对象
private static final Object LOCK = new Object();
// 生产者线程:生产产品
static class Producer extends Thread {
@Override
public void run() {
int count = 0;
while (true) {
synchronized (LOCK) {
// 仓库满时,生产者等待
while (WAREHOUSE.size() == MAX_CAPACITY) {
try {
System.out.println("仓库已满,生产者等待...");
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产产品
String product = "产品" + (++count);
WAREHOUSE.add(product);
System.out.println(Thread.currentThread().getName() + " 生产了:" + product + ",仓库容量:" + WAREHOUSE.size());
// 唤醒消费者线程
LOCK.notifyAll();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 消费者线程:消费产品
static class Consumer extends Thread {
@Override
public void run() {
while (true) {
synchronized (LOCK) {
// 仓库空时,消费者等待
while (WAREHOUSE.isEmpty()) {
try {
System.out.println("仓库已空,消费者等待...");
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费产品
String product = WAREHOUSE.poll();
System.out.println(Thread.currentThread().getName() + " 消费了:" + product + ",仓库容量:" + WAREHOUSE.size());
// 唤醒生产者线程
LOCK.notifyAll();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
new Producer().start();
new Consumer().start();
}
}
3. 核心注意事项
-
必须在同步代码块中使用:调用 wait() / notify() 前,线程必须获取该对象的锁,否则会抛出异常。
-
使用 while 循环判断条件:不能用 if 判断,因为线程被唤醒后可能条件仍不满足(如多个消费者被唤醒,仓库已空), while 会重新检查条件。
-
notify() 与 notifyAll() 的选择: notify() 可能导致线程 “假死”(唤醒的是同类线程,如生产者唤醒生产者),推荐使用 notifyAll() 保证唤醒正确的线程。
四、Condition 接口
Condition 是 java.util.concurrent.locks 包下的接口,是 Lock 锁的配套通信工具,相比 wait/notify 更加灵活,支持精准唤醒指定线程组。
1. 核心原理
-
Condition 对象由 Lock 对象创建,一个 Lock 可以创建多个 Condition 对象,对应多个等待队列。
-
await():类似 wait() ,线程释放锁并进入 Condition 的等待队列。
-
signal():类似 notify() ,唤醒 Condition 等待队列中的一个线程。
-
signalAll():类似 notifyAll() ,唤醒 Condition 等待队列中的所有线程。
2. 经典案例:Condition 实现精准唤醒的生产者 - 消费者模型
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Condition实现生产者-消费者:精准唤醒生产者/消费者,避免唤醒同类线程
*/
public class ProducerConsumerCondition {
private static final Queue<String> WAREHOUSE = new LinkedList<>();
private static final int MAX_CAPACITY = 3;
// 创建可重入锁
private static final Lock LOCK = new ReentrantLock();
// 生产者等待的Condition
private static final Condition PRODUCER_COND = LOCK.newCondition();
// 消费者等待的Condition
private static final Condition CONSUMER_COND = LOCK.newCondition();
static class Producer extends Thread {
@Override
public void run() {
int count = 0;
while (true) {
LOCK.lock();
try {
while (WAREHOUSE.size() == MAX_CAPACITY) {
System.out.println("仓库已满,生产者等待...");
// 生产者进入PRODUCER_COND等待队列
PRODUCER_COND.await();
}
String product = "产品" + (++count);
WAREHOUSE.add(product);
System.out.println(Thread.currentThread().getName() + " 生产了:" + product + ",仓库容量:" + WAREHOUSE.size());
// 精准唤醒消费者线程
CONSUMER_COND.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread {
@Override
public void run() {
while (true) {
LOCK.lock();
try {
while (WAREHOUSE.isEmpty()) {
System.out.println("仓库已空,消费者等待...");
// 消费者进入CONSUMER_COND等待队列
CONSUMER_COND.await();
}
String product = WAREHOUSE.poll();
System.out.println(Thread.currentThread().getName() + " 消费了:" + product + ",仓库容量:" + WAREHOUSE.size());
// 精准唤醒生产者线程
PRODUCER_COND.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
new Producer().start();
new Consumer().start();
}
}
3. Condition 与 wait/notify 的核心对比
|
特性 |
wait/notify |
Condition |
|---|---|---|
|
锁依赖 |
依赖 |
依赖 |
|
等待队列 |
一个对象只有一个等待队列 |
一个Lock可以有多个等待队列 |
|
唤醒方式 |
随机唤醒一个或全部唤醒,无法精准唤醒 |
可以精准唤醒指定条件队列的线程 |
|
灵活性 |
较低 |
较高,支持更复杂的线程协作场景 |
|
使用方式 |
直接调用对象的方法,需在同步代码块中 |
通过Lock创建,需在lock/unlock之间使用 |
五、实用并发工具类
除了基础的等待唤醒机制,JUC 包还提供了一些高级工具类,简化多线程通信与同步的实现。
1. CountDownLatch:倒计时门闩,等待多个线程完成任务
核心作用:让一个或多个线程等待其他多个线程完成任务后,再继续执行。
原理:初始化一个计数器,线程完成任务后调用 countDown() 方法让计数器减 1,等待线程调用 await() 方法阻塞,直到计数器变为 0。
适用场景:主线程等待多个子线程初始化完成后,再执行后续逻辑。
代码案例:CountDownLatch 实现线程初始化等待
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
// 计数器:需要3个线程初始化完成
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(3);
static class InitThread extends Thread {
private final String threadName;
public InitThread(String threadName) {
this.threadName = threadName;
}
@Override
public void run() {
System.out.println(threadName + " 开始初始化...");
try {
// 模拟初始化耗时
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + " 初始化完成!");
// 计数器减1
COUNT_DOWN_LATCH.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
new InitThread("初始化线程1").start();
new InitThread("初始化线程2").start();
new InitThread("初始化线程3").start();
System.out.println("主线程等待所有初始化线程完成...");
// 主线程阻塞,直到计数器为0
COUNT_DOWN_LATCH.await();
System.out.println("所有线程初始化完成,主线程开始执行任务!");
}
}
2. CyclicBarrier:循环屏障,让多个线程到达同一屏障点后再继续
核心作用:让多个线程相互等待,直到所有线程都到达指定的 “屏障点”,再一起继续执行。
原理:初始化屏障的线程数量,线程到达屏障点时调用 await() 方法阻塞,当所有线程都调用 await() 后,屏障打开,所有线程继续执行。
适用场景:多个线程需要协同完成任务,必须等所有线程都准备好后才能开始执行。
与 CountDownLatch 的区别: CyclicBarrier 可以重复使用(调用 reset() 重置屏障), CountDownLatch 的计数器只能使用一次。
代码案例:CyclicBarrier 实现多线程协同执行
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
// 屏障:3个线程到达后一起执行
private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(3, () -> {
System.out.println("所有线程已到达屏障点,开始执行任务!");
});
static class WorkerThread extends Thread {
private final String threadName;
public WorkerThread(String threadName) {
this.threadName = threadName;
}
@Override
public void run() {
System.out.println(threadName + " 正在前往屏障点...");
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println(threadName + " 到达屏障点,等待其他线程...");
// 到达屏障点,等待其他线程
CYCLIC_BARRIER.await();
// 屏障打开后执行的任务
System.out.println(threadName + " 执行任务ing...");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new WorkerThread("工作线程1").start();
new WorkerThread("工作线程2").start();
new WorkerThread("工作线程3").start();
}
}
3. Semaphore:信号量,控制同时访问资源的线程数量
核心作用:控制同时访问特定资源的线程数量,通过信号量的 “许可” 机制实现。
原理:初始化许可数量,线程访问资源前调用 acquire() 获取许可,访问完成后调用 release() 释放许可;若没有可用许可,线程会阻塞直到有许可被释放。
适用场景:限流场景,如限制同时连接数据库的线程数量、限制接口的并发访问量。
代码案例:Semaphore 实现接口限流
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
// 信号量:最多允许2个线程同时访问资源
private static final Semaphore SEMAPHORE = new Semaphore(2);
static class AccessThread extends Thread {
private final String threadName;
public AccessThread(String threadName) {
this.threadName = threadName;
}
@Override
public void run() {
try {
// 获取许可
SEMAPHORE.acquire();
System.out.println(threadName + " 获取到许可,开始访问资源...");
Thread.sleep(1000);
System.out.println(threadName + " 访问资源完成,释放许可!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可
SEMAPHORE.release();
}
}
}
public static void main(String[] args) {
// 5个线程竞争2个许可
for (int i = 1; i <= 5; i++) {
new AccessThread("访问线程" + i).start();
}
}
}
六、线程通信的核心总结
-
基础场景:简单的生产者 - 消费者模型,使用 wait/notify 即可满足需求,注意在同步代码块中使用并通过 while 判断条件。
-
进阶场景:需要精准唤醒指定线程组时,优先使用 Condition 接口,搭配 ReentrantLock 锁,灵活性更高。
-
复杂场景:多线程同步等待(如初始化、协同执行)使用 CountDownLatch / CyclicBarrier ;限流场景使用 Semaphore 。
-
核心原则:线程通信的本质是 状态共享与协同 ,无论使用哪种工具,都要保证共享状态的线程安全,避免出现数据错乱。
七、总结
本文详细讲解了 Java 线程通信的多种方式,从基础的 wait/notify 机制,到进阶的 Condition 接口,再到实用的 JUC 工具类,每种方式都有其适用场景。掌握这些通信工具,能帮助我们更好地实现多线程的协同工作,解决复杂的并发问题。
更多推荐



所有评论(0)