Lock接口定义了如下的方法
在这里插入图片描述

常见的实现类有ReentrantLock等

下面我们来探讨一下ReentrantLock的内部原理,他的类继承关系如下图
在这里插入图片描述

他的内部通过Sync的子类FairSync和NofairSync来实现公平和非公平锁,Sync又继承了AbstractQueuedSynchronizer抽象队列同步器(即AQS),AQS的内部通过双向链表来维护争夺共享资源的线程信息,使用内存可见的state字段标识队列状态。

   /**
     * 等待队列的头节点,采用懒加载方式。除了初始化,都是通过setHead方法修改。注意:如果头存在,他的waitStatus保证不会是CANCELLED
     */
    private transient volatile Node head;

    /**
     * 等待队列的头节点,采用懒加载方式。只能通过enq方法添加新的节点
     */
    private transient volatile Node tail;

    /**
     *AQS抽象队列的状态,使用volatile关键字保证内存可见性
     */
    private volatile int state;

    /**
     * 以下字段设置用于支持compareAndSet(CAS)操作
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

与常见的双向链表实现差不多,Node节点中主要包括了节点等待状态、前后指针和线程信息等;

static final class Node {
        /** 标志该节点处于共享模式中的等待状态*/
        static final Node SHARED = new Node();
        /** 标志该节点处于独占模式中等待状态*/
        static final Node EXCLUSIVE = null;

        /** 等待状态枚举值:1:线程被取消、0:当前线程已释放锁、-1:后继线程需要激活、-2:线程正等待condition条件、-3:下一个获取共享锁的线程应该无条件传播*/
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

        /**
         * 节点的等待状态值一般初始化为0或者-2,通过CAS操作进行修改
         */
        volatile int waitStatus;

        /**
         * 前一个节点的指针
         */
        volatile Node prev;

        /**
         * 后一个节点的指针
         */
        volatile Node next;

        /**
         * 当前节点的线程信息
         */
        volatile Thread thread;

        /**
         * 指向下一个正在等待条件的节点或者共享模式的特殊值。因为条件队列只能在维持独占状态的情况下被访问到,
         * 我们需要一个简单的链表来保存处于等待条件的节点。他们之后会被传送到队列中被重新获取。由于条件节点只能是独占的,
         * 我们需要一个字段设置特殊值来标识共享模式
         */
        Node nextWaiter;

        /**
         * 如果节点在共享模式中处于等待状态,返回true
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回前驱节点
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            // 为空报空指针异常
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    }

重要接口操作

看一下他常见的链表操作。

1、AQS的入队操作
   private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 尾部为空
                // 通过CAS操作设置链表头,并将尾部指针指向他
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { // 头不为空
                // 新节点的prev指向尾部
                node.prev = t;
                // CAS设置新队尾
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

  private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

 private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

// 通过Unsafe的本地方法实现CAS操作,这依赖于操作系统底层硬件的指令
 public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

CAS操作依赖的底层硬件指令

架构类型 关键指令 说明
x86 LOCK CMPXCHG 加锁的比较并交换指令
ARM LDREX / STREX 独占加载/独占存储指令
MIPS/Power LL / SC 链接加载/条件存储指令
2、获取公平锁

ReentrantLock的公平锁通过内部类FairSync中的tryAcquire方法实现

 public final void acquire(int arg) {
        // tryAcquire一般是AQS的子类实现,AQS自己的实现会抛出UnsupportedOperationException异常
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 没有获取到锁并且已将当前线程加入到等待队列中,则中断当前线程
            selfInterrupt();
    }

/*
* ReentrantLock中的公平锁tryAcquire的实现
*/
 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 获取AQS的状态
            int c = getState();
           
            if (c == 0) { // 没有线程占用共享资源
                // 有排队的线程并且CAS设置状态成功
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    // 设置为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) { // 重复获取锁
                // 增加计数值
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 设置状态
                setState(nextc);
                return true;
            }
            return false;
        }

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 先尝试最快的入队方式,失败后再转为常规的enq
        Node pred = tail;
        if (pred != null) { // 队尾不为空
            node.prev = pred;
            // CAS设置当前node为队尾
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                // 最快方式,直接返回
                return node;
            }
        }
        // enq方式入队,比较耗时
        enq(node);
        return node;
    }

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { // 自旋获取锁
                // 获取前驱节点
                final Node p = node.predecessor();
                // 前驱节点为队头并且获取到了锁
                if (p == head && tryAcquire(arg)) {
                    // 设置当前node为队头
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 返回
                    return interrupted;
                }
                // 当前驱节点等待状态为-1(后继线程需要激活),挂起当前线程并进中断
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    // 中断
                    interrupted = true;
            }
        } finally {
            // 获取锁失败了,就取消获取操作
            if (failed)
                cancelAcquire(node);
        }
    }

  // 确保当前节点的前驱节点是有效的,并且能正确通知(unpark)当前节点
  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 等待状态枚举值:1:线程被取消、0:当前线程已释放锁、-1:后继线程需要激活、-2:线程正等待condition条件、-3:下一个获取共享锁的线程应该无条件传播
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 前驱节点状态为SIGNAL,表示他结束后会发送释放信号通知等待节点,因此可以安全的挂起当前节点
            return true;
        if (ws > 0) { // 前驱线程操作被取消,需要连续跳过这样的前驱节点直到节点的等待状态<=0
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 设置当前节点为前驱的下一个节点,便于下一次自旋获得锁
            pred.next = node;
        } else { // 前驱节点状态<=0
            // CAS操作将前驱设置为 -1(SIGNAL),这样前驱节点完成之后,会发送释放信号
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

private final boolean parkAndCheckInterrupt() {
        // 阻塞当前线程,线程中断
        LockSupport.park(this);
        return Thread.interrupted();
    }
3、获取非公平锁

ReentrantLock的非公平锁通过内部类NonFairSync中的tryAcquire方法实现,与公平锁不同的是:非公平锁直接使用CAS操作设置AQS的状态,不会判断AQS有没有等待的节点,因此性能更高。

 public final void acquire(int arg) {
        // 只是tryAcquire实现不同,其他与公平锁相同
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

// NonFairSync的tryLock实现,实际调用的是Sync实现的非公平锁
public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

// Sync实现的非公平锁
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 直接使用CAS操作设置AQS的状态,获取到锁设置自己的线程信息
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) { // 锁重入逻辑
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
5、在等待时间内获取锁

AQS 支持在等待时间内获取锁的操作,实现方法为tryAcquireNanos,与tryAcquire不同的是,它先尝试立即获取锁,获取失败了就挂起当前线程,挂时长大致为nanosTimeout

 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        // 线程中断了,抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 先尝试常规方法获取锁,没有获取到再采用超时方式获取锁
        return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
    }

  private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        // 超时时间不能小于0
        if (nanosTimeout <= 0L)
            return false;
        // 计算截止时间=当前时间纳秒+等待时间纳秒
        final long deadline = System.nanoTime() + nanosTimeout;
        // 新增等待节点
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) { // 循环获取锁
                // 前驱节点
                final Node p = node.predecessor();
                // 前驱为队头并且获取到了锁,则设置队头为当前节点
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 获取失败了,更新超时时间
                nanosTimeout = deadline - System.nanoTime();
                // 超时时间到,返回获取失败
                if (nanosTimeout <= 0L)
                    return false;
                // 获取失败并且本次结算的超时nanosTimeout时间大于自旋的阈值时间,则挂起当前线程,挂起时长为nanosTimeout
                if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 当前线程中断,抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            // 如果获取锁失败,则取消获取锁操作
            if (failed)
                cancelAcquire(node);
        }
    }

