RocketMQ Source Code Analysis: PullMessageProcessor for Consumer Message Pulling
一、Preface
When a consumer sends a RequestCode.PULL_MESSAGE pull request to the broker, how does the broker handle it? With this question in mind, let's analyze the broker-side processing flow during message pulling.
Judging from the request processors registered in BrokerController's registerProcessor method, and from the BrokerController core components analyzed earlier, the component that handles consumer pull requests is PullMessageProcessor.
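For reference, the binding happens roughly as follows in BrokerController#registerProcessor; this is a simplified excerpt, and the executor field name is an assumption that may vary slightly across versions:

// Simplified excerpt of the registration in BrokerController#registerProcessor:
// every RequestCode.PULL_MESSAGE command received by the Netty server is dispatched
// to PullMessageProcessor on a dedicated pull-message thread pool.
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);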
二、Source Code Reading Guide
1. Class hierarchy and constructor
2. The request-handling method
3. Core value objects involved in the request-handling method
4. Pulling the actual messages from the MessageStore
Note: in the request-handling method, the deferred execution of the pull logic is implemented by the pull-message long-polling hold service, PullRequestHoldService, which will be analyzed in the next article.

三、Source Code Analysis

1. Class hierarchy and constructor

// message pull processing component
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    // hooks invoked around message consumption
    private List<ConsumeMessageHook> consumeMessageHookList;

    public PullMessageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }
}
2. The request-handling method

// entry point invoked by the Netty server when a pull-message request is received
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    return this.processRequest(ctx.channel(), request, true);
}
The RocketMQ authors wrote this method rather long; when writing your own code, it is better to split overly long methods. The most important logic in the request-handling method is asking the message store component for the messages to be pulled this time, i.e. org.apache.rocketmq.store.DefaultMessageStore#getMessage. That method belongs to the MessageStore component, which will be analyzed in detail later; in this article we only look at the getMessage method.
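Before walking through the method body, it helps to know how the sysFlag in the pull request header is interpreted. The suspend, commit-offset and subscription flags checked below are packed into a single int on the consumer side. A minimal sketch, assuming the standard org.apache.rocketmq.common.sysflag.PullSysFlag helper (the flag values shown are illustrative, not taken from the article):

// A minimal sketch, assuming org.apache.rocketmq.common.sysflag.PullSysFlag:
// the consumer packs several booleans into the sysFlag field of the pull request header,
// and the broker unpacks them with the hasXxxFlag methods used in processRequest below.
int sysFlag = PullSysFlag.buildSysFlag(
    true,   // commitOffset: piggyback the current consume offset on this pull
    true,   // suspend: allow the broker to hold the request when no message is available
    false,  // subscription: whether the subscription expression is carried in the request
    false   // classFilter: whether class-based filtering is used
);
boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(sysFlag);            // true
boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(sysFlag);  // true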
Let's start the analysis from processRequest. Its parameters are: Channel, which can be understood in terms of the Channel concept in NIO; RemotingCommand, the incoming request object, which was covered in earlier analysis; and brokerAllowSuspend, whether the request is allowed to be suspended, that is, whether processing may be held temporarily when no message is found; on the first call it is passed in as true.

// handle a pull-message request
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    final long beginTimeMills = this.brokerController.getMessageStore().now();
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

    log.debug("receive PullMessage request command, {}", request);

    // check whether the current broker is readable
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden",
            this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }

    // look up (or automatically create) the subscriptionGroupConfig for this consumerGroup;
    // the subscriptionGroupConfig is persisted in the store config file .../store/config/subscriptionGroup.json
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s",
            requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    }

    if (!subscriptionGroupConfig.isConsumeEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    }

    // whether to suspend on the broker side when there is no message, true by default
    final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
    // whether the consume offset is committed along with this pull
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
    // whether the request header carries the subscription data
    final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
    // suspend timeout
    final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

    // which topic this request wants to pull from
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s",
            requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    }

    // check read permission of the topic
    if (!PermName.isReadable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
        return response;
    }

    // check whether queueId exceeds the number of read queues
    if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
            requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    }

    // the consumer group's subscription data for this topic
    SubscriptionData subscriptionData = null;
    // consumer filter data
    ConsumerFilterData consumerFilterData = null;
    // if the request carries the subscription flag
    if (hasSubscriptionFlag) {
        try {
            // build the subscription for this topic, supporting a given expression type
            subscriptionData = FilterAPI.build(
                requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
            );
            // if the filter expression type is not tag
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                // build the filter data for this topic and consumer group, with the expression type and sub version
                consumerFilterData = ConsumerFilterManager.build(
                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion()
                );
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        }
    }
    // if there is no subscription flag
    else {
        // the consumer first establishes a connection; once connected, the consumer group and consumers
        // are managed by the consumer manager, so look up the consumer group info first
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo) {
            log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }

        if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
        }

        // get the subscription data of this topic from the consumer group's subscriptions
        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        if (null == subscriptionData) {
            log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }

        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
            log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
        }

        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(
                requestHeader.getTopic(), requestHeader.getConsumerGroup()
            );
            if (consumerFilterData == null) {
                response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                return response;
            }
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                response.setRemark("the consumer's consumer filter data not latest");
                return response;
            }
        }
    }

    // validate the expression type
    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
        && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    }

    // build a message filter
    MessageFilter messageFilter;
    // whether filtering is enabled for retry messages
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }

    // ask the message store component for the messages to be pulled this time
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
        requestHeader.getConsumerGroup(), // consumer group: who is pulling
        requestHeader.getTopic(),         // topic: from which topic
        requestHeader.getQueueId(),       // queueId: from which queue of the topic
        requestHeader.getQueueOffset(),   // queue offset: from which offset in the queue to start
        requestHeader.getMaxMsgNums(),    // maximum number of messages to pull
        messageFilter                     // message filter: how to filter the data
    );

    // after the store lookup returns
    if (getMessageResult != null) {
        response.setRemark(getMessageResult.getStatus().name());
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

        // message pulling supports read/write separation: normally both writes and pulls go to the master,
        // but if the master is under heavy load, the broker suggests pulling from a slave instead
        if (getMessageResult.isSuggestPullingFromSlave()) {
            // set suggestWhichBrokerId to whichBrokerWhenConsumeSlowly (1 by default)
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                break;
            // if this broker is a slave
            case SLAVE:
                // and reading from the slave is not allowed
                if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
                break;
        }

        // set the suggested node to read messages from
        if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            // consume too slow, redirect to another machine (the slave)
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            }
            // consume ok
            else {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            }
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        switch (getMessageResult.getStatus()) {
            case FOUND:
                response.setCode(ResponseCode.SUCCESS);
                break;
            case MESSAGE_WAS_REMOVING:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
                if (0 != requestHeader.getQueueOffset()) {
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    // XXX: warn and notify me
                    log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                        requestHeader.getQueueOffset(),
                        getMessageResult.getNextBeginOffset(),
                        requestHeader.getTopic(),
                        requestHeader.getQueueId(),
                        requestHeader.getConsumerGroup()
                    );
                } else {
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                }
                break;
            case NO_MATCHED_MESSAGE:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case OFFSET_FOUND_NULL:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_OVERFLOW_BADLY:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                // XXX: warn and notify me
                log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                    requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
                break;
            case OFFSET_OVERFLOW_ONE:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_TOO_SMALL:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                    getMessageResult.getMinOffset(), channel.remoteAddress());
                break;
            default:
                assert false;
                break;
        }

        // execute the consume-message hooks
        if (this.hasConsumeMessageHook()) {
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getConsumerGroup());
            context.setTopic(requestHeader.getTopic());
            context.setQueueId(requestHeader.getQueueId());

            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                    int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;

                    context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                    context.setCommercialRcvTimes(incValue);
                    context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                    context.setCommercialOwner(owner);
                    break;
                case ResponseCode.PULL_NOT_FOUND:
                    if (!brokerAllowSuspend) {
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        context.setCommercialRcvTimes(1);
                        context.setCommercialOwner(owner);
                    }
                    break;
                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                case ResponseCode.PULL_OFFSET_MOVED:
                    context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                    context.setCommercialRcvTimes(1);
                    context.setCommercialOwner(owner);
                    break;
                default:
                    assert false;
                    break;
            }

            // invoke the consume-message hook callbacks
            this.executeConsumeMessageHookBefore(context);
        }

        switch (response.getCode()) {
            // success
            case ResponseCode.SUCCESS:
                // statistics
                this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), getMessageResult.getMessageCount());
                this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), getMessageResult.getBufferTotalSize());
                this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                // transfer the data via the heap
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId());
                    this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId(),
                        (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                    response.setBody(r);
                }
                // transfer the data directly from the page cache (zero copy)
                else {
                    try {
                        FileRegion fileRegion =
                            new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                getMessageResult.release();
                                if (!future.isSuccess()) {
                                    log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                                }
                            }
                        });
                    } catch (Throwable e) {
                        log.error("transfer many message by pagecache exception", e);
                        getMessageResult.release();
                    }

                    response = null;
                }
                break;
            case ResponseCode.PULL_NOT_FOUND:
                // hasSuspendFlag: the suspend flag built when the pull request was created, true by default
                if (brokerAllowSuspend && hasSuspendFlag) {
                    // taken from the brokerSuspendMaxTimeMillis property of DefaultMQPullConsumer
                    long pollingTimeMills = suspendTimeoutMillisLong;
                    // if long polling is not enabled, ignore brokerSuspendMaxTimeMillis and use
                    // shortPollingTimeMills (1000ms by default) as the wait time before the next pull attempt
                    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                    }

                    String topic = requestHeader.getTopic();
                    long offset = requestHeader.getQueueOffset();
                    int queueId = requestHeader.getQueueId();
                    // create a PullRequest and submit it to the PullRequestHoldService thread, which will re-trigger the pull
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                    // pullRequestHoldService periodically (with at most 5s delay) checks whether messages have arrived
                    // and then re-executes the pull
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    // setting response = null means no bytes are written back for this call; the client's read event
                    // is not triggered, no response handling happens on the consumer side, and the request keeps waiting
                    response = null;
                    break;
                }
            case ResponseCode.PULL_RETRY_IMMEDIATELY:
                break;
            case ResponseCode.PULL_OFFSET_MOVED:
                if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                    || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                    MessageQueue mq = new MessageQueue();
                    mq.setTopic(requestHeader.getTopic());
                    mq.setQueueId(requestHeader.getQueueId());
                    mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

                    OffsetMovedEvent event = new OffsetMovedEvent();
                    event.setConsumerGroup(requestHeader.getConsumerGroup());
                    event.setMessageQueue(mq);
                    event.setOffsetRequest(requestHeader.getQueueOffset());
                    event.setOffsetNew(getMessageResult.getNextBeginOffset());
                    this.generateOffsetMovedEvent(event);
                    log.warn(
                        "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(),
                        event.getOffsetNew(), responseHeader.getSuggestWhichBrokerId());
                } else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                        responseHeader.getSuggestWhichBrokerId());
                }
                break;
            default:
                assert false;
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null");
    }

    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    // store the latest committed offset on the broker
    if (storeOffsetEnable) {
        // commit the offset; it is kept in memory in ConsumerOffsetManager.offsetTable
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
    return response;
}
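A detail worth calling out in the PULL_NOT_FOUND branch above: returning null from processRequest is what makes the broker-side hold possible. A minimal sketch of the dispatch contract in the remoting layer, simplified from NettyRemotingAbstract (the real code also handles oneway requests and exceptions), assuming a processor and a Netty ctx are in scope:

// Simplified sketch of how the remoting layer treats a null response:
RemotingCommand response = processor.processRequest(ctx, request);
if (response != null) {
    response.setOpaque(request.getOpaque());
    response.markResponseType();
    ctx.writeAndFlush(response);
}
// if response == null (PULL_NOT_FOUND with suspend allowed), nothing is written back to the consumer;
// its pull callback is not triggered until PullRequestHoldService wakes the request up and a real response is written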
3. Core value objects involved in the request-handling method

public class SubscriptionGroupConfig {
    // consumer group name
    private String groupName;
    // whether consumption is enabled
    private boolean consumeEnable = true;
    // whether consuming from the minimum offset is enabled
    private boolean consumeFromMinEnable = true;
    // whether broadcast consumption is enabled
    private boolean consumeBroadcastEnable = true;
    // number of retry queues
    private int retryQueueNums = 1;
    // maximum number of retries
    private int retryMaxTimes = 16;
    // master broker id
    private long brokerId = MixAll.MASTER_ID;
    // which broker to use when consumption is slow
    private long whichBrokerWhenConsumeSlowly = 1;
    // whether to notify consumers when the consumer id list changes
    private boolean notifyConsumerIdsChangedEnable = true;
}

// core topic metadata structure
public class TopicConfig {
    private static final String SEPARATOR = " ";
    // by default a topic has 16 read queues and 16 write queues
    public static int defaultReadQueueNums = 16;
    public static int defaultWriteQueueNums = 16;
    private String topicName;
    // the broker reports to the nameserver which topic queues it hosts:
    // how many read queues and write queues this broker holds for the topic
    private int readQueueNums = defaultReadQueueNums;
    private int writeQueueNums = defaultWriteQueueNums;
    private int perm = PermName.PERM_READ | PermName.PERM_WRITE;
    // the default topic filter type is tag based
    private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
    private int topicSysFlag = 0;
    private boolean order = false;

    public TopicConfig() {
    }
}

// consumer group info
public class ConsumerGroupInfo {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    // consumer group name
    private final String groupName;
    // subscription data of the group: which topics this consumer group subscribes to
    private final ConcurrentMap<String, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
    // network connections between the consumer group and the broker
    private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
    // consume type: pull model or push model
    private volatile ConsumeType consumeType;
    // message model: clustering or broadcasting
    private volatile MessageModel messageModel;
    // strategy for where to start consuming
    private volatile ConsumeFromWhere consumeFromWhere;
    // last update timestamp
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();

    public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
        ConsumeFromWhere consumeFromWhere) {
        this.groupName = groupName;
        this.consumeType = consumeType;
        this.messageModel = messageModel;
        this.consumeFromWhere = consumeFromWhere;
    }
}

public class SubscriptionData implements Comparable<SubscriptionData> {
    public final static String SUB_ALL = "*";
    // whether class-based filter mode is enabled
    private boolean classFilterMode = false;
    // topic
    private String topic;
    // subscription expression string
    private String subString;
    // tags
    private Set<String> tagsSet = new HashSet<String>();
    // tag hash codes
    private Set<Integer> codeSet = new HashSet<Integer>();
    private long subVersion = System.currentTimeMillis();
    private String expressionType = ExpressionType.TAG;
    @JSONField(serialize = false)
    private String filterClassSource;

    public SubscriptionData() {
    }
}

// consumer client connection info
public class ClientChannelInfo {
    // the consumer client's network channel
    private final Channel channel;
    // the consumer client id
    private final String clientId;
    // programming language code
    private final LanguageCode language;
    // version number
    private final int version;
    // last update timestamp
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();

    public ClientChannelInfo(Channel channel) {
        this(channel, null, null, 0);
    }

    public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
        this.channel = channel;
        this.clientId = clientId;
        this.language = language;
        this.version = version;
    }
}
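As a small illustration of how the SubscriptionData above is produced when the pull request carries the subscription flag, here is the same FilterAPI.build call used in processRequest, applied to a tag expression (the topic name and tags are made-up examples; FilterAPI.build declares throws Exception):

// Using the FilterAPI.build call seen in processRequest above (illustrative values):
SubscriptionData subscriptionData = FilterAPI.build("TopicTest", "TagA || TagB", ExpressionType.TAG);
// for the TAG expression type, the subString is split on "||":
// subscriptionData.getTagsSet() -> {"TagA", "TagB"}
// subscriptionData.getCodeSet() -> the hashCodes of the tags, used for consume-queue level filtering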
4. Pulling the actual messages from the MessageStore

The messageFilter is instantiated according to the configuration, and the actual messages are pulled from the messageStore. This is the core of the request-handling method; let's continue and see how getMessage pulls the messages.

public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums, final MessageFilter messageFilter) {
    // check whether the store has been shut down
    if (this.shutdown) {
        log.warn("message store has shutdown, so getMessage is forbidden");
        return null;
    }

    // check whether the current running state is readable
    if (!this.runningFlags.isReadable()) {
        log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
        return null;
    }

    if (MixAll.isLmq(topic) && this.isLmqConsumeQueueNumExceeded()) {
        log.warn("message store is not available, broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num");
        return null;
    }

    long beginTime = this.getSystemClock().now();

    GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    // the offset from which the next pull should start
    long nextBeginOffset = offset;
    // minimum offset of the current queue
    long minOffset = 0;
    // maximum offset of the current queue
    long maxOffset = 0;

    // lazy init when find msg.
    GetMessageResult getResult = null;

    // current maximum offset of the commitLog
    final long maxOffsetPy = this.commitLog.getMaxOffset();

    // first find the consume queue for this topic and queueId
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        // minimum logical offset of the ConsumeQueue
        minOffset = consumeQueue.getMinOffsetInQueue();
        // maximum logical offset
        maxOffset = consumeQueue.getMaxOffsetInQueue();

        // the message queue has no data
        if (maxOffset == 0) {
            status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }
        // the requested queue offset is too small
        else if (offset < minOffset) {
            status = GetMessageStatus.OFFSET_TOO_SMALL;
            nextBeginOffset = nextOffsetCorrection(offset, minOffset);
        }
        // the requested offset overflows by exactly one
        else if (offset == maxOffset) {
            status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
            nextBeginOffset = nextOffsetCorrection(offset, offset);
        }
        // the requested queue offset is too large
        else if (offset > maxOffset) {
            status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
            if (0 == minOffset) {
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else {
                nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
            }
        } else {
            // look up a slice of consume queue data starting at the given offset
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            // used to detect whether the commitLog data for each physical offset in bufferConsumeQueue still exists
            if (bufferConsumeQueue != null) {
                try {
                    // preset the status to NO_MATCHED_MESSAGE
                    status = GetMessageStatus.NO_MATCHED_MESSAGE;

                    long nextPhyFileStartOffset = Long.MIN_VALUE;
                    long maxPhyOffsetPulling = 0;

                    int i = 0;
                    // the maximum number of bytes of consume queue entries to scan
                    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // whether to record how far consumption lags behind the disk
                    final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();

                    getResult = new GetMessageResult(maxMsgNums);

                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        // each entry in this slice of the consume queue is offset + size + tagsCode:
                        // bytes 1-8: physical offset offsetPy
                        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                        // bytes 9-12: message size sizePy
                        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                        // bytes 13-20: tagsCode
                        long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                        maxPhyOffsetPulling = offsetPy;

                        // the previous message was not found in the commitLog
                        if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                            // skip entries whose physical offset lies before the start of the next file
                            if (offsetPy < nextPhyFileStartOffset)
                                continue;
                        }

                        // compare (max commitLog offset - current entry offset) with the memory threshold
                        // to decide whether this entry can still be served from memory
                        boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                        // check whether the total size of pulled messages has reached the limit; if so, stop this pull
                        if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
                            break;
                        }

                        boolean extRet = false, isTagsCodeLegal = true;
                        // check whether tagsCode is actually an address into the consume queue extend file
                        if (consumeQueue.isExtAddr(tagsCode)) {
                            // read the extend unit into cqExtUnit
                            extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                            if (extRet) {
                                tagsCode = cqExtUnit.getTagsCode();
                            } else {
                                // can't find ext content. Client will filter messages by tag also.
                                log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                    tagsCode, offsetPy, sizePy, topic, group);
                                isTagsCodeLegal = false;
                            }
                        }

                        // message filtering: if there is no match and no message has been collected yet,
                        // keep the status as NO_MATCHED_MESSAGE for now; a match succeeds in these cases:
                        // the SubscriptionData object is null;
                        // SubscriptionData.classFilterMode is true;
                        // the subString of the SubscriptionData equals *;
                        // the codeSet of the SubscriptionData contains the tagsCode value;
                        if (messageFilter != null
                            && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }
                            continue;
                        }

                        // look up one message in the commitlog at the physical offset recorded in the consume queue
                        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                        if (null == selectResult) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                            }
                            // the message was not read, meaning the corresponding mappedFile has been deleted;
                            // continue reading from the start of the next file
                            nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                            continue;
                        }

                        // filter against the actual message body
                        if (messageFilter != null
                            && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }
                            // release...
                            selectResult.release();
                            continue;
                        }

                        // count the transferred message
                        this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
                        // add this message to the result
                        getResult.addMessage(selectResult);
                        status = GetMessageStatus.FOUND;
                        nextPhyFileStartOffset = Long.MIN_VALUE;
                    }

                    if (diskFallRecorded) {
                        // fall-behind amount = max commitLog offset - max physical offset pulled this time
                        long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                        // record how far this group lags behind
                        brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                    }

                    // the next begin offset
                    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long diff = maxOffsetPy - maxPhyOffsetPulling;
                    // memory threshold
                    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                    // when the consumption lag exceeds the in-memory threshold, suggest switching reads to the slave
                    getResult.setSuggestPullingFromSlave(diff > memory);
                } finally {
                    bufferConsumeQueue.release();
                }
            } else {
                // no consume queue data found at this offset
                status = GetMessageStatus.OFFSET_FOUND_NULL;
                // start reading from the same logical position in the next mappedFile
                nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset
                    + " maxOffset: " + maxOffset + ", but access logic queue failed.");
            }
        }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }

    // count how many pulls found messages and how many missed
    if (GetMessageStatus.FOUND == status) {
        this.storeStatsService.getGetMessageTimesTotalFound().add(1);
    } else {
        this.storeStatsService.getGetMessageTimesTotalMiss().add(1);
    }
    long elapsedTime = this.getSystemClock().now() - beginTime;
    this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);

    // lazy init no data found.
    if (getResult == null) {
        getResult = new GetMessageResult(0);
    }

    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}
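The loop above relies on the fixed 20-byte layout of a consume queue entry (ConsumeQueue.CQ_STORE_UNIT_SIZE): an 8-byte commitlog physical offset, a 4-byte message size and an 8-byte tagsCode. A minimal standalone sketch that decodes one entry from a ByteBuffer, mirroring the three reads inside the loop:

import java.nio.ByteBuffer;

// Minimal sketch of decoding one 20-byte consume queue entry, mirroring the reads inside getMessage
public class ConsumeQueueEntryDecoder {
    public static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offsetPy) + 4 (sizePy) + 8 (tagsCode)

    public static void decode(ByteBuffer buffer) {
        long offsetPy = buffer.getLong(); // bytes 1-8: physical offset in the commitlog
        int sizePy = buffer.getInt();     // bytes 9-12: message size
        long tagsCode = buffer.getLong(); // bytes 13-20: tag hash code used for consume-queue level filtering
        System.out.printf("offsetPy=%d, sizePy=%d, tagsCode=%d%n", offsetPy, sizePy, tagsCode);
    }
}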
四、Summary
1. Preliminary checks;
2. Call MessageStore's getMessage method to pull the messages;
3. Execute the consume-message hook callbacks;
4. Store the latest committed offset on the broker.
To improve network performance, when the broker cannot find any message in the physical files at the requested pull offset, it does not immediately return "message not found"; instead it holds the request for a while and retries, until messages arrive or the hold times out. The hold works as either long polling or short polling; on the broker side, long polling can be enabled with longPollingEnable=true.
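The broker-side switches involved here are plain BrokerConfig properties. A minimal sketch, assuming the standard BrokerConfig bean setters (the values shown are the commonly cited defaults and may differ across versions):

// A minimal sketch, assuming org.apache.rocketmq.common.BrokerConfig and its standard setters:
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setLongPollingEnable(true);      // hold PULL_NOT_FOUND requests via long polling (PullRequestHoldService)
brokerConfig.setShortPollingTimeMills(1000);  // per-round wait time in ms when long polling is disabled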
Note: the next article will analyze long polling and short polling.