生产者——发送器 - 969251639/study GitHub Wiki

Kafka的每个生产者都有一个对应的sender线程处理IO,也就是消息的发送,它在生产者创建时就run在ioThrad的线程中

    KafkaProducer(Map<String, Object> configs,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  Metadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors interceptors,
                  Time time) {
        ...
        this.sender = newSender(logContext, kafkaClient, this.metadata);//创建新的发送器
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;//sender的线程名为kafka-producer-network-thread+clientId
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();//启动线程
        ...
    }

从newSender可以知道,sender依赖三个组件,一个是logContext用来记录日志,一个是kafkaClient用来处理网络,一个是metadata用来获取集群的元数据
Sender的主要流程是根据RecordAccumulator累加器的情况,筛选出那些要发送的节点,然后根据每个节点的网络连接情况(kafkaClient控制),过滤出网络通的节点,为每一个node生成一个请求,最后调用kafkaClient发送出去

    Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
        ...
        //重发次数
        int retries = configureRetries(producerConfig, transactionManager != null, log);
        //acks数
        short acks = configureAcks(producerConfig, transactionManager != null, log);
        return new Sender(logContext,
                client,//NetworkClient,用来处理网络请求
                metadata,//集群元数据
                this.accumulator,//消息累加器
                maxInflightRequests == 1,//是否保证消息有序???
                producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),//最大请求大小,max.request.size控制
                acks,//ack数
                retries,//重发次数
                metricsRegistry.senderMetrics,
                time,//当前时间
                requestTimeoutMs,//发送请求超时时间
                producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),//重发的间隔时间
                this.transactionManager,//事务管理器
                apiVersions);//kafka版本号
    }

