ReentrantReadWriteLock 实现分析 - litter-fish/ReadSource GitHub Wiki
ReentrantReadWriteLock的基本架构
ReentrantReadWriteLock的实现非常经典,由于ReentrantReadWriteLock允许同时创建读锁和写锁,其中读锁是共享锁,而写锁是独占锁,因此它需要同时控制两种同步状态,且这两种同步状态是不同的。ReentrantReadWriteLock也是基于AQS类实现的,从前面的文章中我们知道,AQS使用了一个int类型的变量state来控制同步状态的获取、重入和释放,这种使用在ReentrantLock的实现中尤其明显;与ReentrantLock一样,ReentrantReadWriteLock类中也存在着同步器Sync的概念,且在ReentrantReadWriteLock中巧妙地使用Sync将state变量分成了两个部分分别用于控制两种同步状态,源码如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
* AQS提供的state是int类型的变量,占32位
* 在ReentrantReadWriteLock中需要同时表示独占和共享两种模式的加锁次数
* 因此将32位state分为两部分,高16位表示共享锁加锁次数,低16位表示独占锁加锁次数
* 因此共享和独占最多加锁次数为2^16 - 1,即65535次
*/
// 该值是位移位数标识
static final int SHARED_SHIFT = 16;
/**
* 共享模式的计数器,65536, 0001 0000 0000 0000 0000
* 由于共享锁加锁次数占据高16位,因此每次加锁,需要加上第17位为1,其余位全为0的值,才能正确完成计数
*/
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 加锁的最大次数,即16位二进制最大表示的大小,65535, 1111 1111 1111 1111
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 计算独占模式的高位掩码,65535, 0001 0000 0000 0000 0000 - 1 -> 1111 1111 1111 1111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**
* Returns the number of shared holds represented in count
* 获取共享模式下加锁的次数
*/
static int sharedCount(int c) {
// 无符号右移,将低16位抛弃后,得到的就是高16位,即共享锁加锁次数
return c >>> SHARED_SHIFT;
}
/**
* Returns the number of exclusive holds represented in count
* 获取独占模式下加锁的次数
*/
static int exclusiveCount(int c) {
// c和0000 0000 0000 0000 1111 1111 1111 1111相与后,高16位被抛弃,得到低16位,即独占锁加锁次数
return c & EXCLUSIVE_MASK;
}
...
}
从源码可以得知,ReentrantReadWriteLock类的内部抽象类Sync继承自AQS类,state作为一个int类型的变量,在内存中存储的二进制长度为32位,因此Sync将state分为两部分,其中高16位代表共享锁的加锁次数,而低16位代表独占锁的加锁次数,通过一系列的运算实现,详细分析如下:
- SHARED_SHIFT:这个常量的值恒定为16,代表在计算过程中需要用到的位移位数。
- MAX_COUNT:代表加锁的最大次数;因为state分为两部分后,共享锁和独占锁的记录值最大只能是16位的二进制数,因此最大加锁次数为65535次(16位全为1,2^16 - 1),就是MAX_COUNT的值。
- SHARED_UNIT:这个值是在更新共享锁时的计数单位;由于共享锁占据高16位,因此每次共享锁加锁次数加1,都需要加上第17位为1,其余位全为0的值,才能正确完成计数。
- EXCLUSIVE_MASK:用于计算独占锁的高位掩码,通过将state值与该值相与,可以将state的高16位全部置为0,得到的就是低16位的值。
同时,Sync提供了两个方法用于获取共享锁和独占锁的重入次数。 sharedCount(int c)用于获取共享锁的重入次数,它的实现是将传入的c(具体使用中会传入state)进行无符号右移16位操作,右移后c值的低16位会被抛弃,高16位会被补0,得到的就是共享锁的重入次数。 exclusiveCount(int c)用于获取独占锁的重入次数,它的实现是将传入的c(具体使用中会传入state)和EXCLUSIVE_MASK做与计算操作,计算后c值的高16位会被置为0,得到的低16位就是独占锁的重入次数。
下图是对上述各类值和操作的示意图:
ReentrantReadWriteLock对state的整体结构:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() {
return writerLock;
}
public ReentrantReadWriteLock.ReadLock readLock() {
return readerLock;
}
abstract static class Sync extends AbstractQueuedSynchronizer { ... }
static final class NonfairSync extends Sync { ... }
static final class FairSync extends Sync { ... }
public static class ReadLock implements Lock, java.io.Serializable { ... }
public static class WriteLock implements Lock, java.io.Serializable { ... }
}
写锁分析
WriteLock类中的lock()和unlock()方法:
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
ReentrantReadWriteLock类中FairSync的acquire(int)和release(int)方法也是来自父类Sync,而父类Sync的这两个方法来自于AQS:
public final void acquire(int arg) {
/**
* 这里的tryAcquire()会尝试获取同步状态
* 如果没有获取到,将会调用addWaiter()方法将当前线程包装为一个Node节点加入等待队列
* 然后对节点调用acquireQueued()方法使其进入自旋尝试获取同步的状态
* 加入成功后将中断当前线程
*/
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒等待状态的线程
unparkSuccessor(h);
return true;
}
return false;
}
tryAcquire(int)方法的源码:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
// state值
int c = getState();
// 获取独占锁的持有记录值
int w = exclusiveCount(c);
if (c != 0) {
/**
* (Note: if c != 0 and w == 0 then shared count != 0)
* 当c不为0时,表示独占锁和共享锁其中一个必然有被线程持有,分两种情况:
* 1. 当独占锁记录值为0,则此时共享锁必然被持有了,则获取独占锁失败,直接返回false
* 2. 当独占锁记录值不为0,即此时独占锁被持有了,就需要判断获取锁的线程是否就是当前拥有独占锁的线程,如果不是则获取独占锁失败,直接返回false
*/
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 判断独占锁重入次数是否过多,导致记录值超过MAX_COUNT
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
/**
* Reentrant acquire
* 获取独占锁成功,更新state的独占锁记录值
*/
setState(c + acquires);
return true;
}
/**
* 此时state值为0,表示没有线程获取锁
* writerShouldBlock()底层调用了hasQueuedPredecessors(),用于判断同步队列是否有线程等待了很久
* 如果没有等待的线程,就尝试CAS方式修改state值,
* 如果修改失败说明此时有其他线程并发抢锁,而当前线程没抢到,直接返回false
*/
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
// 抢锁成功,将独占锁线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
如果有线程持有了共享锁(读锁),那么获取独占锁(写锁)是失败的,即当有线程在进行读操作时,无法进行写操作。同时写操作同一时刻只允许一个线程进行。
释放锁操作的tryRelease(int)方法比较简单,源码如下:
protected final boolean tryRelease(int releases) {
// 判断当前线程是否是拥有独占锁的线程,如果不是直接抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算释放锁后的state值
int nextc = getState() - releases;
// 判断独占锁记录值是否为0,如果为0表示可以释放独占锁了
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 将拥有独占锁的线程记录变量设置为null,即释放独占锁
setExclusiveOwnerThread(null);
// 更新state值
setState(nextc);
// 返回值表示独占锁是否已被释放
return free;
}
读锁分析
ReentrantReadWriteLock类的读锁,是典型的共享锁的应用。类似于写锁,读锁的lock()和unlock()的实现实际对应Sync的tryAcquireShared(int)和tryReleaseShared(int)方法:
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
acquireShared(int)和releaseShared(int)都属于AQS的方法,内部又调用了tryAcquireShared(int)和tryReleaseShared(int):
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 获取失败将调用doAcquireShared()
doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryAcquireShared(int)和tryReleaseShared(int)由ReentrantReadWriteLock的内部类Sync进行了重写。
获取读锁
获取读锁(共享锁)的tryAcquireShared(int)方法的源码:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
// 获取state值
int c = getState();
/**
* 如果已经有线程获取了独占锁,且当前线程并不是拥有独占锁的线程,则获取共享锁失败,直接返回-1
* 从这里我们可以得知,拥有写锁的线程其实是可以同时拥有读锁的,这也是锁降级的实现,后面会讲解
*/
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
// 共享锁持有计数
int r = sharedCount(c);
/**
* 如果同步队列中没有线程已经等待了很久
* 且共享锁的计数小于最大阈值,则尝试修改state的值,即更新共享锁记录值
*/
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
// 能够进入if内部,表示更新共享锁记录值成功
if (r == 0) {
/**
* 如果r为0,则表示之前其实没有线程持有共享锁,
* 因此当前线程是第一个持有共享锁的线程,使用firstReader记录当前线程
* 同时设置firstReaderHoldCount为1
*/
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
/**
* 如果r不为0,则判断当前线程是否是第一个持有共享锁的线程,
* 如果是,就将firstReaderHoldCount的值加1
*/
firstReaderHoldCount++;
} else {
/**
* 走到这里,说明当前线程不是第一个获取共享锁的线程
* 需要在readHolds的本线程副本中记录当前线程重入数,readHolds是一个ThreadLocal类型的对象
* 这是为了实现JDK 1.6中加入的getReadHoldCount()方法的,
* 这个方法能获取当前线程重入共享锁的次数,
* 原理很简单:
* 如果当前只有一个线程的话,还不需要使用readHolds,直接更新firstReaderHoldCount来记录重入数,
* 当有第二个线程来的时候,就要使用readHolds,每个线程拥有自己的副本,用来保存自己的重入数。
*/
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
// 返回1,表示获取共享锁成功
return 1;
}
/**
* 走到这里,说明获取共享锁失败,会进入自旋抢锁;失败的情况有三种:
* 1. 当前同步队列已经有线程等待很久了;
* 2. 共享锁的计数太大,已经超过最大阈值;
* 3. CAS方式修改共享锁记录值失败
*/
return fullTryAcquireShared(current);
}
读锁记录了每个线程重入的次数,在内部实现上,如果仅仅只有一个线程多次重入获取了读锁,其实只会更新成员变量firstReaderHoldCount进行记录;一旦有多个线程获取了读锁,就会使用ThreadLocal类型的变量readHolds分别记录每个线程的重入次数。
tryAcquireShared(int)方法的最后,当获取共享锁失败时,会调用fullTryAcquireShared(current)方法,该方法源码如下:
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (; ; ) {
// state值
int c = getState();
if (exclusiveCount(c) != 0) {
// 如果已有线程持有独占锁,并且持有独占锁的线程不是当前线程,则表示获取失败,直接返回-1
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// 走到这里表示可能需要阻塞当前线程
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 当前线程是第一个获取共享锁的线程,则继续往下执行
} else {
// 否则取得当前线程的获取锁的次数,如果次数为0,则表示获取失败,直接返回-1
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 如果共享锁的计数等于最大阈值,抛出错误
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS方式修改共享锁的计数
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
/**
* 如果sharedCount(c)为0,则表示之前其实没有线程持有共享锁,注意此时的c是修改前的共享锁的计数
* 因此当前线程是第一个持有共享锁的线程,使用firstReader记录当前线程
* 同时设置firstReaderHoldCount为1
*/
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
/**
* 如果sharedCount(c)不为0,则判断当前线程是否是第一个持有共享锁的线程,
* 如果是,就将firstReaderHoldCount的值加1
*/
firstReaderHoldCount++;
} else {
/**
* 走到这里,说明当前线程不是第一个获取共享锁的线程
* 需要在readHolds的本线程副本中记录当前线程重入数
*/
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
// 返回1,表示获取共享锁成功
return 1;
}
}
}
从源码可以看出fullTryAcquireShared(Thread)方法与tryAcquireShared(int)主要功能基本相同,不同的是fullTryAcquireShared(Thread)方法会进入自旋,在CAS方式修改state失败后在下一次循环重新尝试修改。同时,如果当前有线程获取了写锁(即独占锁),或者需要进行阻塞等待时,会返回-1。
当tryAcquireShared(int)在尝试修改state获取共享锁失败后会返回-1,因此会执行doAcquireShared(arg),该方法源码如下:
private void doAcquireShared(int arg) {
// 创建共享节点并添加到队列尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 然后进行自旋,不断尝试获取同步状态
for (; ; ) {
final Node p = node.predecessor();
if (p == head) {
// 尝试获取共享锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取共享锁成功,向后传递
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
// 如果在等待期间出现中断,则重现中断操作
selfInterrupt();
failed = false;
return;
}
}
/**
* 执行到此处,说明上面尝试获取锁失败了,因此可以尝试将当前线程挂起
* 这里的shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()在之前已经讲解过了
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate(node, r)方法,源码如下:
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 记录旧的头节点,用于下面的各项检查
Node h = head; // Record old head for check below
// 当前节点的线程已获取到共享锁,因此将其该节点设置为头节点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*
* propagate大于0时,表示当前线程已经获取到共享锁了,需要尝试唤醒下一个节点
* 如果旧的头节点为空,表示之前同步队列就是空的,没有线程在等待
* 如果旧的头节点不为空,但旧的头节点的waitStatus小于0(即为SIGNAL(-1)或者PROPAGATE(-3)),表示应该尝试唤醒后继节点
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
/**
* 如果node的下一个节点为空,表示此时同步队列没有节点等待了,因此当前线程是最后一个获取共享锁的线程
* 或者node的下一个节点不为空,且是共享节点
* 就执行释放共享锁操作
*/
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
与acquireQueued(final Node, int)方法中setHead()的实现不一样,setHeadAndPropagate(node, r)方法不仅将获取到共享锁的节点设置为头节点,并且进行了传播唤醒;查看doReleaseShared()的源码:
/**
* Release action for shared mode -- signal successor and ensure
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
* 共享模式的释放操作,将唤醒后继节点并保证传播性
* 与独占模式不同,独占模式只会尝试唤醒后继节点
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (; ; ) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果当前节点的waitStatus为SIGNAL,则尝试改为0,修改成功就唤醒头节点的后继节点锁包装的线程,修改失败重新尝试修改
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 这里会唤醒头节点的后继节点
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 如果头节点的waitStatus为0,则尝试修改为PROPAGATE,如果修改失败则重新尝试修改
continue; // loop on failed CAS
}
/**
* 由于头节点的后继节点所包装的线程被唤醒了,所以当被唤醒的线程获取到共享锁后,头节点可能会被修改;
* 因此当头节点被修改后,继续循环操作,开始下一轮的传播唤醒;
* 如果头节点没有被改变,说明此时虽然唤醒了后继节点,但后继节点并没有获取到共享锁,
* 因此直接break,将下一轮的传播唤醒操作交给后继节点
*/
if (h == head) // loop if head changed
break;
}
}
doReleaseShared()整体是一个无限for循环,它会不断地尝试唤醒头节点的后继节点。 这里我们着重分析一下该for循环的跳出条件h == head,h == head成立即表示在一次循环后,头节点没有被修改,那么什么时候头节点会被修改呢? 假设当前线程获取到共享锁了,因此该线程所在的节点就是头节点(注意,此时头节点只是代表该线程,但该节点的thread被置为null了),头节点尝试唤醒它的后继节点后,如果后继节点成功获取到共享锁就会将自己设置为头节点,此时头节点就被修改了,h == head条件不成立,还会进入下一轮for循环,继续唤醒新的头节点的后继节点;头节点没有被修改的情况主要是:头节点唤醒的后继节点并不是共享节点,此时该节点的线程想获取独占锁,是会被阻塞的。所以此时,头节点并不会被修改,因此直接跳出for循环,结束传播唤醒。
因此我们还可以得出一个结论,当同步队列中既存在共享锁的竞争线程,也存在独占锁的竞争线程时,如果某些线程已经获取到了读锁,此时是无法获取到写锁的,且处于请求写锁的时刻后面的所有请求读锁的线程(非本线程的),也只能继续等待,无法获取到读锁。
释放读锁
释放读锁由releaseShared(int)方法处理,该方法由AQS提供,并且调用了ReentrantReadWriteLock中Sync同步器重写的tryReleaseShared(int)方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(int)方法的源码如下:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
/**
* 如果释放共享锁的线程就是firstReader,且firstReaderHoldCount为0,
* 说明此时firstReader只重入获取了一次共享锁,
* 此时释放了共享锁,就将firstReader置为null
*/
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
// 否则只是将firstReaderHoldCount减1
firstReaderHoldCount--;
} else {
// 否则需要从readHolds中取出当前线程重入的次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 此时表示当前线程释放了所有的重入次数
readHolds.remove();
if (count <= 0)
// 此时表示释放锁的次数超过了加锁的次数,直接抛出异常
throw unmatchedUnlockException();
}
// 更新值
--rh.count;
}
for (; ; ) {
// 自旋方式保证共享锁记录值能被成功更新
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 如果共享锁的记录值更新后为0,则表示所有线程都已经释放了锁,返回true
return nextc == 0;
}
}
公平模式和非公平模式
ReentrantReadWriteLock中的独占锁和共享锁同时存在公平和非公平两种模式,在上面的讲解中我们一直是以FairSync公平同步器讲解的公平模式下的独占锁和共享锁。在使用tryAcquire(int)方法获取独占锁,和使用tryAcquireShared(int)方法获取共享锁时,它们分别调用了writerShouldBlock()和readerShouldBlock()来进行公平和非公平的区分:
// 获取独占锁的方法
protected final boolean tryAcquire(int acquires) {
...
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
...
}
// 获取共享锁的方法
protected final int tryAcquireShared(int unused) {
...
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
...
}
...
}
两个方法的声明都在Sync中:
abstract static class Sync extends AbstractQueuedSynchronizer {
...
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
...
}
具体实现在FairSync和NonfairSync中却各不同;下面是FairSync和NonfairSync中readerShouldBlock()各自的实现:
// FairSync中的readerShouldBlock()调用了hasQueuedPredecessors(),这个方法在上一篇文章中已经讲解过了
// 需要判断是否有前驱节点,如果有则返回false,否则返回true
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
// NonfairSync中的readerShouldBlock()调用apparentlyFirstQueuedIsExclusive()方法
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
/**
* 当head节点不为null,且head节点的下一个节点s不为null,且s是独占模式(写线程),且s的线程不为null时,返回true。
* 即判断头节点的下一个节点是否是正在等待的独占锁(写锁),如果是,则需要等待。
* 目的是不应该让写锁始终等待,避免可能的写线程饥饿。
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
可以发现,在竞争共享锁(即读锁)时, 公平模式下竞争线程会首先查看同步队列中是否有线程已经等待了一段时间,如果有就表示自己让步; 而在非公平模式下,只有在同步队列中第一个等待的线程竞争是独占锁(即写锁),才会进行让步,其他情况不会让步。非公平模式的这种设计是为了避免写线程因读线程过多而造成饥饿发生。
FairSync和NonfairSync中writerShouldBlock()各自的实现:
// FairSync中的writerShouldBlock()调用了hasQueuedPredecessors(),不再赘述
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
// NonfairSync中的writerShouldBlock()直接返回false,即永远不让步
final boolean writerShouldBlock() {
return false; // writers can always barge
}
在竞争独占锁时, 公平模式下竞争线程会首先查看同步队列中是否有线程已经等待了一段时间,如果有就表示自己让步; 而在非公平模式下是永远不会让步的,这是由于竞争独占锁的是写线程,无需让步。