LearningKafka学习笔记(五) - 18965050/learning-kafka GitHub Wiki


  • 消费者消息API
  • 基于Java的Kafka消费者
  • 消费分区消息

Consumer API

高阶API

高阶API使用在不需要处理消息offset的情况下 consumer属于某个consumer分组,consumer分组名称不能相同.

  • kafka.javaapi.consumer.ConsumerConnector producer类方法

  • kafka.consumer.KafkaStream: 由ConsumerConnector.createMessageStreams()方法返回, 可根据topic遍历其中的消息

  • kafka.consumer.ConsumerConfig, 包括如下配置:

    • zookeeper.connect: zk连接配置
    • group.id: 消费者组id
    • zookeeper.session.timeout.ms: zk会话连接超时
    • zookeeper.sync.time.ms: zk leader和follower之间同步超时
    • auto.commit.interval.ms: 自动提交时间间隔
  • 一个线程对应于一个topic分区. 因此注意创建的线程数和topic分区数一致

低阶API

低阶API是无状态的,并提供了broker和consumer更细粒度的通信控制 使用流程

  • 从存活的broker中找到leader broker

  • 从leader broker中找到日志中数据的开始offset和结束offset,

  • kafka.javaapi.consumer.SimpleConsumer producer类方法

  • kafka.api.FetchRequest

  • kafka.javaapi.OffsetRequest

  • kafka.javaapi.OffsetFetchRequest

  • kafka.javaapi.OffsetCommitRequest

  • kafka.javaapi.TopicMetadataRequest

Consumer属性列表

consumer属性列表-1 consumer属性列表-2