kafka - noonecare/opensourcebigdatatools GitHub Wiki

kafka

kafka 作为消息队列有以下特色:

  • 具有很大的缓存,可以作为一个临时保存数据的中转站
  • 有 partition, 消息不是递交给一个 consumer, 而是一个 consumer group 去消费。可以水平扩展。
  • 缓存的消息有备份

下面具体来说说如何使用 kafka:

  • 如何定位一个 kafka?
  • 使用与 kafka 关联的 zookeeper 的地址定位 Kafka。
  • 使用 High Level API 时,只需要指明 zookeeper, topic 即可消费 kafka topic 的消息。
  • 使用 Low Level API 时,需要指明 topic partition 所在的 brokerlist 的 leader broker, 才能读写这个 topic partition 的消息。brokerlist 中的 followder 会备份 leader broker 的数据,但是所有读写操作都是在 leader broker 上操作。另外,当 leader broker 因为一些原因,失败后。kafka 会从 follower broker 中选出一个作为新的 leader broker。
  • 数据中转: kafka 会把数据分发到不同的 topic 中,你可以选择收听哪些个 topic 的消息。
  • 如何选择收听多个 topic 的消息,有没有什么统配符?
  • 貌似在 kafka 中,没有匹配 topic 名的通配符(RabbitMQ 中有 # *)。
  • producer 如何把数据发送到指定的一些 topic? 收听消息需要指定 topic, 推送消息同样需要指定 topics, producer 指定了 topics , 然后消息就会被推送到指定的 topics 中。
  • 作为缓存,如何从指定的位置读取历史数据?
  • High Level API 最能从最新位置读取消息。Lowd Level API 可以设定从哪里读取消息。
  • 当收听消息的 consumer group 中有多个 consumer 中,消息是如何分配给多个 consumer 的,是简单的 round-robin? 还是根据什么指标做为key 分配?
  • 使用 High Level API 时,consumer group 读取 kafka 的 topic 时, 哪个 consumer 去读取哪个 parition 是动态分配的。我举个例子, 原来有 4 个 partion, 有两个 consumer, 那么这时候可能 consumer0 负责读取 partition0, partition1; consumer1 负责读取 partition2, partition3。后来又起了两个 consumer, 那么现在可能是 consumer0 负责读取 partition0, consumer1 负责读取 partition1, consumer2 负责读取 partition2, consumer3 负责读取 partition3。 kafka 会动态的分配 parition 的读取任务给 consumer,以尽量做到任务平均分配,充分利用多台机器的性能。正是这个特点使得kafka 是 Horizontally Scalable.
  • 使用 Low Level API 时, 你可以指定使用哪个 Consumer 去读取哪个 Partition 。
  • 为什么要有分区?
  • Kafka 可以保证,同一个分区中的消息会依照先后顺序被消费。
  • topic 的分区数,决定了,同一时间消费消息的 consumer 最多能有几个(一般 consumer 数会多于 topic 的分区数,所以有几个分区一般就有几个 consumer 在消费数据)。
  • consumer groupid 有什么用?
  • 标示 consumer group, 具有相同 groupid 的 consumer 属于同一 consumer group。

API

  • kafka 对于java 提供了 High Level 的API 和 Low Level 的 API。High Level API 使用简单,但是能做的操作很少; Low Level API 使用复杂,但是操作灵活。
  • High Level API, 比如:
import org.apache.spark.streaming.kafka._
// Create a map of topics to number of receiver threads to use
val topics = List(("pandas", 1), ("logs", 1)).toMap
val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics)
StreamingLogInput.processLines(topicLines.map(_._2))

只需要指明 zkQuorum(kafka 关联的 zookeeper 集群), group(groupId, 具有相同 groupId 的 consumer 会作为一个 consumer group 消费消息), topics( 指出感兴趣的 topic)。kafka 会自动帮你找到 topic partition 的 broker, 找出 leader broker, 读消息; 更新 leader broker 操作。这个 API 有个缺点就是只能从最新位置读取消息(使用 Low Level API 你就可以操作 offset,但是 High Level API 给你隐藏了这些细节)。

使用 Low Level API, 你就会用到更多的概念,做更多的操作。

producer, consumer, broker 等等他们之间沟通,都是通过发送 request 完成的。 需要自己更新 offset (比 High Level API 麻烦,也能实现更多的花样)。 需要自己找到 Leader Broker。需要自己检查 Leader Broker 有没有变化。 需要自己指明如何读取topic的各个 partition。 使用 low level API, 需要读 Kafka Protocol