生产者-消费者

synchronized /wait/notify 版

共享仓库

public class Warehouse {
    private final int MAX = 5;           // 仓库上限
    private int count = 0;               // 当前库存

    // 1️⃣ 生产
    public synchronized void produce() throws InterruptedException {
        while (count == MAX) {           // 满则等待
            wait();
        }
        count++;
        System.out.println(Thread.currentThread().getName() + " 生产 -> " + count);
        notifyAll();                     // 唤醒等待线程
    }

    // 2️⃣ 消费
    public synchronized void consume() throws InterruptedException {
        while (count == 0) {             // 空则等待
            wait();
        }
        System.out.println(Thread.currentThread().getName() + " 消费 <- " + count);
        count--;
        notifyAll();                     // 唤醒等待线程
    }
}

生产者 & 消费者线程

class Producer implements Runnable {
    private final Warehouse warehouse;
    Producer(Warehouse w) { this.warehouse = w; }

    @Override
    public void run() {
        try {
            while (true) {
                warehouse.produce();
                Thread.sleep(500);  // 模拟耗时
            }
        } catch (InterruptedException ignored) {}
    }
}

class Consumer implements Runnable {
    private final Warehouse warehouse;
    Consumer(Warehouse w) { this.warehouse = w; }

    @Override
    public void run() {
        try {
            while (true) {
                warehouse.consume();
                Thread.sleep(800);  // 模拟耗时
            }
        } catch (InterruptedException ignored) {}
    }
}

启动演示

public static void main(String[] args) {
    Warehouse warehouse = new Warehouse();
    new Thread(new Producer(warehouse), "Producer-1").start();
    new Thread(new Consumer(warehouse), "Consumer-1").start();
}

运行示例

Producer-1 生产 -> 1
Producer-1 生产 -> 2
Consumer-1 消费 <- 2
Producer-1 生产 -> 2
...
要点 说明
synchronized 保证仓库操作原子性
while 而非 if 防止虚假唤醒
notifyAll() 避免生产者/消费者互相饿死
  • if 替代 while → 假醒导致越界。

  • 忘记 notifyAll → 死锁。

阻塞队列 (BlockingQueue)

公共仓库(线程安全)

import java.util.concurrent.*;

public class PC {
    // 1. 仓库:容量 5 的阻塞队列
    private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

    /* ========= 2. 生产者 ========= */
    static class Producer implements Runnable {
        @Override
        public void run() {
            int i = 0;
            try {
                while (true) {
                    queue.put(++i);              // 满了自动阻塞
                    System.out.println("生产者 -> " + i);
                    Thread.sleep(500);           // 模拟生产耗时
                }
            } catch (InterruptedException ignored) {}
        }
    }

    /* ========= 3. 消费者 ========= */
    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int val = queue.take();      // 空队列自动阻塞
                    System.out.println("消费者 <- " + val);
                    Thread.sleep(800);           // 模拟消费耗时
                }
            } catch (InterruptedException ignored) {}
        }
    }

    /* ========= 4. 启动演示 ========= */
    public static void main(String[] args) {
        new Thread(new Producer(), "Producer-1").start();
        new Thread(new Consumer(), "Consumer-1").start();
    }
}

运行效果

生产者 -> 1
生产者 -> 2
消费者 <- 1
生产者 -> 3
生产者 -> 4
生产者 -> 5
消费者 <- 2
类/方法 作用
BlockingQueue 自带 阻塞/唤醒 机制,无需手写 wait/notify
put() 队列满时自动阻塞生产者
take() 队列空时自动阻塞消费者
线程名 方便 jstack 定位

双检锁单例(DCL)

volatile 禁止指令重排

public final class Singleton {
    private static volatile Singleton INSTANCE;   // 必须 volatile

    private Singleton() {}

    public static Singleton getInstance() {
        if (INSTANCE == null) {                // ① 第一次检查
            synchronized (Singleton.class) {
                if (INSTANCE == null) {        // ② 第二次检查
                    INSTANCE = new Singleton(); // ③ 分配+初始化
                }
            }
        }
        return INSTANCE;
    }
}
  • 漏写 volatile → 可能拿到半初始化对象。

  • 反射 / 序列化破坏 → 加 readResolve() 或枚举单例。

volatile 停止线程(可见性)

public class Task implements Runnable {
    private volatile boolean running = true;   // 必须 volatile

    public void run() {
        while (running) {
            // do work
        }
    }
    public void cancel() { running = false; }
}
  • 去掉 volatile → 线程可能永远读不到 false

  • 误用 volatilei++ → 非原子,需 AtomicInteger

android 消息机制是生产者消费者模式

Android 消息机制(Looper/MessageQueue/Handler)本质上就是「生产者-消费者」

  • 生产者——任意线程通过 Handler.sendXxx()postXxx()Message/Runnable 投放到 MessageQueue

  • 消费者——目标线程的 Looper 无限循环从 MessageQueue 中取出消息并交由对应 Handler.dispatchMessage() 处理。

生产者-消费者术语 Android 对应组件
缓冲区(Buffer) MessageQueue
生产者线程 任意线程(主线程、子线程、网络线程……)
消费者线程 拥有 Looper 的线程(主线程、HandlerThread、自己调用 Looper.prepare() 的线程)
同步原语 synchronized(MessageQueue.this) + nativeWake()/nativePollOnce()
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