type
status
date
slug
summary
tags
category
password
1、概述
RocketMQ 的存储设计非常精妙,它采用了 “CommitLog + 索引队列(ConsumeQueue/IndexFile)” 的结构。
存储组件 | 核心职责 | 物理路径示例 | 对应关系 |
CommitLog | 存储所有消息的真实内容(消息体+元数据),是消息的唯一物理存储。 | ../store/commitlog/ | 所有 Topic 的所有 Queue 的消息都混合写入同一个 CommitLog。 |
ConsumeQueue | 作为逻辑索引,存储指定Topic下特定消息队列(Queue)中消息在 CommitLog 的定位信息,提供快速随机读,方便消费者按队列拉取消息。 | ../store/consumequeue/{TopicName}/{QueueId}/ | 每个 Topic 的每个 QueueId 都有自己独立的 ConsumeQueue 文件 |
IndexFile | 提供基于消息Key(Key或Message ID) 或时间区间查询消息的能力。 | ../store/index/ | ㅤ |
辅助文件 | 用于维护 Broker 状态和元数据。
- checkpoint:记录CommitLog、ConsumeQueue、IndexFile最后一次刷盘的时间戳。
- abort:Broker启动时创建,正常关闭时删除。若异常关闭,此文件会保留,用于故障恢复判断。 | ../store/ | ㅤ |
一个完整的目录结构如下所示:

