深入理解SpringCloud一(3)Nacos配置中心
一、应用启动的时候如何从配置中心拉取配置文件
通过上一节的介绍,我们已经知道了配置加载的扩展点。下面我们已具体的Nacos配置中心来进行说明。
NacosConfigBootstrapConfiguration是@BootstrapConfiguration的配置类,在bootstrap 的SpringApplication创建的过程中,会加载这个类。这个Configuration类包括两个Bean,分别是NacosConfigManager,NacosPropertySourceLocator。
NacosConfigManager的核心作用是创建NacosConfigService,通过NacosConfigService从远程配置中心拉取配置。private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException { group = null2defaultGroup(group); ParamUtils.checkKeyParam(dataId, group); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setTenant(tenant); cr.setGroup(group); //省略一些代码 try { String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs); cr.setContent(ct[0]); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; } catch (NacosException ioe) { } content = cr.getContent(); return content; }
NacosPropertySourceLocator是PropertySourceLocator的实现类,上一节已经详细介绍过PropertySourceLocator。@Override public PropertySource<?> locate(Environment env) { nacosConfigProperties.setEnvironment(env); ConfigService configService = nacosConfigManager.getConfigService(); //省略一些代码 nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout); CompositePropertySource composite = new CompositePropertySource( NACOS_PROPERTY_SOURCE_NAME); loadSharedConfiguration(composite); loadExtConfiguration(composite); loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env); return composite; }二、配置中心的配置变动时,如何通知到应用
NacosConfigAutoConfiguration是@EnableAutoConfiguration的配置类,当application创建SpringApplication的过程中会被加载,会加载两个重要的Bean,NacosContextRefresher和NacosConfigManager,NacosConfigManager上面已经结束过。
NacosContextRefresher主要作用就是注册Nacos监听器。@Override public void onApplicationEvent(ApplicationReadyEvent event) { // many Spring context if (this.ready.compareAndSet(false, true)) { this.registerNacosListenersForApplications(); } }private void registerNacosListener(final String groupKey, final String dataKey) { String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey); Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() { @Override public void innerReceive(String dataId, String group, String configInfo) { refreshCountIncrement(); nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo); // todo feature: support single refresh for listening //当配置文件变动时,会发布RefreshEvent事情 applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); if (log.isDebugEnabled()) { log.debug(String.format( "Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo)); } } }); try { //注册监听器 configService.addListener(dataKey, groupKey, listener); } catch (NacosException e) { } }
监听器又是如何触发的?
Nacos使用长轮询方式获取配置,判断文件是否有变动,如果有变动则触发监听器。
NacosConfigService创建的时候,会创建ClientWorker对象,同时会创建长轮询任务。public NacosConfigService(Properties properties) throws NacosException { //省略其他代码 this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { //初始化一个延迟任务,10毫秒后调用checkConfigInfo this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }public void checkConfigInfo() { // Dispatch taskes. int listenerSize = cacheMap.get().size(); // Round up the longingTaskCount. int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
我们继续往下看LongPollingRunnable @Override public void run() { //省略其他代码 // check server config 从服务器询问缓存的文件是否有变动 List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } } }List checkUpdateDataIds(List cacheDatas, List inInitializingCacheList) throws Exception { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { if (!cacheData.isUseLocalConfigInfo()) { sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR); if (StringUtils.isBlank(cacheData.tenant)) { sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); } else { sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); sb.append(cacheData.getTenant()).append(LINE_SEPARATOR); } if (cacheData.isInitializing()) { // It updates when cacheData occours in cacheMap by first time. inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); } } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); }List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception { Map params = new HashMap(2); params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); Map headers = new HashMap(2); headers.put("Long-Pulling-Timeout", "" + timeout); try { // In order to prevent the server from handling the delay of the client"s long task, // increase the client"s read timeout to avoid this problem. //询问服务器,配置是否有变动,这里客户端读超时时间需要设置长一点 long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); HttpRestResult result = agent .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs); if (result.ok()) { setHealthServer(true); return parseUpdateDataIdResponse(result.getData()); } else { setHealthServer(false); } } catch (Exception e) { setHealthServer(false); } return Collections.emptyList(); }@Override public void run() { try { // check server config List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } //如果服务端文件有变动,则从服务端重新拉取配置 for (String groupKey : changedGroupKeys) { try { String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } } catch (NacosException ioe) { } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { //如果文件MD5不一致,则触发监听器 cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }
如果文件MD5不一致,则触发监听器void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } } private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; Runnable job = new Runnable() { @Override public void run() { try { ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); configFilterChainManager.doFilter(null, cr); String contentTmp = cr.getContent(); //回调监听器 listener.receiveConfigInfo(contentTmp); listenerWrap.lastCallMd5 = md5; } catch (NacosException ex) { } } }; try { if (null != listener.getExecutor()) { listener.getExecutor().execute(job); } else { job.run(); } } catch (Throwable t) { } }
最终实现回调public void innerReceive(String dataId, String group, String configInfo) { refreshCountIncrement(); nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo); // todo feature: support single refresh for listening //发布RefreshEvent事件 applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); if (log.isDebugEnabled()) { log.debug(String.format( "Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo)); } }三、总结
判断文件是否变动的核心是长轮询,客户端比较简单,只需要设置较长时间的读超时即可。后面我们会继续探究Nacos服务端的长轮询是如何时间的。
你们觉得国产手机哪个牌子最好?感谢您的阅读!你们觉得国产手机哪个牌子最好?国产手机的发展越来越让我们惊喜到,他们所带来的优势也在越来越强,我们在考虑一款手机性能的时候,其实更多的是考虑这个品牌能够给我们带来的价
第一次购买iPad平板究竟买哪一种划算?先看看目前在售的iPad型号,分别是12。9英寸iPadPro(第五代),11英寸iPadPro(第三代),10。9英寸iPadAir(第四代),10。2英寸iPad(第八代),7
币圈狗屎齐飞作者任娅斐来源盒饭财经(IDdaxiongfan)币圈一天,人间十年。狗狗币你听过,但你听过屎币猪币猫币蛇币皮卡丘蛤蟆币么?在狗狗币被马斯克带飞之后,数百种山寨币接连冒出,还迎来了
贾跃亭又出新PPT,售价超过100万的FF91要量产了?中国经济周刊记者吕江涛北京报道5月8日,FaradayFuture(法拉第未来)官方微博发布消息称,法拉第未来(下称FF公司)的首款预量产电动车FF91将在纽约与消费者见面,但具体
蔚来理想小鹏,都是宁德时代的打工仔造车的不如卖电池的。来源AI蓝媒汇IDlanmeih001作者关关编辑魏晓去年7月,资本市场有一场重头戏。高瓴资本重仓宁德时代,以每股161元的价格,认购了100亿元的定增。现在宁
理想ONE烧油的电动车能叫新能源吗?经过几轮的洗牌,国内的造车新势力能交出满意答卷的只剩下理想蔚来小鹏等寥寥几家。2020年,理想共交付了约3。26万台新车,虽然比起蔚来的4。37万台稍逊一筹,但比起小鹏的2。7万台
中国人为什么对印度的头号印象居然是邋遢印度,这几年在中国互联网上一直占据热门吐槽榜的前几位。其实作为印度的铁杆盟友,美国和日本也是中国互联网热门吐槽榜的常客,但至少美国和日本在上世纪80年代跟中国还有过昙花一现的蜜月期
美团小米接连退出,支付宝孤军奋战,9000万用户如何选择?对于募捐互助平台,大部分人想到的有水滴筹轻松筹相互宝这类平台。何为互助平台?简单来说,互助平台相当于一个面向大众,且门槛低的保险,其特点是以重疾和意外伤害为主。在老百姓发生重大疾病
如何用手机拍出更好看的月亮?想用手机拍好大月亮,可以借助建筑光影等周边环境构图拍。还可直接用外接镜头拍出天文大月亮。比如用手机外接长焦拍摄的圆月(利用手机双重曝光合成飞机)就是在家中窗台手机拍摄(如下图)借助
家里的旧音箱如何才能改成可以播放手机音乐的音箱?如果是多媒体电脑有源音箱,那么直接把音频输入线插在手机耳机输出插口即可把手机音频输出放大。如果手机上面没有3。5mm耳机输出插口,或者不喜欢有线的束缚,那么可以买一个蓝牙接收器,或
为何说OPPOFindx3失败?品牌高端旗舰到底有多难做?哪里有失败?是你认为的失败?国产手机里最好的屏幕,硬件,和更好的手感,闪充和ip68防水,虽然镜头模组比上一代有缩水,但成像效果也是前几位的水准,同样的体积和电池容量,还有几个你所