范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

老弟问我,RocketMQ中的ProcessQueue怎么理解?

  今天来分享 RocketMQ 中一个非常重要又不太好理解的知识点-ProcessQueue。
  一句话概括,ProcessQueue 就是 MessageQueue 的消费快照。看下面这张图:
  1 ProcessQueue 构建
  RocketMQ 客户端启动时,会开启一个 rebalance 线程,代码如下: //MQClientInstance.java public void start() throws MQClientException {  synchronized (this) {   switch (this.serviceState) {    case CREATE_JUST:     //...     // Start rebalance service     this.rebalanceService.start();    //...   }  } }
  这个线程会不停的做重平衡操作,对 ProcessQueue 进行维护。在重平衡线程类 RebalanceImpl 定义了一个变量 processQueueTable,数据结构如下:
  可以看到,在 processQueueTable 这个数据结构上维护了 MessageQueue 和 ProcessQueue 的映射。
  下面看一下维护 processQueueTable 的代码: private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,  final boolean isOrder) {  boolean changed = false;   Iterator> it = this.processQueueTable.entrySet().iterator();  while (it.hasNext()) {   Entry next = it.next();   MessageQueue mq = next.getKey();   ProcessQueue pq = next.getValue();    if (mq.getTopic().equals(topic)) {    if (!mqSet.contains(mq)) {     //从processQueueTable上移除    } else if (pq.isPullExpired()) {     switch (this.consumeType()) {      case CONSUME_ACTIVELY://拉模式       break;      case CONSUME_PASSIVELY://推模式       //从processQueueTable上移除       break;      default:       break;     }    }   }  }     //创建ProcessQueue并放到processQueueTable  List pullRequestList = new ArrayList();  for (MessageQueue mq : mqSet) {   if (!this.processQueueTable.containsKey(mq)) {    //...    ProcessQueue pq = new ProcessQueue();     long nextOffset = -1L;    try {     nextOffset = this.computePullFromWhereWithException(mq);    } catch (Exception e) {     log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);     continue;    }     if (nextOffset >= 0) {     ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);     if (pre != null) {      log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);     } else {         //封装好processQueueTable后再创建一个PullRequest进行消息拉取      log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);      PullRequest pullRequest = new PullRequest();      pullRequest.setConsumerGroup(consumerGroup);      pullRequest.setNextOffset(nextOffset);      pullRequest.setMessageQueue(mq);      pullRequest.setProcessQueue(pq);      pullRequestList.add(pullRequest);      changed = true;     }    } else {     log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);    }   }  }   this.dispatchPullRequest(pullRequestList);   return changed; } 2 拉取消息
  上一节中构建 ProcessQueue 后,会再创建一个 PullRequest,这个 PullRequest 封装了 MessageQueue 和 ProcessQueue,创建成功后被放到了 PullMessageService 中的 pullRequestQueue 变量: //PullMessageService.java private final LinkedBlockingQueue pullRequestQueue = new LinkedBlockingQueue();  public void executePullRequestImmediately(final PullRequest pullRequest) {  try {   this.pullRequestQueue.put(pullRequest);  } catch (InterruptedException e) {   log.error("executePullRequestImmediately pullRequestQueue.put", e);  } }
  这里以 RocketMQ 的推模式为例,Consumer 拉取到消息后,会进行如下处理: 对拉取到的消息根据 TAG 再次 进行过滤; 更新 PullRequest 下次拉取的偏移量 nextOffset;
  把拉取的消息封装到 ProcessQueue 的 msgTreeMap( 放到 msgTreeMap 之前首先要获取到写锁 treeMapLock ); 封装 ConsumeRequest 进行消息消费; 封装消息拉取请求再次进行拉取。
  代码如下: //DefaultMQPushConsumerImpl.java public void onSuccess(PullResult pullResult) {  if (pullResult != null) {      //1. 对拉取到的消息根据 TAG 再次进行过滤   pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,    subscriptionData);    switch (pullResult.getPullStatus()) {    case FOUND:     //2. 更新 PullRequest 下次拉取的偏移量 nextOffset     pullRequest.setNextOffset(pullResult.getNextBeginOffset());          if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {      DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);     } else {      //3. 把拉取的消息封装到 ProcessQueue 的 msgTreeMap      boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());      //4. 封装 ConsumeRequest 进行消息消费      DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(       pullResult.getMsgFoundList(),       processQueue,       pullRequest.getMessageQueue(),       dispatchToConsume);                     //5. 封装消息拉取请求      if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {       DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());      } else {       DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);      }     }     break;    //...   }  } } 3 消费消息
  在上一节提到过,拉取到消息后,会把消息封装成一个 ConsumeRequest,这个线程类会调用消费者定义的 MessageListener 进行消费处理。看一下源代码: //ConsumeMessageConcurrentlyService.ConsumeRequest public void run() {  if (this.processQueue.isDropped()) {   log.info("the message queue not be able to consume, because it"s dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);   return;  }   MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;  ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);  ConsumeConcurrentlyStatus status = null;   try {   status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);  }//...   if (!processQueue.isDropped()) {   ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);  } }
  消息消费成功后,会调用 processConsumeResult 方法进行结果处理。对于广播模式,发送失败后不会做重试,相当于把消息丢弃,而对于集群模式,消费失败的消息会发送到 Broker 端等待消费者重新拉取进行重试。
  消费结果处理完后,消费成功的消息会从 ProcessQueue 的 msgTreeMap 中移除( 需要获取到写锁 treeMapLock ),同时从 msgTreeMap 中获取最小的 Offset 来更新对应 MessageQueue 的偏移量。这个逻辑可以参考下面代码:public void processConsumeResult(  final ConsumeConcurrentlyStatus status,  final ConsumeConcurrentlyContext context,  final ConsumeRequest consumeRequest ) {  int ackIndex = context.getAckIndex();   switch (status) {   case CONSUME_SUCCESS:    if (ackIndex >= consumeRequest.getMsgs().size()) {     ackIndex = consumeRequest.getMsgs().size() - 1;    }    int ok = ackIndex + 1;    break;   //...  }  switch (this.defaultMQPushConsumer.getMessageModel()) {   case BROADCASTING:    //...    break;   case CLUSTERING:    List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {     MessageExt msg = consumeRequest.getMsgs().get(i);     //消费失败的,发送回Broker     boolean result = this.sendMessageBack(msg, context);     //...    }     break;   default:    break;  }     //从msgTreeMap中移除并返回msgTreeMap第一条消息的offset  long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());  if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {   this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);  } } 4 消费者限流4.1 缓存消息数量
  如果消费者缓存的消息数量大于 RocketMQ 配置的阈值(默认 1000),就会触发延迟拉取,而消费者缓存的消息数量就来自 ProcessQueue,看下面代码: long cachedMessageCount = processQueue.getMsgCount().get(); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);  return; } 4.2 缓存的消息大小
  如果消费者缓存的消息大小大于 RocketMQ 配置的阈值(默认 100M),就会触发延迟拉取,而消费者缓存的消息大小就来自 ProcessQueue,看下面代码: long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);  return; } 4.3 消息间隔
  对于普通消息,如果消费偏移量间隔大于配置的阈值(默认 2000),就会触发延迟拉取,而消息间隔就来自 ProcessQueue,看下面代码: if (!this.consumeOrderly) {  if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {   this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);   return;  } } 4.4 获取锁失败
  对于顺序消息,如果获取锁失败,也会触发延迟拉取,而判断获取锁是否成功,也是在 ProcessQueue,看下面代码: if (processQueue.isLocked()) {  //... } else {  this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } 5 总结
  ProcessQueue 是 MessageQueue 的消费快照,可以协助消费者进行消息拉取、消息消费、更新偏移量、限流。最后,看一下 ProcessQueue 的数据结构:
  来源:https://mp.weixin.qq.com/s/zB7dM9xt26c6Z04PvYfOmQ

赣江新区新祺周卫生院2022年第二次公开招聘工作人员公告赣江新区新祺周卫生院2022年第二次公开招聘工作人员公告根据工作需要,经研究,赣江新区新祺周卫生院决定面向社会公开招聘工作人员,现将有关事项公告如下一招聘岗位人数及条件本次计划招聘手机这个功能,谨慎使用!有人因它损失20万现如今手机的功能越来越丰富从最开始的打电话发短信到如今的网购看剧玩游戏只要我们能想到的手机几乎都可以做不过,手机屏幕可不能随便共享!案例一日前,高明的杨女士(化名)接到自称是X东金第2篇基础(二)编写Qt多窗口程序导语程序要实现的功能是程序开始出现一个对话框,按下按钮后便能进入主窗口,如果直接关闭这个对话框,便不能进入主窗口,整个程序也将退出。当进入主窗口后,我们按下按钮,会弹出一个对话框,闲情艺致仙女养成记中国小康网独家专稿文沙子我们期待生活中,更多的人关注自己备受岁月摧残的身体,哪怕身体日渐衰老,也能把灵魂锤炼成仙女。感冒了,涕泗横流。宝宝着急,说妈妈我来喂你药。老妈默默做了红糖姜大哥啊,算我求求你了,自首吧,别再侮辱我的智商了万事皆有可能,这种事情是有可能的。曾经的山西首富,现在怎么这样了。真想知道这是个什么游戏。还有人跟我一样,有恐海症吗?这么丑的鞋子,还是给你自己留着吧。看来它还挺像你奶奶的吗?就是奥特曼来年新奥不是新盖亚,有防卫队有机甲,变身者是领导阶级令和戴拿德凯奥特曼已经接近尾声,最终BOSS母体斯菲亚扎沃尔斯已经降临。德凯距离结束只剩下两集,很多奥迷应该都在期待令和盖亚的消息,期待着新的大地破坏者和新的B王。然而,如今有情报iPhone15plus降价千元引热议,苹果这些年喜欢开倒车?快消八谈苹果iPhone12月29日,iPhone15plus有可能降价千元的消息引发舆论关注,而消费者的质疑不解也随之见诸网络,这不仅意味着iPhone14plus的用户将蒙受隐制造业的困境制造之城广东顺德。笔者调研了数十家制造企业。夕日充满无尽活力的制造之城,当下工业区门可罗雀街道两边不少关门闭户没了机器运转与员工的忙碌陈旧的街区冷落车马稀。。制造业,国之本。制造业详规改革与实践重庆创新场景规划营造情感空间编者按深入贯彻党的二十大报告关于提高城市规划建设治理水平的精神,全面落实全国国土空间规划纲要(20212035年)要求,就要有效发挥详细规划上承总体规划下接实施治理的关键作用,推动豫股2022谁的市值笑到最后?谁更受资本热捧?编者按壬寅年末,2022即将收官。这一年,中国经济经历了复杂考验,三重压力不减。全国上下努力拼经济,以拼达稳,韧性发展。值此年终岁末之际,大河财立方推出盘点2022策划,带你一起回近70亿元增持合肥10。5代线京东方加码大尺寸面板业务12月29日,京东方(000725)发布公告,拟以69。29亿元受让合肥兴融所持合肥京东方显示技术有限公司(以下简称合肥京东方)28。33的股权。受让完成后,京东方在合肥京东方中的
北约成员国立陶宛挑战俄罗斯战略底线当地时间6月21日,欧盟驻俄罗斯大使马库斯埃德雷尔在莫斯科接受俄外交部召见后离开。俄方要求立陶宛立即取消禁止经立境内铁路向加里宁格勒州运输大部分种类货物的限制,立方称这是根据欧盟对3岁男孩竟有召唤蚊子的技能?引470万人在线围观,网友揭露原因真蹊跷!一个三岁的小男孩,仅凭一个字母的发音就能召唤出满屏的蚊子,他是怎么做到的?视频拍摄地点是湖北的恩施,从视频上来看应该是下午五六点钟的时候,一个穿黑衣服的三岁小男孩蹲在地上,花木兰荣耀典藏皮肤上线,日月双形态特效,重剑一技能红日玄鸟S28赛季将在6月23号正式开启,大家期待已久的花木兰的荣耀典藏皮肤,也将伴随新赛季一起上线。花木兰九霄神明以日月更替为创作灵感,与花木兰双形态的玩法相结合,接下来就带大家一起看看柯文哲称金厦大桥可反攻大陆,朱立伦态度大变,呼吁两岸对话几天前,台北市长柯文哲突然抛出兴建金厦大桥的主张,震动岛内舆论,蓝绿白各阵营政客纷纷站队表明态度,两种泾渭分明的观点持续激烈碰撞。支持者认为兴建大桥的想法非常好,对两岸经济往来大有主张兴建金厦大桥挨批无知柯文哲是骂蔡英文吗来源华夏经纬网台北市长民众党主席柯文哲日前以党主席身份到金门,主张兴建金门到厦门的跨海大桥,却遭到各界猛烈砲轰。(台湾中时电子报资料照片)图片来源台湾中时新闻网据台湾中时新闻网报道民国时期倡导天乳运动,严查妇女束胸,发现一次罚金50大洋民国是一个非常特殊的历史时期,虽然仅有37年的时间,但却是一个大师辈出名流涌现的时代。在这个既短暂又充满矛盾的时代里,各种思想潮流相互碰撞。尤其是在面对解放女性的问题上,更是表现出29岁女孩找到亲生父母后肝癌离世作为第3个女儿被送养,孕期患癌被丈夫抛弃,临终前找回血缘之亲病好些了吗?想起潘丽芬,记者给她发了一声微信问候。上一次采访她的时候,她已经被诊断为肝癌。习惯秒回的潘丽芬,一直没有回复。点开她的微信朋友圈,她已经一个月以上没有更新。拨打她手机,王思聪与两年轻女子约会!热心帮拎行李显绅士,两女身材高挑迷人34岁王思聪外出被拍到34岁王思聪,可以说正在为自己下半生的幸福而努力奋斗中。早前携带年轻女友前往上海新天地疯狂购物后,又被网友拍打要携带女友外出游玩。还屁颠屁颠地跟在女子后面帮忙祥源新材(300980。SZ)减持数过半量科高投及其一致行动人累计减持1。54股份格隆汇6月23日丨祥源新材(300980。SZ)公布,公司于2022年6月23日收到持股5以上股东湖北量科高投创业投资有限公司(简称量科高投)出具的湖北量科高投创业投资有限公司及其相互尊重是家庭幸福的基础夫妻之间只有信任,才能让婚姻更甜蜜也只有尊重,才能营造出一个良好的家庭氛围。有些男人认为婚姻是一个人吃饱,全家不饿,不能有一点委屈,女方也不能责怪自己的缺点。假如一个男人真的存在这翁帆和杨振宁结婚18年了,杨振宁100岁精神矍铄,她变化很大2004年,28岁的翁帆和82岁的杨振宁共同走进了婚姻的殿堂。54岁的年龄差距让他们一下子处于舆论的漩涡中间,人们评论这是一对老夫少妻组合,男方贪图年轻和美貌,而女方贪图名和利。甚