KafkaListener - Neethahiremath/Wiki GitHub Wiki

refer links:

https://www.baeldung.com/spring-kafka

https://docs.spring.io/spring-kafka/reference/html/

Create a class that listens to the topic:

@Component
@Getter
@Setter
@Slf4j
public class Consumer {

  @Autowired
  public Consumer() {
  }

  @PostConstruct
  public void init() {
   // if any on load required
  }

  @KafkaListener(
      topics = {"${kafka.topic}"},
      groupId = "${kafka.group-id}")
  void eventListener(ConsumerRecord<String, String> record) {
    try {
  // convert the record to suitable pojo and process it 
 // gson.fromJson(record, Entity.class); 
    } catch (Exception exception) {
      log.error(
          "An Exception occurred while processing an event from topic:{}, partition:{}, Offset:{} value:{} exception : {}",
          record.topic(),
          record.partition(),
          record.offset(),
          record.value(),
          exception);
    }
  }

Configure the Kafka listener container and consumer factory. Here `config` is a `@ConfigurationProperties` bean that binds the Kafka parameters from the YAML file:

 @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
      kafkaListenerContainerFactory() {
    // Container factory backing all @KafkaListener endpoints in this application.
    final ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory());
    // Number of concurrent consumer threads per listener, taken from configuration.
    listenerFactory.setConcurrency(config.getConcurrency());
    // Timeout (ms) used by the container when polling the consumer.
    listenerFactory.getContainerProperties().setPollTimeout(config.getMaxPollInterval());
    return listenerFactory;
  }

 @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    // Consumer properties assembled from the externalized kafka.* configuration
    // (bound via the ConfigurationProperties bean `config`).
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
    // Keys and values are plain strings; deserialize domain objects downstream.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetReset());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.getAutoCommitIntervalMs());
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getSessionTimeoutMs());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, config.getMaxPollInterval());

    // Optional: enable the Confluent monitoring interceptor by adding
    //   props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
    //       "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");

    return new DefaultKafkaConsumerFactory<>(props);
  }

Example YAML Kafka configuration:

kafka:
  topic: inputTopic
  group-id: myGroup
  auto-commit-interval-ms: 500
  auto-offset-reset: earliest
  bootstrap-servers: 127.0.0.1:9092
  enable-auto-commit: true
  session-timeout-ms: 60000
  output-topic: outputTopic
  concurrency: 1
  max-poll-records: 100
  max-poll-interval: 3000000
  acks: all
  retries: 0
  linger-ms: 1
  batch-size: 16384
  buffer-memory: 33554432