안녕하세요, 하마연구소 입니다.
스프링 어플리케이션에서 카프카 토픽에서 값을 쉽게 가져오기 위하여 @KafkaListener
어노테이션을 사용합니다.
여러개의 Kafka 서버에 접근해야할 필요가 있어서, 즉 @KafkaListener
어노테이션을 여러개 사용해야 해서, KafkaListenerContainerFactory
를 수동으로 만들어야했습니다.
기왕 만드는김에 최대한 Spring에서 기본적으로 동작하는 방식으로 처리하려고 하니, ConcurrentKafkaListenerContainerFactoryConfigurer
이 녀석을 사용해야했습니다.
(아따 이름 엄청 기네요~~~)
spring-boot 버전 1.5.19를 사용하고 있으며, 그에따라 spring-boot-autoconfigure도 버전 1.5.19 입니다.
아래는 ConcurrentKafkaListenerContainerFactoryConfigurer.java
파일입니다.
/*
* Copyright 2012-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;
/**
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
*
* @author Gary Russell
* @since 1.5.0
*/
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaProperties properties;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
*/
void setKafkaProperties(KafkaProperties properties) {
this.properties = properties;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
* @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
* instance to configure
* @param consumerFactory the {@link ConsumerFactory} to use
*/
public void configure(
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
listenerContainerFactory.setConsumerFactory(consumerFactory);
Listener container = this.properties.getListener();
ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties();
if (container.getAckMode() != null) {
containerProperties.setAckMode(container.getAckMode());
}
if (container.getAckCount() != null) {
containerProperties.setAckCount(container.getAckCount());
}
if (container.getAckTime() != null) {
containerProperties.setAckTime(container.getAckTime());
}
if (container.getPollTimeout() != null) {
containerProperties.setPollTimeout(container.getPollTimeout());
}
if (container.getConcurrency() != null) {
listenerContainerFactory.setConcurrency(container.getConcurrency());
}
}
}
setKafkaProperties()
메서드의 접근제어자(Access Modifier)가 default 입니다.
하~~~~ 한숨이 나옵니다.ConcurrentKafkaListenerContainerFactoryConfigurer
인스턴스로 생성하고 setKafkaProperties()
메서드 호출하고 configure() 메서드 호출해야하는데요.
왜, public으로 안해놨을까요?
setKafkaProperties()
을 사용할 수 있는 방법은 reflection을 이용하여 강제로 호출하는 방법, 또는 나의 프로젝트안에 org.springframework.boot.autoconfigure.kafka 패키지를 만들어서 이 패키지 안에서 호출하는 방법, 또는 ConcurrentKafkaListenerContainerFactoryConfigurer.java
내의 코드를 복사하는 방법이 있습니다.
단순하게 문제해결만을 위해서는 이 방법들을 사용하면 되겠지만, 뭔가 아름답지는 않습니다.
참고로 스프링부트 버전 2.X 를 확인해봐도 setKafkaProperties() 이 메서드의 접근제어자는 동일하게 default 입니다.
우선 프로젝트는 진행해야하니 reflection을 이용하여 강제로 setKafkaProperties()
메서드를 호출하였습니다.
// setKafkaProperties()가 access modifier가 public이 아니라서 reflection으로 강제 수행
Method setKafkaPropertiesMethod = ReflectionUtils.findMethod(configurer.getClass(), "setKafkaProperties", KafkaProperties.class);
ReflectionUtils.makeAccessible(setKafkaPropertiesMethod);
ReflectionUtils.invokeMethod(setKafkaPropertiesMethod, configurer, kafkaProperties);
뭔가 더 깔끔한 방법이 있을까, 고민되긴 합니다.
감사합니다.