探秘RocketMQ5。0client端Nameserver地址更新的源码实现方式
源码版本rocketmq:release-5.0.0Client如何更新本地namesrvAddr
client更新Nameserver的定时器主要是在这里启动的this.startScheduledTask();
我们进去这个方法看一看
如果clientConfig获取的namesrvAddr地址为空就会启动一个定时任务
其中定时任务中方法MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
就是通过我们设置的wsAddr(也就是获取namesrvAddr的url地址)地址去获取namesrvAddr地址
线程池scheduledExecutorService的初始化private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
可以看到是线程池延迟1s后执行,每隔2分钟去http服务器去拉取namesrvAddr的地址
实际更新本地的namesrvAddr地址是通过updateNameServerAddressList方法
最终实际去更新的还是NettyRemotingClient中的namesrvAddrList值
@Override public void updateNameServerAddressList(List addrs) { List old = this.namesrvAddrList.get(); boolean update = false; if (!addrs.isEmpty()) { if (null == old) { update = true; } else if (addrs.size() != old.size()) { update = true; } else { for (int i = 0; i < addrs.size() && !update; i++) { if (!old.contains(addrs.get(i))) { update = true; } } } if (update) { Collections.shuffle(addrs); LOGGER.info("name server address updated. NEW : {} , OLD: {}", addrs, old); this.namesrvAddrList.set(addrs); // should close the channel if choosed addr is not exist. if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) { String namesrvAddr = this.namesrvAddrChoosed.get(); for (String addr : this.channelTables.keySet()) { if (addr.contains(namesrvAddr)) { ChannelWrapper channelWrapper = this.channelTables.get(addr); if (channelWrapper != null) { closeChannel(channelWrapper.getChannel()); } } } } } } }
这里除了更新namesrvAddrList的地址外,还通过Collections.shuffle(addrs)方法对addrs的地址进行了随机打散,就是为了后面再随机选用一个namesrvAddr地址,同时还做了一个事情就是如果新获取到的namesrvAddr和原有的对比,如果旧的namesrvAddr地址有被新namesrvAddr地址剔除的就会将客户端与旧NameServer断开tcp连接closeChannel(channelWrapper.getChannel());
值得注意的这里都是客户端的namesrvAddr更新,还没有涉及到namesrvAddr的重连,所以我们还需要看看namesrvAddr重连的定时器
跟随源码可以定位到重连的定时器主要在NettyRemotingClient这个类中启动的 NettyRemotingClient.this.scanAvailableNameSrv()
其中timer的初始化也是new了一个简单的Timerprivate final Timer timer = new Timer("ClientHouseKeepingService", true);
可以看到是启动就直接执行没有延时时间,循环时间默认3s执行一次,可通过参数com.rocketmq.remoting.client.connect.timeout设置
这里我们进入到scanAvailableNameSrv方法看看 private void scanAvailableNameSrv() { List nameServerList = this.namesrvAddrList.get(); if (nameServerList == null) { LOGGER.debug("scanAvailableNameSrv Addresses of name server is empty!"); return; } for (final String namesrvAddr : nameServerList) { scanExecutor.execute(new Runnable() { @Override public void run() { try { Channel channel = NettyRemotingClient.this.getAndCreateChannel(namesrvAddr); if (channel != null) { NettyRemotingClient.this.availableNamesrvAddrMap.putIfAbsent(namesrvAddr, true); } else { NettyRemotingClient.this.availableNamesrvAddrMap.remove(namesrvAddr); } } catch (Exception e) { LOGGER.error("scanAvailableNameSrv get channel of {} failed, ", namesrvAddr, e); } } }); } }
可以看到这里面又通过一个线程池scanExecutor去异步扫描多个namesrvAddr
我们这里看看scanExecutor线程池的初始化配置 this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(32), new ThreadFactory() { private final AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet()); } } );
我们进入NettyRemotingClient.this.getAndCreateChannel方法看看
注意这里第一行有一个判断就是null == addr
这个判断是为了MQClientInstance.this.updateTopicRouteInfoFromNameServer(); 这里准备的
还记得我们的startScheduledTask()方法吗 这里是开了一个定时器,通过方法MQClientInstance.this.updateTopicRouteInfoFromNameServer();定时去更新topic的信息
默认时间可以看到是30s
我们可以看看TopicRouteData的相关数据
值得一提的我们发送消息最终使用的topic、queue相关信息是使用的TopicPublishInfo属性,TopicRouteData与TopicPublishInfo相关的转化是通过 public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route)方法
我们可以看看转换后的TopicPublishInfo是啥样
然后我们来看看getAndCreateNameserverChannel()方法 private Channel getAndCreateNameserverChannel() throws InterruptedException { String addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } final List addrList = this.namesrvAddrList.get(); if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } if (addrList != null && !addrList.isEmpty()) { for (int i = 0; i < addrList.size(); i++) { int index = this.namesrvIndex.incrementAndGet(); index = Math.abs(index); index = index % addrList.size(); String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr); LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); Channel channelNew = this.createChannel(newAddr); if (channelNew != null) { return channelNew; } } throw new RemotingConnectException(addrList.toString()); } } catch (Exception e) { LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e); } finally { this.namesrvChannelLock.unlock(); } } else { LOGGER.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } return null; }
有一些值得注意的是通过namesrvAddrChoosed获取nameserver地址连接的channal存储在channelTables中复用如果namesrvAddrChoosed获取到的nameserver地址不存在或者不是存活的则随机再从namesrvAddrList获取一个地址 具体核心代码如下 int index = this.namesrvIndex.incrementAndGet(); index = Math.abs(index); index = index % addrList.size(); String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr);
这里就是每次发送消息更新topic的流程
然后我们来看看scanAvailableNameSrv流程的this.createChannel(addr)方法private Channel createChannel(final String addr) throws InterruptedException { //获取原来的ChannelWrapper ChannelWrapper cw = this.channelTables.get(addr); // 如果存在并且存活则直接返回 if (cw != null && cw.isOK()) { return cw.getChannel(); } if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { // 是否创建新的连接 boolean createNewConnection; //双重检查 类似双重检查的单例模式 cw = this.channelTables.get(addr); if (cw != null) { if (cw.isOK()) { return cw.getChannel(); } else if (!cw.getChannelFuture().isDone()) { createNewConnection = false; } else { this.channelTables.remove(addr); createNewConnection = true; } } else { createNewConnection = true; } // 创建新的连接 if (createNewConnection) { // 连接到 Nameserver ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr); cw = new ChannelWrapper(channelFuture); this.channelTables.put(addr, cw); } } catch (Exception e) { LOGGER.error("createChannel: create channel exception", e); } finally { this.lockChannelTables.unlock(); } } else { LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } if (cw != null) { ChannelFuture channelFuture = cw.getChannelFuture(); if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { if (cw.isOK()) { LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); return cw.getChannel(); } else { LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString()); } } else { LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString()); } } return null; } 小结
总的来说客户端获取namesrvAddr的方式是三秒更新一次,所以客户端3s就能感受到namesrvAddr是否下线。
连接相关的信息存储在channelTables中,即
总结客户端每隔30s就会去扫描判断Nameserver是否存活客户端每隔2分钟才会去http服务器拉取最新的Nameserver地址客户端topic相关的缓存信息每隔30s会去Nameserver拉取最新的topic、broker相关信息
使用有锁iphoneX是什么体验?感谢邀请。关于这个问题,我真的有可以好好跟题主说一下这个事情。本人有一部有锁版iPhoneX和一部无锁版iPhone8同时使用。当时买有锁iPhoneX时因为便宜就是图便宜还时最新
奢侈品等级是怎么区分的?这里我就讲一下大家比较经常听到的奢侈品中的六大蓝血和八大红血。1。六大蓝血蓝血的定义是来自于西班牙贵族,他们用蓝色血液去称呼拥有贵族血统的人。六大蓝血指的就是,Dior(迪奥),C
微单和单反,你选哪一个?选择还是单反优点系统,功能,操作,画质均属一流。我选微单。我使用的相机是尼康z6。微单是相机发展的趋势,各大相机厂家都在微单领域积极开发新产品及镜头群,其中索尼在微单领域已深耕多年
全屋净水器哪个牌子好?求推荐?这个问题很难用那个品牌好来回答。因为,评价净水器品牌好坏的因素主要有四点,故障率,产品寿命,便利程度,信任度。综合来讲,能够达到这些要求的品牌都应当是靠谱的,至多只是程度有所差别,
为什么石铁陨石,铁陨石大部分都有沙眼撞击纹而矿石就没有呢?原因很简单,因为成因不同,一个是经过经过岩浆分异作用然后各种地质作用,生物圈的表生作用形成的矿物高度成熟矿石。另一个大部分都没有经过分异,甚至只经过一些简单的碰撞具有动力变质作用,
宇宙的终极是熵,熵到底是什么?万事万物为何熵只增不减?夏商和西周,东周分两段,春秋和战国,一统秦两汉,三分魏蜀吴,两晋前后延,南北朝并列,隋唐五代传,宋元明清后,皇朝至此完。按照这个学说,熵后面应该还有椆,嫀,釬,巍,薥,蜈,瑨,宝宝
在头条你写一篇两千到三千字的文章,从准备到写成大概要多久?时间长短由多方面因素决定的,如果仅从动笔开始到结束,两三千字文章,有一两小时完成,有三五个月完成不了。根据我的体会,主要是四个有关,即与文章的内容有关,准备的时间有关,作品的质量有
马上退市的股票为什么还有人买?新股上市会有人卖,股票退市仍旧会有人选择买,明知道股票隔天就要退市了,为什么还有人会选择大量买入,并且很多个股退市的最后一个交易日都是以涨停收盘,甚至在退市的最后一个交易日成交量还
本人某985机械大二在读,打算去德国重读机械本科,德国大学机械本科考试难吗?按时毕业难吗?学了两年机械,还想学这个专业,说明你对这个专业其马不反感。想去德国学机械,无疑是个极好的选择,德国的公立大学不收学费,这样你以留美一半的费用享受到同等水平的教育,性价比极高。德国的
三国全面战争武将为什么总掉马?三国全面战争武将为什么总要掉马?根据三国演义我们来回答这个问题。三国时期,(220年280年)是上承东汉下启西晋的一段历史时期,分为曹魏蜀汉东吴三个政权。赤壁之战时,曹操被孙刘联军
有哪些好玩的休闲类游戏?谢邀请,你好,说实话,一天到晚忙忙碌碌没有时间玩游戏,所以也不知道什么游戏好玩。祝你周日愉快!可以考虑一下Minecraft(mc)又叫我的世界这是一款非常灵活有趣的游戏你可以在这