之前我们进行RocketMQ的搭建,其中有一个参数是用来配置刷盘方式的。存在“同步”和“异步”两种方式。
1 | flushDiskType=ASYNC_FLUSH|SYNC_FLUSH |
和刷新磁盘逻辑相关的代码可以从这里开始看DefaultFlushManager
1 | class DefaultFlushManager implements FlushManager { |
从构造函数可以看到,要理解刷盘的行为,需要搞懂GroupCommitService同步刷盘,FlushRealTimeService 异步刷盘和CommitRealTimeService这三个类的名称取的不是很便于记忆和理解。
为了理解刷盘操作,我们去看更上一层的逻辑。在Broker接收到客户端的消息写入请求,并完成一系列的解析、校验放入内存等工作后。后续就需要将消息持久化到磁盘。
1 | public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) { |
GroupCommitService和FlushRealTimeService的主要区别在于调用flush传入的刷新数据页数(RocketMQ内部逻辑概念,和计算机系统的页无关)。GroupCommitService每次都做刷新都传入0,FlushRealTimeService则按照规则进行计算出页数。我这边按照原逻辑改写的容易理解的伪代码:
1 | int flushPhysicQueueLeastPages = 4; |
这段代码避免了短时间内进行多次全量刷盘,从而提高了刷盘效率和性能呢。正因为异步刷盘不是每次都是全量刷盘,从这个角度来看才会被称为异步刷盘,其实本质上都是异步进行刷盘的。
transientStorePoolEnable
我们在上述刷盘代码中看到了此配置项。该配置项是为了提高IO性能,但是在Broker JVM进程异常退出的时候增加丢消息的风险。感兴趣的同学可以看这篇文章