/**
 * Builds the Kafka record used to publish an order.
 *
 * <p>The record is addressed to topic {@code t.commodity.order}, leaves the partition
 * unspecified ({@code null}), uses the order number as the key and the message itself
 * as the value. A single {@code surpriseBonus} header is attached whose value is the
 * bonus percentage rendered as decimal text: {@code 25} when the order location starts
 * with "A" (case-insensitive), otherwise {@code 15}.
 *
 * @param message the order to publish
 * @return the producer record carrying the order and its {@code surpriseBonus} header
 */
private ProducerRecord<String, OrderMessage> buildProducerRecord(OrderMessage message) {
    int surpriseBonus = StringUtils.startsWithIgnoreCase(message.getOrderLocation(), "A") ? 25 : 15;

    List<Header> headers = new ArrayList<>();
    // Encode with an explicit charset: the no-arg getBytes() uses the JVM's platform
    // default, which would make the bytes on the wire depend on the producing host.
    byte[] surpriseBonusBytes =
            Integer.toString(surpriseBonus).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    headers.add(new RecordHeader("surpriseBonus", surpriseBonusBytes));

    return new ProducerRecord<>("t.commodity.order", null, message.getOrderNumber(), message, headers);
}
/**
 * Publishes an order message: wraps it in a producer record via
 * {@code buildProducerRecord} and passes that record to {@code kafkaTemplate.send}.
 *
 * <p>NOTE(review): the remainder of the send statement is elided in this excerpt
 * (the "...." placeholder below), so whatever is chained onto the send result
 * cannot be documented from here.
 *
 * @param message the order message to publish
 */
public void publish(OrderMessage message) {
// Build the record (topic, key, value and headers) for this order.
var producerRecord = buildProducerRecord(message);
// Hand the record to the Kafka template; the rest of this statement is elided.
kafkaTemplate.send(producerRecord)
....
}
/**
 * Kafka listener for topic {@code t.commodity.order}.
 *
 * <p>For each record it logs the order, logs every record header, and, when a usable
 * {@code surpriseBonus} header is present, computes and logs the surprise bonus amount
 * as {@code (percentage / 100) * price * quantity}.
 */
@Slf4j
@Service
public class OrderListener {

    /** Name of the header that carries the bonus percentage as decimal text. */
    private static final String SURPRISE_BONUS_HEADER = "surpriseBonus";

    /**
     * Handles one order record.
     *
     * <p>A record whose {@code surpriseBonus} header is missing, has a null value, or
     * is not a parsable number is logged as a warning and the bonus calculation is
     * skipped, instead of failing the listener with an unchecked exception.
     *
     * @param consumerRecord the consumed record; its value is the order message
     */
    @KafkaListener(topics = "t.commodity.order")
    public void listen(ConsumerRecord<String, OrderMessage> consumerRecord) {
        var headers = consumerRecord.headers();
        var orderMessage = consumerRecord.value();

        // Never write a full card number to the log; only the last four digits are kept.
        log.info("Processing order {}, item {}, credit card number {}",
                orderMessage.getOrderNumber(), orderMessage.getItemName(),
                maskCardNumber(orderMessage.getCreditCardNumber()));

        log.info("Headers are : ");
        headers.forEach(h -> log.info(" key : {}, value : {}", h.key(), decode(h.value())));

        // lastHeader returns null when no header with that key exists on the record.
        var bonusHeader = headers.lastHeader(SURPRISE_BONUS_HEADER);
        if (bonusHeader == null || bonusHeader.value() == null) {
            log.warn("Order {} has no usable {} header, skipping surprise bonus",
                    orderMessage.getOrderNumber(), SURPRISE_BONUS_HEADER);
            return;
        }

        double bonusPercentage;
        try {
            bonusPercentage = Double.parseDouble(decode(bonusHeader.value()));
        } catch (NumberFormatException e) {
            log.warn("Order {} has a non-numeric {} header, skipping surprise bonus",
                    orderMessage.getOrderNumber(), SURPRISE_BONUS_HEADER, e);
            return;
        }

        var bonusAmount = (bonusPercentage / 100) * orderMessage.getPrice() * orderMessage.getQuantity();
        log.info("Surprise bonus is {}", bonusAmount);
    }

    /**
     * Decodes a header value as UTF-8 text.
     *
     * @param bytes the raw header value, possibly {@code null}
     * @return the decoded text, or {@code null} when {@code bytes} is {@code null}
     */
    private static String decode(byte[] bytes) {
        // Explicit charset: new String(byte[]) would use the JVM's platform default.
        return bytes == null ? null : new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Produces a log-safe rendering of a card number.
     *
     * @param cardNumber the card number as returned by the order message, possibly {@code null}
     * @return {@code "null"} for a null input, {@code "****"} when the text form has four
     *         characters or fewer, otherwise {@code "****"} followed by the last four characters
     */
    private static String maskCardNumber(Object cardNumber) {
        if (cardNumber == null) {
            return "null";
        }
        String text = String.valueOf(cardNumber);
        if (text.length() <= 4) {
            return "****";
        }
        return "****" + text.substring(text.length() - 4);
    }
}