Backend/Kafka

[Kafka] multi KafkaTemplate

findmypiece 2022. 2. 11. 21:52
728x90

SpringBoot 에서 Kafka 연동은 매우 간단하다. org.springframework.kafka:spring-kafka 의존성을 추가하고 application.yml 에 Kafka 관련 구성정보만 추가하면 된다.

 

그런데 서로 다른 여러개의 브로커를 동시에 참조해야 할 때는 이 방법을 사용할 수 없다. application.yml 에 Kafka 브로커 정보는 spring.kafka.bootstrap-servers 에 지정하고 "," 로 구분된 여러개의 브로커 host를 지정할 수도 있긴 한데 이는 HA, LB를 위한 구성으로 순차적으로 접근하되 정상적인 상태의 단일 브로커만 연결된다. 또한 HA, LB를 위한 구성인만큼 지정된 브로커들은 기본적으로 모두 동일한 토픽이 존재해야 한다.

 

서로 다른 여러개의 브로커를 참조해야 한다면 KafkaTemplate 를 여러개 구성할 수 밖에 없다. 별로 어려울 것은 없다. 아래와 같이 그냥 javaConfig 방식으로 Kafka 구성정보를 만들고 이를 가지고 KafkaTemplate Bean 을 만들면 된다. 참고로 아래 코드에서 ApplicationProp는 @ConfigurationProperties 를 이용해서 구성정보가 바인딩 된 객체이다.

@Configuration
class KafkaConfig(
    private val applicationProp: ApplicationProp,
) {

    private val firstBroker = applicationProp.kafka.firstBroker
    private val secondBroker = applicationProp.kafka.secondBroker

    @Bean
    fun firstKafkaTemplate(): KafkaTemplate<String, String> =
        KafkaTemplate(
            DefaultKafkaProducerFactory(
                mapOf(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to firstBroker.host,
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
                )
            )
        )

    @Bean
    fun secondKafkaTemplate(): KafkaTemplate<String, String> =
        KafkaTemplate(
            DefaultKafkaProducerFactory(
                mapOf(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to secondBroker.host,
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
                )
            )
        )
}

 

그런데 한 가지 주의할 점이 있다. application.yml 를 통해 Kafka 를 연동할 경우 브로커 host 만 spring.kafka.bootstrap-servers 에 지정하면 기본적으로 잘 동작한다. 이것만 지정하더라도 연동에 필요한 필수 구성정보가 default 값으로 설정되기 때문이다.

 

Kafka 연동에 필요한 필수 구성정보는 여러가지가 있겠지만 최소한 브로커 host와 메시지 key, value 의 직렬화 정보는 반드시 필요하다. application.yml 방식을 사용할 경우 key, value 의 직렬화 방식을 지정하지 않으면 SpringSerializer 가 default 로 설정된다.

 

그런데 javaConfig 방식을 사용할 경우에는 key, value 의 직렬화 정보를 지정하지 않으면 default 값이 자동으로 구성되지 않는다. 그렇다고 모든 필수 구성정보의 default 값이 자동 구성되지 않는 것은 아니다.

 

하지만 key, value 의 직렬화 정보는 구성되지 않기 때문에 지정하지 않으면 프로듀싱시 에러가 발생한다. 이에 javaConfig 방식으로 Kafka 를 연동할 경우 BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG 는 반드시 지정해 줘야 한다.

 

https://always-kimkim.tistory.com/entry/kafka101-configuration-bootstrap-servers
728x90