From the earlier analysis of the client code, the Naming-related backend calls boil down to the following, all routed through the NamingClientProxyDelegate class (a quick client-side sketch follows the list):
registerService, deregisterService (both gRPC and HTTP implementations)
subscribe, unsubscribe (gRPC implementation)
getServiceList (gRPC implementation)
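Before diving into the server, here is roughly what triggers those calls on the client side. A minimal sketch using the public nacos-client NamingService API (the address, service name, IP and ports are just placeholders):

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;

public class RegisterDemo {

    public static void main(String[] args) throws NacosException {
        // NamingService internally goes through NamingClientProxyDelegate,
        // which picks the gRPC proxy for ephemeral instances.
        NamingService naming = NacosFactory.createNamingService("127.0.0.1:8848");

        Instance instance = new Instance();
        instance.setIp("192.168.0.10");
        instance.setPort(8080);
        instance.setEphemeral(true); // default is true: ephemeral instance, registered via gRPC

        naming.registerInstance("demo-service", "DEFAULT_GROUP", instance);

        // ... later, on shutdown:
        naming.deregisterInstance("demo-service", "DEFAULT_GROUP", instance);
    }
}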
Let's start with the Naming server's gRPC implementation.
registerService call chain: a service registered over gRPC is an ephemeral service by default.
InstanceRequestHandler#handle(InstanceRequest request, RequestMeta meta)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
    Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
    InstanceUtil.setInstanceIdIfEmpty(request.getInstance(), service.getGroupedServiceName());
    switch (request.getType()) {
        case NamingRemoteConstants.REGISTER_INSTANCE:
            return registerInstance(service, request, meta); // follow this call below
        case NamingRemoteConstants.DE_REGISTER_INSTANCE:
            return deregisterInstance(service, request, meta);
        default:
            throw new NacosException(NacosException.INVALID_PARAM,
                    String.format("Unsupported request type %s", request.getType()));
    }
}
InstanceRequestHandler#registerInstance(Service service, InstanceRequest request, RequestMeta meta)
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
        throws NacosException {
    clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId()); // follow this call below
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
            meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
            request.getInstance().getIp(), request.getInstance().getPort()));
    return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
EphemeralClientOperationServiceImpl#registerInstance(Service service, Instance instance, String clientId), the default path since registration is ephemeral by default
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance); // validate the instance parameters
    Service singleton = ServiceManager.getInstance().getSingleton(service); // get the singleton Service
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is persistent service, can't register ephemeral instance.",
                        singleton.getGroupedServiceName()));
    }
    Client client = clientManager.getClient(clientId); // look up the already-connected client
    checkClientIsLegal(client, clientId); // check the client matches the ephemeral type
    InstancePublishInfo instanceInfo = getPublishInfo(instance); // object conversion, attach some extra info
    client.addServiceInstance(singleton, instanceInfo); // follow this call below
    client.setLastUpdatedTime();
    client.recalculateRevision();
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
AbstractClient#addServiceInstance(Service service, InstancePublishInfo instanceInfo)
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    if (instancePublishInfo instanceof BatchInstancePublishInfo) {
        InstancePublishInfo old = publishers.put(service, instancePublishInfo);
        MetricsMonitor.incrementIpCountWithBatchRegister(old, (BatchInstancePublishInfo) instancePublishInfo);
    } else {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
    }
    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}
In the end, the instance info is stored in a map (publishers), a metric is updated, and an event is published. The code listed above publishes four events in total:
RegisterInstanceTraceEvent: TraceEvent-style events are part of Nacos's extension mechanism; the handling logic has to be provided by the user
ClientOperationEvent.ClientRegisterServiceEvent
MetadataEvent.InstanceMetadataEvent
ClientEvent.ClientChangedEvent
A large part of the remaining logic hides in the handlers of these events. This brings in another Nacos component, NotifyCenter, the event-notification module used to process events asynchronously. We won't cover NotifyCenter itself here; let's go straight to what the final handling logic does.
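As a quick aside on how those handlers get wired up: each handler is a NotifyCenter Subscriber, and NotifyCenter dispatches published events to it asynchronously on a publisher thread. A minimal sketch of that usage, with a made-up event and handler rather than the real Nacos classes:

import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;

public class NotifyCenterDemo {

    // Illustrative event, not a real Nacos class.
    public static class DemoEvent extends Event {
    }

    // Handlers extend Subscriber and are registered with NotifyCenter once at startup.
    public static class DemoSubscriber extends Subscriber<DemoEvent> {

        @Override
        public void onEvent(DemoEvent event) {
            // runs asynchronously on the publisher's dispatch thread
            System.out.println("got event: " + event);
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return DemoEvent.class;
        }
    }

    public static void main(String[] args) {
        NotifyCenter.registerToPublisher(DemoEvent.class, 16); // publisher with a small ring buffer
        NotifyCenter.registerSubscriber(new DemoSubscriber());
        NotifyCenter.publishEvent(new DemoEvent()); // same API the server code above calls
    }
}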
ClientOperationEvent.ClientRegisterServiceEvent is handled in ClientServiceIndexesManager#onEvent() -> handleClientOperation() -> addPublisherIndexes():
private void addPublisherIndexes(Service service, String clientId) {
    publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>()).add(clientId); // keep a service -> clientIds index in memory
    NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); // follow what this event triggers
}
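Besides publisherIndexes, ClientServiceIndexesManager also keeps a parallel subscriberIndexes (service -> clientIds that subscribed to it), which is what the push path reads later to decide which clients to notify. A simplified sketch of the shape of these indexes, with plain Strings standing in for the real Service/clientId types:

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified stand-in for ClientServiceIndexesManager's two in-memory indexes.
public class ServiceIndexesSketch {

    // service -> clientIds that registered (published) instances for it
    private final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    // service -> clientIds that subscribed to it; read by the push task
    private final ConcurrentMap<String, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

    public void addPublisher(String service, String clientId) {
        publisherIndexes.computeIfAbsent(service, key -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    public void addSubscriber(String service, String clientId) {
        subscriberIndexes.computeIfAbsent(service, key -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    public Collection<String> getAllClientsSubscribedTo(String service) {
        return subscriberIndexes.getOrDefault(service, Collections.emptySet());
    }
}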
The ServiceChangedEvent handler notifies every client subscribed to the service to refresh its instance list, which ties back to the client-side subscription logic described earlier.
@Override
public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        // If service changed, push to all subscribers.
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
        MetricsMonitor.incrementServiceChangeCount(service);
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        // If service is subscribed by one client, only push to this client (an initial push).
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}
Here delayTaskEngine is an instance of PushDelayTaskExecuteEngine. The call chain that actually executes the push, shown briefly:
public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {}
NacosDelayTaskExecuteEngine.processingExecutor -> ProcessRunnable#run() -> processTasks() -> PushDelayTaskProcessor#process() -> PushExecuteTask#run() -> PushExecutorDelegate#doPushWithCallback() // follow this call below
//PushExecutorDelegate.class
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
        NamingPushCallback callBack) {
    getPushExecuteService(clientId, subscriber).doPushWithCallback(clientId, subscriber, data, callBack);
}

private PushExecutor getPushExecuteService(String clientId, Subscriber subscriber) {
    Optional<SpiPushExecutor> result = SpiImplPushExecutorHolder.getInstance()
            .findPushExecutorSpiImpl(clientId, subscriber);
    if (result.isPresent()) {
        return result.get();
    }
    // use nacos default push executor
    return clientId.contains(IpPortBasedClient.ID_DELIMITER) ? udpPushExecuteService : rpcPushExecuteService;
}
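Stepping back to the delayTaskEngine at the head of this chain: NacosDelayTaskExecuteEngine is essentially a delay-and-merge engine. Tasks are keyed (here by Service), a newly added task is merged with any pending task for the same key, and a scheduled thread periodically drains the map and hands each task to its processor. A simplified sketch of that pattern (my own stripped-down version, not the real Nacos classes):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified delay-and-merge engine in the spirit of NacosDelayTaskExecuteEngine.
public class DelayTaskEngineSketch {

    public interface DelayTask {
        // merge a newly submitted task for the same key into this pending one
        DelayTask merge(DelayTask newer);
    }

    public interface TaskProcessor {
        boolean process(Object key, DelayTask task); // return false to re-queue and retry later
    }

    private final Map<Object, DelayTask> tasks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService processingExecutor = Executors.newSingleThreadScheduledExecutor();
    private final TaskProcessor processor;

    public DelayTaskEngineSketch(TaskProcessor processor, long intervalMillis) {
        this.processor = processor;
        // the scheduled thread plays the role of processingExecutor -> ProcessRunnable#run()
        processingExecutor.scheduleWithFixedDelay(this::processTasks, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    public void addTask(Object key, DelayTask task) {
        // tasks for the same key collapse into one, so bursts of changes get coalesced into a single push
        tasks.merge(key, task, DelayTask::merge);
    }

    private void processTasks() {
        for (Object key : tasks.keySet()) {
            DelayTask task = tasks.remove(key);
            if (task != null && !processor.process(key, task)) {
                tasks.merge(key, task, DelayTask::merge); // failed: put it back for a later retry
            }
        }
    }
}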
There are two push protocols here: UDP (for Nacos 1.x clients) and gRPC (for Nacos 2.x clients). The push implementation leans heavily on thread pools to make the most of the CPU and keep push throughput high.
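One of those thread-pool tricks (also used by the Distro execute workers we will see below) is sharding tasks across a fixed set of single-threaded workers by the hash of the task key, so changes to the same service are processed in order while different services run in parallel. A simplified sketch of the idea, not the real Nacos classes:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified hash-sharded worker pool, in the spirit of the execute-task engines.
public class ShardedWorkerPoolSketch {

    private final ExecutorService[] workers;

    public ShardedWorkerPoolSketch(int workerCount) {
        workers = new ExecutorService[workerCount];
        for (int i = 0; i < workerCount; i++) {
            // one thread per worker: tasks routed to the same worker run in order
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void addTask(Object key, Runnable task) {
        // same key -> same worker, so updates for one service never race each other,
        // while different services spread across all workers
        int index = (key.hashCode() & Integer.MAX_VALUE) % workers.length;
        workers[index].execute(task);
    }
}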
MetadataEvent.InstanceMetadataEvent is handled by NamingMetadataManager; its main job is maintaining state in local memory:
//NamingMetadataManager.class
private void handleInstanceMetadataEvent(MetadataEvent.InstanceMetadataEvent event) {
    Service service = event.getService();
    String metadataId = event.getMetadataId();
    if (containInstanceMetadata(service, metadataId)) {
        updateExpiredInfo(event.isExpired(),
                ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(), event.getMetadataId()));
    }
}

private void updateExpiredInfo(boolean expired, ExpiredMetadataInfo expiredMetadataInfo) {
    if (expired) {
        expiredMetadataInfos.add(expiredMetadataInfo);
    } else {
        expiredMetadataInfos.remove(expiredMetadataInfo);
    }
}
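Note that this only records which metadata has gone stale; the actual removal happens later. As far as I can tell, a periodic cleaner in the metadata module walks expiredMetadataInfos and removes entries that have stayed expired past a grace period, so a quick deregister-then-register does not lose its metadata. A hedged sketch of that pattern (class name, fields, and the 60s grace period are illustrative):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative cleaner: periodically drop metadata that has stayed "expired" past a grace period.
public class ExpiredMetadataCleanerSketch {

    private static final long EXPIRE_MILLIS = TimeUnit.SECONDS.toMillis(60); // grace period, illustrative value

    // stand-in for the expiredMetadataInfos set; each entry remembers when it was marked expired
    private final Set<ExpiredEntry> expiredMetadataInfos = ConcurrentHashMap.newKeySet();

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public ExpiredMetadataCleanerSketch() {
        scheduler.scheduleWithFixedDelay(this::doClean, 5, 5, TimeUnit.SECONDS);
    }

    public void markExpired(String metadataId) {
        expiredMetadataInfos.add(new ExpiredEntry(metadataId, System.currentTimeMillis()));
    }

    private void doClean() {
        long now = System.currentTimeMillis();
        // drop entries that stayed expired for the whole grace period
        // (the real cleaner would also delete the metadata itself at this point)
        expiredMetadataInfos.removeIf(entry -> now - entry.expiredAt > EXPIRE_MILLIS);
    }

    private static final class ExpiredEntry {
        final String metadataId;
        final long expiredAt;

        ExpiredEntry(String metadataId, long expiredAt) {
            this.metadataId = metadataId;
            this.expiredAt = expiredAt;
        }
    }
}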
ClientEvent.ClientChangedEvent is handled by DistroClientDataProcessor, which syncs the changed client's data to the other cluster nodes.
//DistroClientDataProcessor.class
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    if (isInvalidClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE); // follow this call below
    }
}
//DistroProtocol.class
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}
Distro deserves an article of its own: it is the protocol Nacos uses to sync ephemeral instance data between nodes (similar in spirit to Gossip), so we won't expand on it here. The final logic is in the code below:
//DistroDelayTaskProcessor#process()
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        case DELETE:
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); // follow this call below
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}
//DistroSyncChangeTask.class
@Override
protected boolean doExecute() {
    String type = getDistroKey().getResourceType();
    DistroData distroData = getDistroData(type);
    if (null == distroData) {
        Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
        return true;
    }
    return getDistroComponentHolder().findTransportAgent(type)
            .syncData(distroData, getDistroKey().getTargetServer()); // follow this call below
}
//DistroClientTransportAgent.class
public boolean syncData(DistroData data, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
                data.getDistroKey());
        return false;
    }
    try {
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
    }
    return false;
}
deregisterService

//InstanceRequestHandler.class
private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
    clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
    NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), meta.getClientIp(), true,
            DeregisterInstanceReason.REQUEST, service.getNamespace(), service.getGroup(), service.getName(),
            request.getInstance().getIp(), request.getInstance().getPort()));
    return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
}
//EphemeralClientOperationServiceImpl.class
@Override
public void deregisterInstance(Service service, Instance instance, String clientId) {
    if (!ServiceManager.getInstance().containSingleton(service)) {
        Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service);
        return;
    }
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    Client client = clientManager.getClient(clientId);
    checkClientIsLegal(client, clientId);
    InstancePublishInfo removedInstance = client.removeServiceInstance(singleton); // this publishes ClientEvent.ClientChangedEvent
    client.setLastUpdatedTime();
    client.recalculateRevision();
    if (null != removedInstance) {
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
        NotifyCenter.publishEvent(
                new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
    }
}
Four events are involved here (three published directly in deregisterInstance, plus the ClientChangedEvent from removeServiceInstance):
DeregisterInstanceTraceEvent: no built-in handling logic; users implement their own
ClientOperationEvent.ClientDeregisterServiceEvent

//ClientServiceIndexesManager.class
private void removePublisherIndexes(Service service, String clientId) {
    publisherIndexes.computeIfPresent(service, (s, ids) -> {
        ids.remove(clientId); // maintain the in-memory index
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); // mentioned above: notify all subscribers to refresh instances
        return ids.isEmpty() ? null : ids;
    });
}
MetadataEvent.InstanceMetadataEvent: already covered above, mainly maintains in-memory metadata state
ClientEvent.ClientChangedEvent: already covered above
To summarize: registration and deregistration of (ephemeral) services are driven almost entirely by the event-notification mechanism. Whether the action is a register or a deregister, the work boils down to the following (a client-side subscribe sketch follows the list):
maintaining the in-memory service data
notifying clients subscribed to the service to refresh their instance lists
syncing the change to the other Nacos nodes
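To close the loop from the consumer's side, here is a minimal subscribe sketch with the public nacos-client API (address and service name are placeholders); the listener below is what eventually receives the gRPC push triggered by ServiceChangedEvent:

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.NamingEvent;

public class SubscribeDemo {

    public static void main(String[] args) throws NacosException, InterruptedException {
        NamingService naming = NacosFactory.createNamingService("127.0.0.1:8848");

        // When another client registers or deregisters an instance of demo-service,
        // the server's PushExecuteTask pushes the new instance list and this listener fires.
        naming.subscribe("demo-service", event -> {
            if (event instanceof NamingEvent) {
                NamingEvent namingEvent = (NamingEvent) event;
                System.out.println("instances of " + namingEvent.getServiceName()
                        + " changed: " + namingEvent.getInstances());
            }
        });

        Thread.sleep(Long.MAX_VALUE); // keep the demo alive to receive pushes
    }
}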