深入AQS源码:解密Condition的await与signal
节点入队后,线程就从“等待条件”的状态,转变成了“等待获取锁”的状态,并通过。这会进入AQS标准的获取锁流程,线程会和其他所有正在排队获取锁的线程一样,在同步队列中竞争,直到成功获取之前保存的。的实现精髓在于它维护了两个队列:一个用于等待条件的条件队列,一个用于竞争锁的同步队列。,这个方法负责将等待的线程节点从条件队列转移到同步队列,这是整个唤醒机制的关键。这个方法会彻底释放当前线程持有的锁,无论
在Java并发编程中,ReentrantLock
配合Condition
,是我们替代synchronized
和wait/notify
的常用工具。它提供了更灵活的线程等待和唤醒机制。那么,当我们调用condition.await()
时,线程到底经历了什么?signal()
又是如何唤醒它的?要回答这些问题,就必须深入AQS(AbstractQueuedSynchronizer
)的源码,跟踪一个线程的全过程。
await()
的执行路径
当一个已经获取了锁的线程调用condition.await()
时,它会进入一段精心设计的等待旅程。
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// 1. 创建节点并加入条件队列
Node node = addConditionWaiter();
// 2. 释放锁(完全释放,包括重入次数)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3. 自旋等待:直到节点被移动到同步队列(通过signal)或线程被中断
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 挂起线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
}
// 4. 重新竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 5. 清理无效节点
if (node.nextWaiter != null) unlinkCancelledWaiters();
// 6. 处理中断
if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
}
我们来分解一下这个过程。首先,代码会调用addConditionWaiter()
,将当前线程包装成一个Node
节点,并加入到条件队列中。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 清除被取消的尾节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 将当前线程保存在Node中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
// 队尾插入
t.nextWaiter = node;
// 更新lastWaiter
lastWaiter = node;
return node;
}
可以看到,addConditionWaiter
的逻辑就是将新节点追加到条件队列这个链表的末尾。这个条件队列由firstWaiter
和lastWaiter
指针维护,专门用来存放调用了await()
的线程。
节点入队后,线程必须释放它当前持有的锁,这由fullyRelease()
完成。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
// 成功释放同步状态
failed = false;
return savedState;
} else {
// 不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
这个方法会彻底释放当前线程持有的锁,无论重入了多少次,并将原始的同步状态savedState
保存下来,以便后续恢复。锁释放后,线程就进入while (!isOnSyncQueue(node))
循环,并通过LockSupport.park(this)
将自己挂起,进入WAITING
状态,静静等待唤醒信号。
signal()
的执行路径
唤醒过程由另一个持有锁的线程调用signal()
来触发。
public final void signal() {
// 1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signal()
方法会先检查当前线程是否持有锁,然后获取条件队列的头节点,并调用doSignal()
。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 1. 将头结点从等待队列中移除
first.nextWaiter = null;
// 2. while中transferForSignal方法对头结点做真正的处理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal
的核心是循环调用transferForSignal
,这个方法负责将等待的线程节点从条件队列转移到同步队列,这是整个唤醒机制的关键。
final boolean transferForSignal(Node node) {
// 1. 更新状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 2. 将该节点移入到同步队列中去
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
transferForSignal
主要做了两件事:一是通过CAS将节点的waitStatus
从CONDITION
改为0;二是调用enq()
方法,将该节点追加到AQS同步队列的队尾。节点入队后,线程就从“等待条件”的状态,转变成了“等待获取锁”的状态,并通过LockSupport.unpark()
来唤醒。
await()
方法的返回
一旦在signal()
中unpark
被调用,原先在await()
中被挂起的线程就会醒来。此时,因为它所在的节点已经被移入了同步队列,while (!isOnSyncQueue(node))
的循环条件不再满足,循环退出。
接下来,线程会执行acquireQueued(node, savedState)
。这会进入AQS标准的获取锁流程,线程会和其他所有正在排队获取锁的线程一样,在同步队列中竞争,直到成功获取之前保存的savedState
数量的锁。
当acquireQueued()
成功返回,await()
方法才算执行完毕,在进行一些清理后,正式返回。
总结来看,Condition
的实现精髓在于它维护了两个队列:一个用于等待条件的条件队列,一个用于竞争锁的同步队列。await()
过程是“加入条件队列 -> 释放锁 -> 等待”,而signal()
过程则是“将节点从条件队列转移到同步队列”。一个线程必须先被signal()
,然后重新成功竞争到锁,才能最终从await()
方法中退出。
更多推荐
所有评论(0)