Day47 | J.U.C中AQS的完全指南(补充)
本文深入解析AQS的条件队列与中断响应机制。条件队列部分详细剖析了await()的三步流程:释放锁入队、循环等待检查状态、重新获取锁;以及signal()如何将节点从条件队列转移到同步队列。中断机制部分重点解读了acquireInterruptibly()的实现原理,包括中断检测点和取消获取锁的善后处理。全文通过源码分析揭示了AQS如何实现线程在同步队列与条件队列间的状态转换,以及不同中断响应方式
Day45我们讲了AQS的静态结构,了解了AQS的核心组件。
Day46我们深入剖析了AQS的动态流程,根据源码追踪了独占锁的获取和释放。
基于之前的两篇文章,AQS的主干同步机制,我们应该有了一个大概的认知。
本文补充讲一下条件队列和中断响应机制。
先列出几个问题,看完本文再回过头来想这几个问题:
一个线程调用await()的时候,AQS内部是怎么实现原子性释放锁并让线程进入等待的?
另一个线程调用signal()的时候,又是怎么精确唤醒在条件队列中等待的线程?
一个等待节点是怎么在同步队列和条件队列之间转换的?
lock()和lockInterruptibly()的区别在哪里?AQS怎么响应一个正在排队的线程被中断的事件?
一、条件队列
在Day45中,我们模拟了一个仓库模型BoundedBuffer。生产者在发现仓库满了之后,会调用notFull.await()等待。
消费者发现仓库空了会调用notEmpty.await()等待。
这个await和signal的背后,其实就是ConditionObject在驱动。
1、await:从同步队列到条件队列
当一个线程,比如一个生产者P1发现仓库满了,调用notFull.await(),这个线程就会经历挂起的过程。
当然前提条件是调用await()的线程必须已经持有与这个Condition绑定的锁(在这里是ReentrantLock)。
整个await的核心过程大概分3步:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
long savedState = enableWait(node); //步骤1:入队并释放锁
// ...
//步骤2:循环等待
while (!canReacquire(node)) {
// ... 阻塞逻辑 ...
}
// ...
//步骤3:重新获取锁
acquire(node, savedState, false, false, false, 0L);
// ...
}
1)入队并释放锁
这是await的第一步,也是最核心的一步,干了三件事:
private long enableWait(ConditionNode node) {
if (isHeldExclusively()) {
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING); // 设置状态为 3 (2 | 1)
// ...把node加入条件队列的队尾 ...
long savedState = getState();
if (release(savedState)) //完全释放锁
return savedState;
}
// ...
}
把节点的状态改成了3。COND和WAITING分别表示在条件队列里,正在等待。

第二件事就是把新节点添加到ConditionObject内部维护的单向链表末尾。

第三件事就是调用ASQ的release方法,完全释放当前线程持有的锁。
干完这三件事(enableWait结束),线程P1就完全释放了锁,而且他的信息也已经封装成了一个状态是3的ConditionNode,被放到条件队列里了。
2)循环等待
这个时候线程P1进入了等待循环。这个循环的退出条件是canReacquire(node),他会不停的检查node.prev != null来判断节点是不是已经被转移到了同步队列。
只有在同步队列里,节点才会有prev指针。