上面说过Sender是运行在ioThread线程上,它实现了Runnable接口,所以Sender的核心肯定是在run方法中

    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {//是否running
            try {
                run(time.milliseconds());//调用run方法,运行sender线程
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        //判断被关闭的条件
        while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
            try {//正常关闭,则将剩余的未发送完的消息发送完
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {//如果是被强制关闭,则中断那些还没发送的消息
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }
        try {//关闭网络客户端
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

在分析真正的run方法前先看下它的退出情况,首先如果不是强制关闭并且还有消息在累加器中待发送或者还有在等待服务器响应的结果,则在此运行run方法正常的处理这部分内容

        while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
  • !forceClose:非强制关闭
  • this.accumulator.hasUndrained():还有消息待发送
    public boolean hasUndrained() {
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {//遍历所有批次
            Deque<ProducerBatch> deque = entry.getValue();//获取队列
            synchronized (deque) {
                if (!deque.isEmpty())//队列不为空,表明还有消息在队列中待发送
                    return true;
            }
        }
        return false;
    }
  • this.client.inFlightRequestCount():等待服务器响应的结果
    @Override
    public int inFlightRequestCount() {//inFlightRequests记录那些发送出去了但服务器还没有响应的请求
        return this.inFlightRequests.count();
    }

如果是强制关闭,也就是forceClose=true,那么终止累加器中剩余的待发送的消息,也就是会丢弃这部分消息,也不管前面发送出去的消息服务器有没有返回

    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }

    public void abortIncompleteBatches() {
        // We need to keep aborting the incomplete batch until no thread is trying to append to
        // 1. Avoid losing batches.
        // 2. Free up memory in case appending threads are blocked on buffer full.
        // This is a tight loop but should be able to get through very quickly.
        do {
            abortBatches();//终止累加器中的消息
        } while (appendsInProgress());//如果还有线程在append
        // After this point, no thread will append any messages because they will see the close
        // flag set. We need to do the last abort after no thread was appending in case there was a new
        // batch appended by the last appending thread.
        abortBatches();
        this.batches.clear();//清空所有批次,即清空累加器
    }

    private boolean appendsInProgress() {
        return appendsInProgress.get() > 0;//appendsInProgress记录了正在append消息到累加器的线程数
    }

    private void abortBatches() {
        abortBatches(new KafkaException("Producer is closed forcefully."));
    }

    void abortBatches(final RuntimeException reason) {
        for (ProducerBatch batch : incomplete.copyAll()) {//遍历那些待发送的消息,incomplete存储了这些消息,在append成功后
            Deque<ProducerBatch> dq = getDeque(batch.topicPartition);//获取队列
            synchronized (dq) {
                batch.abortRecordAppends();//终止这些正在append的消息
                dq.remove(batch);//将该批次从队列中移除
            }
            batch.abort(reason);//终止那些已append但没有发送的消息
            deallocate(batch);//释放内存
        }
    }

    public void abortRecordAppends() {
        recordsBuilder.abort();
    }

    public void abort() {
        closeForRecordAppends();//关闭流
        buffer().position(initialPosition);//重置nio的缓存位置
        aborted = true;//标记为已终止
    }

    public void closeForRecordAppends() {
        if (appendStream != CLOSED_STREAM) {
            try {
                appendStream.close();//关闭追加流
            } catch (IOException e) {
                throw new KafkaException(e);
            } finally {
                appendStream = CLOSED_STREAM;
            }
        }
    }

    public void abort(RuntimeException exception) {
        if (!finalState.compareAndSet(null, FinalState.ABORTED))
            throw new IllegalStateException("Batch has already been completed in final state " + finalState.get());

        log.trace("Aborting batch for partition {}", topicPartition, exception);
        completeFutureAndFireCallbacks(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, exception);//处理回调
    }

    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);

        // execute callbacks
        for (Thunk thunk : thunks) {//遍历回调方法
            try {
                if (exception == null) {
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(metadata, null);//成功的回调
                } else {
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(null, exception);//异常的回调
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }

        produceFuture.done();//唤醒等待获取发送结果的线程
    }

接下来看强制关闭的方法

    public void forceClose() {
        this.forceClose = true;//深圳强制关闭
        initiateClose();//初始化关闭处理
    }
    public void initiateClose() {
        // Ensure accumulator is closed first to guarantee that no more appends are accepted after
        // breaking from the sender loop. Otherwise, we may miss some callbacks when shutting down.
        this.accumulator.close();//关闭消息累加器
        this.running = false;//将Sender设置为非运行状态
        this.wakeup();//唤醒网络请求处理读写
    }

关闭需要注意的是,强制关闭的话会丢弃未发送的消息,但会处理已异常的方式处理回调,如果是正常的退出,那么则会继续将那些为发送的消息继续发送后退出

接下来看Sender的真正run方法

    void run(long now) {
        if (transactionManager != null) {//处理事务相关
            ...
        }

        long pollTimeout = sendProducerData(now);//将消息写到nio的通道中,触发写事件
        client.poll(pollTimeout, now);//多路复用监听到上面步骤的写事件,处理网络读写  
    }

可以看到如果不看事务的处理,那么run就只需要做两个动作,一个是将buffer的数据写到channel,然后网络处理器NetworkClient会真正去处理IO读写

    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();//获取集群信息
        // get the list of partitions with data ready to send
        //从累加器中获取可以发送的节点
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {//存在没有leader的节点
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);//将主题添加到metadata中

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();//修改needUpdate为true,更新matedata
        }

        // remove any nodes we aren't ready to send to
        // 过滤掉那些网络不通的节点
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {//检查网络连接
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);//转成节点id和待发送的数据的映射
        addToInflightBatches(batches);//添加待发送批次
        if (guaranteeMessageOrder) {//是否保证有序???
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())//存在超时的批次
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);//处理超时的批次
            if (transactionManager != null && expiredBatch.inRetry()) {//事务相关
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }
        sensors.updateProduceRequestMetrics(batches);//性能相关

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        sendProduceRequests(batches, now);//发送创建request
        return pollTimeout;
    }

逐步分析上面的过程

  1. 从集群和消息累加器中获取哪些可以发送的节点
        Cluster cluster = metadata.fetch();//获取集群信息
        //从累加器中获取可以发送的节点
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

  1. 如果存在没有leader的节点,那么需要发送刷新metadata的请求
        if (!result.unknownLeaderTopics.isEmpty()) {//存在没有leader的节点
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);//将主题添加到metadata中

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();//修改needUpdate为true,更新matedata
        }
  1. 过滤掉那些连接失败的
        // 过滤掉那些网络不通的节点
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {//检查网络连接
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }
  1. 重置下一次发送超时的时间
    accumulator.resetNextBatchExpiryTime();

    public void resetNextBatchExpiryTime() {
        nextBatchExpiryTimeMs = Long.MAX_VALUE;
    }
  1. 读取超时的请求或者批次
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

