Nacos Naming Server源码分析-服务注册注销

之前分析客户端的代码,发现Naming相关的后端调用主要有这几个,代码为NamingClientProxyDelegate这个class:

  1. registerService, deregisterService (grpc和http 两种实现)
  2. subscribe, unsubscribe grpc实现
  3. getServiceList grpc实现

我们先去看Naming Server的grpc实现

registerService

代码调用链路,grpc方式注册的服务,默认为临时服务。

  1. InstanceRequestHandler#handle(InstanceRequest request, RequestMeta meta)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
    Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
    InstanceUtil.setInstanceIdIfEmpty(request.getInstance(), service.getGroupedServiceName());
    switch (request.getType()) {
    case NamingRemoteConstants.REGISTER_INSTANCE:
    return registerInstance(service, request, meta); // 此处继续看下去
    case NamingRemoteConstants.DE_REGISTER_INSTANCE:
    return deregisterInstance(service, request, meta);
    default:
    throw new NacosException(NacosException.INVALID_PARAM,
    String.format("Unsupported request type %s", request.getType()));
    }
    }
  2. InstanceRequestHandler#registerInstance(Service service, InstanceRequest request, RequestMeta meta)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
    throws NacosException {
    clientOperationService
    .registerInstance(service, request.getInstance(), meta.getConnectionId()); // 此处继续看下去
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
    meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
    request.getInstance().getIp(), request.getInstance().getPort()));
    return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
  3. EphemeralClientOperationServiceImpl#registerInstance(Service service, Instance instance, String clientId)
    默认注册的临时服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);//校验实例是否过期

Service singleton = ServiceManager
.getInstance().getSingleton(service); //获取单例服务
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);//获取已经连接的客户端信息
checkClientIsLegal(client, clientId); //校验是否是临时实例
InstancePublishInfo instanceInfo = getPublishInfo(instance);//增加一些额外信息,对象转换
client.addServiceInstance(singleton, instanceInfo);// 此处继续看下去
client.setLastUpdatedTime();
client.recalculateRevision();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
  1. AbstractClient#addServiceInstance(Service service, InstancePublishInfo instanceInfo)
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
InstancePublishInfo old = publishers.put(service, instancePublishInfo);
MetricsMonitor.incrementIpCountWithBatchRegister(old, (BatchInstancePublishInfo) instancePublishInfo);
} else {
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}

这里最终将实例信息,保存到一个map,做了metric统计和发布一个事件,上述列出的代码总共发布了4个事件:

  1. RegisterInstanceTraceEvent 这种TraceEvent类事件作为Nacos的扩展机制使用,需要自己定制处理逻辑
  2. ClientOperationEvent.ClientRegisterServiceEvent
  3. MetadataEvent.InstanceMetadataEvent
  4. ClientEvent.ClientChangedEvent

还有一大部分的逻辑,藏在了这些事件处理代码中。这里涉及到Nacos另一个组件NotifyCenter,事件通知模块,用以异步处理事件。我们这边暂时不讲这一块,直接去看最终的处理逻辑是啥。

ClientOperationEvent.ClientRegisterServiceEvent

事件处理代码位置:ClientServiceIndexesManager#onEvent()->handleClientOperation()->addPublisherIndexes(*)

1
2
3
4
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>()).add(clientId); // 数据落内存,做kv索引
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); //继续看这个事件触发了啥逻辑
}

ServiceChangedEvent处理逻辑,通知所有订阅该服务的客户端去更新服务实例。这里就和之前说的客户端订阅逻辑对应起来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) { // 推送所有订阅者
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service,
new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
MetricsMonitor.incrementServiceChangeCount(service);
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) { //发生订阅事件,之推送给订阅的客户端,相当于初始化推送
// If service is subscribed by one client, only push this client.
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(service,
new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));
}
}

这里的delayTaskEngine,是PushDelayTaskExecuteEngine类的实例。真正执行推送逻辑的调用链路,简单展示出来为:

1
public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {}

NacosDelayTaskExecuteEngine.processingExecutor
–>ProcessRunnable#run
–>processTasks()
–>PushDelayTaskProcessor#process()
–>PushExecuteTask#run()
–>PushExecutorDelegate#doPushWithCallback(
) //这里继续看下去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
NamingPushCallback callBack) {
getPushExecuteService(clientId, subscriber).doPushWithCallback(clientId, subscriber, data, callBack);
}

