Kafka idempotency and transactionality - tenji/ks GitHub Wiki

Kafka 幂等性和事务性

消息传输保障

  • at most once
    每条消息传输零次或者一次,即消息可能会丢失。
  • at least once
    每条消息会进行多次传输尝试,至少一次成功,即消息传输可能重复但不会丢失。
  • exactly once
    每条消息有且只有一次,即消息传输既不会丢失也不会重复。

而 Kafka 其实有两次消息传递,一次生产者发送消息给 Kafka,一次消费者去 Kafka 消费消息,两次传递都会影响最终结果。两次都是精确一次,最终结果才是精确一次。两次中有一次会丢失消息,或者有一次会重复,那么最终的结果就是可能丢失或者重复的。

Producer 端消息传递

acks参数

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项。

  • acks=0

生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

  • acks=1

只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。

  • acks=all

只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比acks=1时更高,因为我们要等待不只一个服务器节点接收消息。

max.in.flight.requests.per.connection参数

该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

Consumer 端消息传递

At-most-once Kafka Consumer (Zero or more deliveries)

最多一次消费语义是 Kafka 消费者的默认实现,配置这种消费者最简单的方式是:

  1. enable.auto.commit设置为true
  2. auto.commit.interval.ms设置为一个较低的时间范围。
  3. consumer.commitSync()不要调用该方法。

由于上面的配置,就可以使得 Kafka 有线程负责按照指定间隔提交 Offset,但是这种方式会使得kafka消费者有两种消费语义:

At-most-once,消费者的 Offset 已经提交,但是消息还在处理,这个时候挂了,再重启的时候会从上次提交的 Offset 处消费,导致上次在处理的消息部分丢失。

At-least-once,消费者已经处理完了,但是 Offset 还没提交,那么这个时候消费者挂了,就会导致消费者重复消费消息处理。但是由于auto.commit.interval.ms设置为一个较低的时间范围,会降低这种情况出现的概率。

也就是在这种配置下,大概率会满足at most once,极小概率出现at least once

At-Least-Once Kafka Consumer (One or more message deliveries, duplicate possible)

实现最少一次消费语义的消费者也很简单。

  1. 设置enable.auto.commitfalse
  2. 消息处理完之后手动调用consumer.commitSync()

这种方式就是要手动在处理完该次 poll 得到消息之后,调用 Offset 异步提交函数consumer.commitSync()。建议是消费者内部实现幂等(),来避免消费者重复处理消息进而得到重复结果。at most once发生的场景是消费者的消息处理完并输出到结果库(也可能是部分处理完),但是 Offset 还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息。

Exactly-once-Kafka Consumer via subscribe (one and only one message delivery)

使用subscribe实现exactly once很简单,具体思路如下:

  1. enable.auto.commit设置为false
  2. 不调用consumer.commitSync()
  3. 使用subcribe订阅 topic。
  4. 实现一个ConsumerRebalanceListener,在该 listener 内部执行 consumer.seek(topicPartition ,offset),从指定的 topic/partition 的 offset 处启动。
  5. 在处理消息的时候,要保存住每条消息的 offset。以原子事务的方式保存 offset 和处理的消息结果。传统数据库实现原子事务比较简单。但对于非传统数据库,比如 hdfs 或者 nosql,为了实现这个目标,只能将 offset 与消息保存在一起。
  6. 实现幂等,作为保护层。

代码实现如下:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
public class ExactlyOnceDynamicConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {

        System.out.println("Starting ManualOffsetGuaranteedExactlyOnceReadingDynamicallyBalancedPartitionConsumer ...");

        readMessages();

    }

    /**
     */
    private static void readMessages() throws InterruptedException {

        KafkaConsumer<String, String> consumer = createConsumer();

        // Manually controlling offset but register consumer to topics to get dynamically assigned partitions.
        // Inside MyConsumerRebalancerListener use consumer.seek(topicPartition,offset) to control offset

        consumer.subscribe(Arrays.asList("normal-topic"), new MyConsumerRebalancerListener(consumer));

        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";

        props.put("group.id", consumeGroup);

        // Below is a key setting to turn off the auto commit.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");

        // Control maximum data on each poll, make sure this value is bigger than the maximum single record size
        props.put("max.partition.fetch.bytes", "40");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {

                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());

            }
        }
    }


}

public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        for (TopicPartition partition : partitions) {

            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {


        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }


}

/**
 * The partition offset are stored in an external storage. In this case in a file system.
 * <p/>
 */
public class OffsetManager {


    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {

        try {

            FileWriter writer = new FileWriter(storageName(topic, partition), false);

            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return he last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {

        try {

            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));

            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }

}

Exactly-once-Kafka Consumer via assign

...

幂等性和事务性的原理

参考链接

⚠️ **GitHub.com Fallback** ⚠️