Day45我们讲了AQS的静态结构,了解了AQS的核心组件。

Day46我们深入剖析了AQS的动态流程,根据源码追踪了独占锁的获取和释放。

基于之前的两篇文章,AQS的主干同步机制,我们应该有了一个大概的认知。

本文补充讲一下条件队列和中断响应机制。

先列出几个问题,看完本文再回过头来想这几个问题:

一个线程调用await()的时候,AQS内部是怎么实现原子性释放锁并让线程进入等待的?

另一个线程调用signal()的时候,又是怎么精确唤醒在条件队列中等待的线程?

一个等待节点是怎么在同步队列和条件队列之间转换的?

lock()和lockInterruptibly()的区别在哪里?AQS怎么响应一个正在排队的线程被中断的事件?

一、条件队列

在Day45中,我们模拟了一个仓库模型BoundedBuffer。生产者在发现仓库满了之后,会调用notFull.await()等待。

消费者发现仓库空了会调用notEmpty.await()等待。

这个await和signal的背后,其实就是ConditionObject在驱动。

1、await:从同步队列到条件队列

当一个线程,比如一个生产者P1发现仓库满了,调用notFull.await(),这个线程就会经历挂起的过程。

当然前提条件是调用await()的线程必须已经持有与这个Condition绑定的锁(在这里是ReentrantLock)。

整个await的核心过程大概分3步:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    ConditionNode node = new ConditionNode();
    long savedState = enableWait(node); //步骤1:入队并释放锁
    // ...
    //步骤2:循环等待
    while (!canReacquire(node)) {
        // ... 阻塞逻辑 ...
    }
    // ...
    //步骤3:重新获取锁
    acquire(node, savedState, false, false, false, 0L);
    // ...
}

1)入队并释放锁

这是await的第一步,也是最核心的一步,干了三件事:

private long enableWait(ConditionNode node) {
    if (isHeldExclusively()) {
        node.waiter = Thread.currentThread();
        node.setStatusRelaxed(COND | WAITING); // 设置状态为 3 (2 | 1)
        // ...把node加入条件队列的队尾 ...
        long savedState = getState();
        if (release(savedState)) //完全释放锁
            return savedState;
    }
    // ...
}

把节点的状态改成了3。COND和WAITING分别表示在条件队列里,正在等待。

 

第二件事就是把新节点添加到ConditionObject内部维护的单向链表末尾。

 

第三件事就是调用ASQ的release方法,完全释放当前线程持有的锁。

干完这三件事(enableWait结束),线程P1就完全释放了锁,而且他的信息也已经封装成了一个状态是3的ConditionNode,被放到条件队列里了。

2)循环等待

这个时候线程P1进入了等待循环。这个循环的退出条件是canReacquire(node),他会不停的检查node.prev != null来判断节点是不是已经被转移到了同步队列。

只有在同步队列里,节点才会有prev指针。

while (!canReacquire(node)) {
    if (interrupted |= Thread.interrupted()) { // 1处理中断
        if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
            break;              // else interrupted after signal
    } else if ((node.status & COND) != 0) { // 2检查状态,阻塞
        try {
            if (rejected)
                node.block();
            else
                ForkJoinPool.managedBlock(node);
        } catch (RejectedExecutionException ex) {
            rejected = true;
        } catch (InterruptedException ie) {
            interrupted = true;
        }
    } else // 3自旋等待
        Thread.onSpinWait();    // awoke while enqueuing
}

 

1处理中断:如果线程在阻塞的时候被中断,Thread.interrupted()就是true。node.getAndUnsetStatus(COND)就会把状态从3改成1(原子操作)。如果移除成功了,就说明是在signal发生前被中断的,cancelled就会变成true,循环退出,线程就放弃等待了。

2查状态和阻塞:只要(node.status &COND)!=0,就意味着节点的节点的状态还是2,说明还没被signal。线程会通过ForkJoinPool.managedBlock(node)进入阻塞状态。(底层调用LockSupport.park())

3自旋等待:这个主要是为了处理边界情况。如果线程醒来后,发现COND状态位已经没了,但是canReacquire 还不为true(意味着节点正在被转移到同步队列,但还没完成),这种情况就需要线程进行短暂的自旋,而不是马上重新park,提高点效率。

