0120. Async Request Reply - dkkahm/study-kafka-with-spring GitHub Wiki

Message

/**
 * Reply payload returned by an order listener after it has handled an order.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and both the
 * all-args and no-args constructors are generated by the Lombok annotations below.
 * NOTE(review): the no-args constructor is presumably required by the JSON
 * (de)serializer configured for the reply topic — confirm against the Kafka
 * serializer configuration.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderReplyMessage {
    /** Human-readable text describing the outcome of processing the order. */
    private String replyMessage;
}

Consumer

  • Annotate the listener method with @SendTo and return the reply message; the returned object is sent to the reply topic
/**
 * Kafka listener that consumes orders from {@code t.commodity.order} and answers
 * each one with an {@link OrderReplyMessage}.
 *
 * <p>The value returned by {@link #listen} is forwarded to the topic named in
 * {@code @SendTo} ({@code t.commodity.order-reply}).
 */
@Slf4j
@Service
public class OrderListener2 {

    /** Name of the record header carrying the surprise bonus percentage. */
    private static final String BONUS_HEADER = "surpriseBonus";

    /**
     * Logs the incoming order and its headers, computes the surprise bonus and
     * builds the reply.
     *
     * <p>The bonus is informational only (it is logged, not part of the reply), so a
     * missing or malformed {@code surpriseBonus} header is treated as a bonus of 0
     * rather than failing the whole record.
     *
     * @param consumerRecord the consumed record; its value is the order and its
     *                       headers may carry {@code surpriseBonus} as a percentage
     * @return the reply that is sent to the reply topic
     */
    @KafkaListener(topics = "t.commodity.order")
    @SendTo("t.commodity.order-reply")
    public OrderReplyMessage listen(ConsumerRecord<String, OrderMessage> consumerRecord) {
        var headers = consumerRecord.headers();
        var orderMessage = consumerRecord.value();

        // Never write the full card number to the log; only the last digits are kept.
        log.info("Processing order {}, item {}, credit card number {}",
                orderMessage.getOrderNumber(), orderMessage.getItemName(),
                maskCardNumber(orderMessage.getCreditCardNumber()));
        log.info("Headers are : ");
        headers.forEach(h -> log.info("  key : {}, value : {}", h.key(), decode(h.value())));

        // lastHeader returns null when the header is absent, so guard before reading it.
        var bonusHeader = headers.lastHeader(BONUS_HEADER);
        var bonusPercentage = parseBonusPercentage(bonusHeader == null ? null : bonusHeader.value());
        var bonusAmount = (bonusPercentage / 100) * orderMessage.getPrice() * orderMessage.getQuantity();

        log.info("Surprise bonus is {}", bonusAmount);

        var replyMessage = new OrderReplyMessage();
        replyMessage.setReplyMessage("Order " + orderMessage.getOrderNumber()
                + " item " + orderMessage.getItemName() + " processed");

        return replyMessage;
    }

    /** Decodes a header value as UTF-8 text; a null value stays null. */
    private static String decode(byte[] raw) {
        // Explicit charset: new String(byte[]) would use the platform default.
        return raw == null ? null : new String(raw, java.nio.charset.StandardCharsets.UTF_8);
    }

    /** Parses the bonus percentage header, falling back to 0 when it is absent or not a number. */
    private static double parseBonusPercentage(byte[] raw) {
        if (raw == null) {
            log.warn("Header {} is missing, assuming no bonus", BONUS_HEADER);
            return 0.0;
        }
        var text = decode(raw);
        try {
            return Double.parseDouble(text);
        } catch (NumberFormatException e) {
            log.warn("Header {} has non-numeric value '{}', assuming no bonus", BONUS_HEADER, text);
            return 0.0;
        }
    }

    /** Returns a log-safe form of a card number that exposes at most its last four characters. */
    private static String maskCardNumber(Object cardNumber) {
        if (cardNumber == null) {
            return null;
        }
        var text = String.valueOf(cardNumber);
        if (text.length() <= 4) {
            // Too short to reveal anything safely.
            return "****";
        }
        return "****" + text.substring(text.length() - 4);
    }
}