Nacos Naming Server源码分析-distro协议实现

向Nacos注册服务,存在两种类型。临时(Ephemeral)服务和持久(Persistent)服务。服务信息的存储和节点间同步,分别基于Distro协议和Raft协议。

Distro协议

可以从DistroClientComponentRegistry入手去阅读源代码,下面这些成员变量,都是用来实现协议的。

  1. ServerMemberManager 服务端成员管理器,有3种模式分别是:从磁盘文件读取,从指定地址读取,单点模式
  2. DistroProtocol
  3. DistroComponentHolder 各组件组合器
  4. DistroTaskEngineHolder task任务异步执行器

DistroProtocol

看下来主要有两个功能点

1
2
3
4
5
6
7
8
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask(); // 1
startLoadTask(); // 2
}

startVerifyTask()

startVerifyTask()
-> DistroVerifyTimedTask#run()
-> DistroVerifyExecuteTask#run()

发送方,发送:
获取当前节点所有客户端信息,获取所有Nacos Server节点。将客户端信息发送到所有节点,进行校验。我们看下具体校验逻辑是啥?

接收方,校验逻辑:
DistroDataRequestHandler#handle()
->handleVerify(
)
->DistroProtocol#onVerify()
->DistroClientDataProcessor#processVerifyData(
)
->EphemeralIpPortClientManager#verifyClient(*)

比较一下内存中版本号,如果版本号不一致,就更新内存中服务实例信息。

发送方,处理校验结果:
DistroVerifyCallbackWrapper#onResponse(*)

1
2
3
4
5
6
7
8
9
10
11
public void onResponse(Response response) {
if (checkResponse(response)) {
NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
distroCallback.onSuccess();
} else {
Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer)); //往下看
NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
distroCallback.onFailed(null);
}
}

除了记录log和metric,就是发布了一个ClientEvent.ClientVerifyFailedEvent事件。该事件最终触发同步数据请求,发送到校验失败的服务器。
收到请求后的处理逻辑在这里:
DistroDataRequestHandler#handle()
->handleSyncData(
)
-> DistroProtocol#onReceive()
-> DistroClientDataProcessor#handlerClientSyncData(
) ->upgradeClient(*)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
Set<Service> syncedService = new HashSet<>();
// process batch instance sync logic
processBatchInstanceDistroData(syncedService, client, clientSyncData);
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();

for (int i = 0; i < namespaces.size(); i++) {
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service);
syncedService.add(singleton);
InstancePublishInfo instancePublishInfo = instances.get(i);
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
client.addServiceInstance(singleton, instancePublishInfo);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getMetadataId(), false));
}
}
for (Service each : client.getAllPublishedService()) {
if (!syncedService.contains(each)) {
client.removeServiceInstance(each);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));//更新客户端信息版本号
}

这里显示将收到的数据进行封装,然后维护更新内存种数据。同时发出了3个事件:

  1. ClientRegisterServiceEvent //通知订阅者更新服务实例信息
  2. InstanceMetadataEvent //维护更新内存的Metadata
  3. ClientDeregisterServiceEvent //维护内存数据,通知订阅者更新服务信息

startLoadTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));//从远端拉取一次全量数据
}
}
}