Producer: the Message Accumulator - 969251639/study GitHub Wiki

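The message accumulator (accumulator) is where Kafka buffers messages that are waiting to be sent. It stores them as batches, which makes better use of network bandwidth and lets the producer send efficiently; in effect it works as a buffer queue. Messages are grouped by topic and partition (a TopicPartition object), and each TopicPartition maps to its own double-ended queue (Deque) of ProducerBatch objects, where a ProducerBatch represents one batch of messages. When KafkaProducer sends a message, it always works with the ProducerBatch at the tail of the queue (if the queue is not empty), while the Sender takes ProducerBatch objects from the head of the queue for processing.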
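The tail/head discipline described above can be sketched with plain JDK collections. This is a simplified model, not Kafka's code: partitions are identified by hypothetical strings such as "orders-0", and a batch is just a List of strings capped at a hypothetical MAX_RECORDS_PER_BATCH records instead of a byte limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the accumulator layout: one deque of batches per partition.
class PartitionQueues {
    static final int MAX_RECORDS_PER_BATCH = 2; // hypothetical stand-in for batch.size

    final Map<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    // Producer side: always write into the batch at the tail, opening a new one when it is full.
    void append(String partition, String record) {
        Deque<List<String>> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            List<String> last = dq.peekLast();
            if (last == null || last.size() >= MAX_RECORDS_PER_BATCH) {
                last = new ArrayList<>();
                dq.addLast(last);
            }
            last.add(record);
        }
    }

    // Sender side: always take the oldest batch from the head (null if there is none).
    List<String> drainOne(String partition) {
        Deque<List<String>> dq = batches.get(partition);
        if (dq == null) return null;
        synchronized (dq) {
            return dq.pollFirst();
        }
    }
}
```

Because records enter a partition's deque at the tail and leave from the head, batches of one partition are handed to the sender in the order they were filled.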

The accumulator is created in the KafkaProducer constructor:

KafkaProducer(Map<String, Object> configs,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              Metadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors interceptors,
              Time time) {
    ...
    this.accumulator = new RecordAccumulator(logContext,
        config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.compressionType,
        config.getInt(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,
        deliveryTimeoutMs,
        metrics,
        PRODUCER_METRIC_GROUP_NAME,
        time,
        apiVersions,
        transactionManager,
        new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, 
            PRODUCER_METRIC_GROUP_NAME));
    ...
}
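The constructor arguments that matter most for batching come from ordinary producer settings: in the Kafka client, ProducerConfig.BATCH_SIZE_CONFIG, ProducerConfig.LINGER_MS_CONFIG and ProducerConfig.BUFFER_MEMORY_CONFIG are the strings batch.size, linger.ms and buffer.memory (the last one becomes totalMemorySize above). The sketch below only builds a java.util.Properties object, so it has no Kafka dependency; the values are illustrative rather than recommendations, and the helper name batchingProps is hypothetical.

```java
import java.util.Properties;

class ProducerBatchingConfig {
    // Hypothetical helper: producer settings that feed RecordAccumulator and BufferPool.
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put("batch.size", "16384");       // -> batchSize and BufferPool's poolableSize
        props.put("linger.ms", "5");            // -> lingerMs: how long a batch may wait to fill up
        props.put("buffer.memory", "33554432"); // -> totalMemorySize, the BufferPool's total memory
        props.put("compression.type", "lz4");   // -> compressionType
        return props;
    }
}
```

In a real application these properties would be passed to the KafkaProducer constructor together with bootstrap.servers and the serializers.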

As you can see, the accumulator implementation is RecordAccumulator. Its constructor takes quite a few parameters, so let's first look at its member fields:

    private final Logger log;//logger
    private volatile boolean closed;//whether the accumulator has been closed
    private final AtomicInteger flushesInProgress;//number of threads currently flushing
    private final AtomicInteger appendsInProgress;//number of threads currently appending
    private final int batchSize;//size of each batch; this strongly affects producer throughput
    private final CompressionType compression;//compression type
    private final long lingerMs;//how long a batch may wait before being sent; defaults to 0 (no delay)
    private final long retryBackoffMs;//minimum wait before a failed batch may be retried
    private final long deliveryTimeoutMs;//delivery timeout
    private final BufferPool free;//buffer pool
    private final Time time;//time source
    private final ApiVersions apiVersions;//API version information
    //batches: each topic partition maps to a deque of batches (a ProducerBatch holds the actual records)
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;//batches that have not completed yet
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    private final Map<TopicPartition, Long> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;//transaction manager
    //expiry time of the next batch
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.

RecordAccumulator has only one constructor, the one KafkaProducer calls:

    public RecordAccumulator(LogContext logContext,
                             int batchSize,
                             CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             long deliveryTimeoutMs,
                             Metrics metrics,
                             String metricGrpName,
                             Time time,
                             ApiVersions apiVersions,
                             TransactionManager transactionManager,
                             BufferPool bufferPool) {
        this.log = logContext.logger(RecordAccumulator.class);//logger
        this.drainIndex = 0;//round-robin starting index used when draining a node's partitions
        this.closed = false;//not closed yet
        this.flushesInProgress = new AtomicInteger(0);//number of threads currently flushing
        this.appendsInProgress = new AtomicInteger(0);//number of threads currently appending to the accumulator
        this.batchSize = batchSize;//size of each batch; this strongly affects producer throughput
        this.compression = compression;//compression type
        this.lingerMs = lingerMs;//how long a batch may wait before being sent; defaults to 0 (no delay)
        this.retryBackoffMs = retryBackoffMs;//minimum wait before a failed batch may be retried
        this.deliveryTimeoutMs = deliveryTimeoutMs;//delivery timeout
        this.batches = new CopyOnWriteMap<>();//maps each topic partition to its deque of batches
        this.free = bufferPool;//buffer pool, closely tied to batchSize
        this.incomplete = new IncompleteBatches();//batches that have not completed yet
        this.muted = new HashMap<>();
        this.time = time;//time source
        this.apiVersions = apiVersions;//API version information
        this.transactionManager = transactionManager;//transaction manager
        registerMetrics(metrics, metricGrpName);//register performance metrics
    }

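As mentioned before, when Kafka sends a message it first goes into the accumulator's buffer. So let's first look at the sending code to see how a message gets into the accumulator: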

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ...
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);//append the record to the accumulator
    if (result.batchIsFull || result.newBatchCreated) {//the batch is full after the append, or a new batch was created
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        this.sender.wakeup();//wake up the Sender thread, which does the actual sending
    }
    ...
}

So the accumulator's most important method is append:

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();//one more thread appending
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;//use the shared empty header array when no headers are given
        try {
            // check if we have an in-progress batch
            //look up the Deque for this TopicPartition, creating one if absent
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {//lock the deque so appends to this partition are serialized
                if (closed)//accumulator already closed: throw
                    throw new KafkaException("Producer closed while send in progress");
                //try to append to the last batch in the deque
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)//appended successfully: return
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            //the memory allocation below happens outside the lock, but it may block if the pool runs out of memory
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            //if the record is larger than batch.size, allocate enough for the record; otherwise allocate batch.size
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxTimeToBlock);//allocate memory from the pool
            synchronized (dq) {//lock the deque again before adding to it
                // Need to check if producer is closed again after grabbing the dequeue lock.
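                if (closed)//accumulator already closed: throw
                    throw new KafkaException("Producer closed while send in progress");
                //try to append again: while this thread was allocating, another thread may already
                //have created a new batch at the tail of the deque. Because append only ever writes
                //to the last batch, if two threads both failed the first attempt and both created a
                //new batch, the first new batch would be left partly empty and its memory wasted.
                //That is why the second attempt is also made under the lock.
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {//appended successfully: return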
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }
                //create a new batch
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));//append to the new batch

                dq.addLast(batch);//put it at the tail of the deque
                incomplete.add(batch);//track it as an incomplete (not yet finished) batch

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;//so the finally block does not release this memory
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            if (buffer != null)//release memory that was allocated but not used (the second tryAppend succeeded, or an exception was thrown)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();//one fewer thread appending
        }
    }
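The lock, allocate outside the lock, lock again structure of append can be reduced to a small model. Assumptions in this sketch: sizes are plain int byte counts, ModelBatch, MiniAccumulator and allocate are hypothetical names, "allocating" just returns a size instead of calling BufferPool, and a failed tryAppend does not close the batch as the real one does. The second attempt under the lock is what stops two threads from both opening a new batch, and an allocation that ends up unused is handed back in finally.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class ModelBatch {
    final int capacity;
    int used;
    ModelBatch(int capacity) { this.capacity = capacity; }
    boolean tryAppend(int recordSize) {
        if (used + recordSize > capacity) return false; // no room left in this batch
        used += recordSize;
        return true;
    }
}

class MiniAccumulator {
    final int batchSize;
    final ConcurrentMap<String, Deque<ModelBatch>> batches = new ConcurrentHashMap<>();
    final AtomicInteger releasedAllocations = new AtomicInteger(); // allocations given back unused

    MiniAccumulator(int batchSize) { this.batchSize = batchSize; }

    // Returns true if a new batch was created for this record.
    boolean append(String tp, int recordSize) {
        Deque<ModelBatch> dq = batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
        synchronized (dq) {                                // first attempt, under the lock
            ModelBatch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize)) return false;
        }
        Integer buffer = allocate(Math.max(batchSize, recordSize)); // outside the lock (may block in Kafka)
        try {
            synchronized (dq) {                            // second attempt: a batch may have appeared meanwhile
                ModelBatch last = dq.peekLast();
                if (last != null && last.tryAppend(recordSize)) return false;
                ModelBatch batch = new ModelBatch(buffer);
                batch.tryAppend(recordSize);               // always fits: capacity >= recordSize
                dq.addLast(batch);
                buffer = null;                             // ownership moved to the new batch
                return true;
            }
        } finally {
            if (buffer != null) releasedAllocations.incrementAndGet(); // hand unused memory back
        }
    }

    // Stand-in for BufferPool.allocate: here it only returns the requested size.
    Integer allocate(int size) { return size; }
}
```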

The flow above is short, but it touches quite a lot, so let's go through it step by step.

  1. Get the Deque for the TopicPartition from the cache, creating one if absent
    Deque<ProducerBatch> dq = getOrCreateDeque(tp);

    private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        Deque<ProducerBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);//putIfAbsent does not overwrite an existing value
        if (previous == null)
            return d;
        else
            return previous;
    }
  2. First append attempt
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
        ProducerBatch last = deque.peekLast();//look at the last batch (peek does not remove it)
        if (last != null) {//there is a last batch
            //try to append the record to the last batch
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)//no room: close this batch for further appends
                last.closeForRecordAppends();
            else//appended: return a result
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        return null;//the deque is empty, or the last batch had no room: return null so the caller creates a new batch
    }
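getOrCreateDeque is the usual check, then putIfAbsent idiom: if two threads race to create the deque for the same partition, putIfAbsent keeps the first one and both callers end up using it. Below is a sketch of the same idiom on a ConcurrentHashMap (Kafka uses its own CopyOnWriteMap, which is not reproduced here); the class name DequeRegistry is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class DequeRegistry<K, V> {
    private final ConcurrentMap<K, Deque<V>> map = new ConcurrentHashMap<>();

    Deque<V> getOrCreate(K key) {
        Deque<V> d = map.get(key);
        if (d != null)
            return d;                                // fast path: the deque already exists
        d = new ArrayDeque<>();
        Deque<V> previous = map.putIfAbsent(key, d); // never overwrites an existing deque
        return previous == null ? d : previous;      // if another thread won the race, use its deque
    }
}
```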

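As you can see, a message always ends up in the batch at the tail of the deque.
Before analyzing the batch's own tryAppend, let's look at how ProducerBatch is structured, starting with its constructor: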

    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
        this.createdMs = createdMs;//creation time
        this.lastAttemptMs = createdMs;//time of the last send attempt
        this.recordsBuilder = recordsBuilder;//in-memory builder that actually stores the records
        this.topicPartition = tp;//topic partition this batch belongs to
        this.lastAppendTime = createdMs;//time of the last append
        this.produceFuture = new ProduceRequestResult(topicPartition);//result object for the whole batch
        this.retry = false;//whether the batch is being retried
        this.isSplitBatch = isSplitBatch;//whether the batch was produced by splitting a larger one
        float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                                recordsBuilder.compressionType());//estimated compression ratio
        recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);//set the estimated compression ratio
    }

Now the tryAppend method:

    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {//estimate whether the batch still has room for this record
            return null;//not enough room: return null
        } else {
            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);//append the record to the end of the in-memory records
            //track the (estimated) size of the largest record in the batch
            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                    recordsBuilder.compressionType(), key, value, headers));
            this.lastAppendTime = now;//time of the last append
            //create the future the caller uses to get this record's result
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length,
                                                                   Time.SYSTEM);
            // we have to keep every future returned to the users in case the batch needs to be
            // split to several new batches and resent.
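            thunks.add(new Thunk(callback, future));//keep the callback (user callback wrapped with the interceptors) with its future; when the broker response arrives, the batch's done method completes these futures and invokes the callbacks
            this.recordCount++;//one more record in the batch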
            return future;
        }
    }
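The batch-level tryAppend boils down to: check for room, append, hand out a future whose relative offset is the record count before the append, then increment the count. A minimal sketch under simplifying assumptions: capacity is a raw byte count rather than the compression-aware estimate of MemoryRecordsBuilder.hasRoomFor, and the method returns the relative offset (or -1 when there is no room) instead of a FutureRecordMetadata; CountingBatch is a hypothetical name.

```java
class CountingBatch {
    private final int capacityBytes;
    private int usedBytes;
    private int recordCount;
    private int maxRecordSize;

    CountingBatch(int capacityBytes) { this.capacityBytes = capacityBytes; }

    // Returns the record's relative offset within the batch, or -1 if it does not fit.
    long tryAppend(int recordBytes) {
        if (usedBytes + recordBytes > capacityBytes)
            return -1;                                        // no room: the caller needs a new batch
        usedBytes += recordBytes;
        maxRecordSize = Math.max(maxRecordSize, recordBytes); // largest record seen so far
        long relativeOffset = recordCount;                    // records already in the batch
        recordCount++;
        return relativeOffset;
    }

    int recordCount() { return recordCount; }
    int maxRecordSize() { return maxRecordSize; }
}
```

The relative offset only becomes an absolute offset once the broker reports the batch's base offset: absolute offset = base offset + relative offset.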
  3. The first attempt failed: allocate new memory from the buffer pool
            //the memory allocation below happens outside the lock, but it may block if the pool runs out of memory
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));//if the record is larger than batch.size, allocate enough for the record; otherwise allocate batch.size
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxTimeToBlock);//申请内存
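The size rule means an ordinary record gets exactly batch.size bytes, which is the only size BufferPool pools, while an oversized record gets a one-off buffer. A small sketch of that decision, with a hypothetical estimatedRecordSize parameter standing in for the result of AbstractRecords.estimateSizeInBytesUpperBound:

```java
class AllocationSize {
    // size = max(batch.size, estimated upper bound of the record), as in RecordAccumulator.append
    static int bufferSize(int batchSize, int estimatedRecordSize) {
        return Math.max(batchSize, estimatedRecordSize);
    }

    // Only buffers of exactly poolableSize (= batch.size) can come from or return to the free list.
    static boolean poolable(int batchSize, int bufferSize) {
        return bufferSize == batchSize;
    }
}
```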

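The code above relies on a key component that is closely tied to Kafka's performance: BufferPool, Kafka's buffer pool. Creating a ByteBuffer (which holds Kafka's message data) every time is expensive, so Kafka implements a pool to manage how this memory is handed out and reclaimed.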

The pool is created when the KafkaProducer constructor builds the accumulator:

new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, 
            PRODUCER_METRIC_GROUP_NAME)

    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
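        this.poolableSize = poolableSize;//size of the poolable buffers (batch.size)
        this.lock = new ReentrantLock();//lock guarding the pool state; blocked threads wait on its conditions
        this.free = new ArrayDeque<>();//free list: allocated but unused buffers, each poolableSize bytes
        this.waiters = new ArrayDeque<>();//conditions of threads blocked for lack of memory, in FIFO order
        this.totalMemory = memory;//total size of the pool (buffer.memory)
        this.nonPooledAvailableMemory = memory;//available memory that is not held in the free list
        //the rest is for metrics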
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
        MetricName rateMetricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total",
                                                   metricGrpName,
                                                   "The total time an appender waits for space allocation.");
        this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
    }

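Two parameters matter here: poolableSize, controlled by the producer's batch.size setting, and totalMemory, controlled by the buffer.memory setting. To understand Kafka's BufferPool better, let's first look at its structure.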

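The figure above has two parts. Red plus green is the total size of the BufferPool, totalMemory (configured by buffer.memory). Green is the space that is still available, and it has two parts of its own: the upper part is memory that has not been allocated at all, nonPooledAvailableMemory; the lower part is memory that has been allocated but is not in use, kept as a queue (Deque) of ByteBuffers that we call free. Each ByteBuffer in that queue is poolableSize bytes (configured by batch.size).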

This gives the following two formulas:
totalMemory = available space (green) + used space (red)
available space = nonPooledAvailableMemory + poolableSize * free.size()
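A worked example of the two formulas, using the default buffer.memory (33554432 bytes) and batch.size (16384 bytes) and a hypothetical moment at which 4 batch-sized buffers are in use and 2 more sit in the free queue:

```latex
\begin{aligned}
\text{used} &= 4 \times 16384 = 65536 \\
\text{available} &= \text{totalMemory} - \text{used} = 33554432 - 65536 = 33488896 \\
\text{nonPooledAvailableMemory} &= \text{available} - \text{poolableSize} \times \text{free.size()} = 33488896 - 2 \times 16384 = 33456128
\end{aligned}
```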


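From the figure above, a request for size bytes can end in one of these ways (the red boxes):
1. It fails with an exception, e.g. the requested size exceeds the total limit
2. It is served directly by a ByteBuffer from the free queue
3. It is served from the non-pooled available memory (availableMemory, named nonPooledAvailableMemory in this code)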

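The blue box is how most allocations happen: a ready-made ByteBuffer is taken straight from the queue, which is the path Kafka wants. The yellow box is the case where the queue cannot serve the request (it is empty, or the requested size does not match), so the memory comes from nonPooledAvailableMemory.
The green box is the case where the pool does not have enough memory and the thread blocks. A counter, accumulated, tracks how much memory has been collected so far. While it is below size, not enough memory has been released, so the thread blocks until memory is freed; each time memory is released the blocked thread is woken up and adds whatever it can reserve to accumulated, until accumulated reaches size and the allocation completes.
One more rule: if nothing has been accumulated yet (accumulated == 0) and the request is exactly poolableSize, the thread first tries to take a buffer from the queue, because the released memory may have gone back into the queue.
Releasing memory is simpler: if the released buffer is exactly poolableSize, it is put into the free queue; otherwise its size is added back to nonPooledAvailableMemory (nonPooledAvailableMemory += size). So only fixed-size buffers go back into the pool; releasing any other size only increases the available memory.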
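The three outcomes and the release rule can be modeled without any locking. This single-threaded sketch is a simplification: it throws IllegalStateException where the real pool would block, it ignores metrics, and MiniBufferPool is a hypothetical name; the field names mirror BufferPool's.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class MiniBufferPool {
    final long totalMemory;
    final int poolableSize;
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    long nonPooledAvailableMemory;

    MiniBufferPool(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        this.nonPooledAvailableMemory = totalMemory;
    }

    ByteBuffer allocate(int size) {
        if (size > totalMemory)                                    // outcome 1: can never fit
            throw new IllegalArgumentException("size exceeds totalMemory");
        if (size == poolableSize && !free.isEmpty())               // outcome 2: reuse a pooled buffer
            return free.pollFirst();
        if (nonPooledAvailableMemory + (long) free.size() * poolableSize < size)
            throw new IllegalStateException("the real BufferPool would block here");
        while (!free.isEmpty() && nonPooledAvailableMemory < size) // freeUp: dissolve pooled buffers
            nonPooledAvailableMemory += free.pollLast().capacity();
        nonPooledAvailableMemory -= size;                          // outcome 3: fresh allocation
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        int size = buffer.capacity();
        if (size == poolableSize) {                                // batch-sized: keep it for reuse
            buffer.clear();
            free.add(buffer);
        } else {                                                   // any other size: return the bytes only
            nonPooledAvailableMemory += size;
        }
    }

    long available() { return nonPooledAvailableMemory + (long) free.size() * poolableSize; }
}
```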

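BufferPool is thread-safe, guarded by a ReentrantLock. A Deque called waiters records the threads that could not get enough space and are blocked; what it actually stores is the Condition object of each blocked thread. A blocked thread's Condition is added to the queue to wait for a signal, and threads are woken in the order they were enqueued (first in, first out).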

Now the actual implementation:

  • Allocating memory
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)//the request is larger than the whole pool: throw
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        ByteBuffer buffer = null;
        this.lock.lock();//acquire the lock
        try {
            // check if we have a free buffer of the right size pooled
            //if the request is exactly poolableSize and the free list is not empty, take a buffer from it directly (the common case)
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            int freeListSize = freeSize() * this.poolableSize;//total size of the free list
            if (this.nonPooledAvailableMemory + freeListSize >= size) {//enough memory in total (non-pooled plus free list): allocate from it
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request, but need to allocate the buffer
                freeUp(size);//if non-pooled memory alone is not enough, release buffers from the free list until it is
                this.nonPooledAvailableMemory -= size;//reserve size bytes of non-pooled memory
            } else {//not enough memory: the thread has to block
                // we are out of memory and will have to block
                int accumulated = 0;//bytes reserved so far
                Condition moreMemory = this.lock.newCondition();//condition this thread waits on
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);//time left to block
                    this.waiters.addLast(moreMemory);//enqueue this thread's condition in waiters
                    // loop over and over until we have a buffer or have reserved
                    // enough memory to allocate one
                    while (accumulated < size) {//until enough memory has been reserved
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);//wait for memory to be released
                        } finally {//record the wait time for metrics
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            recordWaitTime(timeNs);
                        }

                        if (waitingTimeElapsed) {//timed out
                            throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                        }

                        remainingTimeToBlockNs -= timeNs;//update the time left

                        // check if we can satisfy this request from the free list,
                        // otherwise allocate memory
                        //nothing reserved yet, the request is exactly poolableSize, and the free list is not empty: take a buffer from it
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            // just grab a buffer from the free list
                            buffer = this.free.pollFirst();
                            accumulated = size;//satisfies the loop exit condition
                        } else {
                            // we'll need to allocate memory, but we may only get
                            // part of what we need on this iteration
                            freeUp(size - accumulated);//move free-list buffers into non-pooled memory if needed
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            this.nonPooledAvailableMemory -= got;//reserve what is available now
                            accumulated += got;//add it to the amount reserved so far
                        }
                    }
                    // Don't reclaim memory on throwable since nothing was thrown
                    accumulated = 0;//no exception: reset the counter, because the reserved memory has already been deducted from nonPooledAvailableMemory (or taken from the free list), so the finally block must not add it back
                } finally {
                    // When this loop was not able to successfully terminate don't loose available memory
                    this.nonPooledAvailableMemory += accumulated;//if an exception was thrown, return the memory reserved so far to nonPooledAvailableMemory
                    this.waiters.remove(moreMemory);//remove this thread's condition from waiters
                }
            }
        } finally {
            // signal any additional waiters if there is more memory left
            // over for them
            try {
                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();//memory is still available: wake the next thread waiting for memory
            } finally {
                // Another finally... otherwise find bugs complains
                lock.unlock();//release the lock
            }
        }

        if (buffer == null)//not taken from the free list: actually allocate the reserved non-pooled memory
            return safeAllocateByteBuffer(size);
        else
            return buffer;
    }
    private ByteBuffer safeAllocateByteBuffer(int size) {
        boolean error = true;
        try {
            ByteBuffer buffer = allocateByteBuffer(size);//allocate through NIO's ByteBuffer
            error = false;
            return buffer;
        } finally {
            if (error) {
                this.lock.lock();
                try {
                    this.nonPooledAvailableMemory += size;//allocation failed: give the reserved memory back
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();//wake up the next waiting thread
                } finally {
                    this.lock.unlock();
                }
            }
        }
    }

    // Protected for testing.
    protected ByteBuffer allocateByteBuffer(int size) {
        return ByteBuffer.allocate(size);
    }

    private void freeUp(int size) {
        //move buffers from the free list into non-pooled memory until there is enough for size
        while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
            this.nonPooledAvailableMemory += this.free.pollLast().capacity();
    }
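  • Releasing memory
    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();//acquire the lock
        try {
            if (size == this.poolableSize && size == buffer.capacity()) {//the released buffer is exactly poolableSize
                buffer.clear();//reset position and limit for reuse (the bytes are not zeroed)
                this.free.add(buffer);//return it to the free list
            } else {
                this.nonPooledAvailableMemory += size;//otherwise just return the size to non-pooled memory
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)//a thread is waiting for memory
                moreMem.signal();//wake it up
        } finally {
            lock.unlock();//release the lock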
        }
    }
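The blocking path of allocate (FIFO waiters, timed await, partial reservation in accumulated, and handing reserved memory back on failure) can be isolated in a sketch that tracks only byte counts. Assumptions: there is no free list and no real ByteBuffer, the class name BlockingMemory is hypothetical, and a timeout is reported with java.util.concurrent.TimeoutException rather than Kafka's own exception type.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BlockingMemory {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>(); // FIFO queue of blocked threads
    private long available;

    BlockingMemory(long total) { this.available = total; }

    void reserve(long size, long maxWaitMs) throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            if (available >= size) {                  // enough memory right now
                available -= size;
                return;
            }
            long accumulated = 0;                     // bytes reserved so far
            Condition moreMemory = lock.newCondition();
            try {
                waiters.addLast(moreMemory);
                long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
                while (accumulated < size) {
                    long start = System.nanoTime();
                    boolean signalled = moreMemory.await(remainingNs, TimeUnit.NANOSECONDS);
                    remainingNs -= System.nanoTime() - start;
                    if (!signalled)
                        throw new TimeoutException("no memory within " + maxWaitMs + " ms");
                    long got = Math.min(size - accumulated, available); // take what is free now
                    available -= got;
                    accumulated += got;
                }
                accumulated = 0;                      // success: keep the whole reservation
            } finally {
                available += accumulated;             // failure: hand back the partial reservation
                waiters.remove(moreMemory);
            }
        } finally {
            if (available > 0 && !waiters.isEmpty())
                waiters.peekFirst().signal();         // let the next waiter try
            lock.unlock();
        }
    }

    void release(long size) {
        lock.lock();
        try {
            available += size;
            Condition first = waiters.peekFirst();
            if (first != null)
                first.signal();                       // wake the longest-waiting thread
        } finally {
            lock.unlock();
        }
    }

    long available() {
        lock.lock();
        try { return available; } finally { lock.unlock(); }
    }
}
```

Reserving in pieces lets a large request make progress as memory trickles back, and the finally block guarantees that a timed-out request does not leak the part it had already reserved.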

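So as long as a message is no larger than the configured batch.size (which is the case almost all the time), every request is exactly batch.size bytes, so buffers come from and return to the pool's free queue and memory is used efficiently.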

  4. Second append attempt
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

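After the memory has been allocated, the append is attempted again under the lock: while this thread was allocating (and possibly blocked because the pool was short of memory), another thread may already have created a new batch at the tail of the deque that has room for this record.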

  5. If the second attempt also fails, create a new batch with the newly allocated memory
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);//build the in-memory records builder
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());//create the new batch
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));//append the record to it

                dq.addLast(batch);//add it to the tail of the deque
                incomplete.add(batch);//track it in incomplete
  6. Cleanup
        try {
            ...
                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;//so the finally block does not release this memory
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            if (buffer != null)//release memory that was allocated but not used (the second tryAppend succeeded, or an exception was thrown)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();//one fewer thread appending
        }

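If the allocated memory was used to create a new batch, buffer is set to null so that the finally block does not release it. If instead the second append succeeded, or an exception was thrown after the allocation (while creating the new batch or during the second append), buffer is still non-null and the finally block returns it to the pool so the memory is not wasted.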
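The buffer = null trick is a general ownership-transfer idiom: the finally block releases the resource only if nobody took ownership of it. A self-contained sketch of the same pattern, with a hypothetical OwnershipDemo class that just counts outstanding buffers and a hypothetical failAfterAllocate flag that simulates an exception thrown after the allocation:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class OwnershipDemo {
    int outstanding;                               // buffers handed out and not yet returned
    final List<ByteBuffer> batches = new ArrayList<>();

    ByteBuffer allocate(int size) { outstanding++; return ByteBuffer.allocate(size); }
    void deallocate(ByteBuffer b) { outstanding--; }

    void appendToNewBatch(boolean failAfterAllocate) {
        ByteBuffer buffer = null;
        try {
            buffer = allocate(16);
            if (failAfterAllocate)
                throw new IllegalStateException("simulated failure while building the batch");
            batches.add(buffer);                   // the batch now owns the buffer
            buffer = null;                         // so finally must not release it
        } finally {
            if (buffer != null)
                deallocate(buffer);                // allocated but never handed over: give it back
        }
    }
}
```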

append returns a RecordAppendResult, an inner class that records the outcome of the append. Its structure is very simple:

    public final static class RecordAppendResult {
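        public final FutureRecordMetadata future;//future for the appended record
        public final boolean batchIsFull;//true if the batch is full, or the deque holds more than one batch
        public final boolean newBatchCreated;//true if a new batch was created with newly allocated memory; false when appended to an existing batch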

        public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
            this.future = future;
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

Note the two fields batchIsFull and newBatchCreated: together they decide whether the producer wakes up the Sender thread to send messages after appending a record to the accumulator.

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ...
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        this.sender.wakeup();
    }
    ...
}

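So batchIsFull does not necessarily mean the batch itself is full. It is true in two cases: the deque already holds more than one batch (so an earlier batch is complete and waiting to be sent), or the current batch really is full. Either way there is data ready for the producer to send. newBatchCreated being true means a record was written into a newly created batch, which is also a reason to wake up the sender.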

new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true)   
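The wake-up decision can be written as one predicate over the deque size and the batch state. This is a sketch; SenderWakeup and its methods are hypothetical helpers, not Kafka methods.

```java
class SenderWakeup {
    // batchIsFull as computed in append: the deque holds more than one batch, or the tail batch is full
    static boolean batchIsFull(int dequeSize, boolean tailBatchFull) {
        return dequeSize > 1 || tailBatchFull;
    }

    // doSend wakes the Sender if either flag of RecordAppendResult is set
    static boolean shouldWakeSender(int dequeSize, boolean tailBatchFull, boolean newBatchCreated) {
        return batchIsFull(dequeSize, tailBatchFull) || newBatchCreated;
    }
}
```

In the most common case, appending to the only batch of a partition while it still has room, no wake-up is issued; the sender thread still finds that batch later through ready, once linger.ms has elapsed.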

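Finally, the last component on the append path: FutureRecordMetadata, the future field of RecordAppendResult. Every successful append creates a FutureRecordMetadata that holds information about the appended record, such as the batch's result object (which knows the topic partition), the record's relative offset within the batch (the number of records already in the batch), and the serialized key and value sizes.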

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    ...
    FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length,
                                                                   Time.SYSTEM);
    ...
}

public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
                            Long checksum, int serializedKeySize, int serializedValueSize, Time time) {
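    this.result = result;//produce result shared by the whole batch
    this.relativeOffset = relativeOffset;//relative offset: the number of records in the batch before this one, e.g. 4 for the 5th record
    this.createTimestamp = createTimestamp;//timestamp passed with the record
    this.checksum = checksum;//checksum
    this.serializedKeySize = serializedKeySize;//serialized key size (-1 if the key is null)
    this.serializedValueSize = serializedValueSize;//serialized value size (-1 if the value is null)
    this.time = time;//clock (Time.SYSTEM here), used e.g. to compute the deadline in get(timeout, unit)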
}

Because FutureRecordMetadata implements the Future interface, calling its get method gives synchronous sending: get blocks until the response for the send arrives.

    @Override
    public RecordMetadata get() throws InterruptedException, ExecutionException {
        this.result.await();//block until the batch completes
        if (nextRecordMetadata != null)
            return nextRecordMetadata.get();
        return valueOrError();//return the metadata, or throw the error
    }

    @Override
    public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Handle overflow.
        long now = time.milliseconds();
        long timeoutMillis = unit.toMillis(timeout);
        long deadline = Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
        boolean occurred = this.result.await(timeout, unit);//block for at most timeout
        if (!occurred)//timed out
            throw new TimeoutException("Timeout after waiting for " + timeoutMillis + " ms.");
        if (nextRecordMetadata != null)
            return nextRecordMetadata.get(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);
        return valueOrError();//return the metadata, or throw the error
    }
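The "// Handle overflow." lines in get(timeout, unit) guard against now + timeoutMillis overflowing when the caller passes a huge timeout; for example TimeUnit.DAYS.toMillis(Long.MAX_VALUE) saturates to Long.MAX_VALUE. The same clamp in isolation (Deadlines.deadline is a hypothetical helper):

```java
class Deadlines {
    // Returns now + timeoutMillis, saturating at Long.MAX_VALUE instead of overflowing.
    static long deadline(long now, long timeoutMillis) {
        return Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
    }
}
```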

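As you can see, get delegates the blocking to result.await. result is a ProduceRequestResult, which is created together with each batch (one per batch) and records the send result of that batch for its topic partition: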

    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
        ...
        this.produceFuture = new ProduceRequestResult(topicPartition);
        ...
    }

    public ProduceRequestResult(TopicPartition topicPartition) {
        this.topicPartition = topicPartition;
    }

ProduceRequestResult in turn uses a CountDownLatch to implement the blocking:

public class ProduceRequestResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final TopicPartition topicPartition;
    ...
    public ProduceRequestResult(TopicPartition topicPartition) {
        this.topicPartition = topicPartition;
    }
    ...
    public void done() {//called when the batch completes: count down the latch to release the threads blocked in await
        if (baseOffset == null)
            throw new IllegalStateException("The method `set` must be invoked before this method.");
        this.latch.countDown();
    }
    public void await() throws InterruptedException {//block until countDown
        latch.await();
    }
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {//block until countDown; returns false on timeout
        return latch.await(timeout, unit);
    }
    ...
}

When the response arrives (success or failure), ProducerBatch.done is called. It calls completeFutureAndFireCallbacks, which calls produceFuture.done() and so triggers the latch's countDown:

public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
    ...
    completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
    ...
}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
    ...
    produceFuture.done();
    ...
}
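The chain FutureRecordMetadata.get, then ProduceRequestResult.await, then CountDownLatch can be reproduced in a few lines. A minimal sketch, assuming one result object per batch that all record futures of the batch share; BatchResult and RecordFuture are hypothetical names, and only done and a timed get are modeled (errors are not).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BatchResult {                        // plays the role of ProduceRequestResult
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile long baseOffset = -1;

    void done(long baseOffset) {           // called once, when the broker response arrives
        this.baseOffset = baseOffset;
        latch.countDown();                 // releases every thread blocked in await
    }
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
    long baseOffset() { return baseOffset; }
}

class RecordFuture {                       // plays the role of FutureRecordMetadata
    private final BatchResult result;
    private final long relativeOffset;

    RecordFuture(BatchResult result, long relativeOffset) {
        this.result = result;
        this.relativeOffset = relativeOffset;
    }

    long get(long timeoutMs) throws InterruptedException {
        if (!result.await(timeoutMs, TimeUnit.MILLISECONDS))
            throw new IllegalStateException("timed out");
        return result.baseOffset() + relativeOffset;   // absolute offset of this record
    }
}
```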

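The accumulator has two more important methods, ready and drain. ready checks, for the partitions that have pending batches, which leader nodes are ready to be sent to. drain converts the per-partition batches into a mapping from node to batches. The actual sending is done per broker, and each broker corresponds to a node, so grouping by node is what makes efficient network sending possible. As a developer you do not care which broker a message goes to, only which partition of which topic, so drain converts between the two views.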

    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
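        Set<Node> readyNodes = new HashSet<>();//nodes that are ready to be sent to
        long nextReadyCheckDelayMs = Long.MAX_VALUE;//how long until ready should be checked again, so it is not called too often
        Set<String> unknownLeaderTopics = new HashSet<>();//topics that have a partition whose leader is unknown

        boolean exhausted = this.free.queued() > 0;//whether any thread is blocked waiting for buffer pool memory
        //iterate over all of the accumulator's partitions; batches is a thread-safe copy-on-write map:
        //this.batches = new CopyOnWriteMap<>();
        //so the loop sees a snapshot: a partition added concurrently is not visited in this call but in the next one, because CopyOnWriteMap writes into a new copy of the map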
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();//the topic partition
            Deque<ProducerBatch> deque = entry.getValue();//its deque of batches

            Node leader = cluster.leaderFor(part);//leader node of this partition
            synchronized (deque) {//lock the deque so nothing is appended while it is inspected
                if (leader == null && !deque.isEmpty()) {//leader unknown, but there is data to send
                    // This is a partition for which leader is not known, but messages are available to send.
                    // Note that entries are currently not removed from batches when deque is empty.
                    unknownLeaderTopics.add(part.topic());//record the topic as having an unknown leader
                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {//leader not already marked ready, and the partition is not muted
                    ProducerBatch batch = deque.peekFirst();//look at the first (oldest) batch
                    if (batch != null) {//there is a batch
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
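                        //full: the batch is full, or the deque holds more than one batch
                        //expired: the batch has waited at least timeToWaitMs (lingerMs, or retryBackoffMs for a retried batch)
                        //exhausted: other threads are waiting for BufferPool to free memory
                        //closed: the accumulator is closed
                        //flushInProgress: a thread is waiting for a flush to complete
                        //if any of these five conditions holds, sendable is true
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);//mark the leader node as ready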
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);//time until ready should be called again
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }
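The per-partition decision in ready can be isolated as a pure function. A sketch under the assumption that the inputs are already computed; ReadyCheck.isReady is a hypothetical helper, and muting and leader lookup are left out.

```java
class ReadyCheck {
    // Mirrors the sendable / backingOff logic of RecordAccumulator.ready for one partition.
    static boolean isReady(int dequeSize, boolean firstBatchFull, int attempts, long waitedTimeMs,
                           long lingerMs, long retryBackoffMs,
                           boolean exhausted, boolean closed, boolean flushInProgress) {
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean full = dequeSize > 1 || firstBatchFull;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushInProgress;
        return sendable && !backingOff;
    }
}
```

Since the waited time is compared with lingerMs using >=, a batch with linger.ms = 0 counts as expired, and therefore ready, as soon as ready looks at it.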

    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);//collect the sendable batches of the partitions led by this node
            batches.put(node.id(), ready);//map the node id to its batches
        }
        return batches;
    }
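At its core, drain regroups per-partition batches by the leader node of each partition. The sketch below shows only that regrouping, keyed by node id as in drain's return type; it leaves out maxSize, the round-robin drainIndex, muting and expiry, and it walks partitions rather than nodes. The partition names, the leaderOf map and DrainSketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DrainSketch {
    // partitionBatches: partition -> batches ready to send; leaderOf: partition -> leader node id
    static Map<Integer, List<String>> groupByNode(Map<String, List<String>> partitionBatches,
                                                  Map<String, Integer> leaderOf) {
        Map<Integer, List<String>> byNode = new HashMap<>();
        for (Map.Entry<String, List<String>> e : partitionBatches.entrySet()) {
            Integer node = leaderOf.get(e.getKey());
            if (node == null)
                continue;                          // unknown leader: cannot be sent yet
            byNode.computeIfAbsent(node, n -> new ArrayList<>()).addAll(e.getValue());
        }
        return byNode;
    }
}
```

With three partitions led by nodes 1, 2 and 1, the sender ends up with two groups, one per node, instead of three separate per-partition sends.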