branch
// Convert each masked order into its pattern message, then split the stream in two.
// The array returned by branch() is indexed in predicate order: slot 0 holds records
// accepted by isPlastic(), slot 1 holds the rest (caught by the always-true predicate).
int plasticIndex = 0;
int notPlasticIndex = 1;
var orderPatterns = maskedOrderStream.mapValues(CommodityStreamUtil::mapToOrderPattern);
KStream<String, OrderPatternMessage>[] patternStream =
        orderPatterns.branch(CommodityStreamUtil.isPlastic(), (orderKey, pattern) -> true);

// Route each branch to its own output topic.
var plasticOrders = patternStream[plasticIndex];
plasticOrders.to("t.commodity.pattern-two.plastic", Produced.with(stringSerde, orderPatternSerde));
var nonPlasticOrders = patternStream[notPlasticIndex];
nonPlasticOrders.to("t.commodity.pattern-two.notplastic", Produced.with(stringSerde, orderPatternSerde));
// Same plastic / not-plastic routing, expressed with KafkaStreamBrancher so that each
// branch is wired to its sink inline instead of being looked up by array index.
final var branchProducer = Produced.with(stringSerde, orderPatternSerde);
final var brancher = new KafkaStreamBrancher<String, OrderPatternMessage>()
        .branch(
                CommodityStreamUtil.isPlastic(),
                plasticOrders -> plasticOrders.to("t.commodity.pattern-two.plastic", branchProducer))
        .defaultBranch(
                remainingOrders -> remainingOrders.to("t.commodity.pattern-two.notplastic", branchProducer));
// Attach the configured branches to the mapped order-pattern stream.
brancher.onTopOf(maskedOrderStream.mapValues(CommodityStreamUtil::mapToOrderPattern));
map
// Keep only orders that pass isLargeQuantity() ...
var largeQuantityOrders = maskedOrderStream.filter(CommodityStreamUtil.isLargeQuantity());
// ... discard the ones matching isChip() ...
var largeNonChipOrders = largeQuantityOrders.filterNot(CommodityStreamUtil.isChip());
// ... and map what remains to reward messages; map() (unlike mapValues()) may replace the key too.
KStream<String, OrderRewardMessage> rewardStream =
        largeNonChipOrders.map(CommodityStreamUtil.mapToOrderRewardChangeKey());
/**
 * Creates the mapper that re-keys an order and converts its value to a reward message.
 *
 * @return a mapper whose result key is {@code value.getOrderLocation()} and whose result
 *         value is {@code mapToOrderReward(value)}; the incoming record key is not used
 */
public static KeyValueMapper<String, OrderMessage, KeyValue<String, OrderRewardMessage>> mapToOrderRewardChangeKey() {
    return (ignoredKey, order) -> {
        final var newKey = order.getOrderLocation();
        final var reward = mapToOrderReward(order);
        return KeyValue.pair(newKey, reward);
    };
}
mapValues
flatMap
flatMapValues
selectKey
filter
/**
 * Creates the predicate used to select large orders.
 *
 * @return a predicate that is true when the order's quantity is strictly greater than 200;
 *         the record key is not inspected
 */
public static Predicate<String, OrderMessage> isLargeQuantity() {
    final int largeQuantityThreshold = 200;
    return (orderKey, order) -> order.getQuantity() > largeQuantityThreshold;
}
to
forEach
peek
transformValues
- mapValues vs transformValues
- Transformer
- timestamp가 설정된 범위를 넘어갈 경우 null로 변환
/**
 * Value transformer that lets a flash-sale vote through only while voting is open.
 *
 * <p>The record timestamp reported by the {@link ProcessorContext} is compared against the
 * configured window (both bounds inclusive). Votes inside the window are returned unchanged;
 * votes outside it are replaced by {@code null}, so a downstream step can drop them.
 */
public class FlashSaleVoteValueTransformer implements ValueTransformer<FlashSaleVoteMessage, FlashSaleVoteMessage> {

    /** Inclusive lower bound of the voting window, as produced by LocalDateTimeUtil.toEpochTimestamp. */
    private final long windowStart;

    /** Inclusive upper bound of the voting window, as produced by LocalDateTimeUtil.toEpochTimestamp. */
    private final long windowEnd;

    /** Context captured in {@link #init}; consulted for the timestamp of the record being processed. */
    private ProcessorContext context;

    /**
     * @param voteStart moment at which voting opens (inclusive)
     * @param voteEnd   moment at which voting closes (inclusive)
     */
    public FlashSaleVoteValueTransformer(LocalDateTime voteStart, LocalDateTime voteEnd) {
        // NOTE(review): assumes toEpochTimestamp yields the same unit as ProcessorContext.timestamp() — confirm.
        this.windowStart = LocalDateTimeUtil.toEpochTimestamp(voteStart);
        this.windowEnd = LocalDateTimeUtil.toEpochTimestamp(voteEnd);
    }

    /** Stores the processor context for later timestamp lookups. */
    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
    }

    /**
     * @param value the vote carried by the current record
     * @return {@code value} if the record timestamp lies within the window, otherwise {@code null}
     */
    @Override
    public FlashSaleVoteMessage transform(FlashSaleVoteMessage value) {
        final long recordTimestamp = context.timestamp();
        final boolean outsideWindow = recordTimestamp < windowStart || recordTimestamp > windowEnd;
        if (outsideWindow) {
            return null;
        }
        return value;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }
}
// Consume flash-sale votes from the source topic with the string key serde and the vote value serde.
var flashSaleVoteStream = builder.stream("t.commodity.flashsale-vote",
Consumed.with(stringSerde, flashSaleVoteSerde))
// A new transformer is supplied per call; it returns null for votes whose record timestamp
// falls outside the [voteStart, voteEnd] window.
.transformValues(() -> new FlashSaleVoteValueTransformer(voteStart, voteEnd))
// Drop the votes that the transformer replaced with null.
// NOTE(review): no terminating ';' is visible here — the chain presumably continues; confirm.
.filter((key, value) -> value != null)