NacosConfig操作

young 841 2021-10-17

使用Nacos做配置中心时,会遇到需要监听html、json、txt这类数据,或者需要对数据进行特殊处理的情况,此时需要手动从配置中心获取数据进行初始化,并添加监听

手动添加Nacos监听

public void addNacosListener(String dataId, String group) {
		try {
			Properties properties = new Properties();
			// gatewayConfig为配置的需要监听的参数
			// nacos地址
			properties.setProperty("serverAddr", gatewayConfig.getNacosServerAddr());
			// 工作空间
			properties.setProperty("namespace", gatewayConfig.getNacosNamespace());
			// 访问密码(如果开启权限校验需要)
			properties.setProperty("password", gatewayConfig.getPassword());
			// 访问账户(如果开启权限校验需要)
			properties.setProperty("username", gatewayConfig.getUserName());
			ConfigService configService = NacosFactory.createConfigService(properties);
			// 从远程获取配置
			String configInfo = configService.getConfig(dataId, group, 5000);
			log.info("获取dataId:{},group:{}配置: {}", dataId, group,configInfo );
			// 初始化操作
			init(configInfo);
			// 添加监听
			configService.addListener(dataId, group, new Listener() {
				// 配置更新操作
				@Override
				public void receiveConfigInfo(String configInfo) {
					log.info("获取dataId:{},group:{} 更新配置: {}", dataId, group,configInfo );
					// 更新数据操作
					doRefreshProcess(configInfo);
				}

				@Override
				public Executor getExecutor() {
					return null;
				}

			});
		} catch (NacosException e) {
			throw new RuntimeException("Nacos手动监听异常",e)
		}
	}

通过此方法便可以实现gateway的动态路由,以及对一些配置进行个性化处理

重构

然而经常进行手动监听配置时,发现目前的写法过于臃肿,重复的东西过多,遂考虑对这类方法进行重构。

此操作分为三步

  1. 添加Nacos的连接配置
  2. 从指定的dataId和group获取配置进行初始化
  3. 从监听的dataId和group中获取更新的配置进行配置刷新

分析
项目中使用的是spring-cloud-alibaba-nacos-config,自动装配时的配置都保存在NacosConfigProperties中,故可以通过NacosConfigProperties获取Nacos的连接配置;同时也需要支持传入与当前配置不同的连接配置。

配置初始化操作和配置刷新操作,均为一个参数类型为String,无返回值的方法,与Java8的Consumer相同

重构

// Timeout handed to ConfigService.getConfig in initAndAddListener (milliseconds per the Nacos API).
private static final long DEFAULT_TIMEOUT = 5000L;

	// Spring-injected Nacos settings; used to assemble the Properties for creating the ConfigService.
	@Autowired
	private NacosConfigProperties nacosConfigProperties;
	
	/**
	 * Fetches the current config for {@code dataId}/{@code group}, passes it to {@code init},
	 * and then registers a listener that passes every subsequent update to {@code refresh}.
	 *
	 * @param dataId  the Nacos dataId to read and watch
	 * @param group   the Nacos group the dataId belongs to
	 * @param init    receives the config content fetched at call time
	 * @param refresh receives the new config content each time the listener is notified
	 * @throws NacosException if the config service cannot be created, the fetch fails,
	 *                        or the listener cannot be registered
	 */
	public void initAndAddListener(String dataId,String group,Consumer<String> init,Consumer<String> refresh) throws NacosException{
		Properties properties = nacosConfigProperties.assembleConfigServiceProperties();
		ConfigService configService = NacosFactory.createConfigService(properties);
		String config = configService.getConfig(dataId, group, DEFAULT_TIMEOUT);
		init.accept(config);
		configService.addListener(dataId, group, new Listener() {
			// No dedicated executor is supplied for the callback.
			@Override
			public Executor getExecutor() {
				return null;
			}

			@Override
			public void receiveConfigInfo(String configInfo) {
				// Forward the freshly pushed content, not the value captured at initialisation.
				refresh.accept(configInfo);
			}
		});
	}

此时将初始化操作和刷新缓存操作分别封装成Consumer,传进来即可

提炼工具类

再将此方法进行封装,提成一个公共的工具类,供其他系统使用

