[ RocketMQ源码阅读 7 ] CommitLog文件刷盘方式

之前我们进行RocketMQ的搭建,其中有一个参数是用来配置刷盘方式的。存在“同步”和“异步”两种方式。

1
flushDiskType=ASYNC_FLUSH|SYNC_FLUSH

和刷新磁盘逻辑相关的代码可以从这里开始看DefaultFlushManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DefaultFlushManager implements FlushManager {
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitRealTimeService;

public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new CommitLog.GroupCommitService();//同步
} else {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();//异步
}
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override public void start() {
this.flushCommitLogService.start();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.start();
}
}

从构造函数可以看到,要理解刷盘的行为,需要搞懂GroupCommitService同步刷盘,FlushRealTimeService 异步刷盘和CommitRealTimeService这三个类的名称取的不是很便于记忆和理解。

为了理解刷盘操作,我们去看更上一层的逻辑。在Broker接收到客户端的消息写入请求,并完成一系列的解析、校验放入内存等工作后。后续就需要将消息持久化到磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {//同步刷盘
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {//消息中是否存在需要等待消息落盘的属性
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
service.putRequest(request);//提交刷盘请求,将request存入队列后,会立马调用一次wakeup()
return request.future(); //返回刷盘请求,异步等待句柄
} else {
service.wakeup();//唤醒刷盘线程
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {//按照配置不同,分别唤醒不同的刷盘线程
flushCommitLogService.wakeup();
} else {
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}

GroupCommitService和FlushRealTimeService的主要区别在于调用flush传入的刷新数据页数(RocketMQ内部逻辑概念,和计算机系统的页无关)。GroupCommitService每次都做刷新都传入0,FlushRealTimeService则按照规则进行计算出页数。我这边按照原逻辑改写的容易理解的伪代码:

1
2
3
4
5
int flushPhysicQueueLeastPages = 4;
if (currentTimeMillis >= (lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
flushPhysicQueueLeastPages = 0;
}
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

这段代码避免了短时间内进行多次全量刷盘,从而提高了刷盘效率和性能呢。正因为异步刷盘不是每次都是全量刷盘,从这个角度来看才会被称为异步刷盘,其实本质上都是异步进行刷盘的。

####transientStorePoolEnable
我们在上述刷盘代码中看到了此配置项。该配置项是为了提高IO性能,但是在Broker JVM进程异常退出的时候增加丢消息的风险。感兴趣的同学可以看这篇文章

[ RocketMQ源码阅读 6 ] Broker磁盘文件格式与作用

Broker的功能点很多,安装程序启动的顺序去看源码,发现代码量比之前的组件要大很多。阅读过程中发现Broker会去持久化一些配置,并且会将消息数据存储在磁盘上。
整理和检索了网上的一些资料,列出了这些文件和相应的作用,如下。

  • store
    • commitlog
      • 000000000
      • xxxxxxxxxx
    • compaction
      • compactionLog
        • {topic}
          • 0
          • 1
          • {queueId}
      • compactionCq
        • {topic}
          • 0
          • 1
          • {queueId}
    • config
      • delayOffset.json
      • broker.properties
      • topics.json
      • topicQueueMapping.json
      • consumerOffset.json
      • lmqConsumerOffset.json
      • consumerOrderInfo.json
      • subscriptionGroup.json
      • timercheck
      • timermetrics
      • consumerFilter.json
      • messageRequestMode.json
      • tieredStoreMetadata.json
    • consumequeue
      • {topic}
        • 0
          • 00000000000000000000
        • 1
        • {queueId}
    • index
      • 20240305101010000
    • abort
    • checkpoint
    • lock
    • timerwheel
    • timerlog
      • 00000000000000000000

CommitLog

该目录下存储了消息内容文件,默认每个文件1GB大小。目录下会包含多个文件,相邻的两个文件名称的差值正好为1GB。
文件格式:

序号 名称 长度 描述
1 totalSize int 消息总字节数
2 magicCode int 用于标记消息协议的版本
3 bodyCRC int crc校验码,用于校验消息内容是否正确
4 queueId int 所属queueId
5 flag int 消息的类型标记
6 queueOffset long
7 physicOffset long
8 sysFlag int 标记消息类型
9 bornTimeStamp long 消息产生的日期
10 bornHost 8字节或者20字节 ipv4或者ipv6的差别
11 storeTimestamp long 存储消息的时间戳,在Broker收到消息的时候赋值
12 storeHostAddress 8字节或者20字节 存储消息的Broker进程地址
13 reconsumeTimes int 被重复消费的次数
14 preparedTransactionOffset long
15 bodyLen int 消息内容长度
16 body bodyLen 消息内容
17 topic 长度取决于从magicCode中获取的topic长度
18 propertiesLength short properties内容的长度

ConsumeQueue

每个Topic存在n(1..n)个数量的Queue,每个Queue对应一个ConsumQueue文件。该文件用于记录属于某个Topic的消息在CommitLog中的偏移量。

/**
 * ConsumeQueue's store unit. Format:
 * <pre>
 * ┌───────────────────────────────┬───────────────────┬───────────────────────────────┐
 * │    CommitLog Physical Offset  │      Body Size    │            Tag HashCode       │
 * │          (8 Bytes)            │      (4 Bytes)    │             (8 Bytes)         │
 * ├───────────────────────────────┴───────────────────┴───────────────────────────────┤
 * │                                     Store Unit                                    │
 * │                                                                                   │
 * </pre>
 * ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
 */

CommitLog Physical Offset记录了消息在CommitLog中全局的offset,Body Size为消息的总长度。

ConsumeQueue文件是由DefaultMessageStore内部类ReputMessageService来完成的,该服务是一个定时任务,每隔1ms执行一次。主要看doReput函数做了哪些事情:

  1. 读取CommitLog文件内容,然后构建ConsumeQueue文件、Index、compact文件
    1
    DefaultMessageStore.this.doDispatch(dispatchRequest)
    doDisPatch函数会调用如下三个dispatcher,完成构建文件的任务
    1
    2
    3
    4
    5
    6
    7
    8
    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); // 1
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); // 2
    if (messageStoreConfig.isEnableCompaction()) {
    this.compactionStore = new CompactionStore(this);
    this.compactionService = new CompactionService(commitLog, this, compactionStore);
    this.dispatcherList.addLast(new CommitLogDispatcherCompaction(compactionService));//3
    }
    对于非事务消息和事务消息的提交消息,写入到ConsumeQueue文件,查看CommitLogDispatcherBuildConsumeQueue代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
    final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
    switch (tranType) {
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    DefaultMessageStore.this.putMessagePositionInfo(request);
    break;
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
    break;
    }
    }
    }

