
Flink Kafka Connector Source Code Analysis

Version: 1.13.0

Source: https://github.com/apache/flink/tree/release-1.13.0

1. Flink Kafka Consumer Source Code

1.1 Class Diagram

FlinkKafkaConsumer extends the abstract class FlinkKafkaConsumerBase, which in turn extends RichParallelSourceFunction. There are therefore two ways to implement a custom source: implement the SourceFunction interface to define a source with parallelism 1, or implement the ParallelSourceFunction interface / extend RichParallelSourceFunction to define a source with configurable parallelism. The inheritance hierarchy of FlinkKafkaConsumer is shown in the diagram below:
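
As a quick illustration of the second approach (a minimal sketch, not connector code; all names here are made up), a custom parallel source looks like this:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Each parallel subtask emits its own arithmetic sequence until cancel() is called.
public class CounterSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        // one-time setup; runs once per subtask before run()
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long next = getRuntimeContext().getIndexOfThisSubtask();
        long step = getRuntimeContext().getNumberOfParallelSubtasks();
        while (running) {
            // emit under the checkpoint lock so snapshots see a consistent state
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next);
                next += step;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}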

1.2 The open Method of FlinkKafkaConsumerBase

This is the function's initialization method, inherited from the RichFunction interface. It is called before the actual working methods (such as map or join) and is therefore suitable for one-time setup work. The Configuration object passed to the function can be used for configuration and initialization; it contains all parameters that were configured on the function in the program composition.

  1. Determine the offset commit mode (a sketch of the decision logic follows the snippet below):

    There are three offsetCommitMode values: ON_CHECKPOINTS, KAFKA_PERIODIC, and DISABLED. If checkpointing is enabled, offsets are recorded in the snapshot (ON_CHECKPOINTS); otherwise offsets are periodically committed back to Kafka (KAFKA_PERIODIC); with DISABLED, offsets are never committed.

    // determine the offset commit mode
    this.offsetCommitMode =
            OffsetCommitModes.fromConfiguration(
                    getIsAutoCommitEnabled(),
                    enableCommitOnCheckpoints,
                    ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
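
    The decision logic in OffsetCommitModes.fromConfiguration boils down to the following (paraphrased from the Flink source):

    public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {

        if (enableCheckpointing) {
            // if checkpointing is enabled, commit on checkpoints only if requested
            return (enableCommitOnCheckpoint)
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        } else {
            // otherwise, defer to the Kafka client's periodic auto-commit, if enabled
            return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
        }
    }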
  2. Create the partition discoverer (partitionDiscoverer), used to automatically detect new partitions:

    // create the partition discoverer
    this.partitionDiscoverer =
            createPartitionDiscoverer(
                    topicsDescriptor,
                    getRuntimeContext().getIndexOfThisSubtask(),
                    getRuntimeContext().getNumberOfParallelSubtasks());
  3. Open the partition discoverer; essentially this just creates a KafkaConsumer from the Kafka configuration:

    this.partitionDiscoverer.open();
    
    public void open() throws Exception {
        closed = false;
        initializeConnections();
    }
    
    @Override
    protected void initializeConnections() {
        this.kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
    }
  4. Assign each subtask the topics and corresponding partitions it should consume:

    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

    The discoverPartitions method:

    public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
        if (!closed && !wakeup) {
            try {
                List<KafkaTopicPartition> newDiscoveredPartitions;
    
                // (1) get all possible partitions, based on whether we are subscribed to fixed
                // topics or a topic pattern
                // If we subscribed to fixed topics, fetch the partition metadata
                // for those topics directly. If we subscribed to a topic pattern,
                // fetch all topics, drop the ones that do not match the pattern,
                // and fetch partition metadata for the matching topics.
                if (topicsDescriptor.isFixedTopics()) {
                    newDiscoveredPartitions =
                            getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
                } else {
                    List<String> matchedTopics = getAllTopics();
    
                    // retain topics that match the pattern
                    Iterator<String> iter = matchedTopics.iterator();
                    while (iter.hasNext()) {
                        if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                            iter.remove();
                        }
                    }
    
                    if (matchedTopics.size() != 0) {
                        // get partitions only for matched topics
                        newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                    } else {
                        newDiscoveredPartitions = null;
                    }
                }
    
                // (2) eliminate partition that are old partitions or should not be subscribed by
                // this subtask
                // If no partitions were found (e.g., the subscribed topics do not exist), throw an exception
                if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                    throw new RuntimeException(
                            "Unable to retrieve any partitions with KafkaTopicsDescriptor: "
                                    + topicsDescriptor);
                } else {
                    // Check whether this subtask should subscribe to this topic's partition
                    Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                    KafkaTopicPartition nextPartition;
                    while (iter.hasNext()) {
                        nextPartition = iter.next();
                        if (!setAndCheckDiscoveredPartition(nextPartition)) {
                            iter.remove();
                        }
                    }
                }
    
                // Return the newly discovered partitions that this subtask should subscribe to
                return newDiscoveredPartitions;
            } catch (WakeupException e) {
                // the actual topic / partition metadata fetching methods
                // may be woken up midway; reset the wakeup flag and rethrow
                wakeup = false;
                throw e;
            }
        } else if (!closed && wakeup) {
            // may have been woken up before the method call
            wakeup = false;
            throw new WakeupException();
        } else {
            throw new ClosedException();
        }
    }

    The setAndCheckDiscoveredPartition method:

    public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
        if (isUndiscoveredPartition(partition)) {
            // a newly discovered partition
            discoveredPartitions.add(partition);
    
            // decide whether this partition should be consumed by this subtask
            return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)
                    == indexOfThisSubtask;
        }
    
        return false;
    }

    The assign method:

    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex =
                ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    
        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the
        // start index
        // partitions are assigned round-robin, clockwise from the start index by subtask index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }

    Note: although each subtask of the Flink Kafka consumer starts its own KafkaConsumer, the subtasks consume via assign() rather than subscribe(); the partitions a subtask consumes are determined solely by the assign method above, so Kafka's consumer-group rebalancing never takes part. Consequently, if multiple Flink jobs consume with the same groupId, each job still consumes all partitions, i.e., the same messages are consumed repeatedly across jobs (a worked example follows).
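
    The effect of assign is easy to see with a small standalone demo (topic name and counts here are hypothetical): partitions of a topic are spread round-robin across subtasks, offset by a per-topic start index so that the first partitions of different topics do not all land on subtask 0.

    public class AssignDemo {
        public static void main(String[] args) {
            String topic = "orders"; // hypothetical topic
            int numParallelSubtasks = 3;
            int numPartitions = 4;

            // same formula as KafkaTopicPartitionAssigner.assign
            int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
            for (int p = 0; p < numPartitions; p++) {
                System.out.printf("partition %d -> subtask %d%n",
                        p, (startIndex + p) % numParallelSubtasks);
            }
        }
    }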

  5. Initialize each subscribed partition with the start offset from which it should be read, saving the result into subscribedPartitionsToStartOffsets (a configuration example follows this step):

    // is there restored state to recover from?
    if (restoredState != null) {
        // for each discovered partition missing from the restored state, default to reading from the earliest offset
        for (KafkaTopicPartition partition : allPartitions) {
            if (!restoredState.containsKey(partition)) {
                restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
            }
        }
    
        for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry :
                restoredState.entrySet()) {
            // seed the partition discoverer with the union state while filtering out
            // restored partitions that should not be subscribed by this subtask
            // if this partition is assigned to this subtask's index, record the
            // partition and its start offset in subscribedPartitionsToStartOffsets
            if (KafkaTopicPartitionAssigner.assign(
                            restoredStateEntry.getKey(),
                            getRuntimeContext().getNumberOfParallelSubtasks())
                    == getRuntimeContext().getIndexOfThisSubtask()) {
                subscribedPartitionsToStartOffsets.put(
                        restoredStateEntry.getKey(), restoredStateEntry.getValue());
            }
        }
    
        // if configured to filter restored partitions against the current topics
        // descriptor, remove non-matching entries from subscribedPartitionsToStartOffsets
        if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
            subscribedPartitionsToStartOffsets
                    .entrySet()
                    .removeIf(
                            entry -> {
                                if (!topicsDescriptor.isMatchingTopic(
                                        entry.getKey().getTopic())) {
                                    LOG.warn(
                                            "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                                            entry.getKey());
                                    return true;
                                }
                                return false;
                            });
        }
    
        LOG.info(
                "Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                getRuntimeContext().getIndexOfThisSubtask(),
                subscribedPartitionsToStartOffsets.size(),
                subscribedPartitionsToStartOffsets);
    } else {
        // use the partition discoverer to fetch the initial seed partitions,
        // and set their initial offsets depending on the startup mode.
        // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
        // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily
        // determined
        // when the partition is actually read.
        // no restored state: branch on the startup mode
        switch (startupMode) {
            // SPECIFIC_OFFSETS mode: throw if specificStartupOffsets was not provided
            case SPECIFIC_OFFSETS:
                if (specificStartupOffsets == null) {
                    throw new IllegalStateException(
                            "Startup mode for the consumer set to "
                                    + StartupMode.SPECIFIC_OFFSETS
                                    + ", but no specific offsets were specified.");
                }
    
                // otherwise iterate over the seed partitions
                for (KafkaTopicPartition seedPartition : allPartitions) {
                    Long specificOffset = specificStartupOffsets.get(seedPartition);
                    if (specificOffset != null) {
                        // since the specified offsets represent the next record to read, we
                        // subtract
                        // it by one so that the initial state of the consumer will be correct
                        // if specificStartupOffsets contains this partition, store
                        // specificOffset - 1, since the given offset denotes the next record to read
                        subscribedPartitionsToStartOffsets.put(
                                seedPartition, specificOffset - 1);
                    } else {
                        // default to group offset behaviour if the user-provided specific
                        // offsets
                        // do not contain a value for this partition
                        // otherwise fall back to the consumer group's committed offset
                        subscribedPartitionsToStartOffsets.put(
                                seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                    }
                }
    
                break;
            case TIMESTAMP:
                // TIMESTAMP mode: throw if startupOffsetsTimestamp was not provided
                if (startupOffsetsTimestamp == null) {
                    throw new IllegalStateException(
                            "Startup mode for the consumer set to "
                                    + StartupMode.TIMESTAMP
                                    + ", but no startup timestamp was specified.");
                }
    
                // otherwise look up each partition's offset by timestamp; partitions
                // for which no offset is found default to the latest offset
                for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset :
                        fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp)
                                .entrySet()) {
                    subscribedPartitionsToStartOffsets.put(
                            partitionToOffset.getKey(),
                            (partitionToOffset.getValue() == null)
                                    // if an offset cannot be retrieved for a partition with the
                                    // given timestamp,
                                    // we default to using the latest offset for the partition
                                    ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                                    // since the specified offsets represent the next record to
                                    // read, we subtract
                                    // it by one so that the initial state of the consumer will
                                    // be correct
                                    : partitionToOffset.getValue() - 1);
                }
    
                break;
            default:
                for (KafkaTopicPartition seedPartition : allPartitions) {
                    subscribedPartitionsToStartOffsets.put(
                            seedPartition, startupMode.getStateSentinel());
                }
        }
    
        // if subscribedPartitionsToStartOffsets is non-empty, log a message per startup
        // mode (in SPECIFIC_OFFSETS mode, partitions that fall back to committed group
        // offsets get a separate warning); if it is empty, log that no partitions were found
        if (!subscribedPartitionsToStartOffsets.isEmpty()) {
            switch (startupMode) {
                case EARLIEST:
                    LOG.info(
                            "Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                    break;
                case LATEST:
                    LOG.info(
                            "Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                    break;
                case TIMESTAMP:
                    LOG.info(
                            "Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            startupOffsetsTimestamp,
                            subscribedPartitionsToStartOffsets.keySet());
                    break;
                case SPECIFIC_OFFSETS:
                    LOG.info(
                            "Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            specificStartupOffsets,
                            subscribedPartitionsToStartOffsets.keySet());
    
                    List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets =
                            new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                    for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                            subscribedPartitionsToStartOffsets.entrySet()) {
                        if (subscribedPartition.getValue()
                                == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                            partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                        }
                    }
    
                    if (partitionsDefaultedToGroupOffsets.size() > 0) {
                        LOG.warn(
                                "Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"
                                        + "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                partitionsDefaultedToGroupOffsets.size(),
                                partitionsDefaultedToGroupOffsets);
                    }
                    break;
                case GROUP_OFFSETS:
                    LOG.info(
                            "Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
            }
        } else {
            LOG.info(
                    "Consumer subtask {} initially has no partitions to read from.",
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }
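
    For reference, the startup modes above map to the consumer's public setters; a minimal configuration sketch (broker address, topic, and offsets are placeholders):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "demo-group");

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

    consumer.setStartFromGroupOffsets(); // GROUP_OFFSETS (the default)
    // consumer.setStartFromEarliest();  // EARLIEST
    // consumer.setStartFromLatest();    // LATEST
    // consumer.setStartFromTimestamp(1620000000000L); // TIMESTAMP

    // SPECIFIC_OFFSETS: the given offset is the next record to read
    Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
    specificOffsets.put(new KafkaTopicPartition("demo-topic", 0), 23L);
    // consumer.setStartFromSpecificOffsets(specificOffsets);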
  6. Initialize the deserializer:

    this.deserializer.open(
            RuntimeContextInitializationContextAdapters.deserializationAdapter(
                    getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));

1.3 The run Method of FlinkKafkaConsumerBase

Starts the source. Inherited from the SourceFunction interface; this is where records are emitted.

  1. Check that subscribedPartitionsToStartOffsets has been set; if it is null, throw an exception:

    if (subscribedPartitionsToStartOffsets == null) {
        throw new Exception("The partitions were not set for the consumer");
    }
  2. Initialize the counters for successful and failed offset commits:

    // initialize commit metrics and default offset callback method
    this.successfulCommits =
            this.getRuntimeContext()
                    .getMetricGroup()
                    .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
    this.failedCommits =
            this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
  3. Get this subtask's index:

    final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
  4. Initialize the offset commit callback:

    this.offsetCommitCallback =
            new KafkaCommitCallback() {
                @Override
                public void onSuccess() {
                    successfulCommits.inc();
                }
    
                @Override
                public void onException(Throwable cause) {
                    LOG.warn(
                            String.format(
                                    "Consumer subtask %d failed async Kafka commit.",
                                    subtaskIndex),
                            cause);
                    failedCommits.inc();
                }
            };
  5. If no partitions were assigned initially, mark the subtask as temporarily idle:

    // mark the subtask as temporarily idle if there are no initial seed partitions;
    // once this subtask discovers some partitions and starts collecting records, the subtask's
    // status will automatically be triggered back to be active.
    if (subscribedPartitionsToStartOffsets.isEmpty()) {
        sourceContext.markAsTemporarilyIdle();
    }
  6. Initialize kafkaFetcher, the core of the interaction between Flink and Kafka:

    this.kafkaFetcher =
            createFetcher(
                    sourceContext,
                    subscribedPartitionsToStartOffsets,
                    watermarkStrategy,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode,
                    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                    useMetrics);

    createFetcher ensures that when the offset commit mode is DISABLED or ON_CHECKPOINTS, the enable.auto.commit property is forced to false (a sketch of adjustAutoCommitConfig follows the method):

    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics)
            throws Exception {
    
        // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
        // this overwrites whatever setting the user configured in the properties
        // force enable.auto.commit to false when the commit mode is DISABLED or ON_CHECKPOINTS
        adjustAutoCommitConfig(properties, offsetCommitMode);
    
        return new KafkaFetcher<>(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarkStrategy,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.getTaskNameWithSubtasks(),
                deserializer,
                properties,
                pollTimeout,
                runtimeContext.getMetricGroup(),
                consumerMetricGroup,
                useMetrics);
    }
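
    adjustAutoCommitConfig itself is a small helper (paraphrased from FlinkKafkaConsumerBase):

    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS
                || offsetCommitMode == OffsetCommitMode.DISABLED) {
            // overwrite whatever the user configured: the consumer must not auto-commit
            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        }
    }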

    The KafkaFetcher constructor:

    public KafkaFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics)
        throws Exception {
    super(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarkStrategy,
            processingTimeProvider,
            autoWatermarkInterval,
            userCodeClassLoader,
            consumerMetricGroup,
            useMetrics);
    
    // the deserializer
    this.deserializer = deserializer;
    // Handover shuttles data and exceptions between the thread running the
    // KafkaConsumer and the main thread. It effectively behaves like a blocking
    // queue of size one, with extra support for reporting exceptions, closing,
    // and waking up threads without interrupting them.
    this.handover = new Handover();
    
    // KafkaConsumerThread runs the KafkaConsumer, connecting to the brokers and pulling data
    this.consumerThread =
            new KafkaConsumerThread(
                    LOG,
                    handover,
                    kafkaProperties,
                    unassignedPartitionsQueue,
                    getFetcherName() + " for " + taskNameWithSubtasks,
                    pollTimeout,
                    useMetrics,
                    consumerMetricGroup,
                    subtaskMetricGroup);
    this.kafkaCollector = new KafkaCollector();
    }
  7. If dynamic partition discovery is enabled, run runWithPartitionDiscovery; otherwise run runFetchLoop directly:

    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        kafkaFetcher.runFetchLoop();
    } else {
        runWithPartitionDiscovery();
    }

    runWithPartitionDiscovery ultimately runs runFetchLoop as well, but it first starts a thread that periodically rediscovers partitions at the interval given by discoveryIntervalMillis (a condensed sketch of that loop follows the method below).

    private void runWithPartitionDiscovery() throws Exception {
        final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
        // create and start the partition discovery thread
        createAndStartDiscoveryLoop(discoveryLoopErrorRef);
    
        // run the main fetch loop
        kafkaFetcher.runFetchLoop();
    
        // make sure that the partition discoverer is woken up so that
        // the discoveryLoopThread exits
        partitionDiscoverer.wakeup();
        // wait for the partition discovery thread to finish
        joinDiscoveryLoopThread();
    
        // rethrow any fetcher errors
        final Exception discoveryLoopError = discoveryLoopErrorRef.get();
        if (discoveryLoopError != null) {
            throw new RuntimeException(discoveryLoopError);
        }
    }
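
    createAndStartDiscoveryLoop is not shown above; condensed (error handling and minor details elided), the discovery thread loops like this:

    private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
        discoveryLoopThread =
                new Thread(
                        () -> {
                            try {
                                while (running) {
                                    // discover new partitions; wakeup/close ends the loop
                                    final List<KafkaTopicPartition> discoveredPartitions;
                                    try {
                                        discoveredPartitions =
                                                partitionDiscoverer.discoverPartitions();
                                    } catch (AbstractPartitionDiscoverer.WakeupException
                                            | AbstractPartitionDiscoverer.ClosedException e) {
                                        break;
                                    }

                                    // hand newly discovered partitions to the running fetcher
                                    if (running && !discoveredPartitions.isEmpty()) {
                                        kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                    }

                                    // sleep until the next discovery round
                                    if (running && discoveryIntervalMillis != 0) {
                                        Thread.sleep(discoveryIntervalMillis);
                                    }
                                }
                            } catch (Exception e) {
                                discoveryLoopErrorRef.set(e);
                            } finally {
                                // cancel() also makes the fetch loop exit on failure
                                if (running) {
                                    cancel();
                                }
                            }
                        },
                        "Kafka Partition Discovery for "
                                + getRuntimeContext().getTaskNameWithSubtasks());
        discoveryLoopThread.start();
    }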

    The runFetchLoop method pulls data in a loop (a sketch of partitionConsumerRecordsHandler follows it):

    @Override
    public void runFetchLoop() throws Exception {
        try {
            // kick off the actual Kafka consumer
            // start the KafkaConsumerThread
            consumerThread.start();
    
            while (running) {
                // this blocks until we get the next records
                // it automatically re-throws exceptions encountered in the consumer thread
                // poll the next batch from the Handover; blocks until records are available
                final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    
                // get the records for each topic partition
                // iterate over all subscribed partitions
                for (KafkaTopicPartitionState<T, TopicPartition> partition :
                        subscribedPartitionStates()) {
    
                    // get this partition's records from the batch
                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
    
                    // deserialize the records into kafkaCollector and emit them,
                    // updating offsets and generating timestamps and watermarks;
                    // on an end-of-stream signal the fetch loop terminates
                    partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }
        } finally {
            // this signals the consumer thread that no more work is to be done
            consumerThread.shutdown();
        }
    
        // on a clean exit, wait for the runner thread
        // wait for the KafkaConsumerThread to finish
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            // may be the result of a wake-up interruption after an exception.
            // we ignore this here and only restore the interruption state
            Thread.currentThread().interrupt();
        }
    }
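
    For completeness, partitionConsumerRecordsHandler in KafkaFetcher is essentially:

    protected void partitionConsumerRecordsHandler(
            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
            KafkaTopicPartitionState<T, TopicPartition> partition)
            throws Exception {

        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
            // deserialize into the reusable collector
            deserializer.deserialize(record, kafkaCollector);

            // emit the records; this also atomically updates the partition's
            // offset state and emits watermarks
            emitRecordsWithTimestamps(
                    kafkaCollector.getRecords(), partition, record.offset(), record.timestamp());

            if (kafkaCollector.isEndOfStreamSignalled()) {
                // the deserializer signaled end of stream: stop fetching
                running = false;
                break;
            }
        }
    }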

1.4 The initializeState Method of FlinkKafkaConsumerBase

Inherited from the CheckpointedFunction interface. CheckpointedFunction is the core interface for stateful transformation functions that maintain state across stream records. initializeState is called when the parallel function instance is initialized (on first startup, or when recovering from an earlier checkpoint), and is typically used to set up state and to handle state-recovery logic. Note that the offsets are kept as union list state: on restore, every subtask receives the union of all subtasks' entries, which is why the open method re-filters the restored offsets with KafkaTopicPartitionAssigner.assign.

@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {

    OperatorStateStore stateStore = context.getOperatorStateStore();

    // get the stored offset information from the state store (a union list state)
    this.unionOffsetStates =
            stateStore.getUnionListState(
                    new ListStateDescriptor<>(
                            OFFSETS_STATE_NAME,
                            createStateSerializer(getRuntimeContext().getExecutionConfig())));

    if (context.isRestored()) {
        restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

        // populate actual holder for restored state
        // copy the offsets from unionOffsetStates into restoredState;
        // the open method later recovers start offsets from restoredState
        for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
            restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
        }

        LOG.info(
                "Consumer subtask {} restored state: {}.",
                getRuntimeContext().getIndexOfThisSubtask(),
                restoredState);
    } else {
        LOG.info(
                "Consumer subtask {} has no restore state.",
                getRuntimeContext().getIndexOfThisSubtask());
    }
}

1.5 The snapshotState Method of FlinkKafkaConsumerBase

Inherited from the CheckpointedFunction interface. snapshotState is called every time a checkpoint is taken; it is used to snapshot the function's state, typically to flush, commit, or synchronize with external systems.

@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    // the source is no longer running; skip the snapshot
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        unionOffsetStates.clear();

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        // the fetcher is not initialized yet: snapshot the start offsets seeded in run()
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                    subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(
                        Tuple2.of(
                                subscribedPartition.getKey(), subscribedPartition.getValue()));
            }

            // in ON_CHECKPOINTS mode, stage the offsets in pendingOffsetsToCommit;
            // notifyCheckpointComplete later commits them to Kafka
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call
                // can happen
                // on this function at a time: either snapshotState() or
                // notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        } else {
            // the fetcher exists: snapshot its current partitions and offsets into unionOffsetStates
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call
                // can happen
                // on this function at a time: either snapshotState() or
                // notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
            }

            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                    currentOffsets.entrySet()) {
                unionOffsetStates.add(
                        Tuple2.of(
                                kafkaTopicPartitionLongEntry.getKey(),
                                kafkaTopicPartitionLongEntry.getValue()));
            }
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            // to bound memory usage, drop the oldest pending uncommitted checkpoint entries
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }
}
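
Note that pendingOffsetsToCommit is an org.apache.commons.collections.map.LinkedMap, which preserves insertion order and supports index-based access; that is what indexOf(checkpointId) and remove(0) rely on. A toy demonstration (values are placeholders):

LinkedMap pending = new LinkedMap();
pending.put(100L, new HashMap<>()); // checkpoint id -> offsets map
pending.put(101L, new HashMap<>());
pending.remove(0);                  // evicts the oldest entry (checkpoint 100)
int pos = pending.indexOf(101L);    // 0: index of checkpoint 101 in insertion order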

1.6 The notifyCheckpointComplete Method of FlinkKafkaConsumerBase

Inherited from the CheckpointListener interface. It notifies the listener that the checkpoint with the given checkpointId has completed and been committed.

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    if (!running) {
        LOG.debug("notifyCheckpointComplete() called on closed source");
        return;
    }

    final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    if (fetcher == null) {
        LOG.debug("notifyCheckpointComplete() called on uninitialized source");
        return;
    }

    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
        // only one commit operation must be in progress
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    checkpointId);
        }

        try {
            // if no pending entry matches this checkpointId, log a warning and return
            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
            if (posInMap == -1) {
                LOG.warn(
                        "Consumer subtask {} received confirmation for unknown checkpoint id {}",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        checkpointId);
                return;
            }

            @SuppressWarnings("unchecked")
            Map<KafkaTopicPartition, Long> offsets =
                    (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

            // remove older checkpoints in the map; their offsets are superseded by this one
            for (int i = 0; i < posInMap; i++) {
                pendingOffsetsToCommit.remove(0);
            }

            if (offsets == null || offsets.size() == 0) {
                LOG.debug(
                        "Consumer subtask {} has empty checkpoint state.",
                        getRuntimeContext().getIndexOfThisSubtask());
                return;
            }

            // asynchronously commit the offsets to Kafka
            fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
        } catch (Exception e) {
            if (running) {
                throw e;
            }
            // else ignore exception if we are no longer running
        }
    }
}

1.7 The Handover Class

Handover is a utility class that hands over data (buffers of records) and exceptions from a producer thread to a consumer thread. It effectively behaves like a blocking queue of size one, with some extra functionality for reporting exceptions, closing, and waking up a thread without interrupting it. The Flink Kafka consumer uses this class to exchange data and exceptions between the thread running the KafkaConsumer and the main thread. A minimal standalone sketch of the idea follows; the actual methods are walked through afterwards.
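
The core mechanism can be sketched as a tiny standalone class (illustrative only; Flink's Handover adds wakeup, error reporting, and close semantics on top of this):

// A minimal size-one handover: produce() blocks while an element is pending,
// pollNext() blocks while the slot is empty.
public final class MiniHandover<T> {

    private final Object lock = new Object();
    private T next;

    public void produce(T element) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait(); // wait until the consumer takes the pending element
            }
            next = element;
            lock.notifyAll(); // wake a waiting consumer
        }
    }

    public T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait(); // wait until the producer hands over an element
            }
            T result = next;
            next = null;
            lock.notifyAll(); // wake a waiting producer
            return result;
        }
    }
}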

  1. The produce method

    Hands over an element from the producer. If the Handover already holds an
    element that has not yet been picked up by the consumer thread, this call
    blocks until the consumer picks up that previous element.

    This behaves like a blocking queue of size one.

    public void produce(final ConsumerRecords<byte[], byte[]> element)
            throws InterruptedException, WakeupException, ClosedException {
    
        checkNotNull(element);
    
        synchronized (lock) {
            while (next != null && !wakeupProducer) {
                // an element is still pending: block until the consumer takes it
                lock.wait();
            }
    
            wakeupProducer = false;
    
            // if there is still an element, we must have been woken up
            if (next != null) {
                throw new WakeupException();
            }
            // if there is no error, the handover is open and can accept this element
            else if (error == null) {
                next = element;
                lock.notifyAll();
            }
            // an error marks this as closed for the producer
            else {
                throw new ClosedException();
            }
        }
    }
  2. The pollNext method

    Polls the next element from the Handover, possibly blocking until one becomes available. This behaves like polling from a blocking queue.

    public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
        synchronized (lock) {
            // no element and no error: block and wait
            while (next == null && error == null) {
                lock.wait();
            }
    
            ConsumerRecords<byte[], byte[]> n = next;
            if (n != null) {
            // clear the slot, wake any waiting producer, and return the element
                next = null;
                lock.notifyAll();
                return n;
            } else {
                ExceptionUtils.rethrowException(error, error.getMessage());
    
                // this statement cannot be reached since the above method always throws an
                // exception
                // this is only here to silence the compiler and any warnings
                return ConsumerRecords.empty();
            }
        }
    }
  3. The reportError method

    Reports an exception. The consumer will throw the given exception immediately if it is currently blocked in pollNext(), or on its next call to that method.

    After this method has been called, no call to produce(ConsumerRecords) or pollNext() will ever return normally again; both will always throw an exception.

    public void reportError(Throwable t) {
        checkNotNull(t);
    
        synchronized (lock) {
            // record the error, but do not override the initial exception
            if (error == null) {
                error = t;
            }
            next = null;
            lock.notifyAll();
        }
    }
  4. The wakeupProducer method

    Wakes up the producer thread if it is currently blocked in produce(), without interrupting it.

  5. The close method

    Closes the Handover; after closing, both produce() and pollNext() fail with a ClosedException.

2. Flink Kafka Producer Source Code
