一、前言

        在多线程并发编程中,线程并非孤立执行,常常需要协同工作完成复杂任务。例如生产者线程生产数据,消费者线程消费数据,二者需要通过通信协调生产和消费的节奏,避免数据积压或消费空数据。Java 提供了多种线程通信工具,

        本文将从基础的 wait/notify 机制,到进阶的 Condition 接口,再到实用的并发工具类( CountDownLatch 、 CyclicBarrier 等),全面解析线程通信的实现原理与实战场景。

二、线程通信

线程通信的本质是线程间的状态传递,常见需求包括:

  • 等待 / 唤醒:线程 A 执行到某一阶段后,需要等待线程 B 完成任务才能继续执行,线程 B 完成后唤醒线程 A。

  • 生产 / 消费:生产者线程生产数据后,唤醒消费者线程消费;消费者线程消费完数据后,唤醒生产者线程生产。

  • 多线程同步:多个线程需要等待所有线程都完成某一阶段任务后,再一起执行后续操作。

        没有通信的多线程,只能通过轮询的方式检测状态,这会浪费大量 CPU 资源。而合理的通信机制,能让线程在需要等待时释放资源,被唤醒时恢复执行,提升程序性能。

三、基础通信机制

wait() 、 notify() 、 notifyAll() 是 Object 类的成员方法,所有 Java 对象都自带这些方法,它们必须在 synchronized 同步代码块 / 方法中使用,否则会抛出 IllegalMonitorStateException 异常。

1. 核心原理

  • wait():线程调用对象的 wait() 方法时,会释放该对象的锁,并进入该对象的等待队列,直到被其他线程唤醒。

  • notify():线程调用对象的 notify() 方法时,会唤醒该对象等待队列中的一个随机线程,被唤醒的线程需要重新竞争锁才能继续执行。

  • notifyAll():唤醒该对象等待队列中的所有线程,所有被唤醒的线程会竞争锁,只有获取到锁的线程能继续执行。

2. 经典案例:wait/notify 实现生产者 - 消费者模型

import java.util.LinkedList;
import java.util.Queue;
/**
 * 生产者-消费者模型:仓库容量为3,生产满时生产者等待,消费空时消费者等待
 */