与ComsumeQueue文件相关的还有consumequeue_ext文件,这个文件存放了一些扩展信息,查看CosumeQueue#putMessagePositionInfoWrapper(DispatchRequest request)函数的相关代码。如下:

1
2
3
4
5
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit); //写入文件
  1. 通知客户端有新消息
    1
    2
    3
    4
    5
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
    notifyMessageArrive4MultiQueue(dispatchRequest);

Index

携带key的消息会构建索引文件,用于按照key来进行消息检索。查看CommitLogDispatcherBuildIndex类,最大单个文件2kw条索引数据。
每个Index文件的格式如下:
IndexHeader(40bytes) + Slots(固定500w个 * 4bytes) + IndexItems(最多2000w个 * 20bytes)

IndexHeader格式:
Begin TimeStamp(8bytes) + End TimeStamp(8bytes) + Begin Physical Offset(8bytes) + End Physical Offset(8bytes) + Hash Slot Count(4bytes) + Index Count(4bytes)

每个Slot存储当前分配到此Slot下最新的Index count,通过此数据可以计算出此slot下最新一条IndexItem的位置。

IndexItem的格式:
Key HashCode(4bytes) + Physical Offset(8bytes) + Time Diff(4bytes) + Next Index Pos(4bytes)

Time Diff用于和Header中的开始和结束时间戳来比较,结合Key HashCode来判断索引是否命中数据。
Next Index Pos用于指向之前命中该Slot的IndexItem。

RocketMQ Index结构

Compaction

该目录下的文件,有特殊的用途。更像是核心功能开发完成之后,以打补丁的方式加入的特殊功能。在不依赖于其他存储的情况喜爱,用来存储kv数据。

Compact的过程是将Commitlog的数据按照某个topic下的某个queue(也称为partition),对于数据按key的维度只保留最新的数据。当然ConsumeQueue文件也需要做相应的Compact。

可以阅读如下文章了解更多细节。
RocketMQ Compaction Topic的设计与实现

Config

