SpringCloudGateway基于Nacos实现动态路由优化

young 423 2022-06-11

之前已经通过Nacos进行了Gateway的动态路由处理(SpringCloud Gateway基于Nacos配置中心动态路由),同时抽取了Nacos的操作方法(NacosConfig操作),但是每次新增Gateway项目的时候,都还是需要将ApplicationEventPublisher重复去写,于是考虑将此部分功能也抽取出来。

common-nacos-listener

pom

<dependencies>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

定义Nacos监听接口

定义两个接口,一个是纯监听时需要的接口,一个是需要初始化并且监听时需要的接口

import java.util.Properties;
import java.util.function.Consumer;

public interface NacosManualListener {

    String getDataId();

    String getGroup();

    /**
     * 为null时取NacosProperties
     */
    default Properties getProperties() {
        return null;
    }

    Consumer<String> refreshConsumer();

    default boolean dataLog() {
        return true;
    }
}
public interface NacosManualInitAndListener extends NacosManualListener {
    long DEFAULT_TIMEOUT = 5000L;

    default long getTimeout() {
        return DEFAULT_TIMEOUT;
    }

    Consumer<String> initConsumer();

}

增加Nacos处理类对接口的支持

处理NacosConfigListenerService,使其支持上述两个接口

@Slf4j
@ConditionalOnBean(NacosConfigProperties.class)
@Service
public class NacosConfigListenerService {
   private static final long DEFAULT_TIMEOUT = 5000L;

   @Autowired
   private NacosConfigProperties nacosConfigProperties;

   public void initAndAddListener(NacosManualInitAndListener config) throws NacosException {
      Properties properties = config.getProperties();
      if (properties == null) {
         properties = getNacosProperties();
      }
      initAndAddListener(properties, config.getDataId(), config.getGroup(), config.getTimeout(),
            config.initConsumer(), config.refreshConsumer(), config.dataLog());
   }

   public void addListener(NacosManualListener config) throws NacosException {
      Properties properties = config.getProperties();
      if (properties == null) {
         properties = getNacosProperties();
      }
      addListener(properties, config.getDataId(), config.getGroup(), config.refreshConsumer(), config.dataLog());
   }

   public void initAndAddListener(String dataId, String group, Consumer<String> consumer) throws NacosException {
      initAndAddListener(dataId, group, consumer, false);
   }

   public void initAndAddListener(String dataId, String group, Consumer<String> consumer, boolean dataLog)
         throws NacosException {
      initAndAddListener(dataId, group, consumer, consumer, dataLog);
   }

   public void initAndAddListener(String dataId, String group, Consumer<String> initConsumer,
         Consumer<String> listenerConsumer) throws NacosException {
      initAndAddListener(dataId, group, initConsumer, listenerConsumer, false);
   }

   public void initAndAddListener(String dataId, String group, Consumer<String> initConsumer,
         Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
      initAndAddListener(dataId, group, DEFAULT_TIMEOUT, initConsumer, listenerConsumer, dataLog);
   }

   public void initAndAddListener(Properties properties, String dataId, String group, Consumer<String> consumer)
         throws NacosException {
      initAndAddListener(properties, dataId, group, DEFAULT_TIMEOUT, consumer, consumer, false);
   }

   public void initAndAddListener(String dataId, String group, long timeout, Consumer<String> initConsumer,
         Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
      initAndAddListener(getNacosProperties(), dataId, group, timeout, initConsumer, listenerConsumer, dataLog);
   }

   public void initAndAddListener(Properties properties, String dataId, String group, long timeout,
         Consumer<String> initConsumer, Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
      String config = getConfig(properties, dataId, group, timeout);
      log.info("初始化数据,dataId:{},group:{}", dataId, group);
      if (dataLog) {
         initConsumer = logConsumer(dataId, group).andThen(initConsumer);
      }
      initConsumer.accept(config);
      addListener(properties, dataId, group, listenerConsumer, dataLog);
   }

   public ConfigService getConfigService(Properties properties) throws NacosException {
      return NacosFactory.createConfigService(properties);
   }

