1110. Stream Stream Join - dkkahm/study-kafka-with-spring GitHub Wiki
Inner Join
- Serdes
- Key Serde
- Primary Value Serde
- Secondary Value Serde
- Join Value Serde
- Streams
- join
- Joiner : (Primary Value, Secondary Value) -> Join Value
- Window 제약
- Join Serde (StreamJoined) : Key, Primary, Secondary
@Configuration
public class OrderPaymentStream {
@Bean
public KStream<String, OnlineOrderMessage> kstreamOrderPayment(StreamsBuilder builder) {
var stringSerde = Serdes.String();
var orderSerde = new JsonSerde<>(OnlineOrderMessage.class);
var paymentSerde = new JsonSerde<>(OnlinePaymentMessage.class);
var orderPaymentSerde = new JsonSerde<>(OnlineOrderPaymentMessage.class);
var orderStream = builder.stream("t.commodity.online-order",
Consumed.with(stringSerde, orderSerde, new OnlineOrderTimestampExtractor(), null));
var paymentStream = builder.stream("t.commodity.online-payment",
Consumed.with(stringSerde, paymentSerde, new OnlinePaymentTimestampExtractor(), null));
orderStream.join(paymentStream, this::joinOrderPayment, JoinWindows.of(Duration.ofMinutes(1)),
StreamJoined.with(stringSerde, orderSerde, paymentSerde))
.to("t.commodity.join-order-payment", Produced.with(stringSerde, orderPaymentSerde));
return orderStream;
}
private OnlineOrderPaymentMessage joinOrderPayment(OnlineOrderMessage order, OnlinePaymentMessage payment) {
var result = new OnlineOrderPaymentMessage();
result.setOnlineOrderNumber(order.getOnlineOrderNumber());
result.setOrderDateTime(order.getOrderDateTime());
result.setTotalAmount(order.getTotalAmount());
result.setUsername(order.getUsername());
result.setPaymentDateTime(payment.getPaymentDateTime());
result.setPaymentMethod(payment.getPaymentMethod());
result.setPaymentNumber(payment.getPaymentNumber());
return result;
}
}
Left Join
- leftJoin
- Joiner 에서 Secondary Message가 null 이 될 수 있어 처리 필요함
Outer Join
- outerJoin
- Joiner 에서 Primary, Secondary Message 모두 null 될 수 있음