向Nacos注册服务,存在两种类型。临时(Ephemeral)服务和持久(Persistent)服务。服务信息的存储和节点间同步,分别基于Distro协议和Raft协议。
Distro协议
可以从DistroClientComponentRegistry入手去阅读源代码,下面这些成员变量,都是用来实现协议的。
- ServerMemberManager 服务端成员管理器,有3种模式分别是:从磁盘文件读取,从指定地址读取,单点模式
- DistroProtocol
- DistroComponentHolder 各组件组合器
- DistroTaskEngineHolder task任务异步执行器
DistroProtocol
看下来主要有两个功能点
1 2 3 4 5 6 7 8
| private void startDistroTask() { if (EnvUtil.getStandaloneMode()) { isInitialized = true; return; } startVerifyTask(); // 1 startLoadTask(); // 2 }
|
startVerifyTask()
startVerifyTask()
-> DistroVerifyTimedTask#run()
-> DistroVerifyExecuteTask#run()
发送方,发送:
获取当前节点所有客户端信息,获取所有Nacos Server节点。将客户端信息发送到所有节点,进行校验。我们看下具体校验逻辑是啥?
接收方,校验逻辑:
DistroDataRequestHandler#handle()
->handleVerify()
->DistroProtocol#onVerify()
->DistroClientDataProcessor#processVerifyData()
->EphemeralIpPortClientManager#verifyClient(*)
比较一下内存中版本号,如果版本号不一致,就更新内存中服务实例信息。
发送方,处理校验结果:
DistroVerifyCallbackWrapper#onResponse(*)
1 2 3 4 5 6 7 8 9 10 11
| public void onResponse(Response response) { if (checkResponse(response)) { NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp()); distroCallback.onSuccess(); } else { Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId); NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer)); //往下看 NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp()); distroCallback.onFailed(null); } }
|
除了记录log和metric,就是发布了一个ClientEvent.ClientVerifyFailedEvent事件。该事件最终触发同步数据请求,发送到校验失败的服务器。
收到请求后的处理逻辑在这里:
DistroDataRequestHandler#handle()
->handleSyncData()
-> DistroProtocol#onReceive()
-> DistroClientDataProcessor#handlerClientSyncData() ->upgradeClient(*)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| private void upgradeClient(Client client, ClientSyncData clientSyncData) { Set<Service> syncedService = new HashSet<>(); // process batch instance sync logic processBatchInstanceDistroData(syncedService, client, clientSyncData); List<String> namespaces = clientSyncData.getNamespaces(); List<String> groupNames = clientSyncData.getGroupNames(); List<String> serviceNames = clientSyncData.getServiceNames(); List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos(); for (int i = 0; i < namespaces.size(); i++) { Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i)); Service singleton = ServiceManager.getInstance().getSingleton(service); syncedService.add(singleton); InstancePublishInfo instancePublishInfo = instances.get(i); if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) { client.addServiceInstance(singleton, instancePublishInfo); NotifyCenter.publishEvent( new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId())); NotifyCenter.publishEvent( new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getMetadataId(), false)); } } for (Service each : client.getAllPublishedService()) { if (!syncedService.contains(each)) { client.removeServiceInstance(each); NotifyCenter.publishEvent( new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId())); } } client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));//更新客户端信息版本号 }
|
这里显示将收到的数据进行封装,然后维护更新内存种数据。同时发出了3个事件:
- ClientRegisterServiceEvent //通知订阅者更新服务实例信息
- InstanceMetadataEvent //维护更新内存的Metadata
- ClientDeregisterServiceEvent //维护内存数据,通知订阅者更新服务信息
startLoadTask()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private void load() throws Exception { while (memberManager.allMembersWithoutSelf().isEmpty()) { Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init..."); TimeUnit.SECONDS.sleep(1); } while (distroComponentHolder.getDataStorageTypes().isEmpty()) { Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register..."); TimeUnit.SECONDS.sleep(1); } for (String each : distroComponentHolder.getDataStorageTypes()) { if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) { loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));//从远端拉取一次全量数据 } } }
|