   public ConfigService getConfigService() throws NacosException {
      ConfigService configService = getConfigService(getNacosProperties());
      return configService;
   }

   public String getConfig(String dataId, String group) throws NacosException {
      return getConfig(dataId, group, DEFAULT_TIMEOUT);
   }

   public String getConfig(String dataId, String group, Long timeout) throws NacosException {
      return getConfig(getNacosProperties(), dataId, group, timeout);
   }

   public String getConfig(Properties properties, String dataId, String group, Long timeout) throws NacosException {
      checkDataIdAndGroupBlank(dataId, group);
      log.info("获取配置,dataId:{},group:{}", dataId, group);
      if (timeout <= 0) {
         timeout = DEFAULT_TIMEOUT;
      }
      String config = getConfigService(properties).getConfig(dataId, group, timeout);
      return config;
   }

   public <T> T getConfigAndParse(String dataId, String group, Function<String, T> parseFunction)
         throws NacosException {
      return getConfigAndParse(dataId, group, DEFAULT_TIMEOUT, parseFunction);
   }

   public <T> T getConfigAndParse(String dataId, String group, Long timeout, Function<String, T> parseFunction)
         throws NacosException {
      return getConfigAndParse(getNacosProperties(), dataId, group, timeout, parseFunction);
   }

   public <T> T getConfigAndParse(Properties properties, String dataId, String group, Long timeout,
         Function<String, T> parseFunction) throws NacosException {
      String config = getConfig(properties, dataId, group, timeout);
      return parseFunction.apply(config);
   }

   public void addListener(Properties properties, String dataId, String group, Listener listener)
         throws NacosException {
      log.info("添加dataId:{},group:{}的监听", dataId, group);
      getConfigService(properties).addListener(dataId, group, listener);
   }

   public void addListener(Properties properties, String dataId, String group, Consumer<String> consumer,
         boolean dataLog) throws NacosException {
      checkDataIdAndGroupBlank(dataId, group);
      log.info("添加dataId:{},group:{}的监听", dataId, group);
      if (dataLog) {
         consumer = logConsumer(dataId, group).andThen(consumer);
      }
      getConfigService(properties).addListener(dataId, group, new NacosListener(consumer));
   }

   public void addListener(String dataId, String group, Listener listener) throws NacosException {
      addListener(getNacosProperties(), dataId, group, listener);
   }

   public void addListener(String dataId, String group, Consumer<String> consumer) throws NacosException {
      addListener(getNacosProperties(), dataId, group, consumer, true);
   }

   public boolean pushConfig(Properties properties, String dataId, String group, String config) throws NacosException {
      log.info("向dataId:{},group:{}推送配置", dataId, group);
      return getConfigService(properties).publishConfig(dataId, group, config);
   }

   public boolean pushConfig(String dataId, String group, String config) throws NacosException {
      return pushConfig(getNacosProperties(), dataId, group, config);
   }

   public Properties getNacosProperties() {
      return nacosConfigProperties.assembleConfigServiceProperties();
   }

   private void checkDataIdAndGroupBlank(String dataId, String group) {
      if (StringUtils.isEmpty(dataId) || StringUtils.isEmpty(group)) {
         throw new RuntimeException("dataId or group is blank");
      }
   }

   private Consumer<String> logConsumer(String dataId, String group) {
      return conf -> log.info("获取到dataId:{},group:{}的数据 config:{}", dataId, group, conf);
   }

   public static class NacosListener implements Listener {

      private final Consumer<String> doReceiveConfigInfoConsumer;

      public NacosListener(Consumer<String> doReceiveConfigInfoConsumer) {
         this.doReceiveConfigInfoConsumer = doReceiveConfigInfoConsumer;
      }

      @Override
      public Executor getExecutor() {
         return null;
      }

      @Override
      public void receiveConfigInfo(String config) {
         doReceiveConfigInfoConsumer.accept(config);
      }
   }
}

创建自动触发监听操作的处理类

import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;

