Nacos Naming Server源码分析-服务注册注销

之前分析客户端的代码,发现Naming相关的后端调用主要有这几个,代码为NamingClientProxyDelegate这个class:

  1. registerService, deregisterService (grpc和http 两种实现)
  2. subscribe, unsubscribe grpc实现
  3. getServiceList grpc实现

我们先去看Naming Server的grpc实现

registerService

代码调用链路,grpc方式注册的服务,默认为临时服务。

  1. InstanceRequestHandler#handle(InstanceRequest request, RequestMeta meta)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
    Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
    InstanceUtil.setInstanceIdIfEmpty(request.getInstance(), service.getGroupedServiceName());
    switch (request.getType()) {
    case NamingRemoteConstants.REGISTER_INSTANCE:
    return registerInstance(service, request, meta); // 此处继续看下去
    case NamingRemoteConstants.DE_REGISTER_INSTANCE:
    return deregisterInstance(service, request, meta);
    default:
    throw new NacosException(NacosException.INVALID_PARAM,
    String.format("Unsupported request type %s", request.getType()));
    }
    }
  2. InstanceRequestHandler#registerInstance(Service service, InstanceRequest request, RequestMeta meta)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
    throws NacosException {
    clientOperationService
    .registerInstance(service, request.getInstance(), meta.getConnectionId()); // 此处继续看下去
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
    meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
    request.getInstance().getIp(), request.getInstance().getPort()));
    return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
  3. EphemeralClientOperationServiceImpl#registerInstance(Service service, Instance instance, String clientId)
    默认注册的临时服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);//校验实例是否过期

Service singleton = ServiceManager
.getInstance().getSingleton(service); //获取单例服务
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);//获取已经连接的客户端信息
checkClientIsLegal(client, clientId); //校验是否是临时实例
InstancePublishInfo instanceInfo = getPublishInfo(instance);//增加一些额外信息,对象转换
client.addServiceInstance(singleton, instanceInfo);// 此处继续看下去
client.setLastUpdatedTime();
client.recalculateRevision();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
  1. AbstractClient#addServiceInstance(Service service, InstancePublishInfo instanceInfo)
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
InstancePublishInfo old = publishers.put(service, instancePublishInfo);
MetricsMonitor.incrementIpCountWithBatchRegister(old, (BatchInstancePublishInfo) instancePublishInfo);
} else {
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}

这里最终将实例信息,保存到一个map,做了metric统计和发布一个事件,上述列出的代码总共发布了4个事件:

  1. RegisterInstanceTraceEvent 这种TraceEvent类事件作为Nacos的扩展机制使用,需要自己定制处理逻辑
  2. ClientOperationEvent.ClientRegisterServiceEvent
  3. MetadataEvent.InstanceMetadataEvent
  4. ClientEvent.ClientChangedEvent

Read More

Nacos权限模块整合方案

Nacos是阿里巴巴开源的,用于服务发现和配置管理的中间件。配置中心已经使用Apollo,所以我们只需要使用服务发现能力即可。

问题

学习研究Nacos过程中,发现如下几点:

  1. Nacos的支持插件机制,包括权限模块
  2. Nacos管理后台,支持使用LDAP的方式对接已有权限系统
  3. Spring-cloud-nacos客户端需要配置用户名和密码,才能正常注册

第2点是为了用户方便使用公司内部账号访问Nacos。但这结合第3点使用就出现了冲突。假设一个部门多个人同时进行开发,这样某个人的用户名和密码就会暴露给了其他开发人员。

1
2
3
spring.cloud.nacos.discovery.namespace=provider
spring.cloud.nacos.discovery.username=test
spring.cloud.nacos.discovery.password=123456

Nacos本地身份验证和Ldap身份验证的区别

翻了一下源码,这两个方式,主要区别在于,ldap方式会先用Nacos本地身份验证,如果验证失败,才会使用ldap方式验证。

manager类涉及到身份认证,区别在上述已经说明。authPluginService涉及权限验证,两种方式代码完全是一样的,不存在差别。
鉴权模块类图

Read More

Go语言协程调度器GPM模型

为了提高Go程序对于CPU的利用率。Go语言的设计者,设计了GPM模型,并且实现了一个高效的协程调度器。

协程Goroutine

从操作系统层面去看,一般可以把线程分为“用户态”和“内核态”。“用户态”可以理解为编程语言实现的线程,而“内核态”可以理解为操作系统实现的线程。“用户态”线程必须和“内核态”线程绑定,才能正常执行。
一个线程中的用户态和内核态

GPM模型

Go的Goroutine即为“用户态”线程的Go语言实现。而管理Goroutine和“内核态”线程绑定关系的管理器,被称为“协程调度器”。“协程调度器”的模型有三个组件组成:G(协程Goroutine)、M(线程Thread)、P(处理器Processor)。
GPM模型

Read More

Loki日志平台

Loki是一个Grafana的开源项目,用以低成本的构建日志系统。Loki只对日志的MetaData构建索引,日志的内容则会进行压缩存储。后端支持多种存储,官方比较推荐使用分布式对象存储MinIO。

部署方式

支持多种部署方式:包括k8s、docker部署,还有windows、linux、mac二进制部署。
还支持多种部署架构:单节点单磁盘、单节点多磁盘、多节点多磁盘。正式环境首选多节点多磁盘部署,具有高可用,企业级性能和可扩展性。由于高可用的需要,需要磁盘的数量需要为4的整数倍。
如下步骤都在centos7.9发行版进行

