RocketMQ源码分析之RemotingCommand网络通信协议源码分析
一、前言
在分析NameServer的请求和响应流程之前我们需要先看一下他的序列化协议是怎样的,RocketMQ支持的序列化协议有以下2种:JSON;RocketMQ自定义的协议;
json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间,一般成熟的中间件项目一般都会采用自定义的方式进行序列化和反序列化;二、RemotingCommand源码分析
RemotingCommand为RocketMQ中自定义协议组件,其中包含了序列化和反序列化代码逻辑;
但是不向服务直接提供调用,而是通过前文讲解的NettyRemotingServer 类中的NettyEncoder (编码器)和NettyDecoder (解码器)进行具体的调用;
序列化:就是将一段字节数组以固定的顺序的形式存放数据,第一个字节存放什么,后面4个字节存放什么,再后面几个字节存放什么;
反序列化:就是以固定的顺序取数据,你第一个字节存放的是消息的标志位,那你取出来就是消息的标志位,再后面4个为消息体的长度,那取出来就是消息体的长度,你再可以根据消息体的长度去获取对应长度字节的数据;1、数据模型public class RemotingCommand { public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE"; public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version"; private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND private static final int RPC_ONEWAY = 1; // 0, RPC private static final Map, Field[]> CLASS_HASH_MAP = new HashMap, Field[]>(); private static final Map CANONICAL_NAME_CACHE = new HashMap(); // 1, Oneway // 1, RESPONSE_COMMAND private static final Map NULLABLE_FIELD_CACHE = new HashMap(); private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName(); private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName(); private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName(); private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName(); private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName(); private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName(); private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName(); private static volatile int configVersion = -1; private static AtomicInteger requestId = new AtomicInteger(0); private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON; static { final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV)); if (!isBlank(protocol)) { try { serializeTypeConfigInThisServer = SerializeType.valueOf(protocol); } catch (IllegalArgumentException e) { throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e); } } } // code编号,请求编号 private int code; private LanguageCode language = LanguageCode.JAVA; // 编程语言,java private int version = 0; // 版本号 private int opaque = requestId.getAndIncrement(); // 请求id private int flag = 0; // 标识 private String remark; // 备注 private HashMap extFields; // 扩展字段 private transient CommandCustomHeader customHeader; // 自定义header头 // 这一次rpc调用的序列化类型,默认就是json格式 private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; // 消息体,会把真正的消息体序列化成字节数组 private transient byte[] body; }2、序列化
org.apache.rocketmq.remoting.netty.NettyEncoder#encodepublic void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } }public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size int length = 4; // 2> header data length byte[] headerData; headerData = this.headerEncode(); length += headerData.length; // 3> body data length length += bodyLength; ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }
这里会去判断序列化协议的类型,json类型其实没什么好看的,JSON.toJSONString(obj, prettyFormat).getBytes(CHARSET_UTF8); 就没了,我们主要是看RocketMQ的自定义协议;private byte[] headerEncode() { // 把自定义headers放到一个ext fields map里去 this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { return RemotingSerializable.encode(this); } }public void makeCustomHeaderToNet() { if (this.customHeader != null) { // 通过反射获取到自定义header类里面的fields Field[] fields = getClazzFields(customHeader.getClass()); if (null == this.extFields) { this.extFields = new HashMap(); } // 对自定义header类的fields进行遍历 for (Field field : fields) { if (!Modifier.isStatic(field.getModifiers())) { String name = field.getName(); if (!name.startsWith("this")) { Object value = null; try { field.setAccessible(true); value = field.get(this.customHeader); } catch (Exception e) { log.error("Failed to access field [{}]", name, e); } // 自定义header这些fields都是放到ext fields里面去 if (value != null) { this.extFields.put(name, value.toString()); } } } } } }private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) { // 如果说你要是自定义了一套header以后,你搞一个类,实现接口 // 然后在这个自定义头的类里,可以定义一堆的field,这些field就是你的自定义的头 Field[] field = CLASS_HASH_MAP.get(classHeader); if (field == null) { // 通过反射直接获取到你自定义类里的头fields拿出来 field = classHeader.getDeclaredFields(); synchronized (CLASS_HASH_MAP) { CLASS_HASH_MAP.put(classHeader, field); } } return field; }public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; result[0] = type.getCode(); // header length里一共是4个字节,第一个字节是序列化类型code result[1] = (byte) ((source >> 16) & 0xFF); // 第二个字节开始到第四个字节,一共是3个字节都是跟header length是有关系的 result[2] = (byte) ((source >> 8) & 0xFF); result[3] = (byte) (source & 0xFF); return result; }
其实自定义序列化就是搞一个byte数组,采用固定的显示进行构建。
如:第一个字节放请求类型,后面四个字节放消息体总长度,在后面发具体的消息体。消息体前面几位为header长度,后面为header消息体等等,通过固定排列的顺序进行构建,这样解析的时候我们就可以根据字节顺序来读取消息了。public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) { // 用json进行序列化其实是省力做法,效率是比较差的,序列化以后的数据格式是比较占用空间一些 // 常规做法是自己对RemotingCommand协议数据对象进行序列化 // 编码,对象 -> 字节数组 // String remark byte[] remarkBytes = null; int remarkLen = 0; if (cmd.getRemark() != null && cmd.getRemark().length() > 0) { remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8); remarkLen = remarkBytes.length; } // HashMap extFields // ext fields,是我们可能的自定义headers就在这里,把扩展头序列化为字节数组 byte[] extFieldsBytes = null; int extLen = 0; if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) { extFieldsBytes = mapSerialize(cmd.getExtFields()); extLen = extFieldsBytes.length; } // 计算出来消息头总长度 int totalLen = calTotalLen(remarkLen, extLen); ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); // int code(~32767) headerBuffer.putShort((short) cmd.getCode()); // LanguageCode language headerBuffer.put(cmd.getLanguage().getCode()); // int version(~32767) headerBuffer.putShort((short) cmd.getVersion()); // int opaque headerBuffer.putInt(cmd.getOpaque()); // int flag headerBuffer.putInt(cmd.getFlag()); // String remark if (remarkBytes != null) { headerBuffer.putInt(remarkBytes.length); headerBuffer.put(remarkBytes); } else { headerBuffer.putInt(0); } // HashMap extFields; if (extFieldsBytes != null) { headerBuffer.putInt(extFieldsBytes.length); headerBuffer.put(extFieldsBytes); } else { headerBuffer.putInt(0); } return headerBuffer.array(); }3、反序列化
org.apache.rocketmq.remoting.netty.NettyDecoder#decodepublic Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } ByteBuffer byteBuffer = frame.nioBuffer(); return RemotingCommand.decode(byteBuffer); } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); RemotingUtil.closeChannel(ctx.channel()); } finally { if (null != frame) { frame.release(); } } return null; }public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException { // 解码的过程就是编码过程的逆向过程 int length = byteBuffer.limit(); // 总长度 int oriHeaderLen = byteBuffer.getInt(); // 头长度 int headerLength = getHeaderLength(oriHeaderLen); // 搞一个头长度的字节数组,一次性把headers都读出来放到字节数组里去 byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); // 对header要做一个解码 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }
这里判断header是用什么协议进行序列化的,就会使用什么协议进行反序列化;private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) throws RemotingCommandException { switch (type) { case JSON: RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; } return null; }
我们之间看rocketMQ自定义的协议吧,其实就是一个逆向的过程,你之前放的什么,他就根据字节拿出来;public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) throws RemotingCommandException { RemotingCommand cmd = new RemotingCommand(); ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray); // int code(~32767) cmd.setCode(headerBuffer.getShort()); // LanguageCode language cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get())); // int version(~32767) cmd.setVersion(headerBuffer.getShort()); // int opaque cmd.setOpaque(headerBuffer.getInt()); // int flag cmd.setFlag(headerBuffer.getInt()); // String remark int remarkLength = headerBuffer.getInt(); if (remarkLength > 0) { if (remarkLength > headerArray.length) { throw new RemotingCommandException("RocketMQ protocol decoding failed, remark length: " + remarkLength + ", but header length: " + headerArray.length); } byte[] remarkContent = new byte[remarkLength]; headerBuffer.get(remarkContent); cmd.setRemark(new String(remarkContent, CHARSET_UTF8)); } // HashMap extFields int extFieldsLength = headerBuffer.getInt(); if (extFieldsLength > 0) { if (extFieldsLength > headerArray.length) { throw new RemotingCommandException("RocketMQ protocol decoding failed, extFields length: " + extFieldsLength + ", but header length: " + headerArray.length); } byte[] extFieldsBytes = new byte[extFieldsLength]; headerBuffer.get(extFieldsBytes); cmd.setExtFields(mapDeserialize(extFieldsBytes)); } return cmd; }三、总结RemotingCommand为rocketMQ的序列化和反序列化的组件,所有消息都需要使用他进行处理;序列化和反序列化是根据约定的协议存放数据,再根据约定的协议取数据;
当年NBA的坏小子军团到底坏到了什么程度?可以令整个联盟闻风丧胆?只要说到8090年代,我又要炒回锅饭了,兄弟们别骂我。君临天下迈克尔乔丹封神之路中有专门的章节,写了浪费啊。(二十二)组团,坏孩子军团众生相198788赛季,青年军芝加哥公牛队冲劲
反转!艾克森向足协摊牌另有隐情?大家是否错怪艾神了?中国男足马上就要确定赴日本征战12强赛的名单了,在50多人的大名单中,最终要筛选出23名球员,再加上4到5人的备选人员。归化球员中除了蒋光泰和阿兰,其他球员都没有明确表示将要参赛,
怎样的搭配才不会很快被潮流淘汰?时尚就是不断的改变,找到适合自己的风格做个百变的女人。今天小柒给大家介绍几个时尚的风格穿搭。极简风格极简冷淡风穿搭,追求简洁有质感的穿搭风格,颜色素简却轻奢有品,极简风格永远都不会
劳保鞋有没有推荐的?卫尔盾透气,有钢头抗砸。纯绿色经典解放鞋啊还用犹豫的嘛1DDTX2代尔塔3鼎固4巴顿5奥朗DDTX和代尔塔的鞋比较贵,属于高端的,比较贵!劳保鞋的牌子有很多,其中上海青衫企业发展有
摩托大军往年火爆,今年却消失了,农民工返乡咋不骑摩托了?往年火爆的摩托大军,今年消失了,农民工返乡不骑摩托了,原因是多方面的,主要有一城市禁摩限电,不让骑摩托车,摩托车上不了高速,农民工都乘坐其他交通工具回家二疫情原因,人员流动受限,农
有一万块钱你愿意买电动车,还是摩托车呢?买通勤125的踏板车,油耗基本2。5个以内,算下来最多1。5毛一公里(指两桶油官方价目前都是7元多一升),一年开1万公里1500元,保养2000公里一次5次也就100多元,京东自己
中国汽车品牌平均售价排名曝光,谁是真的高价值品牌?高价值品牌不仅要以量产车售价判断近日网传的一张中国汽车品牌第一梯队品牌平均售价表的图片引起热议,上面有几个售价挺高的品牌。分别为高合,68万蔚来,43。47万理想,33。8万岚图,
北大仓和汾酒有什么区别?哪个更好?北大仓酒,是黑龙江北大仓集团有限公司出品的一款白酒,企业标准50度,这款酒是酱香型白酒,酒质微黄,有北国茅台之称,口感有麸曲的酱香,高梁酒的特殊味道,回味有香甜,不那么醇厚,略显单
武松真能吃下二斤牛肉喝下十八碗酒吗?说实在的,武松一顿饭才吃二斤牛肉,喝十八碗酒,跟古代的大胃王们一比,差得太远了!古人记载,有个叫申香的人,身高一丈八尺,人高马大,又特别能吃。他一顿饭能吃一石粮食,吃三十斤肉。根据
三一重工换帅,梁稳根为什么不选取儿子小梁?他儿子梁在中已经彻底退出三一重工的运营管理了,目前担任三湘银行的董事长,负责三一轻资产的运作。还有,三一重工是三一集团的子公司,梁稳根依旧是集团的董事长。机智机智机智不是不选,是暂
投资与投机的区别你知道吗?老矿工了,17年开始挖矿,经历了18年的矿难,14万一个月跌到1万!刚买的最新蚂蚁S9矿机31500,跌到一千多,下个月100多废铁价!从此不再挖矿!至今三年多过去了,去年11月朋