3)重新获取锁

当signal发生,P1节点的COND位被移除,节点被成功转移到同步队列后,while循环就会退出。

这个时候,P1已经从一个等待条件的线程变回了等待锁的线程。

然后调用acquire(node, savedState, ...)。

这个流程就跟我们上一篇文章分析的完全一样了:P1线程会进入AQS的主同步队列排队,遵循FIFO原则,等待轮到自己的时候,被前驱节点唤醒,最后重新获取到锁。

savedState参数会保证他恢复到等待前正确的锁重入次数。

2、signal:从条件队列到同步队列

当另一个线程,比如消费者C1调用signal()的时候,他的任务就是把条件队列中最老的等待者(队头节点)叫醒,让后把他送到同步队列的门口去排队。

public final void signal() {
    ConditionNode first = firstWaiter;
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    if (first != null)
        doSignal(first, false);
}

private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        ConditionNode next = first.nextWaiter;
        if ((firstWaiter = next) == null)
                lastWaiter = null;
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
                enqueue(first);
                if (!all)
                    break;
         }
         first = next;
     }
}

如果firstWaiter不是空,就唤醒一个。doSignal的第二个参数表示是全部唤醒还是唤醒一个。

doSignal中getAndUnsetStatus(COND)是唤醒的第一步,原子移除节点的COND状态。

节点的状态从3(COND | WAITING)变成1(WAITING)。说明这个节点就不再属于条件队列了。

enqueue(first)是最关键的转移操作,这个方法会把这个状态变成1的节点,添加到AQS同步队列的队尾。

到这儿,signal就完成了。你会发现他并没有直接唤醒线程,更多的是扮演着一个调度员的角色。

把await的线程节点从条件等待区搬到锁等待区。这个线程的真正唤醒,是靠他在同步队列里的前驱节点在释放锁的时候完成的。

二、中断响应机制

关于中断响应,我们直接看acquireInterruptibly方法:

public final void acquireInterruptibly(long arg)
        throws InterruptedException {
    if (Thread.interrupted() ||
        (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
        throw new InterruptedException();
}

lockInterruptibly() 会调用这个方法。核心就在他调用了内部的acquire方法,然后把interruptible参数设成了 true。

1、中断检测点

在acquire里面有这样一段代码:

node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
    break;

如果一个线程因为调用lockInterruptibly而在队列里park时,他如果被interrupt了,park就会返回。

Thread.interrupted()会返回true,interruptible本身设置是true。&&两边都是true,循环就退出了。线程就不会继续尝试获取锁了。

2、中断后干什么

当循环因为中断break之后,acquire方法里面会调用cancelAcquire。

cancelAcquire方法里面做的事情就是中断线程的善后工作。

private int cancelAcquire(Node node, boolean interrupted,
                              boolean interruptible) {
        if (node != null) {
            node.waiter = null;
            node.status = CANCELLED;
            if (node.prev != null)
                cleanQueue();
        }
        if (interrupted) {
            if (interruptible)
                return CANCELLED;
            else
                Thread.currentThread().interrupt();
        }
        return 0;
    }

把节点的状态修改成CANCELLED。

然后通过cleanQueue清理队列,清理方法有点复杂,大致的逻辑就是遍历队列,把所有CANCELLED状态的节点从双向链表里移除,剩下的节点调整前后驱指针,保证队列的完整性和准确性。

最后返回CANCELLED,acquireInterruptibly接收到这个返回值后,就会往外抛InterruptedException。

结语

本文通过AQS源码剖析了AQS中的条件队列和中断响应机制。

算是不补齐了AQS的最后一块内容。

AQS系列三篇文章,从静态结构到动态流程。

其实不难发现,AQS的设计就是以一个原子状态为核心,围绕一个等待队列,通过模板方法模式把通用并发控制逻辑和具体同步语义解耦。

了解了AQS的设计理念,对于ReentrantLock、Semaphore或CountDownLatch这些基于AQS的同步工具,在使用和分析上,都会有巨大的帮助。

下一篇预告

Day48 | J.U.C集合-ConcurrentHashMap

如果你觉得这系列文章对你有帮助,欢迎关注专栏,我们一起坚持下去!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