1090. Windowing - dkkahm/study-kafka-with-spring GitHub Wiki
종류
- Tumbling Time Window
- 설정된 Tumbling 시간 단위로 짜르고, 이전 시간 단위가 끝나면 바로 이어서 다음 시간 단위 시작
- Hopping Time Window
- 설정된 Tumbling 시간 단위로 짜르고, Hopping 시간 단위로 Sliding(Hoping)
- ...
Tumbling Time Window
- Windowed Serde for Key
- groupBy()
- windowBy()
- Topic으로 보내려면
- toStream()
- Produced.with KeySerde에 windowSerde 사용
- Prined 사용하려면,
- toStream()
- print(Printed.toSysout())
@Bean
public KStream<String, InventoryMessage> kstreamInventory(StreamsBuilder builder) {
var stringSerde = Serdes.String();
var inventorySerde = new JsonSerde<>(InventoryMessage.class);
var longSerde = Serdes.Long();
var windowLength = Duration.ofMinutes(1);
var windowSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, windowLength.toMillis());
var inventoryTimestampExtractor = new InventoryTimestampExtractor();
var inventoryStream = builder.stream("t.commodity.inventory",
Consumed.with(stringSerde, inventorySerde, inventoryTimestampExtractor, null));
inventoryStream.print(Printed.<String, InventoryMessage>toSysOut().withLabel("Inventory"));
var inventoryTotalStream = inventoryStream
.mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
.groupByKey()
.windowedBy(TimeWindows.of(windowLength))
.reduce(Long::sum, Materialized.with(stringSerde, longSerde))
.toStream();
inventoryTotalStream.through("t.commodity.inventory-total", Produced.with(windowSerde, longSerde))
.print(Printed.toSysOut());
return inventoryStream;
}
Hopping Time Window
- TimeWindow.of().advancedBy(hopLength)
var windowLength = Duration.ofMinutes(1);
var hopLength = Duration.ofSeconds(20);
var windowSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, windowLength.toMillis());
.groupByKey()
.windowedBy(TimeWindows.of(windowLength).advanceBy(hopLength))
.reduce(Long::sum, Materialized.with(stringSerde, longSerde))