一、线程通信的基本概念与意义

线程通信是指多个线程通过特定机制交换信息、协调执行顺序的过程。在并发场景中,线程通信的核心目标是解决以下问题:

  1. 资源协作问题
    多个线程共同完成一项任务时,需按特定顺序执行。

    • 典型场景:生产者-消费者模型中,生产者必须先生产数据,消费者才能消费
    • 具体实现:通过共享缓冲区(如阻塞队列)配合wait/notify机制实现有序协作
    • 示例:生产线上的装配工序,必须等前道工序完成才能进行后道工序
  2. 状态同步问题
    线程间共享变量的状态变更需要及时通知其他线程。

    • 关键机制:"等待-唤醒"机制(wait/notify/notifyAll)
    • 典型应用:线程池中的任务分配,当新任务到达时需要唤醒工作线程
    • 常见模式:条件变量(Condition)配合锁机制实现精准通知
  3. 避免忙等问题
    替代低效的while循环轮询(浪费CPU资源),实现线程的精准唤醒。

    • 传统做法:while(!condition){} 会持续消耗CPU资源
    • 改进方案:使用Object.wait()让线程进入WAITING状态
    • 性能对比:忙等可能占用100%CPU,而等待机制几乎不消耗CPU

典型应用示例:在经典的"生产者-消费者模型"中:

  1. 生产者线程生成数据后调用notify()通知消费者
  2. 消费者线程在空队列时调用wait()进入等待
  3. 数据结构通常使用BlockingQueue实现线程安全
  4. 扩展场景:数据库连接池、消息队列等中间件的底层实现

进阶通信机制

  • 管道通信(PipedInputStream/PipedOutputStream)
  • 共享内存(通过volatile变量或Atomic类)
  • CountDownLatch/CyclicBarrier等同步工具类
  • Future/Callable异步回调机制

二、线程通信的核心方法(基于 Object 类)

Java 中 Object 类的线程通信方法详解

在 Java 中,所有类都隐式继承自 Object 基类。Object 类提供了 3 个用于线程间通信的核心方法:wait()notify()notifyAll()。这些方法都必须配合对象锁(即 synchronized 同步块或同步方法)使用,否则会抛出 IllegalMonitorStateException 异常。下面详细说明这些方法的特性和使用方式。


1. wait() 方法

作用:使当前线程释放持有的对象锁,进入该对象的等待队列(wait set),并进入 WAITING 或 TIMED_WAITING 状态。线程会暂停执行,直到被其他线程唤醒或中断。

重载方法

  • wait():线程无限期等待,直到被其他线程通过 notify()notifyAll() 唤醒,或者被中断(抛出 InterruptedException)。
  • wait(long timeout):线程最多等待指定的毫秒数(timeout),超时后自动唤醒;如果在超时前被其他线程唤醒,则提前返回。
  • wait(long timeout, int nanos):提供更精确的超时控制,nanos 表示纳秒(范围 0-999999),但实际精度取决于操作系统的调度机制。

注意事项

  • 调用 wait() 后,线程会释放对象锁,因此其他线程可以进入该对象的同步代码块。
  • 线程被唤醒后,需要重新竞争对象锁才能继续执行。

示例

synchronized (sharedObject) {
    while (!condition) {
        sharedObject.wait(); // 条件不满足时等待
    }
    // 条件满足后执行后续逻辑
}


2. notify() 方法

作用:从当前对象的等待队列中随机选择一个线程唤醒,使其从 wait() 处恢复执行。被唤醒的线程不会立即运行,而是需要等待当前线程释放对象锁后,才能竞争锁并继续执行。

特点

  • 唤醒的线程是随机的,无法指定具体线程。
  • 如果等待队列中没有线程,调用 notify() 不会有任何效果。

适用场景:适用于单生产者-单消费者模型,或只需要唤醒一个线程的场景。

示例

synchronized (sharedObject) {
    sharedObject.notify(); // 唤醒一个等待线程
}


3. notifyAll() 方法

作用:唤醒当前对象等待队列中的所有线程。这些线程会竞争对象锁,获得锁的线程从 wait() 处继续执行,其他线程继续等待。

