Loki日志平台

Loki是一个Grafana的开源项目,用以低成本的构建日志系统。Loki只对日志的MetaData构建索引,日志的内容则会进行压缩存储。后端支持多种存储,官方比较推荐使用分布式对象存储MinIO。

部署方式

支持多种部署方式:包括k8s、docker部署,还有windows、linux、mac二进制部署。
还支持多种部署架构:单节点单磁盘、单节点多磁盘、多节点多磁盘。正式环境首选多节点多磁盘部署,具有高可用,企业级性能和可扩展性。由于高可用的需要,需要磁盘的数量需要为4的整数倍。
如下步骤都在centos7.9发行版进行

Read More

[ RocketMQ源码阅读 7 ] CommitLog文件刷盘方式

之前我们进行RocketMQ的搭建,其中有一个参数是用来配置刷盘方式的。存在“同步”和“异步”两种方式。

1
flushDiskType=ASYNC_FLUSH|SYNC_FLUSH

和刷新磁盘逻辑相关的代码可以从这里开始看DefaultFlushManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DefaultFlushManager implements FlushManager {
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitRealTimeService;

public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new CommitLog.GroupCommitService();//同步
} else {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();//异步
}
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override public void start() {
this.flushCommitLogService.start();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.start();
}
}

从构造函数可以看到,要理解刷盘的行为,需要搞懂GroupCommitService同步刷盘,FlushRealTimeService 异步刷盘和CommitRealTimeService这三个类的名称取的不是很便于记忆和理解。

