[ RocketMQ源码阅读 7 ] CommitLog文件刷盘方式

之前我们进行RocketMQ的搭建,其中有一个参数是用来配置刷盘方式的。存在“同步”和“异步”两种方式。

1
flushDiskType=ASYNC_FLUSH|SYNC_FLUSH

和刷新磁盘逻辑相关的代码可以从这里开始看DefaultFlushManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DefaultFlushManager implements FlushManager {
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitRealTimeService;

public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new CommitLog.GroupCommitService();//同步
} else {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();//异步
}
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override public void start() {
this.flushCommitLogService.start();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.start();
}
}

从构造函数可以看到,要理解刷盘的行为,需要搞懂GroupCommitService同步刷盘,FlushRealTimeService 异步刷盘和CommitRealTimeService这三个类的名称取的不是很便于记忆和理解。

为了理解刷盘操作,我们去看更上一层的逻辑。在Broker接收到客户端的消息写入请求,并完成一系列的解析、校验放入内存等工作后。后续就需要将消息持久化到磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {//同步刷盘
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {//消息中是否存在需要等待消息落盘的属性
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
service.putRequest(request);//提交刷盘请求,将request存入队列后,会立马调用一次wakeup()
return request.future(); //返回刷盘请求,异步等待句柄
} else {
service.wakeup();//唤醒刷盘线程
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {//按照配置不同,分别唤醒不同的刷盘线程
flushCommitLogService.wakeup();
} else {
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}

GroupCommitService和FlushRealTimeService的主要区别在于调用flush传入的刷新数据页数(RocketMQ内部逻辑概念,和计算机系统的页无关)。GroupCommitService每次都做刷新都传入0,FlushRealTimeService则按照规则进行计算出页数。我这边按照原逻辑改写的容易理解的伪代码:

1
2
3
4
5
int flushPhysicQueueLeastPages = 4;
if (currentTimeMillis >= (lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
flushPhysicQueueLeastPages = 0;
}
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

这段代码避免了短时间内进行多次全量刷盘,从而提高了刷盘效率和性能呢。正因为异步刷盘不是每次都是全量刷盘,从这个角度来看才会被称为异步刷盘,其实本质上都是异步进行刷盘的。

transientStorePoolEnable

我们在上述刷盘代码中看到了此配置项。该配置项是为了提高IO性能,但是在Broker JVM进程异常退出的时候增加丢消息的风险。感兴趣的同学可以看这篇文章