线程之间除了竞争共享资源之外,也可以进行通信协作。本小节介绍 Java 中提供的一个用于实现线程之间的通信的小巧 API,通过使用这个 API,一个线程会等待某个条件的存在(这是继续执行的必要前提),之后,另一个线程会创建该条件,并通知等待中的线程。

24.1 等待-通知 API

java.lang.Object 类提供了 “等待-通知” API,该类包含三个 wait() 方法、一个 notify() 方法以及一个 notifyAll() 方法。wait() 方法用于等待某个条件的存在;而 notify() 和 notifyAll() 方法则在条件存在时通知处于等待状态的线程,以下是方法的描述:

  • final void wait() throws InterruptedException:使当前线程等待,直至另一线程调用此对象的 notify() 或 notifyAll() 方法,或者直至其他线程在等待期间中断当前线程
  • final void wait(long timeoutMillis) throws InterruptedException:使当前线程等待,直到有其他线程调用此对象的 notify() 或 notifyAll() 方法,或者等待指定的毫秒数(由 timeoutMillis参数标识)的时间过去,或者有其他线程在等待期间中断当前线程。当 timeoutMillis为负值时,此方法会抛出 java.lang.IllegalArgumentException 异常
  • final void wait(long timeoutMillis, int nanos):使当前线程等待,直到有其他线程调用此对象的 notify() 或 notifyAll() 方法,或者等待指定的毫秒数(由 timeoutMillis 参数标识)加上纳秒数(由 nanos 参数标识)的时间过去,或者有其他线程在等待期间中断当前线程。当 timeoutMillis为负值、nanos 为负值或 nanos 大于 999999 时,此方法会抛出 IllegalArgumentException 异常
  • final native void notify():唤醒一个正在等待此对象监视器的单个线程。如果存在任何线程正在等待此对象,则会选择其中一个线程进行唤醒。选择是随意的,由实现决定。被唤醒的线程在当前线程释放对这个对象的锁之前无法继续执行。被唤醒的线程将以通常的方式与可能正在积极竞争对该对象进行同步的任何其他线程竞争
  • final native void notifyAll():唤醒所有正在等待此对象监视器的线程。被唤醒的线程在当前线程释放对这个对象的锁之前无法继续执行。这些被唤醒的线程将以常规方式与其他可能正在积极竞争对该对象进行同步的线程竞争

调用Object对象的wait()方法前,当前线程必须要拥有指定对象的监视器锁;调用wait()方法后,会释放对象的监视器锁,然后当前线程进入WAITING状态。直到另外一个线程调用该对象的notify()或notifyAll()方法,处于WAITING状态的线程才重新转为RUNNABLE状态。

这三个 wait() 方法在任何线程在当前线程等待通知之前或期间中断当前线程时都会抛出 java.lang.InterruptedException 异常。当抛出此异常时,当前线程的中断状态会被清除。

等待-通知 API 利用了一个对象的条件队列,这是一个用于存储等待特定条件成立的线程的数据结构。这些等待中的线程被称为等待集。由于条件队列与对象的锁紧密相关,因此这五个方法必须在同步上下文中调用(当前线程必须是对象监视器的所有者);否则,会抛出 java.lang.IllegalMonitorStateException 异常。

下面是一个简单的使用示例:

/**
 * 等待通知API使用
 *
 * @author 老谭
 */
public class WaitNotifyDemo {
    private static Object obj = new Object();

    static void main() throws InterruptedException {
        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + ":i=" + i);
                if (i == 5) {
                    synchronized (obj) {
                        System.out.println(Thread.currentThread().getName() + "等待...");
                        try {
                            obj.wait();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            }
        }).start();
        Thread.sleep(1000);
        new Thread(() -> {
            synchronized (obj) {
                System.out.println(Thread.currentThread().getName() + "发送通知");
                obj.notify();
            }
        }).start();
    }
}

代码第7行声明的对象 obj 作为 synchronized 锁的对象,同时也是 wait()/notify() 的调用者(必须是同一个对象)。

在上面的代码中,主线程常见并启动了两个线程:

第一个子线程,其中进入 for 循环,输出 i 的值(1~10);当 i=5 时,进入 synchronized (obj) 代码块:必须先获取 obj 的锁(这是 wait() 的硬性要求:调用 wait() 必须持有该对象的锁);执行 obj.wait()时,发生两件关键事:

  • 释放当前持有的 obj 锁(让其他线程有机会获取锁,否则其他线程永远拿不到锁,通知无法发送)
  • 当前线程进入 obj 的「等待队列」,阻塞暂停,不再执行后续代码

只有当其他线程调用 obj.notify()/notifyAll() 并释放锁后,等待的线程才可能被唤醒,重新获取 obj 锁,然后从 wait() 方法之后继续执行(继续循环输出 6~10)。

