0040. Simple Consumer - dkkahm/study-kafka-with-spring GitHub Wiki

Entity

/**
 * Employee payload exchanged as JSON.
 *
 * <p>JSON field names are mapped explicitly where they differ from the Java
 * field names ({@code employee_id}, {@code birth_date}); {@code name} uses the
 * default mapping. Accessors and constructors are not written by hand here;
 * they are expected to come from the class-level annotations
 * ({@code @Data}, {@code @NoArgsConstructor}, {@code @AllArgsConstructor}).
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Employee {

    /** Employee identifier; serialized under the JSON key {@code employee_id}. */
    @JsonProperty("employee_id")
    private String employeeId;

    /** Employee name; no explicit JSON key is declared for this field. */
    private String name;

    /**
     * Date of birth; serialized under the JSON key {@code birth_date} and
     * parsed on input by {@code CustomLocalDateDeserializer}.
     */
    @JsonDeserialize(using = CustomLocalDateDeserializer.class)
    @JsonProperty("birth_date")
    private LocalDate birthDate;
}

Consumer

  • value only (the listener receives just the message payload as a String)
/**
 * Kafka listener that receives only the message value as a {@code String},
 * converts it to an {@link Employee} and logs the result.
 */
@Service
public class EmployeeJsonConsumer {

    private static final Logger log = LoggerFactory.getLogger(EmployeeJsonConsumer.class);

    // Assigned once and never replaced, so it is declared final.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Handles one message from the {@code t_employee} topic.
     *
     * @param message the raw message value, expected to be an Employee JSON document
     * @throws JsonProcessingException if the value cannot be read as an {@link Employee}
     */
    @KafkaListener(topics = "t_employee")
    public void consume(String message) throws JsonProcessingException {
        Employee employee = objectMapper.readValue(message, Employee.class);
        log.info("Employee is {}", employee);
    }
}
  • with ConsumerRecord (topic, partition, offset, key, value, ...)
/**
 * Kafka listener that receives the whole {@code ConsumerRecord}, converts the
 * record value to an {@link Employee} and logs the result.
 */
@Service
public class EmployeeJsonConsumer {

    private static final Logger log = LoggerFactory.getLogger(EmployeeJsonConsumer.class);

    // Assigned once and never replaced, so it is declared final.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Handles one record from the {@code t_employee} topic.
     *
     * <p>Only {@code message.value()} is used here; the other record
     * attributes are available on the parameter but are not read.
     *
     * @param message the consumed record whose value is expected to be an Employee JSON document
     * @throws JsonProcessingException if the record value cannot be read as an {@link Employee}
     */
    @KafkaListener(topics = "t_employee")
    public void consume(ConsumerRecord<String, String> message) throws JsonProcessingException {
        Employee employee = objectMapper.readValue(message.value(), Employee.class);
        log.info("Employee is {}", employee);
    }
}
  • with Consumer and ConsumerRecord
/**
 * Kafka listener that declares both the {@code Consumer} and the
 * {@code ConsumerRecord} as parameters, converts the record value to an
 * {@link Employee} and logs the result.
 */
@Service
public class EmployeeJsonConsumer {

    private static final Logger log = LoggerFactory.getLogger(EmployeeJsonConsumer.class);

    // Assigned once and never replaced, so it is declared final.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Handles one record from the {@code t_employee} topic.
     *
     * @param consumer not used by this method body; declared to show that it can be
     *                 requested alongside the record. NOTE(review): presumably the Kafka
     *                 client {@code Consumer} (two type parameters), not
     *                 {@code java.util.function.Consumer} - confirm against the file's imports
     * @param message  the consumed record whose value is expected to be an Employee JSON document
     * @throws JsonProcessingException if the record value cannot be read as an {@link Employee}
     */
    @KafkaListener(topics = "t_employee")
    public void consume(Consumer<String, String> consumer, ConsumerRecord<String, String> message)
            throws JsonProcessingException {
        Employee employee = objectMapper.readValue(message.value(), Employee.class);
        log.info("Employee is {}", employee);
    }
}