Transfer.Kafka - dotnet-shashlik/shashlik.eventbus GitHub Wiki

Kafka

NuGet:Shashlik.EventBus.Kafka

注册方式

方式一:最简配置(仅指定 server)

services.AddEventBus()
    .AddKafka("localhost:9092");

方式二:手动配置

services.AddEventBus()
    .AddKafka(r =>
    {
        r.AddOrUpdate("bootstrap.servers", "localhost:9092");
        r.AddOrUpdate("allow.auto.create.topics", "true");
    });

方式三:通过配置文件

services.AddEventBus()
    .AddKafka(configuration.GetSection("Kafka"));

配置文件示例:

Kafka:
  Properties:
    "bootstrap.servers": "localhost:9092"
    "allow.auto.create.topics": "true"
  TopicNumPartitions: 3
  TopicReplicationFactor: 1
  ConsumerPoolSize: 4

方式四:自定义 Kafka 配置字典

services.AddEventBus()
    .AddKafka(new Dictionary<string, string>
    {
        { "bootstrap.servers", "localhost:9092" }
    });

配置项 EventBusKafkaOptions

配置项 默认值 说明
Properties 见下方 Kafka 配置字典,详见 librdkafka 配置
TopicNumPartitions -1 创建新 topic 时的分区数量,-1 使用 broker 默认值
TopicReplicationFactor -1 创建新 topic 时的副本集数量,-1 使用 broker 默认值,需考虑 broker 数量
ConsumerPoolSize 4 每个 EventHandler 的消费者池大小
ConfigTopic null 自定义 topic 创建配置,优先级高于 TopicNumPartitionsTopicReplicationFactor

Properties 默认值

{
    { "bootstrap.servers", "localhost" },
    { "enable.auto.offset.store", "false" },  // 不要覆盖
    { "enable.auto.commit", "true" },          // 不要覆盖
    { "auto.offset.reset", "earliest" },       // 不要覆盖
    { "request.required.acks", "-1" }          // producer 阻塞确认
}

以下配置项请勿覆盖,否则会影响消息的接收确认机制:

  • enable.auto.offset.store
  • enable.auto.commit
  • auto.offset.reset

group.id 由框架自动设置为 EventHandler 名称,也请勿覆盖。

内部机制

  • 生产者使用 Producer 池,用完归还
  • 消费者按 ConsumerPoolSize 创建多个 Consumer
  • 消息处理成功后才 StoreOffset 提交偏移量,失败不提交,下次继续消费
  • 自动创建 topic(如 broker 允许)