Broker的功能点很多,安装程序启动的顺序去看源码,发现代码量比之前的组件要大很多。阅读过程中发现Broker会去持久化一些配置,并且会将消息数据存储在磁盘上。
整理和检索了网上的一些资料,列出了这些文件和相应的作用,如下。
- store
- commitlog
- 000000000
- xxxxxxxxxx
- compaction
- compactionLog
- {topic}
- 0
- 1
- …
- {queueId}
- {topic}
- compactionCq
- {topic}
- 0
- 1
- …
- {queueId}
- {topic}
- compactionLog
- config
- delayOffset.json
- broker.properties
- topics.json
- topicQueueMapping.json
- consumerOffset.json
- lmqConsumerOffset.json
- consumerOrderInfo.json
- subscriptionGroup.json
- timercheck
- timermetrics
- consumerFilter.json
- messageRequestMode.json
- tieredStoreMetadata.json
- consumequeue
- {topic}
- 0
- 00000000000000000000
- 1
- …
- {queueId}
- 0
- {topic}
- index
- 20240305101010000
- abort
- checkpoint
- lock
- timerwheel
- timerlog
- 00000000000000000000
- commitlog
CommitLog
该目录下存储了消息内容文件,默认每个文件1GB大小。目录下会包含多个文件,相邻的两个文件名称的差值正好为1GB。
文件格式:
序号 | 名称 | 长度 | 描述 |
---|---|---|---|
1 | totalSize | int | 消息总字节数 |
2 | magicCode | int | 用于标记消息协议的版本 |
3 | bodyCRC | int | crc校验码,用于校验消息内容是否正确 |
4 | queueId | int | 所属queueId |
5 | flag | int | 消息的类型标记 |
6 | queueOffset | long | |
7 | physicOffset | long | |
8 | sysFlag | int | 标记消息类型 |
9 | bornTimeStamp | long | 消息产生的日期 |
10 | bornHost | 8字节或者20字节 | ipv4或者ipv6的差别 |
11 | storeTimestamp | long | 存储消息的时间戳,在Broker收到消息的时候赋值 |
12 | storeHostAddress | 8字节或者20字节 | 存储消息的Broker进程地址 |
13 | reconsumeTimes | int | 被重复消费的次数 |
14 | preparedTransactionOffset | long | |
15 | bodyLen | int | 消息内容长度 |
16 | body | bodyLen | 消息内容 |
17 | topic | 长度取决于从magicCode中获取的topic长度 | |
18 | propertiesLength | short | properties内容的长度 |
ConsumeQueue
每个Topic存在n(1..n)个数量的Queue,每个Queue对应一个ConsumQueue文件。该文件用于记录属于某个Topic的消息在CommitLog中的偏移量。
/**
* ConsumeQueue's store unit. Format:
* <pre>
* ┌───────────────────────────────┬───────────────────┬───────────────────────────────┐
* │ CommitLog Physical Offset │ Body Size │ Tag HashCode │
* │ (8 Bytes) │ (4 Bytes) │ (8 Bytes) │
* ├───────────────────────────────┴───────────────────┴───────────────────────────────┤
* │ Store Unit │
* │ │
* </pre>
* ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
*/
CommitLog Physical Offset记录了消息在CommitLog中全局的offset,Body Size为消息的总长度。
ConsumeQueue文件是由DefaultMessageStore内部类ReputMessageService来完成的,该服务是一个定时任务,每隔1ms执行一次。主要看doReput函数做了哪些事情:
- 读取CommitLog文件内容,然后构建ConsumeQueue文件、Index、compact文件doDisPatch函数会调用如下三个dispatcher,完成构建文件的任务
1
DefaultMessageStore.this.doDispatch(dispatchRequest)
对于非事务消息和事务消息的提交消息,写入到ConsumeQueue文件,查看CommitLogDispatcherBuildConsumeQueue代码1
2
3
4
5
6
7
8this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); // 1
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); // 2
if (messageStoreConfig.isEnableCompaction()) {
this.compactionStore = new CompactionStore(this);
this.compactionService = new CompactionService(commitLog, this, compactionStore);
this.dispatcherList.addLast(new CommitLogDispatcherCompaction(compactionService));//3
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
与ComsumeQueue文件相关的还有consumequeue_ext文件,这个文件存放了一些扩展信息,查看CosumeQueue#putMessagePositionInfoWrapper(DispatchRequest request)函数的相关代码。如下:
1 | ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); |
- 通知客户端有新消息
1
2
3
4
5DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
notifyMessageArrive4MultiQueue(dispatchRequest);
Index
携带key的消息会构建索引文件,用于按照key来进行消息检索。查看CommitLogDispatcherBuildIndex类,最大单个文件2kw条索引数据。
每个Index文件的格式如下:
IndexHeader(40bytes) + Slots(固定500w个 * 4bytes) + IndexItems(最多2000w个 * 20bytes)
IndexHeader格式:
Begin TimeStamp(8bytes) + End TimeStamp(8bytes) + Begin Physical Offset(8bytes) + End Physical Offset(8bytes) + Hash Slot Count(4bytes) + Index Count(4bytes)
每个Slot存储当前分配到此Slot下最新的Index count,通过此数据可以计算出此slot下最新一条IndexItem的位置。
IndexItem的格式:
Key HashCode(4bytes) + Physical Offset(8bytes) + Time Diff(4bytes) + Next Index Pos(4bytes)
Time Diff用于和Header中的开始和结束时间戳来比较,结合Key HashCode来判断索引是否命中数据。
Next Index Pos用于指向之前命中该Slot的IndexItem。
Compaction
该目录下的文件,有特殊的用途。更像是核心功能开发完成之后,以打补丁的方式加入的特殊功能。在不依赖于其他存储的情况喜爱,用来存储kv数据。
Compact的过程是将Commitlog的数据按照某个topic下的某个queue(也称为partition),对于数据按key的维度只保留最新的数据。当然ConsumeQueue文件也需要做相应的Compact。
可以阅读如下文章了解更多细节。
RocketMQ Compaction Topic的设计与实现
Config
文件名称 | 处理类 | 描述 |
---|---|---|
delayOffset.json | ScheduleMessageService | 延迟消息消费进度 |
broker.properties | BrokerController | Broker配置 |
topics.json | TopicConfigManager | 存储每个topic的读写队列数、权限、是否顺序等信息 |
topicQueueMapping.json | TopicQueueMappingManager | 当前Broker存储了哪些queue |
consumerOffset.json | ConsumerOffsetManager | 消费进度数据 |
lmqConsumerOffset.json | LmqConsumerOffsetManager | lmq模式下的消费进度数据 |
consumerOrderInfo.json | ConsumerOrderInfoManager | 维护了topic+group+queueId维度的顺序消费情况OrderInfo |
subscriptionGroup.json | SubscriptionGroupManager | SubscriptionGroup配置,订阅组配置内容,还有包含了一些内置的配置 |
timercheck | TimerCheckpoint | 存储时间轮消息当前的瞬时状态 |
timermetrics | TimerMetrics | 用于统计各个时间的时间轮消息的条数 |
consumerFilter.json | ConsumerFilterManager | 每个Topic的ConsumerGroup消费过滤规则 |
messageRequestMode.json | MessageRequestModeManager | 配置ConsumerGroup使用pop或pull模式消费 |
tieredStoreMetadata.json | TieredMetadataManager | 分级存储topic、queue以及消息存储文件的meta data |
abort
DefaultMessageStore在启动Broker,加载磁盘日志文件之前,用来判断上次是否是结束进程的。内容存储了上次启动Broker的pid。
checkpoint
存储了当前消息刷盘的offset,用于重启或者故障后的恢复。
数据名称 | 描述 |
---|---|
lastReadTimeMs | 上次消费的时间节点 |
lastTimerLogFlushPos | 最后刷新log的pos |
lastTimerQueueOffset | 最后一次消费的队列节点 |
masterTimerQueueOffset | 主 Broker 的队列消费节点 |
lock
防止本地启动多个以此目录为存储目录的Broker进程。
timerwheel
该文件存储的内容如下
序号 | 名称 | 长度 | 描述 |
---|---|---|---|
1 | delayTime | long | 延迟时间 |
2 | firstPos | long | 开始的位置 |
3 | lastPos | long | 结束的位置 |
4 | num | int | 消息的数量 |
5 | magic | int | 暂时未用 |
timerlog
主要逻辑都在TimeLog类中,下面为存储的日志格式
序号 | 名称 | 长度 | 描述 |
---|---|---|---|
1 | size | int | 日志单元固定长度 |
2 | pre pos | long | 上一条日志的pos |
3 | magic value | int | 标记消息处理逻辑delete(删除消息的消息)或者roll处理(消息延迟时间超过限定时间,需要做roll处理来进行展期) |
4 | curr write time | long | for trace写入时间轮的时间戳 |
5 | delayed time | int | for check |
6 | offsetPy | long | 在commitLog中的位置 |
7 | sizePy | int | 在commitLog中存储的字节数 |
8 | hash code of real topic | int | 消息后续投放的真正topic名称的hash值 |
9 | reserved value | long | 预留值,暂时没有作用 |
timerwheel, timerlog, checkpoint, config/timercheck, config/timermetrics都是时间轮调度消息用到的文件