范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

RocketMQ源码分析之RemotingCommand网络通信协议源码分析

  一、前言
  在分析NameServer的请求和响应流程之前我们需要先看一下他的序列化协议是怎样的,RocketMQ支持的序列化协议有以下2种:JSON;RocketMQ自定义的协议;
  json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间,一般成熟的中间件项目一般都会采用自定义的方式进行序列化和反序列化;二、RemotingCommand源码分析
  RemotingCommand为RocketMQ中自定义协议组件,其中包含了序列化和反序列化代码逻辑;
  但是不向服务直接提供调用,而是通过前文讲解的NettyRemotingServer 类中的NettyEncoder (编码器)和NettyDecoder (解码器)进行具体的调用;
  序列化:就是将一段字节数组以固定的顺序的形式存放数据,第一个字节存放什么,后面4个字节存放什么,再后面几个字节存放什么;
  反序列化:就是以固定的顺序取数据,你第一个字节存放的是消息的标志位,那你取出来就是消息的标志位,再后面4个为消息体的长度,那取出来就是消息体的长度,你再可以根据消息体的长度去获取对应长度字节的数据;1、数据模型public class RemotingCommand {      public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";     public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";     public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);     private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND     private static final int RPC_ONEWAY = 1; // 0, RPC     private static final Map, Field[]> CLASS_HASH_MAP =         new HashMap, Field[]>();     private static final Map CANONICAL_NAME_CACHE = new HashMap();     // 1, Oneway     // 1, RESPONSE_COMMAND     private static final Map NULLABLE_FIELD_CACHE = new HashMap();     private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();     private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();     private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();     private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();     private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();     private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();     private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();     private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();     private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();     private static volatile int configVersion = -1;     private static AtomicInteger requestId = new AtomicInteger(0);      private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;      static {         final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));         if (!isBlank(protocol)) {             try {                 serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);             } catch (IllegalArgumentException e) {                 throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);             }         }     }      // code编号,请求编号     private int code;     private LanguageCode language = LanguageCode.JAVA; // 编程语言,java     private int version = 0; // 版本号     private int opaque = requestId.getAndIncrement(); // 请求id     private int flag = 0; // 标识     private String remark; // 备注     private HashMap extFields; // 扩展字段     private transient CommandCustomHeader customHeader; // 自定义header头     // 这一次rpc调用的序列化类型,默认就是json格式     private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;     // 消息体,会把真正的消息体序列化成字节数组     private transient byte[] body; }2、序列化
  org.apache.rocketmq.remoting.netty.NettyEncoder#encodepublic void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)     throws Exception {     try {         ByteBuffer header = remotingCommand.encodeHeader();         out.writeBytes(header);         byte[] body = remotingCommand.getBody();         if (body != null) {             out.writeBytes(body);         }     } catch (Exception e) {         log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);         if (remotingCommand != null) {             log.error(remotingCommand.toString());         }         RemotingUtil.closeChannel(ctx.channel());     } }public ByteBuffer encodeHeader() {     return encodeHeader(this.body != null ? this.body.length : 0); }  public ByteBuffer encodeHeader(final int bodyLength) {     // 1> header length size     int length = 4;      // 2> header data length     byte[] headerData;     headerData = this.headerEncode();      length += headerData.length;      // 3> body data length     length += bodyLength;      ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);      // length     result.putInt(length);      // header length     result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));      // header data     result.put(headerData);      result.flip();      return result; }
  这里会去判断序列化协议的类型,json类型其实没什么好看的,JSON.toJSONString(obj, prettyFormat).getBytes(CHARSET_UTF8); 就没了,我们主要是看RocketMQ的自定义协议;private byte[] headerEncode() {     // 把自定义headers放到一个ext fields map里去     this.makeCustomHeaderToNet();     if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {         return RocketMQSerializable.rocketMQProtocolEncode(this);     } else {         return RemotingSerializable.encode(this);     } }public void makeCustomHeaderToNet() {     if (this.customHeader != null) {         // 通过反射获取到自定义header类里面的fields         Field[] fields = getClazzFields(customHeader.getClass());         if (null == this.extFields) {             this.extFields = new HashMap();         }          // 对自定义header类的fields进行遍历         for (Field field : fields) {             if (!Modifier.isStatic(field.getModifiers())) {                 String name = field.getName();                 if (!name.startsWith("this")) {                     Object value = null;                     try {                         field.setAccessible(true);                         value = field.get(this.customHeader);                     } catch (Exception e) {                         log.error("Failed to access field [{}]", name, e);                     }                      // 自定义header这些fields都是放到ext fields里面去                     if (value != null) {                         this.extFields.put(name, value.toString());                     }                 }             }         }     } }private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {     // 如果说你要是自定义了一套header以后,你搞一个类,实现接口     // 然后在这个自定义头的类里,可以定义一堆的field,这些field就是你的自定义的头     Field[] field = CLASS_HASH_MAP.get(classHeader);      if (field == null) {         // 通过反射直接获取到你自定义类里的头fields拿出来         field = classHeader.getDeclaredFields();         synchronized (CLASS_HASH_MAP) {             CLASS_HASH_MAP.put(classHeader, field);         }     }     return field; }public static byte[] markProtocolType(int source, SerializeType type) {     byte[] result = new byte[4];      result[0] = type.getCode(); // header length里一共是4个字节,第一个字节是序列化类型code     result[1] = (byte) ((source >> 16) & 0xFF); // 第二个字节开始到第四个字节,一共是3个字节都是跟header length是有关系的     result[2] = (byte) ((source >> 8) & 0xFF);     result[3] = (byte) (source & 0xFF);     return result; }
  其实自定义序列化就是搞一个byte数组,采用固定的显示进行构建。
  如:第一个字节放请求类型,后面四个字节放消息体总长度,在后面发具体的消息体。消息体前面几位为header长度,后面为header消息体等等,通过固定排列的顺序进行构建,这样解析的时候我们就可以根据字节顺序来读取消息了。public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {     // 用json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间一些     // 常规做法是自己对RemotingCommand协议数据对象进行序列化     // 编码,对象 -> 字节数组      // String remark     byte[] remarkBytes = null;     int remarkLen = 0;     if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {         remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);         remarkLen = remarkBytes.length;     }      // HashMap extFields     // ext fields,是我们可能的自定义headers就在这里,把扩展头序列化为字节数组     byte[] extFieldsBytes = null;     int extLen = 0;     if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {         extFieldsBytes = mapSerialize(cmd.getExtFields());         extLen = extFieldsBytes.length;     }      // 计算出来消息头总长度     int totalLen = calTotalLen(remarkLen, extLen);      ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);     // int code(~32767)     headerBuffer.putShort((short) cmd.getCode());     // LanguageCode language     headerBuffer.put(cmd.getLanguage().getCode());     // int version(~32767)     headerBuffer.putShort((short) cmd.getVersion());     // int opaque     headerBuffer.putInt(cmd.getOpaque());     // int flag     headerBuffer.putInt(cmd.getFlag());     // String remark     if (remarkBytes != null) {         headerBuffer.putInt(remarkBytes.length);         headerBuffer.put(remarkBytes);     } else {         headerBuffer.putInt(0);     }     // HashMap extFields;     if (extFieldsBytes != null) {         headerBuffer.putInt(extFieldsBytes.length);         headerBuffer.put(extFieldsBytes);     } else {         headerBuffer.putInt(0);     }      return headerBuffer.array(); }3、反序列化
  org.apache.rocketmq.remoting.netty.NettyDecoder#decodepublic Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {     ByteBuf frame = null;     try {         frame = (ByteBuf) super.decode(ctx, in);         if (null == frame) {             return null;         }          ByteBuffer byteBuffer = frame.nioBuffer();          return RemotingCommand.decode(byteBuffer);     } catch (Exception e) {         log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);         RemotingUtil.closeChannel(ctx.channel());     } finally {         if (null != frame) {             frame.release();         }     }      return null; }public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException {     // 解码的过程就是编码过程的逆向过程     int length = byteBuffer.limit(); // 总长度     int oriHeaderLen = byteBuffer.getInt(); // 头长度     int headerLength = getHeaderLength(oriHeaderLen);      // 搞一个头长度的字节数组,一次性把headers都读出来放到字节数组里去     byte[] headerData = new byte[headerLength];     byteBuffer.get(headerData);      // 对header要做一个解码     RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));      int bodyLength = length - 4 - headerLength;     byte[] bodyData = null;     if (bodyLength > 0) {         bodyData = new byte[bodyLength];         byteBuffer.get(bodyData);     }     cmd.body = bodyData;      return cmd; }
  这里判断header是用什么协议进行序列化的,就会使用什么协议进行反序列化;private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) throws RemotingCommandException {     switch (type) {         case JSON:             RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);             resultJson.setSerializeTypeCurrentRPC(type);             return resultJson;         case ROCKETMQ:             RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);             resultRMQ.setSerializeTypeCurrentRPC(type);             return resultRMQ;         default:             break;     }      return null; }
  我们之间看rocketMQ自定义的协议吧,其实就是一个逆向的过程,你之前放的什么,他就根据字节拿出来;public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) throws RemotingCommandException {     RemotingCommand cmd = new RemotingCommand();     ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);     // int code(~32767)     cmd.setCode(headerBuffer.getShort());     // LanguageCode language     cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));     // int version(~32767)     cmd.setVersion(headerBuffer.getShort());     // int opaque     cmd.setOpaque(headerBuffer.getInt());     // int flag     cmd.setFlag(headerBuffer.getInt());     // String remark     int remarkLength = headerBuffer.getInt();     if (remarkLength > 0) {         if (remarkLength > headerArray.length) {             throw new RemotingCommandException("RocketMQ protocol decoding failed, remark length: " + remarkLength + ", but header length: " + headerArray.length);         }         byte[] remarkContent = new byte[remarkLength];         headerBuffer.get(remarkContent);         cmd.setRemark(new String(remarkContent, CHARSET_UTF8));     }      // HashMap extFields     int extFieldsLength = headerBuffer.getInt();     if (extFieldsLength > 0) {         if (extFieldsLength > headerArray.length) {             throw new RemotingCommandException("RocketMQ protocol decoding failed, extFields length: " + extFieldsLength + ", but header length: " + headerArray.length);         }         byte[] extFieldsBytes = new byte[extFieldsLength];         headerBuffer.get(extFieldsBytes);         cmd.setExtFields(mapDeserialize(extFieldsBytes));     }          return cmd; }三、总结RemotingCommand为rocketMQ的序列化和反序列化的组件,所有消息都需要使用他进行处理;序列化和反序列化是根据约定的协议存放数据,再根据约定的协议取数据;

