手写阻塞队列的线程安全设计:这段 MyBlockingQueue 到底“安全”在哪?

这份 MyBlockingQueue 是典型的循环数组 + 阻塞语义实现:data[] 存元素,head/tail 控制出队/入队位置,size 记录当前元素个数。并发场景里,“线程安全”要同时满足两件事:

  • 数据结构不被写坏:任何时刻都要维持队列不变量:0 <= size <= capacityhead/tail 始终落在合法范围内,入队不会覆盖未消费元素,出队不会重复消费同一元素。
  • 阻塞语义正确:满了 put 必须等,空了 take 必须等;并且等待要能被可靠唤醒,唤醒后行为仍然正确。

下面按代码里的关键点逐个拆开讲:哪些地方做了线程安全处理、为什么必须这么做。


源代码:

class MyBlockingQueue{
    private String[] data = null;

    private int head = 0;
    private int tail = 0;
    private int size = 0;
    public MyBlockingQueue(int capacity){
        data = new String[capacity];
    }
    public void put(String elem) throws InterruptedException{
        synchronized (this){
           while(size >= data.length)
        {
            //这里用while不用if的原因跟下面一样
            //队列满了,需要阻塞
            //return;
            this.wait();
        }
        data[tail] = elem;
        tail++;
        if(tail >= data.length){
            tail = 0;
        }
        size++;
        this.notify();
    }}
    public String take() throws InterruptedException{
        synchronized (this) {
            while (size == 0) {
                //return null;
                this.wait();
                //用while不用if是为了多次验证当前这里的条件是否成立
                //wait唤醒之前跟之后都判定一次,主要目的在之后这一次
            }
            String ret = data[head];
            head++;
            if (head >= data.length) {
                head = 0;
            }
            size--;
            this.notify();
            return ret;
        }
    }
}

1)synchronized (this):把共享状态的读写变成互斥的“原子区间”

put()take() 的核心逻辑都包在:

synchronized (this) {
    ...
}

这一步是线程安全的“地基”。原因在于:入队/出队都不是单条指令,而是一串复合操作

  • 检查条件(满/空)
  • 写/读数组槽位
  • 移动 head/tail(并处理回环)
  • 修改 size
  • 唤醒等待线程

如果没有互斥,两个线程交错执行会出现非常具体的灾难:

  • 丢失更新:两个线程同时 size++size--,最终只生效一次
  • 覆盖数据:两个生产者同时写入同一个 tail 位置,覆盖彼此元素
  • 重复消费:两个消费者同时从同一个 head 位置取值
  • 指针错乱head/tail 推进时被打断,导致结构性破坏(队列不变量失真)

synchronized (this) 的意义就是:同一时刻只允许一个线程修改 data/head/tail/size 这组共享状态,把“复合操作”提升为一个对外不可分割的临界区,从而保证原子性与一致性。


2)wait():满/空时阻塞等待,并且关键地——释放锁

put 里的等待(队列满)

while (size >= data.length) {
    this.wait();
}

take 里的等待(队列空)

while (size == 0) {
    this.wait();
}

这里的线程安全点不只是“阻塞”,而是 wait() 的组合语义:

  • 当前线程进入等待状态
  • 释放当前持有的这把锁(this 的监视器锁)
  • 之后被唤醒时,会先重新竞争锁,拿到锁后才继续执行

释放锁这件事至关重要:
如果队列满时生产者不释放锁,消费者永远进不来做 take(),队列永远满;如果队列空时消费者不释放锁,生产者永远进不来做 put(),队列永远空。于是就变成“抱着锁睡觉”,全员卡死。

所以 wait() 是一种条件同步:不满足条件时把机会让出去,让别的线程进入临界区改变条件。

顺带一提:wait() 必须在持有该对象监视器锁的情况下调用(也就是必须在对应 synchronized 内),否则会直接抛异常。这里把 wait 放在同步块内是正确姿势。


3)while 不用 if:被唤醒 ≠ 条件已经满足

代码里两处都坚持用 while

  • putwhile (size >= data.length) wait()
  • takewhile (size == 0) wait()