文件名称 处理类 描述
delayOffset.json ScheduleMessageService 延迟消息消费进度
broker.properties BrokerController Broker配置
topics.json TopicConfigManager 存储每个topic的读写队列数、权限、是否顺序等信息
topicQueueMapping.json TopicQueueMappingManager 当前Broker存储了哪些queue
consumerOffset.json ConsumerOffsetManager 消费进度数据
lmqConsumerOffset.json LmqConsumerOffsetManager lmq模式下的消费进度数据
consumerOrderInfo.json ConsumerOrderInfoManager 维护了topic+group+queueId维度的顺序消费情况OrderInfo
subscriptionGroup.json SubscriptionGroupManager SubscriptionGroup配置,订阅组配置内容,还有包含了一些内置的配置
timercheck TimerCheckpoint 存储时间轮消息当前的瞬时状态
timermetrics TimerMetrics 用于统计各个时间的时间轮消息的条数
consumerFilter.json ConsumerFilterManager 每个Topic的ConsumerGroup消费过滤规则
messageRequestMode.json MessageRequestModeManager 配置ConsumerGroup使用pop或pull模式消费
tieredStoreMetadata.json TieredMetadataManager 分级存储topic、queue以及消息存储文件的meta data

abort

DefaultMessageStore在启动Broker,加载磁盘日志文件之前,用来判断上次是否是结束进程的。内容存储了上次启动Broker的pid。

checkpoint

存储了当前消息刷盘的offset,用于重启或者故障后的恢复。

数据名称 描述
lastReadTimeMs 上次消费的时间节点
lastTimerLogFlushPos 最后刷新log的pos
lastTimerQueueOffset 最后一次消费的队列节点
masterTimerQueueOffset 主 Broker 的队列消费节点

lock

防止本地启动多个以此目录为存储目录的Broker进程。

timerwheel

该文件存储的内容如下

序号 名称 长度 描述
1 delayTime long 延迟时间
2 firstPos long 开始的位置
3 lastPos long 结束的位置
4 num int 消息的数量
5 magic int 暂时未用

timerlog

主要逻辑都在TimeLog类中,下面为存储的日志格式

序号 名称 长度 描述
1 size int 日志单元固定长度
2 pre pos long 上一条日志的pos
3 magic value int 标记消息处理逻辑delete(删除消息的消息)或者roll处理(消息延迟时间超过限定时间,需要做roll处理来进行展期)
4 curr write time long for trace写入时间轮的时间戳
5 delayed time int for check
6 offsetPy long 在commitLog中的位置
7 sizePy int 在commitLog中存储的字节数
8 hash code of real topic int 消息后续投放的真正topic名称的hash值
9 reserved value long 预留值,暂时没有作用

timerwheel, timerlog, checkpoint, config/timercheck, config/timermetrics都是时间轮调度消息用到的文件

中年人的第一台NAS

最近闲来无事,想体验一下NAS的玩法。那么说干就干。

发扬垃圾佬的精神,追求极致性价比。性能、功耗、价格不可能三角,尽量把钱花在刀刃上。考虑静音,所以必须要no fans。

网上查了一些资料,大概看了如下几款cpu。

cpu 制程 主频 TDP 价格
J3160 14nm 4核4线程 1.6GHz 6w 板u 100 rmb左右
J1900 22nm 4核4线程 2.42GHz 10w 整个小主机或者软路由某鱼200rmb
N5095 10nm 4核4线程 2GHz 15w 板U 538rmb

从这3个cpu看,J3160不论功耗还是价格,对我来说是最为合适的。而且ddr3的内存也是价格实惠。功耗比较喜人才6w。算一下一年电费多少,假设整机功耗10w:


10 * 24 * 365 * 0.6元/1000 = 52元

相当喜人。。。

配件 来源 价格
铭瑄N3160 闲鱼 106
动力之星dc转atx电源转换卡 拼多多 43
悠品12V5Adc电源 京东一手 38
ddr3 1600内存条8GB台式机内存 闲鱼 43
开机按钮 拼多多 5
硬盘120GB SSD 闲鱼 43
8GB u盘 安装黑群晖 京东 14
SATA线*2 京东 9
总计 301

主板还能淘到更便宜的,但是买个放心吧,稍微买个看着靠谱一些的。尽量选本地发货的,到手比较快。

装机一次点亮

弄个鞋盒子当机箱

打两个洞好散热

就这样吧

