SpringBoot动态创建Kafka消费者

young 1,760 2022-10-10

官网地址

声明自定义接口,用于定义监听的topic,group,以及数据处理逻辑

/**
 * Contract for a dynamically registered Kafka consumer: it names the topics and the
 * consumer group to listen on and supplies the logic that handles each payload.
 *
 * @param <T> type of the record value handed to {@link #process(Object)}
 */
public interface KafkaConsumer<T> {

    /**
     * Returns the topics this consumer listens to.
     *
     * @return the topic names
     */
    String[] topics();

    /**
     * Returns the consumer group this consumer listens with.
     *
     * @return the group id
     */
    String group();

    /**
     * Handles one piece of data.
     *
     * @param data the data pulled from Kafka
     */
    void process(T data);

    /**
     * Hook for configuring the listener container; the default implementation does nothing.
     *
     * @param container the container instance
     */
    default void deployContainer(ConcurrentMessageListenerContainer<String, T> container) {
        // no-op by default
    }
}

所有自定义接口的实现都交给Spring容器管理。配置类通过ConcurrentKafkaListenerContainerFactory为每个实现创建一个ConcurrentMessageListenerContainer容器,并对容器进行配置;如果在Spring Boot的配置文件中配置了Kafka监听相关的属性,这些属性会在工厂创建容器时应用到容器上。

创建容器之后,将容器启动即可。

Spring for Apache Kafka提供了8个消息监听接口,可在官网文档中查看,并根据需要选择合适的接口来实现;下面的示例使用的是AcknowledgingConsumerAwareMessageListener。

/**
 * Configuration that creates and starts one {@code ConcurrentMessageListenerContainer}
 * for every {@code KafkaConsumer} bean found in the application context.
 *
 * <p>Containers are created through the injected
 * {@code ConcurrentKafkaListenerContainerFactory} and started from this class's constructor.
 *
 * <p>NOTE(review): the started containers are not retained or registered anywhere in this
 * class, so nothing here stops them on shutdown — confirm whether lifecycle handling is needed.
 */
@ConditionalOnClass(value = {ConcurrentKafkaListenerContainerFactory.class, KafkaProperties.class})
@EnableKafka
@Configuration
public class KafkaConsumerListenerConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerListenerConfig.class);

    /**
     * Registers and starts a listener container for each supplied consumer.
     *
     * @param containerFactory factory used to create the listener containers
     * @param consumers        all {@code KafkaConsumer} implementations to register
     * @param <T>              record value type
     */
    public <T> KafkaConsumerListenerConfig(ConcurrentKafkaListenerContainerFactory<String, T> containerFactory,
                                           List<KafkaConsumer<T>> consumers) {
        log.info("======>>初始化kafka监听");
        registryListener(containerFactory, consumers);
        log.info("<<======初始化kafka监听");
    }

    /**
     * Validates each consumer, builds its container and starts it.
     *
     * @param factory   factory used to create the listener containers
     * @param consumers consumers to register; may be {@code null} or empty
     */
    private <T> void registryListener(ConcurrentKafkaListenerContainerFactory<String, T> factory,
                                      List<KafkaConsumer<T>> consumers) {

        if (CollectionUtils.isEmpty(consumers)) {
            log.info("无需要初始化的监听");
            return;
        }

        for (KafkaConsumer<T> consumer : consumers) {
            // Validate before logging: Arrays.asList(null) would otherwise throw a bare
            // NullPointerException and hide the descriptive assertion message.
            checkConsumer(consumer);
            log.info("初始化监听:topic:【{}】,group:【{}】", Arrays.asList(consumer.topics()), consumer.group());
            ConcurrentMessageListenerContainer<String, T> container = createListenerContainer(factory, consumer);
            container.start();
        }
    }

    /**
     * Creates and configures the listener container for one consumer.
     *
     * @param factory  factory used to create the container
     * @param consumer consumer providing topics, group and processing logic
     * @return the configured, not yet started container
     */
    private <T> ConcurrentMessageListenerContainer<String, T> createListenerContainer(ConcurrentKafkaListenerContainerFactory<String, T> factory,
                                                                                      KafkaConsumer<T> consumer) {
        ConcurrentMessageListenerContainer<String, T> container = factory.createContainer(consumer.topics());
        container.setBeanName(consumer.group() + "-" + String.join("-", consumer.topics()));
        // Default concurrency; the consumer may change it in deployContainer below.
        container.setConcurrency(3);
        consumer.deployContainer(container);
        // Applied after the consumer hook so these settings cannot be overridden by it.
        ContainerProperties containerProperties = container.getContainerProperties();
        containerProperties.setMessageListener(new Listener<>(consumer));
        containerProperties.setAsyncAcks(false);
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        containerProperties.setGroupId(consumer.group());
        return container;
    }

    /**
     * Checks that the consumer declares at least one non-null topic and a non-blank group.
     *
     * @param consumer consumer to validate
     */
    private <T> void checkConsumer(KafkaConsumer<T> consumer) {
        Class<? extends KafkaConsumer> clazz = consumer.getClass();
        String[] topics = consumer.topics();
        Assert.notEmpty(topics, clazz + " topics method return is empty!");
        Assert.noNullElements(topics, clazz + " topics method return has null element!");
        String group = consumer.group();
        Assert.hasText(group, clazz + " group method return is blank!");
    }

    /**
     * Message listener that delegates each record value to the wrapped consumer and then
     * acknowledges the record.
     */
    private static class Listener<T> implements AcknowledgingConsumerAwareMessageListener<String, T> {
        private final KafkaConsumer<T> kafkaConsumer;

        public Listener(KafkaConsumer<T> consumer) {
            this.kafkaConsumer = consumer;
        }

        @Override
        public void onMessage(ConsumerRecord<String, T> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            log.info("group【{}】接收到来自topic【{}】的消息",kafkaConsumer.group(), data.topic());
            // Process the record value.
            kafkaConsumer.process(data.value());
            // Commit the offset of this record only. Acknowledging through the container
            // (AckMode.MANUAL_IMMEDIATE) commits just the processed record, whereas calling
            // commitAsync() on the raw consumer would commit everything returned by the last
            // poll, including records that have not been processed yet.
            log.info("group【{}】提交topic【{}】的offset",kafkaConsumer.group(),data.topic());
            acknowledgment.acknowledge();
        }
    }
}