public class ProducerConsumerWaitNotify {
    // 仓库:存储产品
    private static final Queue<String> WAREHOUSE = new LinkedList<>();
    // 仓库最大容量
    private static final int MAX_CAPACITY = 3;
    // 锁对象
    private static final Object LOCK = new Object();
    // 生产者线程:生产产品
    static class Producer extends Thread {
        @Override
        public void run() {
            int count = 0;
            while (true) {
                synchronized (LOCK) {
                    // 仓库满时,生产者等待
                    while (WAREHOUSE.size() == MAX_CAPACITY) {
                        try {
                            System.out.println("仓库已满,生产者等待...");
                            LOCK.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 生产产品
                    String product = "产品" + (++count);
                    WAREHOUSE.add(product);
                    System.out.println(Thread.currentThread().getName() + " 生产了:" + product + ",仓库容量:" + WAREHOUSE.size());
                    // 唤醒消费者线程
                    LOCK.notifyAll();
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    // 消费者线程:消费产品
    static class Consumer extends Thread {
        @Override
        public void run() {
            while (true) {
                synchronized (LOCK) {
                    // 仓库空时,消费者等待
                    while (WAREHOUSE.isEmpty()) {
                        try {
                            System.out.println("仓库已空,消费者等待...");
                            LOCK.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 消费产品
                    String product = WAREHOUSE.poll();
                    System.out.println(Thread.currentThread().getName() + " 消费了:" + product + ",仓库容量:" + WAREHOUSE.size());
                    // 唤醒生产者线程
                    LOCK.notifyAll();
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        new Producer().start();
        new Consumer().start();
    }
}

3. 核心注意事项

  • 必须在同步代码块中使用:调用 wait() / notify() 前,线程必须获取该对象的锁,否则会抛出异常。

  • 使用 while 循环判断条件:不能用 if 判断,因为线程被唤醒后可能条件仍不满足(如多个消费者被唤醒,仓库已空), while 会重新检查条件。

  • notify() 与 notifyAll() 的选择: notify() 可能导致线程 “假死”(唤醒的是同类线程,如生产者唤醒生产者),推荐使用 notifyAll() 保证唤醒正确的线程。

四、Condition 接口

Condition 是 java.util.concurrent.locks 包下的接口,是 Lock 锁的配套通信工具,相比 wait/notify 更加灵活,支持精准唤醒指定线程组

1. 核心原理

  • Condition 对象由 Lock 对象创建,一个 Lock 可以创建多个 Condition 对象,对应多个等待队列。

  • await():类似 wait() ,线程释放锁并进入 Condition 的等待队列。

  • signal():类似 notify() ,唤醒 Condition 等待队列中的一个线程。

  • signalAll():类似 notifyAll() ,唤醒 Condition 等待队列中的所有线程。

2. 经典案例:Condition 实现精准唤醒的生产者 - 消费者模型

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Condition实现生产者-消费者:精准唤醒生产者/消费者,避免唤醒同类线程
 */
public class ProducerConsumerCondition {
    private static final Queue<String> WAREHOUSE = new LinkedList<>();
    private static final int MAX_CAPACITY = 3;
    // 创建可重入锁
    private static final Lock LOCK = new ReentrantLock();
    // 生产者等待的Condition
    private static final Condition PRODUCER_COND = LOCK.newCondition();
    // 消费者等待的Condition
    private static final Condition CONSUMER_COND = LOCK.newCondition();
    static class Producer extends Thread {
        @Override
        public void run() {
            int count = 0;
            while (true) {
                LOCK.lock();
                try {
                    while (WAREHOUSE.size() == MAX_CAPACITY) {
                        System.out.println("仓库已满,生产者等待...");
                        // 生产者进入PRODUCER_COND等待队列
                        PRODUCER_COND.await();
                    }
                    String product = "产品" + (++count);
                    WAREHOUSE.add(product);
                    System.out.println(Thread.currentThread().getName() + " 生产了:" + product + ",仓库容量:" + WAREHOUSE.size());
                    // 精准唤醒消费者线程
                    CONSUMER_COND.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    LOCK.unlock();
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class Consumer extends Thread {
        @Override
        public void run() {
            while (true) {
                LOCK.lock();
                try {
                    while (WAREHOUSE.isEmpty()) {
                        System.out.println("仓库已空,消费者等待...");
                        // 消费者进入CONSUMER_COND等待队列
                        CONSUMER_COND.await();
                    }
                    String product = WAREHOUSE.poll();
                    System.out.println(Thread.currentThread().getName() + " 消费了:" + product + ",仓库容量:" + WAREHOUSE.size());
                    // 精准唤醒生产者线程
                    PRODUCER_COND.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    LOCK.unlock();
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        new Producer().start();
        new Consumer().start();
    }
}

3. Condition 与 wait/notify 的核心对比

特性

wait/notify

Condition

锁依赖

依赖synchronized

依赖Lock

等待队列

一个对象只有一个等待队列

一个Lock可以有多个等待队列

唤醒方式

随机唤醒一个或全部唤醒,无法精准唤醒

可以精准唤醒指定条件队列的线程

灵活性

较低

较高,支持更复杂的线程协作场景

使用方式

直接调用对象的方法,需在同步代码块中

通过Lock创建,需在lock/unlock之间使用

五、实用并发工具类

除了基础的等待唤醒机制,JUC 包还提供了一些高级工具类,简化多线程通信与同步的实现。

1. CountDownLatch:倒计时门闩,等待多个线程完成任务

核心作用:让一个或多个线程等待其他多个线程完成任务后,再继续执行。

原理:初始化一个计数器,线程完成任务后调用 countDown() 方法让计数器减 1,等待线程调用 await() 方法阻塞,直到计数器变为 0。

适用场景:主线程等待多个子线程初始化完成后,再执行后续逻辑。

代码案例:CountDownLatch 实现线程初始化等待

import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
    // 计数器:需要3个线程初始化完成
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(3);
    static class InitThread extends Thread {
        private final String threadName;
        public InitThread(String threadName) {
            this.threadName = threadName;
        }
        @Override
        public void run() {
            System.out.println(threadName + " 开始初始化...");
            try {
                // 模拟初始化耗时
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadName + " 初始化完成!");
            // 计数器减1
            COUNT_DOWN_LATCH.countDown();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new InitThread("初始化线程1").start();
        new InitThread("初始化线程2").start();
        new InitThread("初始化线程3").start();
        System.out.println("主线程等待所有初始化线程完成...");
        // 主线程阻塞,直到计数器为0
        COUNT_DOWN_LATCH.await();
        System.out.println("所有线程初始化完成,主线程开始执行任务!");
    }
}

2. CyclicBarrier:循环屏障,让多个线程到达同一屏障点后再继续

核心作用:让多个线程相互等待,直到所有线程都到达指定的 “屏障点”,再一起继续执行。

原理:初始化屏障的线程数量,线程到达屏障点时调用 await() 方法阻塞,当所有线程都调用 await() 后,屏障打开,所有线程继续执行。

适用场景:多个线程需要协同完成任务,必须等所有线程都准备好后才能开始执行。

与 CountDownLatch 的区别: CyclicBarrier 可以重复使用(调用 reset() 重置屏障), CountDownLatch 的计数器只能使用一次。

代码案例:CyclicBarrier 实现多线程协同执行

import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
    // 屏障:3个线程到达后一起执行
    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(3, () -> {
        System.out.println("所有线程已到达屏障点,开始执行任务!");
    });
    static class WorkerThread extends Thread {
        private final String threadName;
        public WorkerThread(String threadName) {
            this.threadName = threadName;
        }
        @Override
        public void run() {
            System.out.println(threadName + " 正在前往屏障点...");
            try {
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(threadName + " 到达屏障点,等待其他线程...");
                // 到达屏障点,等待其他线程
                CYCLIC_BARRIER.await();
                // 屏障打开后执行的任务
                System.out.println(threadName + " 执行任务ing...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        new WorkerThread("工作线程1").start();
        new WorkerThread("工作线程2").start();
        new WorkerThread("工作线程3").start();
    }
}

3. Semaphore:信号量,控制同时访问资源的线程数量

核心作用:控制同时访问特定资源的线程数量,通过信号量的 “许可” 机制实现。

原理:初始化许可数量,线程访问资源前调用 acquire() 获取许可,访问完成后调用 release() 释放许可;若没有可用许可,线程会阻塞直到有许可被释放。

适用场景:限流场景,如限制同时连接数据库的线程数量、限制接口的并发访问量。

代码案例:Semaphore 实现接口限流

import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
    // 信号量:最多允许2个线程同时访问资源
    private static final Semaphore SEMAPHORE = new Semaphore(2);
    static class AccessThread extends Thread {
        private final String threadName;
        public AccessThread(String threadName) {
            this.threadName = threadName;
        }
        @Override
        public void run() {
            try {
                // 获取许可
                SEMAPHORE.acquire();
                System.out.println(threadName + " 获取到许可,开始访问资源...");
                Thread.sleep(1000);
                System.out.println(threadName + " 访问资源完成,释放许可!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放许可
                SEMAPHORE.release();
            }
        }
    }
    public static void main(String[] args) {
        // 5个线程竞争2个许可
        for (int i = 1; i <= 5; i++) {
            new AccessThread("访问线程" + i).start();
        }
    }
}

六、线程通信的核心总结

  • 基础场景:简单的生产者 - 消费者模型,使用 wait/notify 即可满足需求,注意在同步代码块中使用并通过 while 判断条件。

  • 进阶场景:需要精准唤醒指定线程组时,优先使用 Condition 接口,搭配 ReentrantLock 锁,灵活性更高。

  • 复杂场景:多线程同步等待(如初始化、协同执行)使用 CountDownLatch / CyclicBarrier ;限流场景使用 Semaphore 。

  • 核心原则:线程通信的本质是 状态共享与协同 ,无论使用哪种工具,都要保证共享状态的线程安全,避免出现数据错乱。

七、总结

        本文详细讲解了 Java 线程通信的多种方式,从基础的 wait/notify 机制,到进阶的 Condition 接口,再到实用的 JUC 工具类,每种方式都有其适用场景。掌握这些通信工具,能帮助我们更好地实现多线程的协同工作,解决复杂的并发问题。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