装好机器琢磨怎么玩,玩了一段时间,给各位汇报一下。

  1. 最近各个听歌平台都开始收费,所以某网盘找了不少的无损音乐,用cloud sync套件同步了喜欢的音乐。手机端可以用DS audio进行播放
    网上推崇的flex体验较差,不大会折腾,折腾好久。包括用了刮削工具配合flex,也不是很好的体验。
  2. 用DS file在手机端同步手机照片和视频
  3. 建共享文件夹,方便各个设备同步文件
  4. docker部署了青龙面板和赚京东豆脚本,能把电费赚回来
  5. 打开SMB服务之后,并且开启SMB1为最低支持的协议,可以在小米盒子MX播放器直接播放NAS上的视频

[ RocketMQ源码阅读 5 ] 集群部署与架构

RocketMQ官方文档介绍了多种部署方式。我们抛开Local和Cluster集群的差异(Broker和Proxy是否部署在同一个进程),再去分析几种部署方式。

部署方式

  1. 单组节点单副本
    只部署一个Broker,无灾备能力。所以不推荐生产使用。
    图1

  2. 多组节点单副本
    只部署2-3个Master,不部署Slave
    图2

问题来了,多个Master之间数据如何同步的?个人猜测是通过双写来实现,后面再去分析吧。

  1. 多组节点多副本-异步复制
    图3

  2. 多组节点多副本-同步双写
    图4

  3. 主备自动切换模式
    图5

单组节点单副本不推荐在生产环境使用。多组节点多副本模式,节点间数据同步存在两种方案,主要是在写入性能和数据同步时效性做取舍。

几种模式优缺点,官方文档已经给出部署方式。除了单组节点单副本模式,我们先一起来看一下这5种部署方式的具体配置参数。

非主备自动切换模式

本地启动双节点NameServer

1
listenPort=10001 #修改默认端口号,避免在本地启动多个NameServer进程发生端口冲突
1
listenPort=10002 #修改默认端口号,避免在本地启动多个NameServer进程发生端口冲突

在idea启动NameServer需要添加如下配置,上述修改端口的配置内容,放入-c指定的配置文件内
NameServer进程启动配置项

多组节点单副本

Broker-1的配置,作为Master节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
brokerClusterName = local  #所属集群名称
brokerName = broker-a #Broker名称
brokerId = 0 #0表示Master节点
listenPort=11001 #本地监听端口

deleteWhen = 04 #指定删除过期消息文件的时间点为凌晨4点
fileReservedTime = 48 #指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
brokerRole = ASYNC_MASTER #Broker角色,异步复制Master:ASYNC_MASTER,同步双写Master:SYNC_MASTER,slave节点:SLAVE
flushDiskType = ASYNC_FLUSH #消息刷新到磁盘的方式: ASYNC_FLUSH: 异步刷盘,SYNC_FLUSH:同步刷盘

autoCreateTopicEnable = true #是否允许 Broker 自动创建Topic
autoCreateSubscriptionGroup = true #是否允许 Broker 自动创建订阅组

# 这是nameserver的启动地址,broker会基于该nameserver进行通信
namesrvAddr=127.0.0.1:10001;127.0.0.1:10002

# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
storePathRootDir=D:/rocketmqHome/broker-1/store

# 这是commitLog的存储路径
storePathCommitLog=D:/rocketmqHome/broker-1/store/commitlog

# consume queue文件的存储路径
storePathConsumeQueue=D:/rocketmqHome/broker-1/store/consumequeue

# 消息索引文件的存储路径
storePathIndex=D:/rocketmqHome/broker-1/index

# checkpoint文件的存储路径
storeCheckpoint=D:/rocketmqHome/broker-1/checkpoint

# abort文件的存储路径
abortFile=D:/rocketmqHome/broker-1/store/abort

Broker-2和Broker-3的配置,也是作为Master节点进行部署,只需要修改brokerName和listenPort

多组节点多副本

通过组合使用如下brokerName和brokerId参数,就可以配置出想要的多组节点多副本的集群。角色为Master的Broker brokerId=0 slave则为>0,同一个副本的BrokerName必须相同。给出一个副本配置示例如下:

1
2
3
brokerName = broker-a
brokerId = 0
brokerRole = ASYNC_MASTER
1
2
3
brokerName = broker-a
brokerId = 1
brokerRole = SLAVE

主备自动切换模式

NameServer需要增加如下配置,进而打开Controller主备自动切换模式。如下举个栗子
新增:

