[Spring] ConcurrentKafkaListenerContainerFactoryConfigurer를 사용하고 싶다.
안녕하세요, 하마연구소입니다.
스프링 어플리케이션에서 카프카 토픽에서 값을 쉽게 가져오기 위하여 @KafkaListener 어노테이션을 사용합니다.
여러개의 Kafka 서버에 접근해야할 필요가 있어서, 즉 @KafkaListener 어노테이션을 여러개 사용해야 해서, KafkaListenerContainerFactory를 수동으로 만들어야했습니다.
기왕 만드는김에 최대한 Spring에서 기본적으로 동작하는 방식으로 처리하려고 하니, ConcurrentKafkaListenerContainerFactoryConfigurer 이 녀석을 사용해야했습니다.
(아따 이름 엄청 기네요~~~)
spring-boot 버전 1.5.19를 사용하고 있으며, 그에따라 spring-boot-autoconfigure도 버전 1.5.19 입니다.
아래는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 파일입니다.
setKafkaProperties() 메서드의 접근제어자(Access Modifier)가 default 입니다.
하~~~~ 한숨이 나옵니다.
ConcurrentKafkaListenerContainerFactoryConfigurer 인스턴스로 생성하고 setKafkaProperties() 메서드 호출하고 configure() 메서드 호출해야하는데요.
왜, public으로 안해놨을까요?
setKafkaProperties()을 사용할 수 있는 방법은 reflection을 이용하여 강제로 호출하는 방법, 또는 나의 프로젝트안에 org.springframework.boot.autoconfigure.kafka 패키지를 만들어서 이 패키지 안에서 호출하는 방법, 또는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 내의 코드를 복사하는 방법이 있습니다.
단순하게 문제해결만을 위해서는 이 방법들을 사용하면 되겠지만, 뭔가 아름답지는 않습니다.
참고로 스프링부트 버전 2.X 를 확인해봐도 setKafkaProperties() 이 메서드의 접근제어자는 동일하게 default 입니다.
우선 프로젝트는 진행해야하니 reflection을 이용하여 강제로 setKafkaProperties() 메서드를 호출하였습니다.
뭔가 더 깔끔한 방법이 있을까, 고민되긴 합니다.
감사합니다.
스프링 어플리케이션에서 카프카 토픽에서 값을 쉽게 가져오기 위하여 @KafkaListener 어노테이션을 사용합니다.
여러개의 Kafka 서버에 접근해야할 필요가 있어서, 즉 @KafkaListener 어노테이션을 여러개 사용해야 해서, KafkaListenerContainerFactory를 수동으로 만들어야했습니다.
기왕 만드는김에 최대한 Spring에서 기본적으로 동작하는 방식으로 처리하려고 하니, ConcurrentKafkaListenerContainerFactoryConfigurer 이 녀석을 사용해야했습니다.
(아따 이름 엄청 기네요~~~)
spring-boot 버전 1.5.19를 사용하고 있으며, 그에따라 spring-boot-autoconfigure도 버전 1.5.19 입니다.
아래는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 파일입니다.
/* * Copyright 2012-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.springframework.boot.autoconfigure.kafka; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.config.ContainerProperties; /** * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults. * * @author Gary Russell * @since 1.5.0 */ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private KafkaProperties properties; /** * Set the {@link KafkaProperties} to use. * @param properties the properties */ void setKafkaProperties(KafkaProperties properties) { this.properties = properties; } /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory} * instance to configure * @param consumerFactory the {@link ConsumerFactory} to use */ public void configure( ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory, ConsumerFactory<Object, Object> consumerFactory) { listenerContainerFactory.setConsumerFactory(consumerFactory); Listener container = this.properties.getListener(); ContainerProperties containerProperties = listenerContainerFactory .getContainerProperties(); if (container.getAckMode() != null) { containerProperties.setAckMode(container.getAckMode()); } if (container.getAckCount() != null) { containerProperties.setAckCount(container.getAckCount()); } if (container.getAckTime() != null) { containerProperties.setAckTime(container.getAckTime()); } if (container.getPollTimeout() != null) { containerProperties.setPollTimeout(container.getPollTimeout()); } if (container.getConcurrency() != null) { listenerContainerFactory.setConcurrency(container.getConcurrency()); } } }
setKafkaProperties() 메서드의 접근제어자(Access Modifier)가 default 입니다.
하~~~~ 한숨이 나옵니다.
ConcurrentKafkaListenerContainerFactoryConfigurer 인스턴스로 생성하고 setKafkaProperties() 메서드 호출하고 configure() 메서드 호출해야하는데요.
왜, public으로 안해놨을까요?
setKafkaProperties()을 사용할 수 있는 방법은 reflection을 이용하여 강제로 호출하는 방법, 또는 나의 프로젝트안에 org.springframework.boot.autoconfigure.kafka 패키지를 만들어서 이 패키지 안에서 호출하는 방법, 또는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 내의 코드를 복사하는 방법이 있습니다.
단순하게 문제해결만을 위해서는 이 방법들을 사용하면 되겠지만, 뭔가 아름답지는 않습니다.
참고로 스프링부트 버전 2.X 를 확인해봐도 setKafkaProperties() 이 메서드의 접근제어자는 동일하게 default 입니다.
우선 프로젝트는 진행해야하니 reflection을 이용하여 강제로 setKafkaProperties() 메서드를 호출하였습니다.
// setKafkaProperties()가 access modifier가 public이 아니라서 reflection으로 강제 수행 Method setKafkaPropertiesMethod = ReflectionUtils.findMethod(configurer.getClass(), "setKafkaProperties", KafkaProperties.class); ReflectionUtils.makeAccessible(setKafkaPropertiesMethod); ReflectionUtils.invokeMethod(setKafkaPropertiesMethod, configurer, kafkaProperties);
뭔가 더 깔끔한 방법이 있을까, 고민되긴 합니다.
감사합니다.
댓글
댓글 쓰기