Java 线程通信全解析
使当前线程释放持有的对象锁,进入该对象的等待队列(wait set),并进入 WAITING 或 TIMED_WAITING 状态。BlockingQueue是线程安全的队列,提供了阻塞式的入队(put())和出队(take())方法,可直接用于线程通信,无需手动处理锁和等待/唤醒。这是最基础的通信方式,通过synchronized保证锁的独占性,wait()/notify()实现线程间的等待与唤
一、线程通信的基本概念与意义
线程通信是指多个线程通过特定机制交换信息、协调执行顺序的过程。在并发场景中,线程通信的核心目标是解决以下问题:
-
资源协作问题
多个线程共同完成一项任务时,需按特定顺序执行。- 典型场景:生产者-消费者模型中,生产者必须先生产数据,消费者才能消费
- 具体实现:通过共享缓冲区(如阻塞队列)配合wait/notify机制实现有序协作
- 示例:生产线上的装配工序,必须等前道工序完成才能进行后道工序
-
状态同步问题
线程间共享变量的状态变更需要及时通知其他线程。- 关键机制:"等待-唤醒"机制(wait/notify/notifyAll)
- 典型应用:线程池中的任务分配,当新任务到达时需要唤醒工作线程
- 常见模式:条件变量(Condition)配合锁机制实现精准通知
-
避免忙等问题
替代低效的while循环轮询(浪费CPU资源),实现线程的精准唤醒。- 传统做法:while(!condition){} 会持续消耗CPU资源
- 改进方案:使用Object.wait()让线程进入WAITING状态
- 性能对比:忙等可能占用100%CPU,而等待机制几乎不消耗CPU
典型应用示例:在经典的"生产者-消费者模型"中:
- 生产者线程生成数据后调用notify()通知消费者
- 消费者线程在空队列时调用wait()进入等待
- 数据结构通常使用BlockingQueue实现线程安全
- 扩展场景:数据库连接池、消息队列等中间件的底层实现
进阶通信机制:
- 管道通信(PipedInputStream/PipedOutputStream)
- 共享内存(通过volatile变量或Atomic类)
- CountDownLatch/CyclicBarrier等同步工具类
- Future/Callable异步回调机制
二、线程通信的核心方法(基于 Object 类)
Java 中 Object 类的线程通信方法详解
在 Java 中,所有类都隐式继承自 Object
基类。Object
类提供了 3 个用于线程间通信的核心方法:wait()
、notify()
和 notifyAll()
。这些方法都必须配合对象锁(即 synchronized
同步块或同步方法)使用,否则会抛出 IllegalMonitorStateException
异常。下面详细说明这些方法的特性和使用方式。
1. wait() 方法
作用:使当前线程释放持有的对象锁,进入该对象的等待队列(wait set),并进入 WAITING 或 TIMED_WAITING 状态。线程会暂停执行,直到被其他线程唤醒或中断。
重载方法:
wait()
:线程无限期等待,直到被其他线程通过notify()
或notifyAll()
唤醒,或者被中断(抛出InterruptedException
)。wait(long timeout)
:线程最多等待指定的毫秒数(timeout
),超时后自动唤醒;如果在超时前被其他线程唤醒,则提前返回。wait(long timeout, int nanos)
:提供更精确的超时控制,nanos
表示纳秒(范围 0-999999),但实际精度取决于操作系统的调度机制。
注意事项:
- 调用
wait()
后,线程会释放对象锁,因此其他线程可以进入该对象的同步代码块。 - 线程被唤醒后,需要重新竞争对象锁才能继续执行。
示例:
synchronized (sharedObject) {
while (!condition) {
sharedObject.wait(); // 条件不满足时等待
}
// 条件满足后执行后续逻辑
}
2. notify() 方法
作用:从当前对象的等待队列中随机选择一个线程唤醒,使其从 wait()
处恢复执行。被唤醒的线程不会立即运行,而是需要等待当前线程释放对象锁后,才能竞争锁并继续执行。
特点:
- 唤醒的线程是随机的,无法指定具体线程。
- 如果等待队列中没有线程,调用
notify()
不会有任何效果。
适用场景:适用于单生产者-单消费者模型,或只需要唤醒一个线程的场景。
示例:
synchronized (sharedObject) {
sharedObject.notify(); // 唤醒一个等待线程
}
3. notifyAll() 方法
作用:唤醒当前对象等待队列中的所有线程。这些线程会竞争对象锁,获得锁的线程从 wait()
处继续执行,其他线程继续等待。
特点:
- 唤醒所有线程可能导致竞争激烈,需谨慎使用。
- 如果等待队列中没有线程,调用
notifyAll()
不会有任何效果。
适用场景:
- 多生产者-多消费者模型,例如多个线程等待同一资源时,资源更新后需要通知所有线程。
- 需要所有等待线程感知状态变化时(如配置文件更新后,所有线程重新加载配置)。
示例:
synchronized (sharedObject) {
sharedObject.notifyAll(); // 唤醒所有等待线程
}
使用注意事项
- 必须在同步代码块中调用:这三个方法必须由持有对象锁的线程调用,否则会抛出
IllegalMonitorStateException
。 - 条件检查:通常需要在
while
循环中检查条件,避免虚假唤醒(spurious wakeup)。 - 锁的释放与竞争:调用
wait()
会释放锁,而notify()
或notifyAll()
后当前线程需退出同步块才会释放锁。 - 中断处理:
wait()
方法可能被中断,需处理InterruptedException
。
典型应用场景:
- 生产者-消费者模型
- 线程池任务调度
- 多线程协作完成任务(如屏障同步)
三、线程通信的实现方式
除了Object类的基础方法,Java 还提供了多种线程通信工具,适用于不同场景。这些工具可以帮助开发者更好地控制线程间的协作和数据共享,提高程序的并发性能。
1. 基于 synchronized + wait()/notify() 的通信(传统方式)
这是最基础的通信方式,通过synchronized保证锁的独占性,wait()/notify()实现线程间的等待与唤醒。
实现原理
- synchronized关键字确保同一时间只有一个线程可以进入临界区
- wait()方法使当前线程释放锁并进入等待状态
- notify()/notifyAll()唤醒等待该锁的一个/所有线程
完整示例:生产者-消费者模型(单缓冲区)
public class SyncCommDemo {
private static final Object lock = new Object(); // 共享锁对象
private static int data = 0; // 共享数据(缓冲区)
private static boolean hasData = false; // 数据状态标记
// 生产者线程:生产数据
static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) { // 生产5次数据
synchronized (lock) {
// 若已有数据,等待消费者消费
while (hasData) {
try {
System.out.println("生产者等待...");
lock.wait(); // 释放锁,进入等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 生产数据
data = (int) (Math.random() * 100);
System.out.println("生产者生产:" + data);
hasData = true;
lock.notify(); // 唤醒消费者
}
try {
Thread.sleep(500); // 模拟生产耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 消费者线程:消费数据
static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) { // 消费5次数据
synchronized (lock) {
// 若无数据,等待生产者生产
while (!hasData) {
try {
System.out.println("消费者等待...");
lock.wait(); // 释放锁,进入等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 消费数据
System.out.println("消费者消费:" + data);
hasData = false;
lock.notify(); // 唤醒生产者
}
try {
Thread.sleep(1000); // 模拟消费耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Thread p = new Thread(new Producer());
Thread c = new Thread(new Consumer());
p.start();
c.start();
}
}
核心逻辑
- 通过hasData标记控制线程执行节奏
- wait()释放锁让对方线程执行
- notify()唤醒对方线程
- 实现"生产-消费"交替进行
注意事项
- 必须使用while循环检查条件,而不是if语句,防止虚假唤醒
- notify()随机唤醒一个等待线程,notifyAll()唤醒所有等待线程
- 必须在同步代码块中调用wait()/notify()
2. 基于 ReentrantLock + Condition 的通信(JDK 1.5+)
ReentrantLock是可重入锁,配合Condition接口可实现更灵活的线程通信(类似wait()/notify(),但支持多条件队列)。
Condition 接口核心方法
await()
:对应wait(),释放锁并进入条件队列等待signal()
:对应notify(),唤醒单个等待线程signalAll()
:对应notifyAll(),唤醒所有等待线程
优势
- 可创建多个Condition对象,实现线程按条件分组唤醒
- 支持中断响应、超时等待等更丰富的操作
- 更灵活的锁获取和释放控制
完整示例:多条件通信
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class LockCommDemo {
private static final ReentrantLock lock = new ReentrantLock();
private static final Condition producerCond = lock.newCondition(); // 生产者条件
private static final Condition consumerCond = lock.newCondition(); // 消费者条件
private static int data = 0;
private static boolean hasData = false;
static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
while (hasData) {
System.out.println("生产者等待...");
producerCond.await(); // 生产者等待
}
data = (int) (Math.random() * 100);
System.out.println("生产者生产:" + data);
hasData = true;
consumerCond.signal(); // 唤醒消费者
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
while (!hasData) {
System.out.println("消费者等待...");
consumerCond.await(); // 消费者等待
}
System.out.println("消费者消费:" + data);
hasData = false;
producerCond.signal(); // 唤醒生产者
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Thread p = new Thread(new Producer());
Thread c = new Thread(new Consumer());
p.start();
c.start();
}
}
应用场景
- 需要精确控制不同条件线程的唤醒
- 需要更灵活的超时或中断处理
- 需要公平锁或非公平锁的选择
3. 基于阻塞队列(BlockingQueue)的通信(JDK 1.5+)
BlockingQueue是线程安全的队列,提供了阻塞式的入队(put())和出队(take())方法,可直接用于线程通信,无需手动处理锁和等待/唤醒。
核心方法
put(E e)
:队列满时阻塞,直到有空间take()
:队列空时阻塞,直到有元素offer(E e, long timeout, TimeUnit unit)
:超时阻塞入队poll(long timeout, TimeUnit unit)
:超时阻塞出队
常用实现类
ArrayBlockingQueue
:基于数组的有界队列LinkedBlockingQueue
:基于链表的可选有界队列SynchronousQueue
:无缓冲队列(生产者与消费者直接交换数据)PriorityBlockingQueue
:支持优先级的无界队列DelayQueue
:基于时间的调度队列
完整示例:用BlockingQueue实现生产者-消费者
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueCommDemo {
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1); // 容量为1的缓冲区
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
int data = (int) (Math.random() * 100);
System.out.println("生产者准备生产:" + data);
queue.put(data); // 若队列满则阻塞
System.out.println("生产者生产完成:" + data);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
System.out.println("消费者等待数据...");
int data = queue.take(); // 若队列空则阻塞
System.out.println("消费者消费:" + data);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
Thread p = new Thread(new Producer());
Thread c = new Thread(new Consumer());
p.start();
c.start();
}
}
优势
- 代码简洁,无需手动控制锁和等待/唤醒
- 由队列内部实现线程安全与通信逻辑
- 提供多种队列实现满足不同场景需求
- 支持超时操作,避免永久阻塞
4. 其他高级通信方式
CountDownLatch
通过计数器实现线程等待(如主线程等待所有子线程完成)
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 执行完成");
latch.countDown();
}).start();
}
System.out.println("主线程等待所有子线程完成...");
latch.await();
System.out.println("所有子线程已完成,主线程继续执行");
}
}
CyclicBarrier
让多个线程到达屏障后再同时继续执行
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程已到达屏障,执行屏障操作");
});
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 开始执行");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 到达屏障");
barrier.await();
System.out.println(Thread.currentThread().getName() + " 屏障后继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore
通过信号量控制并发线程数,间接实现线程间的协作
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
int permits = 2; // 允许的并发数
Semaphore semaphore = new Semaphore(permits);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 尝试获取许可");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获取到许可,执行中");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 释放许可");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
Exchanger
用于两个线程之间交换数据
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "Thread1数据";
System.out.println("线程1发送: " + data);
String received = exchanger.exchange(data);
System.out.println("线程1接收: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
String data = "Thread2数据";
System.out.println("线程2发送: " + data);
String received = exchanger.exchange(data);
System.out.println("线程2接收: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
5. 选择合适通信方式的建议
- 简单同步:优先考虑synchronized + wait/notify
- 复杂条件:使用ReentrantLock + Condition
- 生产者-消费者:优先使用BlockingQueue
- 线程计数等待:使用CountDownLatch
- 多线程同步执行:使用CyclicBarrier
- 资源池控制:使用Semaphore
- 线程间数据交换:使用Exchanger
在选择线程通信工具时,应考虑以下因素:
- 通信的复杂性
- 性能要求
- 是否需要公平性
- 是否需要超时或中断支持
- 代码的可读性和维护性
通过合理选择和使用这些工具,可以构建出高效、可靠的并发程序。
四、线程通信的注意事项
1. wait()方法的正确使用
wait()方法必须在synchronized同步块中调用,这是因为:
- wait()的核心机制是释放当前对象的监视器锁
- 调用前必须确保线程已经持有锁,否则会抛出IllegalMonitorStateException
- 典型用法模式:
synchronized (lockObject) {
while (!condition) {
lockObject.wait(); // 释放锁并等待
}
// 条件满足后的处理逻辑
}
2. 等待条件的正确检查方式
必须使用while循环而非if判断条件的原因包括:
- 虚假唤醒:线程可能在没有收到notify/notifyAll的情况下被唤醒
- 条件变化:在wait返回和重新获取锁之间,条件可能再次变化
- 示例对比:
// 危险用法:if判断
if (buffer.isEmpty()) {
wait(); // 唤醒后直接执行后续代码
}
// 安全用法:while循环
while (buffer.isEmpty()) {
wait(); // 唤醒后会再次检查条件
}
3. 唤醒机制的选择策略
notify()的适用场景
- 单生产者-单消费者模型
- 明确知道只需要唤醒一个特定线程
- 对性能要求较高的场景
notifyAll()的适用场景
- 多生产者-多消费者模型
- 不确定应该唤醒哪个线程时
- 条件变化可能涉及多个等待线程时
典型问题案例:在生产者-消费者模型中,如果生产者使用notify(),可能会唤醒另一个生产者而非消费者,导致某些线程永久等待。
4. 同步块内的性能优化
需要避免的常见问题:
- 在同步块内执行IO操作
- 进行复杂计算
- 调用可能阻塞的外部方法
优化建议:
synchronized (lock) {
// 只包含必要的共享变量操作
Object data = prepareData(); // 耗时的准备工作应放在同步块外
sharedQueue.add(data);
lock.notify();
}
5. 中断处理最佳实践
中断处理的正确方式:
try {
while (condition) {
lock.wait();
}
} catch (InterruptedException e) {
// 恢复中断状态,使上层代码能感知
Thread.currentThread().interrupt();
// 执行必要的清理工作
cleanUp();
return; // 通常需要退出执行
}
6. 锁机制的隔离特性
不同通信机制的隔离规则:
通信机制 | 唤醒方式 | 隔离特性 |
---|---|---|
synchronized | notify()/notifyAll() | 同一对象锁内有效 |
Lock+Condition | signal()/signalAll() | 同一Condition对象内有效 |
示例说明:
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
// conditionA.signal() 不会唤醒等待conditionB的线程
理解这些隔离特性可以避免跨条件错误唤醒的问题。
五、典型场景与实战建议
1. 生产者-消费者模型
单缓冲区实现
-
synchronized+wait()/notify()方式:使用内置锁机制,通过wait()让线程等待,notify()唤醒等待线程
public class SingleBuffer { private Object data; private boolean empty = true; public synchronized void produce(Object newData) throws InterruptedException { while (!empty) { wait(); } data = newData; empty = false; notifyAll(); } public synchronized Object consume() throws InterruptedException { while (empty) { wait(); } Object result = data; empty = true; notifyAll(); return result; } }
-
Condition方式:使用ReentrantLock和Condition提供更灵活的等待/通知机制
public class SingleBufferWithCondition { private final Lock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); private Object data; private boolean empty = true; public void produce(Object newData) throws InterruptedException { lock.lock(); try { while (!empty) { notFull.await(); } data = newData; empty = false; notEmpty.signal(); } finally { lock.unlock(); } } }
多缓冲区实现
- BlockingQueue实现:使用ArrayBlockingQueue/LinkedBlockingQueue等线程安全队列
public class MultiBuffer { private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10); public void produce(Object data) throws InterruptedException { queue.put(data); // 队列满时自动阻塞 } public Object consume() throws InterruptedException { return queue.take(); // 队列空时自动阻塞 } }
2. 线程间的数据传递
简单场景实现
- 共享变量+volatile:适用于单生产者单消费者场景
public class SharedData { private volatile boolean flag = false; private volatile String message; public void setMessage(String msg) { message = msg; flag = true; } public String getMessage() { while (!flag) { /* 自旋等待 */ } return message; } }
复杂场景实现
- BlockingQueue传递数据对象:实现生产者和消费者完全解耦
public class DataPipeline { private final BlockingQueue<DataPacket> queue = new LinkedBlockingQueue<>(); // 生产者线程 public void sendData(DataPacket packet) throws InterruptedException { queue.put(packet); } // 消费者线程 public DataPacket receiveData() throws InterruptedException { return queue.take(); } } class DataPacket { // 复杂的数据对象定义 }
3. 线程池中的任务协作
使用BlockingQueue实现
- 任务结果收集:多个工作线程处理任务后将结果放入共享队列
ExecutorService executor = Executors.newFixedThreadPool(4); BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>(); for (int i = 0; i < 10; i++) { executor.submit(() -> { Result r = processTask(); resultQueue.put(r); }); } // 主线程收集结果 for (int i = 0; i < 10; i++) { Result r = resultQueue.take(); // 处理结果 }
使用CountDownLatch实现
- 等待子任务完成:主线程等待所有子任务完成后继续执行
int taskCount = 5; CountDownLatch latch = new CountDownLatch(taskCount); for (int i = 0; i < taskCount; i++) { executor.submit(() -> { try { // 执行任务 } finally { latch.countDown(); } }); } // 主线程等待所有任务完成 latch.await(); System.out.println("所有任务已完成");
4. 实战建议与最佳实践
选择适当的通信方式
-
优先使用BlockingQueue:适用于大多数生产者-消费者场景,如:
- 日志处理系统(多生产者单消费者)
- 订单处理系统(单生产者多消费者)
- 消息推送系统(多生产者多消费者)
-
多条件场景使用ReentrantLock+Condition:
- 银行账户转账系统(需要区分存款和取款条件)
- 交通信号灯控制系统(多方向等待条件)
-
简单场景使用synchronized:
- 简单的计数器同步
- 单例模式的双重检查锁定
中断处理规范
try {
while (condition) {
queue.take(); // 可能抛出InterruptedException
}
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();
// 执行清理操作
cleanUp();
}
性能考量
-
LinkedBlockingQueue vs ArrayBlockingQueue:
- LinkedBlockingQueue默认无界(可指定容量),吞吐量通常更高
- ArrayBlockingQueue有界,内存使用更可控
-
锁粒度优化:
- 读写分离场景考虑使用ReadWriteLock
- 高竞争场景考虑使用StampedLock
更多推荐
所有评论(0)