Java自己手写阻塞队列的线程安全设计:这段 MyBlockingQueue到底在哪些地方体现了线程安全
这段代码实现了一个线程安全的阻塞队列 MyBlockingQueue,其线程安全设计主要体现在: 使用 synchronized 同步块确保对共享变量(data/head/tail/size)的原子操作 队列满/空时调用 wait() 阻塞并释放锁,避免死锁 使用 while 而非 if 检查条件,防止虚假唤醒导致状态不一致 状态变更后调用 notify() 唤醒等待线程 改进建议包括改用 not
手写阻塞队列的线程安全设计:这段 MyBlockingQueue 到底“安全”在哪?
这份 MyBlockingQueue 是典型的循环数组 + 阻塞语义实现:data[] 存元素,head/tail 控制出队/入队位置,size 记录当前元素个数。并发场景里,“线程安全”要同时满足两件事:
- 数据结构不被写坏:任何时刻都要维持队列不变量:
0 <= size <= capacity,head/tail始终落在合法范围内,入队不会覆盖未消费元素,出队不会重复消费同一元素。 - 阻塞语义正确:满了
put必须等,空了take必须等;并且等待要能被可靠唤醒,唤醒后行为仍然正确。
下面按代码里的关键点逐个拆开讲:哪些地方做了线程安全处理、为什么必须这么做。
源代码:
class MyBlockingQueue{
private String[] data = null;
private int head = 0;
private int tail = 0;
private int size = 0;
public MyBlockingQueue(int capacity){
data = new String[capacity];
}
public void put(String elem) throws InterruptedException{
synchronized (this){
while(size >= data.length)
{
//这里用while不用if的原因跟下面一样
//队列满了,需要阻塞
//return;
this.wait();
}
data[tail] = elem;
tail++;
if(tail >= data.length){
tail = 0;
}
size++;
this.notify();
}}
public String take() throws InterruptedException{
synchronized (this) {
while (size == 0) {
//return null;
this.wait();
//用while不用if是为了多次验证当前这里的条件是否成立
//wait唤醒之前跟之后都判定一次,主要目的在之后这一次
}
String ret = data[head];
head++;
if (head >= data.length) {
head = 0;
}
size--;
this.notify();
return ret;
}
}
}
1)synchronized (this):把共享状态的读写变成互斥的“原子区间”
put() 和 take() 的核心逻辑都包在:
synchronized (this) {
...
}
这一步是线程安全的“地基”。原因在于:入队/出队都不是单条指令,而是一串复合操作:
- 检查条件(满/空)
- 写/读数组槽位
- 移动
head/tail(并处理回环) - 修改
size - 唤醒等待线程
如果没有互斥,两个线程交错执行会出现非常具体的灾难:
- 丢失更新:两个线程同时
size++或size--,最终只生效一次 - 覆盖数据:两个生产者同时写入同一个
tail位置,覆盖彼此元素 - 重复消费:两个消费者同时从同一个
head位置取值 - 指针错乱:
head/tail推进时被打断,导致结构性破坏(队列不变量失真)
synchronized (this) 的意义就是:同一时刻只允许一个线程修改 data/head/tail/size 这组共享状态,把“复合操作”提升为一个对外不可分割的临界区,从而保证原子性与一致性。
2)wait():满/空时阻塞等待,并且关键地——释放锁
put 里的等待(队列满)
while (size >= data.length) {
this.wait();
}
take 里的等待(队列空)
while (size == 0) {
this.wait();
}
这里的线程安全点不只是“阻塞”,而是 wait() 的组合语义:
- 当前线程进入等待状态
- 释放当前持有的这把锁(this 的监视器锁)
- 之后被唤醒时,会先重新竞争锁,拿到锁后才继续执行
释放锁这件事至关重要:
如果队列满时生产者不释放锁,消费者永远进不来做 take(),队列永远满;如果队列空时消费者不释放锁,生产者永远进不来做 put(),队列永远空。于是就变成“抱着锁睡觉”,全员卡死。
所以 wait() 是一种条件同步:不满足条件时把机会让出去,让别的线程进入临界区改变条件。
顺带一提:
wait()必须在持有该对象监视器锁的情况下调用(也就是必须在对应synchronized内),否则会直接抛异常。这里把wait放在同步块内是正确姿势。
3)while 不用 if:被唤醒 ≠ 条件已经满足
代码里两处都坚持用 while:
put:while (size >= data.length) wait()take:while (size == 0) wait()
这不是“风格问题”,而是正确性底线。
原因很现实:线程从 wait() 醒来后,并不是立刻继续执行,而是要重新抢锁。在重新抢锁、以及重新进入临界区之前,队列状态可能已经被其他线程改过了。于是“醒来时的世界”不一定还是“刚被唤醒时以为的世界”。
再说得更直接一点:
- 可能发生虚假唤醒(spurious wakeup):线程莫名其妙醒了,但条件没变
- 多线程场景下,可能被唤醒的那一刻条件短暂满足,抢到锁时又不满足了
notifyAll场景下会唤醒一批线程,其中只有少数真的能继续,其余必须回去再等
因此正确模板永远是:条件谓词 + while 循环等待。醒来后再检查一次条件,条件不满足就继续 wait(),这样才能保证不变量不被破坏。
4)notify():状态改变后“叫醒”对方继续推进
在 put() 成功入队后:
size++;
this.notify();
在 take() 成功出队后:
size--;
this.notify();
这一块做的是“唤醒机制”:当队列状态发生变化(空→非空 或 满→非满),需要通知在该锁对象上等待的线程,否则可能出现:
- 队列已经有元素了,消费者还一直睡着(死等)
- 队列已经有空位了,生产者还一直睡着(死等)
并且 notify() 放在同步块内、且在状态修改之后调用,也是关键:
- 必须先把
size/head/tail/data变更落实,唤醒线程抢到锁后才能看到正确状态 - 唤醒并不等于立刻放锁;只有退出
synchronized后才会真正释放锁,让被唤醒线程继续
5)这份实现“安全”到什么程度?一个典型改进点:notify vs notifyAll
这份实现靠 synchronized + while(wait) + notify 已经能保证基本正确性,但在“多生产者 + 多消费者”更通用的场景里,有一个经常被拿来打磨的点:用 notifyAll() 替代 notify()。
为什么 notify() 可能不够稳
notify() 只随机唤醒一个等待线程,而等待线程可能是两类:
- 生产者在等“队列非满”
- 消费者在等“队列非空”
如果现在发生的是“入队一次”(队列从空变为非空),理论上应该唤醒消费者;但 notify() 有可能恰好唤醒了某个生产者。生产者醒来后会因为 while (size >= capacity) 条件仍不满足而继续睡回去,这虽然不会破坏正确性(因为用了 while),但会造成更多无效唤醒与竞争,吞吐下降,甚至出现“体感卡顿”。
notifyAll() 的价值
notifyAll() 会唤醒所有在这把锁上等待的线程,让它们重新竞争锁;最终只有条件满足的线程能通过 while 检查进入临界区,其余会继续等待。由于 while 做了二次校验,正确性仍然稳。
把两处改成更通用的写法:
// put 成功后
this.notifyAll();
// take 成功后
this.notifyAll();
结论:
notifyAll()往往更“稳”,代价是可能唤醒更多线程产生额外上下文切换;notify()更“省”,但在多生产者多消费者混合等待下更依赖运气。
6)额外的小工程点(不影响主线,但值得顺手提)
这些不属于“线程安全核心”,但属于“把实现打磨得更像标准库”的细节:
take()取走元素后可以把槽位置data[head] = null;,避免旧引用滞留(尤其泛型对象时更有意义)- 目前实现固定
String[],工程里更常写成E[]泛型队列 - 只用一把锁(
this)能正确工作,但更复杂的实现会把“非空/非满”拆成不同条件队列(用ReentrantLock + Condition),减少无效唤醒,提高吞吐
总结:
这份阻塞队列之所以能在并发场景下保持正确,关键防线集中在四件事:
- 互斥:
synchronized (this)把共享状态的复合修改变成原子临界区 - 条件同步:队列满/空时用
wait()阻塞,并释放锁让对方改变条件 - 循环校验:用
while重试条件,防止虚假唤醒与竞争下的状态漂移 - 唤醒推进:状态变化后
notify/notifyAll叫醒等待线程继续工作流转
更多推荐

所有评论(0)