为了理解刷盘操作,我们去看更上一层的逻辑。在Broker接收到客户端的消息写入请求,并完成一系列的解析、校验放入内存等工作后。后续就需要将消息持久化到磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {//同步刷盘
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {//消息中是否存在需要等待消息落盘的属性
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
service.putRequest(request);//提交刷盘请求,将request存入队列后,会立马调用一次wakeup()
return request.future(); //返回刷盘请求,异步等待句柄
} else {
service.wakeup();//唤醒刷盘线程
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {//按照配置不同,分别唤醒不同的刷盘线程
flushCommitLogService.wakeup();
} else {
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}

GroupCommitService和FlushRealTimeService的主要区别在于调用flush传入的刷新数据页数(RocketMQ内部逻辑概念,和计算机系统的页无关)。GroupCommitService每次都做刷新都传入0,FlushRealTimeService则按照规则进行计算出页数。我这边按照原逻辑改写的容易理解的伪代码:

1
2
3
4
5
int flushPhysicQueueLeastPages = 4;
if (currentTimeMillis >= (lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
flushPhysicQueueLeastPages = 0;
}
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

这段代码避免了短时间内进行多次全量刷盘,从而提高了刷盘效率和性能呢。正因为异步刷盘不是每次都是全量刷盘,从这个角度来看才会被称为异步刷盘,其实本质上都是异步进行刷盘的。

transientStorePoolEnable

我们在上述刷盘代码中看到了此配置项。该配置项是为了提高IO性能,但是在Broker JVM进程异常退出的时候增加丢消息的风险。感兴趣的同学可以看这篇文章

[ RocketMQ源码阅读 6 ] Broker磁盘文件格式与作用

Broker的功能点很多,安装程序启动的顺序去看源码,发现代码量比之前的组件要大很多。阅读过程中发现Broker会去持久化一些配置,并且会将消息数据存储在磁盘上。
整理和检索了网上的一些资料,列出了这些文件和相应的作用,如下。

  • store
    • commitlog
      • 000000000
      • xxxxxxxxxx
    • compaction
      • compactionLog
        • {topic}
          • 0
          • 1
          • {queueId}
      • compactionCq
        • {topic}
          • 0
          • 1
          • {queueId}
    • config
      • delayOffset.json
      • broker.properties
      • topics.json
      • topicQueueMapping.json
      • consumerOffset.json
      • lmqConsumerOffset.json
      • consumerOrderInfo.json
      • subscriptionGroup.json
      • timercheck
      • timermetrics
      • consumerFilter.json
      • messageRequestMode.json
      • tieredStoreMetadata.json
    • consumequeue
      • {topic}
        • 0
          • 00000000000000000000
        • 1
        • {queueId}
    • index
      • 20240305101010000
    • abort
    • checkpoint
    • lock
    • timerwheel
    • timerlog
      • 00000000000000000000

Read More

中年人的第一台NAS

最近闲来无事,想体验一下NAS的玩法。那么说干就干。

发扬垃圾佬的精神,追求极致性价比。性能、功耗、价格不可能三角,尽量把钱花在刀刃上。考虑静音,所以必须要no fans。

网上查了一些资料,大概看了如下几款cpu。

cpu 制程 主频 TDP 价格
J3160 14nm 4核4线程 1.6GHz 6w 板u 100 rmb左右
J1900 22nm 4核4线程 2.42GHz 10w 整个小主机或者软路由某鱼200rmb
N5095 10nm 4核4线程 2GHz 15w 板U 538rmb

从这3个cpu看,J3160不论功耗还是价格,对我来说是最为合适的。而且ddr3的内存也是价格实惠。功耗比较喜人才6w。算一下一年电费多少,假设整机功耗10w:


10 * 24 * 365 * 0.6元/1000 = 52元

相当喜人。。。

配件 来源 价格
铭瑄N3160 闲鱼 106
动力之星dc转atx电源转换卡 拼多多 43
悠品12V5Adc电源 京东一手 38
ddr3 1600内存条8GB台式机内存 闲鱼 43
开机按钮 拼多多 5
硬盘120GB SSD 闲鱼 43
8GB u盘 安装黑群晖 京东 14
SATA线*2 京东 9
总计 301

Read More

[ RocketMQ源码阅读 4 ] ControllerManager

该组件的核心就是一个Raft协议的实现。这个Raft协议的实现用的也不是淘宝系的JRaft,而是第三方的产品DLedger。对于生产要求比较严格的大厂,这个倒是比较意外。

用这个组件一般就是用来实现选主,或者当一个存储数据库来使用。前几篇有说过RocketMQ使用ControllerManager组件来实现的灾备切换,那么我们来看一下究竟是如何实现的。

对外接口

ControllerRequestProcessor

名称 描述 Raft操作类型
CONTROLLER_ALTER_SYNC_STATE_SET 往Raft日志同步syncStateSet
CONTROLLER_ELECT_MASTER 进行leader选举,将最终选出的Leader同步到Raft日志。让人震惊的是选主没有使用到Raft,而是自己实现的策略。
CONTROLLER_GET_REPLICA_INFO 按照brokerName获取所有broker信息
CONTROLLER_GET_METADATA_INFO 获取当前Raft集群所有Peer信息
BROKER_HEARTBEAT Broker进行心跳
CONTROLLER_GET_SYNC_STATE_DATA 获取syncStateSet数据
UPDATE_CONTROLLER_CONFIG 更新ControllerManager配置
GET_CONTROLLER_CONFIG 读取ControllerManager配置
CLEAN_BROKER_DATA 清理Broker数据
CONTROLLER_GET_NEXT_BROKER_ID 获取下一个BrokerId
CONTROLLER_APPLY_BROKER_ID 写入BrokerId
CONTROLLER_REGISTER_BROKER 使用唯一的BrokerId注册Broker

[ RocketMQ源码阅读 3 ] NameServer

启动过程

NameServer模块下的代码结构,项目启动类为NamesrvStartup。启动过程主要做了如下几件事情。

  1. 读取配置文件和命令行参数
  2. 初始化并启动netty服务端和netty客户端
  3. 初始化并启动NamesrvController,注册processor,提供对外接口

代码目录结构

NameServer代码结构

对外接口

processor目录下则为主要的对外接口。通过梳理NameServer的接口,以了解更多的功能细节。
注册对外接口

Read More

[ RocketMQ源码阅读 2 ] RocketMQ主要组件

我选择的源码版本是5.1.4 源码地址来进行学习,如下是来自官方的架构图

架构图

按照5.0弹性无状态代理模式的架构图,我们可以将RocketMQ 分为如下主要模块:

  1. Console/MqAdmin

  2. Proxy

  3. Broker (local模式下与Proxy部署在同一个进程,cluster模式下和Proxy分为两个进程部署)

  4. NameServer

  5. Controller(ControllerManager) 可以和NameServer部署在同一个进程

该组件使得RocketMQ具有,主备自动切换的能力。

6.各种语言的Client

各个组件的功能作用,后面再一个个分析和解释。但我想有些中间件和架构爱好者看这个图,就能大概知道各个组件有啥用途。

[ RocketMQ源码阅读 1 ]带着疑问去阅读源码

最近上班比较清闲,之前一直在微信读书上读了不少闲书。颇有游手好闲,不务正业的样子。最近内心思索半天,觉得这样下去自己的职业生涯可能走不远。又由于最近公司在考虑消息队列的选型,所以想着花点时间去深入了解一下消息队列的机制。一方面是对RocketMQ比较感兴趣;另一方面也是希望对所学的技术做一个记录,将来出去面试,作为展示自我技术学习成果的窗口。

为什么选择RocketMQ?

Java 事务消息 社区活跃 支撑阿里的业务

我准备带着一些问题去进行阅读,相比于一头扎进代码海洋,这样更有针对性,并且收获可能更大。这样的一个大型开源产品,不去看一些细枝末节的细节,可能更有效率,也更节约时间。

带着哪些问题去看?

1、NameServer服务注册机制?

2、消息底层文件存储机制?

3、定时消息实现机制?

4、顺序消息实现机制?

5、事务型消息实现机制?

6、高可用、故障转移机制?