1080. Custom Timestamp Extractor - dkkahm/study-kafka-with-spring GitHub Wiki

Implementing a Custom Timestamp Extractor

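import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class InventoryTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // Use the payload's transaction time as event time; fall back to the record timestamp for null values.
        var inventoryMessage = (InventoryMessage) record.value();
        return inventoryMessage != null
                ? LocalDateTimeUtil.toEpochTimestamp(inventoryMessage.getTransactionTime())
                : record.timestamp();
    }
}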
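
The extractor above relies on `LocalDateTimeUtil.toEpochTimestamp`, which is a project helper not shown on this page. Kafka Streams expects the returned value to be milliseconds since the Unix epoch, so the helper presumably converts a `LocalDateTime` into that form. The following is only a minimal sketch of what such a helper might look like; the class body and the choice of UTC as the time zone are assumptions, not the project's actual code.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical stand-in for the project's LocalDateTimeUtil (not shown in this wiki page).
final class LocalDateTimeUtil {
    private LocalDateTimeUtil() {}

    // Assumption: transaction times are recorded in UTC.
    // Returns milliseconds since the Unix epoch, the unit Kafka timestamps use.
    static long toEpochTimestamp(LocalDateTime localDateTime) {
        return localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}
```

With this sketch, a null `getTransactionTime()` would throw a `NullPointerException` inside `extract`, so if that field can be missing, the extractor may need a second fallback to `record.timestamp()`.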

Using it when building the stream (Consumed.with)

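        // The extractor applies to this source only and takes precedence over any default extractor in the config.
        var inventoryTimestampExtractor = new InventoryTimestampExtractor();

        // The final null leaves the offset reset policy to the Streams configuration.
        var inventoryStream = builder.stream("t.commodity.inventory",
                Consumed.with(stringSerde, inventorySerde, inventoryTimestampExtractor, null));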
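
`Consumed.with` attaches the extractor to a single source topic. For comparison, Kafka Streams also accepts an application-wide default through the `default.timestamp.extractor` property (the key behind `StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG`). The sketch below only builds the properties object; the helper class `StreamsConfigSketch` and the package name `com.example` in the usage note are hypothetical, and a real application would still need to set `application.id`, `bootstrap.servers`, and the other required settings.

```java
import java.util.Properties;

// Hypothetical helper, used here only to illustrate the config key.
final class StreamsConfigSketch {
    private StreamsConfigSketch() {}

    // Registers the given class name as the default timestamp extractor.
    // "default.timestamp.extractor" is the value of
    // StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
    static Properties withDefaultExtractor(String extractorClassName) {
        var props = new Properties();
        props.put("default.timestamp.extractor", extractorClassName);
        return props;
    }
}
```

A call such as `StreamsConfigSketch.withDefaultExtractor("com.example.InventoryTimestampExtractor")` would make the extractor apply to every source topic. Because `InventoryTimestampExtractor` casts each value to `InventoryMessage`, that is likely only safe when all source topics carry inventory messages, which is one reason the per-source `Consumed.with` approach may be the better fit here.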