/**
 * Spring service wrapping common Nacos config operations: fetching a config entry,
 * fetching and parsing it, registering change listeners, publishing config, and the combined
 * "fetch, initialise, then listen" flow.
 *
 * <p>Every operation has an overload that takes explicit connection {@link Properties}; the
 * overloads without it use the properties assembled from the injected
 * {@code NacosConfigProperties}.
 *
 * <p>{@code ConfigService} instances are created once per distinct set of connection properties
 * and then shared, instead of creating a new client for every call.
 */
@Slf4j
@ConditionalOnBean(NacosConfigProperties.class)
@Service
public class NacosConfigListenerService {
	/** Timeout used when the caller supplies none, or a non-positive one (milliseconds per the Nacos API). */
	private static final long DEFAULT_TIMEOUT = 5000L;

	/**
	 * ConfigService instances keyed by a snapshot of the connection properties they were built from.
	 * Fully-qualified JDK names are used so this class needs no additional imports.
	 */
	private final java.util.Map<Properties, ConfigService> configServices =
			new java.util.concurrent.ConcurrentHashMap<>();

	@Autowired
	private NacosConfigProperties nacosConfigProperties;

	/**
	 * Initialises from, and listens to, a config entry using the same consumer for both steps.
	 * Config content is not logged.
	 *
	 * @param dataId   the dataId to read and watch
	 * @param group    the group of the dataId
	 * @param consumer receives the initial content and every later update
	 * @throws NacosException if fetching the config or registering the listener fails
	 */
	public void initAndAddListener(String dataId, String group, Consumer<String> consumer) throws NacosException {
		initAndAddListener(dataId, group, consumer, false);
	}

	/**
	 * Initialises from, and listens to, a config entry using the same consumer for both steps.
	 *
	 * @param dataId   the dataId to read and watch
	 * @param group    the group of the dataId
	 * @param consumer receives the initial content and every later update
	 * @param dataLog  whether to log the config content before handing it to the consumer
	 * @throws NacosException if fetching the config or registering the listener fails
	 */
	public void initAndAddListener(String dataId, String group, Consumer<String> consumer, boolean dataLog)
			throws NacosException {
		initAndAddListener(dataId, group, consumer, consumer, dataLog);
	}

	/**
	 * Initialises from, and listens to, a config entry with separate consumers. Config content is not logged.
	 *
	 * @param dataId           the dataId to read and watch
	 * @param group            the group of the dataId
	 * @param initConsumer     receives the content fetched at call time
	 * @param listenerConsumer receives every later update
	 * @throws NacosException if fetching the config or registering the listener fails
	 */
	public void initAndAddListener(String dataId, String group, Consumer<String> initConsumer,
			Consumer<String> listenerConsumer) throws NacosException {
		initAndAddListener(dataId, group, initConsumer, listenerConsumer, false);
	}

	/**
	 * Initialises from, and listens to, a config entry with separate consumers and the default timeout.
	 *
	 * @param dataId           the dataId to read and watch
	 * @param group            the group of the dataId
	 * @param initConsumer     receives the content fetched at call time
	 * @param listenerConsumer receives every later update
	 * @param dataLog          whether to log the config content before handing it to the consumers
	 * @throws NacosException if fetching the config or registering the listener fails
	 */
	public void initAndAddListener(String dataId, String group, Consumer<String> initConsumer,
			Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
		initAndAddListener(dataId, group, DEFAULT_TIMEOUT, initConsumer, listenerConsumer, dataLog);
	}

	/**
	 * Initialises from, and listens to, a config entry on an explicitly configured Nacos server,
	 * using the same consumer for both steps, the default timeout, and no content logging.
	 *
	 * @param properties connection properties for the Nacos server
	 * @param dataId     the dataId to read and watch
	 * @param group      the group of the dataId
	 * @param consumer   receives the initial content and every later update
	 * @throws NacosException if fetching the config or registering the listener fails
	 */
	public void initAndAddListener(Properties properties, String dataId, String group, Consumer<String> consumer)
			throws NacosException {
		initAndAddListener(properties, dataId, group, DEFAULT_TIMEOUT, consumer, consumer, false);
	}

	/**
	 * Initialises from, and listens to, a config entry using the injected connection settings.
	 *
	 * @param dataId           the dataId to read and watch
	 * @param group            the group of the dataId
	 * @param timeout          fetch timeout; a non-positive value selects the default
	 * @param initConsumer     receives the content fetched at call time
	 * @param listenerConsumer receives every later update
	 * @param dataLog          whether to log the config content before handing it to the consumers
	 * @throws NacosException if fetching the config or registering the listener fails
	 */
	public void initAndAddListener(String dataId, String group, long timeout, Consumer<String> initConsumer,
			Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
		initAndAddListener(getNacosProperties(), dataId, group, timeout, initConsumer, listenerConsumer, dataLog);
	}

