기본 동작 (DefaultErrorHandler)
- FixedBackOff
- interval 0
- retry 10
- 재시도 소진 시 오류 출력 후 다음 메시지 처리 계속 진행
KafkaListener에 error handler 설정
@Service(value = "myFoodOrderErrorHandler")
public class FoodOrderErrorHandler implements ConsumerAwareListenerErrorHandler {
private static final Logger log = LoggerFactory.getLogger(FoodOrderErrorHandler.class);
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
log.warn("Food order error. {} because {}", message.getPayload(), exception.getMessage());
return null;
}
}
- @KafkaListener(topics = "t_food_order", errorHandler = "myFoodOrderErrorHandler")
- @KafkaListener에 errorHandler 설정이 있는 경우는 @KafkaListener에 설정된 errorHandler가 호출됨 (아래 섹션 참조)
- Global Error Handler
public class GlobalErrorHandler implements ConsumerAwareErrorHandler {
private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?, ?> consumer) {
log.warn("Global error handler for message {}", consumerRecord.value().toString());
}
}
- kafkaListenerContainerFactory Bean 재정의
@Configuration
public class KafkaConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
var properties = kafkaProperties.buildConsumerProperties();
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "120000");
return new DefaultKafkaConsumerFactory<Object, Object>(properties);
}
@Bean(value = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer
) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
factory.setErrorHandler(new GlobalErrorHandler());
return factory;
}
}
- @KafkaListener에 설정된 errorHandler 처리 후 Global Error Handler도 처리하고자 한다면 re-throw해야 함
@Service(value = "myFoodOrderErrorHandler")
public class FoodOrderErrorHandler implements ConsumerAwareListenerErrorHandler {
private static final Logger log = LoggerFactory.getLogger(FoodOrderErrorHandler.class);
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
log.warn("Food order error. {} because {}", message.getPayload(), exception.getMessage());
if (exception.getCause() instanceof RuntimeException) {
throw exception;
}
return null;
}
}
private RetryTemplate createRetryTemplate() {
var retryTemplate = new RetryTemplate();
var retryPolicy = new SimpleRetryPolicy(3);
retryTemplate.setRetryPolicy(retryPolicy);
var backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(10_000);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
@Bean(value = "foodOrderRetryContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> foodOrderRetryContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer
) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
factory.setErrorHandler(new GlobalErrorHandler());
factory.setRetryTemplate(createRetryTemplate());
return factory;
}
- @KafkaListener(topics = "t_food_order", containerFactory = "foodOrderRetryContainerFactory")
- Kafka Consumer Error Handling, Retry, and Recovery
- ContainerFactory Bean 정의
@Bean(value = "foodOrderDeadLetterRecoveryContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> foodOrderDeadLetterRecoveryContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
KafkaTemplate<Object, Object> kafkaTemplate
) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
var recover = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, e) -> new TopicPartition("t_food_order_dlt", record.partition()));
var backOff = new FixedBackOff(10_000, 3);
var errorHandler = new SeekToCurrentErrorHandler(recover, backOff);
factory.setErrorHandler(errorHandler);
return factory;
}