范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

探秘RocketMQ5。0client端Nameserver地址更新的源码实现方式

  源码版本rocketmq:release-5.0.0Client如何更新本地namesrvAddr
  client更新Nameserver的定时器主要是在这里启动的this.startScheduledTask();
  我们进去这个方法看一看
  如果clientConfig获取的namesrvAddr地址为空就会启动一个定时任务
  其中定时任务中方法MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
  就是通过我们设置的wsAddr(也就是获取namesrvAddr的url地址)地址去获取namesrvAddr地址
  线程池scheduledExecutorService的初始化private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
  可以看到是线程池延迟1s后执行,每隔2分钟去http服务器去拉取namesrvAddr的地址
  实际更新本地的namesrvAddr地址是通过updateNameServerAddressList方法
  最终实际去更新的还是NettyRemotingClient中的namesrvAddrList值
  @Override     public void updateNameServerAddressList(List addrs) {         List old = this.namesrvAddrList.get();         boolean update = false;          if (!addrs.isEmpty()) {             if (null == old) {                 update = true;             } else if (addrs.size() != old.size()) {                 update = true;             } else {                 for (int i = 0; i < addrs.size() && !update; i++) {                     if (!old.contains(addrs.get(i))) {                         update = true;                     }                 }             }              if (update) {                 Collections.shuffle(addrs);                 LOGGER.info("name server address updated. NEW : {} , OLD: {}", addrs, old);                 this.namesrvAddrList.set(addrs);                  // should close the channel if choosed addr is not exist.                 if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) {                     String namesrvAddr = this.namesrvAddrChoosed.get();                     for (String addr : this.channelTables.keySet()) {                         if (addr.contains(namesrvAddr)) {                             ChannelWrapper channelWrapper = this.channelTables.get(addr);                             if (channelWrapper != null) {                                 closeChannel(channelWrapper.getChannel());                             }                         }                     }                 }             }         }     }
  这里除了更新namesrvAddrList的地址外,还通过Collections.shuffle(addrs)方法对addrs的地址进行了随机打散,就是为了后面再随机选用一个namesrvAddr地址,同时还做了一个事情就是如果新获取到的namesrvAddr和原有的对比,如果旧的namesrvAddr地址有被新namesrvAddr地址剔除的就会将客户端与旧NameServer断开tcp连接closeChannel(channelWrapper.getChannel());
  值得注意的这里都是客户端的namesrvAddr更新,还没有涉及到namesrvAddr的重连,所以我们还需要看看namesrvAddr重连的定时器
  跟随源码可以定位到重连的定时器主要在NettyRemotingClient这个类中启动的 NettyRemotingClient.this.scanAvailableNameSrv()
  其中timer的初始化也是new了一个简单的Timerprivate final Timer timer = new Timer("ClientHouseKeepingService", true);
  可以看到是启动就直接执行没有延时时间,循环时间默认3s执行一次,可通过参数com.rocketmq.remoting.client.connect.timeout设置
  这里我们进入到scanAvailableNameSrv方法看看	private void scanAvailableNameSrv() {         List nameServerList = this.namesrvAddrList.get();         if (nameServerList == null) {             LOGGER.debug("scanAvailableNameSrv Addresses of name server is empty!");             return;         }          for (final String namesrvAddr : nameServerList) {             scanExecutor.execute(new Runnable() {                 @Override                 public void run() {                     try {                         Channel channel = NettyRemotingClient.this.getAndCreateChannel(namesrvAddr);                         if (channel != null) {                             NettyRemotingClient.this.availableNamesrvAddrMap.putIfAbsent(namesrvAddr, true);                         } else {                             NettyRemotingClient.this.availableNamesrvAddrMap.remove(namesrvAddr);                         }                     } catch (Exception e) {                         LOGGER.error("scanAvailableNameSrv get channel of {} failed, ", namesrvAddr, e);                     }                 }             });         }      }
  可以看到这里面又通过一个线程池scanExecutor去异步扫描多个namesrvAddr
  我们这里看看scanExecutor线程池的初始化配置	this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,             new ArrayBlockingQueue(32), new ThreadFactory() {                 private final AtomicInteger threadIndex = new AtomicInteger(0);                  @Override                 public Thread newThread(Runnable r) {                     return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());                 }             }         );
  我们进入NettyRemotingClient.this.getAndCreateChannel方法看看
  注意这里第一行有一个判断就是null == addr
  这个判断是为了MQClientInstance.this.updateTopicRouteInfoFromNameServer(); 这里准备的
  还记得我们的startScheduledTask()方法吗 这里是开了一个定时器,通过方法MQClientInstance.this.updateTopicRouteInfoFromNameServer();定时去更新topic的信息
  默认时间可以看到是30s
  我们可以看看TopicRouteData的相关数据
  值得一提的我们发送消息最终使用的topic、queue相关信息是使用的TopicPublishInfo属性,TopicRouteData与TopicPublishInfo相关的转化是通过 public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route)方法
  我们可以看看转换后的TopicPublishInfo是啥样
  然后我们来看看getAndCreateNameserverChannel()方法        private Channel getAndCreateNameserverChannel() throws InterruptedException {         String addr = this.namesrvAddrChoosed.get();         if (addr != null) {             ChannelWrapper cw = this.channelTables.get(addr);             if (cw != null && cw.isOK()) {                 return cw.getChannel();             }         }          final List addrList = this.namesrvAddrList.get();         if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {             try {                 addr = this.namesrvAddrChoosed.get();                 if (addr != null) {                     ChannelWrapper cw = this.channelTables.get(addr);                     if (cw != null && cw.isOK()) {                         return cw.getChannel();                     }                 }                  if (addrList != null && !addrList.isEmpty()) {                     for (int i = 0; i < addrList.size(); i++) {                         int index = this.namesrvIndex.incrementAndGet();                         index = Math.abs(index);                         index = index % addrList.size();                         String newAddr = addrList.get(index);                          this.namesrvAddrChoosed.set(newAddr);                         LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);                         Channel channelNew = this.createChannel(newAddr);                         if (channelNew != null) {                             return channelNew;                         }                     }                     throw new RemotingConnectException(addrList.toString());                 }             } catch (Exception e) {                 LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);             } finally {                 this.namesrvChannelLock.unlock();             }         } else {             LOGGER.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);         }          return null;     }
  有一些值得注意的是通过namesrvAddrChoosed获取nameserver地址连接的channal存储在channelTables中复用如果namesrvAddrChoosed获取到的nameserver地址不存在或者不是存活的则随机再从namesrvAddrList获取一个地址 具体核心代码如下                        int index = this.namesrvIndex.incrementAndGet();                         index = Math.abs(index);                         index = index % addrList.size();                         String newAddr = addrList.get(index);                          this.namesrvAddrChoosed.set(newAddr);
  这里就是每次发送消息更新topic的流程
  然后我们来看看scanAvailableNameSrv流程的this.createChannel(addr)方法private Channel createChannel(final String addr) throws InterruptedException { 	//获取原来的ChannelWrapper         ChannelWrapper cw = this.channelTables.get(addr); 	// 如果存在并且存活则直接返回         if (cw != null && cw.isOK()) {             return cw.getChannel();         }          if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {             try { 		// 是否创建新的连接                 boolean createNewConnection; 		//双重检查 类似双重检查的单例模式                 cw = this.channelTables.get(addr);                 if (cw != null) {                      if (cw.isOK()) {                         return cw.getChannel();                     } else if (!cw.getChannelFuture().isDone()) {                         createNewConnection = false;                     } else {                         this.channelTables.remove(addr);                         createNewConnection = true;                     }                 } else {                     createNewConnection = true;                 } 		            // 创建新的连接                 if (createNewConnection) { 		                // 连接到 Nameserver                     ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));                     LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);                     cw = new ChannelWrapper(channelFuture);                     this.channelTables.put(addr, cw);                 }             } catch (Exception e) {                 LOGGER.error("createChannel: create channel exception", e);             } finally {                 this.lockChannelTables.unlock();             }         } else {             LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);         }          if (cw != null) {             ChannelFuture channelFuture = cw.getChannelFuture();             if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {                 if (cw.isOK()) {                     LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());                     return cw.getChannel();                 } else {                     LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString());                 }             } else {                 LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),                     channelFuture.toString());             }         }          return null;     } 小结
  总的来说客户端获取namesrvAddr的方式是三秒更新一次,所以客户端3s就能感受到namesrvAddr是否下线。
  连接相关的信息存储在channelTables中,即
  总结客户端每隔30s就会去扫描判断Nameserver是否存活客户端每隔2分钟才会去http服务器拉取最新的Nameserver地址客户端topic相关的缓存信息每隔30s会去Nameserver拉取最新的topic、broker相关信息

故乡往事(未完待续)故乡往事风将蒲公英吹落,自此它的种子回归大地。这是一段记录,关于童年,关于故乡,关于亲情,关于爱与救赎。记忆深处的这一段回忆,予你予他。愿我们都能获得内心的安宁。开篇在我的童年记忆你听,春天来了(文紫情冬语)你听,春天来了(文紫情冬语)远远近近间或响起的鞭炮声,感觉年的味道愈来愈浓了,虽然又是禁燃鞭炮烟花的一年。鞭炮声响,似乎也是年的声音,辞旧迎新的篇章里写着喜气和年味。不喜放鞭炮,却你还好吗?我的生活也是头条窗外,寒风催岁月,遍地枯叶黄,冬天萧瑟的情景没有了春秋的诗情画意,在这寒冷的季节,唯有呆在家里才让人感到温暖。灌了一个热水袋,倚窗而坐,伴我的只有西下的夕阳带来的丝背靠着洁白无瑕的墙上望着窗外落下的雨滴,此时的你在干嘛呢?望着窗外飘着的细雨,看着玻璃窗。上雨水划过留下的水珠,这是季节交替的雨季,冷与暖的交汇就像悲与喜的交感,于是天空再也耐不住乌云的灰沉,下起了雨,落下了泪!人行道边,马路上,一把把颜体验文字的魅力3一起来学英语吧你觉得孤单就对了,那是让你看清自己的时机。你觉得不被了解就对了,那是让你认清冤家的时机。你觉得黑暗就对了,那样你才分辨得出什么是你的光辉。你觉得无助就对了,那样你才干明白谁是你的贵武汉印象之悠悠千年卓刀泉是的,这就是卓刀泉!这就是一千八百多年来生生不息倍受尊崇的卓刀泉。这就是在华夏子孙中久享盛名,集忠义仁勇智于一体,儒道佛共同信仰的对象美髯公关羽卓刀于此的卓刀泉。此刻,她静静地伏在肝好不好,脸知道,养肝护肝从现在开始肝不好,脸知道,你的肝还好吗?脸上出现这三个问题,要小心!1脸色发黄发黑2视力下降,眼睛无神3粉刺丛生肝是如何作出问题的?1过量饮酒酒精对肝细胞有明显的毒性作用。2过度劳累重体力劳行走的健康卫士,didoG28SPro智能手表让父母更省心父母的年纪越来越大,每每想起心里总是有着不太舒服的情绪,特别是父亲常年被高血压折磨做儿子的却无能为力。为了随时能看到二老的动态,特意让师傅上门给家里装了一台安防摄像头。这段时间在回卡拉布里亚被罗马逼平让人失望愤怒,但米兰从不惧怕任何挑战直播吧1月10日讯上轮意甲联赛,米兰22遭罗马绝平。赛后在Instagram中,米兰队长卡拉布里亚发文表达了自己的感受,并激励了全队。卡拉布里亚写道对于22的结果,我感到失望和愤怒人民银行济南分行多措并举,推动全省科创金融事业发展记者高寒2022年以来,中国人民银行济南分行按照济南市科创金融改革试验区建设有关要求及工作部署。在全省范围内,人民银行济南分行也多措并举,加大金融资源对科技创新活动的支持力度加大专利好!赣州公积金政策有变!事关贷款好消息!赣州市住房公积金管理中心发布通知明确对2023年1月1日起受理的贷款将公积金贷款可贷金额计算公式中的个贷率系数调整为7重大利好!个贷率系数调整为7那么,跟贷款金额又有什么关
唐山打人事件有新进展啦!7男2女背景曝光9人10年犯11案一窝端唐山打人事件有新进展啦!7男2女背景曝光9人10年犯11案一窝端后续唐山打人事件被打女子最新情况公布,打人者结局很满意。震撼唐山打人事件她不想被欺负,所以马上回应。这次还公布了被打刘德华抖音演唱会,一首十七岁一晃好多年五九五三刘德华不单单是一个名字,而是一个时代的标签是八零后的整个青春啊!666可可之地隔离期间在家看偶像,感谢这个时代互联网捂脸9547听着听着想哭了他与我岁月不饶人!阳光自律了一卖淫嫖娼行为被偷拍上传至互联网,一般怎么处理有这样一个案例桑某利用寻花的名义,大量偷拍和他人的视频,并传播。阿瑶与桑某约定在某酒店开展卖淫嫖娼活动。双方发生关系后,阿瑶就走了!几个月后,阿瑶的朋友告诉她,在网站上发现她和一个印度超过英国成为世界第五大经济体,是必然也是世界史的转折点印度超越英国是一种必然现象,因为无论从体谅人口,国土面积,历史传承各个方面来说,运动具有雄厚的实力,他需要的只是潜心追赶。而英国200年的世界第一是站在拥有全世界的资源和市场基础上这座山比天门山还高10米,大多数人去张家界没去过说到张家界,不少旅游爱好者都能侃侃而谈去天门山走玻璃栈道天门洞口感受自然壮阔,去武陵源听溪水清唱看峰林笔挺,去张家界大峡谷走玻璃桥挑战蹦极,再多玩几天,还有茅岩河的漂流,九天洞的清著名女主播劳春燕一路奋斗到央视,与丈夫分居,如今咋样在央视的众多主播里面,有一个叫劳春燕的女主播。其实她一开始并不是央视的人,而是从上海电视台打拼到上海卫视,又从上海卫视到央视的。这一路事业的打拼,对于劳春燕来说,十分的热血,励志。说谎的代价你见过萧慕莲吗?可知道她长什么模样?苏凉问宁靖。宁靖点头,见过一回。好。苏凉若有所思,我有个计划,你看是否可行。二皇子府。端木敖一早发了大火,摔了两个名贵的花瓶,下人都噤若寒蝉。府谁会成为下一个经济体,在未来,会对世界经济产生重大影响在过去三十年中,亚洲大陆经历了快速的变化有的国家已经实现了像西方那样的繁荣。然而,也有许多发展中国家正处于这种增长的边缘有人甚至说它正在创造一个多级世界这是世界新秩序的开始而这些国美企警告后,老美重新修改芯片限制?多家中企可以松口气了自从芯片法案一落地之后,老美有了多种举动加快对芯片四方联盟的小圈子建设限制芯片之母EDA领域还有日前对英伟达AMD发出停止出货高性能智能芯片等。毫无疑问,老美又在芯片限制上开始动手长期服用奥美拉唑,这4种不良反应,或要多留意,建议早了解此外,据统计,全世界有7。08亿人患有胃病,中国作为一个人口大国,在疾病排名中也排名第一,因炎症性疾病导致胃癌猝死的人数高达236800人,这也严重影响了人们的健康。作为重要的消化别人劝我不要娶女护士,结完婚后我终于明白了,网友哈哈哈终究还是高估了自己的能力本来就是生意还谈什么感情呢你有多久没有见过这样的场面了厂里新发的工服总感觉哪里有点不对遇到这样的事你怎么看呢假如被她射中这事是不是就稳了天气太冷又看到这个小