Java-Lock锁
ReentrantLock内部原理简析 ReentrantLock通过Sync抽象类及其子类FairSync(公平锁)和NonfairSync(非公平锁)实现锁机制,底层基于AQS(AbstractQueuedSynchronizer)框架。AQS使用volatile修饰的state变量表示锁状态,通过双向链表维护等待线程队列。Node节点包含waitStatus、前后指针和线程信息等字段。关键操
Lock接口定义了如下的方法
常见的实现类有ReentrantLock等
下面我们来探讨一下ReentrantLock的内部原理,他的类继承关系如下图
他的内部通过Sync的子类FairSync和NofairSync来实现公平和非公平锁,Sync又继承了AbstractQueuedSynchronizer抽象队列同步器(即AQS),AQS的内部通过双向链表来维护争夺共享资源的线程信息,使用内存可见的state字段标识队列状态。
/**
* 等待队列的头节点,采用懒加载方式。除了初始化,都是通过setHead方法修改。注意:如果头存在,他的waitStatus保证不会是CANCELLED
*/
private transient volatile Node head;
/**
* 等待队列的头节点,采用懒加载方式。只能通过enq方法添加新的节点
*/
private transient volatile Node tail;
/**
*AQS抽象队列的状态,使用volatile关键字保证内存可见性
*/
private volatile int state;
/**
* 以下字段设置用于支持compareAndSet(CAS)操作
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
与常见的双向链表实现差不多,Node节点中主要包括了节点等待状态、前后指针和线程信息等;
static final class Node {
/** 标志该节点处于共享模式中的等待状态*/
static final Node SHARED = new Node();
/** 标志该节点处于独占模式中等待状态*/
static final Node EXCLUSIVE = null;
/** 等待状态枚举值:1:线程被取消、0:当前线程已释放锁、-1:后继线程需要激活、-2:线程正等待condition条件、-3:下一个获取共享锁的线程应该无条件传播*/
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* 节点的等待状态值一般初始化为0或者-2,通过CAS操作进行修改
*/
volatile int waitStatus;
/**
* 前一个节点的指针
*/
volatile Node prev;
/**
* 后一个节点的指针
*/
volatile Node next;
/**
* 当前节点的线程信息
*/
volatile Thread thread;
/**
* 指向下一个正在等待条件的节点或者共享模式的特殊值。因为条件队列只能在维持独占状态的情况下被访问到,
* 我们需要一个简单的链表来保存处于等待条件的节点。他们之后会被传送到队列中被重新获取。由于条件节点只能是独占的,
* 我们需要一个字段设置特殊值来标识共享模式
*/
Node nextWaiter;
/**
* 如果节点在共享模式中处于等待状态,返回true
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
// 为空报空指针异常
if (p == null)
throw new NullPointerException();
else
return p;
}
}
重要接口操作
看一下他常见的链表操作。
1、AQS的入队操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 尾部为空
// 通过CAS操作设置链表头,并将尾部指针指向他
if (compareAndSetHead(new Node()))
tail = head;
} else { // 头不为空
// 新节点的prev指向尾部
node.prev = t;
// CAS设置新队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 通过Unsafe的本地方法实现CAS操作,这依赖于操作系统底层硬件的指令
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
CAS操作依赖的底层硬件指令
| 架构类型 | 关键指令 | 说明 |
|---|---|---|
| x86 | LOCK CMPXCHG | 加锁的比较并交换指令 |
| ARM | LDREX / STREX | 独占加载/独占存储指令 |
| MIPS/Power | LL / SC | 链接加载/条件存储指令 |
2、获取公平锁
ReentrantLock的公平锁通过内部类FairSync中的tryAcquire方法实现
public final void acquire(int arg) {
// tryAcquire一般是AQS的子类实现,AQS自己的实现会抛出UnsupportedOperationException异常
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 没有获取到锁并且已将当前线程加入到等待队列中,则中断当前线程
selfInterrupt();
}
/*
* ReentrantLock中的公平锁tryAcquire的实现
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取AQS的状态
int c = getState();
if (c == 0) { // 没有线程占用共享资源
// 有排队的线程并且CAS设置状态成功
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
// 设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 重复获取锁
// 增加计数值
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置状态
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先尝试最快的入队方式,失败后再转为常规的enq
Node pred = tail;
if (pred != null) { // 队尾不为空
node.prev = pred;
// CAS设置当前node为队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
// 最快方式,直接返回
return node;
}
}
// enq方式入队,比较耗时
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋获取锁
// 获取前驱节点
final Node p = node.predecessor();
// 前驱节点为队头并且获取到了锁
if (p == head && tryAcquire(arg)) {
// 设置当前node为队头
setHead(node);
p.next = null; // help GC
failed = false;
// 返回
return interrupted;
}
// 当前驱节点等待状态为-1(后继线程需要激活),挂起当前线程并进中断
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 中断
interrupted = true;
}
} finally {
// 获取锁失败了,就取消获取操作
if (failed)
cancelAcquire(node);
}
}
// 确保当前节点的前驱节点是有效的,并且能正确通知(unpark)当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 等待状态枚举值:1:线程被取消、0:当前线程已释放锁、-1:后继线程需要激活、-2:线程正等待condition条件、-3:下一个获取共享锁的线程应该无条件传播
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 前驱节点状态为SIGNAL,表示他结束后会发送释放信号通知等待节点,因此可以安全的挂起当前节点
return true;
if (ws > 0) { // 前驱线程操作被取消,需要连续跳过这样的前驱节点直到节点的等待状态<=0
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 设置当前节点为前驱的下一个节点,便于下一次自旋获得锁
pred.next = node;
} else { // 前驱节点状态<=0
// CAS操作将前驱设置为 -1(SIGNAL),这样前驱节点完成之后,会发送释放信号
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程,线程中断
LockSupport.park(this);
return Thread.interrupted();
}
3、获取非公平锁
ReentrantLock的非公平锁通过内部类NonFairSync中的tryAcquire方法实现,与公平锁不同的是:非公平锁直接使用CAS操作设置AQS的状态,不会判断AQS有没有等待的节点,因此性能更高。
public final void acquire(int arg) {
// 只是tryAcquire实现不同,其他与公平锁相同
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// NonFairSync的tryLock实现,实际调用的是Sync实现的非公平锁
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
// Sync实现的非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 直接使用CAS操作设置AQS的状态,获取到锁设置自己的线程信息
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 锁重入逻辑
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
5、在等待时间内获取锁
AQS 支持在等待时间内获取锁的操作,实现方法为tryAcquireNanos,与tryAcquire不同的是,它先尝试立即获取锁,获取失败了就挂起当前线程,挂时长大致为nanosTimeout
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 线程中断了,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 先尝试常规方法获取锁,没有获取到再采用超时方式获取锁
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 超时时间不能小于0
if (nanosTimeout <= 0L)
return false;
// 计算截止时间=当前时间纳秒+等待时间纳秒
final long deadline = System.nanoTime() + nanosTimeout;
// 新增等待节点
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) { // 循环获取锁
// 前驱节点
final Node p = node.predecessor();
// 前驱为队头并且获取到了锁,则设置队头为当前节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 获取失败了,更新超时时间
nanosTimeout = deadline - System.nanoTime();
// 超时时间到,返回获取失败
if (nanosTimeout <= 0L)
return false;
// 获取失败并且本次结算的超时nanosTimeout时间大于自旋的阈值时间,则挂起当前线程,挂起时长为nanosTimeout
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 当前线程中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
// 如果获取锁失败,则取消获取锁操作
if (failed)
cancelAcquire(node);
}
}
4、release操作为释放锁的操作
public final boolean release(int arg) {
// tryRelease一般是AQS的子类实现,AQS自己的实现会抛出UnsupportedOperationException异常
if (tryRelease(arg)) {
// 释放后操作
Node h = head;
// 链表头不为空并且表头等待状态!=0
if (h != null && h.waitStatus != 0)
// 激活后继节点
unparkSuccessor(h);
return true;
}
return false;
}
/*
* ReentrantLock中的tryRelease实现
*/
protected final boolean tryRelease(int releases) {
// 获取AQS的state状态 - 释放数量
int c = getState() - releases;
// 判断释放线程是不是当前获取锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 锁释放了,需要设置占用线程为null
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* 等待状态枚举值:
* 1:线程被取消、0:当前线程已释放锁、-1:后继线程需要激活、-2:线程正等待condition条件、-3:下一个获取共享锁的线程应该无条件传播
*/
int ws = node.waitStatus;
if (ws < 0)
// CAS设置waitStatus为0
compareAndSetWaitStatus(node, ws, 0);
// s为后继节点
Node s = node.next;
// 后继节点为空或者其waitStatus>0
if (s == null || s.waitStatus > 0) {
s = null;
// 从末尾向前找待激活节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 后继节点不为空,激活节点线程
if (s != null)
LockSupport.unpark(s.thread);
}
更多推荐

所有评论(0)