RocketMQ源码之broker高可用CommitLog管理组件DLedgerCommitLog
#头条创作挑战赛#一、前言
前文 RocketMQ源码分析之核心磁盘数据结构CommitLog 让我们知道这个CommitLog是干什么用的,就是broker会将消息写入本地磁盘的CommitLog文件中。
但是 CommitLog采用的 Master/Slave 部署模式,提供了一定的高可用性。但这样的部署模式,有一定缺陷。比如故障转移方面,如果主节点挂了,还需要人为手动进行重启或者切换,无法自动将一个从节点转换为主节点。
所以RocketMQ通过DLedgerCommitLog来实现基于 raft 协议的 commitlog 存储库,也是 RocketMQ 实现新的高可用多副本架构的关键。 二、源码分析DLedgerCommitLog初始化时机; DLedgerCommitLog成员变量; DLedgerCommitLog 构造函数;加载所有磁盘文件mappedfile的数据;数据恢复;追加消息; 消息查找;1、DLedgerCommitLog初始化时机
DefaultMessageStore构造时进行根据是否开启高可用来初始化DLedgerCommitLog还是 commitLog;
上面这行代码也就是说如果开启了高可用的话默认初始化一个DLedgerCommitLog否则就初始化原始的commitLog,我们到这里就可以想到了,这个DLedgerCommitLog和原始的CommitLog相比肯定是多了往子节点同步的部分。2、DLedgerCommitLog成员变量/** * Store all metadata downtime for recovery, data protection reliability * 他是commitlog的子类,他可以去继承我么的commitlog把数据写入到本地磁盘文件里去,以及flush这样的功能 * 对于我们的数据恢复、以及数据保护可以去做一个多副本策略,高可用架构 */ public class DLedgerCommitLog extends CommitLog { // 开源dledger框架的高可用同步服务器组件 private final DLedgerServer dLedgerServer; // 开源dledger框架的配置组件 private final DLedgerConfig dLedgerConfig; // 开源dledger框架的mmap内存映射文件存储组件 private final DLedgerMmapFileStore dLedgerFileStore; // 开源dledger框架的mmap内存映射文件list private final MmapFileList dLedgerFileList; //The id identifies the broker role, 0 means master, others means slave private final int id; // 消息序列器 private final MessageSerializer messageSerializer; // 用于记录消息追加的时耗(日志追加所持有锁时间) private volatile long beginTimeInDledgerLock = 0; //This offset separate the old commitlog from dledger commitlog // 记录的旧Commitlog文件中的最大偏移量,如果访问的偏移量大于它,则访问Dledger 管理的文件 private long pidedCommitlogOffset = -1; // 是否正在恢复旧的Commitlog文件 private boolean isInrecoveringOldCommitlog = false; private final StringBuilder msgIdBuilder = new StringBuilder(); }
我们可以看到DLedgerCommitLog实际上是继承了CommitLog的,那么DLedgerCommitLog的存储结构又是怎么样的呢,如何兼容CommitLog呢,其实我们根据上面的知识可以想到其实我们的主从高可用只是比普通模式的Log需要多记录一些term,channel等这些元数据信息:
看到这里我们能想到,我们只要把commitLog的原本信息放到body里不就可以兼容commitLog了,而且改动也不大,对于历史数据也能很好的兼容,rocketmq确实是这么做的。 3、DLedgerCommitLog构造函数public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { // 调用父类的构造函数 也就是说开启了主从架构也会兼容历史的消息 super(defaultMessageStore); dLedgerConfig = new DLedgerConfig(); // 是否强制删除文件,取自Broker配置属性cleanFileForciblyEnable,默认为true dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); // DLedger存储类型,固定为基于文件的存储模式 dLedgerConfig.setStoreType(DLedgerConfig.FILE); // Leader节点的id 名称,示例配置:n0,其配置要求第二个字符后必须是数字。 dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); // DLeger group 的名称,建议与broker 配置属性brokerName 保持一致 dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); // DLeger Group 中所有的节点信息,其配置示例n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913。多个节点使用分号隔开。 dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); // 设置DLedger 的日志文件的根目录,取自borker 配件文件中的storePathRootDir ,即RocketMQ 的数据存储根路径。 dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); // 设置DLedger 的单个日志文件的大小,取自Broker 配置文件中的mapedFileSizeCommitLog,即与Commitlog 文件的单个文件大小一致 dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); // DLedger 日志文件的删除时间,取自Broker 配置文件中的deleteWhen,默认为凌晨4 点 dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); // DLedger 日志文件保留时长,取自Broker 配置文件中的fileReservedHours,默认为72h dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId()); dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush()); id = Integer.parseInt(dLedgerConfig.getSelfId().substring(1)) + 1; // 初始化DledgerServer 主要是进行主从复制以及选举使用 dLedgerServer = new DLedgerServer(dLedgerConfig); dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); // 在dledger框架的存储层里加一个append钩子,追加数据之前需要什么定位到这条数据的一个位置 // 然后加入进去这条数据在commitlog里面的全局物理offset DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { // 我们上面说过其实当我们开启了主从同步之后我们追加消息的时候 // 其实只有body是存储的原始的commitLog结构其他对于客户端都是无用的信息 // 所以这里设置的追加消息的钩子函数就是为了返回body的offset assert bodyOffset == DLedgerEntry.BODY_OFFSET; buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.putLong(entry.getPos() + bodyOffset); }; dLedgerFileStore.addAppendHook(appendHook); dLedgerFileList = dLedgerFileStore.getDataFileList(); this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); }
主要流程节点: 调用父类的构造函数 也就是说开启了主从架构也会兼容历史的消息 构建配置文件类 根据DledgerConfig构建DledgerServer 主要负责主从日志同步以及选举 设置追加消息的钩子函数 4、加载所有磁盘文件mappedfile的数据
这里其实就是去加载commitLog中的信息为了进行历史消息的兼容public boolean load() { return super.load(); }
最后还是调用到了父类CommitLog中的load方法,其中mappedFileQueue的load方法,前文 RocketMQ源码分析之映射文件队列MappedFileQueue 有进行讲解;// CommitLog里面数据都是在多个磁盘文件里的,每个磁盘文件都是一个MappedFile // 他应该是属于把所有的磁盘文件mappedfile的数据,从磁盘里load加载到映射内存区域里来 public boolean load() { boolean result = this.mappedFileQueue.load(); log.info("load commit log " + (result ? "OK" : "Failed")); return result; }5、数据恢复加载commitLog以及index文件的wrotePosition,flushedPosition,committedPosition重要的指针; 如果存在dLedgerFile则恢复返回;调用commitLog的recoverNormall() 进行commitLog文件的恢复; 如果不存在旧的commitLog直接结束文件日志的恢复流程; 如果存在则尝试找到最后一个commitLog文件,如果没找到就停止;从最后一个文件的最后写入点尝试查找写入的魔数,如果存在魔数并且等CommitLog.BLANK_MAGIC_CODE则无需写入魔数;初始化pidedCommitlogOffset,等于最后一个文件的起始偏移量加上文件的大小,即该指针指向最后一个文件的结束位置;将最后一个文件全部写满,其方法为设置消息体的大小以及魔数;设置最后一个文件的WrotePosition,CommittedPosition,FlushedPosition 表示文件已经被写满;private void recover(long maxPhyOffsetOfConsumeQueue) { // 主要是加载commitLog以及index文件的wrotePosition,flushedPosition,committedPosition重要的指针 dLedgerFileStore.load(); if (dLedgerFileList.getMappedFiles().size() > 0) { // 如果存在dLedgerFile 只需要恢复dLedgerFile即可 // 存在dLedgerFile 恢复dLedgerFile dLedgerFileStore.recover(); // 设置pidedCommitlogOffset为dLedger文件的最小offset // 作为和老的commitLog的分割,小于这个offset需要访问老的commitLog pidedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { // 如果存在旧的commitLog则禁止删除Dledger防止出现日志断层影响查询 disableDeleteDledger(); } // 最大物理offset long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); // Clear ConsumeQueue redundant data if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) { log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset); } return; } //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog isInrecoveringOldCommitlog = true; //No need the abnormal recover // 调用commitLog的recoverNormall() 进行commitLog文件的恢复 super.recoverNormally(maxPhyOffsetOfConsumeQueue); isInrecoveringOldCommitlog = false; // 如果不存在旧的commitLog直接结束文件日志的恢复流程 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 不存在旧的commitLog直接返回 if (mappedFile == null) { return; } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(mappedFile.getWrotePosition()); boolean needWriteMagicCode = true; // 1 TOTAL SIZE byteBuffer.getInt(); //size int magicCode = byteBuffer.getInt(); if (magicCode == CommitLog.BLANK_MAGIC_CODE) { needWriteMagicCode = false; } else { log.info("Recover old commitlog found a illegal magic code={}", magicCode); } dLedgerConfig.setEnableDiskForceClean(false); pidedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} pidedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), pidedCommitlogOffset); if (needWriteMagicCode) { byteBuffer.position(mappedFile.getWrotePosition()); byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); byteBuffer.putInt(BLANK_MAGIC_CODE); mappedFile.flush(0); } // 设置最后一个文件的WrotePosition,CommittedPosition,FlushedPosition 表示文件已经被写满 mappedFile.setWrotePosition(mappedFile.getFileSize()); mappedFile.setCommittedPosition(mappedFile.getFileSize()); mappedFile.setFlushedPosition(mappedFile.getFileSize()); dLedgerFileList.getLastMappedFile(pidedCommitlogOffset); log.info("Will set the initial commitlog offset={} for dledger", pidedCommitlogOffset); }6、追加消息public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); setMessageInfo(msg, tranType); final String finalTopic = msg.getTopic(); // Back to Results AppendMessageResult appendResult; AppendFuture dledgerFuture; EncodeResult encodeResult; encodeResult = this.messageSerializer.serialize(msg); if (encodeResult.status != AppendMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status))); } putMessageLock.lock(); //spin or ReentrantLock ,depending on store config long elapsedTimeInLock; long queueOffset; try { beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); encodeResult.setQueueOffsetKey(queueOffset, false); // 追加消息的时候不再写入之前的commitLog // 而是调用dlegerserver的handleAppend进行日志的写入&子节点日志的复制(后面会详细讲解) // 只有超过半数以上的节点复制成功才会返回成功 // 如果追加成功则会返回追加成功的起始偏移量即pos属性类似于commitLog中的物理偏移量 AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBody(encodeResult.getData()); dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); if (dledgerFuture.getPos() == -1) { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } // 根据dledger的起始偏移量计算真正的消息的存储offset。 long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); break; default: break; } } catch (Exception e) { log.error("Put message error", e); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } finally { beginTimeInDledgerLock = 0; putMessageLock.unlock(); } if (elapsedTimeInLock > 500) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); } return dledgerFuture.thenApply(appendEntryResponse -> { PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { case SUCCESS: putMessageStatus = PutMessageStatus.PUT_OK; break; case INCONSISTENT_LEADER: case NOT_LEADER: case LEADER_NOT_READY: case DISK_FULL: putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; break; case WAIT_QUORUM_ACK_TIMEOUT: //Do not return flush_slave_timeout to the client, for the ons client will ignore it. putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; break; case LEADER_PENDING_FULL: putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; break; } PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); if (putMessageStatus == PutMessageStatus.PUT_OK) { // Statistics storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).add(1); storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).add(appendResult.getWroteBytes()); } return putMessageResult; }); }7、消息查找
消息的查找起始和原来还是没有什么区别的,还是使用二分查找法通过offset获取mappedFile文件,只是多了一个pidedCommitlogOffset的判断是否是老数据,如果是老数据直接走commitLog,新数据就走Dledger维护的文件列表;public SelectMappedBufferResult getMessage(final long offset, final int size) { // 如果是小于pidedCommitlogOffset 证明是旧数据 -> 从commitLog获取 if (offset < pidedCommitlogOffset) { return super.getMessage(offset, size); } // 从 dledger获取 int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); // 获取文件并转换为 DLedgerSelectMappedBufferResult 类型 return convertSbr(mappedFile.selectMappedBuffer(pos, size)); } return null; }三、总结DLedger在整合时,使用DLedger 条目包裹RocketMQ 中的CommitLog 条目,即在DLedger 条目的body字段来存储整条CommitLog 条目; 引入pidedCommitlogOffset 变量,表示物理偏移量小于该值的消息存在于旧的CommitLog 文件中,实现升级DLedger 集群后能访问到旧的数据; 新DLedger 集群启动后,会将最后一个CommitLog 填充,即新的数据不会再写入到原先的CommitLog 文件; 消息追加到DLedger 数据日志文件中,返回的偏移量不是DLedger 条目的起始偏移量,而是DLedger 条目中body 字段的起始偏移量,即真实消息的起始偏移量,保证消息物理偏移量的语义与RocketMQ Commitlog 一样;
真假福利房?芯片大厂长江存储劝退低绩效员工,被离职还要补款数十万本文来源时代财经作者王婷图源pixabay被美国列入实体清单一个月后,国产NAND存储芯片龙头长江存储科技有限责任公司(简称长江存储)被传裁员。日前,长江存储一名老员工在知乎上爆料
荣誉时刻!吴京主演电影票房首位破300亿的演员随着流浪地球2票房达到10亿,吴京也创造了一项前无古人后无来者的成就,主演电影票房首位破300亿的演员。2015年,吴京自编自导自演的电影战狼1获得了5。4亿人民币的票房2年后,电
美将断供华为?华为恐再现危机!随着中国的崛起,中美之间的关系也变的微妙起来,摩擦不断,尤其是针对我国的高科技企业的打压,可谓无所不用其极。到最后直接撕下脸面,通过行政手段来干预制裁,华为作为国内首屈一指的高科技
隐于儒乡小镇的股票作手拜访一位隐居曲阜交易禅师的随记历时三年的抗疫,终于告一段落了,好想出去走一走活动一下筋骨啊!周五接到上大任教的学长的电话,他邀请我一起去山东拜访他的一位好友客居孔子故里的一位股票交易员,他称之为交易禅师。我曾去
如果宇宙末日到了,那会如何?人类还能生存吗?宇宙中的万事万物最终都会消亡。曾被认为是永生的黑洞也会一点一点逝去。黑洞可以存在很久以兆亿年计时,但他们确实会消逝。所以这之后会发生什么呢?这个问题已经被揭露,现在我们一直在解答这
快记录,慢管理,这款笔记软件惊艳了我屏幕的前的各位或多或少可能都用过笔记软件吧,不管是手机或者电脑自带的记事本,或者最近网上比较出名的Notion我来等笔记软件。笔记软件可以有效的帮助我们记录一些资料,起到一种速记的
人类对外星生存的启示流浪地球的思考随着科学技术的进步,人类对宇宙的好奇心和探索热情不断增强。在这样的背景下,科幻电影流浪地球的出现,启发了人们对外星生存的思考。电影流浪地球以一群人类搭乘的飞船为主线,在地球即将毁灭
人类来到世上是一件概率极低的事情,那么这个概率究竟有多低?综述世界日新月异,每天都会发生不同的事,有的时候我们总会感叹,人活着真的很不容易。在迷惘的时候,我们也会问自己一个问题,我们为何来到这个世上?实际上,你不知道的是,我们的生命何其珍
苹果折叠屏笔记本将亮相20。5寸超大屏根据分析师郭明錤爆料数据显示,苹果计划在2025年推出折叠屏版本的MacBook,预计屏幕尺寸为20。5英寸,配备全尺寸的屏幕键盘,此外还支持外界键盘,将作为苹果的一个全新的产品线
华擎13代i7迷你主机评测性能不够惊喜,价格足够惊吓著名科技网站TechPowerUp于今日放出了华擎(ASRock)旗下的十三代酷睿迷你主机NUCSBOX1360PD4(IntelCorei71360P)的评测,从测试的结果来看,
不负春光争朝夕乘势而为开好局春回大地,万物勃发。2月1日,红花岗区优特钢精深加工技改项目现场,一台台大型机械设备整齐排列,一列列施工人员整装待命,随着遵义市2023年第一季度重大项目集中开工号角的吹响,蓄势待