深入浅出Java Condition 的await和signal机制(二)
摘要:本文深入分析了Condition的await()方法实现机制。该方法会将当前线程封装为Node节点插入等待队列尾部,释放持有的锁,并通过LockSupport.park()使线程进入等待状态。当被signal/signalAll唤醒或中断时,线程会重新获取锁并从await()返回。文章详细阐述了等待队列的链式结构(不带头节点)、锁释放过程、线程唤醒条件等核心逻辑,并对比了带头节点和不带头节点
·
Condition 的 await 方法
当调用condition.await()方法后会使当前获取锁的线程进入到等待队列,如果该线程能够从 await() 方法返回的话,一定是该线程获取了与 Condition 相关联的锁。
前面讲过了,Condition 只是一个接口,它的实现类为 ConditionObject,是 AQS 的子类。
ConditionObject 的 await 方法源码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
代码的主要逻辑请看注释。当前线程调用condition.await()方法后,会释放 lock 然后加入到等待队列,直到被 signal/signalAll 方法唤醒。
怎样将当前线程添加到等待队列?
调用 addConditionWaiter 方法会将当前线程添加到等待队列中,源码如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
//将不处于等待状态的节点从等待队列中移除
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//尾节点为空
if (t == null)
//将首节点指向node
firstWaiter = node;
else
//将尾节点的nextWaiter指向node节点
t.nextWaiter = node;
//尾节点指向node
lastWaiter = node;
return node;
}
首先将 t 指向尾节点,如果尾节点不为空并且它的waitStatus!=-2(-2 为 CONDITION,表示正在等待 Condition 条件),则将不处于等待状态的节点从等待队列中移除,并且将 t 指向新的尾节点。然后将当前线程封装成 waitStatus 为-2 的节点追加到等待队列尾部。如果尾节点为空,则表明队列为空,将首尾节点都指向当前节点。
如果尾节点不为空,表明队列中有其他节点,则将当前尾节点的 nextWaiter 指向当前节点,将当前节点置为尾节点。
简单总结一下,这段代码的作用就是通过尾插入的方式将当前线程封装的 Node 插入到等待队列中,同时可以看出,Condtion 的等待队列是一个不带头节点的链式队列,不带头节点是指在链表数据结构中,链表的第一个节点就是实际存储的第一个数据元素,而不是一个特定的"头"节点,该节点不包含实际的数据。
1)不带头节点的链表:
- 链表的第一个节点就是第一个实际的数据节点。
- 当链表为空时,头引用(通常称为 head)指向 null。
2)带头节点的链表:
- 链表有一个特殊的节点作为链表的开头,这个特殊的节点称为头节点。
- 头节点通常不存储任何实际数据,或者它的数据字段不被使用。
- 无论链表是否为空,头节点总是存在的。当链表为空时,头节点的下一个节点指向 null。
- 使用头节点可以简化某些链表操作,因为不必特殊处理第一个元素的插入和删除。
1)不带头节点的链表
public class Node {
public int data;
public Node next;
public Node(int data) {
this.data = data;
this.next = null;
}
}
public class LinkedListWithoutHead {
public Node head;
public void insert(int value) {
Node newNode = new Node(value);
if (head == null) {
head = newNode;
} else {
Node temp = head;
while (temp.next != null) {
temp = temp.next;
}
temp.next = newNode;
}
}
}
2)带头节点的链表
public class NodeWithHead {
public int data;
public NodeWithHead next;
public NodeWithHead(int data) {
this.data = data;
this.next = null;
}
}
public class LinkedListWithHead {
private NodeWithHead head;
public LinkedListWithHead() {
head = new NodeWithHead(-1); // 初始化头节点
}
public void insert(int value) {
NodeWithHead newNode = new NodeWithHead(value);
NodeWithHead temp = head;
while (temp.next != null) {
temp = temp.next;
}
temp.next = newNode;
}
}
释放锁的过程
将当前节点插入到等待对列之后,会使当前线程释放 lock,由 fullyRelease 方法实现,源码如下:
final int fullyRelease(Node node) {
//释放锁失败为true,释放锁成功为false
boolean failed = true;
try {
//获取当前锁的state
int savedState = getState();
//释放锁成功的话
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//释放锁失败的话将节点状态置为取消
node.waitStatus = Node.CANCELLED;
}
}
这段代码也很容易理解,调用 AQS 的模板方法 release 释放 AQS 的同步状态并且唤醒在同步队列中头节点的后继节点引用的线程,如果释放成功则正常返回,若失败的话就抛出异常。
怎么从await方法中退出
现在回过头再来看 await 方法,其中有这样一段逻辑:
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
isOnSyncQueue 方法用于判断当前线程所在的 Node 是否在同步队列中。
如果当前节点的 waitStatus=-2,说明它在等待队列中,返回 false;如果当前节点有前驱节点,则证明它在 AQS 队列中,但是前驱节点为空,说明它是头节点,而头节点是不参与锁竞争的,也返回 false。如果当前节点既不在等待队列中,又不是 AQS 中的头节点且存在 next 节点,说明它存在于 AQS 中,直接返回 true。看一下同步队列与等待队列的关系图:
当线程第一次调用 condition.await 方法时,会进入到这个 while 循环,然后通过 LockSupport.park(this) 使当前线程进入等待状态,那么要想退出 await,第一个前提条件就是要先退出这个 while 循环,出口就只两个地方:
- 走到 break 退出 while 循环;
- while 循环中的逻辑判断为 false。
出现第 1 种情况的条件是,当前等待的线程被中断后代码会走到 break 退出,第 2 种情况是当前节点被移动到了同步队列中(即另外一个线程调用了 condition 的 signal 或者 signalAll 方法),while 中逻辑判断为 false 后结束 while 循环。
总结一下,退出 await 方法的前提条件是当前线程被中断或者调用 condition.signal 或者 condition.signalAll 使当前节点移动到同步队列后。
当退出 while 循环后会调用acquireQueued(node, savedState),该方法的作用是在自旋过程中线程不断尝试获取同步状态,直到成功(线程获取到 lock)。这样也说明了退出 await 方法必须是已经获得了 condition 引用(关联)的 lock。await 方法示意图如下:
如图,调用 condition.await 方法的线程必须是已经获得了 lock 的线程,也就是当前线程是同步队列中的头节点。调用该方法后会使得当前线程所封装的 Node 尾插入到等待队列中。
超时机制的支持condition 还额外支持超时机制,使用者可调用 awaitNanos、awaitUtil 这两个方法,实现原理基本上与 AQS 中的 tryAcquire 方法如出一辙。不响应中断的支持要想不响应中断可以调用 condition.awaitUninterruptibly() 方法,该方法的源码如下:
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
这段方法与上面的 await 方法基本一致,只不过减少了对中断的处理。
更多推荐

所有评论(0)