Broker的功能点很多,安装程序启动的顺序去看源码,发现代码量比之前的组件要大很多。阅读过程中发现Broker会去持久化一些配置,并且会将消息数据存储在磁盘上。 整理和检索了网上的一些资料,列出了这些文件和相应的作用,如下。
store
commitlog
compaction
compactionLog
compactionCq
config
delayOffset.json
broker.properties
topics.json
topicQueueMapping.json
consumerOffset.json
lmqConsumerOffset.json
consumerOrderInfo.json
subscriptionGroup.json
timercheck
timermetrics
consumerFilter.json
messageRequestMode.json
tieredStoreMetadata.json
consumequeue
index
abort
checkpoint
lock
timerwheel
timerlog
CommitLog 该目录下存储了消息内容文件,默认每个文件1GB大小。目录下会包含多个文件,相邻的两个文件名称的差值正好为1GB。 文件格式:
序号
名称
长度
描述
1
totalSize
int
消息总字节数
2
magicCode
int
用于标记消息协议的版本
3
bodyCRC
int
crc校验码,用于校验消息内容是否正确
4
queueId
int
所属queueId
5
flag
int
消息的类型标记
6
queueOffset
long
7
physicOffset
long
8
sysFlag
int
标记消息类型
9
bornTimeStamp
long
消息产生的日期
10
bornHost
8字节或者20字节
ipv4或者ipv6的差别
11
storeTimestamp
long
存储消息的时间戳,在Broker收到消息的时候赋值
12
storeHostAddress
8字节或者20字节
存储消息的Broker进程地址
13
reconsumeTimes
int
被重复消费的次数
14
preparedTransactionOffset
long
15
bodyLen
int
消息内容长度
16
body
bodyLen
消息内容
17
topic
长度取决于从magicCode中获取的topic长度
18
propertiesLength
short
properties内容的长度
ConsumeQueue 每个Topic存在n(1..n)个数量的Queue,每个Queue对应一个ConsumQueue文件。该文件用于记录属于某个Topic的消息在CommitLog中的偏移量。
/**
* ConsumeQueue's store unit. Format:
* <pre>
* ┌───────────────────────────────┬───────────────────┬───────────────────────────────┐
* │ CommitLog Physical Offset │ Body Size │ Tag HashCode │
* │ (8 Bytes) │ (4 Bytes) │ (8 Bytes) │
* ├───────────────────────────────┴───────────────────┴───────────────────────────────┤
* │ Store Unit │
* │ │
* </pre>
* ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
*/
CommitLog Physical Offset记录了消息在CommitLog中全局的offset,Body Size为消息的总长度。
ConsumeQueue文件是由DefaultMessageStore内部类ReputMessageService来完成的,该服务是一个定时任务,每隔1ms执行一次。主要看doReput函数做了哪些事情:
读取CommitLog文件内容,然后构建ConsumeQueue文件、Index、compact文件1 DefaultMessageStore.this.doDispatch(dispatchRequest)
doDisPatch函数会调用如下三个dispatcher,完成构建文件的任务1 2 3 4 5 6 7 8 this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); // 1 this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); // 2 if (messageStoreConfig.isEnableCompaction()) { this.compactionStore = new CompactionStore(this); this.compactionService = new CompactionService(commitLog, this, compactionStore); this.dispatcherList.addLast(new CommitLogDispatcherCompaction(compactionService));//3 }
对于非事务消息和事务消息的提交消息,写入到ConsumeQueue文件,查看CommitLogDispatcherBuildConsumeQueue代码1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } }
与ComsumeQueue文件相关的还有consumequeue_ext文件,这个文件存放了一些扩展信息,查看CosumeQueue#putMessagePositionInfoWrapper(DispatchRequest request)函数的相关代码。如下:
1 2 3 4 5 ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); cqExtUnit.setFilterBitMap(request.getBitMap()); cqExtUnit.setMsgStoreTime(request.getStoreTimestamp()); cqExtUnit.setTagsCode(request.getTagsCode()); long extAddr = this.consumeQueueExt.put(cqExtUnit); //写入文件
通知客户端有新消息1 2 3 4 5 DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); notifyMessageArrive4MultiQueue(dispatchRequest);
Index 携带key的消息会构建索引文件,用于按照key来进行消息检索。查看CommitLogDispatcherBuildIndex类,最大单个文件2kw条索引数据。 每个Index文件的格式如下: IndexHeader(40bytes) + Slots(固定500w个 * 4bytes) + IndexItems(最多2000w个 * 20bytes)
IndexHeader格式: Begin TimeStamp(8bytes) + End TimeStamp(8bytes) + Begin Physical Offset(8bytes) + End Physical Offset(8bytes) + Hash Slot Count(4bytes) + Index Count(4bytes)
每个Slot存储当前分配到此Slot下最新的Index count ,通过此数据可以计算出此slot下最新一条IndexItem的位置。
IndexItem的格式: Key HashCode(4bytes) + Physical Offset(8bytes) + Time Diff(4bytes) + Next Index Pos(4bytes)
Time Diff用于和Header中的开始和结束时间戳来比较,结合Key HashCode来判断索引是否命中数据。 Next Index Pos用于指向之前命中该Slot的IndexItem。
Compaction 该目录下的文件,有特殊的用途。更像是核心功能开发完成之后,以打补丁的方式加入的特殊功能。在不依赖于其他存储的情况喜爱,用来存储kv数据。
Compact的过程是将Commitlog的数据按照某个topic下的某个queue(也称为partition),对于数据按key的维度只保留最新的数据。当然ConsumeQueue文件也需要做相应的Compact。
可以阅读如下文章了解更多细节。RocketMQ Compaction Topic的设计与实现
Config
文件名称
处理类
描述
delayOffset.json
ScheduleMessageService
延迟消息消费进度
broker.properties
BrokerController
Broker配置
topics.json
TopicConfigManager
存储每个topic的读写队列数、权限、是否顺序等信息
topicQueueMapping.json
TopicQueueMappingManager
当前Broker存储了哪些queue
consumerOffset.json
ConsumerOffsetManager
消费进度数据
lmqConsumerOffset.json
LmqConsumerOffsetManager
lmq模式下的消费进度数据
consumerOrderInfo.json
ConsumerOrderInfoManager
维护了topic+group+queueId维度的顺序消费情况OrderInfo
subscriptionGroup.json
SubscriptionGroupManager
SubscriptionGroup配置,订阅组配置内容,还有包含了一些内置的配置
timercheck
TimerCheckpoint
存储时间轮消息当前的瞬时状态
timermetrics
TimerMetrics
用于统计各个时间的时间轮消息的条数
consumerFilter.json
ConsumerFilterManager
每个Topic的ConsumerGroup消费过滤规则
messageRequestMode.json
MessageRequestModeManager
配置ConsumerGroup使用pop或pull模式消费
tieredStoreMetadata.json
TieredMetadataManager
分级存储topic、queue以及消息存储文件的meta data
abort DefaultMessageStore在启动Broker,加载磁盘日志文件之前,用来判断上次是否是结束进程的。内容存储了上次启动Broker的pid。
checkpoint 存储了当前消息刷盘的offset,用于重启或者故障后的恢复。
数据名称
描述
lastReadTimeMs
上次消费的时间节点
lastTimerLogFlushPos
最后刷新log的pos
lastTimerQueueOffset
最后一次消费的队列节点
masterTimerQueueOffset
主 Broker 的队列消费节点
lock 防止本地启动多个以此目录为存储目录的Broker进程。
timerwheel 该文件存储的内容如下
序号
名称
长度
描述
1
delayTime
long
延迟时间
2
firstPos
long
开始的位置
3
lastPos
long
结束的位置
4
num
int
消息的数量
5
magic
int
暂时未用
timerlog 主要逻辑都在TimeLog类中,下面为存储的日志格式
序号
名称
长度
描述
1
size
int
日志单元固定长度
2
pre pos
long
上一条日志的pos
3
magic value
int
标记消息处理逻辑delete(删除消息的消息)或者roll处理(消息延迟时间超过限定时间,需要做roll处理来进行展期)
4
curr write time
long
for trace写入时间轮的时间戳
5
delayed time
int
for check
6
offsetPy
long
在commitLog中的位置
7
sizePy
int
在commitLog中存储的字节数
8
hash code of real topic
int
消息后续投放的真正topic名称的hash值
9
reserved value
long
预留值,暂时没有作用
timerwheel, timerlog, checkpoint, config/timercheck, config/timermetrics都是时间轮调度消息用到的文件