在Java并发编程中,ReentrantLock配合Condition,是我们替代synchronizedwait/notify的常用工具。它提供了更灵活的线程等待和唤醒机制。那么,当我们调用condition.await()时,线程到底经历了什么?signal()又是如何唤醒它的?要回答这些问题,就必须深入AQS(AbstractQueuedSynchronizer)的源码,跟踪一个线程的全过程。

await()的执行路径

当一个已经获取了锁的线程调用condition.await()时,它会进入一段精心设计的等待旅程。

public final void await() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    // 1. 创建节点并加入条件队列
    Node node = addConditionWaiter();
    // 2. 释放锁(完全释放,包括重入次数)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 3. 自旋等待:直到节点被移动到同步队列(通过signal)或线程被中断
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 挂起线程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
    }
    // 4. 重新竞争锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 5. 清理无效节点
    if (node.nextWaiter != null) unlinkCancelledWaiters();
    // 6. 处理中断
    if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
}

我们来分解一下这个过程。首先,代码会调用addConditionWaiter(),将当前线程包装成一个Node节点,并加入到条件队列中。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 清除被取消的尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程保存在Node中
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        // 队尾插入
        t.nextWaiter = node;
    // 更新lastWaiter
    lastWaiter = node;
    return node;
}

可以看到,addConditionWaiter的逻辑就是将新节点追加到条件队列这个链表的末尾。这个条件队列由firstWaiterlastWaiter指针维护,专门用来存放调用了await()的线程。

节点入队后,线程必须释放它当前持有的锁,这由fullyRelease()完成。

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            // 成功释放同步状态
            failed = false;
            return savedState;
        } else {
            // 不成功释放同步状态抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

这个方法会彻底释放当前线程持有的锁,无论重入了多少次,并将原始的同步状态savedState保存下来,以便后续恢复。锁释放后,线程就进入while (!isOnSyncQueue(node))循环,并通过LockSupport.park(this)将自己挂起,进入WAITING状态,静静等待唤醒信号。

signal()的执行路径

唤醒过程由另一个持有锁的线程调用signal()来触发。

public final void signal() {
    // 1. 先检测当前线程是否已经获取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal()方法会先检查当前线程是否持有锁,然后获取条件队列的头节点,并调用doSignal()

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 1. 将头结点从等待队列中移除
        first.nextWaiter = null;
        // 2. while中transferForSignal方法对头结点做真正的处理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

doSignal的核心是循环调用transferForSignal,这个方法负责将等待的线程节点从条件队列转移到同步队列,这是整个唤醒机制的关键。

final boolean transferForSignal(Node node) {
    // 1. 更新状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 2. 将该节点移入到同步队列中去
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

transferForSignal主要做了两件事:一是通过CAS将节点的waitStatusCONDITION改为0;二是调用enq()方法,将该节点追加到AQS同步队列的队尾。节点入队后,线程就从“等待条件”的状态,转变成了“等待获取锁”的状态,并通过LockSupport.unpark()来唤醒。

await()方法的返回

一旦在signal()unpark被调用,原先在await()中被挂起的线程就会醒来。此时,因为它所在的节点已经被移入了同步队列,while (!isOnSyncQueue(node))的循环条件不再满足,循环退出。

接下来,线程会执行acquireQueued(node, savedState)。这会进入AQS标准的获取锁流程,线程会和其他所有正在排队获取锁的线程一样,在同步队列中竞争,直到成功获取之前保存的savedState数量的锁。

acquireQueued()成功返回,await()方法才算执行完毕,在进行一些清理后,正式返回。

总结来看,Condition的实现精髓在于它维护了两个队列:一个用于等待条件的条件队列,一个用于竞争锁的同步队列。await()过程是“加入条件队列 -> 释放锁 -> 等待”,而signal()过程则是“将节点从条件队列转移到同步队列”。一个线程必须先被signal(),然后重新成功竞争到锁,才能最终从await()方法中退出。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