	/**
	 * Fetches the config, passes it to {@code initConsumer}, then registers a listener that passes
	 * every update to {@code listenerConsumer}. All other {@code initAndAddListener} overloads end here.
	 *
	 * @param properties       connection properties for the Nacos server
	 * @param dataId           the dataId to read and watch; must not be empty
	 * @param group            the group of the dataId; must not be empty
	 * @param timeout          fetch timeout; a non-positive value selects the default
	 * @param initConsumer     receives the content fetched at call time; must not be null
	 * @param listenerConsumer receives every later update; must not be null
	 * @param dataLog          whether to log the config content before handing it to the consumers
	 * @throws NacosException       if fetching the config or registering the listener fails
	 * @throws NullPointerException if either consumer is null
	 */
	public void initAndAddListener(Properties properties, String dataId, String group, long timeout,
			Consumer<String> initConsumer, Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
		// Reject missing callbacks before doing any remote work.
		java.util.Objects.requireNonNull(initConsumer, "initConsumer must not be null");
		java.util.Objects.requireNonNull(listenerConsumer, "listenerConsumer must not be null");
		String config = getConfig(properties, dataId, group, timeout);
		log.info("初始化数据,dataId:{},group:{}", dataId, group);
		if (dataLog) {
			initConsumer = logConsumer(dataId, group).andThen(initConsumer);
		}
		initConsumer.accept(config);
		addListener(properties, dataId, group, listenerConsumer, dataLog);
	}

	/**
	 * Returns the ConfigService for the given connection properties, creating it on first use.
	 *
	 * <p>The returned instance is shared with every other caller that passes properties with the
	 * same content, so callers should not shut it down. Only the directly stored entries of
	 * {@code properties} take part in the lookup.
	 *
	 * @param properties connection properties for the Nacos server
	 * @return the shared ConfigService for these properties
	 * @throws NacosException if a new ConfigService has to be created and creation fails
	 */
	public ConfigService getConfigService(Properties properties) throws NacosException {
		if (properties == null) {
			// Nothing to key the cache on; defer to the factory and let it decide how to react.
			return NacosFactory.createConfigService(properties);
		}
		ConfigService service = configServices.get(properties);
		if (service != null) {
			return service;
		}
		synchronized (configServices) {
			service = configServices.get(properties);
			if (service == null) {
				service = NacosFactory.createConfigService(properties);
				// Key on a private snapshot so later mutation of the caller's object cannot corrupt the map.
				Properties key = new Properties();
				key.putAll(properties);
				configServices.put(key, service);
			}
			return service;
		}
	}

	/**
	 * Returns the ConfigService for the injected connection settings.
	 *
	 * @return the shared ConfigService
	 * @throws NacosException if the ConfigService cannot be created
	 */
	public ConfigService getConfigService() throws NacosException {
		return getConfigService(getNacosProperties());
	}

	/**
	 * Fetches a config entry using the injected connection settings and the default timeout.
	 *
	 * @param dataId the dataId to read; must not be empty
	 * @param group  the group of the dataId; must not be empty
	 * @return the config content as returned by Nacos
	 * @throws NacosException if the fetch fails
	 */
	public String getConfig(String dataId, String group) throws NacosException {
		return getConfig(dataId, group, DEFAULT_TIMEOUT);
	}

	/**
	 * Fetches a config entry using the injected connection settings.
	 *
	 * @param dataId  the dataId to read; must not be empty
	 * @param group   the group of the dataId; must not be empty
	 * @param timeout fetch timeout; {@code null} or a non-positive value selects the default
	 * @return the config content as returned by Nacos
	 * @throws NacosException if the fetch fails
	 */
	public String getConfig(String dataId, String group, Long timeout) throws NacosException {
		return getConfig(getNacosProperties(), dataId, group, timeout);
	}

