AbstractQueuedSynchronizer - tenji/ks GitHub Wiki
AQS (AbstractQueuedSynchronizer)
AQS 是 AbstractQueuedSynchronizer 类的简称,虽然我们不会直接使用这个类,但是这个类是 Java 很多并发工具的底层实现。本文主要从源码的角度,全方位的解析 AQS 类。
一、AQS 定义
可以看到,CountDownLatch, Semaphore, ReentrantLock 等等常见的工具类都是由 AQS 来实现的。所以不管是面试也好,还是自己研究底层实现也好,AQS 类都是必须要重点关注的。
二、AQS 基本结构
首先从 AQS 类的定义开始,逐步深入了解。AQS 类的定义如下:
/**
* 可以看到AbstractQueuedSynchronizer是一个抽象类
* 实现了Serializable 接口
* @since 1.5
* @author Doug Lea
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* The synchronization state.
* state变量表示锁的状态
* 0 表示未锁定
* 大于0表示已锁定
* 需要注意的是,这个值可以用来实现锁的【可重入性】,例如 state=3 就表示锁被同一个线程获取了3次,想要完全解锁,必须要对应的解锁3次
* 同时这个变量还是用volatile关键字修饰的,保证可见性
*/
private volatile int state;
/**
* 等待队列的头节点,只能通过setHead方法修改
* 如果head存在,能保证waitStatus状态不为CANCELLED
*/
private transient volatile Node head;
/**
* 等待队列的尾结点,只能通过enq方法来添加新的等待节点
*/
private transient volatile Node tail;
}
AbstractQueuedSynchronizer 从名字上就可看出本质是一个队列(Queue),其内部维护着 FIFO 的双向队列,也就是 CLH 队列。
CLH (Craig, Landin, and Hagersten) lock queue
这个队列中的每一个元素都是一个 Node,所以接下来了解一下其内部类 Node,内部类 Node 的定义如下:
static final class Node {
// 节点正在共享模式下等待的标记
static final Node SHARED = new Node();
// 节点正在以独占模式等待的标记
static final Node EXCLUSIVE = null;
// waitStatus变量的可选值,因为超时或者或者被中断,节点会被设置成取消状态。被取消的节点不会参与锁竞争,状态也不会再改变
static final int CANCELLED = 1;
// waitStatus变量的可选值,表示后继节点处于等待状态,如果当前节点释放了锁或者被取消,会通知后继节点去运行
static final int SIGNAL = -1;
// waitStatus变量的可选值,表示节点处于condition队列中,正在等待被唤醒
static final int CONDITION = -2;
// waitStatus变量的可选值,下一次acquireShared应该无条件传播
static final int PROPAGATE = -3;
// 节点的等待状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 获取同步状态的线程
volatile Thread thread;
// 下一个condition队列等待节点
Node nextWaiter;
// 是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前驱节点或者抛出异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
有了前面的基础,再来看下 AQS 的基本结构:
三、核心方法
我们都知道 CountDownLatch、CyclicBarrier、Semaphore、ReentrantLock 这些工具类中,有的只支持独占,如 ReentrantLock#lock(),有的支持共享,多个线程同时执行,如Semaphore。并且,从前文 Node 类的定义也可以看到:
// 节点正在共享模式下等待的标记
static final Node SHARED = new Node();
// 节点正在以独占模式等待的标记
static final Node EXCLUSIVE = null;
AQS 实现了两套加锁解锁的方式,那就是独占式和共享式。我们先看下独占式的实现,独占式的实现,就从 ReentrantLock#lock() 方法开始。
ReentrantLock#lock
该方法定义如下:
public void lock() {
sync.lock();
}
其中 sync 是 AbstractQueuedSynchronizer 的实现,我们知道,ReentrantLock 支持公平锁和非公平锁,其实现类分别是 FairSync 和 NonfairSync,我们看看公平锁和非公平锁分别是怎么实现的:
// FairSync 公平锁的实现
final void lock() {
acquire(1);
}
// NonfairSync 非公平锁的实现
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
可以看到,非公平锁的实现仅仅是多了一个步骤:通过 CAS (compareAndSetState) 的方式尝试改变 state 的状态,修改成功后设置当前线程以独占的方式获取了锁,修改失败执行的逻辑和公平锁一样。
这就是公平锁和非公平锁的本质区别。
从这段代码中可以看到,独占锁加锁的核心逻辑就是 acquire 方法,接下来就看看这个方法。
acquire
该方法定义如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法主要调用 tryAcquire 方法尝试获取锁,成功返回 true,失败就将线程封装成 Node 对象,放入队列。
tryAcquire
tryAcquire 方法在 AQS 中并没有直接实现,而是采用模板方法的设计模式,交给子类去实现。我们来看公平锁的实现。
protected final boolean tryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 获取state状态,0表示未锁定,大于1表示重入
int c = getState();
if (c == 0) {
// 表示没有线程获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 没有比当前线程等待更久的线程了,通过CAS的方式修改state
// 成功之后,设置当前拥有独占访问权的线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 独占访问权的线程就是当前线程,重入
// 此处就是【可重入性】的实现
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 直接修改state
setState(nextc);
return true;
}
return false;
}
可以看到该方法就是以独占的方式获取锁,获取成功后返回 true。从这个方法可以看出 state 变量是实现可重入性的关键。
非公平锁的实现方式大同小异。
acquire 方法除了调用 tryAcquire,还调用了 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这里分为两步,先看下 addWaiter 方法。
addWaiter
该方法用于把当前线程封装成一个 Node 节点,并加入队列。方法定义如下:
/**
* Creates and enqueues node for current thread and given mode.
* 为当前线程和给定模式创建并排队节点,给的的模式分为:
* 1、Node.EXCLUSIVE:独占模式
* 2、Node.SHARED:共享模式
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
*/
private Node addWaiter(Node mode) {
// 创建Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试快速添加尾结点,失败就执行enq方法
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS的方式设置尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速添加失败,执行该方法
enq(node);
return node;
}
enq 方法定义如下:
/**
* Inserts node into queue, initializing if necessary. See picture above.
* 将节点插入队列,必要时进行初始化
*
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
// 自旋
Node t = tail;
if (t == null) { // Must initialize
// 尾结点为空,队列还没有进行初始化
// 设置头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS的方式设置尾结点,失败就进入下次循环
// 也就是【自旋 + CAS】的方式保证设置成功
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到该方法就是用来往队列尾部插入一个新的节点,通过自旋 + CAS的方式保证线程安全和插入成功。
需要注意的是,该方法返回的Node节点不是新插入的节点,而是新插入节点的前驱节点。
acquireQueued
ReentrantLock#unlock
方法定义如下:
public void unlock() {
sync.release(1);
}
我们已经知道了 sync 是 AQS 的实现,所以直接查看 AQS 中的 release 方法
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
// 头节点已经释放,唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
方法定义如下:
protected final boolean tryRelease(int releases) {
// 计算剩余的重入次数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全的释放了锁(针对可重入性)
boolean free = false;
if (c == 0) {
// 表示完全释放了锁
free = true;
// 设置独占锁的持有者为null
setExclusiveOwnerThread(null);
}
// 设置AQS的state
setState(c);
return free;
}
unparkSuccessor
unparkSuccessor 方法用于唤醒后继节点,其定义如下:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
// 获取当前节点的状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 当前节点的后继节点为null,或者被取消了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 从尾结点查找状态不为取消的可用节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点
LockSupport.unpark(s.thread);
}
相信大家已经猜到了,和加锁时一样,这里的 tryRelease 方法同样使用了模板方法的设计模式,其真正的逻辑由子类实现。
前文说过 AQS 实现了两套同步逻辑,也就是独占式和共享式 。看完了独占式锁的实现,再来看一下共享式。这里以 Semaphore 为例。
Semaphore#acquire
该方法是作用是请求一个许可,如果暂时没有可用的许可,则被阻塞,等待将来的某个时间被唤醒。因为 Semaphore 可以允许多个线程同时执行,所以可以看成是共享锁的实现。该方法定义如下:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
sync 是 AQS 的实现,可以看到 acquire 方法底层调用的是 acquireSharedInterruptibly 方法。
在 JDK 中,与锁相关的方法,Interruptibly 表示可中断,也就是可中断锁。可中断锁的意思是线程在等待获取锁的过程中可以被中断,换言之,线程在等待锁的过程中可以响应中断。
接下来看看 acquireSharedInterruptibly 方法的实现。
acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
// 检测线程的中断中断状态,如果已经被中断了,就响应中断
// 该方法会清除线程的中断标识位
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared
tryAcquireShared 方法,相信大家已经能看出来,这里使用了模板方法模式,具体实现由子类去实现。Semaphore也实现了公平模式和非公平模式。公平的方式和非公平的方式实现逻辑大同小异。所以具体看下公平模式下的实现方式:
protected int tryAcquireShared(int acquires) {
for (;;) {
// 自旋
if (hasQueuedPredecessors())
// 如果有线程排在自己的前面(公平锁排队),直接返回
return -1;
// 获取同步状态的值
int available = getState();
// 可用的(许可)减去申请的,等于剩余的
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
// 如果剩余的小于0,或者设置状态成功,就返回,如果设置失败,则进入下一次循环
// 如果剩余小于0,返回负数,表示失败
// 如果设置状态成功,表示申请许可成功,返回正数
return remaining;
}
}
此处还是 自旋 + CAS 的方式保证线程安全和设置成功。
doAcquireSharedInterruptibly
doAcquireSharedInterruptibly 方法定义如下:
/**
* Acquires in shared interruptible mode.
* 在共享可中断模式下请求(许可)
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 为当前线程和给定模式创建节点并插入队列尾部,addWaiter方法前文讲解过
final Node node = addWaiter(Node.SHARED);
// 操作是否失败
boolean failed = true;
try {
for (;;) {
// 自旋
// 获取当前节点的前驱节点
final Node p = node.predecessor();
if (p == head) {
// 如果前驱节点是头节点,以共享的方式请求获取锁,tryAcquireShared方法前文讲解过
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功获取锁,设置头节点和共享模式传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果前驱节点不是头节点或者没有获取锁
// shouldParkAfterFailedAcquire方法用于判断当前线程是否需要被阻塞,该方法前文讲解过
// parkAndCheckInterrupt方法用于阻塞线程并且检测线程是否被中断,该方法前文讲解过
// 没抢到锁的线程需要被阻塞,避免一直去争抢锁,浪费CPU资源
throw new InterruptedException();
}
} finally {
if (failed)
// 自旋异常退出,取消正在进行锁争抢
cancelAcquire(node);
}
}
加锁的逻辑已经完成,再来看看解锁的逻辑。
Semaphore#release
release 用于释放许可,其方法定义如下:
public void release() {
sync.releaseShared(1);
}
releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 自旋
// 获取同步状态的值
int current = getState();
// 可用的(许可)加上释放的,等于剩余的
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
// CAS的方式设置同步状态
return true;
}
}
可以看到此处依旧是 自旋 + CAS 的操作。
doReleaseShared
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
// 自旋
// 记录头节点
Node h = head;
if (h != null && h != tail) {
// 头节点不为null,且不等于尾结点,说明队列中还有节点
// 获取头节点等待状态
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 头节点等待状态是SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 如果修改节点等待状态失败,进入下一次循环
continue; // loop to recheck cases
// 修改成功后,唤醒后继节点,unparkSuccessor前文讲过
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
总结
AQS 可以说是整个并发编程中最难的一个类。但是理解 AQS 的实现却非常重要,因为它是 JDK 中锁和其他同步工具实现的基础。