Producer: Network
NetworkClient is Kafka's default network handler. Under the hood it wraps Java NIO, and it is created when the Sender is constructed:
Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),//the multiplexer (Kafka's own Selector)
metadata,//cluster metadata
clientId,//producer client id
maxInflightRequests,//max in-flight requests per connection
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,//request timeout, controlled by request.timeout.ms
ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
}
Continue the analysis alongside the Producer: Sender page:
https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E5%8F%91%E9%80%81%E5%99%A8
In the Sender thread, the run method calls sendProduceRequest, which writes the message into the NIO channel before any network I/O is performed:
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
...
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);//build the request
client.send(clientRequest, now);//send: this only writes the request to the channel; no real network I/O yet
...
}
First a request is built; Kafka requires all network interaction to go through this ClientRequest:
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
RequestCompletionHandler callback) {
return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, createdTimeMs, expectResponse,
requestTimeoutMs, callback);
}
public ClientRequest(String destination,
AbstractRequest.Builder<?> requestBuilder,
int correlationId,
String clientId,
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
RequestCompletionHandler callback) {
this.destination = destination;//node id: which broker the request goes to
this.requestBuilder = requestBuilder;//request builder; carries the request details such as acks and timeout
this.correlationId = correlationId;//monotonically increasing id used to match the response to this request
this.clientId = clientId;//producer client id
this.createdTimeMs = createdTimeMs;//creation time
this.expectResponse = expectResponse;//whether a broker response is expected
this.requestTimeoutMs = requestTimeoutMs;//request timeout
this.callback = callback;//completion callback
}
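The correlationId is a client-side counter stamped into every request header (note the correlation++ in newClientRequest above) so that a response coming back from the broker can be paired with the request that produced it. A minimal sketch of that matching idea, with hypothetical names, not Kafka's actual bookkeeping:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class CorrelationDemo {
    static final AtomicInteger correlation = new AtomicInteger();
    static final Map<Integer, String> pending = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        int id = correlation.getAndIncrement();       // stamped into the request header
        pending.put(id, "ProduceRequest to node 0");  // remember what was sent
        // ... later a response arrives carrying the same id in its header:
        String matched = pending.remove(id);          // pair the response with its request
        System.out.println("response " + id + " matches: " + matched);
    }
}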
With the ClientRequest built, send is called to write it to the channel:
@Override
public void send(ClientRequest request, long now) {
doSend(request, false, now);
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
//make sure the client is active; it starts out ACTIVE and has three states:
//ACTIVE: active
//CLOSING: shutting down
//CLOSED: closed
ensureActive();
String nodeId = clientRequest.destination();//target broker id
if (!isInternalRequest) {//only requests that originate outside the NetworkClient are validated here
// If this request came from outside the NetworkClient, validate
// that we can send data. If the request is internal, we trust
// that internal code has done this validation. Validation
// will be slightly different for some internal requests (for
// example, ApiVersionsRequests can be sent prior to being in
// READY state.)
if (!canSendRequest(nodeId, now))//check whether this broker's connection is ready for a request
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
try {
NodeApiVersions versionInfo = apiVersions.get(nodeId);
short version;
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
version = builder.latestAllowedVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending {} with correlation id {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
} else {
version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
doSend(clientRequest, isInternalRequest, now, builder.build(version));//perform the send
} catch (UnsupportedVersionException unsupportedVersionException) {
// If the version is not supported, skip sending the request over the wire.
// Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
abortedSends.add(clientResponse);
}
}
The doSend above does two things: check the node's network health, then perform the real doSend.
private boolean canSendRequest(String node, long now) {
return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
inFlightRequests.canSendMore(node);
}
Whether a node can take a request is governed by three conditions:
- The connection state tracker: connectionStates
private NetworkClient(...) {
...
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, logContext);
...
}
Connection state is established lazily: every send first runs a ready check, and when no connection state exists for a node yet, the connection is initialized:
private long sendProducerData(long now) {
...
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {//check whether the connection is ready
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
...
}
@Override
public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);
if (isReady(node, now))//true if the node is already ready; on the first call this is always false
return true;
if (connectionStates.canConnect(node.idString(), now))//no live connection yet, so initiate one
// if we are interested in sending to a node and we don't have a connection to it, initiate one
initiateConnect(node, now);
return false;
}
Internally connectionStates caches each node's state in a HashMap:
public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs, LogContext logContext) {
...
this.nodeState = new HashMap<>();
}
So canConnect fetches the node's state from the cache: a missing entry means a connection may be initiated; otherwise the node must be disconnected and past its reconnect backoff interval:
public boolean canConnect(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null)
return true;
else
return state.state.isDisconnected() &&
now - state.lastConnectAttemptMs >= state.reconnectBackoffMs;
}
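canConnect therefore allows a new attempt only when the node is disconnected and the backoff since the last attempt has elapsed. The real client grows reconnectBackoffMs exponentially up to reconnect.backoff.max.ms and adds jitter; a simplified sketch of just the gating logic (hypothetical names):
public class BackoffDemo {
    static long lastAttemptMs = 0;
    static long backoffMs = 50;              // reconnect.backoff.ms
    static final long MAX_BACKOFF_MS = 1000; // reconnect.backoff.max.ms

    static boolean canConnect(long nowMs) {
        return nowMs - lastAttemptMs >= backoffMs; // same check as above, minus the state test
    }

    static void onAttempt(long nowMs) {
        lastAttemptMs = nowMs;
        backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS); // double up to the cap
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        if (canConnect(now)) onAttempt(now);
        System.out.println("next attempt allowed after " + backoffMs + " ms");
    }
}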
Initiating the connection:
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();//broker id
try {
this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);//cache the node's connection state, marking it CONNECTING
InetAddress address = this.connectionStates.currentAddress(nodeConnectionId);//broker address
log.debug("Initiating connection to node {} using address {}", node, address);
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);//ask the selector to initiate the connection
} catch (IOException e) {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(nodeConnectionId, now);
/* maybe the problem is our metadata, update it */
metadataUpdater.requestUpdate();
log.warn("Error connecting to node {}", node, e);
}
}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
NodeConnectionState connectionState = nodeState.get(id);
if (connectionState != null && connectionState.host().equals(host)) {//refresh the attempt time and reset the state
connectionState.lastConnectAttemptMs = now;
connectionState.state = ConnectionState.CONNECTING;
connectionState.moveToNextAddress();//move on to the next resolved IP address for this host
return;
} else if (connectionState != null) {
log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
}
// Create a new NodeConnectionState if nodeState does not already contain one
// for the specified id or if the hostname associated with the node id changed.
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
this.reconnectBackoffInitMs, host, clientDnsLookup));//cache a new entry in CONNECTING state, recording the attempt time
}
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
ensureNotRegistered(id);//make sure the node is not already registered with a channel
SocketChannel socketChannel = SocketChannel.open();//open the channel
try {
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);//configure non-blocking socket options
boolean connected = doConnect(socketChannel, address);//initiate the connect
SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);//register OP_CONNECT with the selector
if (connected) {//connected immediately
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", id);
immediatelyConnectedKeys.add(key);//remember the key so poll() can finish the connect
key.interestOps(0);//clear the interest ops; the connect is already done
}
} catch (IOException | RuntimeException e) {
socketChannel.close();
throw e;
}
}
private void ensureNotRegistered(String id) {
if (this.channels.containsKey(id))//an open channel already exists for this node
throw new IllegalStateException("There is already a connection for id " + id);
if (this.closingChannels.containsKey(id))//a closing channel still exists for this node
throw new IllegalStateException("There is already a connection for id " + id + " that is still being closed");
}
private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize)
throws IOException {
socketChannel.configureBlocking(false);//non-blocking mode
Socket socket = socketChannel.socket();//the underlying socket
socket.setKeepAlive(true);//enable TCP keep-alive
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)//send buffer size
socket.setSendBufferSize(sendBufferSize);
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)//receive buffer size
socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);//TCP_NODELAY: send packets immediately, the right choice for latency-sensitive traffic
}
protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException {
try {
return channel.connect(address);//initiate the connect
} catch (UnresolvedAddressException e) {
throw new IOException("Can't resolve address: " + address, e);
}
}
private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
SelectionKey key = socketChannel.register(nioSelector, interestedOps);//register the channel with the selector, interested in OP_CONNECT
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);//wrap it in a KafkaChannel bound to a transport layer
this.channels.put(id, channel);//one channel per node, i.e. one request channel per broker
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), time.nanoseconds());
return key;
}
A few points are worth noting here (a runnable sketch of this flow follows the list):
- Creating the connection does not allow an immediate send: ready() returns false right after initiating the connect, because the connect is asynchronous and only counts as ready once the server completes it
- Nothing in the flow so far has performed any network I/O
- Kafka wraps the raw NIO Selector in its own Selector class
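The points above are plain java.nio behavior: on a non-blocking channel, connect() usually returns false and the connection completes later via OP_CONNECT and finishConnect(). A self-contained sketch of the same flow (hypothetical address; finishConnect throws if the connect actually failed):
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class AsyncConnectDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        boolean connected = ch.connect(new InetSocketAddress("localhost", 9092)); // hypothetical address
        SelectionKey key = ch.register(selector, SelectionKey.OP_CONNECT);
        if (!connected) {          // the usual case: the connect is still in flight
            selector.select(1000); // wait for OP_CONNECT
            if (key.isConnectable() && ch.finishConnect())
                System.out.println("connected"); // only now is the channel usable
        }
        ch.close();
        selector.close();
    }
}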
The selector polls for ready events in its poll method:
public void poll(long timeout) throws IOException {
...
pollSelectionKeys(readyKeys, false, endSelect);
...
}
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
...
if (isImmediatelyConnected || key.isConnectable()) {//an OP_CONNECT event
if (channel.finishConnect()) {//has the connect completed?
this.connected.add(channel.id());//record the node as connected
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else {
continue;
}
}
...
}
Finally, the connection state handling is completed in NetworkClient's poll method:
public List<ClientResponse> poll(long timeout, long now) {
...
handleConnections();
...
}
private void handleConnections() {
for (String node : this.selector.connected()) {//all node ids whose connections have completed
// We are now connected. Node that we might not still be able to send requests. For instance,
// if SSL is enabled, the SSL handshake happens after the connection is established.
// Therefore, it is still necessary to check isChannelReady before attempting to send on this
// connection.
if (discoverBrokerVersions) {
this.connectionStates.checkingApiVersions(node);
nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
log.debug("Completed connection to node {}. Fetching API versions.", node);
} else {
this.connectionStates.ready(node);//mark the connection READY
log.debug("Completed connection to node {}. Ready.", node);
}
}
}
public void ready(String id) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.READY;//move the state to READY
nodeState.authenticationException = null;
resetReconnectBackoff(nodeState);
}
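Putting the pieces together, each node's state moves roughly DISCONNECTED -> CONNECTING -> (CHECKING_API_VERSIONS) -> READY. A compact sketch of those transitions (simplified; the real enum also has failure states such as AUTHENTICATION_FAILED):
public class ConnectionStateDemo {
    enum State { DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY }

    public static void main(String[] args) {
        State s = State.DISCONNECTED;
        s = State.CONNECTING;            // initiateConnect() -> connecting()
        s = State.CHECKING_API_VERSIONS; // handleConnections() when discoverBrokerVersions is on
        s = State.READY;                 // ApiVersions response handled -> ready()
        System.out.println("final state: " + s);
    }
}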
Now revisit the connectionStates.isReady(node, now) call:
public boolean isReady(String id, long now) {
return isReady(nodeState.get(id), now);
}
private boolean isReady(NodeConnectionState state, long now) {//state is READY and the throttle deadline (set from broker responses) has passed
return state != null && state.state == ConnectionState.READY && state.throttleUntilTimeMs <= now;
}
At this point the state has been set to READY.
- Whether the selector's channel is connected
Kafka's multiplexer is not the JDK one; Kafka wraps its own in org.apache.kafka.common.network.Selector.
It is created along with the Sender, and its nioSelector field wraps java.nio.channels.Selector to do the asynchronous I/O:
public Selector(int maxReceiveSize,
long connectionMaxIdleMs,
int failedAuthenticationDelayMs,
Metrics metrics,
Time time,
String metricGrpPrefix,
Map<String, String> metricTags,
boolean metricsPerConnection,
boolean recordTimePerConnection,
ChannelBuilder channelBuilder,
MemoryPool memoryPool,
LogContext logContext) {
try {
this.nioSelector = java.nio.channels.Selector.open();
} catch (IOException e) {
throw new KafkaException(e);
}
...
}
Additionally, to support different wire protocols such as plaintext and SSL, the TransportLayer interface abstracts the transport implementation:
SslTransportLayer: SSL transport
PlaintextTransportLayer: plaintext transport
The transport layer is bound while registering the channel, as seen earlier:
private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
SelectionKey key = socketChannel.register(nioSelector, interestedOps);//register the channel with the selector, interested in OP_CONNECT
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);//bind the transport layer to a KafkaChannel
...
}
private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
try {
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);//build a KafkaChannel bound to the transport layer
key.attach(channel);
return channel;
} catch (Exception e) {
try {
socketChannel.close();
} finally {
key.cancel();
}
throw new IOException("Channel could not be created for socket " + socketChannel, e);
}
}
The channelBuilder is created from the Kafka configuration:
Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
...
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig);
...
}
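createChannelBuilder chooses the builder from the security.protocol setting, PLAINTEXT being the default. A small config sketch (hypothetical address):
import java.util.Properties;

public class SecurityConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical address
        // security.protocol decides the ChannelBuilder and thus the TransportLayer:
        // PLAINTEXT (default) -> PlaintextTransportLayer, SSL -> SslTransportLayer
        props.put("security.protocol", "PLAINTEXT");
        System.out.println(props);
    }
}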
The default is the plaintext PlaintextTransportLayer; Kafka uses TransportLayer to wrap all operations on the underlying channel.
The selector's channel readiness check is therefore a check against the TransportLayer's channel:
selector.isChannelReady(node)
@Override
public boolean isChannelReady(String id) {
KafkaChannel channel = this.channels.get(id);//the channel was bound to the node id at registration time
return channel != null && channel.ready();
}
public boolean ready() {
return transportLayer.ready() && authenticator.complete();//for plaintext transport both ready() and complete() simply return true
}
@Override
public boolean ready() {
return true;
}
@Override
public boolean complete() {
return true;
}
- Whether in-flight requests (sent but unacknowledged, or still being sent) have piled up
NetworkClient has a member inFlightRequests of type InFlightRequests, which caches requests that are being sent or have been sent but have not yet received a response. It has three fields:
private final int maxInFlightRequestsPerConnection;//max in-flight requests per node, controlled by max.in.flight.requests.per.connection
private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();//broker node -> its in-flight request deque
/** Thread safe total number of in flight requests. */
private final AtomicInteger inFlightRequestCount = new AtomicInteger(0);//total in-flight request count across all nodes
Knowing the structure of InFlightRequests, its canSendMore method is easy to follow:
inFlightRequests.canSendMore(node);
public boolean canSendMore(String node) {
Deque<NetworkClient.InFlightRequest> queue = requests.get(node);//this node's in-flight deque
//more can be sent if the queue is empty, or if the most recent request has finished sending
//(if even that one hasn't, the connection is likely in trouble) and the queue is under the in-flight cap
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
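The cap comes from max.in.flight.requests.per.connection (default 5). A hedged config sketch (hypothetical address); with retries enabled, lowering it to 1 is the usual way to keep strict per-partition ordering at the cost of throughput:
import java.util.Properties;

public class InFlightConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical address
        // cap on unacknowledged requests per broker connection (default 5)
        props.put("max.in.flight.requests.per.connection", "1");
        System.out.println(props);
    }
}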
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();//target broker id
RequestHeader header = clientRequest.makeHeader(request.version());//build the request header with the chosen version
if (log.isDebugEnabled()) {
int latestClientVersion = clientRequest.apiKey().latestVersion();
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
clientRequest.correlationId(), destination);
} else {
log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
}
}
Send send = request.toSend(destination, header);//wrap the request in a Send, the unit of transmission
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);//create the new in-flight request
this.inFlightRequests.add(inFlightRequest);//add it to the in-flight deque
selector.send(send);//hand the Send to the selector
}
public void send(Send send) {
String connectionId = send.destination();//target broker id
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);//look up the node's channel
if (closingChannels.containsKey(connectionId)) {//the channel is in the middle of closing
// ensure notification via `disconnected`, leave channel in the state in which closing was triggered
this.failedSends.add(connectionId);//record it as a failed send
} else {
try {
channel.setSend(send);
} catch (Exception e) {
// update the state for consistency, the channel will be discarded after `close`
channel.state(ChannelState.FAILED_SEND);
// ensure notification via `disconnected` when `failedSends` are processed in the next poll
this.failedSends.add(connectionId);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
connectionId, e);
throw e;
}
}
}
}
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);//declare interest in OP_WRITE; no I/O yet, just a signal that a buffer is ready to be written
}
@Override
public void addInterestOps(int ops) {
key.interestOps(key.interestOps() | ops);
}
So this doSend merely declares the write event, telling the selector there is data ready to write; no real network I/O has happened yet. The actual I/O is done in NetworkClient's poll method, which the Sender calls right after sendProducerData (a minimal sketch of the write-interest pattern follows the run() snippet below):
void run(long now) {
...
long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
}
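A minimal sketch of that declare-then-write pattern, using a local Pipe so it runs without a broker (illustrative only, not Kafka code):
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class WriteInterestDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.sink().configureBlocking(false);
        SelectionKey key = pipe.sink().register(selector, 0);       // no interest yet

        ByteBuffer pending = ByteBuffer.wrap("hello".getBytes());
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); // setSend(): declare only, no I/O

        selector.select();                                          // poll(): the real I/O happens here
        if (key.isWritable()) {
            pipe.sink().write(pending);                             // channel.write()
            if (!pending.hasRemaining())                            // done: stop watching OP_WRITE
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
        System.out.println("wrote " + pending.position() + " bytes");
        selector.close();
    }
}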
@Override
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();//make sure the NetworkClient is in the ACTIVE state
if (!abortedSends.isEmpty()) {//if any sends were aborted along the way, handle them first
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
long metadataTimeout = metadataUpdater.maybeUpdate(now);//maybe start a metadata update, depending on needUpdate
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));//poll the selector
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
//process the results
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);
return responses;
}
First look at the selector's poll, which is where the NIO reads and writes happen:
@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
boolean madeReadProgressLastCall = madeReadProgressLastPoll;//a read can span several polls; this tracks whether the last poll made read progress
clear();
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();//is there buffered data that has not been consumed yet
//if there are staged receives, fresh connections, or leftover buffered data, set timeout = 0 so poll does not block
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;
if (!memoryPool.isOutOfMemory() && outOfMemory) {//memory pressure has eased; recover
//we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
log.trace("Broker no longer low on memory - unmuting incoming sockets");
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
channel.maybeUnmute();
}
}
outOfMemory = false;
}
/* check ready keys */
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);//wait for ready events
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {//ready events, immediate connects, or leftover buffered data
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();//the selected key set
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {//keep reading the buffered data
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
}
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);//handle the selected keys
// Clear all selected keys so that they are included in the ready count for the next select
readyKeys.clear();
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);//handle immediately connected channels
immediatelyConnectedKeys.clear();//then clear them
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// Close channels that were delayed and are now ready to be closed
completeDelayedChannelClose(endIo);//close channels whose delayed close is now due
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);//expire connections that have been idle too long
// Add to completedReceives after closing expired connections to avoid removing
// channels with completed receives until all staged receives are completed.
addToCompletedReceives();//promote staged receives to completed receives
}
The most important part above is the key handling in pollSelectionKeys:
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {//iterate over the keys
KafkaChannel channel = channel(key);//the channel attached to the key
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
boolean sendFailed = false;
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {//handle connect events
if (channel.finishConnect()) {//complete the connect with the server
this.connected.add(channel.id());//record the node id as connected
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else {
continue;
}
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready()) {//connected to the server but the channel is not ready yet
try {
channel.prepare();//complete the transport layer handshake (SSL, SASL, ...)
} catch (AuthenticationException e) {
sensors.failedAuthentication.record();
throw e;
}
if (channel.ready())
sensors.successfulAuthentication.record();
}
attemptRead(key, channel);//handle reads
if (channel.hasBytesBuffered()) {//the channel still holds buffered data
//this channel has bytes enqueued in intermediary buffers that we could not read
//(possibly because no memory). it may be the case that the underlying socket will
//not come up in the next poll() and so we need to remember this channel for the
//next poll call otherwise data may be stuck in said buffers forever. If we attempt
//to process buffered data and no progress is made, the channel buffered status is
//cleared to avoid the overhead of checking every time.
keysWithBufferedRead.add(key);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {//channel ready and key writable: flush the pending Send to the socket
Send send;
try {
send = channel.write();//the actual socket write
} catch (Exception e) {
sendFailed = true;
throw e;
}
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid())
close(channel, CloseMode.GRACEFUL);
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else if (e instanceof AuthenticationException) // will be logged later as error by clients
log.debug("Connection with {} disconnected due to authentication exception", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
if (e instanceof DelayedResponseAuthenticationException)
maybeDelayCloseOnAuthenticationFailure(channel);
else
close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
} finally {
maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
}
}
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
//if channel is ready and has bytes to read from socket or buffer, and has no
//previous receive(s) already staged or otherwise in progress then read from it
//channel ready, readable (or holding buffered bytes), and nothing already staged or in progress
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null) {
madeReadProgressLastPoll = true;
addToStagedReceives(channel, networkReceive);//stage each completed receive
}
if (channel.isMute()) {
outOfMemory = true; //channel has muted itself due to memory pressure.
} else {
madeReadProgressLastPoll = true;
}
}
}
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
if (!stagedReceives.containsKey(channel))
stagedReceives.put(channel, new ArrayDeque<>());
Deque<NetworkReceive> deque = stagedReceives.get(channel);//map the channel to its received data
deque.add(receive);
}
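channel.read() returns a complete NetworkReceive only once a whole size-delimited frame has arrived: Kafka frames every response as a 4-byte length header followed by the payload, which is why a read may span several polls. An illustrative sketch of that framing (not Kafka's actual class):
import java.nio.ByteBuffer;

public class FrameDemo {
    public static void main(String[] args) {
        byte[] payload = "response".getBytes();
        ByteBuffer wire = ByteBuffer.allocate(4 + payload.length);
        wire.putInt(payload.length).put(payload).flip(); // what goes on the wire

        int size = wire.getInt();                        // read the 4-byte length header first
        byte[] body = new byte[size];
        wire.get(body);                                  // then exactly `size` payload bytes
        System.out.println(new String(body));
    }
}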
Finally, after reading completes, the NetworkReceive objects returned by the server are moved into the completedReceives list:
private void addToCompletedReceives() {
if (!this.stagedReceives.isEmpty()) {
Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
if (!explicitlyMutedChannels.contains(channel)) {
Deque<NetworkReceive> deque = entry.getValue();
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
iter.remove();
}
}
}
}
private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
NetworkReceive networkReceive = stagedDeque.poll();
this.completedReceives.add(networkReceive);
this.sensors.recordBytesReceived(channel.id(), networkReceive.size());
}
After the selector poll, NetworkClient runs a series of handle* methods to process the results:
@Override
public List<ClientResponse> poll(long timeout, long now) {
...
// process completed actions
//process the results
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();//collects the responses produced by the handle* steps below
handleCompletedSends(responses, updatedNow);//remove requests that expect no response from InFlightRequests
handleCompletedReceives(responses, updatedNow);//process the received data
handleDisconnections(responses, updatedNow);//process disconnected nodes
handleConnections();//update connection states
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);//process timed-out requests
completeResponses(responses);//run the callbacks on all collected responses
return responses;
}
- handleCompletedSends
When the ClientRequest is built, the producer's acks setting determines whether a server response is expected: acks != 0 means wait for a response, acks == 0 means don't. That decision is stored in the InFlightRequest's expectResponse field.
private void handleCompletedSends(List<ClientResponse> responses, long now) {
// if no response is expected then when the send is completed, return it
for (Send send : this.selector.completedSends()) {
InFlightRequest request = this.inFlightRequests.lastSent(send.destination());//peek the head of the deque; requests are added at the head, so this is the most recently sent one
if (!request.expectResponse) {//no response expected, so drop the request from InFlightRequests
this.inFlightRequests.completeLastSent(send.destination());
responses.add(request.completed(null, now));//record the completed response
}
}
}
public NetworkClient.InFlightRequest completeLastSent(String node) {
NetworkClient.InFlightRequest inFlightRequest = requestQueue(node).pollFirst();//dequeue the most recently sent request
inFlightRequestCount.decrementAndGet();//one fewer request in flight
return inFlightRequest;
}
- handleCompletedReceives
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {//iterate over the completed receives
String source = receive.source();//the node id the response came from
InFlightRequest req = inFlightRequests.completeNext(source);//pop that node's oldest in-flight request; responses arrive in send order
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
throttleTimeSensor, now);
if (log.isTraceEnabled()) {
log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
req.header.apiKey(), req.header.correlationId(), responseStruct);
}
// If the received response includes a throttle delay, throttle the connection.
AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);//parse the body according to the request's api key (produce, fetch, metadata, ...)
maybeThrottle(body, req.header.apiVersion(), req.destination, now);
if (req.isInternalRequest && body instanceof MetadataResponse)//a metadata response
metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);//let the metadata updater handle it
else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
else
responses.add(req.completed(body, now));//record the completed response
}
}
- handleDisconnections
private void handleDisconnections(List<ClientResponse> responses, long now) {
for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {//iterate over the disconnected nodes
String node = entry.getKey();
log.debug("Node {} disconnected.", node);
processDisconnection(responses, node, now, entry.getValue());//handle the disconnect
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (this.selector.disconnected().size() > 0)
metadataUpdater.requestUpdate();//request a metadata refresh
}
private void processDisconnection(List<ClientResponse> responses,
String nodeId,
long now,
ChannelState disconnectState) {
connectionStates.disconnected(nodeId, now);//mark the node disconnected
apiVersions.remove(nodeId);
nodesNeedingApiVersionsFetch.remove(nodeId);
switch (disconnectState.state()) {//log the reason according to the state at disconnect
case AUTHENTICATION_FAILED:
AuthenticationException exception = disconnectState.exception();
connectionStates.authenticationFailed(nodeId, now, exception);
metadataUpdater.handleAuthenticationFailure(exception);
log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId, disconnectState.remoteAddress(), exception.getMessage());
break;
case AUTHENTICATE:
// This warning applies to older brokers which don't provide feedback on authentication failures
log.warn("Connection to node {} ({}) terminated during authentication. This may indicate " +
"that authentication failed due to invalid credentials.", nodeId, disconnectState.remoteAddress());
break;
case NOT_CONNECTED:
log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
break;
default:
break; // Disconnections in other states are logged at debug level in Selector
}
for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {//clear every in-flight request for the disconnected node
log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
if (!request.isInternalRequest)//a user-level request: surface a disconnected response
responses.add(request.disconnected(now, disconnectState.exception()));//record the response
else if (request.header.apiKey() == ApiKeys.METADATA)//an internal metadata request
metadataUpdater.handleDisconnection(request.destination);
}
}
- handleConnections
private void handleConnections() {
for (String node : this.selector.connected()) {
// We are now connected. Node that we might not still be able to send requests. For instance,
// if SSL is enabled, the SSL handshake happens after the connection is established.
// Therefore, it is still necessary to check isChannelReady before attempting to send on this
// connection.
if (discoverBrokerVersions) {
this.connectionStates.checkingApiVersions(node);
nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
log.debug("Completed connection to node {}. Fetching API versions.", node);
} else {
this.connectionStates.ready(node);//mark the connection READY
log.debug("Completed connection to node {}. Ready.", node);
}
}
}
- handleTimedOutRequests
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);//the nodes with timed-out requests
for (String nodeId : nodeIds) {
// close connection to the node
this.selector.close(nodeId);//close the node's connection
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);//handled the same way as a disconnect
}
// we disconnected, so we should probably refresh our metadata
if (!nodeIds.isEmpty())
metadataUpdater.requestUpdate();//refresh the metadata; a cluster change may be behind the timeouts
}
public List<String> nodesWithTimedOutRequests(long now) {
List<String> nodeIds = new ArrayList<>();//collects the timed-out nodes
for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) {//iterate over every node's in-flight deque
String nodeId = requestEntry.getKey();
Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue();
if (hasExpiredRequest(now, deque))//does any request exceed its timeout?
nodeIds.add(nodeId);
}
return nodeIds;
}
private Boolean hasExpiredRequest(long now, Deque<NetworkClient.InFlightRequest> deque) {
for (NetworkClient.InFlightRequest request : deque) {//check each request in the deque
long timeSinceSend = Math.max(0, now - request.sendTimeMs);//time elapsed since the request was sent
if (timeSinceSend > request.requestTimeoutMs)//timed out?
return true;
}
return false;
}
After this series of handle methods, the collected responses get their final callback treatment:
private void completeResponses(List<ClientResponse> responses) {
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
public void onComplete() {
if (callback != null)
callback.onComplete(this);
}
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
RequestHeader requestHeader = response.requestHeader();
long receivedTimeMs = response.receivedTimeMs();
int correlationId = requestHeader.correlationId();
if (response.wasDisconnected()) {
log.trace("Cancelled request with header {} due to node {} being disconnected",
requestHeader, response.destination());
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
} else if (response.versionMismatch() != null) {
log.warn("Cancelled request {} due to a version mismatch with node {}",
response, response.destination(), response.versionMismatch());
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);
} else {
log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
// if we have a response, parse it
if (response.hasResponse()) {
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
} else {
// this is the acks = 0 case, just complete all requests
for (ProducerBatch batch : batches.values()) {
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
}
}
}
}
Whichever outcome handleProduceResponse sees, it ends up calling completeBatch, and completeBatch in turn calls each batch's done method to finish processing and finally release the memory:
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
long now, long throttleUntilTimeMs) {
Errors error = response.error;
if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
(batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
// If the batch is too large, we split the batch and send the split batches again. We do not decrement
// the retry attempts in this case.
log.warn(
"Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
correlationId,
batch.topicPartition,
this.retries - batch.attempts(),
error);
if (transactionManager != null)
transactionManager.removeInFlightBatch(batch);
this.accumulator.splitAndReenqueue(batch);
this.accumulator.deallocate(batch);
this.sensors.recordBatchSplit();
} else if (error != Errors.NONE) {//the send failed
if (canRetry(batch, response, now)) {//should it be resent?
log.warn(
"Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
correlationId,
batch.topicPartition,
this.retries - batch.attempts() - 1,
error);
if (transactionManager == null) {
reenqueueBatch(batch, now);//resend
} else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
// If idempotence is enabled only retry the request if the current producer id is the same as
// the producer id of the batch.
log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
batch.topicPartition, batch.producerId(), batch.baseSequence());
reenqueueBatch(batch, now);
} else {
failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
"batch but the producer id changed from " + batch.producerId() + " to " +
transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
}
} else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
// If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
// the sequence of the current batch, and we haven't retained batch metadata on the broker to return
// the correct offset and timestamp.
//
// The only thing we can do is to return success to the user and not return a valid offset and timestamp.
completeBatch(batch, response);
} else {
final RuntimeException exception;
if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
exception = new TopicAuthorizationException(batch.topicPartition.topic());
else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
else
exception = error.exception();
// tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
// its retries -- if it did, we don't know whether the sequence number was accepted or not, and
// thus it is not safe to reassign the sequence.
failBatch(batch, response, exception, batch.attempts() < this.retries);//fail the batch
}
if (error.exception() instanceof InvalidMetadataException) {//a metadata-related error
if (error.exception() instanceof UnknownTopicOrPartitionException) {
log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
"topic-partition may not exist or the user may not have Describe access to it",
batch.topicPartition);
} else {
log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
"to request metadata update now", batch.topicPartition, error.exception().toString());
}
metadata.requestUpdate();//request a metadata update
}
} else {
completeBatch(batch, response);//complete the batch
}
// Unmute the completed partition.
if (guaranteeMessageOrder)
this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
}
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
...
if (batch.done(response.baseOffset, response.logAppendTime, null)) {
maybeRemoveFromInflightBatches(batch);
this.accumulator.deallocate(batch);
}
}
public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
...
if (this.finalState.compareAndSet(null, tryFinalState)) {
completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);//fire the user callbacks
return true;
}
...
return false;
}
While processing completeBatch, an error may lead to a resend:
private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) && //the delivery timeout has not been reached
batch.attempts() < this.retries && //attempts so far are below the configured retries
!batch.isDone() && //the batch's done method has not been called
((response.error.exception() instanceof RetriableException) ||
(transactionManager != null && transactionManager.canRetry(response, batch)));
}
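From the application's point of view these retries are invisible: only when canRetry says no (retries or the delivery timeout exhausted, or a non-retriable error) does the failure surface in the send callback. A usage sketch under assumed address and topic names:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", "3");                        // max resend attempts
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "k", "v"), (metadata, exception) -> {
                if (exception != null)
                    System.err.println("gave up after retries: " + exception);
                else
                    System.out.println("acked at offset " + metadata.offset());
            });
        }
    }
}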
private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
this.accumulator.reenqueue(batch, currentTimeMs);
maybeRemoveFromInflightBatches(batch);
this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
}
public void reenqueue(ProducerBatch batch, long now) {
batch.reenqueued(now);//record the re-enqueue on the batch
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);//the partition's batch deque
synchronized (deque) {
if (transactionManager != null)
insertInSequenceOrder(deque, batch);
else
deque.addFirst(batch);//re-enqueue at the head so it is resent as soon as possible; ordering is not guaranteed (outside transactions)
}
}
void reenqueued(long now) {
attempts.getAndIncrement();//one more attempt
lastAttemptMs = Math.max(lastAppendTime, now);//time of the latest retry
lastAppendTime = Math.max(lastAppendTime, now);//time of the latest append
retry = true;//mark the batch as a retry
}
public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
List<ProducerBatch> batches = inFlightBatches.get(batch.topicPartition);
if (batches != null) {//remove the batch from the in-flight batches
batches.remove(batch);
if (batches.isEmpty()) {
inFlightBatches.remove(batch.topicPartition);
}
}
}