	/**
	 * Fetches a config entry from an explicitly configured Nacos server.
	 *
	 * @param properties connection properties for the Nacos server
	 * @param dataId     the dataId to read; must not be empty
	 * @param group      the group of the dataId; must not be empty
	 * @param timeout    fetch timeout; {@code null} or a non-positive value selects the default
	 * @return the config content as returned by Nacos
	 * @throws NacosException           if the fetch fails
	 * @throws IllegalArgumentException if {@code dataId} or {@code group} is empty
	 */
	public String getConfig(Properties properties, String dataId, String group, Long timeout) throws NacosException {
		checkDataIdAndGroupBlank(dataId, group);
		log.info("获取配置,dataId:{},group:{}", dataId, group);
		// timeout is a boxed Long: treat null like any other unusable value instead of failing on unboxing.
		long effectiveTimeout = (timeout == null || timeout <= 0) ? DEFAULT_TIMEOUT : timeout;
		return getConfigService(properties).getConfig(dataId, group, effectiveTimeout);
	}

	/**
	 * Fetches a config entry with the injected settings and default timeout, then parses it.
	 *
	 * @param dataId        the dataId to read; must not be empty
	 * @param group         the group of the dataId; must not be empty
	 * @param parseFunction converts the raw content into the result; must not be null
	 * @param <T>           the parsed result type
	 * @return whatever {@code parseFunction} returns for the fetched content
	 * @throws NacosException if the fetch fails
	 */
	public <T> T getConfigAndParse(String dataId, String group, Function<String, T> parseFunction)
			throws NacosException {
		return getConfigAndParse(dataId, group, DEFAULT_TIMEOUT, parseFunction);
	}

	/**
	 * Fetches a config entry with the injected settings, then parses it.
	 *
	 * @param dataId        the dataId to read; must not be empty
	 * @param group         the group of the dataId; must not be empty
	 * @param timeout       fetch timeout; {@code null} or a non-positive value selects the default
	 * @param parseFunction converts the raw content into the result; must not be null
	 * @param <T>           the parsed result type
	 * @return whatever {@code parseFunction} returns for the fetched content
	 * @throws NacosException if the fetch fails
	 */
	public <T> T getConfigAndParse(String dataId, String group, Long timeout, Function<String, T> parseFunction)
			throws NacosException {
		return getConfigAndParse(getNacosProperties(), dataId, group, timeout, parseFunction);
	}

	/**
	 * Fetches a config entry from an explicitly configured Nacos server, then parses it.
	 *
	 * @param properties    connection properties for the Nacos server
	 * @param dataId        the dataId to read; must not be empty
	 * @param group         the group of the dataId; must not be empty
	 * @param timeout       fetch timeout; {@code null} or a non-positive value selects the default
	 * @param parseFunction converts the raw content into the result; must not be null
	 * @param <T>           the parsed result type
	 * @return whatever {@code parseFunction} returns for the fetched content
	 * @throws NacosException       if the fetch fails
	 * @throws NullPointerException if {@code parseFunction} is null
	 */
	public <T> T getConfigAndParse(Properties properties, String dataId, String group, Long timeout,
			Function<String, T> parseFunction) throws NacosException {
		// Check before the remote call so a missing parser does not cost a round trip.
		java.util.Objects.requireNonNull(parseFunction, "parseFunction must not be null");
		String config = getConfig(properties, dataId, group, timeout);
		return parseFunction.apply(config);
	}

	/**
	 * Registers a caller-supplied listener on an explicitly configured Nacos server.
	 * Unlike the consumer-based overload, this one does not validate {@code dataId}/{@code group}.
	 *
	 * @param properties connection properties for the Nacos server
	 * @param dataId     the dataId to watch
	 * @param group      the group of the dataId
	 * @param listener   the listener to register
	 * @throws NacosException if the listener cannot be registered
	 */
	public void addListener(Properties properties, String dataId, String group, Listener listener)
			throws NacosException {
		log.info("添加dataId:{},group:{}的监听", dataId, group);
		getConfigService(properties).addListener(dataId, group, listener);
	}

