Nacos Naming Server源码分析-NamingGrpcClientProxy启动阶段

NamingGrpcClientProxy 是 Nacos SDK 中 gRPC 调用的主要实现类。话不多说,先看一下类图:

NamingGrpcClientProxy

该类的启动过程

NamingGrpcClientProxy#NamingGrpcClientProxy
->start()
->RpcClient#start()

构造函数逻辑

构造函数源码如下:
/**
 * Builds the gRPC-based naming client proxy and immediately starts its underlying RPC client.
 *
 * @param namespaceId       namespace stored on this proxy
 * @param securityProxy     handed to the superclass constructor (client authentication)
 * @param serverListFactory forwarded to the start-up routine, which passes it to the RPC client
 * @param properties        client properties; source of the request timeout and the TLS settings
 * @param serviceInfoHolder forwarded to the start-up routine for the push-request handler
 * @throws NacosException declared by the start-up routine invoked at the end of construction
 */
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
        NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    // Client authentication.
    super(securityProxy);
    this.namespaceId = namespaceId;
    this.uuid = UUID.randomUUID().toString();

    // "-1" is the fallback when no request timeout is configured.
    String configuredTimeout = properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1");
    this.requestTimeout = Long.parseLong(configuredTimeout);

    // Labels that identify this client as the SDK's naming module.
    Map<String, String> clientLabels = new HashMap<>();
    clientLabels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
    clientLabels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
    clientLabels.put(Constants.APPNAME, AppNameUtils.getAppName());

    // Creates the gRPC client (GrpcSdkClient per the article), which ends up initialising RpcClient
    // and leaves the client status at INITIALIZED.
    this.rpcClient = RpcClientFactory.createClient(
            uuid,
            ConnectionType.GRPC,
            clientLabels,
            RpcClientTlsConfig.properties(properties.asProperties()));

    // Creates the redo service.
    this.redoService = new NamingGrpcRedoService(this, properties);
    NAMING_LOGGER.info("Create naming rpc client for uuid->{}", uuid);

    // Start-up continues in start(ServerListFactory, ServiceInfoHolder).
    start(serverListFactory, serviceInfoHolder);
}
接着看 start(ServerListFactory, ServiceInfoHolder) 方法:
/**
 * Wires the RPC client with its collaborators, starts it, and then registers this proxy as a
 * NotifyCenter subscriber.
 *
 * @param serverListFactory factory handed to the RPC client via serverListFactory(...)
 * @param serviceInfoHolder passed to the NamingPushRequestHandler registered on the RPC client
 * @throws NacosException declared by this method; rpcClient.start() is the call shown to declare it
 */
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.serverListFactory(serverListFactory); // factory class for the backend server list
// redoService is registered as a connection listener on the RPC client.
rpcClient.registerConnectionListener(redoService);
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder)); // listens for service instance changes
rpcClient.start(); // continued in RpcClient#start()
NotifyCenter.registerSubscriber(this);
}
再看 RpcClient#start() 方法:
/**
 * Starts the RPC client: launches two background tasks, then attempts the initial server connection.
 *
 * <p>Only a caller that wins the INITIALIZED -> STARTING compare-and-set proceeds; any other call returns
 * immediately. A two-thread scheduled executor then runs (1) a consumer that takes ConnectionEvents from
 * eventLinkedBlockingQueue and dispatches connected/disconnected notifications, and (2) a loop that polls
 * reconnectionSignal, performs a health check when the poll times out, and calls reconnect(...).
 * Finally the calling thread tries to connect while the retry counter is non-negative; on success the
 * status becomes RUNNING and a CONNECTED event is queued, otherwise switchServerAsync() is invoked.
 * Two server-request handlers are registered at the end.
 *
 * <p>NOTE: this is an abbreviated excerpt; some branch bodies are replaced by comments.
 *
 * @throws NacosException declared by the signature; no throw site is visible in this excerpt
 */
public final void start() throws NacosException {
boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);// move the status from INITIALIZED to STARTING
if (!success) {
return;
}

