使用Nacos做注册中心,会遇到如html,json,txt这类数据的监听,或者需要对数据进行特殊处理的情况,此时需要采用通过手动获取配置中心的数据初始化并且添加监听
手动添加Nacos监听
public void addNacosListener(String dataId, String group) {
try {
Properties properties = new Properties();
// gatewayConfig为配置的需要监听的参数
// nacos地址
properties.setProperty("serverAddr", gatewayConfig.getNacosServerAddr());
// 工作空间
properties.setProperty("namespace", gatewayConfig.getNacosNamespace());
// 访问密码(如果开启权限校验需要)
properties.setProperty("password", gatewayConfig.getPassword());
// 访问账户(如果开启权限校验需要)
properties.setProperty("username", gatewayConfig.getUserName());
ConfigService configService = NacosFactory.createConfigService(properties);
// 从远程获取配置
String configInfo = configService.getConfig(dataId, group, 5000);
log.info("获取dataId:{},group:{}配置: {}", dataId, group,configInfo );
// 初始化操作
init(configInfo);
// 添加监听
configService.addListener(dataId, group, new Listener() {
// 配置更新操作
@Override
public void receiveConfigInfo(String configInfo) {
log.info("获取dataId:{},group:{} 更新配置: {}", dataId, group,configInfo );
// 更新数据操作
doRefreshProcess(configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
} catch (NacosException e) {
throw new RuntimeException("Nacos手动监听异常",e)
}
}
通过此方法便可以进行gateway的动态路由操作已经一些配置的的个性化处理
重构
然而经常进行手动监听配置时,发现目前的写法过于臃肿,重复的东西过多,遂考虑对这类方法进行重构。
此操作分为三步
- 添加Nacos的连接配置
- 从指定的dataId和group获取配置进行初始化
- 从监听的dataId和group中获取更新的配置进行配置刷新
分析
项目中使用的是spring-cloud-alibaba-nacos-config,自动装配时的配置都存在NacosConfigProperties
中,故可以采用NacosConfigProperties获取Nacos的连接配置,同时也需要提供与当前配置不同的连接配置。
配置初始化操作和配置刷新操作,均为一个参数类型为String,无返回值的方法,与Java8的Consumer相同
重构
private static final long DEFAULT_TIMEOUT = 5000L;
@Autowired
private NacosConfigProperties nacosConfigProperties;
public void initAndAddListener(String dataId,String group,Consumer<String> init,Consumer<String> refresh) throws NacosException{
Properties properties = nacosConfigProperties.assembleConfigServiceProperties();
ConfigService configService = NacosFactory.createConfigService(properties);
String config = configService.getConfig(dataId, group, DEFAULT_TIMEOUT);
init.accept(config);
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
refresh.accept(config);
}
});
}
此时将初始化操作和刷新缓存操作,封装成一个Consumer,传进来即可
提炼工具类
再将此方法进行封装,提成一个公共的工具类,供其他系统使用
@Slf4j
@ConditionalOnBean(NacosConfigProperties.class)
@Service
public class NacosConfigListenerService {
private static final long DEFAULT_TIMEOUT = 5000L;
@Autowired
private NacosConfigProperties nacosConfigProperties;
public void initAndAddListener(String dataId, String group, Consumer<String> consumer) throws NacosException {
initAndAddListener(dataId, group, consumer, false);
}
public void initAndAddListener(String dataId, String group, Consumer<String> consumer, boolean dataLog)
throws NacosException {
initAndAddListener(dataId, group, consumer, consumer, dataLog);
}
public void initAndAddListener(String dataId, String group, Consumer<String> initConsumer,
Consumer<String> listenerConsumer) throws NacosException {
initAndAddListener(dataId, group, initConsumer, listenerConsumer, false);
}
public void initAndAddListener(String dataId, String group, Consumer<String> initConsumer,
Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
initAndAddListener(dataId, group, DEFAULT_TIMEOUT, initConsumer, listenerConsumer, dataLog);
}
public void initAndAddListener(Properties properties, String dataId, String group, Consumer<String> consumer)
throws NacosException {
initAndAddListener(properties, dataId, group, DEFAULT_TIMEOUT, consumer, consumer, false);
}
public void initAndAddListener(String dataId, String group, long timeout, Consumer<String> initConsumer,
Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
initAndAddListener(getNacosProperties(), dataId, group, timeout, initConsumer, listenerConsumer, dataLog);
}
public void initAndAddListener(Properties properties, String dataId, String group, long timeout,
Consumer<String> initConsumer, Consumer<String> listenerConsumer, boolean dataLog) throws NacosException {
String config = getConfig(properties, dataId, group, timeout);
log.info("初始化数据,dataId:{},group:{}", dataId, group);
if (dataLog) {
initConsumer = logConsumer(dataId, group).andThen(initConsumer);
}
initConsumer.accept(config);
addListener(properties, dataId, group, listenerConsumer, dataLog);
}
public ConfigService getConfigService(Properties properties) throws NacosException {
return NacosFactory.createConfigService(properties);
}
public ConfigService getConfigService() throws NacosException {
return getConfigService(getNacosProperties());
}
public String getConfig(String dataId, String group) throws NacosException {
return getConfig(dataId, group, DEFAULT_TIMEOUT);
}
public String getConfig(String dataId, String group, Long timeout) throws NacosException {
return getConfig(getNacosProperties(), dataId, group, timeout);
}
public String getConfig(Properties properties, String dataId, String group, Long timeout) throws NacosException {
checkDataIdAndGroupBlank(dataId, group);
log.info("获取配置,dataId:{},group:{}", dataId, group);
if (timeout <= 0) {
timeout = DEFAULT_TIMEOUT;
}
return getConfigService(properties).getConfig(dataId, group, timeout);
}
public <T> T getConfigAndParse(String dataId, String group, Function<String, T> parseFunction)
throws NacosException {
return getConfigAndParse(dataId, group, DEFAULT_TIMEOUT, parseFunction);
}
public <T> T getConfigAndParse(String dataId, String group, Long timeout, Function<String, T> parseFunction)
throws NacosException {
return getConfigAndParse(getNacosProperties(), dataId, group, timeout, parseFunction);
}
public <T> T getConfigAndParse(Properties properties, String dataId, String group, Long timeout,
Function<String, T> parseFunction) throws NacosException {
String config = getConfig(properties, dataId, group, timeout);
return parseFunction.apply(config);
}
public void addListener(Properties properties, String dataId, String group, Listener listener)
throws NacosException {
log.info("添加dataId:{},group:{}的监听", dataId, group);
getConfigService(properties).addListener(dataId, group, listener);
}
public void addListener(Properties properties, String dataId, String group, Consumer<String> consumer,
boolean dataLog) throws NacosException {
checkDataIdAndGroupBlank(dataId, group);
log.info("添加dataId:{},group:{}的监听", dataId, group);
if (dataLog) {
consumer = logConsumer(dataId, group).andThen(consumer);
}
getConfigService(properties).addListener(dataId, group, new NacosListener(consumer));
}
public void addListener(String dataId, String group, Listener listener) throws NacosException {
addListener(getNacosProperties(), dataId, group, listener);
}
public void addListener(String dataId, String group, Consumer<String> consumer) throws NacosException {
addListener(getNacosProperties(), dataId, group, consumer, true);
}
public boolean pushConfig(Properties properties, String dataId, String group, String config) throws NacosException {
log.info("向dataId:{},group:{}推送配置", dataId, group);
return getConfigService(properties).publishConfig(dataId, group, config);
}
public boolean pushConfig(String dataId, String group, String config) throws NacosException {
return pushConfig(getNacosProperties(), dataId, group, config);
}
public Properties getNacosProperties() {
return nacosConfigProperties.assembleConfigServiceProperties();
}
private void checkDataIdAndGroupBlank(String dataId, String group) {
if (StringUtils.isEmpty(dataId) || StringUtils.isEmpty(group)) {
throw new RuntimeException("dataId or group is blank");
}
}
private Consumer<String> logConsumer(String dataId, String group) {
return conf -> log.info("获取到dataId:{},group:{}的数据 config:{}", dataId, group, conf);
}
public static class NacosListener implements Listener {
private final Consumer<String> doReceiveConfigInfoConsumer;
public NacosListener(Consumer<String> doReceiveConfigInfoConsumer) {
this.doReceiveConfigInfoConsumer = doReceiveConfigInfoConsumer;
}
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String config) {
doReceiveConfigInfoConsumer.accept(config);
}
}
}