第二个子线程,启动后,首先尝试获取 obj 的锁(此时第一个线程已经释放了锁,所以能成功获取);

  • 执行 obj.notify():从 obj 的「等待队列」中唤醒一个等待的线程(这里只有两个线程,所以唤醒第一个线程);

    注意:notify() 只是唤醒线程,不会立即释放锁!锁会在 synchronized 代码块执行完毕后才释放;

  • 同步代码块结束,第二个线程释放 obj 锁;

  • 被唤醒的第一个线程重新尝试获取 obj 锁,获取成功后,从 wait() 之后继续执行循环。

如果应用程序中只有两个线程,并且其中一个线程偶尔会等待并需要由另一个线程通知,使用 notify() 方法。否则,使用 notifyAll() 方法。

24.2 生产者-消费者模型

在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,因为生产那么多也没有地方放啊;同理如果消费者的速度大于生产者那么消费者就会经常处理等待状态,所以为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模型。

简单来说这里的缓冲区的作用就是为了平衡生产者和消费者的处理能力,起到一个数据缓存的作用,同时也达到了一个解耦的作用。

生产者 - 消费者模型是多线程协作的经典场景,核心是生产者线程生产数据,消费者线程消费数据,通过等待 - 通知机制解决数据空时消费者等待、数据满时生产者等待的问题,同时保证线程安全。

下面我使用 synchronized 锁 + wait()/notifyAll() 来实现此模型。

以下的例子中,我首先声明一个公共共享资源类,其中维护一个属性num,就是固定大小的缓冲区(0-1之间),用于表示资源的数量,其值默认为0。生产者用于生产后,其值增加1,然后不能继续生产,得等待消费者先消费掉。同样,消费者用于消费后,其值减少1,然后不能继续消费,得等待生产者先生产。

/**
 * 共享资源
 *
 * @author 老谭
 */
public class Resource {
    private int num;

    public Resource(int num) {
        this.num = num;
    }

    public int getNum() {
        return this.num;
    }

    public synchronized void increase() throws InterruptedException {
        // 循环判断 num 是否为 1(已满)
        while (num == 1) {
            wait();   // 已满则等待消费者线程进行消费
        }
        Thread.sleep(300);
        num++;  // 生产
        System.out.println(num);
        notifyAll(); // 通知其他线程继续运行
    }

    public synchronized void decrease() throws InterruptedException {
        // 循环判断 num 是否为 0(已空)
        while (num == 0) {
            wait();  // 已空则等待生产者线程进行生产
        }
        Thread.sleep(500);
        num--;  // 消费
        System.out.println(num);
        notifyAll();  // 通知其他线程继续运行
    }
}

上面的代码中,无论是增加 num 亦或是减少 num,都是使用 while 进行循环判断,这里很容易直接使用 if;以增加 num为例,如果使用 if 可能造成的问题是等待被唤醒之后,继续生产(num++),但其他线程已经成功生产那么 num 就被连续自增两次了,这种情况称为伪唤醒。

生产者线程用于增加资源数量:

/**
 * 生产者线程
 *
 * @author 老谭
 */
public class ProducerThread extends Thread {
    private final Resource resource;

    public ProducerThread(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        try {
            resource.increase();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

消费者线程用于减少资源数量:

/**
 * 消费者线程
 *
 * @author 老谭
 */
public class ConsumerThread extends Thread {
    private final Resource resource;

    public ConsumerThread(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        try {
            resource.decrease();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

测试代码如下,创建与启动若干生产者线程与消费者线程,它们的数量可以不相等:

/**
 * 生产者消费者测试
 *
 * @author 老谭
 */
public class ProducerConsumerTest {
    static void main() {
        Resource resource = new Resource(0);
        for (int i = 0; i < 50; i++) {
            new ProducerThread(resource).start();
        }
        for (int i = 0; i < 50; i++) {
            new ConsumerThread(resource).start();
        }
    }
}

运行之后的结果如下,可以看到 num 的值永远是 1 和 0 交替出现:

1
0
1
0
1
0
1
0
......

对于上述资源类中的num,可以在实际开发中使用更复杂的数据结构,如数组,队列等。

24.3 小结

本小节介绍了Java中的线程通信机制—等待-通知API,主要包括Object类提供的wait()、notify()和notifyAll()方法的使用原理。这些方法必须在同步上下文中调用,通过条件队列实现线程间的协作。文章以代码示例展示了等待-通知机制的基本用法,并详细分析了线程状态转换过程。接着阐述了生产者-消费者模型,通过共享资源类和synchronized锁实现线程间生产与消费的平衡,强调使用while循环避免伪唤醒问题,给出了具体的实现代码。该机制能有效解决多线程环境下的资源同步和协作问题。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