clientEventExecutor = new ScheduledThreadPoolExecutor(2, new NameThreadFactory("com.alibaba.nacos.client.remote.worker"));

// connection event consumer.
clientEventExecutor.submit(() -> { // runs asynchronously; per the article, notifies redoService of connection-established and connection-closed events
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take;
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) { // a (re)connect happened: notify
notifyConnected(take.connection);
} else if (take.isDisConnected()) {// a connection was closed: notify
notifyDisConnected(take.connection);
}
} catch (Throwable e) {
// Do nothing
}
}
});

// Second task: reconnect / keep-alive loop.
clientEventExecutor.submit(() -> {
while (true) {
try {
if (isShutdown()) {
break;
}
// switchServerAsync adds a ReconnectContext to the reconnectionSignal queue
ReconnectContext reconnectContext = reconnectionSignal.poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// Poll timed out; only health-check once the keep-alive interval has elapsed since lastActiveTimeStamp.
if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
boolean isHealthy = healthCheck();// health check
if (!isHealthy) {
// (body elided in this excerpt) set the client status to UNHEALTHY and reset reconnectContext, then wait for the reconnect below
} else {// healthy: reset the last-active timestamp
lastActiveTimeStamp = System.currentTimeMillis();
continue;
}
}

}
// NOTE(review): as excerpted, reconnectContext can still be null here (keep-alive interval not yet
// elapsed, or the elided unhealthy branch above); the resulting exception would be swallowed by the
// catch below. Presumably the full source assigns or skips in those cases -- confirm against upstream.
if (reconnectContext.serverInfo != null) {
// (body elided in this excerpt) check whether the reconnect IP is in the server list
}
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
// Do nothing
}
}
});

Connection connectToServer = null;
// The status was already moved to STARTING by the compare-and-set at the top of this method.
rpcClientStatus.set(RpcClientStatus.STARTING);

// Initial connection attempts: loop while the counter is >= 0 and no connection has been obtained.
int startUpRetryTimes = rpcClientConfig.retryTimes();
while (startUpRetryTimes >= 0 && connectToServer == null) {
try {
startUpRetryTimes--;
ServerInfo serverInfo = nextRpcServer();

LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}",
rpcClientConfig.name(), serverInfo);

connectToServer = connectToServer(serverInfo);// create the gRPC connection
} catch (Throwable e) {
LoggerUtils.printIfWarnEnabled(LOGGER,
"[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
}

}

if (connectToServer != null) {
LoggerUtils
.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
rpcClientConfig.name(), connectToServer.serverInfo.getAddress(),
connectToServer.getConnectionId());
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING); // connected: set the status to RUNNING
// Queue a CONNECTED event for the connection event consumer started above.
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection));
} else {
switchServerAsync();// connection failed: trigger a reconnect
}

registerServerRequestHandler(new ConnectResetRequestHandler()); // handles ConnectResetRequest sent by the server

// register client detection request.
registerServerRequestHandler((request, connection) -> { // handles ClientDetectionRequest
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}

// Any other request type is not handled by this handler.
return null;
});

}

往reconnectionSignal队列添加ReconnectContext的场景(触发重新连接):

  1. ConnectResetRequestHandler,收到服务端ConnectResetRequest请求
  2. RpcClient#onServerListChange,监听到Server列表有变化
  3. 第一次启动start过程中,连接后端失败
  4. GrpcClient#bindRequestStream 创建双向流,从stream收到onCompleted和onError
  5. RpcClient发送请求失败

初始化阶段,sdk与server的请求

client->server ServerCheckRequest 同步的一元(Unary)请求,即一次请求对应一次响应
建立BiStream连接
client->server ConnectionSetupRequest 服务端保存连接,发送请求SetupAckRequest
server->client SetupAckRequest
client->server SetupAckResponse

其他

server->client ConnectResetRequest server端主动触发客户端进行重连,起到负载均衡的作用
server->client ClientDetectionRequest server主动探活