Semaphore原理分析 - 969251639/study GitHub Wiki
Semaphore是jdk实现的信号量,用来控制线程的进与出,内部采用共享同步器实现
它有两个构造方法
public Semaphore(int permits) {//授权数
sync = new NonfairSync(permits);//非公平同步器
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);//根据fair参数控制是否是非公平还是公平同步器
}
不管公平还是非公平,都需要靠内置的Sync同步器实现
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {//调用父类初始化授权数
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {//调用父类初始化授权数
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())//判断是否线程排队获取锁
return -1;//存在排队获取,返回-1,表示失败
int available = getState();//获取同步状态
int remaining = available - acquires;//用可以用的同步状态值(可用授权数)减去需要的数量
if (remaining < 0 ||//如果剩余的授权数小于0
compareAndSetState(available, remaining))//如果不小于0,那么则cas修改可用授权数
return remaining;//返回小于0表示获取失败,否则成功
}
}
}
Sync同步器
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {//初始话同步状态值,即授权数
setState(permits);
}
final int getPermits() {//获取授权数
return getState();
}
final int nonfairTryAcquireShared(int acquires) {//非公平方式获取同步状态
for (;;) {
int available = getState();//可用授权数
int remaining = available - acquires;//用可以用的同步状态值(可用授权数)减去需要的数量
if (remaining < 0 ||//如果剩余的授权数小于0
compareAndSetState(available, remaining))//如果不小于0,那么则cas修改可用授权数
return remaining;//返回小于0表示获取失败,否则成功
}
}
protected final boolean tryReleaseShared(int releases) {//尝试释放同步状态
for (;;) {
int current = getState();//获取授权数
int next = current + releases;//授权数+1
if (next < current) // overflow// 边界判断,即参数reductions不能为负数
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//cas修改授权数
return true;//修改成功返回
}
}
final void reducePermits(int reductions) {//减少授权数
for (;;) {
int current = getState();//获取授权数
int next = current - reductions;//获取剩余授权数
if (next > current) // 边界判断,即参数reductions不能为负数
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))//cas修改授权数
return;//修改成功返回
}
}
final int drainPermits() {//清空授权数
for (;;) {
int current = getState();//获取授权数
if (current == 0 || compareAndSetState(current, 0))//如果授权数为0或者cas修改成0则返回最近的一次获取授权数
return current;
}
}
}
接下来看申请进入的代码
public void acquire() throws InterruptedException {//支持中断方式申请授权
sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {//申请授权
sync.acquireShared(1);
}
内部sync内部的AQS又会去调用tryAcquire方法
public boolean tryAcquire() {//这里会调用上面的sync的nonfairTryAcquireShared方法返回是否成功的值
return sync.nonfairTryAcquireShared(1) >= 0;
}
交还授权数的代码
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);//调用AQS释放共享同步状态值
}
其他辅助方法
public int availablePermits() {//获取剩余的授权数
return sync.getPermits();
}
public int drainPermits() {//清空授权
return sync.drainPermits();
}