expiredInflightBatches:记录那些已发送没收到响应或正准备发送的超时的批次

    private List<ProducerBatch> getExpiredInflightBatches(long now) {
        List<ProducerBatch> expiredBatches = new ArrayList<>();//记录超时的批次的List

        for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {//inFlightBatches存储了已发送没收到响应或正准备发送的批次的集合
            Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
            List<ProducerBatch> partitionInFlightBatches = entry.getValue();
            if (partitionInFlightBatches != null) {
                Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
                while (iter.hasNext()) {
                    ProducerBatch batch = iter.next();
                    //是否超时,默认30s,记录在累加器中的deliveryTimeoutMs(delivery.timeout.ms:30s)
                    if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
                        iter.remove();移除该批次
                        // expireBatches is called in Sender.sendProducerData, before client.poll.
                        // The !batch.isDone() invariant should always hold. An IllegalStateException
                        // exception will be thrown if the invariant is violated.
                        if (!batch.isDone()) {//批次还没有调用过done方法,添加到过期的List中
                            expiredBatches.add(batch);
                        } else {
                            throw new IllegalStateException(batch.topicPartition + " batch created at " +
                                batch.createdMs + " gets unexpected final state " + batch.finalState());
                        }
                    } else {
                        accumulator.maybeUpdateNextBatchExpiryTime(batch);
                        break;
                    }
                }
                if (partitionInFlightBatches.isEmpty()) {
                    batchIt.remove();
                }
            }
        }
        return expiredBatches;
    }

expiredBatches:记录那些已发送没收到响应或正准备发送的超时或还在消息累加器中但已过期的批次

    public List<ProducerBatch> expiredBatches(long now) {
        List<ProducerBatch> expiredBatches = new ArrayList<>();//记录过期的批次
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {//遍历累加器中的批次
            // expire the batches in the order of sending
            Deque<ProducerBatch> deque = entry.getValue();
            synchronized (deque) {
                while (!deque.isEmpty()) {
                    ProducerBatch batch = deque.getFirst();//从头部取,头部肯定是最先过期的,如果头部不过期,跳出while继续下一个批次
                    if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {//过期
                        deque.poll();//从队列中弹出
                        batch.abortRecordAppends();//终止该批次的append操作
                        expiredBatches.add(batch);//添加到过期容器
                    } else {
                        maybeUpdateNextBatchExpiryTime(batch);
                        break;
                    }
                }
            }
        }
        return expiredBatches;
    }
  1. 处理超时的批次
        if (!expiredBatches.isEmpty())//存在超时的批次
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {//遍历超时的批次
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);//处理超时的批次
            if (transactionManager != null && expiredBatch.inRetry()) {//事务相关
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }

    private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception,
        boolean adjustSequenceNumbers) {
        if (transactionManager != null) {//事务相关
            ...
        }
        //记录错误的消息
        this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);

        if (batch.done(baseOffset, logAppendTime, exception)) {//调用每个批次下的done方法done方法会处理每个批次里面的消息的回调方法
            maybeRemoveFromInflightBatches(batch);//从已发送没收到响应或正准备发送的批次的集合移除该批次,也就是从inFlightBatches中移除
            this.accumulator.deallocate(batch);//回收内存
        }
    }
  1. 创建发送的request,写入到nio的channel中
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {//如果已有节点可以发送,那么不需要阻塞selector,将pollTimeout设置为0
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
        sendProduceRequests(batches, now);//发送创建request

sendProduceRequests是整个sendProducerData方法中最重要的方法,该方法会真正的将数据写到nio的通道中

    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
    }

    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (batches.isEmpty())//待发送的批次为空,返回
            return;

        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());//缓存主题下的分区和消息内存记录器的映射
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());//缓存主题下的分区和它对应的批次的映射

        // find the minimum magic version used when creating the record sets
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();
        }

        for (ProducerBatch batch : batches) {//开始遍历待发送的批次
            TopicPartition tp = batch.topicPartition;//主题分区
            MemoryRecords records = batch.records();//消息的内存记录器

            // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
            // that the producer starts building the batch and the time that we send the request, and we may have
            // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
            // the new message format, but found that the broker didn't support it, so we need to down-convert on the
            // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
            // not all support the same message format version. For example, if a partition migrates from a broker
            // which is supporting the new magic version to one which doesn't, then we will need to convert.
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);//缓存主题下的分区和消息内存记录器的映射
            recordsByPartition.put(tp, batch);//缓存主题下的分区和它对应的批次的映射
        }

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {//事务相关
            transactionalId = transactionManager.transactionalId();
        }
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
        RequestCompletionHandler callback = new RequestCompletionHandler() {//请求成功后的回调
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        String nodeId = Integer.toString(destination);//获取待发送的节点id
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                requestTimeoutMs, callback);//构造发送请求
        client.send(clientRequest, now);//发送请求,这里只是将消息写到通道,还没有真正的处理网络IO
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
    }

可以看到sender的具体与网络的操作都交给了他的网络客户端NetworkClient去操作,而NetworkClient就是处理这部分的组件
https://github.com/969251639/study/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E2%80%94%E2%80%94%E7%BD%91%E7%BB%9C

⚠️ **GitHub.com Fallback** ⚠️