Poller组件 - 969251639/study GitHub Wiki
Tomcat的Acceptor组件会将接收到的连接放入一个队列中
/**
 * Prepares a freshly accepted connection and hands it to a poller.
 *
 * The socket is switched to non-blocking mode, the configured socket
 * properties are applied, and the connection is wrapped in a NioChannel
 * (reused from the nioChannels cache when one is available, otherwise newly
 * created) before being registered with the poller returned by getPoller0().
 *
 * @param socket the accepted socket channel
 * @return true if the channel was registered; false if anything failed, in
 *         which case the caller is expected to close the socket
 */
protected boolean setSocketOptions(SocketChannel socket) {
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        socketProperties.setProperties(socket.socket());

        // Take a NioChannel from the cache, if one is available.
        NioChannel nioChannel = nioChannels.pop();
        if (nioChannel != null) {
            // Reuse: swap in the new socket, then reset the cached channel.
            nioChannel.setIOChannel(socket);
            if (nioChannel instanceof SecureNioChannel) {
                SSLEngine sslEngine = createSSLEngine();
                ((SecureNioChannel) nioChannel).reset(sslEngine);
            } else {
                nioChannel.reset();
            }
        } else if (sslContext == null) {
            // normal tcp setup
            NioBufferHandler plainBuffers = new NioBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            nioChannel = new NioChannel(socket, plainBuffers);
        } else {
            // SSL setup: buffers must be at least as large as the SSL
            // session's application buffer size.
            SSLEngine sslEngine = createSSLEngine();
            int sslAppBufSize = sslEngine.getSession().getApplicationBufferSize();
            NioBufferHandler sslBuffers = new NioBufferHandler(
                    Math.max(sslAppBufSize, socketProperties.getAppReadBufSize()),
                    Math.max(sslAppBufSize, socketProperties.getAppWriteBufSize()),
                    socketProperties.getDirectBuffer());
            nioChannel = new SecureNioChannel(socket, sslEngine, sslBuffers, selectorPool);
        }

        getPoller0().register(nioChannel);
        return true;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
}
上面最重要的一个方法是将通道注册到poller队列
/**
 * Queues a new channel for registration with this poller.
 *
 * The channel is wrapped in a KeyAttachment carrying its timeout,
 * keep-alive budget and SSL flag, then packed into a PollerEvent of type
 * OP_REGISTER (recycled from eventCache when possible) and added to the
 * event queue via addEvent.
 *
 * @param socket the channel to register
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);

    KeyAttachment attachment = new KeyAttachment(socket);
    attachment.setPoller(this);
    attachment.setTimeout(getSocketProperties().getSoTimeout());
    attachment.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    attachment.setSecure(isSSLEnabled());

    PollerEvent event = eventCache.pop();
    //this is what OP_REGISTER turns into.
    attachment.interestOps(SelectionKey.OP_READ);
    if (event != null) {
        event.reset(socket, attachment, OP_REGISTER);
    } else {
        event = new PollerEvent(socket, attachment, OP_REGISTER);
    }
    addEvent(event);
}
这里会将Socket封装成KeyAttachment,然后生成PollerEvent,最后放入队列
// Queue of pending PollerEvents: filled by addEvent(), drained by events().
// Original author's note: SynchronizedQueue is backed by an array internally
// and is later replaced by the thread-safe ConcurrentLinkedQueue.
// NOTE(review): this cannot be verified from this page; confirm against the
// Tomcat version in use.
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
/**
 * Adds an event to the poller queue and wakes the selector when needed.
 *
 * @param event the event to enqueue
 */
private void addEvent(PollerEvent event) {
    // Put the event on the queue for the poller thread to pick up.
    events.offer(event);

    // The counter reaches 0 only if it was -1, the value Poller.run() stores
    // just before calling select; in that case wake the selector.
    final boolean selectorNeedsWakeup = wakeupCounter.incrementAndGet() == 0;
    if (selectorNeedsWakeup) {
        selector.wakeup();
    }
}
在启动Connector时会启动poller线程
/**
 * Starts the endpoint (excerpt; elided parts are marked with "...").
 *
 * The visible part creates getPollerThreadCount() Poller instances and
 * starts each one on its own daemon thread named
 * getName() + "-ClientPoller-" + i, with priority threadPriority.
 * Nothing is done if the endpoint is already running.
 */