特点

  • 唤醒所有线程可能导致竞争激烈,需谨慎使用。
  • 如果等待队列中没有线程,调用 notifyAll() 不会有任何效果。

适用场景

  • 多生产者-多消费者模型,例如多个线程等待同一资源时,资源更新后需要通知所有线程。
  • 需要所有等待线程感知状态变化时(如配置文件更新后,所有线程重新加载配置)。

示例

synchronized (sharedObject) {
    sharedObject.notifyAll(); // 唤醒所有等待线程
}


使用注意事项

  1. 必须在同步代码块中调用:这三个方法必须由持有对象锁的线程调用,否则会抛出 IllegalMonitorStateException
  2. 条件检查:通常需要在 while 循环中检查条件,避免虚假唤醒(spurious wakeup)。
  3. 锁的释放与竞争:调用 wait() 会释放锁,而 notify()notifyAll() 后当前线程需退出同步块才会释放锁。
  4. 中断处理wait() 方法可能被中断,需处理 InterruptedException

典型应用场景

  • 生产者-消费者模型
  • 线程池任务调度
  • 多线程协作完成任务(如屏障同步)

三、线程通信的实现方式

除了Object类的基础方法,Java 还提供了多种线程通信工具,适用于不同场景。这些工具可以帮助开发者更好地控制线程间的协作和数据共享,提高程序的并发性能。

1. 基于 synchronized + wait()/notify() 的通信(传统方式)

这是最基础的通信方式,通过synchronized保证锁的独占性,wait()/notify()实现线程间的等待与唤醒。

实现原理

  • synchronized关键字确保同一时间只有一个线程可以进入临界区
  • wait()方法使当前线程释放锁并进入等待状态
  • notify()/notifyAll()唤醒等待该锁的一个/所有线程

完整示例:生产者-消费者模型(单缓冲区)

public class SyncCommDemo {

    private static final Object lock = new Object(); // 共享锁对象
    private static int data = 0; // 共享数据(缓冲区)
    private static boolean hasData = false; // 数据状态标记