while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) { // 1处理中断
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) { // 2检查状态,阻塞
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else // 3自旋等待
Thread.onSpinWait(); // awoke while enqueuing
}
1处理中断:如果线程在阻塞的时候被中断,Thread.interrupted()就是true。node.getAndUnsetStatus(COND)就会把状态从3改成1(原子操作)。如果移除成功了,就说明是在signal发生前被中断的,cancelled就会变成true,循环退出,线程就放弃等待了。
2查状态和阻塞:只要(node.status &COND)!=0,就意味着节点的节点的状态还是2,说明还没被signal。线程会通过ForkJoinPool.managedBlock(node)进入阻塞状态。(底层调用LockSupport.park())
3自旋等待:这个主要是为了处理边界情况。如果线程醒来后,发现COND状态位已经没了,但是canReacquire 还不为true(意味着节点正在被转移到同步队列,但还没完成),这种情况就需要线程进行短暂的自旋,而不是马上重新park,提高点效率。
3)重新获取锁
当signal发生,P1节点的COND位被移除,节点被成功转移到同步队列后,while循环就会退出。
这个时候,P1已经从一个等待条件的线程变回了等待锁的线程。
然后调用acquire(node, savedState, ...)。
这个流程就跟我们上一篇文章分析的完全一样了:P1线程会进入AQS的主同步队列排队,遵循FIFO原则,等待轮到自己的时候,被前驱节点唤醒,最后重新获取到锁。
savedState参数会保证他恢复到等待前正确的锁重入次数。
2、signal:从条件队列到同步队列
当另一个线程,比如消费者C1调用signal()的时候,他的任务就是把条件队列中最老的等待者(队头节点)叫醒,让后把他送到同步队列的门口去排队。
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
enqueue(first);
if (!all)
break;
}
first = next;
}
}
如果firstWaiter不是空,就唤醒一个。doSignal的第二个参数表示是全部唤醒还是唤醒一个。
doSignal中getAndUnsetStatus(COND)是唤醒的第一步,原子移除节点的COND状态。
节点的状态从3(COND | WAITING)变成1(WAITING)。说明这个节点就不再属于条件队列了。
enqueue(first)是最关键的转移操作,这个方法会把这个状态变成1的节点,添加到AQS同步队列的队尾。
到这儿,signal就完成了。你会发现他并没有直接唤醒线程,更多的是扮演着一个调度员的角色。
把await的线程节点从条件等待区搬到锁等待区。这个线程的真正唤醒,是靠他在同步队列里的前驱节点在释放锁的时候完成的。
二、中断响应机制
关于中断响应,我们直接看acquireInterruptibly方法:
public final void acquireInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted() ||
(!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
throw new InterruptedException();
}
lockInterruptibly() 会调用这个方法。核心就在他调用了内部的acquire方法,然后把interruptible参数设成了 true。
1、中断检测点
在acquire里面有这样一段代码:
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
如果一个线程因为调用lockInterruptibly而在队列里park时,他如果被interrupt了,park就会返回。
Thread.interrupted()会返回true,interruptible本身设置是true。&&两边都是true,循环就退出了。线程就不会继续尝试获取锁了。
2、中断后干什么
当循环因为中断break之后,acquire方法里面会调用cancelAcquire。
cancelAcquire方法里面做的事情就是中断线程的善后工作。
private int cancelAcquire(Node node, boolean interrupted,
boolean interruptible) {
if (node != null) {
node.waiter = null;
node.status = CANCELLED;
if (node.prev != null)
cleanQueue();
}
if (interrupted) {
if (interruptible)
return CANCELLED;
else
Thread.currentThread().interrupt();
}
return 0;
}
把节点的状态修改成CANCELLED。
然后通过cleanQueue清理队列,清理方法有点复杂,大致的逻辑就是遍历队列,把所有CANCELLED状态的节点从双向链表里移除,剩下的节点调整前后驱指针,保证队列的完整性和准确性。
最后返回CANCELLED,acquireInterruptibly接收到这个返回值后,就会往外抛InterruptedException。
结语
本文通过AQS源码剖析了AQS中的条件队列和中断响应机制。
算是不补齐了AQS的最后一块内容。
AQS系列三篇文章,从静态结构到动态流程。
其实不难发现,AQS的设计就是以一个原子状态为核心,围绕一个等待队列,通过模板方法模式把通用并发控制逻辑和具体同步语义解耦。
了解了AQS的设计理念,对于ReentrantLock、Semaphore或CountDownLatch这些基于AQS的同步工具,在使用和分析上,都会有巨大的帮助。
下一篇预告
Day48 | J.U.C集合-ConcurrentHashMap
如果你觉得这系列文章对你有帮助,欢迎关注专栏,我们一起坚持下去!
更多推荐



所有评论(0)