Kafka Producer Consumer 테스트 작성 가이드 - dpTablo/spring-boot-template-reactive GitHub Wiki

프로젝트의 Kafka Producer와 Consumer에 대한 테스트 환경에 대하여 설명합니다.

Kafka 테스트에는 아래 프레임워크의 의존성이 있습니다.

  • testcontainers
  • confluent 사의 cp-kafka

추가적으로 자체 구현한 @TestContainersKafkaTest 어노테이션 클래스를 이용하여 테스트를 수행합니다.

cp-kafka의 docker 이미지를 testcontainers 환경에서 구동하여 테스트를 수행합니다.

cp-kafka와 Apache Kafka 호환성

실제 런타임에서 사용되는 Apache Kafka 의 버전에 상응하는 cp-kafka의 버전을 사용하도록 환경을 구성합니다. TestContainersKafkaSettings 에 cp-kafka의 docker image 정보를 변경하여 테스트 환경에서 사용되는 cp-kafka의 버전을 지정할 수 있습니다.

두 프레임워크간의 버전 호환성에 관한 정보는 아래 문서를 참조합니다.

테스트 흐름

Kafka Producer_Consumer 테스트 작성 가이드 drawio

테스트 클래스에 @TestContainersKafkaTest 어노테이션을 선언하면 테스트 케이스가 동작할 때 KafkaTestSupportExtension 이 동작합니다. KafkaTestSupportExtension 는 Testcontainers 로 실제 테스트에 사용되는 kafka 서비스의 생성, 시작, 정지를 수행합니다. KafkaTestSupportExtension 에서는 실행된 kafka 테스트 서비스의 정보를 시스템 환경변수로 설정하여 spring kafka 초기화 시점에 반영됩니다.

테스트 케이스 메소드 내 코드를 실행하는 시점에서는 실행된 kafka 테스트 서비스를 사용하여 kafka producer와 consumer 를 테스트 합니다.

테스트 클래스의 모든 테스트 케이스가 종료되면 KafkaTestSupportExtension 이 서비스를 종료하고 docker container 를 삭제하며 테스트가 종료됩니다.

application-tc.xml

src/test/resources 경로에 application-tc.yml 설정파일이 있습니다. spring profile 명이 ‘tc’ 인 경우에는 testcontainers 를 이용한 테스트 환경에 필요한 설정을 지정합니다.

kafka 테스트 환경에서는 application-tc.yml 내에 사용하는 설정이 없습니다. 별도의 사용하고자 하는 옵션이 없는 경우에는 사용하지 않아도 무방합니다.

Producer 테스트 예제코드와 설명

아래 코드는 LoginUserTopic 의 producer 테스트 케이스의 예제 입니다. 해당 코드는 프로젝트에 포함되어 있습니다.

@TestContainersKafkaTest
@ActiveProfiles("tc")
@ContextConfiguration(classes = {
        KafkaProperties.class,
        KafkaConfiguration.class,
        LoginUserProducerConfiguration.class,
        LoginUserTopicConfiguration.class,
        LoginUserProducer.class
})
@EnableAutoConfiguration
class LoginUserProducerTest {
    @Qualifier("loginUserProducer")
    @Autowired
    private LoginUserProducer producer;

    @DisplayName("메세지 송신 성공")
    @Test
    void test() throws ExecutionException, InterruptedException {
        // given
        var loginUserTopic = LoginUserTopic.builder()
                .userId("user1")
                .name("사용자1")
                .loginTime(LocalDateTime.now())
                .build();

        // when
        var completableFuture = producer.sendMessage(loginUserTopic);
        var sendResult = completableFuture.get();

        // then
        assertThat(sendResult).isNotNull();

        var producerRecord = sendResult.getProducerRecord();
        assertThat(producerRecord).isNotNull();
        assertThat(producerRecord.topic()).isEqualTo(LoginUserTopic.TOPIC_NAME);

        var loginUserTopicInSendResult = producerRecord.value();
        assertThat(loginUserTopicInSendResult).isEqualTo(loginUserTopic);
    }
}

@TestContainersKafkaTest

kafka 테스트 환경을 구성하고 수행하기 위한 어노테이션 입니다. 전체 자동 구성이 비활성화 되고 Kafka 테스트와 관련된 구성만 적용됩니다.

@ActiveProfiles(”tc”)

profile을 ‘tc’ 로 지정하여 테스트를 수행합니다.

@ContextConfiguration

spring configuration 대상 클래스를 지정합니다. 기본적으로 아래 두개의 클래스는 대상에 포함됩니다.

  • KafkaProperties.class
  • KafkaConfiguration.class

producer 구성에 필요한 클래스도 지정되어야 합니다. 위 예제 코드에서는 아래 3개의 클래스가 추가되었습니다.

  • LoginUserProducerConfiguration : producer 구성
  • LoginUserTopicConfiguration : topic 구성
  • LoginUserProducer : producer 구현체 (@KafkaListener bean)

@EnableAutoConfiguration

@ContextConfiguration 에서 지정한 클래스들에 대한 구성을 자동화합니다.

Consumer 테스트 예제코드와 설명

@TestContainersKafkaTest
@ActiveProfiles("tc")
@ContextConfiguration(classes = {
        KafkaProperties.class,
        KafkaConfiguration.class,
        LoginUserConsumerConfiguration.class,
        LoginUserTopicConfiguration.class,
        LoginUserConsumer.class
})
@EnableAutoConfiguration
class LoginUserConsumerTest {
    @Qualifier("loginUserConsumer")
    @Autowired
    private LoginUserConsumer consumer;

    @Autowired
    private KafkaProperties kafkaProperties;

    private KafkaTemplate<String, LoginUserTopic> kafkaTemplate;

    @BeforeEach
    public void beforeEach() {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        var producerFactory = new DefaultKafkaProducerFactory<String, LoginUserTopic>(configMap);
        kafkaTemplate = new KafkaTemplate<>(producerFactory);
    }

    @DisplayName("메세지 수신 성공")
    @Test
    void test() throws ExecutionException, InterruptedException {
        // given

        // when
        var completableFuture = kafkaTemplate.send(LoginUserTopic.TOPIC_NAME, LoginUserTopic.builder()
                .userId("user1")
                .name("사용자1")
                .loginTime(LocalDateTime.now())
                .build());
        var sendResult = completableFuture.get();

        var messageConsumed = consumer.getLatch().await(5, TimeUnit.SECONDS);

        // then
        assertThat(sendResult).isNotNull();
        assertThat(messageConsumed).isTrue();
        assertThat(consumer.getPayload());
    }
}

Producer 테스트 예제코드와 구성은 동일합니다. 다른점은 KafkaTemplate 을 직접 생성하여 메세지 전송을 보내는 부분을 구현했다는 점 입니다.