0100. JsonSerializer - dkkahm/study-kafka-with-spring GitHub Wiki

LocalDateTime

/**
 * Shared date/time formatting constants used by the JSON (de)serializers.
 */
public interface DateConstant {

    /** Pattern for date-times rendered as text, e.g. {@code 2021-03-04 05:06:07}. */
    String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /** Formatter built once from {@link #DATE_TIME_FORMAT} and reused for formatting and parsing. */
    DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern(DATE_TIME_FORMAT);
}
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

/**
 * Jackson serializer that writes a {@link LocalDateTime} as a JSON string formatted with
 * {@link DateConstant#DATE_TIME_FORMATTER}.
 */
public class LocalDateTimeSerializer extends StdSerializer<LocalDateTime> {

    private static final long serialVersionUID = 1L;

    /**
     * Creates a serializer that declares {@link LocalDateTime} as its handled type.
     */
    public LocalDateTimeSerializer() {
        // Pass the concrete class instead of null so handledType() returns a usable value;
        // registration paths that derive the type from the serializer need it to be non-null.
        this(LocalDateTime.class);
    }

    /**
     * Creates a serializer for the given handled type.
     *
     * @param t the class this serializer declares as its handled type
     */
    public LocalDateTimeSerializer(Class<LocalDateTime> t) {
        super(t);
    }

    /**
     * Writes {@code value} as a single JSON string token.
     *
     * @param value    the date-time to serialize
     * @param gen      generator that receives the formatted string
     * @param provider serializer provider supplied by Jackson; not used here
     * @throws IOException if the generator fails to write the string
     */
    @Override
    public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider provider) throws IOException {
        gen.writeString(DateConstant.DATE_TIME_FORMATTER.format(value));
    }
}
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;

/**
 * Jackson deserializer that reads a JSON string formatted with
 * {@link DateConstant#DATE_TIME_FORMATTER} and converts it to a {@link LocalDateTime}.
 */
public class LocalDateTimeDeserializer extends StdDeserializer<LocalDateTime> {

    private static final long serialVersionUID = 1L;

    /**
     * Creates a deserializer that declares {@link LocalDateTime} as its handled type.
     */
    public LocalDateTimeDeserializer() {
        // The handled type must match the type parameter; the previous LocalDate.class
        // made the deserializer report the wrong value class.
        super(LocalDateTime.class);
    }

    /**
     * Reads the current value as a string and parses it into a {@link LocalDateTime}.
     *
     * @param parser  parser positioned at the value to read
     * @param context deserialization context supplied by Jackson; not used here
     * @return the parsed date-time
     * @throws IOException if the parser fails to read the value
     */
    @Override
    public LocalDateTime deserialize(JsonParser parser, DeserializationContext context) throws IOException {
        String text = parser.readValueAs(String.class);
        return LocalDateTime.parse(text, DateConstant.DATE_TIME_FORMATTER);
    }
}

Producer

  • application.properties
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  • messages
/**
 * Kafka message payload carrying a promotion code.
 */
@Data
@NoArgsConstructor
public class PromotionMessage {

    /** Promotion code copied from the originating request. */
    private String promotionCode;

    /**
     * Builds a message from an incoming request.
     *
     * @param request request whose promotion code is copied into this message
     */
    public PromotionMessage(PromotionRequest request) {
        this.promotionCode = request.getPromotionCode();
    }
}
  • producer
/**
 * Publishes {@link PromotionMessage} payloads to the promotion topic.
 */
@Service
@Slf4j
public class PromotionProducer {

    /** Topic that promotion messages are sent to. */
    private static final String PROMOTION_TOPIC = "t.commodity.promotion";

    @Autowired
    private KafkaTemplate<String, PromotionMessage> kafkaTemplate;

    /**
     * Sends the given message to the promotion topic without a key.
     *
     * @param message payload to publish
     */
    public void publish(PromotionMessage message) {
        // The value returned by send(...) is intentionally not inspected here.
        kafkaTemplate.send(PROMOTION_TOPIC, message);
    }
}

Consumer

  • application.properties
spring.kafka.consumer.group-id=pattern-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.broker.message
  • message
  • listener
/**
 * Kafka listener that logs each order message received on topic {@code t.commodity.order}.
 */
@Service
@Slf4j
public class OrderListener {

    /** Number of trailing card-number characters left readable in log output. */
    private static final int VISIBLE_CARD_CHARS = 4;

    /** Placeholder written in place of the hidden part of a card number. */
    private static final String CARD_MASK = "****";

    /**
     * Logs the order details together with the total amount (price times quantity).
     * The credit card number is masked before it is written to the log.
     *
     * @param message the deserialized order message
     */
    @KafkaListener(topics = "t.commodity.order")
    public void listen(OrderMessage message) {
        var totalItemAmount = message.getPrice() * message.getQuantity();
        log.info("Processing order {}, item {}, credit card number {}. Total amount for this item is {}",
                message.getOrderNumber(), message.getItemName(),
                maskCardNumber(message.getCreditCardNumber()), totalItemAmount);
    }

    /**
     * Hides all but the last {@link #VISIBLE_CARD_CHARS} characters of a card number so the
     * full number never reaches the log.
     *
     * @param cardNumber the raw card number; may be {@code null}
     * @return the mask followed by the trailing characters, or only the mask when the value
     *         is {@code null} or too short to reveal anything safely
     */
    private static String maskCardNumber(Object cardNumber) {
        if (cardNumber == null) {
            return CARD_MASK;
        }
        String text = String.valueOf(cardNumber);
        if (text.length() <= VISIBLE_CARD_CHARS) {
            return CARD_MASK;
        }
        return CARD_MASK + text.substring(text.length() - VISIBLE_CARD_CHARS);
    }
}

하나의 topic에서 여러 format의 json message를 처리하는 경우

/**
 * Class-level Kafka listener for topic {@code t.commodity.promotion}. Each
 * {@code @KafkaHandler} method below accepts one payload type and logs it.
 */
@Service
@Slf4j
@KafkaListener(topics = "t.commodity.promotion")
public class PromotionListener {

    /**
     * Logs a discount message.
     *
     * @param message the discount payload
     */
    @KafkaHandler
    public void listenDiscount(DiscountMessage message) {
        log.info("Processing discount : {}", message);
    }

    /**
     * Logs a promotion message.
     *
     * @param message the promotion payload
     */
    @KafkaHandler
    public void listenPromotion(PromotionMessage message) {
        log.info("Processing promotion : {}", message);
    }

    // Optional catch-all handler, currently disabled. Uncomment to log payloads that match
    // neither handler above.
//    @KafkaHandler(isDefault = true)
//    public void listenDefault(Object object) {
//        log.info("listenDefault: object is {}", object.toString());
//    }
}