老弟问我,RocketMQ中的ProcessQueue怎么理解?
今天来分享 RocketMQ 中一个非常重要又不太好理解的知识点-ProcessQueue。
一句话概括,ProcessQueue 就是 MessageQueue 的消费快照。看下面这张图:
1 ProcessQueue 构建
RocketMQ 客户端启动时,会开启一个 rebalance 线程,代码如下: //MQClientInstance.java public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: //... // Start rebalance service this.rebalanceService.start(); //... } } }
这个线程会不停的做重平衡操作,对 ProcessQueue 进行维护。在重平衡线程类 RebalanceImpl 定义了一个变量 processQueueTable,数据结构如下:
可以看到,在 processQueueTable 这个数据结构上维护了 MessageQueue 和 ProcessQueue 的映射。
下面看一下维护 processQueueTable 的代码: private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet, final boolean isOrder) { boolean changed = false; Iterator> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) { if (!mqSet.contains(mq)) { //从processQueueTable上移除 } else if (pq.isPullExpired()) { switch (this.consumeType()) { case CONSUME_ACTIVELY://拉模式 break; case CONSUME_PASSIVELY://推模式 //从processQueueTable上移除 break; default: break; } } } } //创建ProcessQueue并放到processQueueTable List pullRequestList = new ArrayList(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { //... ProcessQueue pq = new ProcessQueue(); long nextOffset = -1L; try { nextOffset = this.computePullFromWhereWithException(mq); } catch (Exception e) { log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); continue; } if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { //封装好processQueueTable后再创建一个PullRequest进行消息拉取 log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } this.dispatchPullRequest(pullRequestList); return changed; } 2 拉取消息
上一节中构建 ProcessQueue 后,会再创建一个 PullRequest,这个 PullRequest 封装了 MessageQueue 和 ProcessQueue,创建成功后被放到了 PullMessageService 中的 pullRequestQueue 变量: //PullMessageService.java private final LinkedBlockingQueue pullRequestQueue = new LinkedBlockingQueue(); public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
这里以 RocketMQ 的推模式为例,Consumer 拉取到消息后,会进行如下处理: 对拉取到的消息根据 TAG 再次 进行过滤; 更新 PullRequest 下次拉取的偏移量 nextOffset;
把拉取的消息封装到 ProcessQueue 的 msgTreeMap( 放到 msgTreeMap 之前首先要获取到写锁 treeMapLock ); 封装 ConsumeRequest 进行消息消费; 封装消息拉取请求再次进行拉取。
代码如下: //DefaultMQPushConsumerImpl.java public void onSuccess(PullResult pullResult) { if (pullResult != null) { //1. 对拉取到的消息根据 TAG 再次进行过滤 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: //2. 更新 PullRequest 下次拉取的偏移量 nextOffset pullRequest.setNextOffset(pullResult.getNextBeginOffset()); if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { //3. 把拉取的消息封装到 ProcessQueue 的 msgTreeMap boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); //4. 封装 ConsumeRequest 进行消息消费 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); //5. 封装消息拉取请求 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } break; //... } } } 3 消费消息
在上一节提到过,拉取到消息后,会把消息封装成一个 ConsumeRequest,这个线程类会调用消费者定义的 MessageListener 进行消费处理。看一下源代码: //ConsumeMessageConcurrentlyService.ConsumeRequest public void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it"s dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); }//... if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } }
消息消费成功后,会调用 processConsumeResult 方法进行结果处理。对于广播模式,发送失败后不会做重试,相当于把消息丢弃,而对于集群模式,消费失败的消息会发送到 Broker 端等待消费者重新拉取进行重试。
消费结果处理完后,消费成功的消息会从 ProcessQueue 的 msgTreeMap 中移除( 需要获取到写锁 treeMapLock ),同时从 msgTreeMap 中获取最小的 Offset 来更新对应 MessageQueue 的偏移量。这个逻辑可以参考下面代码:public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; break; //... } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: //... break; case CLUSTERING: List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); //消费失败的,发送回Broker boolean result = this.sendMessageBack(msg, context); //... } break; default: break; } //从msgTreeMap中移除并返回msgTreeMap第一条消息的offset long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } } 4 消费者限流4.1 缓存消息数量
如果消费者缓存的消息数量大于 RocketMQ 配置的阈值(默认 1000),就会触发延迟拉取,而消费者缓存的消息数量就来自 ProcessQueue,看下面代码: long cachedMessageCount = processQueue.getMsgCount().get(); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return; } 4.2 缓存的消息大小
如果消费者缓存的消息大小大于 RocketMQ 配置的阈值(默认 100M),就会触发延迟拉取,而消费者缓存的消息大小就来自 ProcessQueue,看下面代码: long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return; } 4.3 消息间隔
对于普通消息,如果消费偏移量间隔大于配置的阈值(默认 2000),就会触发延迟拉取,而消息间隔就来自 ProcessQueue,看下面代码: if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return; } } 4.4 获取锁失败
对于顺序消息,如果获取锁失败,也会触发延迟拉取,而判断获取锁是否成功,也是在 ProcessQueue,看下面代码: if (processQueue.isLocked()) { //... } else { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } 5 总结
ProcessQueue 是 MessageQueue 的消费快照,可以协助消费者进行消息拉取、消息消费、更新偏移量、限流。最后,看一下 ProcessQueue 的数据结构:
来源:https://mp.weixin.qq.com/s/zB7dM9xt26c6Z04PvYfOmQ
赣江新区新祺周卫生院2022年第二次公开招聘工作人员公告赣江新区新祺周卫生院2022年第二次公开招聘工作人员公告根据工作需要,经研究,赣江新区新祺周卫生院决定面向社会公开招聘工作人员,现将有关事项公告如下一招聘岗位人数及条件本次计划招聘
手机这个功能,谨慎使用!有人因它损失20万现如今手机的功能越来越丰富从最开始的打电话发短信到如今的网购看剧玩游戏只要我们能想到的手机几乎都可以做不过,手机屏幕可不能随便共享!案例一日前,高明的杨女士(化名)接到自称是X东金
第2篇基础(二)编写Qt多窗口程序导语程序要实现的功能是程序开始出现一个对话框,按下按钮后便能进入主窗口,如果直接关闭这个对话框,便不能进入主窗口,整个程序也将退出。当进入主窗口后,我们按下按钮,会弹出一个对话框,
闲情艺致仙女养成记中国小康网独家专稿文沙子我们期待生活中,更多的人关注自己备受岁月摧残的身体,哪怕身体日渐衰老,也能把灵魂锤炼成仙女。感冒了,涕泗横流。宝宝着急,说妈妈我来喂你药。老妈默默做了红糖姜
大哥啊,算我求求你了,自首吧,别再侮辱我的智商了万事皆有可能,这种事情是有可能的。曾经的山西首富,现在怎么这样了。真想知道这是个什么游戏。还有人跟我一样,有恐海症吗?这么丑的鞋子,还是给你自己留着吧。看来它还挺像你奶奶的吗?就是
奥特曼来年新奥不是新盖亚,有防卫队有机甲,变身者是领导阶级令和戴拿德凯奥特曼已经接近尾声,最终BOSS母体斯菲亚扎沃尔斯已经降临。德凯距离结束只剩下两集,很多奥迷应该都在期待令和盖亚的消息,期待着新的大地破坏者和新的B王。然而,如今有情报
iPhone15plus降价千元引热议,苹果这些年喜欢开倒车?快消八谈苹果iPhone12月29日,iPhone15plus有可能降价千元的消息引发舆论关注,而消费者的质疑不解也随之见诸网络,这不仅意味着iPhone14plus的用户将蒙受隐
制造业的困境制造之城广东顺德。笔者调研了数十家制造企业。夕日充满无尽活力的制造之城,当下工业区门可罗雀街道两边不少关门闭户没了机器运转与员工的忙碌陈旧的街区冷落车马稀。。制造业,国之本。制造业
详规改革与实践重庆创新场景规划营造情感空间编者按深入贯彻党的二十大报告关于提高城市规划建设治理水平的精神,全面落实全国国土空间规划纲要(20212035年)要求,就要有效发挥详细规划上承总体规划下接实施治理的关键作用,推动
豫股2022谁的市值笑到最后?谁更受资本热捧?编者按壬寅年末,2022即将收官。这一年,中国经济经历了复杂考验,三重压力不减。全国上下努力拼经济,以拼达稳,韧性发展。值此年终岁末之际,大河财立方推出盘点2022策划,带你一起回
近70亿元增持合肥10。5代线京东方加码大尺寸面板业务12月29日,京东方(000725)发布公告,拟以69。29亿元受让合肥兴融所持合肥京东方显示技术有限公司(以下简称合肥京东方)28。33的股权。受让完成后,京东方在合肥京东方中的