1060. StateStore - dkkahm/study-kafka-with-spring GitHub Wiki

StoreValue

  • StateStore에 저장할 Data Type
/**
 * Value type stored in the state store: running totals from which an average
 * rating can be derived.
 *
 * <p>Accessors and the no-argument constructor are not written out here; they are
 * presumably generated by Lombok via {@code @Data} and {@code @NoArgsConstructor}
 * (confirm against the project's imports). Both fields start at {@code 0} for a
 * freshly constructed instance, as they are primitive {@code long}s.
 */
@Data
@NoArgsConstructor
public class FeedbackRatingStoreValue {
    /** Number of ratings accumulated so far. */
    private long countRating;
    /** Sum of all ratings accumulated so far. */
    private long sumRating;
}

Transformer

  • 생성자에서 StateStore 이름 전달받음
  • init()에서 ProcessorContext 통해서 StateStore 가져오기
  • transform()시 StateStore에서 값을 가져와서 연산하고 다시 저장하기
    • 아직 저장되지 않은 경우 null
/**
 * Value transformer that keeps a running rating sum and count per location in a
 * key-value state store and emits the current average rating for every incoming
 * feedback message.
 */
public class FeedbackTransformer implements ValueTransformer<FeedbackMessage, FeedbackRatingMessage> {
    private final String stateStoreName;
    private ProcessorContext processorContext;
    private KeyValueStore<String, FeedbackRatingStoreValue> ratingStateStore;

    /**
     * Creates a transformer bound to the named state store.
     *
     * @param stateStoreName name of the state store that {@code init} will look up
     * @throws IllegalArgumentException if {@code StringUtils.isEmpty} reports the name as empty
     */
    public FeedbackTransformer(String stateStoreName) {
        if (StringUtils.isEmpty(stateStoreName)) {
            throw new IllegalArgumentException("State store name must not empty");
        }
        this.stateStoreName = stateStoreName;
    }

    /**
     * Remembers the processor context and resolves the state store by the name
     * supplied to the constructor.
     *
     * @param processorContext context handed in by the framework
     */
    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        // The store is looked up by name only, so the cast to the concrete
        // key/value types is unchecked.
        this.ratingStateStore =
                (KeyValueStore<String, FeedbackRatingStoreValue>) processorContext.getStateStore(stateStoreName);
    }

    /**
     * Adds the message's rating to the totals stored for its location, writes the
     * updated totals back to the store, and returns the resulting average.
     *
     * @param feedbackMessage incoming feedback; its location is used as the store key
     * @return a rating message carrying the location and the average rating rounded
     *         to one decimal place
     */
    @Override
    public FeedbackRatingMessage transform(FeedbackMessage feedbackMessage) {
        final String location = feedbackMessage.getLocation();

        // The store returns null for a location that has not been seen yet;
        // start from zeroed totals in that case.
        FeedbackRatingStoreValue totals = ratingStateStore.get(location);
        if (totals == null) {
            totals = new FeedbackRatingStoreValue();
        }

        // Fold the new rating into the running totals.
        final long ratingSum = totals.getSumRating() + feedbackMessage.getRating();
        totals.setSumRating(ratingSum);
        final long ratingCount = totals.getCountRating() + 1;
        totals.setCountRating(ratingCount);

        // Persist the updated totals under the same key.
        ratingStateStore.put(location, totals);

        // Scale by 10 before rounding, then scale back, to keep one decimal place.
        final double exactAverage = (double) ratingSum / ratingCount;
        final double roundedAverage = Math.round(exactAverage * 10d) / 10d;

        final FeedbackRatingMessage result = new FeedbackRatingMessage();
        result.setLocation(location);
        result.setAverageRating(roundedAverage);
        return result;
    }

    /** No resources are held by this transformer, so there is nothing to release. */
    @Override
    public void close() {
        // intentionally empty
    }
}

Stream

  • StateStore에 저장될 값의 Serde 구성
  • StoreSupplier 구성
  • StoreBuilder 구성해서 StreamsBuilder에 설정
  • transformValues 호출 시 StateStore 이름 전달
/**
 * Stream configuration that reads feedback messages, computes a running average
 * rating per location through {@link FeedbackTransformer}, and publishes the
 * result to a rating topic.
 */
@Configuration
public class FeedbackRatingStream {

    private static final String FEEDBACK_TOPIC = "t.commodity.feedback";
    private static final String FEEDBACK_RATING_TOPIC = "t.commodity.feedback.rating";
    private static final String RATING_STORE_NAME = "feedbackRatingStatStore";
    private static final String PRINT_LABEL = "FeedbackRating";

    /**
     * Builds the feedback-rating topology on the given builder.
     *
     * <p>Steps, in order: consume the feedback topic, register an in-memory
     * key-value state store, run each value through a {@link FeedbackTransformer}
     * attached to that store, print the transformed records to standard output,
     * and write them to the rating topic.
     *
     * @param builder streams builder the topology is added to
     * @return the source feedback stream (not the transformed rating stream)
     */
    @Bean
    public KStream<String, FeedbackMessage> kstreamFeedbackRating(StreamsBuilder builder) {
        // Serdes: string keys, JSON values for the messages and the stored totals.
        var keySerde = Serdes.String();
        var storeValueSerde = new JsonSerde<>(FeedbackRatingStoreValue.class);
        var ratingSerde = new JsonSerde<>(FeedbackRatingMessage.class);
        var feedbackSerde = new JsonSerde<>(FeedbackMessage.class);

        var sourceStream = builder.stream(FEEDBACK_TOPIC, Consumed.with(keySerde, feedbackSerde));

        // The store must be registered on the builder before a transformer can be
        // connected to it by name.
        var ratingStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(RATING_STORE_NAME),
                keySerde,
                storeValueSerde);
        builder.addStateStore(ratingStoreBuilder);

        // The store name is passed twice: once to the transformer so it can look
        // the store up, once to transformValues so the store is connected to it.
        var ratingStream = sourceStream.transformValues(
                () -> new FeedbackTransformer(RATING_STORE_NAME),
                RATING_STORE_NAME);

        ratingStream.print(Printed.<String, FeedbackRatingMessage>toSysOut().withLabel(PRINT_LABEL));
        ratingStream.to(FEEDBACK_RATING_TOPIC, Produced.with(keySerde, ratingSerde));

        return sourceStream;
    }
}