카프카 프로듀서와 컨슈머를 예전에 개발한 경험이 있는데 이번에 다시 개발하려니 예전에 잘못알고 있던 것도 있었고 새로 알게된 것 중 모호했던 것들을 정리한다.
1. spring-kafka, spring-cloud-stream
예전에 프로듀서와 컨슈머를 만들 때에는 spring-kafka 의존성을 사용했다. 그런데 리서치하다보니 spring-cloud-stream 라는 의존성도 있었다.
사실 이 중 뭘 사용해도 상관은 없다. 차이가 있다면 spring-kafka 는 메시지큐 중 Kafka 에 종속적이고 spring-cloud-stream 는 Kafka 외에 RabbitMQ 같은 다른 메시지큐도 지원하기 때문에 구성이 좀 더 추상적이다. 이에 구현은 spring-kafka 이 더 쉽지만 기능은 spring-cloud-stream 이 더 많다.
spring-cloud-stream 를 사용할 경우 프로듀서 구현은 큰 차이가 없지만 컨슈머의 경우 메시지를 분석해서 또 다른 토픽으로 메시지를 보내야 한다면 이를 위한 파이프라인을 함수형 프로그래밍 방식으로 보다 쉽게 구성할 수 있다.
이러한 컨슈머 구현방식은 kafka-streams 를 이용하는 걸로 보인다. kafka-streams 는 카프카의 프로듀서, 컨슈머를 모두 포함하는 것이 아니라 컨슈머에만 해당하는 라이브러리로 연속적인 파이프라인 구성을 도와준다.
나는 결국 spring-kafka 를 선택했는데 Kafka 만 사용하기도 했지만 무엇보다 spring-cloud-stream 을 사용할 경우 프로듀싱 시 KafkaTemplate 이 아닌 StreamBridge 를 이용하게 되는데 프로듀싱 결과에 따른 후처리를 하는 방법을 찾지 못해서이다.
KafkaTemplate 를 이용할 경우 동기방식과 비동기방식이 존재하고 각각 프로듀싱의 성공 여부에 따라 적절한 처리를 할 수 있다. 그런데 StreamBridge 의 경우 기본적으로 비동기 방식이라는 것은 확인이 되었지만 프로듀싱의 성공 여부를 어떻게 체크하는지는 찾지 못했다.
2. exactly once
프로듀서를 구현하다가 application.yml 에 명시할 구성정보를 리서치 하다보니 exactly once 라는 용어에 눈에 띄였다. 단순하게 사용한다면 spring.kafka.bootstrap-servers 에 브로커 host만 명시하면 된다. 이 경우 프로듀서는 at-least-once 방식으로 동작하게 된다.
at-least-once는 메시지가 최소 한번은 브로커에 전달된다는 의미로 프로듀싱에 실패하더라도 지속적인 재시도를 통해 메시지 전달을 보장하는 방식이다.
이는 메시지의 유실은 방지하지만 메시지의 중복이 발생할 수 있다. 프로듀싱에 실패하는 경우는 다양하겠지만 timeout 으로 실패하는 경우도 있는데 이런 상황에서 재시도를 하기 때문에 브로커에는 동일한 메시지가 중복으로 쌓일 수 있고 이를 소비하는 컨슈머에서도 메시지가 중복으로 처리될 수 있는 것이다.
이러한 메시지의 중복을 막기 위해 Kafka 에서는 exactly once 방식의 프로듀서를 지원한다. 용어 그대로 브로커에 메시지를 딱 한번만 적재한다.
spring-kafka 를 사용한다면 application.yml 에서 spring.kafka.producer.properties.enable.idempotence를 true로 지정하면 된다.
참고로 이 설정을 하면 spring.kafka.producer.acks 도 강제로 all 로 할당되고 임의로 1로 지정하면 프로듀싱 시 Exception 이 발생한다.
프로듀서 에서 메시지를 보내면 브로커에서는 이게 정상적으로 적재되었다는 신호를 보내게 되는데 그게 ack 이다. spring.kafka.producer.acks 는 브로커가 ack를 보내는 기준을 지정하는 것이다.
예를 들어 브로커의 HA 구성을 위해 Replicas 을 여러개 둔 상태에서 spring.kafka.producer.acks 를 1로 지정했다면 브로커에서는 마스터에만 메시지가 정상 저장되면 ack를 보내게 되고 all 로 지정했다면 모든 Replicas에 메시지가 Replication 되어야만 ack 를 보내게 된다.
예상할 수 있듯이 exactly once 방식의 프로듀서를 사용할 경우 사용하지 않을 때보다 프로듀싱 시간이 좀 더 소요된다. 딱 한번만 적재되는 대신 처리시간이 늘어나는 것이다.
그런데 현업에서 별로 추천하는 옵션은 아니다. 속도도 속도인데 HA 구성된 Replicas 중 하나의 작은 장애에도 메시지 적재가 실패할 수 있기 때문이다.
현실적으로 마스터 브로커에만 메시지가 잘 적재되었다면 프로듀서 측에서는 성공으로 보는게 맞다. HA 구성된 전체 Replicas 에 잘 복사되는지 여부는 어떻게 보면 프로듀서의 책임 영역이 아니다.
3. 컨슈머 중복처리
프로듀서에서 exactly once 를 통해 메시지 중복 적재를 방지한다고 하더라도 메시지에 대한 처리는 컨슈머에서 중복으로 발생할 수 있다. 결론부터 말하면 컨슈머에서 메시지가 중복처리되는 것은 카프카 설정만으로 완전히 해결할 수 있는 방법은 없다. 다만 컨슈머에서의 메시지 처리는 유실도 발생할 수 있는데 그건 설정을 통해 방지할 수 있다.
컨슈머에서도 메시지를 처리한 뒤에 브로커로 정상적으로 처리되었다는 신호를 보내게 되는게 이를 offset commit 이라고 한다. offset 은 브로커에 쌓여있는 연속된 데이터 중 컨슈머가 읽어갈 위치를 의미하는 것으로 컨슈머가 장애복구나 신규 추가로 리밸런스가 일어날 경우 참조된다.
offset commit 은 기본적으로 자동커밋인데 이 경우 일정주기(default 5초)마다 poll() 한 마지막 offset을 commit 한다. 메시지 처리 여부와 무관하게 offset commit 이 먼저 수행되는 셈이다.
이 상황에서 메시지 처리 도중 리밸런스가 발생할 경우 메시지는 전부 유실되게 된다. poll() 한 메시지가 모두 처리 되더라도 다음번 자동커밋 주기 전에 리밸런스가 발생한다면 메시지가 중복으로 처리되게 된다.
이를 방지하기 위해 읽어온 메시지가 정상처리 된 뒤에 수동으로 offset commit 을 하도록 설정할 수 있다. application.yml 에 spring.kafka.consumer.enable-auto-commit 을 false 로 지정하면 된다.
수동커밋을 사용할 경우 메시지 처리가 완료된 뒤에 offser commit 을 날리기 때문에 메시지가 유실 되지는 않지만 메시지 처리 도중 리밸런스가 발생할 경우 여전히 메시지 처리는 중복이 발생할 수 있다.
그나마 이를 최소화 할 수 있는 방법은 application.yml 파일에 spring.kafka.listener.ack-mode 로 수동커밋 방식을 상세하게 컨트롤 하는 것이다. 여기에 지정할 수 있는 값은 아래와 같으며 default 는 BATCH 이다. 이를 RECORD 바꿔주면 중복처리되더라도 그 메시지 범위를 최소화 할 수 있다.
- RECORD: 수신기에 의해 각 레코드가 처리된 후 offiset을 commit 한다.
- BATCH : 다음 폴링 전에 직전까지 처리된 메시지 offiset을 commit 한다.
- TIME: AckTime 마다 직전까지 처리된 메시지 offset을 commit 한다.
(AckTime 는 밀리세컨드초로 지정하며 기본값은 5초이다) - COUNT : AckCount 마다 직전까지 처리된 메시지 offset을 commit 한다.
(AckCount 기본값은 1이다) - COUNT_TIME : COUNT 또는 TIME 중 하나의 조건이 충족할 경우 메시지 offset을 commit 한다.
- MANUAL : BATCH 방식과 비슷한데 다른 점은 Ack 처리를 컨슈머에서 직접 해줘야 한다.
- MANUAL_IMMEDIATE : Ack 처리를 컨슈머에서 직접 해줘야 하고 즉시 offiset이 commit 된다.
하지만 이 경우에도 우리가 익히 알고 있는 timeout 예외로 인해 중복은 발생할 수 있다. 컨슈머에서 브로커에서 kafka.consumer.max-poll-records(default: 500) 단위로 메시지를 읽어와서 kafka.consumer.properties.max.poll.interval.ms(default: 5분) 안에 추가 폴링을 해와야 한다.
만약 그렇게 하지 않으면 브로커 입장에서는 해당 컨슈머가 장애상황이라고 판단해서 리벨런스를 통해 파티션을 다른 컨슈머에게 연결하게 되는데 기존 컨슈머에서 여전히 메시지 처리가 수행중이라면 메시지는 중복으로 소비되게 된다.
만약 컨슈머의 메시지 유실과 더불어 중복처리도 완전히 해결하려면 코드 레벨에서 추가 작업을 해주면 되는데 컨슈밍 도중 예외가 발생할 경우 catch 로 잡아 추가 전파를 막는 것이다.
이 경우 예외를 catch 로 잡았기 때문에 브로커에는 정상적으로 offset commit 이 수행되고 코드레벨에서 catch 로 잡은 메시지만 추후 별도로 처리하면 된다.
이 때 1개 메시지당 컨슈머 처리의 타임아웃 시간도 지정해줘야 kafka.consumer.properties.max.poll.interval.ms 에 의한 예외가 발생하지 않을텐데 500건/5분 을 하면 1건당 처리시간이 0.6초로 떨어져야 한다. 즉, 컨슈머 로직의 처리시간이 0.6초가 넘어가면 예외를 발생시키고 catch 로 잡으면 되는 것이다.
만약 0.6초의 시간이 부담스럽다면 kafka.consumer.max-poll-records 를 줄이거나 kafka.consumer.properties.max.poll.interval.ms를 늘리면 되겠다.
마지막으로 offset 은 컨슈머의 리밸런스가 일어나지 않으면 참조되지 않는다고 했는데 컨슈머에서 Exception 이 발생하여 재시도 되는 경우는 어떨까?
spring-kafka 에서는 컨슈머 처리 도중 Exception 이 발생할 경우 자체적으로 최초 처리를 포함해서 10번의 재시도를 수행하도록 되어 있다. 이러한 재시도 역시 offset commit 을 참조하지 않고 현재 처리 중인 메시지를 가지고 재시도를 할 뿐이다.
3.kafka transaction
exactly once 를 리서치 하다보면 kafka transaction 도 많이 등장한다. exactly once 에 kafka transaction 이 활용되는 것처럼 보이지만 exactly once 방식의 프로듀서만 필요하다면 kafka transaction 는 굳이 필요없다.
kafka transaction 은 우리가 흔히 아는 트랜잭션으로 all or nothing 를 위한 설정일 뿐이다. 즉, 브로커에 적재하는 메시지에 대해 트랜잭션을 적용하는 것이다.
참고로 exactly once 를 구현할 경우 컨슈머에 spring.kafka.consumer.isolation-level 를 read_committed 로 설정해야 한다는 글도 간혹 보이는데 이는 kafka transaction 을 사용할때 유효한 설정이다.
https://velog.io/@ehdrms2034/카프카-프로듀서-옵션
https://stackoverflow.com/questions/68542265/how-to-setup-kafka-idempotent-producer-in-spring-boot
https://stackoverflow.com/questions/58894281/difference-between-idempotence-and-exactly-once-in-kafka-stream
https://stackoverflow.com/questions/64939661/does-enabling-idempotence-on-a-kafka-producer-decrease-throughput
https://blog.voidmainvoid.net/354
https://stackoverflow.com/questions/52570118/spring-kafka-and-exactly-once-delivery-guarantee
https://sjo200.tistory.com/45
https://dydwnsekd.tistory.com/83
https://gunju-ko.github.io/kafka/2018/03/31/Kafka-Transaction.html
'Backend > Kafka' 카테고리의 다른 글
Kafka, Kafka Streams (0) | 2022.09.07 |
---|---|
[Kafka] Consumer Error Handling(retry, recovery) (0) | 2022.02.15 |
[Kafka] multi KafkaTemplate (0) | 2022.02.11 |
[Kafka] 카프카 명령어 정리 (0) | 2021.12.30 |