	/**
	 * Registers a listener that forwards every update to {@code consumer}.
	 *
	 * @param properties connection properties for the Nacos server
	 * @param dataId     the dataId to watch; must not be empty
	 * @param group      the group of the dataId; must not be empty
	 * @param consumer   receives each updated config content; must not be null
	 * @param dataLog    whether to log the config content before handing it to the consumer
	 * @throws NacosException           if the listener cannot be registered
	 * @throws IllegalArgumentException if {@code dataId} or {@code group} is empty
	 * @throws NullPointerException     if {@code consumer} is null
	 */
	public void addListener(Properties properties, String dataId, String group, Consumer<String> consumer,
			boolean dataLog) throws NacosException {
		checkDataIdAndGroupBlank(dataId, group);
		java.util.Objects.requireNonNull(consumer, "consumer must not be null");
		log.info("添加dataId:{},group:{}的监听", dataId, group);
		if (dataLog) {
			consumer = logConsumer(dataId, group).andThen(consumer);
		}
		getConfigService(properties).addListener(dataId, group, new NacosListener(consumer));
	}

	/**
	 * Registers a caller-supplied listener using the injected connection settings.
	 *
	 * @param dataId   the dataId to watch
	 * @param group    the group of the dataId
	 * @param listener the listener to register
	 * @throws NacosException if the listener cannot be registered
	 */
	public void addListener(String dataId, String group, Listener listener) throws NacosException {
		addListener(getNacosProperties(), dataId, group, listener);
	}

	/**
	 * Registers a consumer-backed listener using the injected connection settings.
	 * Note that this overload enables content logging, unlike the {@code initAndAddListener} defaults.
	 *
	 * @param dataId   the dataId to watch; must not be empty
	 * @param group    the group of the dataId; must not be empty
	 * @param consumer receives each updated config content; must not be null
	 * @throws NacosException if the listener cannot be registered
	 */
	public void addListener(String dataId, String group, Consumer<String> consumer) throws NacosException {
		addListener(getNacosProperties(), dataId, group, consumer, true);
	}

	/**
	 * Publishes config content to an explicitly configured Nacos server.
	 *
	 * @param properties connection properties for the Nacos server
	 * @param dataId     the dataId to publish to
	 * @param group      the group of the dataId
	 * @param config     the content to publish
	 * @return the result reported by {@code ConfigService.publishConfig}
	 * @throws NacosException if publishing fails
	 */
	public boolean pushConfig(Properties properties, String dataId, String group, String config) throws NacosException {
		log.info("向dataId:{},group:{}推送配置", dataId, group);
		return getConfigService(properties).publishConfig(dataId, group, config);
	}

	/**
	 * Publishes config content using the injected connection settings.
	 *
	 * @param dataId the dataId to publish to
	 * @param group  the group of the dataId
	 * @param config the content to publish
	 * @return the result reported by {@code ConfigService.publishConfig}
	 * @throws NacosException if publishing fails
	 */
	public boolean pushConfig(String dataId, String group, String config) throws NacosException {
		return pushConfig(getNacosProperties(), dataId, group, config);
	}

	/**
	 * Assembles the connection properties from the injected {@code NacosConfigProperties}.
	 *
	 * @return the properties used by the overloads that take no explicit {@link Properties}
	 */
	public Properties getNacosProperties() {
		return nacosConfigProperties.assembleConfigServiceProperties();
	}

	/** Rejects a missing dataId or group before any remote call is attempted. */
	private void checkDataIdAndGroupBlank(String dataId, String group) {
		if (StringUtils.isEmpty(dataId) || StringUtils.isEmpty(group)) {
			throw new IllegalArgumentException("dataId or group is blank");
		}
	}

	/** Builds a consumer that logs the received content together with its dataId and group. */
	private Consumer<String> logConsumer(String dataId, String group) {
		return conf -> log.info("获取到dataId:{},group:{}的数据 config:{}", dataId, group, conf);
	}

	/**
	 * Listener adapter that forwards each received config content to a {@link Consumer}.
	 * It supplies no executor of its own.
	 */
	public static class NacosListener implements Listener {

		private final Consumer<String> doReceiveConfigInfoConsumer;

		/**
		 * @param doReceiveConfigInfoConsumer receives each config content delivered to this listener;
		 *                                    must not be null
		 * @throws NullPointerException if the consumer is null
		 */
		public NacosListener(Consumer<String> doReceiveConfigInfoConsumer) {
			// Fail at registration time rather than later inside a Nacos callback.
			this.doReceiveConfigInfoConsumer = java.util.Objects.requireNonNull(doReceiveConfigInfoConsumer,
					"doReceiveConfigInfoConsumer must not be null");
		}

		@Override
		public Executor getExecutor() {
			return null;
		}

		@Override
		public void receiveConfigInfo(String config) {
			doReceiveConfigInfoConsumer.accept(config);
		}
	}
}