[Spring] ConcurrentKafkaListenerContainerFactoryConfigurer를 사용하고 싶다.

안녕하세요, 하마연구소 입니다.

스프링 어플리케이션에서 카프카 토픽에서 값을 쉽게 가져오기 위하여 @KafkaListener 어노테이션을 사용합니다.
여러개의 Kafka 서버에 접근해야할 필요가 있어서, 즉 @KafkaListener 어노테이션을 여러개 사용해야 해서, KafkaListenerContainerFactory를 수동으로 만들어야했습니다.
기왕 만드는김에 최대한 Spring에서 기본적으로 동작하는 방식으로 처리하려고 하니, ConcurrentKafkaListenerContainerFactoryConfigurer 이 녀석을 사용해야했습니다.
(아따 이름 엄청 기네요~~~)

spring-boot 버전 1.5.19를 사용하고 있으며, 그에따라 spring-boot-autoconfigure도 버전 1.5.19 입니다.
아래는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 파일입니다.

/*
 * Copyright 2012-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.boot.autoconfigure.kafka;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;

/**
 * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
 *
 * @author Gary Russell
 * @since 1.5.0
 */
public class ConcurrentKafkaListenerContainerFactoryConfigurer {

	private KafkaProperties properties;

	/**
	 * Set the {@link KafkaProperties} to use.
	 * @param properties the properties
	 */
	void setKafkaProperties(KafkaProperties properties) {
		this.properties = properties;
	}

	/**
	 * Configure the specified Kafka listener container factory. The factory can be
	 * further tuned and default settings can be overridden.
	 * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
	 * instance to configure
	 * @param consumerFactory the {@link ConsumerFactory} to use
	 */
	public void configure(
			ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
			ConsumerFactory<Object, Object> consumerFactory) {
		listenerContainerFactory.setConsumerFactory(consumerFactory);
		Listener container = this.properties.getListener();
		ContainerProperties containerProperties = listenerContainerFactory
				.getContainerProperties();
		if (container.getAckMode() != null) {
			containerProperties.setAckMode(container.getAckMode());
		}
		if (container.getAckCount() != null) {
			containerProperties.setAckCount(container.getAckCount());
		}
		if (container.getAckTime() != null) {
			containerProperties.setAckTime(container.getAckTime());
		}
		if (container.getPollTimeout() != null) {
			containerProperties.setPollTimeout(container.getPollTimeout());
		}
		if (container.getConcurrency() != null) {
			listenerContainerFactory.setConcurrency(container.getConcurrency());
		}
	}

}

setKafkaProperties() 메서드의 접근제어자(Access Modifier)가 default 입니다.
하~~~~ 한숨이 나옵니다.
ConcurrentKafkaListenerContainerFactoryConfigurer 인스턴스로 생성하고 setKafkaProperties() 메서드 호출하고 configure() 메서드 호출해야하는데요.

왜, public으로 안해놨을까요?

setKafkaProperties()을 사용할 수 있는 방법은 reflection을 이용하여 강제로 호출하는 방법, 또는 나의 프로젝트안에 org.springframework.boot.autoconfigure.kafka 패키지를 만들어서 이 패키지 안에서 호출하는 방법, 또는 ConcurrentKafkaListenerContainerFactoryConfigurer.java 내의 코드를 복사하는 방법이 있습니다.

단순하게 문제해결만을 위해서는 이 방법들을 사용하면 되겠지만, 뭔가 아름답지는 않습니다.

참고로 스프링부트 버전 2.X 를 확인해봐도 setKafkaProperties() 이 메서드의 접근제어자는 동일하게 default 입니다.

우선 프로젝트는 진행해야하니 reflection을 이용하여 강제로 setKafkaProperties() 메서드를 호출하였습니다.

// setKafkaProperties()가 access modifier가 public이 아니라서 reflection으로 강제 수행
Method setKafkaPropertiesMethod = ReflectionUtils.findMethod(configurer.getClass(), "setKafkaProperties", KafkaProperties.class);
ReflectionUtils.makeAccessible(setKafkaPropertiesMethod);
ReflectionUtils.invokeMethod(setKafkaPropertiesMethod, configurer, kafkaProperties);

뭔가 더 깔끔한 방법이 있을까, 고민되긴 합니다.

감사합니다.

0 0 votes
Article Rating
Subscribe
Notify of
guest

0 Comments
Most Voted
Newest Oldest
Inline Feedbacks
View all comments