    // 生产者线程:生产数据
    static class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) { // 生产5次数据
                synchronized (lock) {
                    // 若已有数据,等待消费者消费
                    while (hasData) {
                        try {
                            System.out.println("生产者等待...");
                            lock.wait(); // 释放锁,进入等待
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    // 生产数据
                    data = (int) (Math.random() * 100);
                    System.out.println("生产者生产:" + data);
                    hasData = true;
                    lock.notify(); // 唤醒消费者
                }
                try {
                    Thread.sleep(500); // 模拟生产耗时
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // 消费者线程:消费数据
    static class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) { // 消费5次数据
                synchronized (lock) {
                    // 若无数据,等待生产者生产
                    while (!hasData) {
                        try {
                            System.out.println("消费者等待...");
                            lock.wait(); // 释放锁,进入等待
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    // 消费数据
                    System.out.println("消费者消费:" + data);
                    hasData = false;
                    lock.notify(); // 唤醒生产者
                }
                try {
                    Thread.sleep(1000); // 模拟消费耗时
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Thread p = new Thread(new Producer());
        Thread c = new Thread(new Consumer());
        p.start();
        c.start();
    }
}

核心逻辑

  1. 通过hasData标记控制线程执行节奏
  2. wait()释放锁让对方线程执行
  3. notify()唤醒对方线程
  4. 实现"生产-消费"交替进行

注意事项

  • 必须使用while循环检查条件,而不是if语句,防止虚假唤醒
  • notify()随机唤醒一个等待线程,notifyAll()唤醒所有等待线程
  • 必须在同步代码块中调用wait()/notify()

2. 基于 ReentrantLock + Condition 的通信(JDK 1.5+)

ReentrantLock是可重入锁,配合Condition接口可实现更灵活的线程通信(类似wait()/notify(),但支持多条件队列)。

Condition 接口核心方法

  • await():对应wait(),释放锁并进入条件队列等待
  • signal():对应notify(),唤醒单个等待线程
  • signalAll():对应notifyAll(),唤醒所有等待线程

优势

  1. 可创建多个Condition对象,实现线程按条件分组唤醒
  2. 支持中断响应、超时等待等更丰富的操作
  3. 更灵活的锁获取和释放控制

完整示例:多条件通信

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LockCommDemo {
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Condition producerCond = lock.newCondition(); // 生产者条件
    private static final Condition consumerCond = lock.newCondition(); // 消费者条件
    private static int data = 0;
    private static boolean hasData = false;

    static class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                lock.lock();
                try {
                    while (hasData) {
                        System.out.println("生产者等待...");
                        producerCond.await(); // 生产者等待
                    }
                    data = (int) (Math.random() * 100);
                    System.out.println("生产者生产:" + data);
                    hasData = true;
                    consumerCond.signal(); // 唤醒消费者
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                lock.lock();
                try {
                    while (!hasData) {
                        System.out.println("消费者等待...");
                        consumerCond.await(); // 消费者等待
                    }
                    System.out.println("消费者消费:" + data);
                    hasData = false;
                    producerCond.signal(); // 唤醒生产者
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Thread p = new Thread(new Producer());
        Thread c = new Thread(new Consumer());
        p.start();
        c.start();
    }
}

应用场景

  • 需要精确控制不同条件线程的唤醒
  • 需要更灵活的超时或中断处理
  • 需要公平锁或非公平锁的选择

3. 基于阻塞队列(BlockingQueue)的通信(JDK 1.5+)

BlockingQueue是线程安全的队列,提供了阻塞式的入队(put())和出队(take())方法,可直接用于线程通信,无需手动处理锁和等待/唤醒。

核心方法

  • put(E e):队列满时阻塞,直到有空间
  • take():队列空时阻塞,直到有元素
  • offer(E e, long timeout, TimeUnit unit):超时阻塞入队
  • poll(long timeout, TimeUnit unit):超时阻塞出队

常用实现类

  1. ArrayBlockingQueue:基于数组的有界队列
  2. LinkedBlockingQueue:基于链表的可选有界队列
  3. SynchronousQueue:无缓冲队列(生产者与消费者直接交换数据)
  4. PriorityBlockingQueue:支持优先级的无界队列
  5. DelayQueue:基于时间的调度队列

完整示例:用BlockingQueue实现生产者-消费者

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueCommDemo {
    private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1); // 容量为1的缓冲区

    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {
                    int data = (int) (Math.random() * 100);
                    System.out.println("生产者准备生产:" + data);
                    queue.put(data); // 若队列满则阻塞
                    System.out.println("生产者生产完成:" + data);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.println("消费者等待数据...");
                    int data = queue.take(); // 若队列空则阻塞
                    System.out.println("消费者消费:" + data);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Thread p = new Thread(new Producer());
        Thread c = new Thread(new Consumer());
        p.start();
        c.start();
    }
}

优势

  1. 代码简洁,无需手动控制锁和等待/唤醒
  2. 由队列内部实现线程安全与通信逻辑
  3. 提供多种队列实现满足不同场景需求
  4. 支持超时操作,避免永久阻塞

4. 其他高级通信方式

CountDownLatch

通过计数器实现线程等待(如主线程等待所有子线程完成)

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " 开始执行");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 执行完成");
                latch.countDown();
            }).start();
        }
        
        System.out.println("主线程等待所有子线程完成...");
        latch.await();
        System.out.println("所有子线程已完成,主线程继续执行");
    }
}

CyclicBarrier

让多个线程到达屏障后再同时继续执行

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有线程已到达屏障,执行屏障操作");
        });
        
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 开始执行");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " 到达屏障");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + " 屏障后继续执行");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore

通过信号量控制并发线程数,间接实现线程间的协作

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        int permits = 2; // 允许的并发数
        Semaphore semaphore = new Semaphore(permits);
        
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 尝试获取许可");
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " 获取到许可,执行中");
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " 释放许可");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Exchanger

