Kafka 설정 - 2-7-team/user-service GitHub Wiki

Producer 설정

@EnableKafka
@Configuration
public class KafkaProducerConfig {

	@Bean
	public ProducerFactory<String, ReservationCreateAlertEvent> producerFactory() {

                 @Value("${spring.kafka.bootstrap-servers}")
	             private String address;


		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);  // kafka 실 서버 배포시 value 값으로 수정
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

		configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);   // 보내는 메시지 형식 여러개면 수정

		return new DefaultKafkaProducerFactory<>(configProps);
	}

	@Bean
	public KafkaTemplate<String, ReservationCreateAlertEvent> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}

producer build.gradle

  kafka:
    bootstrap-servers: localhost:9092 //추후 value로
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false    (메시지 형식 여러개면 수정)
    template:
      default-topic: reservation.alerts   //기본값 topic ( 수정 가능)

Consumer 설정

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

	@Bean
	public ConsumerFactory<String, ReservationCreateAlertEvent> consumerFactory() {

                 @Value("${spring.kafka.bootstrap-servers}")
	             private String address;


		Map<String, Object> configProps = new HashMap<>();

		configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, address);  //실서버 배포 시 value로 수정
  
		configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

		configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

		configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, ReservationCreateAlertEvent.class.getName());
		//기본 설정 ( 추후에 받는 메시지 클래스 많아지면 수정 ) 
		
		
		//configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "bookinghostpial.reservation_service.application.event");
		

		return new DefaultKafkaConsumerFactory<>(configProps);
	}


	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, ReservationCreateAlertEvent> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, ReservationCreateAlertEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}
}

consumer build.gradle

  kafka:
    bootstrap-servers: localhost:9092   
    consumer:
      key-serializer: org.apache.kafka.common.serialization.StringDeSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonDeSerializer
    template:
      default-topic: reservation.alerts