1010. Project Setup and Simple Stream App - dkkahm/study-kafka-with-spring GitHub Wiki

dependencies

  • Apache Kafka
  • Apache Kafka Stream
  • com.fasterxml.jackson.core:jackson-databind

application.properties

  • Default serialization for producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Topology

  • <String, String> Topic -> Uppercase -> <String, String> Topic
@Configuration
public class PromotionUppercaseStream {

    @Bean
    public KStream<String, String> kstreamPromotionUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream("t.commodity.promotion", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> uppercaseStream = sourceStream.mapValues(s -> s.toUpperCase());
        uppercaseStream.to("t.commodity.promotion.uppercase");

        sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original Stream"));
        uppercaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase Stream"));

        return sourceStream;
    }
}

Kafka Stream Configuration

@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamConfig() {
        var props = new HashMap<String, Object>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }
}