Kafka Consumer 재시도 정책 - KimGyuBek/Threadly GitHub Wiki

개요

알림 서비스(notification-service)는 Spring Cloud Stream을 이용해 Kafka 메시지를 소비하고,
알림 생성 및 메일 발송용 유스케이스를 실행한다.

Consumer 단계에서 일시적인 장애가 발생하더라도 알림을 최대한 복구하고,
반복되는 실패는 모니터링 대상으로 분리하기 위해 Spring Cloud Stream의 재시도 + 에러 채널을 사용했다.


Kafka 수신 재시도의 의미

Kafka 기반 수신 흐름을 단계별로 쪼개면 다음과 같다.

  1. 프로듀서가 토픽으로 메시지 발행
  2. 브로커에 메시지가 저장
  3. 컨슈머 컨테이너가 poll()로 메시지를 가져옴
  4. Spring Cloud Stream이 가져온 레코드를 바인딩에 등록된 컨슈머 함수로 전달
  5. 컨슈머 함수가 예외 없이 종료되면 → 처리 성공 → 오프셋 커밋
  6. 컨슈머 함수에서 예외가 던져지면 → 처리 실패로 인식

수신 재시도는 이 흐름 중에서

  • “메시지가 브로커까지는 도달했고, 컨슈머가 이미 가져온 상태에서”
  • 컨슈머 내부 처리(도메인 유스케이스 호출 등)가 실패하는 경우에 대해
  • 동일 메시지에 대한 처리를 다시 시도하는 것을 의미한다.

즉, “메시지가 Kafka에 아예 도달하지 못한 경우”는 컨슈머 재시도로는 복구할 수 없다. 그 구간은 프로듀서/브로커 레벨의 문제다.


기존 구조의 한계

초기에는 Spring Cloud Stream 컨슈머에서 메시지를 한 번만 소비하고, 유스케이스를 호출하는 단순 구조였다.

이 구조에서는 다음과 같은 문제가 있었다.

  • DB 장애, 네트워크 타임아웃 등 시간이 지나면 회복될 수 있는 오류에 대해서도 한 번만 시도하고 끝난다.
  • 어떤 메시지가 몇 번 재시도되었는지, 최종 실패가 얼마나 발생했는지 메트릭으로 알 수 없다.
  • 특정 메시지가 계속 실패하는 경우(잘못된 데이터 등) 별도 분리 없이 로그에만 남고, 장애 징후를 수치로 파악하기 어렵다.

구성 및 동작 과정

spring:
  cloud:
    function:
      definition: notification; mail;
    stream:
      bindings:
        notification-in-0:
          destination: notification
          group: notification-consumer
          content-type: application/json
          error-handler-definition: notificationErrorHandler
          consumer:
            use-native-decoding: true
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-max-interval: 10000
            back-off-multiplier: 2.0
            concurrency: 3

Spring Cloud Stream은 이 설정을 기반으로

  1. notification 토픽에서 메시지를 읽고
  2. 컨슈머 컨테이너가 이를 notification-in-0 채널로 전달한 뒤
  3. notification 함수(notificationEventConsumer)를 호출해 실제 처리를 수행한다.

1. 재시도 옵션

  • max-attempts: 3
    • 최대 3번까지 시도
  • back-off-initial-interval: 1000
    • 첫 재시도까지 1000ms(1초) 대기
  • back-off-max-interval: 10000
    • 백오프 대기 시간이 최대 10초를 넘지 않도록 제한
  • back-off-multiplier: 2.0
    • 지수 백오프(Exponential Backoff) 설정.
    • 재시도 시마다 대기 시간을 2배씩 늘려서, 장애 구간에서 시스템을 과도하게 요청하지 않도록 제한
  • concurrency: 3
    • 동일 바인딩에 대해 최대 3개의 쓰레드가 병렬로 메시지를 처리한다.

2. 에러 핸들러 등록

  • error-handler-definition: notificationErrorHandler
    • 모든 재시도 이후에도 실패한 메시지를 처리할 에러 핸들러 빈 이름을 지정한다.
    • Threadly에서는 이 에러 핸들러에서 최종 실패 로그를 남기고, Counter를 증가시켜 Grafana에서 확인할 수 있도록 했다.

재시도 범위

Consumer 함수 내부에서 동기적으로 수행되는 모든 로직이 재시도 범위 대상이다.

Spring Cloud Stream은 해당 바인딩에 연결된 컨슈머 함수 호출 전체를 하나의 처리 단위로 보고, 그 안에서 **외부로 던져지는 예외를 기준으로 재시도 여부를 결정 **한다.

Resilience4j와 마찬가지로 Spring Cloud Stream재시도를 인지하려면, Consumer 함수 내부에서 예외를 캐치한뒤 삼키지 않고 다시 던져야한다.


GlobalExceptionHandler와의 관계

  • Kafka 컨슈머에서 던져진 예외는 GlobalExceptionHandler로 가지 않는다.
  • GlobalExceptionHandlerDispatcherServlet 안에서 처리되는 HTTP 요청에만 적용된다.
  • Kafka 리스너에서 발생한 예외는 DispatcherServlet 흐름 밖에서 돌아가기 때문에,
    • Spring MVC의 예외 처리(@ControllerAdvice 등)는 타지 않고
    • Spring Cloud Stream이 해당 바인딩 설정에 따라 재시도 및 에러 채널로의 전달을 담당한다.

즉, “도메인 예외”라고 해서 무조건 GlobalExceptionHandler로 가는 것이 아니고, 어느 실행 경로(HTTP vs 메시지 컨슈머)에서 발생했는지가 더 중요하다.