当年NBA的坏小子军团到底坏到了什么程度?可以令整个联盟闻风丧胆?只要说到8090年代,我又要炒回锅饭了,兄弟们别骂我。君临天下迈克尔乔丹封神之路中有专门的章节,写了浪费啊。(二十二)组团,坏孩子军团众生相198788赛季,青年军芝加哥公牛队冲劲反转!艾克森向足协摊牌另有隐情?大家是否错怪艾神了?中国男足马上就要确定赴日本征战12强赛的名单了,在50多人的大名单中,最终要筛选出23名球员,再加上4到5人的备选人员。归化球员中除了蒋光泰和阿兰,其他球员都没有明确表示将要参赛,怎样的搭配才不会很快被潮流淘汰?时尚就是不断的改变,找到适合自己的风格做个百变的女人。今天小柒给大家介绍几个时尚的风格穿搭。极简风格极简冷淡风穿搭,追求简洁有质感的穿搭风格,颜色素简却轻奢有品,极简风格永远都不会劳保鞋有没有推荐的?卫尔盾透气,有钢头抗砸。纯绿色经典解放鞋啊还用犹豫的嘛1DDTX2代尔塔3鼎固4巴顿5奥朗DDTX和代尔塔的鞋比较贵,属于高端的,比较贵!劳保鞋的牌子有很多,其中上海青衫企业发展有摩托大军往年火爆,今年却消失了,农民工返乡咋不骑摩托了?往年火爆的摩托大军,今年消失了,农民工返乡不骑摩托了,原因是多方面的,主要有一城市禁摩限电,不让骑摩托车,摩托车上不了高速,农民工都乘坐其他交通工具回家二疫情原因,人员流动受限,农有一万块钱你愿意买电动车,还是摩托车呢?买通勤125的踏板车,油耗基本2。5个以内,算下来最多1。5毛一公里(指两桶油官方价目前都是7元多一升),一年开1万公里1500元,保养2000公里一次5次也就100多元,京东自己中国汽车品牌平均售价排名曝光,谁是真的高价值品牌?高价值品牌不仅要以量产车售价判断近日网传的一张中国汽车品牌第一梯队品牌平均售价表的图片引起热议,上面有几个售价挺高的品牌。分别为高合,68万蔚来,43。47万理想,33。8万岚图,北大仓和汾酒有什么区别?哪个更好?北大仓酒,是黑龙江北大仓集团有限公司出品的一款白酒,企业标准50度,这款酒是酱香型白酒,酒质微黄,有北国茅台之称,口感有麸曲的酱香,高梁酒的特殊味道,回味有香甜,不那么醇厚,略显单武松真能吃下二斤牛肉喝下十八碗酒吗?说实在的,武松一顿饭才吃二斤牛肉,喝十八碗酒,跟古代的大胃王们一比,差得太远了!古人记载,有个叫申香的人,身高一丈八尺,人高马大,又特别能吃。他一顿饭能吃一石粮食,吃三十斤肉。根据三一重工换帅,梁稳根为什么不选取儿子小梁?他儿子梁在中已经彻底退出三一重工的运营管理了,目前担任三湘银行的董事长,负责三一轻资产的运作。还有,三一重工是三一集团的子公司,梁稳根依旧是集团的董事长。机智机智机智不是不选,是暂投资与投机的区别你知道吗?老矿工了,17年开始挖矿,经历了18年的矿难,14万一个月跌到1万!刚买的最新蚂蚁S9矿机31500,跌到一千多,下个月100多废铁价!从此不再挖矿!至今三年多过去了,去年11月朋
坚持喝这个汤两个月,月经恢复到生娃前的状态喝这个汤之前每次月经量让人堪忧,两天就差不多快结束了,第三天用护垫就可以,第四天基本就已经干净了,让我感觉自己快要闭经了,一想到这个就很绝望。有一次跟朋友聊天,她就比我小一岁,但她三体上映了,凶手至今还未定罪,林奇何时能瞑目?在万众期待中,三体电视剧终于登陆中央电视台和腾讯视频。在电视剧片头,总出品人林奇的名字已经被框上了。是的,这个对三体充满热爱甚至是狂热的人并没有等到电视剧的上映就去世了,而他和凶手风筝9郑耀先为什么除掉高占龙留下田湖?历史背景在这里电视剧风筝中,红色特工之王郑耀先在除掉了中统要员高占龙后,却留下了更加阴险歹毒的田湖,导致最后差点命丧其手。很多人认为,这是郑耀先的一大失误,是风筝的一个明显漏洞,但是如果把这段剧甄嬛传皇后是如何凭实力,用一碗鸭子汤把皇帝气走的呢?皇帝因为有大臣建议,让他不要太过宠功臣年羹尧,不然后果严重,常言道,忠言逆耳,也是佩服这位大臣,发现老板脸色不对,马上来了句,臣大概是多虑了,就一溜烟跑了。留下老板自我在那窝火,窝流浪地球2满江红谁的票房会更高?第二篇电影票房属于商业价值,怎么才能够创造更高的商业价值,那还是要看电影本身的价值。下面就慢慢说郭帆龚格尔两个小菜鸟的流浪地球2怎么挑战张艺谋陈宇两个大佬的满江红。故事情节先说流浪地球2兔票选美旗袍秀里,我对春晚有个期待娱兔迎春娱兔迎春兔子本来应该乖巧可爱,我们都愿意称作小兔几小兔兔。艺术讲究创新,它的生命力也在于创新。不断的推陈出新,才有了艺术的与人共鸣。但有时候,科技树能点歪,白费周章。艺术这欧阳娜娜罕见携妈妈露面,穿黑裙好清纯,母亲傅娟60岁仍有玉女范黑色在百搭方面,可以说是独树一帜,代表着神秘成熟和沉稳。比如,黑裙总给人一种特别有气质的氛围感,无论是日常出街,还是出席重要场合,都是大家的不二之选。无论是什么年纪,都能有不同款式爆了!剧版三体热搜霸屏,网友一口气炫完四集整个宇宙开始闪烁!1月15日2130杀青两年后电视剧版三体在CCTV8腾讯视频等平台首播可以没见过世面,但一定不能没看过三体作为现象级科幻小说三体连续多年位居国内畅销图书排行榜前列怎样保护未阳老人?返乡到家后出现症状咋办?这些问题要注意!春节将至返乡之前要做哪些准备?回家途中怎么做好防护?返乡后怎样保护未阳老人?阳康人群在饮食运动方面要注意什么?平安过春节,送你9个健康锦囊来源人民日报编辑阿婕审核叶子施晓斌猜你关注97年河南老汉捡到弃婴,捡垃圾养大,如今养女身家千万如何报答?2022年7月10日凌晨,在北京市天安门广场的升旗仪式上,有个年轻的女孩蹲下身。她拍了拍自己的肩膀,朝着身边的老年人说道爸爸,你到我背上来,我背着你看升旗!她的父亲面露窘迫,双手一刘亦菲的妈妈刘晓莉两婚两离,至今单身,长得比女儿还漂亮3年四部作品,从金粉世家到神雕侠侣,刘亦菲为自己挣来20余年的神仙姐姐称号。她的再度回归,是2022年的梦华录,大家看到她那美到极致的容颜,感叹她是冻龄美人时,刘亦菲却表示,自己在