1
2
3
4
5
6
enableControllerInNamesrv = true
#下面三个配置Dleger组件
controllerDLegerGroup = group
#Raft的peers地址,前缀和第三个配置对应
controllerDLegerPeers = n0-127.0.0.1:12001;n1-127.0.0.1:12002;n2-127.0.0.1:12003
controllerDLegerSelfId = n0

Broker的配置需要进行修改
新增:

1
2
enableControllerMode = true #打开controller模式
controllerAddr = 127.0.0.1:12001;127.0.0.1:12002;127.0.0.1:12003 #配置controller集群的地址

删除:

1
2
brokerId =
brokerRole =

是经过测试,发现这两个参数在此部署模式下是不需要的。brokerId自动下发,brokerRole 默认为SYNC_MASTER且不可以覆盖。
测试了关闭其中一个Master,SLAVE能自动被选举为Master,做到灾备自动切换。

总结

比较一下这几种部署模式,引用官网的描述:

  1. 单组节点单副本
    不建议生产使用

  2. 多组节点单副本

  • 单个Master宕机或重启维护对应用读写无影响
  • Master宕机期间,Broker上未被消费的消息不可被订阅,影响时效性
  • Master宕机,磁盘损坏的情况下,异步刷盘丢失少量消息,同步刷盘一条不丢
  • 性能最高
  1. 多组节点多副本-异步复制
  • 单个Master宕机或重启维护对应用读写无影响
  • Master宕机,可以继续从Slave消费,不需要人工干预,不影响时效性
  • Master宕机,磁盘损坏情况下会丢失少量消息
  • 性能和第2中模式几乎一样

4.多组节点多副本-同步双写

  • 单个Master宕机或重启维护对应用读写无影响
  • Master宕机,可以继续从Slave消费,不需要人工干预,不影响时效性
  • Master宕机,磁盘损坏情况下不会丢失消息
  • 性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高
  • 目前版本在主节点宕机后,备机不能自动切换为主机

5.主备自动切换模式
相比第4中模式,能够完成主备自动切换。

总结一下,个人认为模式3,5比较适合生产部署。3 侧重性能,5 侧重可靠性。4模式由于不具备灾备切换的能力所以不建议部署。除非在5的模式中如果Controller组件经常发生故障,那么考虑用4替换5的模式。

[ RocketMQ源码阅读 4 ] ControllerManager

该组件的核心就是一个Raft协议的实现。这个Raft协议的实现用的也不是淘宝系的JRaft,而是第三方的产品DLedger。对于生产要求比较严格的大厂,这个倒是比较意外。

用这个组件一般就是用来实现选主,或者当一个存储数据库来使用。前几篇有说过RocketMQ使用ControllerManager组件来实现的灾备切换,那么我们来看一下究竟是如何实现的。

对外接口

ControllerRequestProcessor

名称 描述 Raft操作类型
CONTROLLER_ALTER_SYNC_STATE_SET 往Raft日志同步syncStateSet
CONTROLLER_ELECT_MASTER 进行leader选举,将最终选出的Leader同步到Raft日志。让人震惊的是选主没有使用到Raft,而是自己实现的策略。
CONTROLLER_GET_REPLICA_INFO 按照brokerName获取所有broker信息
CONTROLLER_GET_METADATA_INFO 获取当前Raft集群所有Peer信息
BROKER_HEARTBEAT Broker进行心跳
CONTROLLER_GET_SYNC_STATE_DATA 获取syncStateSet数据
UPDATE_CONTROLLER_CONFIG 更新ControllerManager配置
GET_CONTROLLER_CONFIG 读取ControllerManager配置
CLEAN_BROKER_DATA 清理Broker数据
CONTROLLER_GET_NEXT_BROKER_ID 获取下一个BrokerId
CONTROLLER_APPLY_BROKER_ID 写入BrokerId
CONTROLLER_REGISTER_BROKER 使用唯一的BrokerId注册Broker

[ RocketMQ源码阅读 3 ] NameServer

启动过程

NameServer模块下的代码结构,项目启动类为NamesrvStartup。启动过程主要做了如下几件事情。

  1. 读取配置文件和命令行参数
  2. 初始化并启动netty服务端和netty客户端
  3. 初始化并启动NamesrvController,注册processor,提供对外接口

代码目录结构

NameServer代码结构

对外接口

processor目录下则为主要的对外接口。通过梳理NameServer的接口,以了解更多的功能细节。
注册对外接口

ClientRequestProcessor

接口名称 接口功能
GET_ROUTEINFO_BY_TOPIC 按照topic名称获取路由信息

