范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

RocketMQ源码之DLedger存储实现DLedgerMmapFileStore

  #头条创作挑战赛#一、前言
  前文我们分析了 RocketMQ源码之broker高可用CommitLog管理组件DLedgerCommitLog,本文我们分析DLedgerCommitLog中的mmap内存映射文件存储组件:DLedgerMmapFileStore;二、源码分析DLedgerMmapFileStore抽象父类DLedgerStore;DLedgerMmapFileStore成员变量;DLedgerMmapFileStore构造函数;flush数据服务线程;清理空间服务线程;文件内存映射存储实现启动;追加数据;对数据文件进行截断;以follower身份追加数据;根据索引去查询一个数据条目;数据存储格式及编码;1、DLedgerMmapFileStore抽象父类DLedgerStore// 存储组件抽象类 public abstract class DLedgerStore {      // 获取到当前server的成员状态     public MemberState getMemberState() {         return null;     }      // 作为leader把一个entry追加到磁盘里去     public abstract DLedgerEntry appendAsLeader(DLedgerEntry entry);      /**      * 作为follower把一个entry追加到磁盘里去      * 再追加的时候,我是需要知道是哪个leader同步了这个entry给我的      * @param entry 日志条目      * @param leaderTerm leader选举周期      * @param leaderId leaderid      * @return      */     public abstract DLedgerEntry appendAsFollower(             DLedgerEntry entry, long leaderTerm, String leaderId);      // 根据索引获取日志entry     public abstract DLedgerEntry get(Long index);      // 获取已经提交的index索引     public abstract long getCommittedIndex();      // 在一轮term里更新已经提交index索引     public void updateCommittedIndex(long term, long committedIndex) {      }      // 获取结尾term     public abstract long getLedgerEndTerm();     // 获取结尾index     public abstract long getLedgerEndIndex();     // 获取开始index     public abstract long getLedgerBeginIndex();      // 更新结尾index和term     protected void updateLedgerEndIndexAndTerm() {         if (getMemberState() != null) {             getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());         }     }      // 在存储组件里发起一次flush操作     public void flush() {      }      // 对指定的leader term和id发起一个entry的截断,truncate     public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {         return -1;     }      // 存储组件启动     public void startup() {      }      // 存储组件停止     public void shutdown() {      }  }2、DLedgerMmapFileStore成员变量// 基于mmap内存映射文件的存储组件实现,是我们需要重点研究的 public class DLedgerMmapFileStore extends DLedgerStore {      public static final String CHECK_POINT_FILE = "checkpoint";     public static final String END_INDEX_KEY = "endIndex";     public static final String COMMITTED_INDEX_KEY = "committedIndex";     public static final int MAGIC_1 = 1;     public static final int CURRENT_MAGIC = MAGIC_1;     public static final int INDEX_UNIT_SIZE = 32;      private static Logger logger = LoggerFactory.getLogger(DLedgerMmapFileStore.class);      // 追加entries钩子     public List appendHooks = new ArrayList<>();     // 开始index索引     private long ledgerBeginIndex = -1;     // 结束index索引     private long ledgerEndIndex = -1;     // 已经提交的index索引     private long committedIndex = -1;     // 已经提交的pos位置     private long committedPos = -1;     // 结束term条目     private long ledgerEndTerm;     // dledger核心配置组件     private DLedgerConfig dLedgerConfig;     // server节点成员状态     private MemberState memberState;     // mmap内存映射数据文件list     private MmapFileList dataFileList;     // mmap内存映射索引文件list     private MmapFileList indexFileList;     // 线程本地副本里面的entry缓冲组件     private ThreadLocal localEntryBuffer;     // 线程本地副本里面的index缓冲组件     private ThreadLocal localIndexBuffer;     // flush数据服务组件     private FlushDataService flushDataService;     // 清理空间服务组件     private CleanSpaceService cleanSpaceService;     // 磁盘是否已经满了标识     private volatile boolean isDiskFull = false;     // 最近一次检查点时间戳     private long lastCheckPointTimeMs = System.currentTimeMillis();     // 是否已经加载过的boolean标识     private AtomicBoolean hasLoaded = new AtomicBoolean(false);     // 是否已经完成恢复的boolean标识     private AtomicBoolean hasRecovered = new AtomicBoolean(false);     // 完整存储路径set     private volatile Set fullStorePaths = Collections.emptySet(); }3、DLedgerMmapFileStore构造函数public DLedgerMmapFileStore(DLedgerConfig dLedgerConfig, MemberState memberState) {     // 赋值一个dledger配置组件     this.dLedgerConfig = dLedgerConfig;     // 赋值一个server成员状态组件     this.memberState = memberState;     // 我们的dledger数据存储路径里面如果说包含有多路径分隔符     if (dLedgerConfig.getDataStorePath().contains(DLedgerConfig.MULTI_PATH_SPLITTER)) {         // 把数据文件list封装为一个多路径mmap内存映射文件list         this.dataFileList = new MultiPathMmapFileList(                 dLedgerConfig,                 dLedgerConfig.getMappedFileSizeForEntryData(),                 this::getFullStorePaths         );     } else {         // 把数据文件list封装为一个mmap文件list         this.dataFileList = new MmapFileList(                 dLedgerConfig.getDataStorePath(),                 dLedgerConfig.getMappedFileSizeForEntryData()         );     }     // 把索引文件list封装成一个mmap内存映射文件list     this.indexFileList = new MmapFileList(             dLedgerConfig.getIndexStorePath(),             dLedgerConfig.getMappedFileSizeForEntryIndex()     );     // 线程副本的内存分配,localentry的内存分配,4mb     localEntryBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4 * 1024 * 1024));     // 线程副本的内存分配,localindex的内存分配,一个索引单元大小(32个字节)* 2     localIndexBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(INDEX_UNIT_SIZE * 2));     // 构建从内存里flush数据到磁盘文件里的服务组件     flushDataService = new FlushDataService("DLedgerFlushDataService", logger);     // 构建清理空间服务组件     cleanSpaceService = new CleanSpaceService("DLedgerCleanSpaceService", logger); }4、flush数据服务线程分析
  实际flush动作会调用MmapFileList#flush方法,RocketMQ源码分析之文件内存映射对象层MappedFile核心方法 分析过,不再赘叙;// flush数据服务 class FlushDataService extends ShutdownAbleThread {      public FlushDataService(String name, Logger logger) {         super(name, logger);     }      // 他会不断的周期性的运行,但是支持关闭他     @Override     public void doWork() {         try {             long start = System.currentTimeMillis();              // 他会周期性的去触发数据文件的flush动作             DLedgerMmapFileStore.this.dataFileList.flush(0);             // 他会周期性的去触发索引文件的flush动作             DLedgerMmapFileStore.this.indexFileList.flush(0);              long elapsed;             if ((elapsed = DLedgerUtils.elapsed(start)) > 500) {                 logger.info("Flush data cost={} ms", elapsed);             }              // 如果说超过了一个检查点时间间隔,还需要去发起一次检查点持久化             if (DLedgerUtils.elapsed(lastCheckPointTimeMs) > dLedgerConfig.getCheckPointInterval()) {                 persistCheckPoint();                 lastCheckPointTimeMs = System.currentTimeMillis();             }              // 休眠flush间隔时间             waitForRunning(dLedgerConfig.getFlushFileInterval());         } catch (Throwable t) {             logger.info("Error in {}", getName(), t);             DLedgerUtils.sleep(200);         }     }  }5、清理空间服务线程// 清理空间服务线程 class CleanSpaceService extends ShutdownAbleThread {      // 获取到磁盘空间已经使用比例     double storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(             dLedgerConfig.getStoreBaseDir()     );     // 数据存储路径里物理占用比例     double dataRatio = calcDataStorePathPhysicRatio();      public CleanSpaceService(String name, Logger logger) {         super(name, logger);     }      @Override public void doWork() {         try {             storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getStoreBaseDir());             dataRatio = calcDataStorePathPhysicRatio();             long hourOfMs = 3600L * 1000L;             long fileReservedTimeMs = dLedgerConfig.getFileReservedHours() *  hourOfMs;             if (fileReservedTimeMs < hourOfMs) {                 logger.warn("The fileReservedTimeMs={} is smaller than hourOfMs={}", fileReservedTimeMs, hourOfMs);                 fileReservedTimeMs =  hourOfMs;             }             //If the disk is full, should prevent more data to get in             DLedgerMmapFileStore.this.isDiskFull = isNeedForbiddenWrite();             boolean timeUp = isTimeToDelete();             boolean checkExpired = isNeedCheckExpired();             boolean forceClean = isNeedForceClean();             boolean enableForceClean = dLedgerConfig.isEnableDiskForceClean();             int intervalForcibly = 120 * 1000;             if (timeUp || checkExpired) {                 int count = getDataFileList().deleteExpiredFileByTime(                         fileReservedTimeMs,                         100,                         intervalForcibly,                         forceClean && enableForceClean                 );                  if (count > 0 || (forceClean && enableForceClean) || isDiskFull) {                     logger.info("Clean space count={} timeUp={} checkExpired={} forceClean={} enableForceClean={} diskFull={} storeBaseRatio={} dataRatio={}",                         count, timeUp, checkExpired, forceClean, enableForceClean, isDiskFull, storeBaseRatio, dataRatio);                 }                  if (count > 0) {                     DLedgerMmapFileStore.this.reviseLedgerBeginIndex();                 }             }             getDataFileList().retryDeleteFirstFile(intervalForcibly);             waitForRunning(100);         } catch (Throwable t) {             logger.info("Error in {}", getName(), t);             DLedgerUtils.sleep(200);         }     }      private boolean isTimeToDelete() {         String when = DLedgerMmapFileStore.this.dLedgerConfig.getDeleteWhen();         if (DLedgerUtils.isItTimeToDo(when)) {             return true;         }          return false;     }      private boolean isNeedCheckExpired() {         if (storeBaseRatio > dLedgerConfig.getDiskSpaceRatioToCheckExpired()             || dataRatio > dLedgerConfig.getDiskSpaceRatioToCheckExpired()) {             return true;         }         return false;     }      private boolean isNeedForceClean() {         if (storeBaseRatio > dLedgerConfig.getDiskSpaceRatioToForceClean()             || dataRatio > dLedgerConfig.getDiskSpaceRatioToForceClean()) {             return true;         }         return false;     }      private boolean isNeedForbiddenWrite() {         if (storeBaseRatio > dLedgerConfig.getDiskFullRatio()             || dataRatio > dLedgerConfig.getDiskFullRatio()) {             return true;         }         return false;     }      // 计算数据存储路径物理比例     public double calcDataStorePathPhysicRatio() {         //         Set fullStorePath = new HashSet<>();         String storePath = dLedgerConfig.getDataStorePath();         String[] paths = storePath.trim().split(DLedgerConfig.MULTI_PATH_SPLITTER);          double minPhysicRatio = 100;          // 遍历每一个path路径         for (String path : paths) {             double physicRatio = DLedgerUtils.isPathExists(path) ?                     DLedgerUtils.getDiskPartitionSpaceUsedPercent(path) : -1;             minPhysicRatio = Math.min(minPhysicRatio, physicRatio);             if (physicRatio > dLedgerConfig.getDiskSpaceRatioToForceClean()) {                 fullStorePath.add(path);             }         }          DLedgerMmapFileStore.this.setFullStorePaths(fullStorePath);          return minPhysicRatio;     }  }6、文件内存映射存储实现启动// 对存储组件可以去执行startup启动函数 @Override public void startup() {     // 数据文件和索引文件加载     load();     // 数据恢复     recover();     // flush数据服务组件启动     flushDataService.start();     // 清理空间服务组件     cleanSpaceService.start(); }public void load() {     if (!hasLoaded.compareAndSet(false, true)) {         return;     }     // mmap内存映射数据文件加载和mmap内存映射索引文件加载     if (!this.dataFileList.load() || !this.indexFileList.load()) {         logger.error("Load file failed, this usually indicates fatal error, you should check it manually");         System.exit(-1);     } }7、追加数据// 我们可以把一条数据entry追加到我们的存储组件里来 @Override public DLedgerEntry appendAsLeader(DLedgerEntry entry) {     // 当前节点的状态是否是Leader,如果不是,则抛出异常     PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);     // 当前磁盘是否已满,其判断依据是DLedger的根目录或数据文件目录的使用率超过了允许使用的最大值,默认值为85%     PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);      // 从线程本地副本里获取到一个自己当前线程的数据缓冲区和索引缓冲区     ByteBuffer dataBuffer = localEntryBuffer.get();     ByteBuffer indexBuffer = localIndexBuffer.get();     // 把entry数据编码到数据缓冲区里去     DLedgerEntryCoder.encode(entry, dataBuffer);     // 通过数据缓冲区里面的remaining可以获取到entry大小     int entrySize = dataBuffer.remaining();      // 对server成员状态加锁     synchronized (memberState) {         PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);         PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING, null);          // 所以说endindex+1了以后,从-1到0,随着追加数据累加的索引值         long nextIndex = ledgerEndIndex + 1;         // 设置一下索引值         entry.setIndex(nextIndex);         // 通过server成员状态获取到term第几轮         entry.setTerm(memberState.currTerm());         // 设置魔数         entry.setMagic(CURRENT_MAGIC);          // 把累加索引、当前term、魔数,写入到了数据缓冲区里去         DLedgerEntryCoder.setIndexTerm(                 dataBuffer,                 nextIndex,                 memberState.currTerm(),                 CURRENT_MAGIC         );          // 我准备把这条数据预追加到我们的数据文件mmapfiles里去         long prePos = dataFileList.preAppend(dataBuffer.remaining());         entry.setPos(prePos);         PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);         DLedgerEntryCoder.setPos(dataBuffer, prePos);          // 在正式写入数据之前可以回调我们的追加hook钩子         for (AppendHook writeHook : appendHooks) {             writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);         }          // 数据文件mmapfiles追加对应的数据         long dataPos = dataFileList.append(                 dataBuffer.array(),                 0,                 dataBuffer.remaining()         );          PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);         PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);          // 关于dledger索引追加写入         DLedgerEntryCoder.encodeIndex(                 dataPos,                 entrySize,                 CURRENT_MAGIC,                 nextIndex,                 memberState.currTerm(),                 indexBuffer         );          // 索引文件mmapfiles追加一条索引进文件         long indexPos = indexFileList.append(                 indexBuffer.array(),                 0,                 indexBuffer.remaining(),                 false         );          PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);         if (logger.isDebugEnabled()) {             logger.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);         }          // 每次追加一条数据写入,写入完了以后endIndex就会累加         ledgerEndIndex++;         ledgerEndTerm = memberState.currTerm(); // 拿到成员状态的当前term         if (ledgerBeginIndex == -1) {             ledgerBeginIndex = ledgerEndIndex;         }         updateLedgerEndIndexAndTerm();          return entry;     } }8、对数据文件进行截断// 对数据文件做一个截断,有一部分数据就直接不要了 @Override public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {     PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, null);      // 获取到线程本地副本里的数据缓冲区和索引缓冲区     ByteBuffer dataBuffer = localEntryBuffer.get();     ByteBuffer indexBuffer = localIndexBuffer.get();     DLedgerEntryCoder.encode(entry, dataBuffer);     int entrySize = dataBuffer.remaining();      synchronized (memberState) {         PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole());         PreConditions.check(leaderTerm == memberState.currTerm(), DLedgerResponseCode.INCONSISTENT_TERM, "term %d != %d", leaderTerm, memberState.currTerm());         PreConditions.check(leaderId.equals(memberState.getLeaderId()), DLedgerResponseCode.INCONSISTENT_LEADER, "leaderId %s != %s", leaderId, memberState.getLeaderId());          // 直接去根据索引读取一条数据出来         boolean existedEntry;         try {             DLedgerEntry tmp = get(entry.getIndex());             existedEntry = entry.equals(tmp);         } catch (Throwable ignored) {             existedEntry = false;         }          long truncatePos = existedEntry ? entry.getPos() + entry.getSize() : entry.getPos();         if (truncatePos != dataFileList.getMaxWrotePosition()) {             logger.warn("[TRUNCATE]leaderId={} index={} truncatePos={} != maxPos={}, this is usually happened on the old leader", leaderId, entry.getIndex(), truncatePos, dataFileList.getMaxWrotePosition());         }          // 对这个位置开始的数据发起一个截断         dataFileList.truncateOffset(truncatePos);         if (dataFileList.getMaxWrotePosition() != truncatePos) {             logger.warn("[TRUNCATE] rebuild for data wrotePos: {} != truncatePos: {}", dataFileList.getMaxWrotePosition(), truncatePos);             PreConditions.check(dataFileList.rebuildWithPos(truncatePos), DLedgerResponseCode.DISK_ERROR, "rebuild data truncatePos=%d", truncatePos);         }          // 修订数据文件mmapfiles的已经flush位置         reviseDataFileListFlushedWhere(truncatePos);          if (!existedEntry) {             long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());             PreConditions.check(dataPos == entry.getPos(), DLedgerResponseCode.DISK_ERROR, " %d != %d", dataPos, entry.getPos());         }          // 数据文件做了一个截断,索引文件也需要做一个截断         long truncateIndexOffset = entry.getIndex() * INDEX_UNIT_SIZE;         indexFileList.truncateOffset(truncateIndexOffset);         if (indexFileList.getMaxWrotePosition() != truncateIndexOffset) {             logger.warn("[TRUNCATE] rebuild for index wrotePos: {} != truncatePos: {}", indexFileList.getMaxWrotePosition(), truncateIndexOffset);             PreConditions.check(indexFileList.rebuildWithPos(truncateIndexOffset), DLedgerResponseCode.DISK_ERROR, "rebuild index truncatePos=%d", truncateIndexOffset);         }          reviseIndexFileListFlushedWhere(truncateIndexOffset);         DLedgerEntryCoder.encodeIndex(entry.getPos(), entrySize, entry.getMagic(), entry.getIndex(), entry.getTerm(), indexBuffer);         long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);         PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);         ledgerEndTerm = entry.getTerm();         ledgerEndIndex = entry.getIndex();         reviseLedgerBeginIndex();         updateLedgerEndIndexAndTerm();          return entry.getIndex();     } }9、以follower身份追加数据// 以follower身份追加数据 @Override public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId) {     PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole());     PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);      // 数据缓冲区和索引缓冲区     ByteBuffer dataBuffer = localEntryBuffer.get();     ByteBuffer indexBuffer = localIndexBuffer.get();     DLedgerEntryCoder.encode(entry, dataBuffer);     int entrySize = dataBuffer.remaining();      synchronized (memberState) {         PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole());          long nextIndex = ledgerEndIndex + 1;          PreConditions.check(nextIndex == entry.getIndex(), DLedgerResponseCode.INCONSISTENT_INDEX, null);         PreConditions.check(leaderTerm == memberState.currTerm(), DLedgerResponseCode.INCONSISTENT_TERM, null);         PreConditions.check(leaderId.equals(memberState.getLeaderId()), DLedgerResponseCode.INCONSISTENT_LEADER, null);          // 在指定位置里追加数据进去         long dataPos = dataFileList.append(                 dataBuffer.array(),                 0,                 dataBuffer.remaining()         );          PreConditions.check(dataPos == entry.getPos(), DLedgerResponseCode.DISK_ERROR, "%d != %d", dataPos, entry.getPos());          DLedgerEntryCoder.encodeIndex(                 dataPos,                 entrySize,                 entry.getMagic(),                 entry.getIndex(),                 entry.getTerm(),                 indexBuffer         );         // 追加索引数据         long indexPos = indexFileList.append(                 indexBuffer.array(),                 0,                 indexBuffer.remaining(),                 false         );          PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);          ledgerEndTerm = entry.getTerm();         ledgerEndIndex = entry.getIndex();         if (ledgerBeginIndex == -1) {             ledgerBeginIndex = ledgerEndIndex;         }         // 更新结尾index和term         updateLedgerEndIndexAndTerm();          return entry;     }  }10、根据索引去查询一个数据条目// 根据索引去查询一个数据条目 @Override public DLedgerEntry get(Long index) {     indexCheck(index);      // 定义好索引内存缓冲片段和数据内存缓冲片段     SelectMmapBufferResult indexSbr = null;     SelectMmapBufferResult dataSbr = null;      try {         // 直接通过索引文件mmapfiles去查询数据         // index本身其实第几条索引,每一条索引可以是一个单元是有自己大小,所以定位索引的偏移量         // index*unit_size,从那个位置开始读取unit_size大小的一条数据         indexSbr = indexFileList.getData(                 index * INDEX_UNIT_SIZE,                 INDEX_UNIT_SIZE         );          PreConditions.check(indexSbr != null && indexSbr.getByteBuffer() != null, DLedgerResponseCode.DISK_ERROR, "Get null index for %d", index);          indexSbr.getByteBuffer().getInt(); //magic         long pos = indexSbr.getByteBuffer().getLong();         int size = indexSbr.getByteBuffer().getInt();          // 根据数据位置和大小,再次从数据文件mmapfiles里面读取出来一条数据就可以了         dataSbr = dataFileList.getData(pos, size);          PreConditions.check(dataSbr != null && dataSbr.getByteBuffer() != null, DLedgerResponseCode.DISK_ERROR, "Get null data for %d", index);          // 把这个数据做一个解码         DLedgerEntry dLedgerEntry = DLedgerEntryCoder.decode(dataSbr.getByteBuffer());         PreConditions.check(pos == dLedgerEntry.getPos(), DLedgerResponseCode.DISK_ERROR, "%d != %d", pos, dLedgerEntry.getPos());          return dLedgerEntry;     } finally {         // 把之前读取的索引和数据的缓冲片段做一个释放         SelectMmapBufferResult.release(indexSbr);         SelectMmapBufferResult.release(dataSbr);     } }11、数据存储格式及编码
  日志条目
  DLedgerEntryCoder#encode()/**  * 编码  * @param entry 日志条目  * @param byteBuffer 缓冲区  */ public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {     byteBuffer.clear();     int size = entry.computeSizeInBytes();     //always put magic on the first position     // 魔数,4 字节     byteBuffer.putInt(entry.getMagic());     // 条目总长度,包含Header(协议头) + 消息体,占4 字节     byteBuffer.putInt(size);     // 当前条目的index,占8 字节     byteBuffer.putLong(entry.getIndex());     // 当前条目所属的投票轮次,占8 字节     byteBuffer.putLong(entry.getTerm());     // 该条目的物理偏移量,类似于commitlog 文件的物理偏移量,占8 字节     byteBuffer.putLong(entry.getPos());     // 保留字段,当前版本未使用,占4 字节     byteBuffer.putInt(entry.getChannel());     // 当前版本未使用,占4 字节     byteBuffer.putInt(entry.getChainCrc());     // body 的CRC 校验和,用来区分数据是否损坏,占4 字节。     byteBuffer.putInt(entry.getBodyCrc());     // 用来存储body 的长度,占4 个字节。     byteBuffer.putInt(entry.getBody().length);     // 具体消息的内容。     byteBuffer.put(entry.getBody());     byteBuffer.flip(); }
  日志索引
  DLedgerEntryCoder#encodeIndex()/**  * 日志索引编码  * @param pos 日志条目在文件的偏移量  * @param size 条目大小  * @param magic 魔数  * @param index 索引  * @param term 投票轮次  * @param byteBuffer 缓冲区  */ public static void encodeIndex(long pos, int size, int magic, long index, long term, ByteBuffer byteBuffer) {     byteBuffer.clear();     // 魔数,4 字节     byteBuffer.putInt(magic);     // 日志条目在文件的偏移量,8字节     byteBuffer.putLong(pos);     // 条目大小,4字节     byteBuffer.putInt(size);     // 日志条目索引,8字节     byteBuffer.putLong(index);     // 投票轮次,8字节     byteBuffer.putLong(term);     byteBuffer.flip(); }

