Backend/Kafka
[Kafka] Consumer Error Handling(retry, recovery)
findmypiece
2022. 2. 15. 00:55
728x90
spring-kafka 을 통해 Consumer 구현하면 컨슈밍이 실패했을 때 기본적으로 최초 요청을 포함해서 10회까지 재시도한다. 그리고 재시도가 모두 실패하면 해당 메시지 skip 된다.
만약 이러한 정책을 커스텀하게 운영하고 싶다면 ConcurrentKafkaListenerContainerFactory 을 재정의하면 된다. spring-kafka 2.8 이전 버전에서는 setRetryTemplate 을 통해 retry 정책을 정의하고 setRecoveryCallback 를 통해 recovery 로직을 정의하면 됐다.
그런데 spring-kafka 2.8 버전부터 setRetryTemplate 은 deprecated 되었고, setRecoveryCallback 는 아직 있지만 대부분 아래와 같이 setCommonErrorHandler 로 한번에 처리하는 것 같다.
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<String, String>):
ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory
factory.setCommonErrorHandler(customErrorHandler())
return factory
}
private fun customErrorHandler(): DefaultErrorHandler {
val errorHandler = DefaultErrorHandler(
{consumerRecord, exception ->
logger.error("""
${consumerRecord.topic()} consume Failure.
cause: ${exception.message}
message key: ${consumerRecord.key()}
message value: ${consumerRecord.value()}
""".trimIndent())
FixedBackOff(1000, 5)
)
errorHandler.addNotRetryableExceptions(CustomException::class.java)
return errorHandler
}
DefaultErrorHandler의 두번째 인자로 FixedBackOff를 통해 retry 정책을 정의하고 첫번째 인자로 추가되는 람다식이 retry가 모두 실패한 뒤에 수행될 recovery 함수이다. setRecoveryCallback 는 단독으로 사용할 때 어떻게 동작하는지는 확인하지 못했다.
그 외에 addNotRetryableExceptions.addNotRetryableExceptions() 를 통해 retry 하지 않을 Exception을 추가할 수 있다.
https://blogs.perficient.com/2021/02/15/kafka-consumer-error-handling-retry-and-recovery/
https://stackoverflow.com/questions/71051529/how-to-replace-deprecated-seektocurrenterrorhandler-with-defaulterrorhandler-sp/71052157
https://stackoverflow.com/questions/70884203/spring-boot-kafka-configure-defaulterrorhandler
https://johnnn.tech/q/what-do-i-do-to-enter-the-consumerrecordrecoverer-handler-after-exhausting-all-repeated-attempts-to-process-messages-in-the-kafka/
728x90