RocketMQ消息的存储
from:cnblogs.com/shanml/p/16428961.htmBroker对消息的处理
BrokerController 初始化的过程中,调用registerProcessor 方法注册了处理器,在注册处理器的代码中可以看到创建了处理消息发送的处理器对象SendMessageProcessor ,然后将其注册到远程服务中:public class BrokerController { // 初始化 public boolean initialize() throws CloneNotSupportedException { // ... // 注册处理器 this.registerProcessor(); // ... } // 注册处理器 public void registerProcessor() { /** * 发送消息处理器 */ SendMessageProcessor sendProcessor = new SendMessageProcessor(this); // ... // 注册消息发送处理器 this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); // 省略其他注册... } }
在Broker收到生产者的发送消息请求时,会进入到 SendMessageProcessor 的processRequest 方法中处理请求,然后又会调用asyncProcessRequest 异步处理消息,然后从请求中解析请求头数据,并判断是否是批量发送消息的请求,如果是批量发送消息调用asyncSendBatchMessage 方法处理,否则调用asyncSendMessage 方法处理单个消息:public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { // 处理请求 @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { RemotingCommand response = null; try { // 处理请求 response = asyncProcessRequest(ctx, request).get(); } catch (InterruptedException | ExecutionException e) { log.error("process SendMessage error, request : " + request.toString(), e); } return response; } // 异步处理请求 public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.asyncConsumerSendMsgBack(ctx, request); default: // 解析请求头 SendMessageRequestHeader requestHeader = parseRequestHeader(request); // ... if (requestHeader.isBatch()) { // 批量消息发送处理 return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { // 单个消息发送处理 return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); } } } // 单个消息发送处理 private CompletableFuture asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) { // ... CompletableFuture putMessageResult = null; String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); // 是否使用事务 if (transFlag != null && Boolean.parseBoolean(transFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return CompletableFuture.completedFuture(response); } // 事务处理 putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { // 消息持久化 putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); } return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt); } }
以单个消息的发送处理方法 asyncSendMessage 为例看一下消息的接收过程:创建 MessageExtBrokerInner 对象,对消息的相关内容进行封装,将主题信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner中判断是否使用了事务,如果未使用事务调用 brokerController 的getMessageStore 方法获取MessageStore 对象,然后调用asyncPutMessage 方法对消息进行持久化存储返回消息的存储结果 public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { // 单个消息发送处理 private CompletableFuture asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) { // ... // 创建MessageExtBrokerInner对象,之后使用这个对象来操纵消息 MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); // 设置主题 msgInner.setTopic(requestHeader.getTopic()); // 设置消息所在的队列ID msgInner.setQueueId(queueIdInt); if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { return CompletableFuture.completedFuture(response); } // 设置消息内容 msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); // 设置属性 Map origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); MessageAccessor.setProperties(msgInner, origProps); // 设置发送消息时间 msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); // 设置发送消息的主机地址 msgInner.setBornHost(ctx.channel().remoteAddress()); // 设置存储消息的主机地址 msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); // 属性中添加集群名称 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName); // 如果属性中包含PROPERTY_WAIT_STORE_MSG_OK if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) { String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); // 设置消息属性 msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue); } else { msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); } CompletableFuture putMessageResult = null; String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); // 是否使用事务 if (transFlag != null && Boolean.parseBoolean(transFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return CompletableFuture.completedFuture(response); } // 事务处理 putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { // 消息写入 putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); } // 返回消息持久化结果 return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt); } }
MessageStore 是一个接口,在BrokerController 的初始化方法中可以看到,具体使用的是DefaultMessageStore :public class BrokerController { private MessageStore messageStore; public boolean initialize() throws CloneNotSupportedException { boolean result = this.topicConfigManager.load(); // ... if (result) { try { // 创建DefaultMessageStore this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); // ... } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } // 获取MessageStore public MessageStore getMessageStore() { return messageStore; } } 消息存储
DefaultMessageStore 中有一个CommitLog 类型的成员变量,在DefaultMessageStore 中的构造函数中可以看到,如果启用了Dleger,使用的是DLedgerCommitLog ,DLedgerCommitLog 是CommitLog 的子类,如果未启用Dleger,就使用CommitLog 自己(接下来会以CommitLog 为例)。
在 DefaultMessageStore 的asyncPutMessage 方法中,首先进行了一系列的合法性校验,校验通过后会调用CommitLog 的asyncPutMessage 进行消息写入:public class DefaultMessageStore implements MessageStore { private final CommitLog commitLog; // CommitLog public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { // ... // 如果启用了Dleger if (messageStoreConfig.isEnableDLegerCommitLog()) { // 使用DLedgerCommitLog this.commitLog = new DLedgerCommitLog(this); } else { // 否则使用CommitLog this.commitLog = new CommitLog(this); } // ... } @Override public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { // 校验存储状态 PutMessageStatus checkStoreStatus = this.checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } // 校验消息合法性 PutMessageStatus msgCheckStatus = this.checkMessage(msg); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } // 进行一系列校验 PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg); if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) { return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null)); } long beginTime = this.getSystemClock().now(); // 调用CommitLog的asyncPutMessage方法写入消息 CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg); putResultFuture.thenAccept((result) -> { long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().add(1); } }); return putResultFuture; } } 合法性校验Broker存储检查
checkStoreStatus 主要对Broker是否可以写入消息进行检查,包含以下几个方面:MessageStore 是否已经处于关闭状态,如果处于关闭状态不再受理消息的存储Broker是否是从节点,从节点只能读不能写 Broker是否有写权限,如果没有写入权限,不能进行写入操作 操作系统是否处于PAGECACHE繁忙状态,处于繁忙状态同样不能进行写入操作 private PutMessageStatus checkStoreStatus() { // 是否处于停止状态 if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return PutMessageStatus.SERVICE_NOT_AVAILABLE; } // 是否SLAVE角色 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("broke role is slave, so putMessage is forbidden"); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } // 是否可写 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("the message store is not writable. It may be caused by one of the following reasons: " + "the broker"s disk is full, write to logic queue error, write to index file error, etc"); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } else { this.printTimes.set(0); } // 操作系统是否处于PAGECACHE繁忙状态 if (this.isOSPageCacheBusy()) { return PutMessageStatus.OS_PAGECACHE_BUSY; } return PutMessageStatus.PUT_OK; } 消息长度检查
checkMessage 方法主要是对主题的长度校验和消息属性的长度校验: private PutMessageStatus checkMessage(MessageExtBrokerInner msg) { // 如果主题的长度大于最大值 if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } // 如果消息属性长度大于最大值 if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } return PutMessageStatus.PUT_OK; } checkLmqMessage
checkLmqMessage 主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量: private PutMessageStatus checkLmqMessage(MessageExtBrokerInner msg) { // 如果消息属性不为空、存在PROPERTY_INNER_MULTI_DISPATCH属性、并且超过了最大消费数量 if (msg.getProperties() != null && StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && this.isLmqConsumeQueueNumExceeded()) { return PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED; } return PutMessageStatus.PUT_OK; } private boolean isLmqConsumeQueueNumExceeded() { // 开启了LMQ && 开启了多个队列分发 && 消费数量大于了限定值 if (this.getMessageStoreConfig().isEnableLmq() && this.getMessageStoreConfig().isEnableMultiDispatch() && this.lmqConsumeQueueNum.get() > this.messageStoreConfig.getMaxLmqConsumeQueueNum()) { return true; } return false; } 消息写入
对消息进行校验完毕之后,调用了 CommitLog 的asyncPutMessage 进行消息写入,为了简单起见,这里我们先不考虑事务,处理流程如下:首先对消息的相关属性进行了设置,主要包括以下内容 存储时间 消息内容的CRC校验和 如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识 获取当前线程绑定的PutMessageThreadLocal对象,里面有一个 MessageExtEncoder 类型的成员变量,调用它的encode方法可以对消息进行编码,将数据先写入内存buffer,然后调用MessageExtBrokerInner 的setEncodedBuff 方法将buffer设置到encodedBuff 中加锁,从 mappedFileQueue 中获取上一次使用的映射文件mappedFile ,并更新消息的存储时间, 如果mappedFile 为空或者已写满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已达到规定的大小,需要新建一个文件,如果新建文件为空打印错误日志并返回结果mappedFile可以看做是每一个Commitlog文件的映射对象,Commitlog文件的大小限定为1G mappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的目录 调用mappedFile 的appendMessage 方法向文件中追加消息数据,在调用方法时传入了回调函数appendMessageCallback,在CommitLog的构造函数中可以看到是DefaultAppendMessageCallback 类型的,所以会进入到DefaultAppendMessageCallback中进行消息写入,如果写入成功,数据会留在操作系统的PAGECACHE中调用submitFlushRequest 方法执行刷盘策略,判断是否需要立刻将PAGECACHE中的数据刷到磁盘public class CommitLog { // 所有mappedFile集合 protected final MappedFileQueue mappedFileQueue; // ThreadLocal private final ThreadLocal putMessageThreadLocal; // 写入消息的回调函数 private final AppendMessageCallback appendMessageCallback; public CommitLog(final DefaultMessageStore defaultMessageStore) { // 构造函数 //... // 创建回调函数 this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); //... } public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) { // 设置存储时间 msg.setStoreTimestamp(System.currentTimeMillis()); // 设置消息的CRC值 msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // 写入结果 AppendMessageResult result = null; // 获取存储统计服务 StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); // 获取主题 String topic = msg.getTopic(); // 获取事务类型 final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // 省略事务相关处理 } // 获取发送消息的主机地址 InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); if (bornSocketAddress.getAddress() instanceof Inet6Address) { // 如果是IPV6 msg.setBornHostV6Flag(); // 设置IPV6标识 } // 获取存储消息的主机地址 InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); if (storeSocketAddress.getAddress() instanceof Inet6Address) { msg.setStoreHostAddressV6Flag(); // 设置IPV6标识 } // 获取当前线程绑定的PutMessageThreadLocal对象 PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); // 调用encode方法对消息进行编码,并写入buffer PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); if (encodeResult != null) { return CompletableFuture.completedFuture(encodeResult); } // 将存储编码消息的buffer设置到msg中 msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer); // 创建PutMessageContext PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg)); long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; // 加锁 putMessageLock.lock(); try { // 获取上一次写入的文件 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 获取系统时间戳 long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; // 再次更新存储时间戳,保证全局顺序 msg.setStoreTimestamp(beginLockTimestamp); // 如果mapppedFile为空或者已满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已满,需要新建一个文件 if (null == mappedFile || mappedFile.isFull()) { // 使用偏移量0创建一个新的文件 mappedFile = this.mappedFileQueue.getLastMappedFile(0); } // 如果依旧为空 if (null == mappedFile) { // 提示错误 log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } // 写入消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); // ... elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; } finally { beginTimeInLock = 0; putMessageLock.unlock(); } // ... PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // 统计相关 storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes()); // 执行刷盘 CompletableFuture flushResultFuture = submitFlushRequest(result, msg); CompletableFuture replicaResultFuture = submitReplicaRequest(result, msg); return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } // 返回结果 return putMessageResult; }); } } 写入内存Buffer编码消息
MessageExtEncoder 是CommitLog 的一个内部类,它被CommitLog 的另外一个内部类PutMessageThreadLocal 所引用,ThreadLocal 一般用于多线程环境下,为每个线程创建自己的副本变量,从而互不影响,PutMessageThreadLocal在构造函数中对MessageExtEncoder进行了实例化,并指定了创建缓冲区的大小:public class CommitLog { // ThreadLocal private final ThreadLocal putMessageThreadLocal; // 添加消息的ThreadLocal对象 static class PutMessageThreadLocal { private MessageExtEncoder encoder; // 引用MessageExtEncoder private StringBuilder keyBuilder; PutMessageThreadLocal(int size) { // 创建MessageExtEncoder,size用来指定分配内存的大小 encoder = new MessageExtEncoder(size); keyBuilder = new StringBuilder(); } // ... } }
MessageExtEncoder 中使用了ByteBuffer 作为消息内容存放的缓冲区,上面可知缓冲区的大小是在PutMessageThreadLocal 的构造函数中指定的,MessageExtEncoder的 encode方法中对消息进了编码并将数据写入分配的缓冲区:对消息属性数据的长度进行校验判断是否超过限定值 对总消息内容长度进行校验,判断是否超过最大的长度限制 根据总消息内容长度对buffer进行初始化,也就是根据消息需要的大小申请一块内存区域 将消息相关信息写入buffer: 写入消息长度写入魔数写入消息体CRC校验和写入队列ID写入标识队列的偏移量, 需要注意这里还没达到偏移量的值,先占位稍后写入文件的物理偏移量, 先占位稍后写入写入系统标识写入发送消息的时间戳写入发送消息的主机地址写入存储时间戳写入存储消息的主机地址RECONSUMETIMESPrepared Transaction Offset写入消息体长度和消息内容写入主题长度写入主题写入属性长度和属性内容 public class CommitLog { // MessageExtEncoder public static class MessageExtEncoder { // 字节缓冲区,存储消息内容的buffer private final ByteBuffer encoderBuffer; MessageExtEncoder(final int size) { // 分配内存 this.encoderBuffer = ByteBuffer.allocateDirect(size); this.maxMessageSize = size; } // 对消息进行编码并写入buffer protected PutMessageResult encode(MessageExtBrokerInner msgInner) { // 消息属性数据 final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); // 属性数据长度 final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; // 校验长度是否超过最大值 if (propertiesLength > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } // 获取主题数据 final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length;// 主题数据长度 // 获取消息体内容长度 final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; // 总消息内容长度 final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength); // 是否超过最大长度限制 if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } // 初始化 this.resetByteBuffer(encoderBuffer, msgLen); // 1 写入消息长度 this.encoderBuffer.putInt(msgLen); // 2 写入魔数 this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 写入消息体CRC校验和 this.encoderBuffer.putInt(msgInner.getBodyCRC()); // 4 写入队列ID this.encoderBuffer.putInt(msgInner.getQueueId()); // 5 写入标识 this.encoderBuffer.putInt(msgInner.getFlag()); // 6 队列的偏移量, 稍后写入 this.encoderBuffer.putLong(0); // 7 文件的物理偏移量, 稍后写入 this.encoderBuffer.putLong(0); // 8 写入系统标识 this.encoderBuffer.putInt(msgInner.getSysFlag()); // 9 写入发送消息的时间戳 this.encoderBuffer.putLong(msgInner.getBornTimestamp()); // 10 写入发送消息的主机地址 socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer); // 11 写入存储时间戳 this.encoderBuffer.putLong(msgInner.getStoreTimestamp()); // 12 写入存储消息的主机地址 socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer); // 13 RECONSUMETIMES this.encoderBuffer.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset()); // 15 写入消息体长度 this.encoderBuffer.putInt(bodyLength); if (bodyLength > 0) this.encoderBuffer.put(msgInner.getBody());// 写入消息内容 // 16 写入主题长度 this.encoderBuffer.put((byte) topicLength); // 写入主题 this.encoderBuffer.put(topicData); // 17 写入属性长度 this.encoderBuffer.putShort((short) propertiesLength); if (propertiesLength > 0) this.encoderBuffer.put(propertiesData); // 写入属性数据 encoderBuffer.flip(); return null; } } } 写入内存映射文件
前面提到 MappedFile 可以看做是每一个Commitlog文件的映射,里面记录了文件的大小以及数据已经写入的位置,还有两个字节缓冲区ByteBuffer和MappedByteBuffer,它们的继承关系如下:
ByteBuffer :字节缓冲区,用于在内存中分配空间,可以在JVM堆中分配内存(HeapByteBuffer),也可以在堆外分配内存(DirectByteBuffer)。
MappedByteBuffer :是ByteBuffer的子类,它是将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,也叫文件映射,可以减少数据的拷贝。
MappedFile提供了两种方式来进行内容的写入,对应不同的init方法:
第一种通过ByteBuffer分配缓冲区并将内容写入缓冲区,并且使用了暂存池对内存进行管理,需要时进行申请,使用完毕后回收,类似于数据库连接池。
第二种是通过MappedByteBuffer,对CommitLog进行文件映射,然后进行消息写入。
综上所述,开启使用暂存池时会使用ByteBuffer,否则使用MappedByteBuffer进行内容写入。 public class MappedFile extends ReferenceResource { // 记录文件的写入位置 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 文件大小 protected int fileSize; // 字节buffer protected ByteBuffer writeBuffer = null; // 文件映射 private MappedByteBuffer mappedByteBuffer; // 暂存池,类似线程池,只不过池中存放的是申请的内存 protected TransientStorePool transientStorePool = null; // 初始化 public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize); // 从暂存池中获取一块内存 this.writeBuffer = transientStorePool.borrowBuffer(); this.transientStorePool = transientStorePool; } // 初始化 private void init(final String fileName, final int fileSize) throws IOException { // ... try { // 获取文件 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); // 进行文件映射 this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { // ... } catch (IOException e) { // ... } finally { if (!ok && this.fileChannel != null) { this.fileChannel.close(); } } } } // 暂存池 public class TransientStorePool { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final int poolSize; // 暂存池大小 private final int fileSize; // 申请的每一块内存大小 private final Deque availableBuffers; // 双端队列,存放申请的内存 private final MessageStoreConfig storeConfig; // 存储配置 public TransientStorePool(final MessageStoreConfig storeConfig) { this.storeConfig = storeConfig; this.poolSize = storeConfig.getTransientStorePoolSize(); this.fileSize = storeConfig.getMappedFileSizeCommitLog(); this.availableBuffers = new ConcurrentLinkedDeque<>(); } /** * 初始化 */ public void init() { // 根据暂存池大小申请内存 for (int i = 0; i < poolSize; i++) { // 申请直接内存 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize); final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer(address); LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); // 放入到暂存池中 availableBuffers.offer(byteBuffer); } } }
经过之前的步骤,消息内容已经写入到内存缓冲区中,并且也知道准备进行写入的CommitLog对应的映射文件,接下来就可以调用MappedFile的 appendMessagesInner 方法将内存中的内容写入映射文件,处理逻辑如下:MappedFile 中记录了文件的写入位置,获取准备写入的位置,如果写入的位置小于文件大小,意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息如果writeBuffer不为空,使用writeBuffer,否则使用mappedByteBuffer的 slice 方法创建一个与MappedFile 共享的内存区byteBuffer,设置byteBuffer的写入位置,之后通过byteBuffer来进行消息写入,由于是共享内存区域,所以写入的内容会影响到writeBuffer或者mappedByteBuffer中调用回调函数的doAppend方法进行写入,前面可知回调函数是 DefaultAppendMessageCallback 类型的更新 MappedFile 写入位置,返回写入结果public class MappedFile extends ReferenceResource { // 记录文件的写入位置 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 文件大小 protected int fileSize; // 字节buffer protected ByteBuffer writeBuffer = null; // 文件映射 private MappedByteBuffer mappedByteBuffer; // 写入消息 public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb, PutMessageContext putMessageContext) { // 调用appendMessagesInner return appendMessagesInner(msg, cb, putMessageContext); } public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, PutMessageContext putMessageContext) { assert messageExt != null; assert cb != null; // 获取写入位置 int currentPos = this.wrotePosition.get(); // 如果写指针小于文件大小 if (currentPos < this.fileSize) { // 如果writeBuffer不为空,使用writeBuffer的slice方法创建共享内存区,否则使用mappedByteBuffer ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); // 设置共享内存区的写入位置 byteBuffer.position(currentPos); AppendMessageResult result; if (messageExt instanceof MessageExtBrokerInner) { // 单个消息处理 // 通过共享内存区byteBuffer写入数据 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt, putMessageContext); } else if (messageExt instanceof MessageExtBatch) { // 批量消息 // 通过共享内存区byteBuffer写入数据 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt, putMessageContext); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } // 更新MappedFile的写入位置 this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } }
进入到 DefaultAppendMessageCallback 的doAppend 方法中,首先来看方法的入参:fileFromOffset:文件的起始位置偏移量 byteBuffer:缓冲区,也就是上一步中创建的共享内存区 maxBlank:上一步中可知传入的是文件总大小减去当前要写入的位置,也就是文件剩余空间大小 msgInner:消息内容的封装体 putMessageContext:消息写入上下文
方法的处理逻辑如下: 计算文件要写入位置偏移量:文件起始位置偏移量 + 准备写入位置的偏移量 从消息写入上下文中获取主题所属队列的KEY,根据KEY从主题队列路由表中获取队列偏移量,如果获取为空,将偏移量初始化为0并加入到路由表中 从msgInner中获取之前已经写入到内存的消息数据 preEncodeBuffer ,并获取消息内容的长度校验是否有足够的空间写入数据,如果消息长度 + END_FILE_MIN_BLANK_LENGTH (预留空间大小) 大于剩余空间,说明超出了限定的文件大小,此时只将文件大小和魔数写入文件,然后返回写入结果,结果类型为END_OF_FILE (超过文件大小)。这里可以看出每个CommitLog文件需要预留一部分空间(8个字节)用于存储文件大小和魔数。 计算队列偏移量在preEncodeBuffer 中的位置,之前在编码消息步骤时并未写入队列的偏移量值的大小,这里需要找到对应位置更新队列偏移量的值再次更新消息的存储时间,并将preEncodeBuffer的内容写入文件共享缓冲区byteBuffer,**此时消息内容已经写入文件对应的内存buffer中,驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容保存到硬盘中。 **
消息写入结果 PUT_OK:写入成功; END_OF_FILE:超过文件大小; MESSAGE_SIZE_EXCEEDED:消息长度超过最大允许长度: PROPERTIES_SIZE_EXCEEDED:消息、属性超过最大允许长度; UNKNOWN_ERROR:未知异常; public class CommitLog { class DefaultAppendMessageCallback implements AppendMessageCallback { // 预留空间大小,8个字节 private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { // 计算写入位置物理偏移量:文件起始位置 + 准备写入位置的偏移量 long wroteOffset = fileFromOffset + byteBuffer.position(); Supplier msgIdSupplier = () -> { int sysflag = msgInner.getSysFlag(); int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer); msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer msgIdBuffer.putLong(msgIdLen - 8, wroteOffset); return UtilAll.bytes2string(msgIdBuffer.array()); }; // 获取消息队列信息 String key = putMessageContext.getTopicQueueTableKey(); // 从主题队列路由表中获取队列偏移量 Long queueOffset = CommitLog.this.topicQueueTable.get(key); // 如果偏移量为空 if (null == queueOffset) { queueOffset = 0L; // 初始化为0 // 添加到路由表中 CommitLog.this.topicQueueTable.put(key, queueOffset); } boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner); if (!multiDispatchWrapResult) { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } // 如果开启事务需要特殊处理 final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); // ... // 获取之前已经写入到buffer的消息数据 ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); // 获取数据长度 final int msgLen = preEncodeBuffer.getInt(0); // 校验是否有足够的空间写入数据,如果消息长度 + 预留空间大小 大于最大值 if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.msgStoreItemMemory.clear(); // 1 设置文件大小 this.msgStoreItemMemory.putInt(maxBlank); // 2 写入魔数 this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 开始时间 final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // 将文件大小和魔数写入buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); // 返回写入结果,由于剩余空间不足以写入消息内容,这里返回类型为END_OF_FILE return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */ msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } // 计算队列偏移量的位置 int pos = 4 + 4 + 4 + 4 + 4; // 6 写入队列偏移量 preEncodeBuffer.putLong(pos, queueOffset); pos += 8; // 7 写入物理偏移量 preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position()); int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; // 8 系统标识, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP pos += 8 + 4 + 8 + ipLen; // 计算存储时间戳的写入位置 // 更新新存储时间戳 preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // 将preEncodeBuffer的数据写入byteBuffer byteBuffer.put(preEncodeBuffer); // 清空buffer msgInner.setEncodedBuff(null); // 设置返回结果 AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner); break; default: break; } return result; } } } 刷盘
由于篇幅原因,刷盘机制将另写一篇文章。
总结
参考
丁威、周继锋《RocketMQ技术内幕》
https://github.com/apache/rocketmq/blob/develop/docs/cn/Example_LMQ.md
RocketMQ版本:4.9.3
抱团发展官方组织助力白酒产区化12月7日,北京商报记者了解到,在成立贵州省白酒产业发展促进会(以下简称发展促进会)后,贵州省又发布贵州省赤水河流域酱香型白酒生产环境保护条例(以下简称保护条例)。酱酒市场从快速发
百元入手性价比爆表!雷柏V600S双模无线游戏手柄评测文氪新抢先看导言2022年终降至,双十二黑五圣诞活动即将到来,此时大作云集,TGA等年度大奖也纷纷开始征集颁布,一时间玩家社区热闹非凡。正逢游戏玩家过年前夕,国内老牌外设厂商雷柏发
每日一习话新征程始终推进党的自我革命视频加载中(欢迎点击视频,观看本期每日一习话)习近平新征程上,我们要始终推进党的自我革命。一个饱经沧桑而初心不改的党,才能基业常青一个铸就辉煌仍勇于自我革命的党,才能无坚不摧。这段
让亿万孩子共享优质教育党的二十大报告从实施科教兴国战略,强化现代化建设人才支撑的高度,对办好人民满意的教育作出专门部署,彰显了以人民为中心发展教育的价值追求,为推动教育改革发展指明了方向。基础教育在国民
新手宝妈如何哺乳?11个问题带您读懂哺乳那些事谈到母乳喂养,无论是新晋妈妈还是二胎妈妈,大都在产后会遇到一些问题或者是困惑,那么就让我们来帮大家扫除这些扰人的小垃圾吧!1。按时哺乳还是按需哺乳?一定是按需给宝宝喂奶,而不是按时
一名年轻宝妈的诞娃经历在一次意外中,她成功的迎来了她生命中最重要的人怀孕了本是花样年华的她,从此过上了普通人的育儿生儿养儿的一生,但是她成为后悔过。当她胆怯的将这个消息告知家里父母时,母亲的态度让她一度
詹姆斯谈AD缺阵下一个人要站出来,每个人都要做到更多今日,湖人102116客场不敌骑士。赛后,湖人前锋勒布朗詹姆斯接受了采访。当被问到如果安东尼戴维斯要继续缺席一段时间的最佳应对方法,詹姆斯说就是下一个人挺身而出,这就是团队的意义。
成都最美街道,大邑这条GAI等你打卡!桃源大道日前,2022年成都市秋季最美街道评选揭晓,大邑县桃源大道成功上榜。本次桃源大道的入选,是继花水湾镇温泉南街,斜源晒药巷,安仁镇红星街树人街迎宾路二段,江镇飞凤街后,雪山小
天边榜单新十条之后第一站去哪儿?衡山雾凇赏雪正当时12月7日,国务院联防联控机制发布关于进一步优化落实疫情防控的通知,不再对跨地区流动人员查验核酸检测阴性证明和健康码。这一消息无疑是对旅游业的利好,消息发布后,各旅游平台机票等产品
云南贡山县独龙江乡旅游独龙江乡,是云南旅游的最后秘境,几十年来,不为外人所知,它比欧美旅游还稀奇,你再有钱也不一定能去独龙江乡旅游,因为它经常不定时关闭。独龙江,流经察隅县贡山县缅甸克钦邦,也是横断山脉
豆腐皮怎么做才好吃?学会这个方法,做出来香气扑鼻,特别下饭豆腐皮怎么做才好吃?学会这个方法,做出来香气扑鼻,特别下饭。天气凉了,可以适当多吃一些豆制品,比如豆腐和豆腐皮等。豆腐皮中含有大量优质蛋白质以及B族维生素,而且易于被人体消化吸收,