@Override
public void startInternal() throws Exception {
if (!running) {
...
// Start poller threads
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {//create the poller threads
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
// Daemon threads do not keep the JVM alive on their own.
pollerThread.setDaemon(true);
pollerThread.start();
}
...
}
}
接下来转到poller的run方法
/**
 * Main loop of the poller thread.
 *
 * Each iteration: waits while the endpoint is paused, drains the pending
 * PollerEvent queue via events(), performs a select on the selector,
 * dispatches every ready key to processKey(), and finally runs the timeout
 * check. When the close flag is set, the remaining events are drained,
 * timeout(0, false) is called, the selector is closed and the loop ends;
 * stopLatch is counted down on exit.
 */
@Override
public void run() {
// Loop until destroy() is called
while (true) {
try {
// Loop if endpoint is paused
while (paused && (!close) ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
}
boolean hasEvents = false;
// Time to terminate?
if (close) {
// Drain what is left, run the timeout pass and shut the selector down.
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
} else {
hasEvents = events();
}
try {
if ( !close ) {
// The counter is set to -1 before selecting; addEvent() calls
// selector.wakeup() when its increment brings the counter to 0.
// A previous value > 0 means addEvent() ran since the last reset.
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
// close may have been set while selecting, so check it again.
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
// Skip key processing for this iteration and start over.
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
// Record the access, then remove the key from the selected set
// before handing it to processKey().
attachment.access();
iterator.remove();
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
} catch (OutOfMemoryError oom) {
// Drop the parachute data and the caches, then try to log the error.
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
}
}//while
// Signal that this poller's loop has ended.
stopLatch.countDown();
}
代码很长,拣重点看
- events方法,用来扫描poller队列是否有事件
events();
/**
 * Runs the PollerEvents currently waiting in the queue.
 *
 * The queue size is read once up front, so at most that many events are
 * handled in a single call. Each event is run and reset, and it is returned
 * to eventCache for reuse while the endpoint is running and not paused.
 *
 * @return true if at least one event was taken from the queue
 */
public boolean events() {
    boolean handledAny = false;
    final int pending = events.size();
    int handled = 0;
    while (handled < pending) {
        PollerEvent current = events.poll();
        if (current == null) {
            // Queue emptied earlier than the size snapshot suggested.
            break;
        }
        handledAny = true;
        try {
            current.run();
            current.reset();
            if (running && !paused) {
                eventCache.push(current);
            }
        } catch (Throwable x) {
            log.error("",x);
        }
        handled++;
    }
    return handledAny;
}
如果有事件,则执行事件的run方法,然后将执行完的PollerEvent重置后放回缓存,用于复用
/**
 * Applies this event to the poller's selector.
 *
 * For OP_REGISTER the channel is registered with the selector for OP_READ
 * with the attachment. For any other value the requested interest ops are
 * OR-ed into the channel's existing selection key; if that key or its
 * attachment is gone, the connection is counted down or the key cancelled.
 */
@Override
public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
        } catch (Exception x) {
            log.error("", x);
        }
        return;
    }

    final SelectionKey selectionKey = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
    try {
        if (selectionKey == null) {
            // The key was cancelled (e.g. due to socket closure)
            // and removed from the selector while it was being
            // processed. Count down the connections at this point
            // since it won't have been counted down when the socket
            // closed.
            socket.getPoller().getEndpoint().countDownConnection();
            return;
        }

        final KeyAttachment att = (KeyAttachment) selectionKey.attachment();
        if (att == null) {
            socket.getPoller().cancelledKey(selectionKey, SocketStatus.ERROR);
            return;
        }

        att.access();//to prevent timeout
        // Merge the requested ops into the ops already registered on the key.
        int mergedOps = selectionKey.interestOps() | interestOps;
        att.interestOps(mergedOps);
        selectionKey.interestOps(mergedOps);
    } catch (CancelledKeyException ckx) {
        try {
            socket.getPoller().cancelledKey(selectionKey, SocketStatus.DISCONNECT);
        } catch (Exception ignore) {}
    }
}
里面主要是向多路复用器注册读事件
转回到poller的run方法,select返回就绪的SelectionKey后会逐个走到processKey(sk, attachment);这个方法会根据读写事件调用processSocket方法
/**
 * Wraps the connection in a SocketProcessor and runs it.
 *
 * The processor is recycled from processorCache when one is available. It
 * is submitted to the executor when dispatch is true and an executor
 * exists; otherwise it runs on the calling thread.
 *
 * @param attachment the connection's key attachment; null yields false
 * @param status     the socket status passed to the processor
 * @param dispatch   whether to hand the work to the executor
 * @return true if the processor was submitted or run; false if the
 *         attachment was null, the executor rejected the task, or any
 *         other Throwable was raised
 */