用于两个线程之间交换数据

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        
        new Thread(() -> {
            try {
                String data = "Thread1数据";
                System.out.println("线程1发送: " + data);
                String received = exchanger.exchange(data);
                System.out.println("线程1接收: " + received);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        new Thread(() -> {
            try {
                String data = "Thread2数据";
                System.out.println("线程2发送: " + data);
                String received = exchanger.exchange(data);
                System.out.println("线程2接收: " + received);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

5. 选择合适通信方式的建议

  1. 简单同步:优先考虑synchronized + wait/notify
  2. 复杂条件:使用ReentrantLock + Condition
  3. 生产者-消费者:优先使用BlockingQueue
  4. 线程计数等待:使用CountDownLatch
  5. 多线程同步执行:使用CyclicBarrier
  6. 资源池控制:使用Semaphore
  7. 线程间数据交换:使用Exchanger

在选择线程通信工具时,应考虑以下因素:

  • 通信的复杂性
  • 性能要求
  • 是否需要公平性
  • 是否需要超时或中断支持
  • 代码的可读性和维护性

通过合理选择和使用这些工具,可以构建出高效、可靠的并发程序。

四、线程通信的注意事项

1. wait()方法的正确使用

wait()方法必须在synchronized同步块中调用,这是因为:

  • wait()的核心机制是释放当前对象的监视器锁
  • 调用前必须确保线程已经持有锁,否则会抛出IllegalMonitorStateException
  • 典型用法模式:
synchronized (lockObject) {
    while (!condition) {
        lockObject.wait(); // 释放锁并等待
    }
    // 条件满足后的处理逻辑
}

2. 等待条件的正确检查方式

必须使用while循环而非if判断条件的原因包括:

  • 虚假唤醒:线程可能在没有收到notify/notifyAll的情况下被唤醒
  • 条件变化:在wait返回和重新获取锁之间,条件可能再次变化
  • 示例对比:
// 危险用法:if判断
if (buffer.isEmpty()) {
    wait(); // 唤醒后直接执行后续代码
}

// 安全用法:while循环
while (buffer.isEmpty()) {
    wait(); // 唤醒后会再次检查条件
}

3. 唤醒机制的选择策略

notify()的适用场景

  • 单生产者-单消费者模型
  • 明确知道只需要唤醒一个特定线程
  • 对性能要求较高的场景

notifyAll()的适用场景

  • 多生产者-多消费者模型
  • 不确定应该唤醒哪个线程时
  • 条件变化可能涉及多个等待线程时

典型问题案例:在生产者-消费者模型中,如果生产者使用notify(),可能会唤醒另一个生产者而非消费者,导致某些线程永久等待。

4. 同步块内的性能优化

需要避免的常见问题:

  • 在同步块内执行IO操作
  • 进行复杂计算
  • 调用可能阻塞的外部方法

优化建议:

synchronized (lock) {
    // 只包含必要的共享变量操作
    Object data = prepareData(); // 耗时的准备工作应放在同步块外
    sharedQueue.add(data);
    lock.notify();
}

5. 中断处理最佳实践

中断处理的正确方式:

try {
    while (condition) {
        lock.wait();
    }
} catch (InterruptedException e) {
    // 恢复中断状态,使上层代码能感知
    Thread.currentThread().interrupt();
    // 执行必要的清理工作
    cleanUp();
    return; // 通常需要退出执行
}

6. 锁机制的隔离特性

不同通信机制的隔离规则:

通信机制 唤醒方式 隔离特性
synchronized notify()/notifyAll() 同一对象锁内有效
Lock+Condition signal()/signalAll() 同一Condition对象内有效

示例说明:

Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();

// conditionA.signal() 不会唤醒等待conditionB的线程

理解这些隔离特性可以避免跨条件错误唤醒的问题。

五、典型场景与实战建议

1. 生产者-消费者模型

单缓冲区实现

  • synchronized+wait()/notify()方式:使用内置锁机制,通过wait()让线程等待,notify()唤醒等待线程

    public class SingleBuffer {
        private Object data;
        private boolean empty = true;
        
        public synchronized void produce(Object newData) throws InterruptedException {
            while (!empty) {
                wait();
            }
            data = newData;
            empty = false;
            notifyAll();
        }
        
        public synchronized Object consume() throws InterruptedException {
            while (empty) {
                wait();
            }
            Object result = data;
            empty = true;
            notifyAll();
            return result;
        }
    }
    

  • Condition方式:使用ReentrantLock和Condition提供更灵活的等待/通知机制

    public class SingleBufferWithCondition {
        private final Lock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private final Condition notFull = lock.newCondition();
        private Object data;
        private boolean empty = true;
        
        public void produce(Object newData) throws InterruptedException {
            lock.lock();
            try {
                while (!empty) {
                    notFull.await();
                }
                data = newData;
                empty = false;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    }
    

多缓冲区实现

  • BlockingQueue实现:使用ArrayBlockingQueue/LinkedBlockingQueue等线程安全队列
    public class MultiBuffer {
        private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
        
        public void produce(Object data) throws InterruptedException {
            queue.put(data);  // 队列满时自动阻塞
        }
        
        public Object consume() throws InterruptedException {
            return queue.take();  // 队列空时自动阻塞
        }
    }
    

2. 线程间的数据传递

简单场景实现

  • 共享变量+volatile:适用于单生产者单消费者场景
    public class SharedData {
        private volatile boolean flag = false;
        private volatile String message;
        
        public void setMessage(String msg) {
            message = msg;
            flag = true;
        }
        
        public String getMessage() {
            while (!flag) { /* 自旋等待 */ }
            return message;
        }
    }
    

复杂场景实现

  • BlockingQueue传递数据对象:实现生产者和消费者完全解耦
    public class DataPipeline {
        private final BlockingQueue<DataPacket> queue = new LinkedBlockingQueue<>();
        
        // 生产者线程
        public void sendData(DataPacket packet) throws InterruptedException {
            queue.put(packet);
        }
        
        // 消费者线程
        public DataPacket receiveData() throws InterruptedException {
            return queue.take();
        }
    }
    
    class DataPacket {
        // 复杂的数据对象定义
    }
    

3. 线程池中的任务协作

使用BlockingQueue实现

  • 任务结果收集:多个工作线程处理任务后将结果放入共享队列
    ExecutorService executor = Executors.newFixedThreadPool(4);
    BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
    
    for (int i = 0; i < 10; i++) {
        executor.submit(() -> {
            Result r = processTask();
            resultQueue.put(r);
        });
    }
    
    // 主线程收集结果
    for (int i = 0; i < 10; i++) {
        Result r = resultQueue.take();
        // 处理结果
    }
    

使用CountDownLatch实现

  • 等待子任务完成:主线程等待所有子任务完成后继续执行
    int taskCount = 5;
    CountDownLatch latch = new CountDownLatch(taskCount);
    
    for (int i = 0; i < taskCount; i++) {
        executor.submit(() -> {
            try {
                // 执行任务
            } finally {
                latch.countDown();
            }
        });
    }
    
    // 主线程等待所有任务完成
    latch.await();
    System.out.println("所有任务已完成");
    

4. 实战建议与最佳实践

选择适当的通信方式

  1. 优先使用BlockingQueue:适用于大多数生产者-消费者场景,如:

    • 日志处理系统(多生产者单消费者)
    • 订单处理系统(单生产者多消费者)
    • 消息推送系统(多生产者多消费者)
  2. 多条件场景使用ReentrantLock+Condition

    • 银行账户转账系统(需要区分存款和取款条件)
    • 交通信号灯控制系统(多方向等待条件)
  3. 简单场景使用synchronized

    • 简单的计数器同步
    • 单例模式的双重检查锁定

中断处理规范

try {
    while (condition) {
        queue.take();  // 可能抛出InterruptedException
    }
} catch (InterruptedException e) {
    // 恢复中断状态
    Thread.currentThread().interrupt();
    // 执行清理操作
    cleanUp();
}

性能考量

  • LinkedBlockingQueue vs ArrayBlockingQueue

    • LinkedBlockingQueue默认无界(可指定容量),吞吐量通常更高
    • ArrayBlockingQueue有界,内存使用更可控
  • 锁粒度优化

    • 读写分离场景考虑使用ReadWriteLock
    • 高竞争场景考虑使用StampedLock

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