Consumer - Tuong-Nguyen/Angular-D3-Cometd GitHub Wiki

Consumer

Consumer Internals

// Connection information
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "BROKER-1:9092,BROKER-2:9093");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Create Consumer object
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerApp {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "BROKER-1:9092,BROKER-2:9093");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Type parameters match the String key and value deserializers configured above
    KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(props);
  }
}

Subscribe to topics

// Requires: import java.util.ArrayList;
ArrayList<String> topics = new ArrayList<>();
topics.add("myTopic");
topics.add("myOtherTopic");

myConsumer.subscribe(topics);

When a consumer subscribes to a topic, it can receive messages from all partitions of that topic.
When a new partition is added to the topic, the consumer can receive messages from that partition too.
Messages within a single partition are delivered in order, but the relative order of messages from different partitions is not guaranteed.
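
The ordering guarantee can be illustrated without a broker. The following is a minimal sketch in plain Java, not Kafka client code: the `PartitionOrderDemo` class and the sample (partition, offset) pairs are hypothetical, and they model one batch in which records from two partitions arrive interleaved. Offsets within each partition still increase, even though the interleaving across partitions is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not part of the Kafka API: each long[] is {partition, offset}.
class PartitionOrderDemo {

    // Groups offsets by partition, preserving arrival order within each partition.
    static Map<Integer, List<Long>> groupByPartition(long[][] arrivals) {
        Map<Integer, List<Long>> byPartition = new TreeMap<>();
        for (long[] r : arrivals) {
            byPartition.computeIfAbsent((int) r[0], p -> new ArrayList<>()).add(r[1]);
        }
        return byPartition;
    }

    // Returns true if the offsets seen for every partition are strictly increasing.
    static boolean orderedWithinPartitions(long[][] arrivals) {
        for (List<Long> offsets : groupByPartition(arrivals).values()) {
            for (int i = 1; i < offsets.size(); i++) {
                if (offsets.get(i) <= offsets.get(i - 1)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Records from partitions 0 and 1 arrive interleaved in one batch.
        long[][] arrivals = { {0, 5}, {1, 2}, {0, 6}, {1, 3}, {0, 7} };
        System.out.println(groupByPartition(arrivals));
        System.out.println(orderedWithinPartitions(arrivals));
    }
}
```

If processing depends on the order of related messages, a common approach is to make sure those messages share a key so that they land in the same partition.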

Assign to partitions

Instead of subscribing to a topic, a consumer can assign itself specific partitions and receive messages only from them. With manual assignment, the consumer does not receive messages from partitions that are later added to the topic.

// Set partition assignments:
// Requires: import java.util.ArrayList; import org.apache.kafka.common.TopicPartition;
TopicPartition partition0 = new TopicPartition("myTopic", 0);
ArrayList<TopicPartition> partitions = new ArrayList<>();
partitions.add(partition0);

myConsumer.assign(partitions);

Poll loop

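try {
  while (true) {
    // poll(Duration) needs kafka-clients 2.0 or later (import java.time.Duration);
    // older clients use the poll(long) form instead, e.g. poll(100)
    ConsumerRecords<String, String> records = myConsumer.poll(Duration.ofMillis(100));

    // Your processing logic goes here...
  }
} finally {
  // Always close the consumer so its resources are released
  myConsumer.close();
}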

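The poll loop runs in a single thread and continuously fetches messages. KafkaConsumer is not safe for multi-threaded access, so the loop and the processing logic should use it from that one thread.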

Offset and Uncommitted read

Offset - Uncommitted

Reading messages does not by itself update the consumer's committed offset. The consumer must commit the offsets of the messages it has read.
This can be done:

  • Automatically (enable.auto.commit = true): offsets are committed periodically, every 5 seconds by default (auto.commit.interval.ms).
  • Manually: by calling commitSync() or commitAsync().
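
The two commit modes map to consumer configuration. The sketch below only builds `java.util.Properties` objects; the `CommitSettings` class and its method names are hypothetical helpers, while `enable.auto.commit` and `auto.commit.interval.ms` are standard Kafka consumer property names. With auto-commit disabled, the application is expected to call commitSync() or commitAsync() itself once it has processed the records.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only the property keys are Kafka's own.
class CommitSettings {

    // Offsets are committed in the background every intervalMs milliseconds.
    static Properties autoCommit(long intervalMs) {
        Properties props = new Properties();
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }

    // Offsets are committed only when the application commits them explicitly.
    static Properties manualCommit() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(autoCommit(5000));
        System.out.println(manualCommit());
    }
}
```

Either result could be merged into the connection properties with `putAll` before the consumer is created.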

Consumer Group

A consumer declares which group it belongs to through its group ID (the group.id property).

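The group is managed by a coordinator, which ensures that each member is assigned some partitions of the topic. With the KafkaConsumer client used on this page, the coordinator is one of the Kafka brokers; only the older ZooKeeper-based consumer relied on ZooKeeper for this. When a member goes down, its partitions are reassigned to the remaining live members.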

Within a group, a single partition is handled by at most one consumer. If a group has more members than the topic has partitions, the surplus members stay idle and do not receive any messages.
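
The effect of group size on partition ownership can be sketched with a small simulation. This is a simplified round-robin model written for illustration, not Kafka's actual assignment strategy; the `GroupAssignmentDemo` class and the member names are hypothetical. It shows that each partition has exactly one owner, that surplus members receive nothing, and that a departed member's partitions move to the survivors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of group assignment; real Kafka assignors are more involved.
class GroupAssignmentDemo {

    // Deals partitions 0..partitionCount-1 to the members in turn.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = members.get(p % members.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>(Arrays.asList("c1", "c2", "c3", "c4"));

        // Four members, three partitions: the last member gets an empty list.
        System.out.println(assign(members, 3));

        // A member leaves: recomputing the assignment spreads all partitions over the rest.
        members.remove("c2");
        System.out.println(assign(members, 3));
    }
}
```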