Read More

[ RocketMQ源码阅读 7 ] CommitLog文件刷盘方式

之前我们进行RocketMQ的搭建,其中有一个参数是用来配置刷盘方式的。存在“同步”和“异步”两种方式。

1
flushDiskType=ASYNC_FLUSH|SYNC_FLUSH

和刷新磁盘逻辑相关的代码可以从这里开始看DefaultFlushManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DefaultFlushManager implements FlushManager {
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitRealTimeService;

public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new CommitLog.GroupCommitService();//同步
} else {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();//异步
}
this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}

@Override public void start() {
this.flushCommitLogService.start();
if (defaultMessageStore.isTransientStorePoolEnable()) {
this.commitRealTimeService.start();
}
}

从构造函数可以看到,要理解刷盘的行为,需要搞懂GroupCommitService同步刷盘,FlushRealTimeService 异步刷盘和CommitRealTimeService这三个类的名称取的不是很便于记忆和理解。

为了理解刷盘操作,我们去看更上一层的逻辑。在Broker接收到客户端的消息写入请求,并完成一系列的解析、校验放入内存等工作后。后续就需要将消息持久化到磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {//同步刷盘
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {//消息中是否存在需要等待消息落盘的属性
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
service.putRequest(request);//提交刷盘请求,将request存入队列后,会立马调用一次wakeup()
return request.future(); //返回刷盘请求,异步等待句柄
} else {
service.wakeup();//唤醒刷盘线程
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {//按照配置不同,分别唤醒不同的刷盘线程
flushCommitLogService.wakeup();
} else {
commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}

GroupCommitService和FlushRealTimeService的主要区别在于调用flush传入的刷新数据页数(RocketMQ内部逻辑概念,和计算机系统的页无关)。GroupCommitService每次都做刷新都传入0,FlushRealTimeService则按照规则进行计算出页数。我这边按照原逻辑改写的容易理解的伪代码:

1
2
3
4
5
int flushPhysicQueueLeastPages = 4;
if (currentTimeMillis >= (lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
flushPhysicQueueLeastPages = 0;
}
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

这段代码避免了短时间内进行多次全量刷盘,从而提高了刷盘效率和性能呢。正因为异步刷盘不是每次都是全量刷盘,从这个角度来看才会被称为异步刷盘,其实本质上都是异步进行刷盘的。

transientStorePoolEnable

我们在上述刷盘代码中看到了此配置项。该配置项是为了提高IO性能,但是在Broker JVM进程异常退出的时候增加丢消息的风险。感兴趣的同学可以看这篇文章

[ RocketMQ源码阅读 6 ] Broker磁盘文件格式与作用

Broker的功能点很多,安装程序启动的顺序去看源码,发现代码量比之前的组件要大很多。阅读过程中发现Broker会去持久化一些配置,并且会将消息数据存储在磁盘上。
整理和检索了网上的一些资料,列出了这些文件和相应的作用,如下。

  • store
    • commitlog
      • 000000000
      • xxxxxxxxxx
    • compaction
      • compactionLog
        • {topic}
          • 0
          • 1
          • {queueId}
      • compactionCq
        • {topic}
          • 0
          • 1
          • {queueId}
    • config
      • delayOffset.json
      • broker.properties
      • topics.json
      • topicQueueMapping.json
      • consumerOffset.json
      • lmqConsumerOffset.json
      • consumerOrderInfo.json
      • subscriptionGroup.json
      • timercheck
      • timermetrics
      • consumerFilter.json
      • messageRequestMode.json
      • tieredStoreMetadata.json
    • consumequeue
      • {topic}
        • 0
          • 00000000000000000000
        • 1
        • {queueId}
    • index
      • 20240305101010000
    • abort
    • checkpoint
    • lock
    • timerwheel
    • timerlog
      • 00000000000000000000

Read More

中年人的第一台NAS

最近闲来无事,想体验一下NAS的玩法。那么说干就干。

发扬垃圾佬的精神,追求极致性价比。性能、功耗、价格不可能三角,尽量把钱花在刀刃上。考虑静音,所以必须要no fans。

网上查了一些资料,大概看了如下几款cpu。

cpu 制程 主频 TDP 价格
J3160 14nm 4核4线程 1.6GHz 6w 板u 100 rmb左右
J1900 22nm 4核4线程 2.42GHz 10w 整个小主机或者软路由某鱼200rmb
N5095 10nm 4核4线程 2GHz 15w 板U 538rmb

从这3个cpu看,J3160不论功耗还是价格,对我来说是最为合适的。而且ddr3的内存也是价格实惠。功耗比较喜人才6w。算一下一年电费多少,假设整机功耗10w:


10 * 24 * 365 * 0.6元/1000 = 52元

相当喜人。。。

配件 来源 价格
铭瑄N3160 闲鱼 106
动力之星dc转atx电源转换卡 拼多多 43
悠品12V5Adc电源 京东一手 38
ddr3 1600内存条8GB台式机内存 闲鱼 43
开机按钮 拼多多 5
硬盘120GB SSD 闲鱼 43
8GB u盘 安装黑群晖 京东 14
SATA线*2 京东 9
总计 301

Read More