@Slf4j
@ConditionalOnBean({NacosConfigListenerService.class})
@Service
public class ManualListenerProcessor {
    // 兼容老项目处理,不强制通过实现接口进行nacos初始化操作
    // 通过Spring注入所有NacosManualListener接口的实现类,如果没有相关实现的时候,list为空
  	// required = false,避免没有实现类的时候Spring抛出异常
   @Autowired(required = false)
   private List<NacosManualListener> nacosManualListenerList = Collections.emptyList();
   @Autowired
   private NacosConfigListenerService nacosConfigListenerService;

   @PostConstruct
   public void initAndAddListener() {
      if (nacosManualListenerList.isEmpty()) {
         return;
      }
      for (NacosManualListener listenerService : nacosManualListenerList) {
         try {
           	  // 如果是NacosManualInitAndListener接口的实现类,则调用initAndAddListener方法
             if (listenerService instanceof NacosManualInitAndListener){
                    nacosConfigListenerService.initAndAddListener((NacosManualInitAndListener) listenerService);
                }else {
               	// 否则调用addListener方法
                    nacosConfigListenerService.addListener(listenerService);
                }
         } catch (NacosException e) {
            log.error("手动添加监听失败", e);
            throw new RuntimeException("手动添加监听失败", e);
         }
      }
   }
}

common-gateway

pom

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    <scope>provided</scope>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>xxx.xxx.xxx</groupId>
    <artifactId>common-nacos-listener</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
    <scope>provided</scope>
    <optional>true</optional>
</dependency>

创建动态路由的抽象类

@Slf4j
public abstract class AbstractDynamicRoute implements ApplicationEventPublisherAware {
   private ApplicationEventPublisher publisher;

   @Autowired
   private RouteDefinitionLocator routeDefinitionLocator;

   @Autowired
   private RouteDefinitionWriter routeDefinitionWriter;

   /**
    * 更新路由
    */
   public final String addOrModifyRoute(RouteDefinition definition) {
      try {
         routeDefinitionWriter.save(Mono.just(definition)).subscribe();
         this.publisher.publishEvent(new RefreshRoutesEvent(this));
         return "success";
      } catch (Exception e) {
         log.error("update route fail", e);
         return "update route fail";
      }
   }

   /**
    * 删除路由
    */
   public final String deleteRoute(String id) {
      try {
         log.info("delete route : {}", id);
         this.routeDefinitionWriter.delete(Mono.just(id)).subscribe();
         this.publisher.publishEvent(new RefreshRoutesEvent(this));
         return "delete success";
      } catch (Exception e) {
         log.error("delete fail", e);
         return "delete fail";
      }

   }

   @Override
   public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
      this.publisher = applicationEventPublisher;
   }

   protected abstract void initConfigAndListenerConfigRefresh();

   /**
    * 获取当前注册的路由信息
    */
   protected final List<RouteDefinition> getRunningRouteInfo() {
      List<RouteDefinition> runningDefinitionList = Optional.ofNullable(routeDefinitionLocator.getRouteDefinitions())
            .map(Flux::collectList).map(Mono::block).orElse(Collections.emptyList());
      return runningDefinitionList;
   }
}

创建基于Nacos的动态路由处理类

@Slf4j
public abstract class AbstractDynamicRouteByNacos extends AbstractDynamicRoute implements NacosManualInitAndListener {

   @Autowired
   protected NacosConfigProperties nacosConfigProperties;
   @Autowired
   protected Environment environment;

   @Autowired
   protected NacosConfigListenerService nacosConfigListenerService;
   @Autowired
   protected ObjectMapper objectMapper;

   /**
    * 刷新网关操作
    *
    * @return
    */
   @Override
   public Consumer<String> refreshConsumer() {
      return conf -> {
         log.info("refresh gateway dynamic route begin ====================>>");
         List<RouteDefinition> runningDefinitionList = getRunningRouteInfo();
         List<RouteDefinition> definitionList = getRouteDefinitions(conf);
         saveOrUpdateRoute(definitionList);
         removeRoute(runningDefinitionList, definitionList);
         log.info("refresh gateway dynamic route end <<====================");
      };
   }