private PushExecutor getPushExecuteService(String clientId, Subscriber subscriber) {
Optional<SpiPushExecutor> result = SpiImplPushExecutorHolder.getInstance()
.findPushExecutorSpiImpl(clientId, subscriber);
if (result.isPresent()) {
return result.get();
}
// use nacos default push executor
return clientId.contains(IpPortBasedClient.ID_DELIMITER) ? udpPushExecuteService : rpcPushExecuteService;
}

这里的push实现有两种协议,udp(Nacos 1.x)和grpc(Nacos 2.x)两种。这里的推送实现,用到了大量的线程池技术,来最大程度利用cpu资源,提高推送性能。

MetadataEvent.InstanceMetadataEvent

该事件处理类为NamingMetadataManager,主要执行逻辑是维护本地内存的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void handleInstanceMetadataEvent(MetadataEvent.InstanceMetadataEvent event) {
Service service = event.getService();
String metadataId = event.getMetadataId();
if (containInstanceMetadata(service, metadataId)) {
updateExpiredInfo(event.isExpired(),
ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(), event.getMetadataId()));
}
}

private void updateExpiredInfo(boolean expired, ExpiredMetadataInfo expiredMetadataInfo) {
if (expired) {
expiredMetadataInfos.add(expiredMetadataInfo);
} else {
expiredMetadataInfos.remove(expiredMetadataInfo);
}
}

ClientEvent.ClientChangedEvent

处理类为DistroClientDataProcessor,该事件的处理逻辑是,会将新增加的客户端,通知到其他节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//DistroClientDataProcessor.class
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
if (isInvalidClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);//继续往下看
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//DistroProtocol.class
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}

Distro值得另外起一篇来介绍,这是Nacos同步临时服务实例信息的协议(类似Gossip协议),这里先不展开。最终的逻辑在下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//DistroDelayTaskProcessor#process(*)
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
switch (distroDelayTask.getAction()) {
case DELETE:
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
return true;
case CHANGE:
case ADD:
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);//继续往下看
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
default:
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//DistroSyncChangeTask.class

@Override
protected boolean doExecute() {
String type = getDistroKey().getResourceType();
DistroData distroData = getDistroData(type);
if (null == distroData) {
Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
return true;
}
return getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer());//继续往下看
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//DistroClientTransportAgent.class

public boolean syncData(DistroData data, String targetServer) {
if (isNoExistTarget(targetServer)) {
return true;
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO
.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
data.getDistroKey());
return false;
}
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
}
return false;
}

deregisterService

1
2
3
4
5
6
7
8
9
//InstanceRequestHandler.class

private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), true, DeregisterInstanceReason.REQUEST, service.getNamespace(),
service.getGroup(), service.getName(), request.getInstance().getIp(), request.getInstance().getPort()));
return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//EphemeralClientOperationServiceImpl.class
@Override
public void deregisterInstance(Service service, Instance instance, String clientId) {
if (!ServiceManager.getInstance().containSingleton(service)) {
Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service);
return;
}
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
checkClientIsLegal(client, clientId);
InstancePublishInfo removedInstance = client.removeServiceInstance(singleton); //这里发布了ClientEvent.ClientChangedEvent事件
client.setLastUpdatedTime();
client.recalculateRevision();
if (null != removedInstance) {
NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
}
}

这里发布了3个事件:

  1. DeregisterInstanceTraceEvent 没有具体处理逻辑,需要用户自行实现
  2. ClientOperationEvent.ClientDeregisterServiceEvent
    1
    2
    3
    4
    5
    6
    7
    8
    //ClientServiceIndexesManager.class
    private void removePublisherIndexes(Service service, String clientId) {
    publisherIndexes.computeIfPresent(service, (s, ids) -> {
    ids.remove(clientId); //维护内存数据
    NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); //这个事件上述有提到,通知所有订阅者,更新服务实例信息
    return ids.isEmpty() ? null : ids;
    });
    }
  3. MetadataEvent.InstanceMetadataEvent //上述已有提及,主要是维护内存数据
    4.ClientEvent.ClientChangedEvent 上述已经有提及

总结一下,服务(临时服务)注册注销,主要都是通过事件通知机制实现。不管注册,注销动作,都其实做了如下的事情:

  1. 维护内存服务信息
  2. 通知订阅该服务的客户端更新实例信息
  3. 同步到其他Nacos节点