[ RocketMQ源码阅读 6 ] Broker磁盘文件格式与作用

Broker的功能点很多,安装程序启动的顺序去看源码,发现代码量比之前的组件要大很多。阅读过程中发现Broker会去持久化一些配置,并且会将消息数据存储在磁盘上。
整理和检索了网上的一些资料,列出了这些文件和相应的作用,如下。

  • store
    • commitlog
      • 000000000
      • xxxxxxxxxx
    • compaction
      • compactionLog
        • {topic}
          • 0
          • 1
          • {queueId}
      • compactionCq
        • {topic}
          • 0
          • 1
          • {queueId}
    • config
      • delayOffset.json
      • broker.properties
      • topics.json
      • topicQueueMapping.json
      • consumerOffset.json
      • lmqConsumerOffset.json
      • consumerOrderInfo.json
      • subscriptionGroup.json
      • timercheck
      • timermetrics
      • consumerFilter.json
      • messageRequestMode.json
      • tieredStoreMetadata.json
    • consumequeue
      • {topic}
        • 0
          • 00000000000000000000
        • 1
        • {queueId}
    • index
      • 20240305101010000
    • abort
    • checkpoint
    • lock
    • timerwheel
    • timerlog
      • 00000000000000000000

CommitLog

该目录下存储了消息内容文件,默认每个文件1GB大小。目录下会包含多个文件,相邻的两个文件名称的差值正好为1GB。
文件格式:

序号 名称 长度 描述
1 totalSize int 消息总字节数
2 magicCode int 用于标记消息协议的版本
3 bodyCRC int crc校验码,用于校验消息内容是否正确
4 queueId int 所属queueId
5 flag int 消息的类型标记
6 queueOffset long
7 physicOffset long
8 sysFlag int 标记消息类型
9 bornTimeStamp long 消息产生的日期
10 bornHost 8字节或者20字节 ipv4或者ipv6的差别
11 storeTimestamp long 存储消息的时间戳,在Broker收到消息的时候赋值
12 storeHostAddress 8字节或者20字节 存储消息的Broker进程地址
13 reconsumeTimes int 被重复消费的次数
14 preparedTransactionOffset long
15 bodyLen int 消息内容长度
16 body bodyLen 消息内容
17 topic 长度取决于从magicCode中获取的topic长度
18 propertiesLength short properties内容的长度

ConsumeQueue

每个Topic存在n(1..n)个数量的Queue,每个Queue对应一个ConsumQueue文件。该文件用于记录属于某个Topic的消息在CommitLog中的偏移量。

/**
 * ConsumeQueue's store unit. Format:
 * <pre>
 * ┌───────────────────────────────┬───────────────────┬───────────────────────────────┐
 * │    CommitLog Physical Offset  │      Body Size    │            Tag HashCode       │
 * │          (8 Bytes)            │      (4 Bytes)    │             (8 Bytes)         │
 * ├───────────────────────────────┴───────────────────┴───────────────────────────────┤
 * │                                     Store Unit                                    │
 * │                                                                                   │
 * </pre>
 * ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
 */

CommitLog Physical Offset记录了消息在CommitLog中全局的offset,Body Size为消息的总长度。

ConsumeQueue文件是由DefaultMessageStore内部类ReputMessageService来完成的,该服务是一个定时任务,每隔1ms执行一次。主要看doReput函数做了哪些事情:

  1. 读取CommitLog文件内容,然后构建ConsumeQueue文件、Index、compact文件
    1
    DefaultMessageStore.this.doDispatch(dispatchRequest)
    doDisPatch函数会调用如下三个dispatcher,完成构建文件的任务
    1
    2
    3
    4
    5
    6
    7
    8
    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); // 1
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); // 2
    if (messageStoreConfig.isEnableCompaction()) {
    this.compactionStore = new CompactionStore(this);
    this.compactionService = new CompactionService(commitLog, this, compactionStore);
    this.dispatcherList.addLast(new CommitLogDispatcherCompaction(compactionService));//3
    }
    对于非事务消息和事务消息的提交消息,写入到ConsumeQueue文件,查看CommitLogDispatcherBuildConsumeQueue代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
    final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
    switch (tranType) {
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    DefaultMessageStore.this.putMessagePositionInfo(request);
    break;
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
    break;
    }
    }
    }

与ComsumeQueue文件相关的还有consumequeue_ext文件,这个文件存放了一些扩展信息,查看CosumeQueue#putMessagePositionInfoWrapper(DispatchRequest request)函数的相关代码。如下:

