声明自定义接口,用于定义要监听的 topic、消费组(group)以及数据处理逻辑:
/**
 * Contract for a programmatically registered Kafka consumer: it names the
 * topics and consumer group to listen on and supplies the per-record
 * processing logic.
 *
 * @param <T> type of the record value handed to {@link #process(Object)}
 */
public interface KafkaConsumer<T> {

    /**
     * Returns the topics this consumer listens on.
     *
     * @return the topic names
     */
    String[] topics();

    /**
     * Returns the consumer group used for listening.
     *
     * @return the group id
     */
    String group();

    /**
     * Processes a single value.
     *
     * @param data the data pulled from Kafka
     */
    void process(T data);

    /**
     * Hook for customising the listener container of this consumer.
     * The default implementation does nothing.
     *
     * @param container the container instance to configure
     */
    default void deployContainer(ConcurrentMessageListenerContainer<String, T> container) {
        // intentionally empty: customisation is optional
    }
}
通过 Spring 容器管理所有自定义接口的实现,并通过 ConcurrentKafkaListenerContainerFactory 为每个实现创建 ConcurrentMessageListenerContainer 容器,然后对容器进行配置;如果在 Spring Boot 的配置文件中进行了相关配置,这些配置会在创建容器时一并传入。
创建容器之后,将容器启动即可。
Spring Kafka 提供了 8 个用于监听消息的接口(MessageListener 及其变体),可在官方文档中查看,并根据需要选择合适的接口来实现。
@ConditionalOnClass(value = {ConcurrentKafkaListenerContainerFactory.class, KafkaProperties.class}) @EnableKafka @Configuration public class KafkaConsumerListenerConfig { private static final Logger log = LoggerFactory.getLogger(KafkaConsumerListenerConfig.class); public <T> KafkaConsumerListenerConfig(ConcurrentKafkaListenerContainerFactory<String, T> containerFactory, List<KafkaConsumer<T>> consumers) { log.info("======>>初始化kafka监听"); registryListener(containerFactory, consumers); log.info("<<======初始化kafka监听"); } /** * 注册监听器 */ private <T> void registryListener(ConcurrentKafkaListenerContainerFactory<String, T> factory, List<KafkaConsumer<T>> consumers) { if (CollectionUtils.isEmpty(consumers)) { log.info("无需要初始化的监听"); return; } for (KafkaConsumer<T> consumer : consumers) { log.info("初始化监听:topic:【{}】,group:【{}】", Arrays.asList(consumer.topics()), consumer.group()); checkConsumer(consumer); ConcurrentMessageListenerContainer<String, T> container = createListenerContainer(factory, consumer); container.start(); } } private <T> ConcurrentMessageListenerContainer<String, T> createListenerContainer(ConcurrentKafkaListenerContainerFactory<String, T> factory, KafkaConsumer<T> consumer) { ConcurrentMessageListenerContainer<String, T> container = factory.createContainer(consumer.topics()); container.setBeanName(consumer.group() + "-" + String.join("-", consumer.topics())); container.setConcurrency(3); consumer.deployContainer(container); // 防止被修改的配置 ContainerProperties containerProperties = container.getContainerProperties(); containerProperties.setMessageListener(new Listener<>(consumer)); containerProperties.setAsyncAcks(false); containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); containerProperties.setGroupId(consumer.group()); return container; } /** * 检查监听配置 */ private <T> void checkConsumer(KafkaConsumer<T> consumer) { Class<? 
extends KafkaConsumer> clazz = consumer.getClass(); String[] topics = consumer.topics(); Assert.notEmpty(topics, clazz + " topics method return is empty!"); Assert.noNullElements(topics, clazz + " topics method return has null element!"); String group = consumer.group(); Assert.hasText(group, clazz + " group method return is blank!"); } /** * 定义监听 */ private static class Listener<T> implements AcknowledgingConsumerAwareMessageListener<String, T> { private final KafkaConsumer<T> kafkaConsumer; public Listener(KafkaConsumer<T> consumer) { this.kafkaConsumer = consumer; } @Override public void onMessage(ConsumerRecord<String, T> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { log.info("group【{}】接收到来自topic【{}】的消息",kafkaConsumer.group(), data.topic()); // 处理数据 kafkaConsumer.process(data.value()); // 提交offset log.info("group【{}】提交topic【{}】的offset",kafkaConsumer.group(),data.topic()); consumer.commitAsync(); } } }