范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

RocketMQ源码分析之主从同步服务组件HAService

  #头条创作挑战赛#
  上一篇:RocketMQ源码分析之核心磁盘数据结构CommitLog一、前言
  前面介绍了RocketMQ的CommitLog文件相关的类分析CommitLog物理日志相关的CommitLog类。其中有介绍到消息刷盘时高可用对应的submitReplicaRequest 方法,submitReplicaRequest 方法中如果配置的服务器的角色为SYNC_MASTER(从master同步),就会等待主从之间消息同步的进度达到设定的值之后才正常返回,如果超时则返回同步超时;// 提交复制请求 public CompletableFuture submitReplicaRequest(         AppendMessageResult result, MessageExt messageExt) {     // sync master的话,此时跟我们的dleger关系不大,主从同步,如果说主节点挂了以后,还得靠从节点手工运维切换成主节点     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {         HAService service = this.defaultMessageStore.getHaService();         if (messageExt.isWaitStoreMsgOK()) {             // 通过HAService判断一下从节点是否ok             // 检查slave同步的位置是否小于 最大容忍的同步落后偏移量,如果是的则进行刷盘             if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {                 GroupCommitRequest request = new GroupCommitRequest(                         result.getWroteOffset() + result.getWroteBytes(),                         this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout() // 主从同步超时时间默认是3s                 );                 service.putRequest(request);                 service.getWaitNotifyObject().wakeupAll();                 return request.future(); // 可以通过future来等待主从同步完成             }             else {                 // 此时可能是从节点不可用                 return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);             }         }     }      return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); }public boolean isWaitStoreMsgOK() {     String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);     if (null == result) {         return true;     }      return Boolean.parseBoolean(result); }
  这段代码的主要逻辑如下:如果服务器的角色设置为SYNC_MASTER,则进行下一步,否则直接跳过主从同步;获取HAService对象,检查消息是否本地存储完毕,如果没有则结束,否则进入下一步;检查slave同步的位置是否小于 最大容忍的同步落后偏移量参数haSlaveFallbehindMax,如果是的则进行主从同步刷盘。如果没有则返回slave不可用的状态;将消息落盘的最大物理偏移量也就是CommitLog上的偏移量作为参数构建一个GroupCommitRequest对象,然后提交到HAService;最多等待syncFlushTimeout长的时间,默认为5秒。在5秒内获取结果,然后根据结果判断是否返回超时;二、同步流程
  上面那段代码比较简单,因为主从的逻辑全部交给了 HAService  和 HAConnection  两个类处理了。这里先简单介绍一下整个同步的流程(同步模式)
  三、高可用服务HAService
  HAService是在RocketMQ的Broker启动的时候就会创建的,而创建的点在DefaultMessageStore这个消息存储相关的综合类中,在这个类的构造器中会创建HAService无论当前的Broker是什么角色。
  这里需要说明的是 Broker中的Master和Slaver两个角色,代码都是一样的,只不过是在实际执行的时候,走的分支不一样。 四、源码分析内部属性;构造函数;启动内部类;接受Slave连接处理; 检查同步进度和唤醒CommitLog刷盘线程; 主从同步客户端组件;Master同步日志(监听slave日志同步进度和同步日志、根据同步进度来唤醒刷盘CommitLog线程) ;1、内部属性
  在HAService中有几个比较重要的属性,这里需要简单的介绍一下:
  参数
  说明
  connectionList
  连接到master的slave连接列表,用于管理连接
  acceptSocketService
  用于接收连接用的服务,只监听OP_ACCEPT事件,监听到连接事件时候,创建HAConnection来处理读写请求事件
  waitNotifyObject
  一个消费等待模型类,用于处理高可用线程和CommitLog的刷盘线程交互
  push2SlaveMaxOffset
  master同步到slave的偏移量
  groupTransferService
  主从同步的检测服务,用于检查是否同步完成
  haClient
  高可用的服务,slave用来跟master建立连接,像master汇报偏移量和拉取消息
  // 主从同步服务 public class HAService {      private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);      // 连接数量     private final AtomicInteger connectionCount = new AtomicInteger(0);     // 主从建立的网络连接,因为一个master可能有多个slave     private final List connectionList = new LinkedList<>();     // 接收slave的socket服务     private final AcceptSocketService acceptSocketService;     // 所属的消息存储组件     private final DefaultMessageStore defaultMessageStore;     // 线程阻塞与唤醒同步对象     private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();     // 推送到slave最大偏移量     private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);     // 组传输服务     private final GroupTransferService groupTransferService;     // 主从同步客户端组件     private final HAClient haClient; }2、构造函数
  HAService只有一个构造器。逻辑也比较简单,创建一个AcceptSocketService开放一个端口为 10912的端口用于slave来简历连接,同时启动主从信息同步的任务groupTransferService用于接收CommitLog在高可用刷盘时提交任务。public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {     this.defaultMessageStore = defaultMessageStore;     // 创建,接受连接的服务, 开放的端口号为10912     this.acceptSocketService = new AcceptSocketService(             defaultMessageStore.getMessageStoreConfig().getHaListenPort()     );     // 创建主从信息同步的线程     this.groupTransferService = new GroupTransferService();     this.haClient = new HAClient(); }3、启动内部类
  HAService  在创建之后,会在 DefaultMessageStore  中调用其 start  方法,这个方法会启动其内部的几个内部类,用来主从同步; public void start() throws Exception {     // 接受连接的服务,开启端口,设置监听的事件     this.acceptSocketService.beginAccept();     // 开启服务不断检查是否有连接     this.acceptSocketService.start();     // 开启groupTransferService,接受CommitLog的主从同步请求     this.groupTransferService.start();     // 开启haClient,用于slave来建立与Master连接和同步     this.haClient.start(); }4、接受Slave连接
  AcceptSocketService这个类在Broker的Master和Slaver两个角色启动时都会创建,只不过区别是Slaver开启端口之后,并不会有别的Broker与其建立连接。因为只有在Broker的角色是Slave的时候才会指定要连接的Master地址。这个逻辑,在Broker启动的时候BrokerController类中运行的。// 主要是基于nio来实现的 class AcceptSocketService extends ServiceThread {      // 监听端口地址     private final SocketAddress socketAddressListen;     // nio里面的网络监听服务端     private ServerSocketChannel serverSocketChannel;     // 多路复用监听组件     private Selector selector;      // 给他传入一个监听端口号,构建好监听地址     public AcceptSocketService(final int port) {         this.socketAddressListen = new InetSocketAddress(port);     }      /**      * Starts listening to slave connections.      *      * @throws Exception If fails.      */     public void beginAccept() throws Exception {         // 打开nio网络监听服务端         this.serverSocketChannel = ServerSocketChannel.open();         // 打开selector多路复用组件         this.selector = RemotingUtil.openSelector();         // 设置我们的socket重用地址是true         this.serverSocketChannel.socket().setReuseAddress(true);         // 设置监听我们指定的端口号         this.serverSocketChannel.socket().bind(this.socketAddressListen);         // 配置是否nio阻塞模式是false         this.serverSocketChannel.configureBlocking(false);         // 把nio网络监听服务器注册到selector多路复用组件里去         this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);     }      /**      * {@inheritDoc}      */     @Override     public void shutdown(final boolean interrupt) {         super.shutdown(interrupt);         try {             this.serverSocketChannel.close();             this.selector.close();         } catch (IOException e) {             log.error("AcceptSocketService shutdown exception", e);         }     }      /**      * {@inheritDoc}      */     @Override     public void run() {         log.info(this.getServiceName() + " service started");          while (!this.isStopped()) {             try {                 // 通过selector多路复用组件监听我们的nio网络服务器是否有连接事件到达                 this.selector.select(1000);                 // 如果说确实是有从节点来连接我们,此时就会拿到一批selectedKeys                 Set selected = this.selector.selectedKeys();                  if (selected != null) {                     // 每一个新建立的连接都对应了一个selectionKey,就是一个连接的key句柄                     for (SelectionKey k : selected) {                         // 如果说过来的网络事件就是op_accept连接事件                         if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {                             // 通过调用nio网络监听服务器的accept函数,就可以完成tcp连接,获取到一个新的连接                             SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();                              if (sc != null) {                                 HAService.log.info("HAService receive new connection, "                                     + sc.socket().getRemoteSocketAddress());                                  try {                                     // 把获取到的从节点连接封装成一个HAConnection                                     HAConnection conn = new HAConnection(                                             HAService.this,                                             sc                                     );                                     // 同时启动这个从节点连接组件                                     conn.start();                                     // 把从节点连接加入到自己的连接列表里去                                     HAService.this.addConnection(conn);                                 } catch (Exception e) {                                     log.error("new HAConnection exception", e);                                     sc.close();                                 }                             }                         } else {                             log.warn("Unexpected ops in select " + k.readyOps());                         }                     }                      // 一次selectedKeys处理完毕了,就必须做一个clear                     selected.clear();                 }             } catch (Exception e) {                 log.error(this.getServiceName() + " service has exception.", e);             }         }          log.info(this.getServiceName() + " service end");     }      /**      * {@inheritDoc}      */     @Override     public String getServiceName() {         return AcceptSocketService.class.getSimpleName();     } }
  beginAccept 方法就是开启Socket,绑定10912端口,然后注册selector和指定监听的事件为OP_ACCEPT也就是建立连接事件。对应的IO模式为NIO模式。主要看其run方法,这个方法是Master用来接受Slave连接的核心。
  每过一秒检查一次是否有连接事件,如果有则建立连接,并把建立起来的连接加入到连接列表中进行保存。一直循环这个逻辑。5、检查同步进度和唤醒CommitLog刷盘线程
  GroupTransferService是CommitLog消息刷盘的类CommitLog与HAService打交道的一个中间类。在CommitLog中进行主从刷盘的时候,会创建一个CommitLog.GroupCommitRequest的内部类,这个类包含了当前Broker最新的消息的物理偏移量信息。然后把这个类丢给GroupTransferService处理,然后唤醒GroupTransferService。起始这个逻辑跟CommitLog内部的GroupCommitService逻辑一样。只不过对于同步部分的逻辑不一样,这里可以参考前面的文章存储部分(3)CommitLog物理日志相关的CommitLog类。
  先看 run  方法 /**  * 在run方法中会将传入的CommitLog.GroupCommitRequest从requestsWrite  * 转换到requestsRead中然后进行处理检查对应的同步请求的进度。检查的逻辑在doWaitTransfer中  */ public void run() {     log.info(this.getServiceName() + " service started");      while (!this.isStopped()) {         try {             // 这里进入等待,等待被唤醒,进入等待之前会调用onWaitEnd方法,然后调用swapRequests方法             // 把requestsWrite转换为requestsRead             this.waitForRunning(10);             // 进行请求处理             this.doWaitTransfer();         } catch (Exception e) {             log.warn(this.getServiceName() + " service has exception. ", e);         }     }      log.info(this.getServiceName() + " service end"); }
  再看doWaitTransfer  方法/**  * 1、比较Master推送到Slave的 偏移量push2SlaveMaxOffset是不是大于传进来的CommitLog.GroupCommitRequest中的偏移量  * 2、计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数syncFlushTimeout默认为5秒  * 3、如果第一步结果为true,则返回结果为PUT_OK。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSH_SLAVE_TIMEOUT。同时会唤醒CommitLog的刷盘线程。  */ private void doWaitTransfer() {     // 如果读请求不为空     if (!this.requestsRead.isEmpty()) {         for (CommitLog.GroupCommitRequest req : this.requestsRead) {             // 如果push到slave的偏移量 大于等于 请求中的消息的最大偏移量 表示slave同步完成             boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();             // 计算这次同步超时的时间点  同步的超时时间段为5s             long deadLine = req.getDeadLine();              // 如果没有同步完毕,并且还没达到超时时间,则等待1秒之后检查同步的进度             while (!transferOK && deadLine - System.nanoTime() > 0) {                 this.notifyTransferObject.waitForRunning(1000);                 transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();             }              // 超时或者同步成功的时候 唤醒主线程             req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);         }          this.requestsRead = new LinkedList<>();     } }
  主要逻辑如下:比较Master推送到Slave的 偏移量push2SlaveMaxOffset是不是大于传进来的CommitLog.GroupCommitRequest中的偏移量。计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数syncFlushTimeout默认为5秒。如果第一步结果为true,则返回结果为PUT_OK。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSH_SLAVE_TIMEOUT。同时会唤醒CommitLog的刷盘线程。6、主从同步客户端组件
  前面我们说到了只有是Salve角色的Broker才会真正的配置Master的地址,而HAClient是需要Master地址的,因此这个类真正在运行的时候只有Slave才会真正的使用到。
   先看看核心的参数信息// 从节点那边会用这个线程跟我们主节点建立连接,执行数据读写 class HAClient extends ServiceThread {      // 读数据缓冲区大小,4mb     private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;     // master地址     private final AtomicReference masterAddress = new AtomicReference<>();     // 从节点收到数据以后会返回一个8个字节的ack偏移量,固定8个字节     private final ByteBuffer reportOffset = ByteBuffer.allocate(8);     // nio网络连接     private SocketChannel socketChannel;     // nio多路复用组件     private Selector selector;     // 最近一次写数据时间戳     private long lastWriteTimestamp = System.currentTimeMillis();     // 当前上报过的偏移量     private long currentReportedOffset = 0;     // 分发位置     private int dispatchPosition = 0;     // 读数据缓冲区     private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);     // 备份数据缓冲区     private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); }
  基本上都是缓冲相关的配置。这里主要分析的是 run  方法中的逻辑 public void run() {     log.info(this.getServiceName() + " service started");      while (!this.isStopped()) {         try {             // 尝试去连接我们的master节点             if (this.connectMaster()) {                  // 是否要上报一下ack偏移量,间隔需要大于心跳的时间(5s)                 if (this.isTimeToReportOffset()) {                     // 向master 汇报当前 salve 的CommitLog的最大偏移量,并记录这次的同步时间                     boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);                     // 如果汇报完了就关闭连接                     if (!result) {                         this.closeMaster();                     }                 }                  // 如果说人家给你传输过来了数据                 this.selector.select(1000);                  // 向master拉取的信息                 boolean ok = this.processReadEvent();                 if (!ok) {                     this.closeMaster();                 }                  // 再次同步slave的偏移量如果,最新的偏移量大于已经汇报的情况下                 if (!reportSlaveMaxOffsetPlus()) {                     continue;                 }                  // 检查时间距离上次同步进度的时间间隔                 long interval =                     HAService.this.getDefaultMessageStore().getSystemClock().now()                         - this.lastWriteTimestamp;                 // 如果间隔大于心跳的时间,那么就关闭                 if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()                     .getHaHousekeepingInterval()) {                     log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress                         + "] expired, " + interval);                     this.closeMaster();                     log.warn("HAClient, master not response some time, so close connection");                 }             } else {                 // 等待                 this.waitForRunning(1000 * 5);             }         } catch (Exception e) {             log.warn(this.getServiceName() + " service has exception. ", e);             this.waitForRunning(1000 * 5);         }     }      log.info(this.getServiceName() + " service end"); }
  主要的逻辑如下:连接master,如果当前的broker角色是master,那么对应的masterAddress是空的,不会有后续逻辑。如果是slave,并且配置了master地址,则会进行连接进行后续逻辑处理检查是否需要向master汇报当前的同步进度,如果两次同步的时间小于5s,则不进行同步。每次同步之间间隔在5s以上,这个5s是心跳连接的间隔参数为haSendHeartbeatInterval向master 汇报当前 salve 的CommitLog的最大偏移量,并记录这次的同步时间从master拉取日志信息,主要就是进行消息的同步,同步出问题则关闭连接再次同步slave的偏移量,如果最新的偏移量大于已经汇报的情况下则从步骤1重头开始
  这里分析完了 run  方法,然后就要分析主要的日志同步的逻辑了,这个逻辑在 processReadEvent  方法中 private boolean processReadEvent() {     int readSizeZeroTimes = 0;     // 如果读取缓存还有没读取完,则一直读取     while (this.byteBufferRead.hasRemaining()) {         try {             // 可以把主从同步过来的数据读取到缓冲区里去             int readSize = this.socketChannel.read(this.byteBufferRead);             if (readSize > 0) {                 readSizeZeroTimes = 0;                 // 执行一次分发读请求                 boolean result = this.dispatchReadRequest();                 if (!result) {                     log.error("HAClient, dispatchReadRequest error");                     return false;                 }             } else if (readSize == 0) {                 if (++readSizeZeroTimes >= 3) {                     break;                 }             } else {                 log.info("HAClient, processReadEvent read socket < 0");                 return false;             }         } catch (IOException e) {             log.info("HAClient, processReadEvent read socket exception", e);             return false;         }     }      return true; }  private boolean dispatchReadRequest() {     // 请求的头信息     final int msgHeaderSize = 8 + 4; // phyoffset + size      while (true) {         // 获取分发的偏移差         int diff = this.byteBufferRead.position() - this.dispatchPosition;         // 如果偏移差大于头大小,说明存在请求体         if (diff >= msgHeaderSize) {             // 获取主master的最大偏移量             long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);             // 获取消息体             int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);             // 获取salve的最大偏移量             long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();              if (slavePhyOffset != 0) {                 if (slavePhyOffset != masterPhyOffset) {                     log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "                         + slavePhyOffset + " MASTER: " + masterPhyOffset);                     return false;                 }             }              // 如果偏移差大于 消息头和 消息体大小。则读取消息体             if (diff >= (msgHeaderSize + bodySize)) {                 byte[] bodyData = byteBufferRead.array();                 int dataStart = this.dispatchPosition + msgHeaderSize;                  // 把你读取到的数据追加到commitlog里面去                 HAService.this.defaultMessageStore.appendToCommitLog(                         masterPhyOffset, bodyData, dataStart, bodySize);                  // 记录分发的位置                 this.dispatchPosition += msgHeaderSize + bodySize;                  if (!reportSlaveMaxOffsetPlus()) {                     return false;                 }                  continue;             }         }          if (!this.byteBufferRead.hasRemaining()) {             this.reallocateByteBuffer();         }          break;     }      return true; }7、Master同步日志
  前面说过,在 HAService  的 AcceptSocketService  内部类中,Master会在建立连接的时候创建 HAConnection  用来处理读写事件。这里主要介绍构造函数和内部类就能了解原理了。
  成员变量public class HAConnection {      private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);      // 所属的HA高可用服务组件     private final HAService haService;     // nio网络连接     private final SocketChannel socketChannel;     // 跟我们建立连接的HA客户端组件的地址,从节点地址     private final String clientAddr;     // 网络连接写数据服务线程     private WriteSocketService writeSocketService;     // 网络连接读数据服务线程     private ReadSocketService readSocketService;      // 从节点请求获取的偏移量     private volatile long slaveRequestOffset = -1;     // 从节点同步数据后ack的偏移量     private volatile long slaveAckOffset = -1; }
  构造函数public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {     // 指定所属的 HAService     this.haService = haService;     // 指定的NIO的socketChannel     this.socketChannel = socketChannel;     // 客户端的地址     this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();     // 这是为非阻塞     this.socketChannel.configureBlocking(false);     // 是否启动SO_LINGER     // SO_LINGER作用:设置函数close()关闭TCP连接时的行为。缺省close()的行为是     // 如果有数据残留在socket发送缓冲区中则系统将继续发送这些数据给对方,等待被确认,然后返回。     this.socketChannel.socket().setSoLinger(false, -1);     // 是否开启TCP_NODELAY     this.socketChannel.socket().setTcpNoDelay(true);     if (NettySystemConfig.socketSndbufSize > 0) {         // 接收缓冲的大小         this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);     }     if (NettySystemConfig.socketRcvbufSize > 0) {         // 发送缓冲的大小         this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);     }     // 把网络连接写数据服务线程和读数据服务线程都构建好     // 端口写服务     this.writeSocketService = new WriteSocketService(this.socketChannel);     // 端口读服务     this.readSocketService = new ReadSocketService(this.socketChannel);     // 增加haService中的连接数字段     this.haService.getConnectionCount().incrementAndGet(); }
  监听slave日志同步进度和同步日志
  WriteSocketService  监听的是 OP_WRITE  事件,注册的端口就是在 HAService  中开启的端口。 class WriteSocketService extends ServiceThread {      private final Selector selector;     private final SocketChannel socketChannel;      // 写数据头大小,12个字节     private final int headerSize = 8 + 4;     // 写数据头缓冲区     private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);     // 从哪个位置开始传输     private long nextTransferFromWhere = -1;     // 对内存映射区域查询数据结果     private SelectMappedBufferResult selectMappedBufferResult;     // 最后一次写数据是否完成了,默认true     private boolean lastWriteOver = true;     // 最后一次写数据时间戳     private long lastWriteTimestamp = System.currentTimeMillis();      public WriteSocketService(final SocketChannel socketChannel) throws IOException {         this.selector = RemotingUtil.openSelector(); // 搞一个selector多路复用组件         this.socketChannel = socketChannel;         this.socketChannel.register(this.selector, SelectionKey.OP_WRITE); // 把这个网络连接注册到selector多路复用组件里去就可以了         this.setDaemon(true);     }      @Override     public void run() {         HAConnection.log.info(this.getServiceName() + " service started");          while (!this.isStopped()) {             try {                 // 如果说针对你的从节点此时可以执行写数据动作                 this.selector.select(1000);                  // 如果slave的读请求为 -1 表示没有slave 发出写请求,不需要处理                 if (-1 == HAConnection.this.slaveRequestOffset) {                     Thread.sleep(10);                     continue;                 }                  // nextTransferFromWhere 为-1 表示初始第一次同步,需要进行计算                 if (-1 == this.nextTransferFromWhere) {                     // 如果slave 同步完成 则下次同步从CommitLog的最大偏移量开始同步                     if (0 == HAConnection.this.slaveRequestOffset) {                         // 获取到commitlog里面最大的偏移量                         long masterOffset = HAConnection.this.haService.getDefaultMessageStore()                                 .getCommitLog().getMaxOffset();                         masterOffset = masterOffset                                 - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());                          if (masterOffset < 0) {                             masterOffset = 0;                         }                          // 可以去设置一下当前需要从哪个位置开始来传输数据                         this.nextTransferFromWhere = masterOffset;                     } else {                         // 设置下次同步的位置,为 salve 读请求的位置                         this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;                     }                      log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr                         + "], and slave request " + HAConnection.this.slaveRequestOffset);                 }                  // 上次同步是否完成                 if (this.lastWriteOver) {                     // 上一次写数据时间戳到现在截止的差值                     long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;                      // 把这个时间差值跟ha发送心跳间隔做一个比对,如果超过了那个间隔,心跳间隔为 5000毫秒                     if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {                          // Build Header                         // 开始去构建请求头,先设置我们要从哪个位置开始传输数据,心跳请求大小为12 字节                         this.byteBufferHeader.position(0);                         this.byteBufferHeader.limit(headerSize);                         this.byteBufferHeader.putLong(this.nextTransferFromWhere);                         this.byteBufferHeader.putInt(0);                         this.byteBufferHeader.flip();                          // 进行消息同步                         this.lastWriteOver = this.transferData();                         if (!this.lastWriteOver)                             continue;                     }                 }                 // 如果说是上一次传输还没完毕此时就传输数据就可以了                 else {                     this.lastWriteOver = this.transferData();                     // 如果还没同步完成则继续                     if (!this.lastWriteOver)                         continue;                 }                  // 构建完了header以后,此时就需要查询commitlog里面指定位置开始的一个数据片段                 SelectMappedBufferResult selectResult = HAConnection.this.haService                         .getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);                 if (selectResult != null) {                     int size = selectResult.getSize();                     // 检查要同步消息的长度,是不是大于单次同步的最大限制 默认为 32kb                     if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {                         size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();                     }                      long thisOffset = this.nextTransferFromWhere;                     this.nextTransferFromWhere += size;                      selectResult.getByteBuffer().limit(size);                     this.selectMappedBufferResult = selectResult;                      // Build Header                     this.byteBufferHeader.position(0);                     this.byteBufferHeader.limit(headerSize);                     this.byteBufferHeader.putLong(thisOffset);                     this.byteBufferHeader.putInt(size);                     this.byteBufferHeader.flip();                      this.lastWriteOver = this.transferData();                 } else {                     // 没有数据要传输呢?此时就是可以让我们的组件等待一会儿,等待100毫秒                     HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);                 }             } catch (Exception e) {                  HAConnection.log.error(this.getServiceName() + " service has exception.", e);                 break;             }         }          // 对等待通知组件做一个处理         HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();          if (this.selectMappedBufferResult != null) {             this.selectMappedBufferResult.release();         }          this.makeStop();          readSocketService.makeStop();          haService.removeConnection(HAConnection.this);          SelectionKey sk = this.socketChannel.keyFor(this.selector);         if (sk != null) {             sk.cancel();         }          try {             this.selector.close();             this.socketChannel.close();         } catch (IOException e) {             HAConnection.log.error("", e);         }          HAConnection.log.info(this.getServiceName() + " service end");     }      private boolean transferData() throws Exception {         int writeSizeZeroTimes = 0;         // Write Header         // 心跳的头没写满,先写头         while (this.byteBufferHeader.hasRemaining()) {             // 通过nio网络连接可以把请求头先发送出去             int writeSize = this.socketChannel.write(this.byteBufferHeader);             if (writeSize > 0) {                 writeSizeZeroTimes = 0;                 // 记录上次写的时间                 this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();             } else if (writeSize == 0) {                 // 重试3次 则不再重试                 if (++writeSizeZeroTimes >= 3) {                     break;                 }             } else {                 throw new Exception("ha master write header error < 0");             }         }          // 如果要同步的日志为null,则直接返回这次同步的结果是否同步完成         if (null == this.selectMappedBufferResult) {             return !this.byteBufferHeader.hasRemaining();         }          writeSizeZeroTimes = 0;          // Write Body         // 填充请求体         if (!this.byteBufferHeader.hasRemaining()) {             // 他会有一个要传输的一个commitlog里面的内存区域查询出来的数据片段             while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {                 // 把commitlog里面要传输的数据片段就写入到nio网络连接里去                 int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());                 if (writeSize > 0) {                     writeSizeZeroTimes = 0;                     this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();                 } else if (writeSize == 0) {                     // 重试3次                     if (++writeSizeZeroTimes >= 3) {                         break;                     }                 } else {                     throw new Exception("ha master write body error < 0");                 }             }         }          boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();          // 释放缓存         if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {             this.selectMappedBufferResult.release();             this.selectMappedBufferResult = null;         }          return result;     }      @Override     public String getServiceName() {         return WriteSocketService.class.getSimpleName();     }      @Override     public void shutdown() {         super.shutdown();     } }
   主要的逻辑如下:如果slave进行了日志偏移量的汇报,判断是不是第一次的进行同步以及对应的同步进度。设置下一次的同步位置检查上次同步是不是已经完成了,检查两次同步的周期是不是超过心跳间隔,如果是的则需要把心跳信息放到返回的头里面,然后进行消息同步。如果上次同步还没完成,则等待上次同步完成之后再继续从Master本地读取CommitLog的最大偏移量,根据上次同步的位置开始从CommitLog获取日志信息,然后放到缓存中如果缓存的大小大于单次同步的最大大小haTransferBatchSize默认是32kb,那么只同步32kb大小的日志。如果缓存为null,则等待100毫秒
  根据同步进度来唤醒刷盘CommitLog线程
  ReadSocketService 的作用主要是:根据Slave推送的日志同步进度,来唤醒HAService的GroupTransferService 然后进一步唤醒CommitLog的日志刷盘线程。这里主要看run 方法和processReadEvent 方法。class ReadSocketService extends ServiceThread {      // 读数据最大缓冲区大小,默认是1mb     private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;     // 多路复用监听组件     private final Selector selector;     // nio网络连接     private final SocketChannel socketChannel;     // 读数据缓冲区     private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);     // 处理消息位置     private int processPosition = 0;     // 最近一次读取到数据的时间戳     private volatile long lastReadTimestamp = System.currentTimeMillis();      public ReadSocketService(final SocketChannel socketChannel) throws IOException {         this.selector = RemotingUtil.openSelector();         this.socketChannel = socketChannel;         this.socketChannel.register(this.selector, SelectionKey.OP_READ);         this.setDaemon(true);     }      @Override     public void run() {         HAConnection.log.info(this.getServiceName() + " service started");          // 任务是否结束         while (!this.isStopped()) {             try {                 //  设置selector的阻塞时间,如果说从节点确实发送了请求过来                 this.selector.select(1000);                 // 处理salver读取消息的事件                 boolean ok = this.processReadEvent();                 if (!ok) {                     HAConnection.log.error("processReadEvent error");                     break;                 }                  // 检查此次处理时间是否超过心跳连接时间                 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;                 if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {                     log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);                     break;                 }             } catch (Exception e) {                 HAConnection.log.error(this.getServiceName() + " service has exception.", e);                 break;             }         }          this.makeStop();          writeSocketService.makeStop();          haService.removeConnection(HAConnection.this);          HAConnection.this.haService.getConnectionCount().decrementAndGet();          SelectionKey sk = this.socketChannel.keyFor(this.selector);         if (sk != null) {             sk.cancel();         }          try {             this.selector.close();             this.socketChannel.close();         } catch (IOException e) {             HAConnection.log.error("", e);         }          HAConnection.log.info(this.getServiceName() + " service end");     }      @Override     public String getServiceName() {         return ReadSocketService.class.getSimpleName();     }      private boolean processReadEvent() {         int readSizeZeroTimes = 0;          // 如果说读取数据缓冲区已经没有空间了,此时就做一个flip,处理位置复位为0         if (!this.byteBufferRead.hasRemaining()) {             // 读请求缓冲转变为读取模式。             this.byteBufferRead.flip();             this.processPosition = 0;         }          // 但凡是读取数据缓冲区还有空间,就进入循环         while (this.byteBufferRead.hasRemaining()) {             try {                 // 一次性从网络连接里读取最大可能是1mb的读取缓冲空间的内容                 int readSize = this.socketChannel.read(this.byteBufferRead);                 if (readSize > 0) {                     readSizeZeroTimes = 0;                      // 更新一下本次读取到数据的时间戳                     this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore()                             .getSystemClock().now();                      // 读取缓冲区位置 - 处理位置,是大于等于8,至少说是读到了8个字节                     // 为什么是8个字节,因为salver向master发去拉取请求时,偏移量固定为8                     if ((this.byteBufferRead.position() - this.processPosition) >= 8) {                         // 获取消息开始的位置,做一个运算,用读取缓冲区位置 - 读取缓冲区位置 % 8                         int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);                         // 读取8个直接就是从节点返回的东西,就是从节点完成消息同步的最新的偏移量                         long readOffset = this.byteBufferRead.getLong(pos - 8);                         // 设置处理的位置                         this.processPosition = pos;                          // 把我们的slave的ack偏移量去做一个设置                         HAConnection.this.slaveAckOffset = readOffset;                         // 如果slave的 读请求 偏移量小于0 表示同步完成了                         if (HAConnection.this.slaveRequestOffset < 0) {                             // 重新设置slave的 读请求的 偏移量                             HAConnection.this.slaveRequestOffset = readOffset;                             log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);                         } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {                             log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",                                     HAConnection.this.clientAddr,                                     HAConnection.this.slaveAckOffset,                                     HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());                             return false;                         }                          // 如果说从节点已经接收到了一些数据之后,唤醒阻塞的线程, 我们就可以通知HAService去传输一些数据给从节点                         // 在消息的主从同步选择的模式是同步的时候,会唤醒被阻塞的消息写入的线程                         HAConnection.this.haService.notifyTransferSome(                                 HAConnection.this.slaveAckOffset                         );                     }                 } else if (readSize == 0) {                     // 如果数据为0超过3次,表示同步完成,直接结束                     if (++readSizeZeroTimes >= 3) {                         break;                     }                 } else {                     log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");                     return false;                 }             } catch (IOException e) {                 log.error("processReadEvent exception", e);                 return false;             }         }          return true;     } }
  整体的逻辑如下:每1s执行一次事件就绪选择,然后调用processReadEvent方法处理读请求,读取从服务器的拉取请求获取slave已拉取偏移量,因为有新的从服务器反馈拉取进度,需要通知某些生产者以便返回,因为如果消息发送使用同步方式,需要等待将消息复制到从服务器,然后才返回,故这里需要唤醒相关线程去判断自己关注的消息是否已经传输完成。也就是HAService的GroupTransferService如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。五、总结
  RocketMQ的主从同步之间的核心类就是 HAService  和 HAConnection  和其中的几个子类。结合前面的那个图可以简单的理解一下。

2023年特斯拉是跌下神坛还是再创辉煌?特斯拉疯狂打折2022年是特斯拉投资者们抱头痛哭的一年股票价格累计下跌了65,市值蒸发了高达6750亿美元。特斯拉市值下跌马斯克卖股票做收购,以及Twitter公司估值被投资人大幅桂冠电力研究报告高股息水电龙头,来水偏丰推动业绩反转(报告出品方作者广发证券,郭鹏,姜涛)一手握红水河优质资产,高分红配置价值突出(一)集团资产注入助力装机增长,2022年末水电装机10。23GW大唐集团旗下专业水电上市平台,整合集今晚调油价!国内汽柴油零售限价每吨分别下调205和195元来源央视财经今晚调油价春节出行加满一箱油将少花八元根据国家发改委消息,新一轮成品油调价窗口将于今天(1月17日)24时开启。据国家发改委价格监测中心监测,本轮成品油调价周期内(1月足协双巨头开始互相甩锅,陈戌源疑似弃卒保车,贾秀全被牵连冬日生活打卡季李铁案件已经调查有2个月之久的时间了,中国足坛的现在处于一个非常关键的时期,球迷们都期望在李铁这次事件里能够让卷入此事的圈内人被调查的彻底,毕竟多达20多个被带走调查新春走基层丨北兵南调他们跨越4000公里守护那片林海去年下半年以来,我国南方多个省份降雨量偏少,部分地区森林火险形势十分严峻。岁末年初,国家综合性消防救援队伍北兵南用,从黑龙江内蒙古等地抽调精锐力量,部署到南方部分重点林区,填补当地生活技能烹饪以及秘传任务详解生活技能烹饪烹饪内容介绍低级烹饪一共是16种,具体的等级要求可以看上图烹饪学到一定等级就没有包子了所以实际算的话是15种高级烹饪一共是11种所有的高级烹饪必须要有食谱才能制作秘制烹GamersNexus讲述EVGA超频团队利用RTX4090原型打破纪录故事IT之家1月16日消息,油管频道GamersNexus在最新一期视频中,分享了关于EVGAGPU最终章的故事。EVGA于去年9月宣布退出显卡市场,让游戏玩家发烧友和超频爱好者感到震太阳王路易十四有多强?单挑全欧不在乎,多线作战不落下风网络上有句调侃法国的话叫只有女人和矮子才能拯救法国。鼎鼎大名的太阳王路易十四身高不到170cm,相对而言,的确不算高,也算一个矮子。不过,路易十四治下的法国不仅不需要拯救,反而是一甲午战争之后,日本将所得赔款的三分之一用于乡村建设?在一期青年网络公开课中,中国人民大学教授三农问题专家温铁军谈到中日甲午战败后签订的马关条约,战争赔偿加赎辽费,合计大约有将近三亿两白银。日本拿到这个钱以后,三分之一购买军火,三分之张艺谋同框陈道明也不输!一个穿休闲装有魅力,一个穿西装好古板男士的穿衣打扮,最重要的就是摆脱油腻感,特别是对于中年阶段的男士来说,如果在挑选服装单品的时候,过于依赖成熟正式的感觉,其实只会适得其反。对于真正会打扮的男士来说,他们往往会用首选这才是小个子该有的打扮裤子露脚踝鞋子要加跟,想不显高都难小个子的姐妹们如何通过穿衣搭配来达到视觉增高的效果?请牢记裤子露脚踝和鞋子要加跟这两条穿搭原则。在视觉上达到11大于2的完美效果,大长腿轻松穿出来!这篇文章就从选款和搭配两个角度入
阳西税务优化出口退税服务,助力企业高质量发展我们是新登记不久的出口企业,对出口的涉税业务办理流程了解不多,税务部门主动上门给我们讲解如何申报办理过程要注意哪些细节如何规范备案单证等等,很实用!绿客门(广东)食品有限公司财务经华为P60和一加11推荐哪个?这两个手机价格上的差距有点大,256GB版本的一加11只要3999元,比华为P60便宜了1000元。对于拍照要求不是太高的朋友,个人建议选择一加11,因为价格更便宜更符合需求,主要东莞科创制造强市突围战破解产业空间碎片化全域招商激活新动能阳春三月,位于东莞谢岗的东莞比亚迪新能源汽车关键零部件项目正式动工开建,预计明年开始,比亚迪将在东莞量产新能源发动机。作为世界工厂,东莞是全国第15个GDP破万亿常住人口超千万的双华为P60系列MateX3系列推候补模式按排队顺序发货手机中国新闻3月23日,华为在春季新品发布会上正式推出了年度重磅新品华为P60系列以及华为MateX3系列,并且两款产品均已在华为商城上架。值得注意的是,此次华为还提供了60天候补小米和华为高管画风差异大,原因是什么?为什么小米高管和华为高管画风完全不一样小米和华为是两家不同的公司,它们的文化品牌形象和市场定位都有所不同,因此高管的画风也会有所不同。小米的品牌形象比较年轻时尚活泼,注重创新和用户中国式现代化的圳治样本供电转改直闯出深圳模式工业园区是工业生产的大本营根据地,深圳全力以赴打好工业园区转供电改造收官之战,真正破解中小企业转供电不合理加价问题。日前,深圳2172个工业园区转供电改造全面完成,标志着深圳市在全沙尘弥漫,北京出现火星同款蓝太阳北京日报客户端记者骆倩雯谢永利现在,北京昏暗的天色里,出现了火星同款蓝太阳,这是怎么回事?北京日报客户端谢永利摄气象专家表示,这是因为北京正遭受着沙尘天气,漫天沙尘之下,米氏散射(社群发布飞针送药研究发明无人机靶向急救给药系统大规模语言预训练模型前沿领域取得新进展跟踪科研成果,掌握最新动态!1有机太阳能电池稳定性研究取得进展近日,中国科学院国家纳米科学中心周惠琼课题组在有机太阳能电池稳定性研究方面取得重要进展,相关研究成果在线发表于Natu持续十年办国际人才创业大会,北京朝阳引凤更筑巢中国青年报客户端北京3月25日电(中青报中青网记者张敏)以集聚国际人才,创新数字经济为主题的第十届朝阳国际人才创业大会(ITEC)创新峰会今天举办。峰会涵盖全球创业赛颁奖及展示主旨Lafayette1482023春夏系列欣赏Lafayette1482023春夏系列,创意总监EmilySmith从纸的物质属性及从艺术载体变为艺术本身的可能性中汲取设计灵感。甄选高端质感面料,赋予其精雕细琢的结构构造,呈现预告闽东特色乡村振兴之路观察点启动仪式,明日相约寿宁!走进水洋村人居环境美丽庭院,品尝硒锌一桌菜置身亭溪大熟古村落,感受乡村新面貌前往银山花田景区,体验硐天福地美景古银硐之一迴龙硐明天,闽东特色乡村振兴之路观察点启动仪式将在寿宁县大安