AbstractQueuedSynchronizer设计者:Doug Lea
一、简介
1、本质上,AQS是基于Unsafe.CAS Unsafe.park Unsafe.unpark的原子操作来实现锁功能
Unsafe的操作粒度不是类,而是数据和地址。
1)先理解LockSupport.park,unpark(LockSupport.park内部是Unsafe.park,unpark也类似)
使用LockSupport的线程都会与一个许可关联,如果该许可可用,则调用park()将会立即返回,否则可能阻塞。
如果许可尚不可用,则可以调用 unpark 使其可用。
许可不可重入,也就是说只能调用一次park()方法,否则会一直阻塞。
2)park():阻塞线程。
如果许可可用,则使用该许可,并且该调用立即返回;否则,为线程调度禁用当前线程,使其处于休眠状态。
3)唤醒条件:a:其他线程将当前线程(已休眠)作为目标调用 unpark,在其他线程里持有当前线程的对象,并LockSupport.unpark(休眠线程);
b:在其他线程中断当前线程(已休眠),在其他线程里持有当前线程对象,调用当前线程对象.interrupt();
c:该调用不合逻辑地(即毫无理由地)返回。
LockSupport.park():为了线程调度,在许可可用之前禁用当前线程(使线程休眠)
LockSupport.unpark():如果给定线程的许可尚不可用,则使其可用(唤醒线程)
4)AQS在功能上有 独占锁(排它锁) 和 共享锁 两种功能
抽象了绝大多数Lock的功能,而只把tryAcquire方法延迟到子类中去实现。
tryAcquire方法用具体子类判断请求线程是否可以获得锁,无论成功与否AbstractQueuedSynchronizer都将处理后面的流程。
3、AbstractQueuedSynchronizer结构
//在父类中定义的,独占模式下,指向占用该锁的线程
private transient Thread exclusiveOwnerThread;//独占模式下的线程
//head指向等待获取锁的队列(双链表)头部
private transient volatile Node head;
//tail指向等待获取锁的队列(双链表)尾部
private transient volatile Node tail;
/**锁的状态
* 锁的状态只有0 和1,0表示锁未被占用,可以获取;1表示锁被占用,请求的线程会放入队列尾部
*/
private volatile int state;
//AQS的内部类,每个请求锁的线程都会封装成一个Node对象
static final class Node {...}
//TODO 还有ConditionObject ...
4、AQS工作原理
1)独占锁(排它锁)工作原理:
一个线程尝试获取该锁时,先判断锁是否被占用,如果未被占用则,head指向该线程所在的Node
如果锁被占用,那么tail原先指向的OldTailNode.next -> CurrentNode && tail -> currentNode
这样就把当前线程封装的Node放到了等待获取锁的队列中,线程处于阻塞状态。
但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态。
2)共享锁工作原理:
5、 AQS的两个队列1)SyncQueue: 独占模式请求获取锁时,没获取到会塞到一个双向链表,就是SyncQueue
元素特点:
a:一定有node.prev
b:除了tail节点其他节点都有node.next
c:head为当前占用锁的节点,不在SyncQueue中,但是head.next在SyncQueue中
d:tail节点为SyncQueue的尾节点
2)condition队列: ConditionObject中管理的等待队列,单向链表
元素特点:
a:状态为Node.CONDITION
b:除了最后一个元素,其他元素都有node.nextWaiter
3)Node类的设计兼容这两个队列,nextWaiter是给condition队列用的,prev/next是给SyncQueue用的,不要混淆了
6、独占锁,SyncQueue中阻塞的节点,在head执行完后,调用release()唤醒后继节点,唤醒的节点成为head节点开始获取锁,如此驱动等待队列的。
二、源码1、父类
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
private transient Thread exclusiveOwnerThread;//独占模式下的线程
protected final void setExclusiveOwnerThread(Thread thread) {//设置独占线程
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {//获取独占线程
return exclusiveOwnerThread;
}
}
2、最核心的几个成员变量
/**
* 等待获取锁的队列的头部节点,只能通过setHead(node)方法来修改。如果head存在,它的waitStatus一定不会是CANCELLED
* 一个节点只有成功获取到锁时才会成为head节点,一个取消的线程永远无法获取锁,且一个线程只能被自身取消,不是其他任务节点取消的
*/
private transient volatile Node head;
//等待获取锁的队列的尾部节点,只能通过enq(node)方法来修改,添加新的等待节点
private transient volatile Node tail;
//锁的同步状态,只有两个值0,1;0:锁未被占用,可获取。1:锁已被占用,请求的线程进入等待队列。
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;
3、内部类Node
static final class Node {
//标记,表示一个节点处于共享模式等待状态
static final Node SHARED = new Node();
//标记,表示一个节点处于独占模式等待状态
static final Node EXCLUSIVE = null;
//waitStatus值,因为超时或中断,该线程已经被取消
static final int CANCELLED = 1;
//waitStatus值,当前线程的后继线程正/已被阻塞,当该线程release或cancel时要唤醒后继线程(unpark)
static final int SIGNAL = -1;
//waitStatus值,表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞
static final int CONDITION = -2;//这是给condition队列用的
//waitStatus值,传播共享锁
static final int PROPAGATE = -3;
//waitStatus默认值为0,表示当前节点在sync队列中,等待着获取锁
volatile int waitStatus;
/**
* 指向当前节点的上一个节点,当前节点依赖它来检测waitStatus
* 在当前节点进入等待队列时赋值,在移除队列时置空,或者当前节点线程被取消时置空
*/
volatile Node prev;//当前节点的上一个节点,如果当前节点被取消,就要把上一个节点与下一个节点连接起来
volatile Node next;//当前节点的下一个节点,如果next为null,说明是队列尾部
volatile Thread thread;//请求锁的线程
Node nextWaiter;//1)condition队列,指向condition队列的下一个节点; 2)SyncQueue,用于标识Node是共享锁(Node.SHARE)还是排它锁(Node.EXCLUSIVE)
//当前节点是否处于共享模式等待状态
final boolean isShared() {
//condition队列的下一个节点,什么意思?
return nextWaiter == SHARED;
}
//返回当前节点的上一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;//添加condition队列的下一个节点
U.putObject(this, THREAD, Thread.currentThread());//设置节点的thread为当前线程(原子操作)
}
/** Constructor used by addConditionWaiter. */
Node(int waitStatus) {
U.putInt(this, WAITSTATUS, waitStatus);//设置节点的waitStatus为传入的值(原子操作)
U.putObject(this, THREAD, Thread.currentThread());//设置节点的thread为当前线程(原子操作)
}
//如果节点waitStatus为expect,则更新为update,并返回true;否则返回false(原子操作)
final boolean compareAndSetWaitStatus(int expect, int update) {
return U.compareAndSwapInt(this, WAITSTATUS, expect, update);
}
//如果节点的下一个节点next是expect,则更新为update并返回true,否则返回false(原子操作)
final boolean compareAndSetNext(Node expect, Node update) {
return U.compareAndSwapObject(this, NEXT, expect, update);
}
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long NEXT;
static final long PREV;
private static final long THREAD;
private static final long WAITSTATUS;
static {
try {
NEXT = U.objectFieldOffset
(Node.class.getDeclaredField("next"));
PREV = U.objectFieldOffset
(Node.class.getDeclaredField("prev"));
THREAD = U.objectFieldOffset
(Node.class.getDeclaredField("thread"));
WAITSTATUS = U.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* 独占模式可取消线程请求获取锁
*/
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);//先塞入等待队列,独占模式
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();//阻塞后如果线程已经中断了,抛出异常
}
} catch (Throwable t) {
cancelAcquire(node);//取消节点
throw t;
}
}
/**
* 独占模式获取锁,忽略中断,即线程在aquire过程中,中断此线程是无效的
* 当有线程竞争锁时,该线程会首先尝试获得锁tryAcquire(),这对于那些已经在队列中排队的线程来说显得不公平
* ,这也是非公平锁的由来。
*/
public final void acquire(int arg) {
/* 1)tryAcquire没有获取到锁(子类实现)
* 2)addWaiter没获取到锁,往队列尾部添加一个封装了该请求线程的Node节点(塞入阻塞队列)
* 3)acquireQueued 阻塞park新添加的节点,阻塞前会再次尝试获取锁,
* 失败则阻塞(阻塞塞入的节点线程,阻塞前还会再tryAcquire一次)
*/
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//中断请求的线程本身
}
/**由子类实现的获取锁,比如子类中用Unsafe.CAS去设置state根据结果返回是否成功获取锁
* DEMO: if(Unsafe.compareAndSetState(0,1)) {return true;} return false;
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**由子类实现的释放锁
* DEMO: setExclusiveOwnerThread(null);state(0);return true;
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 独占模式在nanos时间内获取锁
* 超时或者异常时,取消节点
*/
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);//请求线程添加到等待队列tail节点
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//是head的后继节点,且获取锁成功,设置为head节点
setHead(node);
p.next = null; // help GC
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);//超时,设置为取消状态
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
/* 1)超时退出
* 2)未超时被唤醒,重新循环
* 3)被中断
*/
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
/**
* 把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,
* 如果重试成功能则无需阻塞,直接返回。阻塞前把前继节点状态设置为SIGNAL(保证前继节点获取到锁后能唤醒自己)
*/
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//获取上一个节点
/* 1)尾部节点的上一个节点是head节点
* 2)新添加的节点所在线程(当前线程)成功请求到锁*/
if (p == head && tryAcquire(arg)) {
//head节点指向新添加的这个尾部节点,在setHead里会把node.pre置空
setHead(node);
p.next = null; //老节点持有的指向新的head节点(node)的引用置空
return interrupted;
}
/* 1、返回是否可以请求锁失败后是否可以park
* 2、park当前请求线程,返回当前请求线程状态是否为中断
*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
//当后期node解锁后,会从这里开始继续循环,重新竞争获取锁,失败的又会继续阻塞
interrupted = true;//如果返回状态为中断,那么返回到上一层后,会调用中断方法中断请求线程
}
} catch (Throwable t) {
cancelAcquire(node);//失败了取消节点
throw t;
}
}
/**
* 根据前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列
* 由于外层循环,最后要么node成了head节点,要么node.pre.waitStatus == SIGNAL,然后node节点park阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前继的节点状态为SIGNAL,表明当前节点需要unpark,返回true
if (ws == Node.SIGNAL)
return true;
/* 如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被取消,则回溯到一个非取消的前继节点,返回false,
* acquireQueued方法的无限循环,直至返回true,导致线程阻塞*/
if (ws > 0) {//前继节点为取消状态,删除该节点,继续往前遍历,直到找到非取消的前继节点
do {
node.prev = pred = pred.prev;//node.pre指向pre节点的上一个节点(pre节点自身已经被取消了)
} while (pred.waitStatus > 0);
pred.next = node;//非取消状态的前继节点跟当前节点连起来
} else {
/* 前继节点状态为非SIGNAL、非CANCELLED,设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环
* 前继节点设置为SIGNAL了,下次循环就会返回true了;
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);//如果pred.waitStatus == ws,则pred.waitStatus->SIGNAL
}
return false;
}
/**
* 使当前线程休眠,返回当前线程是否中断
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
//锁的状态state==expect,更新成update并返回true;否则返回false
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}
/**
* 把一个封装了线程的节点,放到等待队列
* 添加成功后返回新的tail节点,也就是最后添加的节点
* @param mode 是独占锁还是共享锁,默认为null,独占锁
* @return 返回添加的新节点(跟enq(node)不同之处)
*/
private Node addWaiter(Node mode) {
/* Node(Node nextWaiter) {
* this.nextWaiter = nextWaiter;//传入的mode赋值给了新node的nextWaiter
* U.putObject(this, THREAD, Thread.currentThread());//设置新node的线程为当前线程
* }*/
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
//新node.pre -> oldTail (原子操作)
U.putObject(node, Node.PREV, oldTail);
//如果oldTail == tail(并发下,可能tail已经被其他线程抢先赋值了),则 tail -> node,并返回true
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;//oldTail.next -> node
return node;//到这里已经完成节点插入,node已经是tail节点,返回新的tail节点,也就是添加的这个节点
}
} else {//tail为null,说明队列还没初始化
//创建一个节点,作为同步队列第一个元素,head、tail都指向它
initializeSyncQueue();
}
}
}
/**
* 往队列中插入节点
* @return 新的节点插入到尾部后,返回旧的tail节点(跟addWaiter区别在返回值不同)
*/
private Node enq(Node node) {
for (;;) {
/* 尾部插入节点,tail所指向的节点就成为倒数第二个节点了
* 插入的节点node.pre -> oldTail, oldTail.next -> node, tail ->node
*/
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);//node.pre -> oldTail(原子操作)
//如果oldTail == tail(并发下,可能tail已经被其他线程抢先赋值了),则 tail -> node,并返回true
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;//oldTail.next -> node
return oldTail;//到这里已经完成节点插入,node已经是tail节点,返回node的上一个节点
}
} else {//tail为null,说明队列还没初始化
//创建一个节点,作为同步队列第一个元素,head、tail都指向它
initializeSyncQueue();
}
}
}
/**
* 初始化锁的head、tail,二者都指向同一个节点
* 双链表队列嘛,有了一个节点了也就算初始化了同步队列且有了一个元素
*/
private final void initializeSyncQueue() {
Node h;
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))//head为null,head->h,return true;
tail = h;//tail也指向h,初始化时head、tail都指向同一个node
}
/**
* 设置node为head节点
* Sets head of queue to be node, thus dequeuing. Called only by acquire methods. Also nulls out unused fields for sake of GC and to suppress unnecessary signals and traversals.
* @param node the node
*/
private void setHead(Node node) {
head = node;//head -> node
node.thread = null;//头节点的线程置空,方便回收,node.thread-> null
node.prev = null;//头节点前面不能有其他节点了,node.pre -> null
}
/**
* 如果节点后面还有节点,唤醒下一个节点的线程
* 找出第一个可以unpark的线程,一般说来head.next == head,Head就是第一个线程,
* 但Head.next可能被取消或被置为null,因此比较稳妥的办法是从后往前找第一个可用线程。
* 回溯会导致性能降低,但是这个发生的几率很小,所以不会有性能影响。之后便是通知系统内核继续执行该线程。
*/
private void unparkSuccessor(Node node) {
//如果状态为负值(即可能需要信号),请尝试清除信号
int ws = node.waitStatus;
if (ws < 0)//如果node.waitStatus < 0,将node.waitStatus设置为0
node.compareAndSetWaitStatus(ws, 0);
/*
* 队列中需要要释放许可的线程,通常是下一个节点的线程,
* 如果下一个节点明显为null或者下一个节点已经取消,则从tail节点向前遍历找到未取消的继任者节点
*/
Node s = node.next;
//waitStatus只有取消状态是>0的
if (s == null || s.waitStatus > 0) {//下一个节点为null/下一个节点的线程已经取消
s = null;//置空方便GC
//从尾巴tail节点开始往前遍历,找到最后一个非取消状态的节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;//从末尾开始往前找,找到的第一个未取消的节点,作为下一个要唤醒的节点
}
if (s != null)//唤醒该节点的线程
LockSupport.unpark(s.thread);
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
/**
* Cancels an ongoing attempt to acquire.
* 取消一个正在尝试获取锁的节点
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;//线程引用置空
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)//从队列移除已经取消的前继节点
node.prev = pred = pred.prev;
Node predNext = pred.next;
//下面开始做的事情就是把node状态设为取消,把node从它的前后节点中踢出来
node.waitStatus = Node.CANCELLED;//节点状态设置会CANCEL
//如果取消的是tail节点,把tail引用指向上一个节点,且把新的tail.next->null
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {//取消的不是tail节点
int ws;
if (pred != head
&& ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL)))
&& pred.thread != null) {//node不是head节点的后继节点head-node1-node2-node3-tail,【node位于node2-node3时】
//把node节点踢出去
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);//node.pre.next -> node.next,这样node就踢出去了
} else {//node是head的后继节点,唤醒node的下一个节点(因为node取消了,本来应该唤醒它的,取消后就得唤醒它的下一个节点了)
unparkSuccessor(node);//唤醒node节点之后需要执行的节点
}
node.next = node; //要取消的节点node的next指向自身,帮助GC
}
}
/**
* 独占模式释放锁,释放head节点,head节点状态设为0,唤醒后继节点的线程
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {//尝试释放,子类实现
Node h = head;
if (h != null && h.waitStatus != 0)//head节点非空,状态非0
unparkSuccessor(h);//释放head节点,并唤醒下一个节点(unpark下一个节点的线程)
return true;
}
return false;
}
/**
* 释放当前线程的锁
*/
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
// Internal support methods for Conditions
/**
* 检测node是否再次transfer到CLH队列中
* (其他线程调用signal或signalAll时,该线程可能从CONDITION队列中transfer到CLH队列中)
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
/*1)CONDITION状态肯定在CONDITION队列,也就是不在SyncQueue(SyncQueue就是AQS可以竞争锁的队列)
*2)SyncQueue队列一定有prev,除了最后一个其他的也都有next,而CONDITION队列只有nextWaiter,没有prev也不在SyncQueue
*/
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) //有next肯定在SyncQueue
return true;
//正好有prev,且没有next;从尾节点向前遍历整个SyncQueue得到结果
return findNodeFromTail(node);
}
/**
* 从尾节点开始遍历SyncQueue,返回node是否在SyncQueue中
*/
private boolean findNodeFromTail(Node node) {
for (Node p = tail;;) {
if (p == node)
return true;
if (p == null)
return false;
p = p.prev;
}
}
/**
* 在独占模式下,锁的状态是否为被占用状态
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* 将一个节点从condition队列转换到Sync队列
*/
final boolean transferForSignal(Node node) {
//waitStatus不为CONDITION或者无法改变这个值
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
//到这里node.waitStatus是0了
//把node塞到SyncQueue尾部
Node p = enq(node);//p是原来的尾部节点oldTail
int ws = p.waitStatus;
/*原来的尾节点oldTail已取消(>0),或者oldTail状态无法设置成SIGNAL
* 否则该节点在Sync队列中前继节点已经是signal状态了,唤醒工作交给前驱节点(节省了一次park和unpark操作)
*/
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);//唤醒node节点线程,唤醒了当然是去竞争锁了
return true;
}
...
public class ConditionObject implements Condition, java.io.Serializable {
ConditionObject解析
}
}
三、排它锁的实现流程图
四、共享锁实现
共享锁用的也是SyncQueue,只是策略不同,一开始阻塞请求的多个线程,一个共享节点唤醒后又会循环下去把所有共享节点都唤醒,阻塞的多个共享节点的线程都被唤醒,然后都去获取共享锁,并往下执行。
补充:tryAcquire()实现的不是偏向锁,而是不公平锁!上图写错了!
参考资料:深入JVM锁机制2-Lock