Nacos Naming Server源码分析-服务订阅

服务订阅的目的是为了,能够第一时间感知到被调用服务的实例变化,从而能够及时更新本地缓存,避免调用失败。

subscribe和unsubscribe的逻辑,只有临时实例才有,永久实例是不存在订阅逻辑的。所以还是比较推荐用临时类型的服务注册,这也是官方客户端默认的类型

看一下服务调用的逻辑如何实现的:

SubscribeServiceRequestHandler#handle(*)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
namespaceId, groupedServiceName, 0, request.getClusters());
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());//做了按照cluster过滤。并且还存在防止服务实例过少,被调用方打爆的保护措施
if (request.isSubscribe()) {
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId()); //继续往下看
NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
} else {
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

-> EphemeralClientOperationServiceImpl#subscribe(*)

1
2
3
4
5
6
7
8
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
checkClientIsLegal(client, clientId);
client.addServiceSubscriber(singleton, subscriber); //继续往下看
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

和上一篇一样,大部头的逻辑都是通过事件机制驱动的:

  1. SubscribeServiceTraceEvent //处理逻辑留给用户进行扩展,所以没啥好讲的
  2. ClientOperationEvent.ClientSubscribeServiceEvent 处理函数如下
    1
    2
    3
    4
    5
    6
    7
    8
    //ClientServiceIndexesManager.class
    private void addSubscriberIndexes(Service service, String clientId) {
    Set<String> clientIds = subscriberIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
    // Fix #5404, Only first time add need notify event.
    if (clientIds.add(clientId)) {//客户端第一次订阅服务,才会触发事件
    NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
    }
    }
  3. ServiceEvent.ServiceSubscribedEvent
    这里也就是客户端第一次订阅该服务,才会触发该事件,做一次服务实例信息推送。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //NamingSubscriberServiceV2Impl.class
    public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
    // If service changed, push to all subscribers.
    ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
    Service service = serviceChangedEvent.getService();
    delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
    MetricsMonitor.incrementServiceChangeCount(service);
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
    // If service is subscribed by one client, only push this client.
    ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
    Service service = subscribedEvent.getService();
    delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
    subscribedEvent.getClientId()));
    }
    }

总结:

  1. 服务订阅功能的目的是避免服务调用失败
  2. 服务订阅第一次发起,服务端会全量推一次订阅服务所有实例信息