Backend/Kafka

[Kafka] Consumer Error Handling(retry, recovery)

findmypiece 2022. 2. 15. 00:55
728x90

spring-kafka 을 통해 Consumer 구현하면 컨슈밍이 실패했을 때 기본적으로 최초 요청을 포함해서 10회까지 재시도한다. 그리고 재시도가 모두 실패하면 해당 메시지 skip 된다.

 

만약 이러한 정책을 커스텀하게 운영하고 싶다면 ConcurrentKafkaListenerContainerFactory 을 재정의하면 된다. spring-kafka 2.8 이전 버전에서는 setRetryTemplate 을 통해 retry 정책을 정의하고 setRecoveryCallback 를 통해 recovery 로직을 정의하면 됐다.

 

그런데 spring-kafka 2.8 버전부터 setRetryTemplate 은 deprecated 되었고, setRecoveryCallback 는 아직 있지만 대부분 아래와 같이 setCommonErrorHandler 로 한번에 처리하는 것 같다. 

@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<String, String>):
    ConcurrentKafkaListenerContainerFactory<String, String> {

    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = consumerFactory
    factory.setCommonErrorHandler(customErrorHandler())

    return factory
}

private fun customErrorHandler(): DefaultErrorHandler {
    val errorHandler =  DefaultErrorHandler(
        {consumerRecord, exception ->
            logger.error("""
                ${consumerRecord.topic()} consume Failure.
                cause: ${exception.message}
                message key: ${consumerRecord.key()}
                message value: ${consumerRecord.value()}
            """.trimIndent())
        FixedBackOff(1000, 5)
    )

    errorHandler.addNotRetryableExceptions(CustomException::class.java)

    return errorHandler
}

 

DefaultErrorHandler의 두번째 인자로 FixedBackOff를 통해 retry 정책을 정의하고 첫번째 인자로 추가되는 람다식이 retry가 모두 실패한 뒤에 수행될 recovery 함수이다. setRecoveryCallback 는 단독으로 사용할 때 어떻게 동작하는지는 확인하지 못했다.

 

그 외에 addNotRetryableExceptions.addNotRetryableExceptions() 를 통해 retry 하지 않을 Exception을 추가할 수 있다.

 

https://blogs.perficient.com/2021/02/15/kafka-consumer-error-handling-retry-and-recovery/
https://stackoverflow.com/questions/71051529/how-to-replace-deprecated-seektocurrenterrorhandler-with-defaulterrorhandler-sp/71052157
https://stackoverflow.com/questions/70884203/spring-boot-kafka-configure-defaulterrorhandler
https://johnnn.tech/q/what-do-i-do-to-enter-the-consumerrecordrecoverer-handler-after-exhausting-all-repeated-attempts-to-process-messages-in-the-kafka/
728x90

'Backend > Kafka' 카테고리의 다른 글

Kafka, Kafka Streams  (0) 2022.09.07
[Kafka] multi KafkaTemplate  (0) 2022.02.11
[Kafka] 삽질 기록..  (0) 2022.02.10
[Kafka] 카프카 명령어 정리  (0) 2021.12.30