中国古代史上最难造反的一个朝代,从建国到灭亡,没有一个人成功在多数人的心目中,无论是北宋或是南宋,都有着相当差劲的军事实力。面对强大的外来势力,一向重文轻武的宋朝似乎总是无能为力。签订了一条又一条的合约,通过合约的形式来缓解周边国家的入侵,11月28日政协日历,一图速览1948年11月,国民党在东北华北战场接连失利,蒋介石感到大势已去,企图模仿曾国藩大练民团,在川滇黔三省成立编练新军司令部,做垂死挣扎。1949年1月,黄埔军校第一期毕业生曾任国民一纸桃花扇惹怒康熙,差点被诛九族,幸因自己姓孔得保全家过去封建王朝中,一代又一代的君王统治人民,不管是汉族还是少数民族,统治者都会学习一套学术理论让人民信任自己,而这套理论就是儒家思想,清朝的康熙帝也是如此,他认为想要统治汉族便要在思三台故事梓州风云录作者邹开歧开场白东川沃土钟灵秀,郪国后裔皆风流。古今多少英雄事,且看一一革命风云卷梓州!话说天府之国,巴蜀四川。中间是盆地,四面皆是山。在大山连小山,高山重矮山,开门就见山的四川盆从商鞅变法体现诚信之本商鞅也叫公孙鞅,到秦国后,通过秦国的宠臣景监求见孝公,向孝公讲述富国强兵的办法。秦孝公听了高兴万分,留公孙鞅一起商议国家大事。公孙鞅想实行变法,但秦国的贵族都不赞同。经过激烈的争论你知道植树节是为纪念谁的吗?每年3月12号是植树节,但是你知道植树节是怎么来的吗?端午节是为了纪念屈原,那植树节是为了纪念谁呢?当时中华民国南京政府成立不久孙中山时任临时总统,在1912年5月设立了农林部,下盎格鲁撒克逊盎格鲁撒克逊(AngloSaxon)一词常常被人提起,通常指英系民族,就象日耳曼指德系民族,斯拉夫指俄系民族一样。在美国如果说某人是WASP,这不是说他是爱惹麻烦的黄蜂,而是指他是乌克兰战争背后的地缘政治与文明圈对抗乌克兰战争背后的地缘政治与文明圈对抗乌克兰战争背后的地缘政治与文明圈对抗,是亚欧大陆文明复兴和东方文明复兴的前奏!世界老大美利坚领导下的新自由主义主要在盎格鲁撒克逊文化圈里流行,也无线电史话1962年西屋诡异娃娃接收180kHz电唱机发射信号1962年西屋莎拉纳德第15部分洋娃娃60年前,大众机械的编辑们对最新的玩具产品进行了一些测试,看看它们在粗放使用下的表现如何。研究结果发表在1962年11月号上。他们测试的最有趣一年亏损100亿!从亚马逊Alexa到GoogleX,硅谷巨头登月计划被砍编辑编辑部新智元导读从香饽饽到狗不理,硅谷大厂们面向未来的登月项目,究竟发生了什么?市场寒冬来袭,科技巨头纷纷抱团取暖。从裁员冻结招聘到削减成本,一切的目的,就是要少花钱,多赚钱。电动屋押注入门级电动小车一年推三款新品欲今年就盈亏平衡电动屋押注入门级电动小车一年推三款新品欲今年就盈亏平衡原创202201082245经济观察报经济观察网记者刘晓林濮振宇25万以上的新能源汽车市场已经竞争非常激烈,除了新造车品牌,所
广西玉林博白县暴雨持续发生内涝消防部门紧急疏散群众2022年7月4日下午,广西玉林,博白县双旺镇曾村十队发生内涝灾害,当地消防部门立即携带水域救援装备赶赴现场救援,在知情人的指引下奔赴灾区内部实施搜救,共疏散25户,82名群众,全山东残疾小伙,靠十字绣年入千万,坦言我不拜财神,只拜岳云鹏天有不测风云,人有旦夕祸福,这句话放在山东日照小伙匡志刚身上,真是再合适不过了。15岁时,匡志刚为救父亲,毅然决然冲进了火场,以肉体为父亲开辟了一条生路,自己却倒在了熊熊烈火中。虽中考作文撞题自家餐馆,背后故事很暖心点蓝色字关注中央广电总台中国之声人在外遇到困难是难免的。只要告诉店员要套餐A,找地方坐,吃完直接走就行。以套餐A的故事为材料写一篇文章,出现在了湖北武汉2022年中考语文科目的试卷与自家餐馆爱心餐撞题,女孩把父母写进中考作文备考时完全没想到会考这个,一点准备都没有。7月2日,武汉市2022年中考考生侯雪婷回忆起作文题时说。侯雪婷的中考成绩过了示范高中资格线,语文分数也超出了她的预期。武汉市2022年中完整打印版2022年贵州省黔东南州中考语文真试题卷(附参考答案)完整打印版2022年贵州省黔东南州中考语文真试题卷(附参考答案)。语文中考试卷试题举例选择题3下列句中,加点的成语使用恰当的一项是()A回首百年,无论风云变幻沧海桑田,中国青年爱党福建省森林消防总队严密组织党委中心组带机关第二次理论学习2022年以来,福建省森林消防总队党委坚持以深入学习贯彻习近平新时代中国特色社会主义思想为主线,以习近平总书记系列重要讲话精神为主题,以学习党的创新理论国家法律法规为主要内容,坚持1989年富豪2岁儿子被拐卖,被养父母困在农村种田32年,无妻无女2021年4月28日,四川省开江县某小区门口,突然铺上了全新的红地毯,大门上还拉起了横幅。好奇的人群为了一探究竟,纷纷涌向小区门口。被围在人群中心的是一对五十岁左右的夫妻,妻子手中6年2。95亿美金!布克成为人生大赢家目前,一年一度的NBA交易市场正在热火朝天进行,参赛各队都在积极备战着,力争帮助球队签下心仪球员,以在新赛季走得更远,吸引了不少球迷的目光。今天,联盟多笔交易签约正式官宣,一些球员外媒俄乌冲突阴影笼罩G20外长会据路透社伦敦7月6日报道,二十国集团(G20)外长本周将前往印尼度假胜地巴厘岛开会。报道认为,这场会议被俄乌冲突蒙上阴影,俄罗斯的出席可能将在G20中制造裂痕,而东道主印尼则试图斡刚上线的浪子降魔,飙升榜第一,陈国坤化身虎王,口吐火焰陈国坤主演的浪子降魔今日上映了,虽然是个大反派,可是比主角还厉害,在电影中化身虎王,为了给儿子报仇雪恨,不惜重出江湖,口吐火焰召唤闪电,打的人类俯首臣称!陈国坤在周星驰电影少林足球欧洲LHC重启有望破解暗物质之谜据美国雅虎新闻网站7月6日报道,2022年7月5日,位于瑞士日内瓦附近的欧洲核子研究中心大型强子对撞机(LHC)开始以有史以来最高功率运行,全力寻找暗物质。同日,该中心表示,参与L