protected boolean processSocket(KeyAttachment attachment, SocketStatus status, boolean dispatch) {
    if (attachment == null) {
        return false;
    }
    try {
        SocketProcessor worker = processorCache.pop();
        if (worker != null) {
            worker.reset(attachment, status);
        } else {
            worker = new SocketProcessor(attachment, status);
        }

        Executor executor = getExecutor();
        if (!dispatch || executor == null) {
            worker.run();
        } else {
            executor.execute(worker);
        }
        return true;
    } catch (RejectedExecutionException ree) {
        log.warn(sm.getString("endpoint.executor.fail", attachment.getSocket()), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        log.error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
}
很明显,将通道的信息封装成SocketProcessor后放入worker线程池去执行
/**
 * Runnable that processes one socket event (excerpt; elided parts are
 * marked with "..."). run() picks the lock to hold and delegates to
 * doRun(), which passes the connection to handler.process().
 */
protected class SocketProcessor implements Runnable {
...
/**
 * Runs doRun() while holding a lock: the attachment's write-thread lock for
 * an upgraded connection with status OPEN_WRITE, otherwise the NioChannel
 * itself.
 */
@Override
public void run() {
NioChannel socket = ka.getSocket();
// Upgraded connections need to allow multiple threads to access the
// connection at the same time to enable blocking IO to be used when
// NIO has been configured
if (ka.isUpgraded() && SocketStatus.OPEN_WRITE == status) {
synchronized (ka.getWriteThreadLock()) {
doRun();
}
} else {
synchronized (socket) {
doRun();
}
}
}
/**
 * Hands the connection to the handler (excerpt).
 */
private void doRun() {
...
// Process the request from this socket
// A null status is treated as OPEN_READ
if (status == null) {
state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
state = handler.process(ka, status);
}
...
}
...
}
/**
 * Connection handler (excerpt; elided parts are marked with "...").
 * The visible part of process() delegates the wrapped socket to a
 * Processor.
 */
protected abstract static class AbstractConnectionHandler<S,P extends Processor<S>>
implements AbstractEndpoint.Handler {
...
/**
 * Processes the given socket wrapper (excerpt); the visible step stores the
 * result of processor.process(wrapper) in state.
 */
public SocketState process(SocketWrapper<S> wrapper,
SocketStatus status) {
...
state = processor.process(wrapper);
...
}
...
}
/**
 * HTTP/1.1 processor (excerpt; elided parts are marked with "...").
 */
public abstract class AbstractHttp11Processor<S> extends AbstractProcessor<S> {
...
/**
 * Processes a connection (excerpt): binds the socket wrapper, initializes
 * the input and output buffers with it, and later passes the request and
 * response to the adapter's service() method.
 */
@Override
public SocketState process(SocketWrapper<S> socketWrapper)
throws IOException {
// Setting up the I/O
setSocketWrapper(socketWrapper);
getInputBuffer().init(socketWrapper, endpoint);//initialize the input (read) buffer
getOutputBuffer().init(socketWrapper, endpoint);//initialize the output (write) buffer
...
getAdapter().service(request, response);
...
}
...
}
/**
 * Adapter between the Coyote connector and the container (excerpt; elided
 * parts are marked with "...").
 */
public class CoyoteAdapter implements Adapter {
...
/**
 * Services a request (excerpt): the visible step invokes the first element
 * of the pipeline belonging to the container of the connector's service.
 */
@SuppressWarnings("deprecation")
@Override
public void service(org.apache.coyote.Request req,
org.apache.coyote.Response res)
throws Exception {
...
// Entry point into the container pipeline.
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
...
}
...
}
执行到最后会调用管道开始数据流转