声明自定义接口,用于定义监听的topic,group,以及数据处理逻辑
/**
 * Contract for a programmatically registered Kafka listener: an implementation
 * names the topics to subscribe to, the consumer group to join, and the logic
 * applied to each received value.
 *
 * @param <T> type of the record value handed to {@link #process(Object)}
 */
public interface KafkaConsumer<T> {
/**
 * Returns the topics this consumer listens to.
 * The registering configuration rejects a null or empty array and any null element.
 *
 * @return the topic names to subscribe to
 */
String[] topics();
/**
 * Returns the consumer group used for this listener.
 * The registering configuration rejects a null or blank value.
 *
 * @return the consumer group id
 */
String group();
/**
 * Handles one record value.
 *
 * @param data the value of a record pulled from Kafka
 */
void process(T data);
/**
 * Hook for customising the listener container before it is started.
 * The default implementation does nothing. The message listener, async-ack flag,
 * ack mode and group id are assigned by the registering configuration after this
 * hook returns, so changes made to those settings here are overwritten.
 *
 * @param container the container instance created for this consumer
 */
default void deployContainer(ConcurrentMessageListenerContainer<String, T> container){
}
}
通过Spring容器管理所有自定义接口的实现,并通过ConcurrentKafkaListenerContainerFactory为每个实现创建一个ConcurrentMessageListenerContainer容器,再对容器进行配置。
如果在Spring Boot的配置文件中配置了Kafka相关参数,这些参数会在创建容器时由工厂传入容器。
创建容器之后,将容器启动即可。
Spring提供了8个用于监听的接口,可通过官网进行查看,选择自己需要的接口实现。
/**
 * Registers one {@link ConcurrentMessageListenerContainer} per {@link KafkaConsumer}
 * bean and starts it while this configuration is being constructed.
 *
 * <p>Every container is created from the injected
 * {@link ConcurrentKafkaListenerContainerFactory}, given a default concurrency of 3,
 * handed to {@link KafkaConsumer#deployContainer} for customisation, and then forced
 * to use manual-immediate acknowledgment with the consumer's own group id.
 *
 * <p>NOTE(review): the containers are created and started here directly and are not
 * kept in a field, so this class never stops them itself — confirm whether an
 * explicit shutdown hook is needed.
 */
@ConditionalOnClass(value = {ConcurrentKafkaListenerContainerFactory.class, KafkaProperties.class})
@EnableKafka
@Configuration
public class KafkaConsumerListenerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerListenerConfig.class);

    /**
     * Creates and starts a listener container for every supplied consumer.
     *
     * @param containerFactory factory used to create the listener containers
     * @param consumers        all {@link KafkaConsumer} implementations to register
     * @param <T>              record value type
     */
    public <T> KafkaConsumerListenerConfig(ConcurrentKafkaListenerContainerFactory<String, T> containerFactory,
                                           List<KafkaConsumer<T>> consumers) {
        log.info("======>>初始化kafka监听");
        registryListener(containerFactory, consumers);
        log.info("<<======初始化kafka监听");
    }

    /**
     * Validates each consumer, builds its container and starts it.
     *
     * @param factory   factory used to create the listener containers
     * @param consumers consumers to register; a null or empty list is a no-op
     * @throws IllegalArgumentException if a consumer reports invalid topics or group
     */
    private <T> void registryListener(ConcurrentKafkaListenerContainerFactory<String, T> factory,
                                      List<KafkaConsumer<T>> consumers) {
        if (CollectionUtils.isEmpty(consumers)) {
            log.info("无需要初始化的监听");
            return;
        }
        for (KafkaConsumer<T> consumer : consumers) {
            // Validate before logging: the log statement dereferences topics(), so a
            // null array would otherwise fail with a bare NullPointerException instead
            // of the descriptive assertion message.
            checkConsumer(consumer);
            log.info("初始化监听:topic:【{}】,group:【{}】", Arrays.asList(consumer.topics()), consumer.group());
            ConcurrentMessageListenerContainer<String, T> container = createListenerContainer(factory, consumer);
            container.start();
        }
    }

    /**
     * Builds the listener container for a single consumer.
     *
     * @param factory  factory used to create the container
     * @param consumer consumer whose topics, group and processing logic are wired in
     * @return the configured container, not yet started
     */
    private <T> ConcurrentMessageListenerContainer<String, T> createListenerContainer(
            ConcurrentKafkaListenerContainerFactory<String, T> factory,
            KafkaConsumer<T> consumer) {
        ConcurrentMessageListenerContainer<String, T> container = factory.createContainer(consumer.topics());
        container.setBeanName(consumer.group() + "-" + String.join("-", consumer.topics()));
        // Default concurrency; set before the hook so the consumer may override it.
        container.setConcurrency(3);
        consumer.deployContainer(container);
        // Settings below are applied after the hook so the consumer cannot change them.
        ContainerProperties containerProperties = container.getContainerProperties();
        containerProperties.setMessageListener(new Listener<>(consumer));
        containerProperties.setAsyncAcks(false);
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        containerProperties.setGroupId(consumer.group());
        return container;
    }

    /**
     * Checks that the consumer declares at least one non-null topic and a non-blank group.
     *
     * @param consumer consumer to validate
     * @throws IllegalArgumentException if any of the checks fails
     */
    private <T> void checkConsumer(KafkaConsumer<T> consumer) {
        Class<?> clazz = consumer.getClass();
        String[] topics = consumer.topics();
        Assert.notEmpty(topics, clazz + " topics method return is empty!");
        Assert.noNullElements(topics, clazz + " topics method return has null element!");
        String group = consumer.group();
        Assert.hasText(group, clazz + " group method return is blank!");
    }

    /**
     * Adapter that forwards each received record value to a {@link KafkaConsumer}
     * and acknowledges the record once processing has returned normally.
     */
    private static class Listener<T> implements AcknowledgingConsumerAwareMessageListener<String, T> {

        private final KafkaConsumer<T> kafkaConsumer;

        public Listener(KafkaConsumer<T> consumer) {
            this.kafkaConsumer = consumer;
        }

        /**
         * Processes one record and then acknowledges it.
         *
         * @param data           the received record
         * @param acknowledgment handle used to commit this record's offset
         * @param consumer       the underlying Kafka consumer (unused)
         */
        @Override
        public void onMessage(ConsumerRecord<String, T> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            log.info("group【{}】接收到来自topic【{}】的消息", kafkaConsumer.group(), data.topic());
            // Process the payload; if this throws, the record is not acknowledged.
            kafkaConsumer.process(data.value());
            log.info("group【{}】提交topic【{}】的offset", kafkaConsumer.group(), data.topic());
            // Acknowledge only this record. A bare consumer.commitAsync() would commit
            // the positions of the whole last poll(), including records of the batch
            // that have not been processed yet, so they could be lost on failure.
            acknowledgment.acknowledge();
        }
    }
}