4、release操作为释放锁的操作
  public final boolean release(int arg) {
        // tryRelease一般是AQS的子类实现,AQS自己的实现会抛出UnsupportedOperationException异常
        if (tryRelease(arg)) {
            // 释放后操作
            Node h = head;
            // 链表头不为空并且表头等待状态!=0
            if (h != null && h.waitStatus != 0)
                // 激活后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

   /*
    * ReentrantLock中的tryRelease实现
    */
    protected final boolean tryRelease(int releases) {
                // 获取AQS的state状态 - 释放数量
                int c = getState() - releases;
                // 判断释放线程是不是当前获取锁的线程
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
        
                boolean free = false;
                if (c == 0) {
                    // 锁释放了,需要设置占用线程为null
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
     }


   private void unparkSuccessor(Node node) {
        /*
         * 等待状态枚举值:
         * 1:线程被取消、0:当前线程已释放锁、-1:后继线程需要激活、-2:线程正等待condition条件、-3:下一个获取共享锁的线程应该无条件传播
         */
        int ws = node.waitStatus;
        if (ws < 0)
            // CAS设置waitStatus为0
            compareAndSetWaitStatus(node, ws, 0);
        
       // s为后继节点
        Node s = node.next;
        // 后继节点为空或者其waitStatus>0
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从末尾向前找待激活节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
       // 后继节点不为空,激活节点线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