DefaultRequestProcessor

接口名称 接口功能
PUT_KV_CONFIG 修改kv配置
GET_KV_CONFIG 获取kv配置
DELETE_KV_CONFIG 删除kv配置
GET_KVLIST_BY_NAMESPACE 获取某个namespace的kv配置
QUERY_DATA_VERSION 获取缓存的broker元数据版本信息,并通过版本号判断数据是否有变化
REGISTER_BROKER broker注册
UNREGISTER_BROKER broker注销
BROKER_HEARTBEAT broker心跳
GET_BROKER_MEMBER_GROUP 按照clusterName和brokerName获取所有注册信息
GET_BROKER_CLUSTER_INFO 获取所有broker注册信息
WIPE_WRITE_PERM_OF_BROKER 关闭某个broker的写操作
ADD_WRITE_PERM_OF_BROKER 打开某个broker的写操作
GET_ALL_TOPIC_LIST_FROM_NAMESERVER 获取所有topic列表
DELETE_TOPIC_IN_NAMESRV 删除topic
REGISTER_TOPIC_IN_NAMESRV 注册topic
GET_TOPICS_BY_CLUSTER 获取某个cluster下的所有topic列表
GET_SYSTEM_TOPIC_LIST_FROM_NS 获取系统topic列表
GET_UNIT_TOPIC_LIST 获取unit topic列表
GET_HAS_UNIT_SUB_TOPIC_LIST
GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST
GET_NAMESRV_CONFIG 读取NameServer所有的配置
UPDATE_NAMESRV_CONFIG 更新NameServer的配置

通过这些api能大致了解到NameServer部分核心功能。NameServer主要的作用是开辟了一些内存空间,用于存放Topic与Broker的元数据信息。

核心组件

  1. NamesrvController
    提供对外接口,心跳检查Broker注册信息是否过期
  2. RouteInfoManager
    缓存topic、broker、queue的路由信息

队列水位

NamesrvController存在两个队列defaultThreadPoolQueueclientRequestThreadPoolQueue,后台会定时打印这两个队列的长度和消费延迟时间。

队列 默认容量 功能
defaultThreadPoolQueue 10000 执行对于DefaultRequestProcessor的请求
clientRequestThreadPoolQueue 50000 执行对于ClientRequestProcessor的请求

一些观察结果

  1. 4.0和5.0都Broker选主机制是有差别的,4.0没有使用Raft协议进行选主,5.0则是采用了Raft协议
  2. NamesrvController会定期检查Broker注册信息是否过期,超过120s未收到Broker的心跳,就会注销Broker

[ RocketMQ源码阅读 2 ] RocketMQ主要组件

我选择的源码版本是5.1.4 源码地址来进行学习,如下是来自官方的架构图

架构图

按照5.0弹性无状态代理模式的架构图,我们可以将RocketMQ 分为如下主要模块:

  1. Console/MqAdmin

  2. Proxy

  3. Broker (local模式下与Proxy部署在同一个进程,cluster模式下和Proxy分为两个进程部署)

  4. NameServer

  5. Controller(ControllerManager) 可以和NameServer部署在同一个进程

该组件使得RocketMQ具有,主备自动切换的能力。

6.各种语言的Client

各个组件的功能作用,后面再一个个分析和解释。但我想有些中间件和架构爱好者看这个图,就能大概知道各个组件有啥用途。

[ RocketMQ源码阅读 1 ]带着疑问去阅读源码

最近上班比较清闲,之前一直在微信读书上读了不少闲书。颇有游手好闲,不务正业的样子。最近内心思索半天,觉得这样下去自己的职业生涯可能走不远。又由于最近公司在考虑消息队列的选型,所以想着花点时间去深入了解一下消息队列的机制。一方面是对RocketMQ比较感兴趣;另一方面也是希望对所学的技术做一个记录,将来出去面试,作为展示自我技术学习成果的窗口。

为什么选择RocketMQ?

Java 事务消息 社区活跃 支撑阿里的业务

我准备带着一些问题去进行阅读,相比于一头扎进代码海洋,这样更有针对性,并且收获可能更大。这样的一个大型开源产品,不去看一些细枝末节的细节,可能更有效率,也更节约时间。

带着哪些问题去看?

1、NameServer服务注册机制?

2、消息底层文件存储机制?

3、定时消息实现机制?

4、顺序消息实现机制?

5、事务型消息实现机制?

6、高可用、故障转移机制?