예외 처리 흐름 요약

  1. Kafka 리스너 컨테이너가 메시지를 가져온다.
  2. 컨테이너가 해당 바인딩에 등록된 Consumer 함수를 호출한다.
  3. Consumer 내부에서 예외 발생
  4. Consumer 내부에서 예외를 try-catch로 잡고 끝내면, 컨테이너 입장에서는 “성공 처리”되어, 재시도는 발생하지 않는다.
  5. Spring Cloud Stream에서 재시도 처리를 하려면, 예외를 내부에서 삼키지 않고 Consumer 함수 밖으로 던져야 한다.
  6. 컨테이너가 바인딩에 설정된 max-attempts, back-off-* 값을 보고 재시도 여부와, 재시도 포기(최종 실패) 시점을 판단한다.

재시도를 고려한 설계 필요

Consumer 함수 내부에서 호출되는 모든 메서드는 “여기서 예외가 던져지면 재시도 대상이 된다” 는 점을 의도적으로 고려해야 한다.

  • 재시도 가능한 예외
    • 예: 일시적인 DB/네트워크 오류, 외부 시스템 타임아웃 등
    • 예외를 던져서 Spring Cloud Stream이 재시도하도록 둔다.
  • 재시도로 해결되지 않는 예외
    • 한 번 실패하면 재시도를 해도 결과가 바뀌지 않는 영구적인 비즈니스 상태이므로 재시도 대상이 되면 안 된다.
    • 예: 존재하지 않는 게시글, 이미 삭제된 리소스, 잘못된 이메일 주소 등

따라서 컨슈머 내부에서는 예외 처리 전략을 재시도 정책 관점에서 설계해야 한다.


모니터링 환경 구축

Resilience4j와 달리, Spring Cloud Stream은 “재시도 라이브러리”가 아니라 메시지 바인딩/컨슈머 인프라를 제공하는 라이브러리다.

  • Resilience4j는 메트릭을 자동으로 노출해 주지만,
  • Spring Cloud Stream의 재시도는 별도로 메트릭을 직접 카운트해야 한다.

알림 서비스에서는 MicrometerCounter를 이용해 다음 항목들을 메트릭으로 관리한다.

  • 재시도 중인 처리 건수
  • 정상 처리된 메시지 수
  • 모든 재시도 이후에도 실패한 최종 실패 수

Consumer 측 메트릭 수집

public class MailConsumer {

  private final Counter retryAttemptCounter;
  private final Counter consumeSuccessCounter;

  public MailConsumer(SendMailUseCase sendMailUseCase, MeterRegistry meterRegistry) {
    this.sendMailUseCase = sendMailUseCase;
    this.retryAttemptCounter = Counter.builder("mail_consumer_retry_attempt")
        .tag("binding", "mail-in-0")
        .description("MailConsumer 재시도 횟수")
        .register(meterRegistry);
    this.consumeSuccessCounter = Counter.builder("mail_consumer_success")
        .tag("binding", "mail-in-0")
        .description("정상 처리된 메일 수")
        .register(meterRegistry);
  }
  // ...
}

컨슈머 함수 내부에서는 재시도가 활성화 된 경우, DELIVERY_ATTEMPT 헤더를 통해 재시도 여부를 판별한다.

AtomicInteger attempt = message.getHeaders()
    .get(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, AtomicInteger.class);
if(attempt !=null&&attempt.

get() >1){retryAttemptCounter.

increment(); }

이를 통해 “재시도 중인 처리”를 별도로 카운트하고, 얼마나 많은 메시지가 재시도 구간에 머무는지 Grafana에서 확인할 수 있다.


ErrorHandler 측 메트릭 수집

모든 재시도 이후에도 실패한 메시지는 에러 핸들러에서 최종 실패처리한다.

Counter finalFailureCounter = Counter.builder("notification_consumer_final_failure")
    .tag("binding", "notification-in-0")
    .description("모든 재시도 이후에도 실패한 알림 수")
    .register(meterRegistry);

에러 핸들러에서는

  1. finalFailureCounter.increment()로 메트릭 증가
  2. KafkaConsumerLogUtils.logFinalFailure(errorMessage) 등을 통해 최종 실패 예외 정보를 로그로 남긴다.

동작 검증

예외를 임의로 발생시키는 테스트를 실행하고, k6를 활용해 부하를 줘서 재시도/최종 실패/에러 핸들러 메트릭이 기대대로 증가하는지 검증했다.

로그

WARN KafkaConsumerLogUtils - [Mail] 처리 실패(재시도 예정): eventId=V_Y_6PwUTWCMmwUB
WARN KafkaConsumerLogUtils - [Mail] 재시도 처리 중(2): eventId=V_Y_6PwUTWCMmwUB
WARN KafkaConsumerLogUtils - [Mail] 처리 실패(재시도 예정): eventId=V_Y_6PwUTWCMmwUB
WARN KafkaConsumerLogUtils - [Mail] 재시도 처리 중(3): eventId=V_Y_6PwUTWCMmwUB
WARN KafkaConsumerLogUtils - [Mail] 처리 실패(재시도 예정): eventId=V_Y_6PwUTWCMmwUB
ERROR KafkaConsumerLogUtils - 최종 실패: topic=mail, partition=1, offset=0, [email protected], payload=MailEvent[eventId=V_Y_6PwUTWCMmwUB, mailType=VERIFICATION, [email protected], model={userName=test_user, verificationUrl=http://localhost:8080/api/auth/verify-email?code=c4a421}]

모니터링


관련 문서

⚠️ **GitHub.com Fallback** ⚠️