AbstractQueueSysnchorized原理 - 969251639/study GitHub Wiki
AQS,AbstractQueuedSynchronizer,抽象队列同步器,哟呵过来构建java同步组件的基础框架(ReentrantLock,Semaphore等),时JUC并发包中核心中的核心组件
AQS实现了同步器大量的细节,子类通过继承的方式实现它的抽象方法来管理同步状态
AQS内部用一个volatile修饰的成员变量state来表示同步状态,规定state>0表示获得同步状态,state=0表示任何线程获得同步状态,并通过CAS保证state修改的原子性,同时内置FIFO同步队列来管理线程资源
AQS的主要方法如下:
- getState():返回同步状态的当前值;
- setState(int newState):设置当前同步状态;
- compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
- tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
- tryRelease(int arg):独占式释放同步状态;
- tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
- tryReleaseShared(int arg):共享式释放同步状态;
- isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
- acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
- acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
- tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
- acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
- acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
- tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
- release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
- releaseShared(int arg):共享式释放同步状态;
AQS同步队列:
CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:
AQS的同步队列CLH(Craig, Landin, and * Hagersten)是一个双向的FIFO队列,队列中的节点(Node)会保存
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
1. Node
节点(Node)是AQS的CLH队列的元素,其成员变量主要如下:
//共享模式
static final Node SHARED = new Node();
//独占模式
static final Node EXCLUSIVE = null;
//取消状态
static final int CANCELLED = 1;
//通知状态,用于通知唤醒后继节点
static final int SIGNAL = -1;
//条件等待状态,主要用于Condition
static final int CONDITION = -2;
//共享模式下同步状态传播
static final int PROPAGATE = -3;
//等待状态,就是上面的那四个状态,大小依次递减(上面的那四个状态不包含0,是因为0是初始状态,也就是watiStatus的初始值是0,因为是int类型)
volatile int waitStatus;
//双向队列的前驱节点
volatile Node prev;
//双向队列的后继节点
volatile Node next;
//引用的线程
volatile Thread thread;
//下一个等待者,Condition条件队列使用
Node nextWaiter;
从上可以看出Node主要是由模式、状态、队列节点和线程引用这四部分组成的
Node包括三个构造方法
//构造初始节点,成员变量都是默认值
Node() { // Used to establish initial head or SHARED marker
}
//基于模式构造节点,用于插入队列节点
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//基于等待状态构造节点,用于Condition
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
同时也包含两个辅助方法
//是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取当前节点的前驱节点,获取不到则报空指针异常(表示前驱节点一定要有,不然抛异常)
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
2. AQS变量
//头节点指针
private transient volatile Node head;
//尾节点指针
private transient volatile Node tail;
//同步状态
private volatile int state;
//是否自旋的阈值
static final long spinForTimeoutThreshold = 1000L;
3. AQS的变量和Node共同构成AQS的基础数据结构
AQS内部的数据结构如下
head指针指向队列的第一个节点,tail指针指向队列的最后一个节点
4. 入队
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
4.1 根据模式构造一个节点
Node node = new Node(Thread.currentThread(), mode);
4.2 获取当前队列的尾节点,将尾节点赋给要入队的节点的前驱节点(保存下当前尾节点的引用而已)
Node pred = tail;
4.3 如果尾节点不为空(队列有元素的情况)
那么将刚才的尾节点引用再次赋给前驱节点
这里为什么又要再次赋给前驱节点呢?因为如果队列不为空,且有多个线程入队的情况下,假如Node4和Node5入队,Node3为尾节点,那么如果Node4先执行到addWaiter方法,那么会有如下的情况
这时候Node5也入队
然后两个线程cas,如果Node4 cas成功,那么Node5 cas失败,进入enq方法的是Node5,如果Node5 cas成功,Node4 cas失败,进入enq方法的是Node5,但不管谁成功,都会有一个线程进入enq方法,这里假设Node5 cas设置成功
也就是
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
如果cas成功,那么表示入队成功(也就是插入到队列的尾节点),则返回,接下来看enq方法
进入enq方法有两种场景,第一个是队列为空的时候,也就是初始状态,prev == null,第二个是多线程竞争,cas入队失败的情况
**场景一:多线程竞争导致cas入队失败 ** 很明显上面的例子中,Node4 cas入队失败导致进入enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
...
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到跟addWaiter方法一样,获取当前队列的尾指针,缓存起来,然后有个if,走两个场景,当t == null的时候,也就是队列为空的场景,否则进入else,多线程竞争情况,和addWaiter的套路一样,继续上面的那个例子,首先将tail(Node5)赋给Node4的prev
然后也一样cas入队到尾节点,继续多线程竞争入队,入队失败就重试,直到到成功,成功后,设置下队列的next指针,组成双向链表即可,最后返回尾节点的前驱节点(不是尾节点)
场景二:队列为空
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
...
}
}
}
队列为空的情况下入队很简单,初始化一个空节点(注意:这里是一个空节点,不是要入队的那个节点),然后将空节点头尾相连,这里并没有操作要入队的节点
设置完头尾指针相连的时候,方法没有返回,而是进入到else方法(tail!=null),也就是场景一的情况节点又去入队了
5. AQS独占模式下的同步状态获取
分两种情况获取
5.1 不响应中断获取
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面的那段代码有两个条件
第一个是tyrAcquire方法,如果tyrAcquire返回true,则表示获取得到同步状态,acquire方法执行结束
第二个是acquireQueued方法,如果获取不到同步状态(tyrAcquire方法返回false),acquireQueued方法如果返回true,则中断自己(给当前线程打中断标记),如果acquireQueued方法返回true,则acquire方法执行结束
5.1.1 tyrAcquire方法,尝试获取一下同步状态
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
AQS没有实现,这是因为AQS使用模板设计模式,AQS内部组装模板,至于模板怎么用交由之类去实现,这里的含义就是由具体的同步器自己去实现这个规则,AQS内部帮你封装串起来
5.1.2 acquireQueued方法
acquireQueued方法有两个参数,第一个就是要入队的节点,即持当前线程的封装的节点,第二个是同步状态值,这里的int可以指定大小(支持可重入用),其中入队的分析上面已经给出了,该方法会返回入队的节点(Node),然后进入acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//设置执行是否成功或失败的标记
try {
boolean interrupted = false;//设置线程是否被中断的标记
for (;;) {//自旋
final Node p = node.predecessor();//获取当前节点前驱节点,获取不到抛空指针异常
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接下来分析下面一段关键代码
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
...
}
判断当前节点前驱节点是否是头节点,且尝试获取同步状态,如果两者都为true,则表示该节点可以被执行,其中tryAcquire交由之类具体的同步器去执行(这里之所以不能根据头结点去执行,还要根据这个tryAcquire方法来判断是因为在非公平状态下,同步状态随时有可能被其他线程抢占式的修改,后面的非公平锁就可以看到这个判断的作用)
从入队的分析可以看到,在初始状态下,头指针指向一个空的节点
这时获取Node1的前驱节点就是head,如果tryAcquire方法返回true,那么将Node1设置为head节点
private void setHead(Node node) {
head = node;
node.thread = null;//设置为空,也是因为可以断掉thread的引用,让GC可以在线程执行完后回收该内存,而且这时也不需要该引用去设置中断标记之类的,故直接设置为null
node.prev = null;
}
p.next = null; // help GC
这是之前的Node节点就完全孤立了,没有任何引用指向它,GC可以将它回收掉
这时Node1就可以获取同步状态了
如果是下图的node2,那么会进入自旋休眠状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
这里也有两个判断,先看第一个shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
进入到这里,就需要使用节点状态,逐行代码的分析:
(1)先获取前驱节点的等待状态,如果前驱节点的等待状态为SIGNAL,返回true
(2)如果前驱节点的等待状态大于0(大于0表示取消状态),则将该节点踢出
将该取消节点的前驱赋给它后面的节点,且一直往前走,直到找到没有取消的为止,然后将找到的最后一个取消节点(这里是往前找,所以最后一个肯定是在CLH队列的最前面那里),然后将最后一个取消节点的前驱节点的next指向当前节点
(3)如果前驱节点的等待状态小于0,将前驱节点的等待状态设置为SIGNAL,最后返回false
这个方法最后总结下就是,前驱节点的等待状态需要后继节点来设置为SIGNAL,如果本来就是SIGNAL,那么返回true就可以了,否则后继节点将其修改为SIGANL,如果要修改的前驱节点是取消状态,呢么需要将其从队列中踢掉,而之所以要将其前驱节点设置为SIGNAL,是因为后继节点的唤醒都是由它的前驱节点来触发
如果它的前驱节点为SIGNAL(设置成功后返回false后继续循环也会返回true)那么进入第二个判断parkAndCheckInterrupt方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
将该线程挂起,直到它的前驱节点将它唤醒或被其他线程给中断了,返回中断状态,如果是真的被中断了而唤醒,那么interrupted 会被设置为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
最后如果没有获取到同步状态,而在获取的过程中出问题而非法跳出自旋的死循环,那么failed是为true,结束该方法时,取消该节点(因为非法退出,导致该节点是有问题的)
if (failed)
cancelAcquire(node);
cancelAcquire方法实现如下
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)//节点为null,返回
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;//获取该节点的前驱节点,缓存到pred变量
while (pred.waitStatus > 0)//如果他的前驱节点也是出于取消,则一直往前找,直到找到不取消的节点,并缓存给pred变量
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;//获取pred的next节点
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;//将该节点设置为取消状态
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {//如果该节点是尾节点,则cas的将pred设置为尾节点(因为它的前面也可能是取消节点,所以pred就是最后一个节点)
compareAndSetNext(pred, predNext, null);//pred成为尾节点后cas的将它的next指针设置为null
} else {//如果cas失败,或者取消的节点不是尾节点
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&//前驱节点不是头节点
((ws = pred.waitStatus) == Node.SIGNAL ||//前驱节点处于于SIGNAL,且将前驱节点设置SIGNAL成功
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)//将原有的next指针交给找到的前驱节点
compareAndSetNext(pred, predNext, next);
} else {//唤醒前驱节点
//上面的那个if,如果前驱节点是if且本身是SIGNAL(只有SIGNAL才有资格去唤醒它的后继节点),那么是需要去唤醒它的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
最后回到acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
经过上面的分析,如果tryAcquire获取不到,那么将该线程封装成Node插入到CLH队列的尾吧,如果是正常的被前驱节点唤醒,那么acquireQueued返回false,如果是被其他线程中断唤醒那么acquireQueued方法返回true,这时就会进入selfInterrupt方法
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
这里可以看到acquire的中断只是将当前线程的中断状态设置为true,至于是否需要处理这个中断状态完全交给它的具体同步器去处理,这里只是打一个中断标记,仅此而已
5.2 可中断获取
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
首先先判断线程是否被中断,如果被中断,则抛出InterruptedException异常,否则也是先tryAcquire尝试获取一下同步状态,作用和上面的acquire方法类似,如果tryAcquire返回false,直接进入doAcquireInterruptibly方法
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到doAcquireInterruptibly的处理逻辑和之前的acquireQueued方法类似,只不过有点细节上的区别
首先doAcquireInterruptibly方法也是将该节点入队,然后获取前驱节点,判断是不是头结点且再次尝试获取同步状态,这点和acquireQueued方法类似,唯一的不同点是下面的这个if
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
当shouldParkAfterFailedAcquire方法和parkAndCheckInterrupt方法返回true时抛出InterruptedException异常,而acquireQueued方法只是将中断标记设置为true后又去尝试获取同步状态,也就是说doAcquireInterruptibly方法在线程被其他线程中断后会抛出InterruptedException异常,抛出异常后会将该节点取消掉(执行finally块代码)
5.3 释放同步状态
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用tryRelease尝试一下(有可能同步状态被其他线程给抢了,args可用于重入),如果成功则取根据等待状态唤醒后继节点 tryRelease同样是个模板方法,交由具体是同步器去实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
如果tryRelease成功后根据等待状态去唤醒(waitStatus != 0表示非初始状态)后继节点后返回true,否则返回false,所以这里的释放根据tryRelease来决定是否成功失败
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)//非取消状态,设置为初始状态
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//或去后继节点
if (s == null || s.waitStatus > 0) {如果后继节点为空或者后继节点是取消状态
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//从后往回找,直到找到t == null或者t == node(这里就是找回到原点,可以退出循环)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//如果找到的后继节点不为空
LockSupport.unpark(s.thread);//唤醒后继节点的线程
}
6. AQS共享模式下的同步状态获取
6.1 不响应中断获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared和之前的独占模式类似,交由具体的同步器去控制
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
小于0则表示获取失败,进入doAcquireShared方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总体的处理和独占式的处理很类似
首先将线程封装成共享模式的节点后入队,然后也是获取它的前驱节点,判断是不是头节点,如果是,则进入tryAcquireShared再次去尝试获取同步状态,获取成功,则调用setHeadAndPropagate方法,这里是和独占模式有点不一样
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//将后继节点赋给成员变量s
if (s == null || s.isShared())//如果该节点是尾节点或者是共享模式节点
doReleaseShared();
}
}
AQS的节点中有个状态是PROPAGATE,专门用来处理共享模式下的操作,setHeadAndPropagate方法的第二个参数是tryAcquireShared返回的,可以返回大于等于0的整数
(1)获取头节点,缓存到h变量,然后将该节点设置为头结点
(2)如果propagate大于0(允许多个线程获取同步状态) 或者 h == null(初始状态)获取头结点的等待状态为非取消状态 或者 将新设置的头结点赋给成员变量h且为空 或者 新的头结点的等待状态为非取消状态,进入if分支(这里已经吊用过addWaiter方法,头结点不应该为空,不知这里的判断是什么原因考虑,或者调用addWaiter方法在设置头尾节点的时候出问题而没设置陈成功?)
总之,上面的if判断无非就是头结点没有取消的情况下去唤醒后继节点,也就是doReleaseShared方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {//如果头结点不为空且头结点不等尾节点,意味着队列中有元素
int ws = h.waitStatus;//获取节点的等待状态
if (ws == Node.SIGNAL) {//如果是SIGNAL状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//设置状态回到初始状态
continue; //不成功重试
unparkSuccessor(h);//唤醒后继线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//节点为初始状态且将节点设置为PROPAGATE状态
continue; // cas失败继续重试
}
if (h == head) // 将要执行的结点设置为头结点,跳出循环
break;
}
}
接下来用图分析上面的情况
首先,如果node1,node2,node3都没有获取共享同步状态时,且刚开始都不是head,那么都会在doAcquireShared方法中的parkAndCheckInterrupt方法里面休眠,假设如果node1变为head后,进入setHeadAndPropagate方法,发现它的next是shared进入doReleaseShared,且head!=tail,如果此时node1是SIGNAL状态,则进行CAS设置回到初始状态(这里之所以需要cas,是因为下面的unparkSuccessor方法会唤醒后继线程node2,且node1在这个循环后进入下一轮也会将h==head,即这是的h也是node2,两个循环有可能冲突,保证一个成功),设置成功后调用unparkSuccessor唤醒后继节点,如果node1是在初始状态,那么直接将节点的等待状态设置为PROPAGATE,最后判断h是不是head,如果是则返回,不是继续循环(这里如果head被node1抢占后,唤醒后继线程去抢head,如果抢成功的话也就是node1唤醒了node2,node3唤醒了node3,以此类推,直到head没有被其他线程抢占)
这里也就提现了共享状态的实现就是用不断的主动通过前驱节点去唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)//非取消状态,设置为初始状态
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//或去后继节点
if (s == null || s.waitStatus > 0) {如果后继节点为空或者后继节点是取消状态
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//从后往回找,直到找到t == null或者t == node(这里就是找回到原点,可以退出循环)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//如果找到的后继节点不为空
LockSupport.unpark(s.thread);//唤醒后继节点的线程
}
至于doAcquireInterruptibly的后半部分,也就是休眠状态的设置和唤醒跟独占模式一样
这里总结一下共享模式下同步状态的获取
(1)tryAcquireShared由具体的同步器去实现控制
(2)如果tryAcquireShared获取不到同步状态,则添加到队列中
(3)添加到队列后会进入休眠,等待唤醒或被中断
(4)醒来后去获取同步状态,获取到后将自己替换为头结点,且唤醒后继线程,继续tryAcquireShared,直到tryAcquireShared获取不到后继续休眠
6.2 可中断获取
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
不一样的点和独占式的一样,抛出InterruptedException异常,而不响应中断的则是将其标记为中断状态
6.3 释放同步状态
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
和独占式的释放一样tryReleaseShared是模板方法,交由子类实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
成功后进入doReleaseShared方法后返回true,否则返回false,doReleaseShared在上面有过一个调用,就是线程在唤醒后会自动去调用doReleaseShared方法去获取同步状态,之所以可以在这里唤醒无非就是共享模式下可以又多个线程同时获取共享状态,那么一个线程出来后可以立马让下一个线程立马补上
- 超时处理
7.1 独占模式
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
可以看到超时的处理是响应中断的,如果线程被中断,会抛出InterruptedException异常,下面的处理交由两个方法来判断,tryAcquire和之前的一样,是个模板方法,这里主要是doAcquireNanos的区别
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)//超时小于0,返回false
return false;
final long deadline = System.nanoTime() + nanosTimeout;//计算下超时的截止时间
final Node node = addWaiter(Node.EXCLUSIVE);//设置独占模式创建节点后入队
boolean failed = true;//是否成功失败标记
try {
for (;;) {
final Node p = node.predecessor();//获取前驱节点
if (p == head && tryAcquire(arg)) {//如果是头结点且尝试获取同步状态成功
setHead(node);//设置头结点
p.next = null; // 情况前驱节点的next引用
failed = false;//标记成功
return true;//返回true
}
nanosTimeout = deadline - System.nanoTime();//计算超时剩余时间
if (nanosTimeout <= 0L)//剩余时间小于0,返回false,这里不用参数中的nanosTimeout而是需要再计算一遍因为上面的执行过程或者线程切换也是需要消耗时间,所以这里再次用System.nanoTime()计算超时时间,如果小于0,表示已经超时,返回false,另外,线程被唤醒后也会回到这里来继续计算剩余时间是否小于0,总之小于0,就是超时,直接返回false
return false;
if (shouldParkAfterFailedAcquire(p, node) &&//将前驱节点的等待状态设置为SIGNAL
nanosTimeout > spinForTimeoutThreshold)//剩余时间是否大于spinForTimeoutThreshold这个阈值(默认1000,),如果大于,则休眠nanosTimeout,这里之所以要判断大于spinForTimeoutThreshold后才能休眠是因为nanos时间太短,如果直接休眠,那么精度可能不准,所以设置一个阈值,超过阈值才休眠,否则就让其自旋
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())//如果被中断抛出InterruptedException异常
throw new InterruptedException();
}
} finally {
if (failed)//如果失败
cancelAcquire(node);//取消节点
}
}
这里的处理和acquireQueued很类似,但多传了个nanosTimeout参数用于超时判断
7.2 共享模式
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
共享模式和独占模式一样都是响应中断
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)//超时
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其超时的处理也和独占模式的超时处理一样
8. AQS的辅助方法
8.1 获取和设置同步状态值state
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//通过CAS方法保证原子的设置state
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
8.2 队列是否有元素
public final boolean hasQueuedThreads() {
return head != tail;//通过头尾队列是否相等判断队列是否有线程在等待
}
8.3 是否有线程竞争
public final boolean hasContended() {
return head != null;//如果头结点不为空,那么可以确定有线程在队列里面竞争等待
}
8.4 获取线程是否在等待队列中
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)//从尾到头扫描
if (p.thread == thread)//相等返回true
return true;
return false;
}
9. 条件等待
AQS中还包含了Condition,用于条件等待和唤醒,它主要的数据结构是ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
...
/** First node of condition queue. */
//条件队列的第一个等待节点
private transient Node firstWaiter;
/** Last node of condition queue. */
//条件队列的最后一个等待节点
private transient Node lastWaiter;
...
}
一个构造方法
public ConditionObject() { }
首先看下Condition队列的基本结构
可以看到condition是可以创建多个,一个condition队列由ConditionObject对象保存,包含firstWaiter和lastWaiter分别指向头尾节点,内部由Node本身的nextWaiter属性串联起来形成单向队列,当condition满足条件被唤醒后从头节点开始,将condition队列的头结点加入到CLH队列的尾部进行同步状态的获取,jdk称该行为为转移。
其中condition最重要的两个方法是await和signal方法,即根据条件等待和唤醒
public final void await() throws InterruptedException {
if (Thread.interrupted())//响应中断
throw new InterruptedException();//如果线程被中断,抛出InterruptedException异常
Node node = addConditionWaiter();//添加到条件队列的队尾
int savedState = fullyRelease(node);//释放持有的资源,即同步状态
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//判断该节点是否在CLH队列中
LockSupport.park(this);//休眠
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//如果中断,跳出while循环
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//唤醒后添加到CLH队列的队尾
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled 清除已取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
可以看到,await第一件要做的事就是判断该线程是否被中断,如果中断,立马抛出异常,然后将其封装成Node节点(条件队列和阻塞队列共用Node封装实现框架高度统一)
private Node addConditionWaiter() {
Node t = lastWaiter;//获取当前条件队列的尾节点
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {//如果尾节点不是CONDITION状态
unlinkCancelledWaiters();//清除已取消的节点
t = lastWaiter;//从新将尾节点赋给t变量(unlinkCancelledWaiters可能会修改lastWaiter)
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)//队列初始化状态,设置头结点
firstWaiter = node;
else//设置下一个节点
t.nextWaiter = node;
lastWaiter = node;//设置尾节点
return node;
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;//条件队列的头节点
Node trail = null;
while (t != null) {//遍历真个条件队列
Node next = t.nextWaiter;//获取下一个节点
if (t.waitStatus != Node.CONDITION) {//如果当前的头结点的状态不等于CONDITION
t.nextWaiter = null;//将nextWaiter指针置为null,断掉链表中的指针
if (trail == null)//前驱节点为空
firstWaiter = next;//将下一个节点设置为头结点
else//将前驱节点的下一个节点设置为next
trail.nextWaiter = next;
if (next == null)//如果是队尾
lastWaiter = trail;//将trail设置为尾节点
}
else
trail = t;//保存下一个节点(next)的前面一个节点的引用,可以看做是前驱节点
t = next;//继续下一个节点
}
}
总之unlinkCancelledWaiters的作用就是将非CONDITION状态的节点全部踢掉
t = node1, trail = null, next = node2
t = node2, trail = null, next = node3
t = node4, trail=node3, next=node5
回到await方法继续往下走,调用fullyRelease方法释放同步状态(比如该线程持有锁,那么进入等待队列肯定要先释放锁给其他线程去竞争,跟notify和wait中wait方法类似,释放的时候需要持有锁)
final int fullyRelease(Node node) {
boolean failed = true;//是否成功或失败的标记
try {
int savedState = getState();//获取同步状态
if (release(savedState)) {//释放同步状态
failed = false;//标记成功
return savedState;
} else {//如果在释放的过程中,持有同步状态的不是该线程,抛出IllegalMonitorStateException异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)//如果执行失败,取消该节点,取消的节点会自动被从队列中踢掉
node.waitStatus = Node.CANCELLED;
}
}
release的释放规则最终会交给子类处理,前面的分析已经说过
回到await方法继续往下走,会判断该节点是否已经在CLH队列中
final boolean isOnSyncQueue(Node node) {
//如果该节点是等待状态是CONDITION或者它的前驱节点是null,那么肯定不在CLH队列中,返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) //如果它的后继节点非空,那么肯定在CLH队列,返回false
return true;
//上面的两个简单判断,node.prev == null和node.next != null相互呼应
//如果是CLH的头节点,那么prev为空,但next不为空,如果是CLH的尾节点,prev不为空,next为空,如果是CLH的初始状态,prev和next都为空,进入下面的findNodeFromTail方法进行整个CLH队列扫描判断
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;//从尾开始找,效率高,因为CONDITION队列到CLH队列都是添加到队尾
for (;;) {
if (t == node)//找到返回true
return true;
if (t == null)//找到队头,返回false
return false;
t = t.prev;//继续往前找上一个节点
}
}
如果不在CLH队列,那么就可以进入休眠状态,从休眠醒来的时候无非两种情况,要么被其他线程调用signal方法唤醒或者被其他线程中断,所以醒来后调用checkInterruptWhileWaiting方法进行判断
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
这里的判断是对中断情况的处理,首先判断是不是被中断,如果不是返回0,说明是被其他线程正常唤醒(可能有假唤醒),否则进入transferAfterCancelledWait方法判断是THROW_IE的中断模式还是REINTERRUPT的中断模式,下面看下transferAfterCancelledWait方法后来解释THROW_IE和REINTERRUPT
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//cas设置节状态为0,即初始状态,之所以cas有可能被signal后状态被改成了0,所以这一步也表达了如果这里的cas成功,则是signal之前发生中断,如果cas失败,则signal之后发生中断
enq(node);//如果signal之前发生中断,也将其入队到CLH队列的队尾
return true;//返回true,表示signal之前就发生了中断,而非正常的signal之后的发生中断
}
while (!isOnSyncQueue(node))//如果在signal之后会将节点加入到CLH队列的队尾,这里判断是否已经转移到了CLH队列,如果还没到CLH队列,那么自旋等待,等其完成
Thread.yield();
return false;//返回false,即signal之后发生的中断
}
所以,THROW_IE和REINTERRUPT的含义分别是
THROW_IE:表示线程在signal之前就发生了中断,这种情况会直接抛异常响应中断
REINTERRUPT:表示线程在signal之后发生了中断,这种情况只是标记该线程被中断
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//判断该节点是否在CLH队列中
LockSupport.park(this);//休眠
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//如果中断,跳出while循环
break;
}
再回来看这段代码,线程被唤醒后如果interruptMode==0表示在await过程中没有被中断,那么就是被其他线程正常signal,然后入队到CLH队列,然后下次循环isOnSyncQueue方法返回true,跳出该循环,否则,不管返回THROW_IE(-1)还是REINTERRUPT(1)也都会跳出循环
另外这里解释下为什么signal之前被中断,也会进入到CLH队列,因为这是进入到CLH队列之后,如果该线程被后继节点唤醒,也机会继续处理中断情况(Thread.interrupted()已经返回true),这样整个流程就能串通起来,将中断的行为抛给上层的AQS去控制,至于AQS怎么去处理中断,则可以看之前的分析
继续往下走,跳出循环后进入下面的这段代码
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
其中第一个if是如果acquireQueued方法返回true且是正常被signal且不是signal之前发生中断,那么将中断模式设置为REINTERRUPT,什么意思呢?
意思就是如果acquireQueued返回true则意味着在CLH队列后被中断,那么也将中断模式设置为REINTERRUPT统一处理
第二个if是如果它醒来后有下一个节点,那么需要清空已取消的节点,如何取消之前已经分析过了
第三个判断是如果不是正常的signal来唤醒而是被中断唤醒则进入if方法
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
这里可以看到是对两种中断模式的处理,如果是THROW_IE那么抛出InterruptedException异常,这里需要联想到transferAfterCancelledWait方法以及THROW_IE的含义,之前说过THROW_IE代表在signal之前就被中断了,那么这种情况也是会进入到CLH队列,那么这里会抛出InterruptedException异常,响应该节点,也就是将在CLH队列中的节点直接以中断方式唤醒,进而响应中断,如果是REINTERRUPT,代表是signal之后被中断了,那么这里的中断只是标记一下,如何处理,交由子类实现,前面也已经分析过了
这样await的方法就分析完了,另外await还有几个重载的方法,其实都是用于控制超时,其处理方式和之前的CLH队列的超时类似
接下来就看下与await相对应的signal方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
第一行代码是调用isHeldExclusively方法进行判断调用signal的方法是否持有同步状态,如果没有持有锁抛出IllegalMonitorStateException异常,至于怎么持有同步状态交由具体同步器,即子类自己去实现,接下来是获取条件队列的头结点,如果头结点不为空,那么调用doSignal方法进行通知
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
这里有一个循环,因为第一个节点要出队,所以讲之前的头结点的下一个节点设置为头结点,如果设置后的头结点为空,那么表示该队列只有一个或者是最后一个节点,将lastWaiter设置为null否则将之前的头结点的nextWaiter设置为null
node1出队
如果到最后一个节点则
接下来解释在while做转移到CLH队列
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);//入队,返回的是node的前驱节点
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
首先,如果将节点的等待状态从CONDITION设置为初始状态(0)失败(如果节点被取消的话),返回false,转移失败,继续下一个节点的转移,如果有的话
(first = firstWaiter) != null//查找下一个节点,有则返回true,没有返回false
其次,将node节点调用enq方法入队到队尾,如果它的前驱节点是取消状态或者将前驱节点的等待状态设置为SIGNAL失败(节点入队就是需要将它的前驱节点的状态设置为SIGNAL,之前分析过),则唤醒该线程,然后返回true。
这里说下为什么前驱节点取消会唤醒后继节点,之前的分析中,后继节点都是通过后继节点来唤醒的,这时入队的节点发现它的前驱节点是取消后,会先唤醒它的后继节点,唤醒后后继节点会重新获取同步状态,同时也会重新设置它的前驱节点为非取消的节点,具体看上面shouldParkAfterFailedAcquire方法的分析,其实说白里这将其唤醒就是重新设置它的前驱节点
与signal相似的方法是signalAll(和notify、notifyAll一样,AQS的Condition也实现了类似的语义),可以唤醒Condition队列全部到CLH队列中排队获取同步状态
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
和signal很类似,不一样的是while条件是不断的查找下一个节点,直到找完所有节点,哪怕转移失败也继续往后找