ReentrantReadWriteLock原理 - 969251639/study GitHub Wiki
ReentrantReadWriteLock是jdk实现的读写锁,实现ReadWriteLock接口,重写ReadWriteLock接口的读锁和写锁两个方法
public interface ReadWriteLock {
//读锁
Lock readLock();
//写锁
Lock writeLock();
}
下面的分析需要先弄懂AQS和ReentrantLock
AQS:https://github.com/969251639/study/wiki/AbstractQueueSysnchorized%E5%8E%9F%E7%90%86
ReentrantLock:https://github.com/969251639/study/wiki/ReentrantLock%E5%8E%9F%E7%90%86
ReentrantReadWriteLock内部持有读锁和写锁的两个实例
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
...
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }//获取写锁
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }//获取读锁
...
}
同时也和ReentrantLock一样,在构造方法中可以构造公平锁和非公平锁
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
默认是非公平锁的算法,公司也一起构造了读锁和写锁的实例
下面在分析源码之前先说下ReentrantReadWriteLock的规则
获取读锁的条件:
- 没有其他线程的写锁
- 如果有写锁,且持有写锁的线程和读锁的线程要同一个线程
获取写锁的条件:
- 没有读锁
- 没有写锁
总结起来就是读和读不互斥,读和写互斥(读写都是一个线程除外),写和写互斥,所以缺点很明显,写的时候不能有任何读写锁,那么在读多写少的情况下,写锁的请求很可能因为一直被读锁卡着而被饥饿
另外,持有读写锁的线程释放锁时可以先释放写锁,从而将其降级为读锁,且都支持重入
写锁
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {//通过具体的同步器构造写锁
sync = lock.sync;
}
public void lock() {//获取写锁
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {//获取写锁,支持中断
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {//尝试写锁
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {//尝试写锁,获取写锁可超时
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {//释放写锁
sync.release(1);
}
public Condition newCondition() {//创建写锁的条件队列
return sync.newCondition();
}
public boolean isHeldByCurrentThread() {//当前线程是否持有锁
return sync.isHeldExclusively();
}
public int getHoldCount() {//写锁的重入次数
return sync.getWriteHoldCount();
}
}
读锁
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {//通过具体的同步器构造读锁
sync = lock.sync;
}
public void lock() {//获取读锁
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {//获取读锁,支持中断
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {//尝试读锁
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {//尝试读锁,获取读锁可超时
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {//释放读锁
sync.releaseShared(1);
}
public Condition newCondition() {//创建读锁的条件队列
throw new UnsupportedOperationException();
}
}
可以看到不管是读锁还是写锁都需要具体的同步器Sync做锁的操作支持;读锁用共享锁,写锁用排他锁;读锁没有重入的获取
Sync的实现就是公平锁和非公平锁两种(构造函数中确定)
公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {//判断写锁是否需要阻塞
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {//判断读锁是否需要阻塞
return hasQueuedPredecessors();
}
}
非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {//判断写锁是否需要阻塞,这里永远不阻塞写锁
return false; // writers can always barge
}
final boolean readerShouldBlock() {//判断读锁是否需要阻塞
return apparentlyFirstQueuedIsExclusive();
}
}
同步器
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
//ReentrantReadWriteLock用一个32位的int将其分成两部分,分表表示读和写的线程占用数,即state
//用低16位用来表示写锁的占用数,用高16位来表示读锁的占用数
static final int SHARED_SHIFT = 16;//分段占位数
static final int SHARED_UNIT = (1 << SHARED_SHIFT);//读锁占用的单位
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//最大读锁数量,2^16 - 1 = 65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//最大写锁数量,2^16 - 1 = 65535
/** 计算出读锁数量 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 计算出写锁 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
...
}
另外Sync还有四个辅助变量来辅助读写锁的操作
//读锁的重入数,用于上下文操作,且下面的三个变量都是为readHolds服务,Sync构造方法中会创建该实例
private transient ThreadLocalHoldCounter readHolds;
//缓存重入数
private transient HoldCounter cachedHoldCounter;
//第一个读锁的线程
private transient Thread firstReader = null;
//第一个读锁的重入数
private transient int firstReaderHoldCount;
Sync的构造方法
Sync() {
readHolds = new ThreadLocalHoldCounter();//构造重入数的上下文
setState(getState()); //根据当前的同步数创建同步状态值
}
Sync的重入计数器和重入计数器的上下文
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {//默认初始化时new了一个HoldCounter实例,哪怕还没有set,即线程没有set,也可以调用get获取
return new HoldCounter();
}
}
static final class HoldCounter {
int count = 0;//持有次数
final long tid = getThreadId(Thread.currentThread());//当前线程的id
}
尝试获取写锁
final boolean tryWriteLock() {
Thread current = Thread.currentThread();//获取当前线程
int c = getState();//获取当前同步状态值
if (c != 0) {//如果不等于0,表示已有线程获取锁
int w = exclusiveCount(c);//获取写锁的数量
if (w == 0 || current != getExclusiveOwnerThread())//如果写锁的数量为0,表示是其他线程获取了读锁,根据读写互斥,直接返回false,如果其他线程获取到的是写锁,那么判断当前持有写锁的线程是不是当前线程,不是返回false,获取失败
return false;
if (w == MAX_COUNT)//如果锁的次数超过最大值,抛出异常
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))//如果没有锁,那么尝试cas修改state,即重入数
return false;
setExclusiveOwnerThread(current);//修改成功,则设置当前获取锁的线程为当前线程
return true;
}
尝试获取读锁
final boolean tryReadLock() {
Thread current = Thread.currentThread();//获取当前线程
for (;;) {//自旋
int c = getState();//获取当前同步状态值
if (exclusiveCount(c) != 0 &&//如果有其他线程获取写锁且持有写锁的线程不是当前线程,根据读写互斥,返回false
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);//获取读锁数量
if (r == MAX_COUNT)//超出最大值,抛出异常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {//尝试修改state值(修改高16位,高16位是读锁的位置)
//进入这个if分支表示获取读锁成功
if (r == 0) {//如果没有任何线程获得读锁
firstReader = current;//设置第一个读锁线程
firstReaderHoldCount = 1;//设置第一个读锁线程的重入数
} else if (firstReader == current) {//如果有其他线程获取了读锁,那么第一个读锁是当前线程则第一个读锁线程的重入数+1
firstReaderHoldCount++;
} else {//表示不是第一个读锁的线程
HoldCounter rh = cachedHoldCounter;//获取用来缓存重入次数的计数器
if (rh == null || rh.tid != getThreadId(current))//如果没有缓存或者cachedHoldCounter不是当前线程的计数器
cachedHoldCounter = rh = readHolds.get();//创建或从上下文获取缓存重入次数的计数器
else if (rh.count == 0)//如果重入次数为0
readHolds.set(rh);//设置到上下文
rh.count++;//重入次数+1
}
return true;
}
}
}
可以看到readHolds的作用其实就是用来缓存每个获取到的读锁线程的重入次数,另外cachedHoldCounter来作为最后一次访问到的读锁的线程的重入计数器的缓存,减少对readHolds的访问,另外,第一个读锁的线程不放入readHolds上下文
获取写锁
public static class WriteLock implements Lock, java.io.Serializable {
...
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
...
}
不管是lock还是lockInterruptibly,最后都是通过AQS调用sync内部的tryAcquire方法获取锁
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();//获取当前线程
int c = getState();//获取当前同步状态值
int w = exclusiveCount(c);//获取写锁的数量
if (c != 0) {//如果不等于0,表示已有线程获取锁
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())//如果写锁的数量为0,表示是其他线程获取了读锁,根据读写互斥,直接返回false,如果其他线程获取到的是写锁,那么判断当前持有写锁的线程是不是当前线程,不是返回false,获取失败
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)//如果锁的次数超过最大值,抛出异常
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() || //如果是非公平锁这里不阻塞,如果是公平锁,那么如果有线程在CLH队列,那么需要阻塞
!compareAndSetState(c, c + acquires))//那么根据writerShouldBlock来决定是否阻塞写锁如果没有锁,那么尝试cas修改state,即重入数
return false;
setExclusiveOwnerThread(current);//如果没有锁,那么尝试cas修改state,即重入数
return true;
}
获取读锁
public static class ReadLock implements Lock, java.io.Serializable {
...
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
...
}
不管是lock还是lockInterruptibly,最后都是通过AQS调用sync内部的tryAcquireShared方法获取锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();//获取当前线程
int c = getState();//获取当前同步状态值
if (exclusiveCount(c) != 0 &&//如果有其他线程获取写锁且持有写锁的线程不是当前线程,根据读写互斥,返回false
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);//获取读锁数量
if (!readerShouldBlock() && //是否需要阻塞
r < MAX_COUNT && //小于最大读锁数
compareAndSetState(c, c + SHARED_UNIT)) {//尝试修改state值(修改高16位,高16位是读锁的位置)
if (r == 0) {//如果没有任何线程获得读锁
firstReader = current;//设置第一个读锁线程
firstReaderHoldCount = 1;//设置第一个读锁线程的重入数
} else if (firstReader == current) {//如果有其他线程获取了读锁,那么第一个读锁是当前线程则第一个读锁线程的重入数+1
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;//获取用来缓存重入次数的计数器
if (rh == null || rh.tid != getThreadId(current))//如果没有缓存或者cachedHoldCounter不是当前线程的计数器
cachedHoldCounter = rh = readHolds.get();//创建或从上下文获取缓存重入次数的计数器
else if (rh.count == 0)//如果重入次数为0
readHolds.set(rh);//设置到上下文
rh.count++;//重入次数+1
}
return 1;
}
return fullTryAcquireShared(current);//如果上面的代码没有获取到锁进入该方法
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();//获取当前同步状态值
if (exclusiveCount(c) != 0) {//如果有其他线程获取写锁且持有写锁的线程不是当前线程,根据读写互斥,返回false
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//是否阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {//不是第一个读锁线程
if (rh == null) {
rh = cachedHoldCounter;//获取用来缓存重入次数的计数器
if (rh == null || rh.tid != getThreadId(current)) {//如果没有缓存或者cachedHoldCounter不是当前线程的计数器
rh = readHolds.get();//创建或从上下文获取缓存重入次数的计数器
if (rh.count == 0)//如果重入次数为0
readHolds.remove();//从上下文remove掉
}
}
if (rh.count == 0)//重入次数为0
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)//超过最大读锁数,抛出异常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {//尝试修改state值(修改高16位,高16位是读锁的位置)
if (sharedCount(c) == 0) {//如果没有任何线程获得读锁
firstReader = current;//设置第一个读锁线程
firstReaderHoldCount = 1;//设置第一个读锁线程的重入数
} else if (firstReader == current) {//如果有其他线程获取了读锁,那么第一个读锁是当前线程则第一个读锁线程的重入数+1
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;//获取用来缓存重入次数的计数器
if (rh == null || rh.tid != getThreadId(current))//如果没有缓存或者cachedHoldCounter不是当前线程的计数器
rh = readHolds.get();
else if (rh.count == 0)//如果重入次数为0
readHolds.set(rh);//设置到上下文
rh.count++;//重入次数+1
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}