一个RocketMQ文件存储的简单实现
本文介绍了RocketMQ的文件存储原理,并且用MappedByteBuffer实现了一个简单的RocketMQ文件持久化和读取。本文适合希望进一部了解RocketMQ底层文件存储原理的开发者,学习本文需要对消息队列有一定的使用经验,对Java NIO文件读写有一定的了解。
主要内容:
1.RocketMQ文件简介
2.RocketMQ文件结构说明
3.MappedByteBuffer简介
4.最精简的RocketMQ文件存储实现(干货)1.RocketMQ文件简介
RocketMQ具有其强大的存储能力和强大的消息索引能力,从众多消息中间件产品中脱颖而出,其原理很值得学习。
RocketMQ存储用的是本地文件存储系统,效率高也可靠。存储文件主要分为CommitLog,ConsumeQueue,Index 三类文件。
CommitLog
消息存储文件,所有消息主题的消息都存储在 CommitLog 文件中。 CommitLog单个文件大小默认1G,文件文件名是起始偏移量,总共20位,左边补零,起始偏移量是0。
比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824
ConsumeQueue
消息消费的逻辑队列,作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
IndexFile
消息索引文件,主要存储消息 Key 与 Offset 的对应关系。
消息消费队列是RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息队 列检索消息的速度
config文件夹
config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。
topics.json : topic 配置属性
subscriptionGroup.json :消息消费组配置信息。
delayOffset.json :延时消息队列拉取进度。
consumerOffset.json :集群消费模式消息消进度。
consumerFilter.json :主题消息过滤信息。
几种文件的存储目录:
RocketMQ文件目录
2.RocketMQ文件结构说明
RocketMQ文件逻辑图
ConsumeQueue
ConsumeQueue 文件保存在 store 目录下的 consumequeue 目录中。
ConsumeQueue每条数据占20字节空间,包含三部分内容:消息的offset、消息大小size、tag的hashCode。单个ConsumeQueue文件最多保存30W条数据。8byte
(commitlogoffset)4byte
(msgLength)8byte
(tagCode)
一个topic会分成多个逻辑队列,每个逻辑队列对应一个ConsumeQueue文件,根据topic和queueId来组织文件,如果TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。见RocketMQ文件逻辑图:
CommitLog
消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。
文件的默认位置如下,仍然可通过配置文件修改:
${user.home} store${commitlog}${fileName}
CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。
message1
totalSize
queueId
queueOffset
PhysicalOffset
body
topic
其它
message2
totalSize
queueId
queueOffset
PhysicalOffset
body
topic
其它
message3
totalSize
queueId
queueOffset
PhysicalOffset
body
topic
其它
字段说明:
单commitLog优点:对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。
缺点:写虽然完全是顺序写,但是读却变成了完全的随机读。读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度
Config的offsetTable.offset
和ConsumeQueue索引文件对应 ,这个offset是ConsumeQueue文件的(已经消费的)下标/行数,可以直接定位到ConsumeQueue并找到commitlogOffset从而找到消息体原文,
这个offset是消息消费进度的核心
{
"offsetTable":{
"zxp_test_topic@zxp_test_group2":{0:16,1:17,2:23,3:43
},
"TopicTest@please_rename_unique_group_name_4":{0:250,1:250,2:250,3:250
},
"%RETRY%zxp_test_group2@zxp_test_group2":{0:3
}
"order_topic@zxp_test_group3":{0:0,1:3,2:3,3:3
}
}
}
OffsetStore分为以下2种,分别存储在客户端和服务器端:
本地文件类型
BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在Consumer本地,因为每条消息会被消费组内所有的消费者消费,同消费组的消费者相互独立,消费进度要单独存储,会以文本文件的形式存储在客户端,对应的数据结构为LocalFileOffsetStore
Broker代存储类型
在集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,另外,消费者发生异常或重启为了保证可以从上一次消费的地方继续进行消费,这时的offset是统一保存到broker服务端的。对应的数据结构为RemoteBrokerOffsetStore。
IndexFile
用于为生成的索引文件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引;
indexFile存放的位置:${rocketmq.home}/store/index/indexFile(年月日时分秒等组成文件名)
我们发送的消息体中,包含 Message Key 或 Unique Key ,那么就会给它们每一个都构建索引。根据消息 Key 计算 Hash 槽的位置根据 Hash 槽的数量和 Index 索引来计算 Index 条目的起始位置
将当前 Index 条目的索引值,写在 Hash 槽 absSlotPos 位置上;将 Index 条目的具体信息 (hashcode/消息偏移量/时间差值/hash槽的值) ,从起始偏移量 absIndexPos 开始,顺序按字节写入。
由于出现了多个偏移量的概念,所以我总结一下:CommitLog中的offset(消息体偏移量) 体现在commitlog文件名称中,对应这个CommitLog文件所有消息在整个topic的队列中起始偏移量(方便通过ConsumeQueue.commitlogOffset找到当前要消费的消息存在于哪个commitlog文件)ConsumeQueue中的commitlogOffset(消息体偏移量) 定位了当前这条消息在commitlog中的偏移量offsettable.offset(下标) 定位了当前已经消费的ConsumeQueue的下标是哪条消息
3.MappedByteBuffer简介
以前我们操作大文件都是用BufferedInputStream、BufferedOutputStream等带缓冲的IO流处理,但是针对大文件读写性能不理想。
MappedByteBuffer是Java提供的基于操作系统虚拟内存映射(MMAP)技术的文件读写API, 采用direct buffer的方式读写文件内容, 底层不再通过read、write、seek等系统调用实现文件的读写, 所以效率非常高。主要用于操作大文件, 如上百M、上GB的大文件。RocketMQ使用 MappedByteBuffer实现高性能的文件读写。
MMAP原理
一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:read:读取本地文件内容;write:将读取的内容通过网络发送出去。
普通文件读写
这两个操作发生了两次系统调用,每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态,也就是消息发送过程中一共发生了 4 次用户态与内核态的上下文切换。另外还 发生了 4 次数据拷贝,其中两次是 DMA 的拷贝,另外两次则是通过 CPU 拷贝的,分别是: DMA把数据从磁盘拷贝到内核态缓冲区;CPU把数据从内核态缓冲区拷贝到用户缓冲区;CPU把数据从用户缓冲区拷贝到内核的网络驱动的 socket 缓冲区;DMA把数据从网络驱动的 socket 缓冲区拷贝到网卡的缓冲区中。
mmap文件读写
系统调用函数在调用进程的虚拟地址空间中创建一个新 映射。这个映射会直接把内核缓冲区里的数据映射到用户空间,这样就不用从内核空间到用户空间来回复制数据了。
应用进程调用 mmap(),DMA 把数据从磁盘拷贝到内核缓冲区里;
应用进程调用 write(),CPU直接将内核缓冲区的数据拷贝到 socket 缓冲区中;
DMA把数据从内核的 socket 缓冲区拷贝到网卡的缓冲区里。
通过上面的分析,我们可以发现,比起原始版本,mmap + write 的方式依然需要4 次用户态与内核态的上下文切换,但是少了一次内存拷贝。
代码示例:public static void read() throws IOException { try (RandomAccessFile file = new RandomAccessFile(new File("test.txt"), "r")) { //get Channel FileChannel fileChannel = file.getChannel(); //get mappedByteBuffer from fileChannel MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size()); // check buffer LOG.info("is Loaded in physical memory: {}",buffer.isLoaded()); //只是一个提醒而不是guarantee LOG.info("capacity {}",buffer.capacity()); //read the buffer for (int i = 0; i < buffer.limit(); i++) { LOG.info("get {}", buffer.get()); } } } public static void writeWithMap() throws IOException { try (RandomAccessFile file = new RandomAccessFile(new File("a.txt"), "rw")) { //get Channel FileChannel fileChannel = file.getChannel(); //get mappedByteBuffer from fileChannel MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096 * 8 ); // check buffer LOG.info("is Loaded in physical memory: {}",buffer.isLoaded()); //只是一个提醒而不是guarantee LOG.info("capacity {}",buffer.capacity()); //write the content buffer.put("dhy".getBytes()); } }
FileChannel的map方法有三个参数:MapMode:映射模式,可取值有READ_ONLY(只读映射)、READ_WRITE(读写映射)、PRIVATE(私有映射),READ_ONLY只支持读,READ_WRITE支持读写,而PRIVATE只支持在内存中修改,不会写回磁盘;position和size:映射区域,可以是整个文件,也可以是文件的某一部分,单位为字节。4.最精简的RocketMQ文件存储实现(干货)
1.简单索引文件读写
模拟conusmQueue创建10个索引,长度固定20,保存到文件。public class FileWrite { public static void main(String[] args) throws IOException { FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/111.txt")), StandardOpenOption.WRITE, StandardOpenOption.READ); MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096); fileChannel.close(); for(int i =0;i<10;i++){ mappedByteBuffer.position(i*20); ByteBuffer b = ByteBuffer.allocate(20); b.putLong(100);//8byte(commitlog offset) b.putInt(1000);//4byte (msgLength) b.putLong(20);//8byte (tagCode) b.flip(); mappedByteBuffer.put(b); } mappedByteBuffer.force(); } } public class FileRead { public static void main(String[] args) throws IOException { FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/111.txt")), StandardOpenOption.WRITE, StandardOpenOption.READ); MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096); fileChannel.close(); for(int i =0;i<10;i++){ mappedByteBuffer.position(i*20); long commitlogOffset = mappedByteBuffer.getLong(); long msgLen = mappedByteBuffer.getInt(); long tagCode = mappedByteBuffer.getLong(); System.out.println("文件读取:commitlogOffset:"+commitlogOffset+",msgLen:"+msgLen+",tagCode:"+tagCode); } } }
运行结果:
2.基于consumeQueue和CommitLog的读写
手动创建100个消息体,存入commitLog,然后创建索引文件public class CommitLogWriteTest { private static Long commitLogOffset = 0L;//8byte(commitlog offset) private static Long lastTotalSize = 0L; public static void main(String[] args) throws IOException { List list = createCommitLog(); createConsumerQueue(list); } private static List createCommitLog() throws IOException { FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")), StandardOpenOption.WRITE, StandardOpenOption.READ); MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600); fileChannel.close(); List list = new ArrayList<>(); Random random = new Random(); int count = 0; for (int i = 0; i < 100; i++) { long commitLogOffset = lastTotalSize; String topic = "Topic-test"; String msgId = UUID.randomUUID().toString(); String msgBody = "消息内容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48));// long queueOffset =i;//索引偏移量 String transactionId = UUID.randomUUID().toString(); /* 数据格式,位置固定 int totalSize;//消息长度 String msgId; String topic; long queueOffset;//索引偏移量 long bodySize;//消息长度 byte[] body;//消息内容 String transactionId; long commitLogOffset;//从第一个文件开始算的偏移量 */ int totalSize = 8 //totalSize长度 + 64 //msgId长度 + 64 //topic长度 + 8 //索引偏移量长度 + 8 //消息长度长度 + msgBody.getBytes(StandardCharsets.UTF_8).length //消息内容长度 + 64 //transactionId长度 + 64 //commitLogOffset长度; ; ByteBuffer b = ByteBuffer.allocate(totalSize); // //如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300 mappedByteBuffer.position(Integer.valueOf(commitLogOffset+"")); b.putLong(totalSize);//totalSize b.put(getBytes(msgId, 64));//msgId b.put(getBytes(topic, 64));//topic,定长64 b.putLong(queueOffset);//索引偏移量 b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body b.put(getBytes(transactionId, 64)); b.putLong(commitLogOffset);//bodySize b.flip(); mappedByteBuffer.put(b); allTotalSize = totalSize + allTotalSize; System.out.println("写入消息,第:" + i + "次"); System.out.println("totalSize:" + totalSize); System.out.println("msgId:" + msgId); System.out.println("topic:" + topic); System.out.println("msgBody:" + msgBody); System.out.println("transactionId:" + transactionId); System.out.println("commitLogOffset:" + commitLogOffset); ConsumerQueueData consumerQueueData = new ConsumerQueueData(); consumerQueueData.setOffset(commitLogOffset); consumerQueueData.setMsgLength(totalSize); consumerQueueData.setTagCode(100L); list.add(consumerQueueData); count ++; } mappedByteBuffer.force(); System.out.println("commitLog数据保存完成,totalSize:" + count); return list; } private static void createConsumerQueue(List list) throws IOException { FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")), StandardOpenOption.WRITE, StandardOpenOption.READ); MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096); fileChannel.close(); int count = 0; for (int i = 0; i < list.size(); i++) { ConsumerQueueData consumerQueueData = list.get(i); mappedByteBuffer.position(i * 20); ByteBuffer b = ByteBuffer.allocate(20); b.putLong(consumerQueueData.getOffset());//8byte(commitlog offset) b.putInt(consumerQueueData.getMsgLength());//4byte (msgLength) b.putLong(consumerQueueData.getTagCode());//8byte (tagCode) b.flip();//很重要,使读指针从头开始 mappedByteBuffer.put(b); count++; System.out.println("createConsumerQueue:" + JSON.toJSONString(consumerQueueData)); } System.out.println("ConsumerQueue数据保存完成count:" + count); mappedByteBuffer.force(); } //将变长字符串定长byte[],方便读取 private static byte[] getBytes(String s, int length) { int fixLength = length - s.getBytes().length; if (s.getBytes().length < length) { byte[] S_bytes = new byte[length]; System.arraycopy(s.getBytes(), 0, S_bytes, 0, s.getBytes().length); for (int x = length - fixLength; x < length; x++) { S_bytes[x] = 0x00; } return S_bytes; } return s.getBytes(StandardCharsets.UTF_8); } }
运行结果:(数据有100条,没展示全部)
读取索引文件,然后根据偏移量在commitLog文件中读取消息public class CommitLogReadTest { static FileChannel commitLogfileChannel = null; public static void main(String[] args) throws IOException { FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")), StandardOpenOption.WRITE, StandardOpenOption.READ); MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096); fileChannel.close(); int index = 0 ; for(int i =index;i<100;i++){ //根据索引下标读取索引,实际情况是用户消费的最新点位,存在在broker的偏移量文件中 mappedByteBuffer.position(i*20); long commitlogOffset = mappedByteBuffer.getLong(); // System.out.println(commitlogOffset); long msgLen = mappedByteBuffer.getInt(); Long tag = mappedByteBuffer.getLong(); //System.out.println("======读取到consumerQueue,commitlogOffset:"+commitlogOffset+",msgLen :"+msgLen+"==="); //根据偏移量读取CcommitLog readCommitLog(Integer.valueOf(commitlogOffset+"")); } } public static MappedByteBuffer initFileChannel() throws IOException { MappedByteBuffer mappedByteBuffer = null; if(mappedByteBuffer == null){ commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")), StandardOpenOption.WRITE, StandardOpenOption.READ); mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600); commitLogfileChannel.close(); } return mappedByteBuffer; } /* * * 根据偏移量读取CommitLog * */ public static void readCommitLog(int offset) throws IOException { /*写入顺序,读的时候也按这个顺序读取 b.putLong(totalSize);//totalSize b.put(getBytes(msgId, 64));//msgId b.put(getBytes(topic, 64));//topic,定长64 b.putLong(queueOffset);//索引偏移量 b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body b.put(getBytes(transactionId, 64)); b.putLong(commitLogOffset);//commitLogOffset */ System.out.println("=================commitlog读取偏移量为"+offset+"的消息==================="); MappedByteBuffer mappedByteBuffer = initFileChannel(); //很重要,按偏移量读取文件,入参为索引文件记录的偏移量 mappedByteBuffer.position(offset); long totalSize = mappedByteBuffer.getLong();//消息长度 byte[] msgIdByte = new byte[64];//uuid 固定是64 mappedByteBuffer.get(msgIdByte); byte[] topicByte = new byte[64];// 固定是64 mappedByteBuffer.get(topicByte); long queueOffset = mappedByteBuffer.getLong(); Long bodySize = mappedByteBuffer.getLong(); byte[] bodyByte = new byte[Integer.parseInt(bodySize+"")];//bodySize 长度不固定 mappedByteBuffer.get(bodyByte); byte[] transactionIdByte = new byte[64];//uuid 固定是64 mappedByteBuffer.get(transactionIdByte); long commitLogOffset = mappedByteBuffer.getLong();//偏移量 System.out.println("totalSize:"+totalSize); System.out.println("msgId:"+new String(msgIdByte)); System.out.println("topic:"+new String(topicByte)); System.out.println("queueOffset:"+queueOffset); System.out.println("bodySize:"+bodySize); System.out.println("body:"+new String(bodyByte)); System.out.println("transactionId:"+new String(transactionIdByte)); System.out.println("commitLogOffset:"+commitLogOffset); } }
运行结果:(数据有100条,没展示全部)
总结:
本文介绍了RocketMQ的文件存储基本原理,并基于Java NIO的MappedByteBuffer实现了对RocketMQ的存储文件CommotLog,索引文件ConsumeQueue的写入,以及按索引下标读取CommotLog的,希望能加深大家对RocketMQ文件存储的理解。
八月十五中秋节,推荐6道大菜,上桌有面子,大人小孩抢着吃人是故乡亲,月是故乡明。转眼间,就将迎来中国四大传统节日之一的中秋节。今年的9月10日,是八月十五中秋节,也是自古以来象征月圆人团圆的日子。在中秋习俗里,有一样非常重要的老传统,就
外酥里嫩的椒盐虾仁一口一个停不下来外酥里嫩的椒盐虾仁一口一个停不下来,每次去饭店必点椒盐系列的菜,偏爱这个味道今天在家做了椒盐虾仁,外酥里嫩,太好吃啦!不用剥皮,一口一个太过瘾了食材虾仁250g玉米淀粉2大勺少许葱
来到甘肃旅行,以下6个美食不容错过,每一个都让人馋涎欲滴引导语西北地区有着十分独特的自然风光,对于南方人来说是具备了独特的魅力的,在西北地区能够享受到的不仅仅是来自于大自然的馈赠,在这里更是有着其他地区所没有的独特美食,如果要去西北地区
去陕西旅游,这6种特色美食一定要尝尝,外地有钱也难吃到说到陕西,人们脑海中浮现的或许是兵马俑。李白这样写道长安白日照春空,绿杨结烟垂袅风。诗人笔下的陕西让人充满了无数遐想,让人不由得回想起在历史长河中历经风雨的古城。让陕西在全国火出圈
项羽本纪项籍者,下相人也,字羽。初起时,年二十四。其季父季父叔父。项梁,梁父即楚将项燕,为秦将王翦所戮者也。项氏世世为楚将,封于项,故姓项氏。项籍少时,学书不成,去学剑,又不成。项梁怒之。
话说三国刘备一hr编过草鞋吃过苦,打过曹魏揍过吴,终是皇叔负了蜀,接着奏乐接着舞!有同学说刘备无能,取荆州不行,娶孙尚香可以,他的江山都是哭来的睡来的,是白嫖玩家的典范,靠着收买人心的权术把戏
刘伯承要为李学先做特制鞋毛主席我曾请你到枣园吃饭老红军李学先的故事(9)难忘的国庆阅兵式李学先我出生在雇农家庭,从小随父母亲逃荒讨饭,稍大一点就去地主庄园打工,受尽了折磨。一九二九年我参加了红军,决心为穷人打天下,经历过土地革命
西南大混战,细说川滇黔地区的权力更替四川军阀派系之多,战争之频,为祸之烈,政局之乱,即使在北洋时期也是全国少见。袁世凯死去以后,四川饱受几乎连绵不断的战争之苦,吸毒成风,经济混乱,省级政府腐败无能,形同虚设。变化无常
杨虎城将军陵园著名爱国将领杨虎城将军陵园建于长安区韦曲镇东1公里处,背依凤栖原,与杜公祠牛头寺毗邻,环境优美,风景秀丽。1936年西北军将领杨虎城将军与东北军将领张学良将军共同发动西安事变,对停
看到司马南的际遇,不由想起三个人第一个李白大家都知道,出事前司马南参与了某管弦交响乐团的演出,在其中,司马南饰演李白吟诵了一首将进酒,其间,竟涕泪交加。马粉们笑嘻嘻,这司马南表演真是很到位啊。其实,哪里是表演,是
春秋列国皆有谏臣,谁能左右君王的意见?春秋战国时代有很多谏臣,比如赵国的触龙齐国的邹忌楚国的鬻拳。还有君王身边的很多臣子,大多都充当过谏臣之职,比如秦国的蹇叔齐国的管仲楚国的屈原等等。这些谏臣大多都是胆大的,他们最担心