1070. Aggregation Basic - dkkahm/study-kafka-with-spring GitHub Wiki

Aggregation runs on a KGroupedStream, so group the stream first

  • groupByKey() or groupBy()

aggregate

  • Use when the input and output types differ
  • Initializer: supplies the initial aggregate value for a key (like a Supplier)
  • Aggregator: (aggKey, newValue, aggValue) -> new aggregate
  • Materialized: Serde spec (key and value) for the result table
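import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerde;
// InventoryMessage is this project's own message class; import it from its package.

@Configuration
public class InventoryStream {

    @Bean
    public KStream<String, InventoryMessage> kstreamInventory(StreamsBuilder builder) {
        var stringSerde = Serdes.String();
        var inventorySerde = new JsonSerde<>(InventoryMessage.class);
        var longSerde = Serdes.Long();

        var inventoryStream = builder.stream("t.commodity.inventory", Consumed.with(stringSerde, inventorySerde));
        inventoryStream.print(Printed.<String, InventoryMessage>toSysOut().withLabel("Inventory"));

        var inventoryTotalStream = inventoryStream
                // "ADD" counts as a positive quantity, any other type as a negative one
                .mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
                .groupByKey()
                // start each key at 0 and add every signed quantity to the running total
                .aggregate(() -> 0L,
                        (aggKey, newValue, aggValue) -> aggValue + newValue,
                        Materialized.with(stringSerde, longSerde)
                )
                .toStream();
        inventoryTotalStream.print(Printed.<String, Long>toSysOut().withLabel("InventoryTotal"));
        // explicit serdes, so the sink does not depend on the application's default serdes
        inventoryTotalStream.to("t.commodity.inventory-total", Produced.with(stringSerde, longSerde));

        return inventoryStream;
    }
}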
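
To make the per-key fold explicit, here is a minimal plain-Java sketch of what the aggregate step computes. It does not use the Kafka Streams API: `Movement`, `Aggregator`, and `runningTotals` are hypothetical stand-ins introduced only for illustration, and the sample events are made up. Note that Kafka Streams may emit fewer intermediate updates than this sketch when record caching is enabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class AggregateSketch {

    // Hypothetical stand-in for InventoryMessage: key, type, and quantity.
    public static final class Movement {
        final String item;
        final String type;
        final long quantity;

        public Movement(String item, String type, long quantity) {
            this.item = item;
            this.type = type;
            this.quantity = quantity;
        }
    }

    // Hypothetical stand-in mirroring the (aggKey, newValue, aggValue) shape.
    public interface Aggregator<K, V, VA> {
        VA apply(K aggKey, V newValue, VA aggValue);
    }

    // Folds the events per key and records every updated total in arrival order.
    public static List<String> runningTotals(List<Movement> events) {
        Supplier<Long> initializer = () -> 0L;
        Aggregator<String, Long, Long> aggregator =
                (aggKey, newValue, aggValue) -> aggValue + newValue;

        Map<String, Long> table = new HashMap<>();   // plays the role of the result table
        List<String> updates = new ArrayList<>();    // plays the role of toStream()
        for (Movement m : events) {
            // Same sign rule as the mapValues step: ADD is positive, anything else negative.
            long signed = m.type.equalsIgnoreCase("ADD") ? m.quantity : -m.quantity;
            // The initializer is used only the first time a key is seen.
            Long current = table.containsKey(m.item) ? table.get(m.item) : initializer.get();
            Long next = aggregator.apply(m.item, signed, current);
            table.put(m.item, next);
            updates.add(m.item + "=" + next);
        }
        return updates;
    }

    public static void main(String[] args) {
        List<Movement> events = List.of(
                new Movement("apple", "ADD", 10),
                new Movement("apple", "REMOVE", 3),
                new Movement("pear", "ADD", 5),
                new Movement("apple", "ADD", 1));
        // prints [apple=10, apple=7, pear=5, apple=8]
        System.out.println(runningTotals(events));
    }
}
```

The input values are signed quantities while the aggregate is a running total held in a separate table, which is why aggregate takes both an Initializer and a serde spec for the result.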

reduce

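  • Use when the input and output types are the same
  • No Initializer is needed: the first value for a key becomes the initial aggregate
    // replaces the aggregate(...) call in the example above; the grouped values must already be Long
    .groupByKey()
    .reduce(Long::sum, Materialized.with(stringSerde, longSerde))
    .toStream();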
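
As a rough illustration of how reduce differs from aggregate, the plain-Java sketch below combines values per key with a reducer and no initializer. It is not Kafka Streams code: `ReduceSketch` and `reduceByKey` are hypothetical names, and `BinaryOperator<Long>` is only an assumed stand-in for the reducer passed to reduce.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceSketch {

    // Combines values per key. The first value seen for a key is stored
    // unchanged; later values are merged in as reducer(aggregate, newValue).
    public static Map<String, Long> reduceByKey(List<Map.Entry<String, Long>> records,
                                                BinaryOperator<Long> reducer) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            table.merge(r.getKey(), r.getValue(), reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        // Signed quantities, as produced by the mapValues step in the aggregate example.
        List<Map.Entry<String, Long>> records = List.of(
                Map.entry("apple", 10L),
                Map.entry("apple", -3L),
                Map.entry("pear", 5L));
        Map<String, Long> totals = reduceByKey(records, Long::sum);
        System.out.println("apple total: " + totals.get("apple")); // prints apple total: 7
        System.out.println("pear total: " + totals.get("pear"));   // prints pear total: 5
    }
}
```

Because the result has the same type as the input values, a single Serde pair in Materialized covers both, and a sum starting from the first value gives the same totals as the aggregate version starting from 0.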