- abort:该文件在 Broker 启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动 Broker 的情况下,发现这个文件是存在的,则说明之前 Broker 的关闭是非正常关闭。
- checkpoint:存储 CommitLog、ConsumeQueue、IndexFile 的最后刷盘时间戳
- commitlog:存放 CommitLog 文件,即存放所有物理消息。
- config:存放 Broker 运行期间的一些配置数据
- consumequeue:存放 ConsumeQueue 文件
- index:存放 IndexFile 文件
- lock:运行期间使用到的全局资源锁
2、CommitLog
CommitLog 是 RocketMQ 的核心存储文件,它存储了所有主题、所有队列的原始消息内容。CommitLog 的特点:
- 物理位置位于
${ROCKETMQ_HOME}/store/commitlog/
目录下。
- CommitLog 的存储其实是分多层的,
CommitLog -> MappedFileQueue -> MappedFile
,其中真正存储数据的是MappedFile。CommitLog 目录中存放着很多的 MappedFile 文件,当前 Broker 中的所有消息都是落盘到这些MappedFile 文件中的。
- 所有消息都严格按照顺序追加(Append) 写入当前活跃的 MappedFile 文件。每个文件默认固定为 1GB(可配置,
mapedFileSizeCommitLog
),写满后自动生成一个新的文件。
- 文件名由 20 位十进制数构成,是当前文件的第一条消息的起始位移偏移量。
- 单个 Broker 实例下的所有 Queue 共用一个日志数据文件 CommitLog,也就是说所有 Topic 的所有 Queue 的消息都混合写入同一个 CommitLog。
为什么不采用 Kafka 的设计,不同的 Partition 分别存储在一个独立的物理文件呢?
在 Kafka 的设计中,一旦 Kafka 中 Topic 的 Partition 数量过多,队列文件会过多,那么会给磁盘的 IO 读写造成比较大的压力,也就造成了性能瓶颈。所以 RocketMQ 进行了优化,消息主题统一存储在 CommitLog 中。当然它也有它的优缺点。
- 优点:由于消息主题都是通过 CommitLog 来进行读写,ConsumerQueue 中只存储很少的数据,所以队列更加轻量化。对于磁盘的访问是串行化从而避免了磁盘的竞争。
- 缺点:消息写入磁盘虽然是基于顺序写,但是读的过程确实是随机的。读取一条消息会先读取 ConsumeQueue,再读 CommitLog,会降低消息读的效率。
2.1 CommitLog的存储格式
每条消息在 CommitLog 中不仅存储了业务发送的 Body,还包含 RocketMQ 系统所需的属性。其结构大致如下:
字段 | 长度 | 说明 |
消息总长度 | 4字节 | 整个消息的长度 |
Magic Code | 4字节 | 固定值,用于标识消息版本等 |
Body CRC | 4字节 | 消息体的 CRC 校验码,用于检测数据损坏 |
队列ID | 4字节 | 消息所属的队列 ID |
主题长度 | 1字节 | 主题名称的长度 |
主题 | 可变 | 主题名称 |
消息Flag | 4字节 | 系统标志,如是否压缩、事务消息等 |
扩展属性长度 | 2字节 | 扩展属性(Properties)的长度 |
扩展属性 | 可变 | 键值对形式的消息属性,如 KEYS, TAGS, WAIT 等 |
消息体长度 | 4字节 | 消息实际内容(Body)的长度 |
消息体 | 可变 | 生产者发送的原始消息内容 |
2.2 CommitLog的清理
Rocketmq 清理消息是以 CommitLog 文件为单位进行清理的,除了用户手动清理外,在以下情况下也会被自动清理,无论文件中的消息是否被消费过:
- CommitLog 文件过期(默认过期时间为 72 小时),且到达清理时间点(默认为凌晨 4 点)后,自动清理过期文件。
- CommitLog 文件过期,且磁盘空间占用率已达过期清理警戒线(默认 75%)后,无论是否达到清理时间点,都会自动清理过期文件。
- 磁盘占用率达到清理警戒线(默认 85%)后,开始按照设定好的规则清理文件,无论是否过期。默认会从最老的文件开始清理。
- 磁盘占用率达到系统危险警戒线(默认 90%)后,Broker 将拒绝消息写入。
3、ConsumeQueue
ConsumeQueue 是 CommitLog 的逻辑索引文件。每个 Topic 的每个 Queue 都会单独维护一个 ConsumeQueue 文件。因为所有 Topic 的所有 Queue 都会混合写入同一个 CommitLog 文件,ConsumeQueue 的作用就是提供快速随机读,在 CommitLog 文件中快速定位到消息。
ConsumeQueue 的工作流程:消费者拉取消息时,先查询 ConsumeQueue 这个“轻量级索引”,得到消息在 CommitLog 中的物理位置,然后通过
mmap
的方式直接从 CommitLog 中批量读取消息内容,极大地提升了消费端的读取效率。ConsumeQueue 的特点:
- 物理文件:位于
${ROCKETMQ_HOME}/store/consumequeue/{Topic}/{QueueId}/
目录下。
- 文件大小:每个文件约包含 30万个条目,固定大小(约5.72MB)。
- 异步构建:ConsumeQueue 中的数据是由后台线程 ReputMessageService 异步地从 CommitLog 中提取并构建的。
3.1 ConsumeQueue的索引格式
ConsumeQueue 中的每个条目非常精简,固定为 20个字节,包含三个关键信息:
字段 | 长度 | 说明 |
CommitLog Offset | 8字节 | 该消息在 CommitLog 文件中的起始物理偏移量 |
Size | 4字节 | 该消息在 CommitLog 中占用的总字节数 |
Message Tag HashCode | 8字节 | 消息 Tag 的哈希值,用于 Tag 过滤 |
3.2 Offset的管理
RocketMQ 中消息 ConsumerOffset 用于表示消费者的消费进度,根据消费模式的不同,ConsumerOffset 也有不同的存储方式:
- 集群模式(默认):
- 管理方:Broker 集中管理和存储
- 存储格式:Offset 相关数据以 JSON 的形式持久化到 Broker 磁盘文件中,文件路径为当前用户主目录下的
store/config/consumerOffset.json
。
- 广播模式:
- 管理方:Consumer 本地维护一份 Offset
- 存储格式:Offset 相关数据以 JSON 的形式持久化到Consumer本地磁盘文件中,默认文件路径为当前用户主目录下的
.rocketmq_offsets/clientId/{clientId}/clientId/{group}/Offsets.json
。
3.3 基于Offset的查找过程(消费者拉取消息)
- 消费者向 Broker 发送拉取请求,携带
Topic
、QueueId
、ConsumerOffset
。
- Broker 根据这些信息找到对应的 ConsumeQueue 文件。
- 从
ConsumerOffset
位置开始,读取一批(如 32条)20字节的条目。ConsumeQueue 的索引 Key 是消费位点Offset,是个从 1 开始递增的数字,通过位点对 ConsumeQueue 文件大小(30万)取余,可以定位到指定的 ConsumeQueue 文件以及在该文件中的数据块。因为数据块大小固定,所以可以算出对应的数据块在 CommitLog 文件中的偏移量CommitLog Offset
。
- 根据条目中的
CommitLog Offset
和Size
,到 CommitLog 中批量读取完整的消息内容。
- 将完整的消息返回给消费者。
4、IndexFile
IndexFile 提供了另一种消息索引方式——基于 Key 的查询。主要用于 RocketMQ 控制台的消息查询功能。
IndexFile 的特点:
- 物理文件:位于
${ROCKETMQ_HOME}/store/index/
目录下,文件名以创建时间戳命名。
- 文件大小:每个文件大小固定(约400MB)。
- 存储结构:采用 “哈希索引 + 文件顺序写” 的混合结构,类似 LevelDB 的 SSTable。
- 支持按 Message Key/UniqKey 查询:这是 ConsumeQueue 无法直接提供的功能。生产者在发送消息时可以设置
KEYS
属性(如订单ID),后续可以通过这个 Key 快速定位消息。
- 查询效率高:通过哈希计算直接定位到哈希槽,再遍历短链表,可以快速找到目标消息的 CommitLog 偏移量。
4.1 IndexFile的存储格式
IndexFile 结构更复杂,主要包含三部分:
- Header(文件头):存储一些元数据,如索引消息的起始/结束时间、起始/结束偏移量等。
- Hash Slot(哈希槽):固定数量的槽(默认5百万个),每个槽存放一个指针,指向该哈希值对应的索引条目链表的头部。
- Index Entry(索引条目):存储具体的索引信息,每个条目包含:
keyHash
: 消息 Key(或 Topic + “#” + UniqKey)的哈希值。phyOffset
: 消息在 CommitLog 中的物理偏移量。timeDiff
: 该消息存储时间与文件头起始时间的差值。prevIndex
: 同一个哈希槽中,前一个索引条目的位置。用于解决哈希冲突,形成链表。

4.2 工作流程(按 Key 查询)
- 计算查询 Key 的哈希值。
- 对哈希槽数量取模,找到对应的 Hash Slot。
- 读取该 Slot 中的值,获得第一个 Index Entry 的位置。
- 遍历该位置的 Index Entry 链表,比较
keyHash
是否匹配。
- 匹配成功后,根据
phyOffset
去 CommitLog 读取完整消息。
- Author:mcbilla
- URL:http://mcbilla.com/article/27785c7d-7c1d-80af-a914-cef07e73479e
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!