1190. Avro with Kafka Stream - dkkahm/study-kafka-with-spring GitHub Wiki

pom.xml

  • Avro with Spring Project Setup
  • kafka-streams
  • kafka-streams-avro-serde
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-streams-avro-serde</artifactId>
			<version>7.0.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.avro</groupId>
					<artifactId>avro</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

Kafka Stream Config

  • StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG as SpecificAvroSerde.class.getName()
  • AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {
    public static final String SCHEMA_REGISTRY_URL = "http://localhost:8085";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamConfig() {
        var props = new HashMap<String, Object>();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sc-kafka-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "3000");

        return new KafkaStreamsConfiguration(props);
    }
}

Stream

  • Input Serde
    • SpecificAvroSerde with SchemaRegistry Config
@Configuration
public class HelloStream {

    @Bean
    public KStream<String, HelloPositiveUppercase> kstreamHello(StreamsBuilder builder) {
        var helloSerde = new SpecificAvroSerde<Hello>();
        var helloSerdeConfig = Collections.singletonMap("schema.registry.url", KafkaStreamConfig.SCHEMA_REGISTRY_URL);
        helloSerde.configure(helloSerdeConfig, false);

        var helloPositiveUppercaseStream = builder.stream("sc-hello", Consumed.with(Serdes.String(), helloSerde))
                .mapValues(this::mapHello);

        helloPositiveUppercaseStream.print(Printed.<String, HelloPositiveUppercase>toSysOut().withLabel("Hello Positive Uppercase"));

        return helloPositiveUppercaseStream;
    }

    private HelloPositiveUppercase mapHello(Hello original) {
        var result = new HelloPositiveUppercase();
        result.setPositiveInt(Math.abs(original.getMyIntField()));
        result.setUppercaseString(original.getMyStringField().toString().toUpperCase());
        return result;
    }
}
⚠️ **GitHub.com Fallback** ⚠️