JAVA 锁的实现原理 - litter-fish/ReadSource GitHub Wiki
Lock接口
public interface Lock {
// 加锁
void lock();
// 可中断获取锁,在获取锁的过程中可响应中断请求
void lockInterruptibly() throws InterruptedException;
// 尝试获取非阻塞锁,调用该方法后会理解返回,获取成功返回true,否则返回false
boolean tryLock();
// 在指定的时间内尝试获取锁,获取成功返回true,否则返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 解锁
void unlock();
// 获取等待通知组件,该组件与当前锁绑定,当前线程只有获得了锁
// 才能调用该组件的wait()方法,而调用后,当前线程将释放锁。
Condition newCondition();
}
并发基础组件AQS
AbstractQueuedSynchronizer又称队列同步器(AQS),用来构建锁和其他同步器组件的基础。内部通过一个int类型的成员变量state来控制同步状态, 当state=0时,说明没有任何线程占用共享资源的锁,当state>0时,说明有线程正在使用共享变量。这时如果获取锁失败了会将线程加入队列同步器中进行等待。 AQS有两种队列:
- 同步队列:获取锁失败的线程会构造成一个Node节点然后加入到这个队列的队尾
- 等待队列:当调用了锁的await方法时,会将同步队列中的头节点取出重新构造Node对象,并加入到等待队列的队尾, 当其他线程通过调用signal方法时,会将节点从等待队列中取出,加入到同步队列的队尾。
AQS同步队模型
/**
* AQS抽象类
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//指向同步队列队头
private transient volatile Node head;
//指向同步的队尾
private transient volatile Node tail;
//同步状态,0代表锁未被占用,1代表锁已被占用
private volatile int state;
//省略其他代码......
}
Node 的结构
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 标识线程已经被取消
static final int CANCELLED = 1;
// 等待被唤醒
static final int SIGNAL = -1;
// 条件状态
static final int CONDITION = -2;
// 在共享模式下表示获取的同步状态会被传播
static final int PROPAGATE = -3;
// 等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
volatile int waitStatus;
// 同步队列的前驱节点
volatile Node prev;
// 同步队列的后继节点
volatile Node next;
// 请求锁的线程
volatile Thread thread;
// 等待队列的后继节点
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
等待状态:
- CANCELLED:值为1,在等待队列中等待超时或被中断,需要从同步队列中取消该Node节点
- SIGNAL:值为-1,
- CONDITION:值为-2,表示节点在等待队列中,当其他线程调用了signal方法后,节点将会被从等待队列移动到同步队列
- PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
- 0状态:值为0,代表初始化状态。
ReentrantLock与AQS的关系
- AbstractOwnableSynchronizer:抽象类,定义了存储独占当前锁的线程和获取的方法
- AbstractQueuedSynchronizer:抽象类,AQS框架核心类
- Node:AbstractQueuedSynchronizer 的内部类,同步队列的存储实例对象。
- Sync:抽象类,是ReentrantLock的内部类,继承自AbstractQueuedSynchronizer,实现了释放锁的操作(tryRelease()方法),并提供了lock抽象方法,由其子类实现。
- NonfairSync:ReentrantLock的内部类,继承自Sync,非公平锁的实现类。
- FairSync:ReentrantLock的内部类,继承自Sync,公平锁的实现类。
- ReentrantLock:实现了Lock接口的,其内部类有Sync、NonfairSync、FairSync,在创建时可以根据fair参数决定创建NonfairSync(默认非公平锁)还是FairSync。
AQS中提供的模板方法
// AQS中提供的主要模板方法,由子类实现。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
// 独占模式下获取锁的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 独占模式下解锁的方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下获取锁的方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下解锁的方法
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 判断是否为持有独占锁
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
}
基于ReentrantLock分析AQS独占模式实现过程
ReentrantLock构造方法
// 默认构造,创建非公平锁NonfairSync
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据传入参数创建锁类型
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
加锁操作
// 加锁操作
public void lock() {
sync.lock();
}
非公平锁加锁
获取锁的时序图:
非公平加锁操作
static final class NonfairSync extends Sync {
// 加锁
final void lock() {
// 执行CAS操作,获取同步状态
if (compareAndSetState(0, 1))
// 成功则将独占锁线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 否则再次请求同步状态
acquire(1);
}
}
acquire方法对中断操作不敏感,获取锁失败后,进入同步队列
尝试获取同步状态
public final void acquire(int arg) {
if (!tryAcquire(arg) // 尝试获取同步状态
&& acquireQueued(addWaiter(Node.EXCLUSIVE), // 构造一个Node节点加入同步队列
arg))
selfInterrupt();
}
再次尝试获取同步状态
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
// 非公平尝试获取同步状态
return nonfairTryAcquire(acquires);
}
}
调用父类获取同步状态方法
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果同步状态为0,尝试再次获取同步状态
if (c == 0) {
// CAS更新同步状态
if (compareAndSetState(0, acquires)) {
// 同步状态更新成功设置当前线程占用锁
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc);
setState(nextc);
return true;
}
return false;
}
// 省略其他代码
}
一是尝试再次获取同步状态,如果获取成功则将当前线程设置为OwnerThread,否则失败, 二是判断当前线程current是否为OwnerThread,如果是则属于重入锁,state自增1,并获取锁成功,返回true,反之失败,返回false,也就是tryAcquire(arg)执行失败,返回false
如果再次尝试获取锁失败则构造一个独占式的节点
private Node addWaiter(Node mode) {
// 将请求同步状态失败的线程封装成结点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果同步队列的尾节点不是一个null节点
if (pred != null) {
// 获取尾节点的前一个节点
node.prev = pred;
// 通过CAS设置尾节点为当前节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果第一次加入或者CAS操作没有成功执行enq入队操作
enq(node);
return node;
}
如果节点是第一个加入节点或上面CAS设置新的尾节点失败,则只需enq方法加入同步队列中
private Node enq(final Node node) {
// 死循环,直到成功加入同步队列中
for (;;) {
Node t = tail;
// 如果队列为null,则进行初始化操作,创建一个头节点
if (t == null) { // Must initialize
// 创建并使用CAS设置头结点
if (compareAndSetHead(new Node()))
tail = head;
} else { // 队尾添加节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 如果还没有初始同步队列则创建新结点并使用compareAndSetHead设置头结点,tail也指向head,
- 如果队列已存在,则将新结点node添加到队尾。 从上面代码可以看出head节点本身不会存储任何数据,只是起到牵头节点的作用。而tail节点永远指向尾节点
节点的自旋操作 添加到同步队列后,结点就会进入一个自旋过程,即每个结点都在观察时机待条件满足获取同步状态,然后从同步队列退出并结束自旋
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋,死循环
for (;;) {
// 获取前驱结点
final Node p = node.predecessor();
// 当且仅当p为头结点才尝试获取同步状态
if (p == head && tryAcquire(arg)) {
// 将node设置为头结点
setHead(node);
// 清空原来头结点的引用便于GC
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 最终都没能获取同步状态,结束该线程的请求
cancelAcquire(node);
}
}
设置头节点
private void setHead(Node node) {
head = node;
// 清空结点数据
node.thread = null;
node.prev = null;
}
判断当前结点的前驱结点是否为SIGNAL状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取当前结点的等待状态
int ws = pred.waitStatus;
// 如果为等待唤醒(SIGNAL)状态则返回true
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 如果ws>0 则说明是结束状态,
// 遍历前驱结点直到找到没有结束状态的结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果ws小于0又不是SIGNAL状态,
// 则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
若shouldParkAfterFailedAcquire()方法返回true,即前驱结点为SIGNAL状态同时又不是head结点,那么使用parkAndCheckInterrupt()方法挂起当前线程,称为WAITING状态,需要等待一个unpark()操作来唤醒它
阻塞线程并判断线程是否被中断
private final boolean parkAndCheckInterrupt() {
// 将当前线程挂起
LockSupport.park(this);
// 获取线程中断状态,interrupted()是判断当前中断状态,
// 并非中断线程,因此可能true也可能false,并返回
return Thread.interrupted();
}
获取锁的流程操作
lockInterruptibly()、tryLock()方法
该方法最终都胡调用 doAcquireInterruptibly
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
最大的不同点
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//直接抛异常,中断线程的同步状态请求
throw new InterruptedException();
解锁操作
解锁时序图:
unlock解锁操作
// ReentrantLock类的 unlock
public void unlock() {
sync.release(1);
}
AQS类的release()方法
// AQS类的release()方法
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继结点的线程
unparkSuccessor(h);
return true;
}
return false;
}
尝试释放锁
// ReentrantLock类中的内部类Sync实现的tryRelease(int releases)
protected final boolean tryRelease(int releases) {
// 获取状态值 state - 1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 判断状态是否为0,如果是则说明已释放同步状态
if (c == 0) {
free = true;
// 设置Owner为null
setExclusiveOwnerThread(null);
}
// 设置更新同步状态
setState(c);
return free;
}
唤醒后继节点
private void unparkSuccessor(Node node) {
// 获取头节点的状态值
int ws = node.waitStatus;
if (ws < 0)
// CAS更新状态值为0 ????
compareAndSetWaitStatus(node, ws, 0);
// 找到下一个需要唤醒的结点s
Node s = node.next;
// 如果为空或已取消
if (s == null || s.waitStatus > 0) {
s = null;
// 找到最前边的有效的节点,这边为啥不直接从头节点开始找 ??????
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒
LockSupport.unpark(s.thread);
}
公平锁加锁
与非公平锁不同的是,在获取锁的时,公平锁的获取顺序是完全遵循时间上的FIFO规则,也就是说先请求的线程一定会先获取锁,后来的线程肯定需要排队
获取公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先判断同步队列是否存在结点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁与非公平锁的区别在于:公平锁在使用CAS设置同步状态时会先判断同步队列中是否存在节点,如果存在则需要先执行等待队列里的所有节点。 而非公平锁的获取时,不管同步队列中是否存在节点都先通过CAS尝试获取同步状态。