范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

探秘RocketMQ5。0client端Nameserver地址更新的源码实现方式

  源码版本rocketmq:release-5.0.0Client如何更新本地namesrvAddr
  client更新Nameserver的定时器主要是在这里启动的this.startScheduledTask();
  我们进去这个方法看一看
  如果clientConfig获取的namesrvAddr地址为空就会启动一个定时任务
  其中定时任务中方法MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
  就是通过我们设置的wsAddr(也就是获取namesrvAddr的url地址)地址去获取namesrvAddr地址
  线程池scheduledExecutorService的初始化private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
  可以看到是线程池延迟1s后执行,每隔2分钟去http服务器去拉取namesrvAddr的地址
  实际更新本地的namesrvAddr地址是通过updateNameServerAddressList方法
  最终实际去更新的还是NettyRemotingClient中的namesrvAddrList值
  @Override     public void updateNameServerAddressList(List addrs) {         List old = this.namesrvAddrList.get();         boolean update = false;          if (!addrs.isEmpty()) {             if (null == old) {                 update = true;             } else if (addrs.size() != old.size()) {                 update = true;             } else {                 for (int i = 0; i < addrs.size() && !update; i++) {                     if (!old.contains(addrs.get(i))) {                         update = true;                     }                 }             }              if (update) {                 Collections.shuffle(addrs);                 LOGGER.info("name server address updated. NEW : {} , OLD: {}", addrs, old);                 this.namesrvAddrList.set(addrs);                  // should close the channel if choosed addr is not exist.                 if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) {                     String namesrvAddr = this.namesrvAddrChoosed.get();                     for (String addr : this.channelTables.keySet()) {                         if (addr.contains(namesrvAddr)) {                             ChannelWrapper channelWrapper = this.channelTables.get(addr);                             if (channelWrapper != null) {                                 closeChannel(channelWrapper.getChannel());                             }                         }                     }                 }             }         }     }
  这里除了更新namesrvAddrList的地址外,还通过Collections.shuffle(addrs)方法对addrs的地址进行了随机打散,就是为了后面再随机选用一个namesrvAddr地址,同时还做了一个事情就是如果新获取到的namesrvAddr和原有的对比,如果旧的namesrvAddr地址有被新namesrvAddr地址剔除的就会将客户端与旧NameServer断开tcp连接closeChannel(channelWrapper.getChannel());
  值得注意的这里都是客户端的namesrvAddr更新,还没有涉及到namesrvAddr的重连,所以我们还需要看看namesrvAddr重连的定时器
  跟随源码可以定位到重连的定时器主要在NettyRemotingClient这个类中启动的 NettyRemotingClient.this.scanAvailableNameSrv()
  其中timer的初始化也是new了一个简单的Timerprivate final Timer timer = new Timer("ClientHouseKeepingService", true);
  可以看到是启动就直接执行没有延时时间,循环时间默认3s执行一次,可通过参数com.rocketmq.remoting.client.connect.timeout设置
  这里我们进入到scanAvailableNameSrv方法看看	private void scanAvailableNameSrv() {         List nameServerList = this.namesrvAddrList.get();         if (nameServerList == null) {             LOGGER.debug("scanAvailableNameSrv Addresses of name server is empty!");             return;         }          for (final String namesrvAddr : nameServerList) {             scanExecutor.execute(new Runnable() {                 @Override                 public void run() {                     try {                         Channel channel = NettyRemotingClient.this.getAndCreateChannel(namesrvAddr);                         if (channel != null) {                             NettyRemotingClient.this.availableNamesrvAddrMap.putIfAbsent(namesrvAddr, true);                         } else {                             NettyRemotingClient.this.availableNamesrvAddrMap.remove(namesrvAddr);                         }                     } catch (Exception e) {                         LOGGER.error("scanAvailableNameSrv get channel of {} failed, ", namesrvAddr, e);                     }                 }             });         }      }
  可以看到这里面又通过一个线程池scanExecutor去异步扫描多个namesrvAddr
  我们这里看看scanExecutor线程池的初始化配置	this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,             new ArrayBlockingQueue(32), new ThreadFactory() {                 private final AtomicInteger threadIndex = new AtomicInteger(0);                  @Override                 public Thread newThread(Runnable r) {                     return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());                 }             }         );
  我们进入NettyRemotingClient.this.getAndCreateChannel方法看看
  注意这里第一行有一个判断就是null == addr
  这个判断是为了MQClientInstance.this.updateTopicRouteInfoFromNameServer(); 这里准备的
  还记得我们的startScheduledTask()方法吗 这里是开了一个定时器,通过方法MQClientInstance.this.updateTopicRouteInfoFromNameServer();定时去更新topic的信息
  默认时间可以看到是30s
  我们可以看看TopicRouteData的相关数据
  值得一提的我们发送消息最终使用的topic、queue相关信息是使用的TopicPublishInfo属性,TopicRouteData与TopicPublishInfo相关的转化是通过 public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route)方法
  我们可以看看转换后的TopicPublishInfo是啥样
  然后我们来看看getAndCreateNameserverChannel()方法        private Channel getAndCreateNameserverChannel() throws InterruptedException {         String addr = this.namesrvAddrChoosed.get();         if (addr != null) {             ChannelWrapper cw = this.channelTables.get(addr);             if (cw != null && cw.isOK()) {                 return cw.getChannel();             }         }          final List addrList = this.namesrvAddrList.get();         if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {             try {                 addr = this.namesrvAddrChoosed.get();                 if (addr != null) {                     ChannelWrapper cw = this.channelTables.get(addr);                     if (cw != null && cw.isOK()) {                         return cw.getChannel();                     }                 }                  if (addrList != null && !addrList.isEmpty()) {                     for (int i = 0; i < addrList.size(); i++) {                         int index = this.namesrvIndex.incrementAndGet();                         index = Math.abs(index);                         index = index % addrList.size();                         String newAddr = addrList.get(index);                          this.namesrvAddrChoosed.set(newAddr);                         LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);                         Channel channelNew = this.createChannel(newAddr);                         if (channelNew != null) {                             return channelNew;                         }                     }                     throw new RemotingConnectException(addrList.toString());                 }             } catch (Exception e) {                 LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);             } finally {                 this.namesrvChannelLock.unlock();             }         } else {             LOGGER.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);         }          return null;     }
  有一些值得注意的是通过namesrvAddrChoosed获取nameserver地址连接的channal存储在channelTables中复用如果namesrvAddrChoosed获取到的nameserver地址不存在或者不是存活的则随机再从namesrvAddrList获取一个地址 具体核心代码如下                        int index = this.namesrvIndex.incrementAndGet();                         index = Math.abs(index);                         index = index % addrList.size();                         String newAddr = addrList.get(index);                          this.namesrvAddrChoosed.set(newAddr);
  这里就是每次发送消息更新topic的流程
  然后我们来看看scanAvailableNameSrv流程的this.createChannel(addr)方法private Channel createChannel(final String addr) throws InterruptedException { 	//获取原来的ChannelWrapper         ChannelWrapper cw = this.channelTables.get(addr); 	// 如果存在并且存活则直接返回         if (cw != null && cw.isOK()) {             return cw.getChannel();         }          if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {             try { 		// 是否创建新的连接                 boolean createNewConnection; 		//双重检查 类似双重检查的单例模式                 cw = this.channelTables.get(addr);                 if (cw != null) {                      if (cw.isOK()) {                         return cw.getChannel();                     } else if (!cw.getChannelFuture().isDone()) {                         createNewConnection = false;                     } else {                         this.channelTables.remove(addr);                         createNewConnection = true;                     }                 } else {                     createNewConnection = true;                 } 		            // 创建新的连接                 if (createNewConnection) { 		                // 连接到 Nameserver                     ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));                     LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);                     cw = new ChannelWrapper(channelFuture);                     this.channelTables.put(addr, cw);                 }             } catch (Exception e) {                 LOGGER.error("createChannel: create channel exception", e);             } finally {                 this.lockChannelTables.unlock();             }         } else {             LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);         }          if (cw != null) {             ChannelFuture channelFuture = cw.getChannelFuture();             if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {                 if (cw.isOK()) {                     LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());                     return cw.getChannel();                 } else {                     LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString());                 }             } else {                 LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),                     channelFuture.toString());             }         }          return null;     } 小结
  总的来说客户端获取namesrvAddr的方式是三秒更新一次,所以客户端3s就能感受到namesrvAddr是否下线。
  连接相关的信息存储在channelTables中,即
  总结客户端每隔30s就会去扫描判断Nameserver是否存活客户端每隔2分钟才会去http服务器拉取最新的Nameserver地址客户端topic相关的缓存信息每隔30s会去Nameserver拉取最新的topic、broker相关信息

新疆广汇男篮招聘总经理,李洪庆和霍楠都是不错的人选郭舰这两天网上流出一则消息,新疆广汇男篮正在招聘总经理,我上网查阅了一下资料,确认确有此事。他们已经在网上发布了招聘公告,里面包含岗位介绍和任职要求,很多球迷看到年薪百万,非常的亢秦岭商於古道上的小镇,一路风光旖旎如画,2元美食让人惊艳秦岭里,有一条比蜀道更传奇的秦楚大道商於(古音念w)古道。它以东南西北的走向贯穿大秦岭,一头连接着长江流域的南襄盆地(南阳襄阳),一头连接着陕西的关中平原(西安咸阳渭南宝鸡),是古三亚南海观音飞机降落前总在其上盘旋,其中有何深意?佛教起源于公元前5世纪,于西汉时期传入我国,因其教义广受百姓欢迎,时至今日,仍有许多虔诚的信徒。谈及佛教,就避不开五大佛教名山,五尊佛像虽各有千秋,但他们都被赋予了普度众生的神圣使一张图看懂丙察察自驾怎么走附带详细路书,拿走不谢丙察察大流沙丙察察分别代表线上的三个地点云南的丙中洛西藏的察瓦龙西藏的察隅。它曾是进藏越野路线的终极代表丙中洛丙中洛雾里村以前不自驾一趟丙察察,都不敢说自己是自驾老司机因为相对其他廊桥看浙里庆元咏归桥一座屹立不倒的红色丰碑来源人民日报中央厨房环城西路二十号工作室浙江丽水市庆元县,松源溪穿城而过。溪水之上,各式水泥桥飞架两岸。不过,那座从历史硝烟中走来的咏归桥,却别有韵味。有人说,松源溪是庆元人民的母平凉华亭一生绝痴处寻梦莲花台巍巍关山,万木竞秀,草木叠翠,碧峰耸峙。据有关史料记载,华亭莲花台始建于秦汉,兴盛于唐代,佛道儒三教并存。唐朝皇帝李纯就敇封其山为西陇之名山,敇封食粮僧道3500人,建有不同风格的美媒俄罗斯人出境游不再首选西欧据美国消费者新闻与商业频道网站9月9日报道,西欧曾是俄罗斯游客的首选目的地,但现在情况发生了变化。报道称,40多岁的俄罗斯人马克斯曾经在法国逛博物馆,在意大利享受美食,在西班牙的山湖南引以为傲的特色美食湖南,因其位于洞庭湖以南被称为湖南,当地气候湿润,物产丰富,被称为鱼米之乡。优越的自然环境,丰厚的自然资源和人文历史,让湖南人酝酿出了很多特色美食。来湖南游玩,除了可以体验到美妙绝理想销量断崖式下跌,老车主怨声载道,造车新势力还有没有未来从8月份,新能源汽车的销量数据来看呢,理想ONE真的是遭遇了滑铁卢,是非常不理想的。虽然我一直对于理想这款车,我还是心存敬意的,说实话一个三缸发动机,一个增程式技术,然后再加上30华为Mate50输了?中纪委点赞华为,网友别寒了有担当的中企的心华为Mate50VSiPhone14,谁输谁赢?关于这个问题,相信大部分消费者心里都有自己的答案。最近经常会在网上看到苹果赢麻了华为输惨了唱衰苹果支持华为等讨论。说实话,拿华为Ma先别急着换机,五款新机遭曝光,并有2023年的机型虽然现在的手机市场中有非常多的新机,但是对大多数用户来说,还是选择等待自身期待的新机,有可能是千元手机,也有可能是明年的新旗舰,也有可能是中端手机,这都是很正常的情况,所以从这个角
使用有锁iphoneX是什么体验?感谢邀请。关于这个问题,我真的有可以好好跟题主说一下这个事情。本人有一部有锁版iPhoneX和一部无锁版iPhone8同时使用。当时买有锁iPhoneX时因为便宜就是图便宜还时最新奢侈品等级是怎么区分的?这里我就讲一下大家比较经常听到的奢侈品中的六大蓝血和八大红血。1。六大蓝血蓝血的定义是来自于西班牙贵族,他们用蓝色血液去称呼拥有贵族血统的人。六大蓝血指的就是,Dior(迪奥),C微单和单反,你选哪一个?选择还是单反优点系统,功能,操作,画质均属一流。我选微单。我使用的相机是尼康z6。微单是相机发展的趋势,各大相机厂家都在微单领域积极开发新产品及镜头群,其中索尼在微单领域已深耕多年全屋净水器哪个牌子好?求推荐?这个问题很难用那个品牌好来回答。因为,评价净水器品牌好坏的因素主要有四点,故障率,产品寿命,便利程度,信任度。综合来讲,能够达到这些要求的品牌都应当是靠谱的,至多只是程度有所差别,为什么石铁陨石,铁陨石大部分都有沙眼撞击纹而矿石就没有呢?原因很简单,因为成因不同,一个是经过经过岩浆分异作用然后各种地质作用,生物圈的表生作用形成的矿物高度成熟矿石。另一个大部分都没有经过分异,甚至只经过一些简单的碰撞具有动力变质作用,宇宙的终极是熵,熵到底是什么?万事万物为何熵只增不减?夏商和西周,东周分两段,春秋和战国,一统秦两汉,三分魏蜀吴,两晋前后延,南北朝并列,隋唐五代传,宋元明清后,皇朝至此完。按照这个学说,熵后面应该还有椆,嫀,釬,巍,薥,蜈,瑨,宝宝在头条你写一篇两千到三千字的文章,从准备到写成大概要多久?时间长短由多方面因素决定的,如果仅从动笔开始到结束,两三千字文章,有一两小时完成,有三五个月完成不了。根据我的体会,主要是四个有关,即与文章的内容有关,准备的时间有关,作品的质量有马上退市的股票为什么还有人买?新股上市会有人卖,股票退市仍旧会有人选择买,明知道股票隔天就要退市了,为什么还有人会选择大量买入,并且很多个股退市的最后一个交易日都是以涨停收盘,甚至在退市的最后一个交易日成交量还本人某985机械大二在读,打算去德国重读机械本科,德国大学机械本科考试难吗?按时毕业难吗?学了两年机械,还想学这个专业,说明你对这个专业其马不反感。想去德国学机械,无疑是个极好的选择,德国的公立大学不收学费,这样你以留美一半的费用享受到同等水平的教育,性价比极高。德国的三国全面战争武将为什么总掉马?三国全面战争武将为什么总要掉马?根据三国演义我们来回答这个问题。三国时期,(220年280年)是上承东汉下启西晋的一段历史时期,分为曹魏蜀汉东吴三个政权。赤壁之战时,曹操被孙刘联军有哪些好玩的休闲类游戏?谢邀请,你好,说实话,一天到晚忙忙碌碌没有时间玩游戏,所以也不知道什么游戏好玩。祝你周日愉快!可以考虑一下Minecraft(mc)又叫我的世界这是一款非常灵活有趣的游戏你可以在这