探秘RocketMQ5。0client端Nameserver地址更新的源码实现方式
源码版本rocketmq:release-5.0.0Client如何更新本地namesrvAddr
client更新Nameserver的定时器主要是在这里启动的this.startScheduledTask();
我们进去这个方法看一看
如果clientConfig获取的namesrvAddr地址为空就会启动一个定时任务
其中定时任务中方法MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
就是通过我们设置的wsAddr(也就是获取namesrvAddr的url地址)地址去获取namesrvAddr地址
线程池scheduledExecutorService的初始化private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MQClientFactoryScheduledThread"));
可以看到是线程池延迟1s后执行,每隔2分钟去http服务器去拉取namesrvAddr的地址
实际更新本地的namesrvAddr地址是通过updateNameServerAddressList方法
最终实际去更新的还是NettyRemotingClient中的namesrvAddrList值
@Override public void updateNameServerAddressList(List addrs) { List old = this.namesrvAddrList.get(); boolean update = false; if (!addrs.isEmpty()) { if (null == old) { update = true; } else if (addrs.size() != old.size()) { update = true; } else { for (int i = 0; i < addrs.size() && !update; i++) { if (!old.contains(addrs.get(i))) { update = true; } } } if (update) { Collections.shuffle(addrs); LOGGER.info("name server address updated. NEW : {} , OLD: {}", addrs, old); this.namesrvAddrList.set(addrs); // should close the channel if choosed addr is not exist. if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) { String namesrvAddr = this.namesrvAddrChoosed.get(); for (String addr : this.channelTables.keySet()) { if (addr.contains(namesrvAddr)) { ChannelWrapper channelWrapper = this.channelTables.get(addr); if (channelWrapper != null) { closeChannel(channelWrapper.getChannel()); } } } } } } }
这里除了更新namesrvAddrList的地址外,还通过Collections.shuffle(addrs)方法对addrs的地址进行了随机打散,就是为了后面再随机选用一个namesrvAddr地址,同时还做了一个事情就是如果新获取到的namesrvAddr和原有的对比,如果旧的namesrvAddr地址有被新namesrvAddr地址剔除的就会将客户端与旧NameServer断开tcp连接closeChannel(channelWrapper.getChannel());
值得注意的这里都是客户端的namesrvAddr更新,还没有涉及到namesrvAddr的重连,所以我们还需要看看namesrvAddr重连的定时器
跟随源码可以定位到重连的定时器主要在NettyRemotingClient这个类中启动的 NettyRemotingClient.this.scanAvailableNameSrv()
其中timer的初始化也是new了一个简单的Timerprivate final Timer timer = new Timer("ClientHouseKeepingService", true);
可以看到是启动就直接执行没有延时时间,循环时间默认3s执行一次,可通过参数com.rocketmq.remoting.client.connect.timeout设置
这里我们进入到scanAvailableNameSrv方法看看 private void scanAvailableNameSrv() { List nameServerList = this.namesrvAddrList.get(); if (nameServerList == null) { LOGGER.debug("scanAvailableNameSrv Addresses of name server is empty!"); return; } for (final String namesrvAddr : nameServerList) { scanExecutor.execute(new Runnable() { @Override public void run() { try { Channel channel = NettyRemotingClient.this.getAndCreateChannel(namesrvAddr); if (channel != null) { NettyRemotingClient.this.availableNamesrvAddrMap.putIfAbsent(namesrvAddr, true); } else { NettyRemotingClient.this.availableNamesrvAddrMap.remove(namesrvAddr); } } catch (Exception e) { LOGGER.error("scanAvailableNameSrv get channel of {} failed, ", namesrvAddr, e); } } }); } }
可以看到这里面又通过一个线程池scanExecutor去异步扫描多个namesrvAddr
我们这里看看scanExecutor线程池的初始化配置 this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(32), new ThreadFactory() { private final AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet()); } } );
我们进入NettyRemotingClient.this.getAndCreateChannel方法看看
注意这里第一行有一个判断就是null == addr
这个判断是为了MQClientInstance.this.updateTopicRouteInfoFromNameServer(); 这里准备的
还记得我们的startScheduledTask()方法吗 这里是开了一个定时器,通过方法MQClientInstance.this.updateTopicRouteInfoFromNameServer();定时去更新topic的信息
默认时间可以看到是30s
我们可以看看TopicRouteData的相关数据
值得一提的我们发送消息最终使用的topic、queue相关信息是使用的TopicPublishInfo属性,TopicRouteData与TopicPublishInfo相关的转化是通过 public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route)方法
我们可以看看转换后的TopicPublishInfo是啥样
然后我们来看看getAndCreateNameserverChannel()方法 private Channel getAndCreateNameserverChannel() throws InterruptedException { String addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } final List addrList = this.namesrvAddrList.get(); if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } if (addrList != null && !addrList.isEmpty()) { for (int i = 0; i < addrList.size(); i++) { int index = this.namesrvIndex.incrementAndGet(); index = Math.abs(index); index = index % addrList.size(); String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr); LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); Channel channelNew = this.createChannel(newAddr); if (channelNew != null) { return channelNew; } } throw new RemotingConnectException(addrList.toString()); } } catch (Exception e) { LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e); } finally { this.namesrvChannelLock.unlock(); } } else { LOGGER.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } return null; }
有一些值得注意的是通过namesrvAddrChoosed获取nameserver地址连接的channal存储在channelTables中复用如果namesrvAddrChoosed获取到的nameserver地址不存在或者不是存活的则随机再从namesrvAddrList获取一个地址 具体核心代码如下 int index = this.namesrvIndex.incrementAndGet(); index = Math.abs(index); index = index % addrList.size(); String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr);
这里就是每次发送消息更新topic的流程
然后我们来看看scanAvailableNameSrv流程的this.createChannel(addr)方法private Channel createChannel(final String addr) throws InterruptedException { //获取原来的ChannelWrapper ChannelWrapper cw = this.channelTables.get(addr); // 如果存在并且存活则直接返回 if (cw != null && cw.isOK()) { return cw.getChannel(); } if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { // 是否创建新的连接 boolean createNewConnection; //双重检查 类似双重检查的单例模式 cw = this.channelTables.get(addr); if (cw != null) { if (cw.isOK()) { return cw.getChannel(); } else if (!cw.getChannelFuture().isDone()) { createNewConnection = false; } else { this.channelTables.remove(addr); createNewConnection = true; } } else { createNewConnection = true; } // 创建新的连接 if (createNewConnection) { // 连接到 Nameserver ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr); cw = new ChannelWrapper(channelFuture); this.channelTables.put(addr, cw); } } catch (Exception e) { LOGGER.error("createChannel: create channel exception", e); } finally { this.lockChannelTables.unlock(); } } else { LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } if (cw != null) { ChannelFuture channelFuture = cw.getChannelFuture(); if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { if (cw.isOK()) { LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); return cw.getChannel(); } else { LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString()); } } else { LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString()); } } return null; } 小结
总的来说客户端获取namesrvAddr的方式是三秒更新一次,所以客户端3s就能感受到namesrvAddr是否下线。
连接相关的信息存储在channelTables中,即
总结客户端每隔30s就会去扫描判断Nameserver是否存活客户端每隔2分钟才会去http服务器拉取最新的Nameserver地址客户端topic相关的缓存信息每隔30s会去Nameserver拉取最新的topic、broker相关信息
故乡往事(未完待续)故乡往事风将蒲公英吹落,自此它的种子回归大地。这是一段记录,关于童年,关于故乡,关于亲情,关于爱与救赎。记忆深处的这一段回忆,予你予他。愿我们都能获得内心的安宁。开篇在我的童年记忆
你听,春天来了(文紫情冬语)你听,春天来了(文紫情冬语)远远近近间或响起的鞭炮声,感觉年的味道愈来愈浓了,虽然又是禁燃鞭炮烟花的一年。鞭炮声响,似乎也是年的声音,辞旧迎新的篇章里写着喜气和年味。不喜放鞭炮,却
你还好吗?我的生活也是头条窗外,寒风催岁月,遍地枯叶黄,冬天萧瑟的情景没有了春秋的诗情画意,在这寒冷的季节,唯有呆在家里才让人感到温暖。灌了一个热水袋,倚窗而坐,伴我的只有西下的夕阳带来的丝
背靠着洁白无瑕的墙上望着窗外落下的雨滴,此时的你在干嘛呢?望着窗外飘着的细雨,看着玻璃窗。上雨水划过留下的水珠,这是季节交替的雨季,冷与暖的交汇就像悲与喜的交感,于是天空再也耐不住乌云的灰沉,下起了雨,落下了泪!人行道边,马路上,一把把颜
体验文字的魅力3一起来学英语吧你觉得孤单就对了,那是让你看清自己的时机。你觉得不被了解就对了,那是让你认清冤家的时机。你觉得黑暗就对了,那样你才分辨得出什么是你的光辉。你觉得无助就对了,那样你才干明白谁是你的贵
武汉印象之悠悠千年卓刀泉是的,这就是卓刀泉!这就是一千八百多年来生生不息倍受尊崇的卓刀泉。这就是在华夏子孙中久享盛名,集忠义仁勇智于一体,儒道佛共同信仰的对象美髯公关羽卓刀于此的卓刀泉。此刻,她静静地伏在
肝好不好,脸知道,养肝护肝从现在开始肝不好,脸知道,你的肝还好吗?脸上出现这三个问题,要小心!1脸色发黄发黑2视力下降,眼睛无神3粉刺丛生肝是如何作出问题的?1过量饮酒酒精对肝细胞有明显的毒性作用。2过度劳累重体力劳
行走的健康卫士,didoG28SPro智能手表让父母更省心父母的年纪越来越大,每每想起心里总是有着不太舒服的情绪,特别是父亲常年被高血压折磨做儿子的却无能为力。为了随时能看到二老的动态,特意让师傅上门给家里装了一台安防摄像头。这段时间在回
卡拉布里亚被罗马逼平让人失望愤怒,但米兰从不惧怕任何挑战直播吧1月10日讯上轮意甲联赛,米兰22遭罗马绝平。赛后在Instagram中,米兰队长卡拉布里亚发文表达了自己的感受,并激励了全队。卡拉布里亚写道对于22的结果,我感到失望和愤怒
人民银行济南分行多措并举,推动全省科创金融事业发展记者高寒2022年以来,中国人民银行济南分行按照济南市科创金融改革试验区建设有关要求及工作部署。在全省范围内,人民银行济南分行也多措并举,加大金融资源对科技创新活动的支持力度加大专
利好!赣州公积金政策有变!事关贷款好消息!赣州市住房公积金管理中心发布通知明确对2023年1月1日起受理的贷款将公积金贷款可贷金额计算公式中的个贷率系数调整为7重大利好!个贷率系数调整为7那么,跟贷款金额又有什么关