消费者——网络客户端 - 969251639/study GitHub Wiki

ConsumerNetworkClient封装了NetworkClient,不仅可以和集群元数据metadata进行通讯,同时也集成了DelayedTaskQueue,用于控制各种定时的与服务器通讯的任务,比如心跳任务等。它内部有个unsent缓冲队列,用来记录那些待发送的请求

    /**
     * Creates a consumer-side wrapper around the given KafkaClient.
     *
     * @param logContext       supplies the logger for this class
     * @param client           the underlying client used to create, send and poll requests
     * @param metadata         cluster metadata
     * @param time             time source used to timestamp requests
     * @param retryBackoffMs   backoff between retries; poll() also uses it to cap blocking when
     *                         nothing is in flight
     * @param requestTimeoutMs request timeout in milliseconds
     * @param maxPollTimeoutMs requested upper bound for a poll; capped at MAX_POLL_TIMEOUT_MS
     */
    public ConsumerNetworkClient(LogContext logContext,
                                 KafkaClient client,
                                 Metadata metadata,
                                 Time time,
                                 long retryBackoffMs,
                                 int requestTimeoutMs,
                                 int maxPollTimeoutMs) {
        this.log = logContext.logger(ConsumerNetworkClient.class);
        this.client = client;
        this.metadata = metadata;
        this.time = time;
        this.retryBackoffMs = retryBackoffMs;
        this.requestTimeoutMs = requestTimeoutMs;
        // the caller-supplied value may never exceed the class-wide ceiling
        this.maxPollTimeoutMs = Math.min(maxPollTimeoutMs, MAX_POLL_TIMEOUT_MS);
    }

首先看它用来发送请求的send方法

    /**
     * Buffers a request for the given node and returns a future for its outcome.
     * The request is only queued in the unsent buffer here; it is transmitted by a later poll().
     *
     * @param node             destination node
     * @param requestBuilder   builds the concrete request
     * @param requestTimeoutMs timeout for this request in milliseconds
     * @return a future that is completed or failed once the request finishes
     */
    public RequestFuture<ClientResponse> send(Node node,
                                              AbstractRequest.Builder<?> requestBuilder,
                                              int requestTimeoutMs) {
        long createdMs = time.milliseconds();
        RequestFutureCompletionHandler handler = new RequestFutureCompletionHandler();
        ClientRequest request = client.newClientRequest(
                node.idString(), requestBuilder, createdMs, true, requestTimeoutMs, handler);
        unsent.put(node, request);

        // wakeup the client in case it is blocking in poll so that we can send the queued request
        client.wakeup();
        return handler.future;
    }

send方法的实现很简单:用目标节点和requestBuilder构造出Kafka的ClientRequest后,将其放到unsent缓冲队列中,再唤醒NetworkClient,最后返回一个future供调用方获取结果。RequestFutureCompletionHandler是ConsumerNetworkClient的一个内部类,实现了RequestCompletionHandler接口,该接口只有一个onComplete方法:

/**
 * Callback interface with a single method that receives a ClientResponse when a request completes.
 */
public interface RequestCompletionHandler {
    /**
     * Invoked with the response of a completed request.
     *
     * @param response the completed response
     */
    void onComplete(ClientResponse response);
}

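所以每当请求完成时,都会回调RequestFutureCompletionHandler的onComplete方法。这里的"完成"包括两种情况:收到服务器的响应;或者像checkDisconnects中那样,因连接失败而构造出一个带断连标志的响应。对于仍停留在unsent队列中、无法发送的请求(例如节点被主动断开,或请求已过期),则调用RequestFutureCompletionHandler额外提供的onFailure方法。两个方法都只是记录结果,并把handler放入pendingCompletion队列;真正的future回调稍后在firePendingCompletedRequests中执行。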

    /**
     * Completion handler attached to every request created by send().
     * Both callbacks only record the outcome and enqueue this handler on pendingCompletion;
     * the future itself is completed later, when firePendingCompletedRequests() drains that queue.
     * (The "..." below marks members omitted from this excerpt.)
     */
    private class RequestFutureCompletionHandler implements RequestCompletionHandler {
        private final RequestFuture<ClientResponse> future;
        private ClientResponse response;// response recorded by onComplete
        private RuntimeException e;// failure recorded by onFailure

        ...
        // Records a failure for a request that never produced a response (e.g. it was failed
        // while still in the unsent buffer) and queues this handler for later firing.
        public void onFailure(RuntimeException e) {
            this.e = e;
            pendingCompletion.add(this);
        }

        // Records the response and queues this handler for later firing.
        @Override
        public void onComplete(ClientResponse response) {
            this.response = response;
            pendingCompletion.add(this);// defer processing to firePendingCompletedRequests
        }
    }

另外unsent缓冲队列也是ConsumerNetworkClient一个内部类UnsentRequests来实现,内部结构用了一个ConcurrentHashMap来实现线程安全,ConcurrentHashMap的key是发送的节点,value是请求队列

    private final static class UnsentRequests {
        private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;

        private UnsentRequests() {
            unsent = new ConcurrentHashMap<>();//使用ConcurrentHashMap做缓冲队列,每个Broker节点一个缓冲队列
        }

        public void put(Node node, ClientRequest request) {
            // the lock protects the put from a concurrent removal of the queue for the node
            synchronized (unsent) {
                ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
                if (requests == null) {//队列为空创建一个
                    requests = new ConcurrentLinkedQueue<>();
                    unsent.put(node, requests);
                }
                requests.add(request);//入队到队尾
            }
        }

        public int requestCount(Node node) {//统计节点下有多少个待发送的请求
            ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
            return requests == null ? 0 : requests.size();
        }

        public int requestCount() {//统计所有待发送的请求数
            int total = 0;
            //循环遍历所有节点下的所有请求数后汇总
            for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values())
                total += requests.size();
            return total;
        }

        public boolean hasRequests(Node node) {//节点下是否有待发送的请求在缓冲队列中
            ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
            return requests != null && !requests.isEmpty();//队列不为空则表示有待发送的请求
        }

        public boolean hasRequests() {//是否有待发送的请求在缓冲队列中
            //遍历所有节点下的缓冲队列
            for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values())
                if (!requests.isEmpty())//队列不为空则表示有待发送的请求,返回true
                    return true;
            return false;
        }

        private Collection<ClientRequest> removeExpiredRequests(long now) {//移除过期的请求
            List<ClientRequest> expiredRequests = new ArrayList<>();
            //遍历所有节点下的缓冲队列
            for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) {
                Iterator<ClientRequest> requestIterator = requests.iterator();
                while (requestIterator.hasNext()) {
                    ClientRequest request = requestIterator.next();//从队头开始检查,队头都是最先加入到队列中的,所以只要队头没有过期的,则也就不需要后面的检查,break跳出
                    long elapsedMs = Math.max(0, now - request.createdTimeMs());//用当前时间减去请求的创建时间,队头的创建时间
                    if (elapsedMs > request.requestTimeoutMs()) {//是否过期
                        expiredRequests.add(request);//加入到过期队列expiredRequests中
                        requestIterator.remove();/从缓冲队列中移除
                    } else//只要碰到还没有过期的请求,则跳出,继续下一个节点的检查
                        break;
                }
            }
            return expiredRequests;//返回过期的请求队列
        }

        public void clean() {//清除所有的请求
            // the lock protects removal from a concurrent put which could otherwise mutate the
            // queue after it has been removed from the map
            synchronized (unsent) {
                Iterator<ConcurrentLinkedQueue<ClientRequest>> iterator = unsent.values().iterator();
                while (iterator.hasNext()) {//挨个节点的移除,只要节点下的请求队列不为空
                    ConcurrentLinkedQueue<ClientRequest> requests = iterator.next();
                    if (requests.isEmpty())
                        iterator.remove();
                }
            }
        }

        public Collection<ClientRequest> remove(Node node) {//移除指定节点的请求
            // the lock protects removal from a concurrent put which could otherwise mutate the
            // queue after it has been removed from the map
            synchronized (unsent) {
                ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node);
                return requests == null ? Collections.<ClientRequest>emptyList() : requests;
            }
        }

        public Iterator<ClientRequest> requestIterator(Node node) {//返回指定节点的请求迭代器
            ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
            return requests == null ? Collections.<ClientRequest>emptyIterator() : requests.iterator();
        }

        public Collection<Node> nodes() {//返回缓冲队列中的所有发送节点
            return unsent.keySet();
        }
    }

ConsumerNetworkClient还有几个和生产者一样的辅助方法

//返回集群中负载最低的节点
    /**
     * Returns the node that the underlying client reports as least loaded, evaluated at the current time.
     * NOTE(review): the load metric is chosen by KafkaClient.leastLoadedNode (presumably the
     * number of in-flight requests per node) — confirm against the client implementation.
     */
    public Node leastLoadedNode() {
        lock.lock();
        try {
            long nowMs = time.milliseconds();
            return client.leastLoadedNode(nowMs);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Asks the underlying client, under the client lock, whether any node is ready.
     *
     * @param now current time in milliseconds
     * @return the value reported by KafkaClient.hasReadyNodes(now)
     */
    public boolean hasReadyNodes(long now) {
        lock.lock();
        try {
            boolean anyReady = client.hasReadyNodes(now);
            return anyReady;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Requests a metadata update, then polls until the metadata version changes or the timer expires.
     * Polls at least once.
     *
     * @param timer bounds the total time spent waiting
     * @return true if a metadata version newer than the one at request time was observed
     * @throws AuthenticationException if metadata reports an authentication failure while polling
     */
    public boolean awaitMetadataUpdate(Timer timer) {
        final int versionBefore = this.metadata.requestUpdate();
        while (true) {
            poll(timer);
            AuthenticationException authFailure = this.metadata.getAndClearAuthenticationException();
            if (authFailure != null)
                throw authFailure;
            boolean keepWaiting = this.metadata.version() == versionBefore && timer.notExpired();
            if (!keepWaiting)
                break;
        }
        return this.metadata.version() > versionBefore;
    }

    /**
     * Makes sure metadata is fresh, blocking on an update only when one is needed.
     * An update is needed when one was explicitly requested or the next scheduled update is due now.
     *
     * @param timer bounds how long to wait for the update
     * @return true if metadata was already fresh, otherwise the result of awaitMetadataUpdate(timer)
     */
    boolean ensureFreshMetadata(Timer timer) {
        boolean updateNeeded = this.metadata.updateRequested()
                || this.metadata.timeToNextUpdate(timer.currentTimeMs()) == 0;
        if (!updateNeeded) {
            // the metadata is already fresh
            return true;
        }
        return awaitMetadataUpdate(timer);
    }

最后看ConsumerNetworkClient最重要的poll方法,kafka消费者最后都会调用此poll进行网络IO操作进行真正的请求发送
poll有多个重载方法,最后都会调用下面的方法

public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup)

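它有三个参数:
- timer是计时器,用来限制本次poll最多阻塞多长时间;
- pollCondition用来判断是否需要阻塞等待:为null或shouldBlock()返回true,且pendingCompletion为空时,才会阻塞;
- disableWakeup为true时,本次poll不会调用maybeTriggerWakeup,即不会因为外部的wakeup请求而抛出WakeupException。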

    /**
     * Performs one round of network I/O for the consumer. The steps are:
     * <ol>
     *   <li>fire already-completed callbacks;</li>
     *   <li>under the lock, fail requests for pending disconnects and send what can be sent;</li>
     *   <li>poll the NetworkClient;</li>
     *   <li>handle disconnects, wakeups and interrupts;</li>
     *   <li>retry sends and expire stale unsent requests;</li>
     *   <li>finally, fire the callbacks completed during this round.</li>
     * </ol>
     *
     * @param timer         bounds how long client.poll may block; updated after client.poll returns
     * @param pollCondition if non-null and shouldBlock() returns false, client.poll is called with a
     *                      0 timeout instead of blocking
     * @param disableWakeup if true, maybeTriggerWakeup() is skipped, so a pending user wakeup does
     *                      not raise WakeupException in this call
     * @throws InterruptException if the calling thread has been interrupted
     */
    public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) {
        // there may be handlers which need to be invoked if we woke up the previous call to poll
        firePendingCompletedRequests();

        lock.lock();
        try {
            // Handle async disconnects prior to attempting any sends
            handlePendingDisconnects();

            // send all the requests we can send now
            long pollDelayMs = trySend(timer.currentTimeMs());

            // check whether the poll is still needed by the caller. Note that if the expected completion
            // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
            // handler), the client will be woken up.
            if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
                // if there are no requests in flight, do not block longer than the retry backoff
                long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
                if (client.inFlightRequestCount() == 0)
                    pollTimeout = Math.min(pollTimeout, retryBackoffMs);
                client.poll(pollTimeout, timer.currentTimeMs());
            } else {
                // completions are already pending or the caller does not want to block: poll without waiting
                client.poll(0, timer.currentTimeMs());
            }
            timer.update();

            // handle any disconnects by failing the active requests. note that disconnects must
            // be checked immediately following poll since any subsequent call to client.ready()
            // will reset the disconnect status
            checkDisconnects(timer.currentTimeMs());
            if (!disableWakeup) {
                // trigger wakeups after checking for disconnects so that the callbacks will be ready
                // to be fired on the next call to poll()
                maybeTriggerWakeup();
            }
            // throw InterruptException if this thread is interrupted
            maybeThrowInterruptException();

            // try again to send requests since buffer space may have been
            // cleared or a connect finished in the poll
            trySend(timer.currentTimeMs());

            // fail requests that couldn't be sent if they have expired
            failExpiredRequests(timer.currentTimeMs());

            // clean unsent requests collection to keep the map from growing indefinitely
            unsent.clean();
        } finally {
            lock.unlock();
        }

        // called without the lock to avoid deadlock potential if handlers need to acquire locks
        firePendingCompletedRequests();
    }
  1. 如果有待处理的响应的请求,则先处理这部分响应的请求
    firePendingCompletedRequests();

    /**
     * Drains pendingCompletion and fires each handler's completion in queue order.
     * If at least one handler fired, the client is woken up, because a blocked poll may be waiting
     * on one of those futures or a callback may have queued new requests.
     */
    private void firePendingCompletedRequests() {
        boolean anyFired = false;
        RequestFutureCompletionHandler handler;
        while ((handler = pendingCompletion.poll()) != null) {
            handler.fireCompletion();
            anyFired = true;
        }

        // wakeup the client in case it is blocking in poll for this future's completion
        if (anyFired)
            client.wakeup();
    }

    /**
     * Completes the handler's future from the recorded outcome. The checks run in this order:
     * <ol>
     *   <li>a recorded exception;</li>
     *   <li>an authentication failure;</li>
     *   <li>a disconnect (raised as DisconnectException.INSTANCE);</li>
     *   <li>a version mismatch.</li>
     * </ol>
     * If none apply, the future completes successfully with the response.
     */
    public void fireCompletion() {
        if (e != null) {
            future.raise(e);
            return;
        }
        if (response.authenticationException() != null) {
            future.raise(response.authenticationException());
            return;
        }
        if (response.wasDisconnected()) {
            log.debug("Cancelled request with header {} due to node {} being disconnected",
                    response.requestHeader(), response.destination());
            future.raise(DisconnectException.INSTANCE);
            return;
        }
        if (response.versionMismatch() != null) {
            future.raise(response.versionMismatch());
            return;
        }
        future.complete(response);
    }

不管future对成功或失败的处理都会调用监听器进行相应的处理

    /**
     * Fails this future with the given exception and notifies listeners via fireFailure().
     * The completion latch is counted down in every case, including when validation throws.
     *
     * @param e the failure; must not be null
     * @throws IllegalArgumentException if e is null
     * @throws IllegalStateException if the future was already completed or failed
     */
    public void raise(RuntimeException e) {
        try {
            if (e == null) {
                throw new IllegalArgumentException("The exception passed to raise must not be null");
            }
            boolean transitioned = result.compareAndSet(INCOMPLETE_SENTINEL, e);
            if (!transitioned) {
                throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
            }
            fireFailure();
        } finally {
            completedLatch.countDown();
        }
    }
    /** Removes each registered listener in turn and passes it this future's exception. */
    private void fireFailure() {
        final RuntimeException failure = exception();
        for (RequestFutureListener<T> next = listeners.poll(); next != null; next = listeners.poll()) {
            next.onFailure(failure);
        }
    }

    public void complete(T value) {
        try {
            if (value instanceof RuntimeException)
                throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");

            if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
                throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
            fireSuccess();
        } finally {
            completedLatch.countDown();//唤醒因为同步阻塞获取结果的线程
        }
        }
    }

    /** Removes each registered listener in turn and passes it this future's value. */
    private void fireSuccess() {
        final T result = value();
        for (RequestFutureListener<T> next = listeners.poll(); next != null; next = listeners.poll()) {
            next.onSuccess(result);
        }
    }
  1. 处理断开连接的节点
    handlePendingDisconnects();

    /**
     * Under the lock, drains pendingDisconnects. For each node it fails that node's unsent requests
     * with DisconnectException.INSTANCE and then disconnects the node in the underlying client.
     */
    private void handlePendingDisconnects() {
        lock.lock();
        try {
            Node disconnected;
            while ((disconnected = pendingDisconnects.poll()) != null) {
                failUnsentRequests(disconnected, DisconnectException.INSTANCE);
                client.disconnect(disconnected.idString());
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes every unsent request queued for the node and fails each one's handler with e.
     *
     * @param node the node whose queued requests are dropped
     * @param e    the exception passed to each handler's onFailure
     */
    private void failUnsentRequests(Node node, RuntimeException e) {
        // clear unsent requests to node and fail their corresponding futures
        lock.lock();
        try {
            for (ClientRequest pending : unsent.remove(node)) {
                ((RequestFutureCompletionHandler) pending.callback()).onFailure(e);
            }
        } finally {
            lock.unlock();
        }
    }
  1. 发送所有能满足发送条件的请求,也就是发送unsent缓冲队列中那些节点OK的请求,将其写到nio的通道中并从unsent队列移除,可以理解为将请求从unsent队列转移到了channel中
    long pollDelayMs = trySend(timer.currentTimeMs());

    /**
     * Moves as many queued requests as possible from the unsent buffer into the NetworkClient.
     *
     * <p>For every node with queued requests, the node's poll delay is folded into the result, which
     * is the minimum across those nodes and at most maxPollTimeoutMs. Requests for a node are sent
     * in queue order while client.ready(node, now) reports the node ready. At the first "not ready",
     * the node's remaining requests stay queued, keeping their order, and the next node is tried.
     *
     * @param now current time in milliseconds
     * @return the smallest poll delay reported for any node with queued requests, or maxPollTimeoutMs
     */
    private long trySend(long now) {
        long pollDelayMs = maxPollTimeoutMs;

        // send any requests that can be sent now
        for (Node node : unsent.nodes()) {
            Iterator<ClientRequest> iterator = unsent.requestIterator(node);
            if (iterator.hasNext()) {
                // keep the SMALLEST delay so the caller never blocks past the earliest deadline
                pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
            }

            while (iterator.hasNext()) {
                if (!client.ready(node, now)) {
                    // node not ready: leave the rest queued (in order) and try the next node
                    break;
                }
                ClientRequest request = iterator.next();
                client.send(request, now);
                iterator.remove(); // now owned by the NetworkClient, drop from the unsent buffer
            }
        }
        return pollDelayMs;
    }
  1. 判断是否需要阻塞
            if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {// block only if nothing is pending and the caller wants to wait
                // if there are no requests in flight, do not block longer than the retry backoff
                long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
                if (client.inFlightRequestCount() == 0)
                    pollTimeout = Math.min(pollTimeout, retryBackoffMs);
                client.poll(pollTimeout, timer.currentTimeMs());// let NetworkClient do I/O with timeout pollTimeout
            } else {
                client.poll(0, timer.currentTimeMs());// do I/O without blocking
            }
            timer.update();// refresh the timer's notion of the current time
  1. 检查断开的连接:只需处理仍留在unsent队列中(即trySend未能发出)的请求。已经交给NetworkClient的请求,其断连由NetworkClient自己处理。
    /**
     * Completes unsent requests whose destination connection has failed.
     * Each one receives a synthetic ClientResponse with the disconnect flag set, carrying the
     * client's authentication exception for that node (if any).
     *
     * @param now current time in milliseconds, used as the response's receive time
     */
    private void checkDisconnects(long now) {
        // any disconnects affecting requests that have already been transmitted will be handled
        // by NetworkClient, so we just need to check whether connections for any of the unsent
        // requests have been disconnected; if they have, then we complete the corresponding future
        // and set the disconnect flag in the ClientResponse
        for (Node node : unsent.nodes()) {
            if (!client.connectionFailed(node))
                continue;

            // Remove entry before invoking request callback to avoid callbacks handling
            // coordinator failures traversing the unsent list again.
            Collection<ClientRequest> dropped = unsent.remove(node);
            for (ClientRequest req : dropped) {
                RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) req.callback();
                AuthenticationException authFailure = client.authenticationException(node);
                ClientResponse disconnectResponse = new ClientResponse(
                        req.makeHeader(req.requestBuilder().latestAllowedVersion()),
                        req.callback(), req.destination(), req.createdTimeMs(), now, true,
                        null, authFailure, null);
                handler.onComplete(disconnectResponse);
            }
        }
    }
  1. 是否需要中断poll中线程
            if (!disableWakeup) {// only honor a user wakeup when the caller allows it
                // trigger wakeups after checking for disconnects so that the callbacks will be ready
                // to be fired on the next call to poll()
                maybeTriggerWakeup();// throws WakeupException if a wakeup was requested
            }
            // throw InterruptException if this thread is interrupted
            maybeThrowInterruptException();// throws Kafka's InterruptException if the thread was interrupted

    /**
     * Converts a pending user wakeup into a WakeupException.
     * If wakeups are not disabled and the wakeup flag is set, the flag is cleared and
     * WakeupException is thrown; otherwise this returns normally.
     *
     * @throws WakeupException if a wakeup was pending and wakeups are enabled
     */
    public void maybeTriggerWakeup() {
        boolean wakeupPending = !wakeupDisabled.get() && wakeup.get();
        if (!wakeupPending)
            return;
        log.debug("Raising WakeupException in response to user wakeup");
        wakeup.set(false);
        throw new WakeupException();
    }

当有其他线程调用wakeup方法时,会把wakeup标志置为true,并唤醒可能阻塞在NetworkClient上的poll。poll线程随后在maybeTriggerWakeup中检测到该标志,将其重置并抛出WakeupException,从而跳出外层的poll循环。poll本身是由外围循环反复调用的,可以借此从外部中断这个循环。

    /**
     * Records a user wakeup request and wakes the underlying client.
     * A later maybeTriggerWakeup() will then raise WakeupException on the polling thread.
     */
    public void wakeup() {
        // wakeup should be safe without holding the client lock since it simply delegates to
        // Selector's wakeup, which is thread-safe
        log.debug("Received user wakeup");
        wakeup.set(true);
        client.wakeup();
    }

    /**
     * Throws Kafka's unchecked InterruptException, wrapping a new InterruptedException, if the
     * current thread has been interrupted.
     * Thread.interrupted() clears the thread's interrupt flag as it checks it.
     * NOTE(review): presumably InterruptException's constructor re-sets the flag — confirm.
     */
    private void maybeThrowInterruptException() {
        if (!Thread.interrupted())
            return;
        throw new InterruptException(new InterruptedException());
    }
  1. 再次尝试发送
            // try again to send requests since buffer space may have been
            // cleared or a connect finished in the poll
            // (requests that were not sendable before the poll above may be sendable now; the returned delay is unused here)
            trySend(timer.currentTimeMs());
  1. 处理过期的请求
    /**
     * Removes unsent requests that have outlived their requestTimeoutMs.
     * Each removed request's handler is failed with a TimeoutException.
     *
     * @param now current time in milliseconds
     */
    private void failExpiredRequests(long now) {
        // clear all expired unsent requests and fail their corresponding futures
        for (ClientRequest expired : unsent.removeExpiredRequests(now)) {
            RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) expired.callback();
            String message = "Failed to send request after " + expired.requestTimeoutMs() + " ms.";
            handler.onFailure(new TimeoutException(message));
        }
    }
⚠️ **GitHub.com Fallback** ⚠️