这不是“风格问题”,而是正确性底线。

原因很现实:线程从 wait() 醒来后,并不是立刻继续执行,而是要重新抢锁。在重新抢锁、以及重新进入临界区之前,队列状态可能已经被其他线程改过了。于是“醒来时的世界”不一定还是“刚被唤醒时以为的世界”。

再说得更直接一点:

  • 可能发生虚假唤醒(spurious wakeup):线程莫名其妙醒了,但条件没变
  • 多线程场景下,可能被唤醒的那一刻条件短暂满足,抢到锁时又不满足了
  • notifyAll 场景下会唤醒一批线程,其中只有少数真的能继续,其余必须回去再等

因此正确模板永远是:条件谓词 + while 循环等待。醒来后再检查一次条件,条件不满足就继续 wait(),这样才能保证不变量不被破坏。


4)notify():状态改变后“叫醒”对方继续推进

put() 成功入队后:

size++;
this.notify();

take() 成功出队后:

size--;
this.notify();

这一块做的是“唤醒机制”:当队列状态发生变化(空→非空 或 满→非满),需要通知在该锁对象上等待的线程,否则可能出现:

  • 队列已经有元素了,消费者还一直睡着(死等)
  • 队列已经有空位了,生产者还一直睡着(死等)

并且 notify() 放在同步块内、且在状态修改之后调用,也是关键:

  • 必须先把 size/head/tail/data 变更落实,唤醒线程抢到锁后才能看到正确状态
  • 唤醒并不等于立刻放锁;只有退出 synchronized 后才会真正释放锁,让被唤醒线程继续

5)这份实现“安全”到什么程度?一个典型改进点:notify vs notifyAll

这份实现靠 synchronized + while(wait) + notify 已经能保证基本正确性,但在“多生产者 + 多消费者”更通用的场景里,有一个经常被拿来打磨的点:notifyAll() 替代 notify()

为什么 notify() 可能不够稳

notify() 只随机唤醒一个等待线程,而等待线程可能是两类:

  • 生产者在等“队列非满”
  • 消费者在等“队列非空”

如果现在发生的是“入队一次”(队列从空变为非空),理论上应该唤醒消费者;但 notify() 有可能恰好唤醒了某个生产者。生产者醒来后会因为 while (size >= capacity) 条件仍不满足而继续睡回去,这虽然不会破坏正确性(因为用了 while),但会造成更多无效唤醒与竞争,吞吐下降,甚至出现“体感卡顿”。

notifyAll() 的价值

notifyAll() 会唤醒所有在这把锁上等待的线程,让它们重新竞争锁;最终只有条件满足的线程能通过 while 检查进入临界区,其余会继续等待。由于 while 做了二次校验,正确性仍然稳。

把两处改成更通用的写法:

// put 成功后
this.notifyAll();

// take 成功后
this.notifyAll();

结论:notifyAll() 往往更“稳”,代价是可能唤醒更多线程产生额外上下文切换;notify() 更“省”,但在多生产者多消费者混合等待下更依赖运气。


6)额外的小工程点(不影响主线,但值得顺手提)

这些不属于“线程安全核心”,但属于“把实现打磨得更像标准库”的细节:

  • take() 取走元素后可以把槽位置 data[head] = null;,避免旧引用滞留(尤其泛型对象时更有意义)
  • 目前实现固定 String[],工程里更常写成 E[] 泛型队列
  • 只用一把锁(this)能正确工作,但更复杂的实现会把“非空/非满”拆成不同条件队列(用 ReentrantLock + Condition),减少无效唤醒,提高吞吐

总结:

这份阻塞队列之所以能在并发场景下保持正确,关键防线集中在四件事:

  1. 互斥synchronized (this) 把共享状态的复合修改变成原子临界区
  2. 条件同步:队列满/空时用 wait() 阻塞,并释放锁让对方改变条件
  3. 循环校验:用 while 重试条件,防止虚假唤醒与竞争下的状态漂移
  4. 唤醒推进:状态变化后 notify/notifyAll 叫醒等待线程继续工作流转
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