   /**
    * 初始化网关操作
    *
    * @return
    */
   @Override
   public Consumer<String> initConsumer() {
      return conf -> {
         log.info("init gateway dynamic route begin====================>>");
         List<RouteDefinition> definitionList = getRouteDefinitions(conf);
         saveOrUpdateRoute(definitionList);
         log.info("init gateway dynamic route end <<====================");
      };
   }

   @Override
   public boolean dataLog() {
      return true;
   }

   @Override
   public void initConfigAndListenerConfigRefresh() {
      try {
         nacosConfigListenerService.initAndAddListener(getProperties(), getDataId(), getGroup(), getTimeout(),
               initConsumer(), refreshConsumer(), dataLog());
      } catch (NacosException e) {
         log.error("添加监听失败", e);
      }
   }

   protected void modifyMultiArgs(RouteDefinition definition) {
      List<PredicateDefinition> predicates = definition.getPredicates();
      for (PredicateDefinition predicate : predicates) {
         Map<String, String> predicateArgs = predicate.getArgs();
         if (predicateArgs == null || predicateArgs.size() != 1) {
            continue;
         }
         String[] args = StringUtils.tokenizeToStringArray(predicateArgs.values().iterator().next(), ",");
         predicateArgs.clear();
         for (int i = 0; i < args.length; i++) {
            predicateArgs.put(NameUtils.generateName(i), args[i]);
         }
      }
   }

   protected void removeRoute(List<RouteDefinition> runningDefinitionList, List<RouteDefinition> definitionList) {
      List<String> newRunningDefinitionIds =
            definitionList.stream().map(RouteDefinition::getId).collect(Collectors.toList());
      runningDefinitionList.stream().map(RouteDefinition::getId)
            .filter(routId -> !newRunningDefinitionIds.contains(routId)).forEach(this::deleteRoute);
   }

   protected void saveOrUpdateRoute(List<RouteDefinition> definitionList) {
      for (RouteDefinition definition : definitionList) {
         modifyMultiArgs(definition);
         log.info("update route : {}", definition);
         addOrModifyRoute(definition);
      }
   }

   protected List<RouteDefinition> getRouteDefinitions(String conf) {
      List<RouteDefinition> definitionList = null;
      try {
         definitionList = objectMapper.readValue(conf, new TypeReference<List<RouteDefinition>>() {});
      } catch (JsonProcessingException e) {
         log.error("反序列化路由失败",e);
      }
      return definitionList;
   }
}

创建基于Nacos的动态路由默认实现

public class DefaultDynamicRouteByNacos extends AbstractDynamicRouteByNacos {
	
   @Override
   public String getDataId() {
      return environment.getProperty("nacos.gateway.route.config.data-id");
   }

   @Override
   public String getGroup() {
      return environment.getProperty("nacos.gateway.route.config.group");
   }

   @Override
   public Properties getProperties() {
      return nacosConfigListenerService.getNacosProperties();
   }

}

注入默认实现

@Configuration
public class DynamicRouteConditionConfig {

	 // 如果有AbstractDynamicRoute的其他实现,则该默认实现不会注入
   @Bean
   @ConditionalOnMissingBean(value = {AbstractDynamicRoute.class})
   @ConditionalOnBean(value = {NacosConfigProperties.class, NacosConfigListenerService.class})
   @ConditionalOnClass(value = {NacosConfigAutoConfiguration.class})
   public DefaultDynamicRouteByNacos defaultDynamicRouteByNacos() {
      return new DefaultDynamicRouteByNacos();
   }
}

如此一来,新的Gateway项目只需要引入相关依赖,实现NacosManualInitAndListener接口,配置相关配置即可,如果动态路由配置的data-id与group-id与默认的一致,可以不写相关代码即可使gateway项目具有动态路由的功能。

pom中的依赖的scope都用了provide,所以需要引入的项目还需要显式的引入相关的依赖。

同时,对于其他需要进行手动监听的的项目,引入common-nacos-listener,也可以更方便的去处理监听。