1
2
3
4
5
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit); //写入文件
  1. 通知客户端有新消息
    1
    2
    3
    4
    5
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
    notifyMessageArrive4MultiQueue(dispatchRequest);

Index

携带key的消息会构建索引文件,用于按照key来进行消息检索。查看CommitLogDispatcherBuildIndex类,最大单个文件2kw条索引数据。
每个Index文件的格式如下:
IndexHeader(40bytes) + Slots(固定500w个 * 4bytes) + IndexItems(最多2000w个 * 20bytes)

IndexHeader格式:
Begin TimeStamp(8bytes) + End TimeStamp(8bytes) + Begin Physical Offset(8bytes) + End Physical Offset(8bytes) + Hash Slot Count(4bytes) + Index Count(4bytes)

每个Slot存储当前分配到此Slot下最新的Index count,通过此数据可以计算出此slot下最新一条IndexItem的位置。

IndexItem的格式:
Key HashCode(4bytes) + Physical Offset(8bytes) + Time Diff(4bytes) + Next Index Pos(4bytes)

Time Diff用于和Header中的开始和结束时间戳来比较,结合Key HashCode来判断索引是否命中数据。
Next Index Pos用于指向之前命中该Slot的IndexItem。

RocketMQ Index结构

Compaction

该目录下的文件,有特殊的用途。更像是核心功能开发完成之后,以打补丁的方式加入的特殊功能。在不依赖于其他存储的情况喜爱,用来存储kv数据。

Compact的过程是将Commitlog的数据按照某个topic下的某个queue(也称为partition),对于数据按key的维度只保留最新的数据。当然ConsumeQueue文件也需要做相应的Compact。

可以阅读如下文章了解更多细节。
RocketMQ Compaction Topic的设计与实现

Config

文件名称 处理类 描述
delayOffset.json ScheduleMessageService 延迟消息消费进度
broker.properties BrokerController Broker配置
topics.json TopicConfigManager 存储每个topic的读写队列数、权限、是否顺序等信息
topicQueueMapping.json TopicQueueMappingManager 当前Broker存储了哪些queue
consumerOffset.json ConsumerOffsetManager 消费进度数据
lmqConsumerOffset.json LmqConsumerOffsetManager lmq模式下的消费进度数据
consumerOrderInfo.json ConsumerOrderInfoManager 维护了topic+group+queueId维度的顺序消费情况OrderInfo
subscriptionGroup.json SubscriptionGroupManager SubscriptionGroup配置,订阅组配置内容,还有包含了一些内置的配置
timercheck TimerCheckpoint 存储时间轮消息当前的瞬时状态
timermetrics TimerMetrics 用于统计各个时间的时间轮消息的条数
consumerFilter.json ConsumerFilterManager 每个Topic的ConsumerGroup消费过滤规则
messageRequestMode.json MessageRequestModeManager 配置ConsumerGroup使用pop或pull模式消费
tieredStoreMetadata.json TieredMetadataManager 分级存储topic、queue以及消息存储文件的meta data

abort

DefaultMessageStore在启动Broker,加载磁盘日志文件之前,用来判断上次是否是结束进程的。内容存储了上次启动Broker的pid。

checkpoint

存储了当前消息刷盘的offset,用于重启或者故障后的恢复。

数据名称 描述
lastReadTimeMs 上次消费的时间节点
lastTimerLogFlushPos 最后刷新log的pos
lastTimerQueueOffset 最后一次消费的队列节点
masterTimerQueueOffset 主 Broker 的队列消费节点

lock

防止本地启动多个以此目录为存储目录的Broker进程。

timerwheel

该文件存储的内容如下

序号 名称 长度 描述
1 delayTime long 延迟时间
2 firstPos long 开始的位置
3 lastPos long 结束的位置
4 num int 消息的数量
5 magic int 暂时未用

timerlog

主要逻辑都在TimeLog类中,下面为存储的日志格式

序号 名称 长度 描述
1 size int 日志单元固定长度
2 pre pos long 上一条日志的pos
3 magic value int 标记消息处理逻辑delete(删除消息的消息)或者roll处理(消息延迟时间超过限定时间,需要做roll处理来进行展期)
4 curr write time long for trace写入时间轮的时间戳
5 delayed time int for check
6 offsetPy long 在commitLog中的位置
7 sizePy int 在commitLog中存储的字节数
8 hash code of real topic int 消息后续投放的真正topic名称的hash值
9 reserved value long 预留值,暂时没有作用

timerwheel, timerlog, checkpoint, config/timercheck, config/timermetrics都是时间轮调度消息用到的文件