LimitLatch组件 - 969251639/study GitHub Wiki
LimitLatch组件是Tomcat用来控制最大连接数,如果超过最大连接数,则等待
protected void countUpOrAwaitConnection() throws InterruptedException {
if (maxConnections==-1) return;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.countUpOrAwait();
}
可以看到如果maxConnections配为-1,则无此限制,maxConnections默认大小是10000
private int maxConnections = 10000;
而这个配置其实是可以在server.xml中Connector节点中配置(maxConnections配置只对nio模式下才生效)
<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" maxConnections="1000"/>
在LimitLatch的构造器中,用一个AtomicLong来计数,用一个Sync来控制限流
public LimitLatch(long limit) {
this.limit = limit;
this.count = new AtomicLong(0);
this.sync = new Sync();
}
Sync是LimitLatch一个内部类,是控制连接数的核心,内部是通过java内置的AQS来实现,其实本质上就是一个共享锁(可以又n个线程可执行,超出则等待)
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int ignored) {
long newCount = count.incrementAndGet();
if (!released && newCount > limit) {
// Limit exceeded
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
重写了共享锁需要的两个方法进行控制,其余都交由AQS来控制
当Acceptor调用countUpOrAwaitConnection()时最后会调用LimitLatch的countUpOrAwait方法
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
}
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly是AQS的内部方法,里面又会调用tryAcquireShared尝试是否能获取得到执行权限,tryAcquireShared方法的返回值只有两种
- 当达到连接上线时返回-1,表示需要等待
- 当连接数未达到上线时返回1,表示可以继续执行
连接释放也是类似,AQS内部会自动调用tryReleaseShared进行释放,其实就是把连接计数器减1