[ Rust笔记 二] 基础-基本类型

bool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  fn main() {
let x = true;
let y: bool = !x; // 取反运算
let z = x && y; // 逻辑与,带短路功能
println!("{}", z);
let z = x || y; // 逻辑或,带短路功能
println!("{}", z);
let z = x & y; // 按位与,不带短路功能
println!("{}", z);
let z = x | y; // 按位或,不带短路功能
println!("{}", z);
let z = x ^ y; // 按位异或,不带短路功能
println!("{}", z);
}

char

1
2
3
4
5
6
7
8
9
10
11
12
13
   unicode
都是4个字节
let love = '❤'; // 可以直接嵌入任何unicode字符
转义符
let c1 = '\n'; // 换行符
let c2 = '\x7f'; // 8 bit字符变量
let c3 = '\u{7FFF}'; // unicode字符

都是1个字节
let x :u8 = 1;
let y :u8 = b'A';
let s :&[u8;5] = b"hello";
let r :&[u8;14] = br#"hello \n world"#;

整数类型

i8 u8, i16 u16 … i128, u128, isize, usize

1
2
3
4
5
6
7
8
9
let var1 : i32 = 32;      // 十进制表示
let var2 : i32 = 0xFF; // 以0x开头代表十六进制表示
let var3 : i32 = 0o55; // 以0o开头代表八进制表示
let var4 : i32 = 0b1001; // 以0b开头代表二进制表示
let var6 = 123usize; // i6变量是usize类型
let var7 = 0x_ff_u8; // i7变量是u8类型
let var8 = 32; // 不写类型,默认为 i32 类型

let var5 = 0x_1234_ABCD; // 使用下划线分割数字,不影响语义,但是极大地提升了阅读体验。

(pointer size)isize和usize占用字节数和32位或者64位系统有关系

浮点类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   let f1 = 123.0f64;         // type f64
let f2 = 0.1f64; // type f64
let f3 = 0.1f32; // type f32
let f4 = 12E+99_f64; // type f64 科学计数法
let f5 : f64 = 2.; // type f64


enum FpCategory {
Nan,
Infinite,
Zero,
Subnormal,
Normal,
}

Read More

[ Rust笔记 一] 基础-变量与类型

变量

1
2
3
4
5
6
let variable : i32 = 100;

fn main() {
let x = 5;
x = 10;
}

这段代码会报错,变量是只读的。mut关键字申明的变量才是可写的。

1
2
let mut x = 5; // mut x: i32
x = 10;

mut x是一个“模式”​,我们还可以用这种方式同时声明多个变量

1
2
let (mut a, mut b) = (1, 2);
let Point { x: ref a, y: ref b} = p;

Rust中,每个变量必须被合理初始化之后才能被使用。
编译器会帮我们做一个执行路径的静态分析,确保变量在使用前一定被初始化:

1
2
3
4
5
6
7
8
9
fn test(condition: bool) {
let x: i32; // 声明 x,不必使用 mut 修饰
if condition {
x = 1; // 初始化 x,不需要 x 是 mut 的,因为这是初始化,不是修改
println!("{}", x);
}
// 如果条件不满足,x 没有被初始化
// 但是没关系,只要这里不使用 x 就没事
}

Read More

Nacos Naming Server源码分析-NamingGrpcClientProxy启动阶段

NamingGrpcClientProxy是Nacos SDK中,Grpc调用的主要实现类。话不多说,看一下类图:

NamingGrpcClientProxy

该类的启动过程

NamingGrpcClientProxy#NamingGrpcClientProxy
->start()
->RpcClient#start()

构造函数逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
super(securityProxy); //客户端鉴权
this.namespaceId = namespaceId;
this.uuid = UUID.randomUUID().toString();
this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
Map<String, String> labels = new HashMap<>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
labels.put(Constants.APPNAME, AppNameUtils.getAppName());
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
RpcClientTlsConfig.properties(properties.asProperties())); //创建grpc客户端 GrpcSdkClient,最终初始化RpcClient。并将客户端状态设置为INITIALIZED
this.redoService = new NamingGrpcRedoService(this, properties); //创建redoService
NAMING_LOGGER.info("Create naming rpc client for uuid->{}", uuid);
start(serverListFactory, serviceInfoHolder); //继续往下看
}

Read More

Nacos Naming Server源码分析-distro协议实现

向Nacos注册服务,存在两种类型。临时(Ephemeral)服务和持久(Persistent)服务。服务信息的存储和节点间同步,分别基于Distro协议和Raft协议。

Distro协议

可以从DistroClientComponentRegistry入手去阅读源代码,下面这些成员变量,都是用来实现协议的。

  1. ServerMemberManager 服务端成员管理器,有3种模式分别是:从磁盘文件读取,从指定地址读取,单点模式
  2. DistroProtocol
  3. DistroComponentHolder 各组件组合器
  4. DistroTaskEngineHolder task任务异步执行器

DistroProtocol

看下来主要有两个功能点

1
2
3
4
5
6
7
8
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask(); // 1
startLoadTask(); // 2
}

startVerifyTask()

startVerifyTask()
-> DistroVerifyTimedTask#run()
-> DistroVerifyExecuteTask#run()

发送方,发送:
获取当前节点所有客户端信息,获取所有Nacos Server节点。将客户端信息发送到所有节点,进行校验。我们看下具体校验逻辑是啥?

接收方,校验逻辑:
DistroDataRequestHandler#handle()
->handleVerify(
)
->DistroProtocol#onVerify()
->DistroClientDataProcessor#processVerifyData(
)
->EphemeralIpPortClientManager#verifyClient(*)

比较一下内存中版本号,如果版本号不一致,就更新内存中服务实例信息。

发送方,处理校验结果:
DistroVerifyCallbackWrapper#onResponse(*)

1
2
3
4
5
6
7
8
9
10
11
public void onResponse(Response response) {
if (checkResponse(response)) {
NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
distroCallback.onSuccess();
} else {
Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer)); //往下看
NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
distroCallback.onFailed(null);
}
}

除了记录log和metric,就是发布了一个ClientEvent.ClientVerifyFailedEvent事件。该事件最终触发同步数据请求,发送到校验失败的服务器。
收到请求后的处理逻辑在这里:
DistroDataRequestHandler#handle()
->handleSyncData(
)
-> DistroProtocol#onReceive()
-> DistroClientDataProcessor#handlerClientSyncData(
) ->upgradeClient(*)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
Set<Service> syncedService = new HashSet<>();
// process batch instance sync logic
processBatchInstanceDistroData(syncedService, client, clientSyncData);
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();

for (int i = 0; i < namespaces.size(); i++) {
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service);
syncedService.add(singleton);
InstancePublishInfo instancePublishInfo = instances.get(i);
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
client.addServiceInstance(singleton, instancePublishInfo);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getMetadataId(), false));
}
}
for (Service each : client.getAllPublishedService()) {
if (!syncedService.contains(each)) {
client.removeServiceInstance(each);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));//更新客户端信息版本号
}

这里显示将收到的数据进行封装,然后维护更新内存种数据。同时发出了3个事件:

  1. ClientRegisterServiceEvent //通知订阅者更新服务实例信息
  2. InstanceMetadataEvent //维护更新内存的Metadata
  3. ClientDeregisterServiceEvent //维护内存数据,通知订阅者更新服务信息

startLoadTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));//从远端拉取一次全量数据
}
}
}

Nacos Naming Server源码分析-服务订阅

服务订阅的目的是为了,能够第一时间感知到被调用服务的实例变化,从而能够及时更新本地缓存,避免调用失败。

subscribe和unsubscribe的逻辑,只有临时实例才有,永久实例是不存在订阅逻辑的。所以还是比较推荐用临时类型的服务注册,这也是官方客户端默认的类型

看一下服务调用的逻辑如何实现的:

SubscribeServiceRequestHandler#handle(*)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
namespaceId, groupedServiceName, 0, request.getClusters());
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());//做了按照cluster过滤。并且还存在防止服务实例过少,被调用方打爆的保护措施
if (request.isSubscribe()) {
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId()); //继续往下看
NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
} else {
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

-> EphemeralClientOperationServiceImpl#subscribe(*)

1
2
3
4
5
6
7
8
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
checkClientIsLegal(client, clientId);
client.addServiceSubscriber(singleton, subscriber); //继续往下看
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

和上一篇一样,大部头的逻辑都是通过事件机制驱动的:

  1. SubscribeServiceTraceEvent //处理逻辑留给用户进行扩展,所以没啥好讲的
  2. ClientOperationEvent.ClientSubscribeServiceEvent 处理函数如下
1
2
3
4
5
6
7
8
//ClientServiceIndexesManager.class
private void addSubscriberIndexes(Service service, String clientId) {
Set<String> clientIds = subscriberIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
// Fix #5404, Only first time add need notify event.
if (clientIds.add(clientId)) {//客户端第一次订阅服务,才会触发事件
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
  1. ServiceEvent.ServiceSubscribedEvent
    这里也就是客户端第一次订阅该服务,才会触发该事件,做一次服务实例信息推送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//NamingSubscriberServiceV2Impl.class
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
MetricsMonitor.incrementServiceChangeCount(service);
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// If service is subscribed by one client, only push this client.
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}

服务订阅和注册的唯一项如何确定,由如下三个属性决定:

1
2
3
4
5
6
7
public class Service implements Serializable {

private final String namespace;

private final String group;

private final String name;

总结:

  1. 服务订阅功能的目的是避免服务调用失败
  2. 服务订阅第一次发起,服务端会全量推一次订阅服务所有实例信息

Nacos Naming Server源码分析-服务注册注销

之前分析客户端的代码,发现Naming相关的后端调用主要有这几个,代码为NamingClientProxyDelegate这个class:

  1. registerService, deregisterService (grpc和http 两种实现)
  2. subscribe, unsubscribe grpc实现
  3. getServiceList grpc实现

我们先去看Naming Server的grpc实现

registerService

代码调用链路,grpc方式注册的服务,默认为临时服务。

  1. InstanceRequestHandler#handle(InstanceRequest request, RequestMeta meta)
1
2
3
4
5
6
7
8
9
10
11
12
13
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
InstanceUtil.setInstanceIdIfEmpty(request.getInstance(), service.getGroupedServiceName());
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta); // 此处继续看下去
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
  1. InstanceRequestHandler#registerInstance(Service service, InstanceRequest request, RequestMeta meta)
1
2
3
4
5
6
7
8
9
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
throws NacosException {
clientOperationService
.registerInstance(service, request.getInstance(), meta.getConnectionId()); // 此处继续看下去
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
request.getInstance().getIp(), request.getInstance().getPort()));
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
  1. EphemeralClientOperationServiceImpl#registerInstance(Service service, Instance instance, String clientId)
    默认注册的临时服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);//校验实例是否过期

Service singleton = ServiceManager
.getInstance().getSingleton(service); //获取单例服务
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);//获取已经连接的客户端信息
checkClientIsLegal(client, clientId); //校验是否是临时实例
InstancePublishInfo instanceInfo = getPublishInfo(instance);//增加一些额外信息,对象转换
client.addServiceInstance(singleton, instanceInfo);// 此处继续看下去
client.setLastUpdatedTime();
client.recalculateRevision();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
  1. AbstractClient#addServiceInstance(Service service, InstancePublishInfo instanceInfo)
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
InstancePublishInfo old = publishers.put(service, instancePublishInfo);
MetricsMonitor.incrementIpCountWithBatchRegister(old, (BatchInstancePublishInfo) instancePublishInfo);
} else {
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}

这里最终将实例信息,保存到一个map,做了metric统计和发布一个事件,上述列出的代码总共发布了4个事件:

  1. RegisterInstanceTraceEvent 这种TraceEvent类事件作为Nacos的扩展机制使用,需要自己定制处理逻辑
  2. ClientOperationEvent.ClientRegisterServiceEvent
  3. MetadataEvent.InstanceMetadataEvent
  4. ClientEvent.ClientChangedEvent

Read More

Nacos权限模块整合方案

Nacos是阿里巴巴开源的,用于服务发现和配置管理的中间件。配置中心已经使用Apollo,所以我们只需要使用服务发现能力即可。

问题

学习研究Nacos过程中,发现如下几点:

  1. Nacos的支持插件机制,包括权限模块
  2. Nacos管理后台,支持使用LDAP的方式对接已有权限系统
  3. Spring-cloud-nacos客户端需要配置用户名和密码,才能正常注册

第2点是为了用户方便使用公司内部账号访问Nacos。但这结合第3点使用就出现了冲突。假设一个部门多个人同时进行开发,这样某个人的用户名和密码就会暴露给了其他开发人员。

1
2
3
spring.cloud.nacos.discovery.namespace=provider
spring.cloud.nacos.discovery.username=test
spring.cloud.nacos.discovery.password=123456

Nacos本地身份验证和Ldap身份验证的区别

翻了一下源码,这两个方式,主要区别在于,ldap方式会先用Nacos本地身份验证,如果验证失败,才会使用ldap方式验证。

manager类涉及到身份认证,区别在上述已经说明。authPluginService涉及权限验证,两种方式代码完全是一样的,不存在差别。
鉴权模块类图

Read More

Go语言协程调度器GPM模型

为了提高Go程序对于CPU的利用率。Go语言的设计者,设计了GPM模型,并且实现了一个高效的协程调度器。

协程Goroutine

从操作系统层面去看,一般可以把线程分为“用户态”和“内核态”。“用户态”可以理解为编程语言实现的线程,而“内核态”可以理解为操作系统实现的线程。“用户态”线程必须和“内核态”线程绑定,才能正常执行。
一个线程中的用户态和内核态

GPM模型

Go的Goroutine即为“用户态”线程的Go语言实现。而管理Goroutine和“内核态”线程绑定关系的管理器,被称为“协程调度器”。“协程调度器”的模型有三个组件组成:G(协程Goroutine)、M(线程Thread)、P(处理器Processor)。
GPM模型

Read More

Loki日志平台

Loki是一个Grafana的开源项目,用以低成本的构建日志系统。Loki只对日志的MetaData构建索引,日志的内容则会进行压缩存储。后端支持多种存储,官方比较推荐使用分布式对象存储MinIO。

部署方式

支持多种部署方式:包括k8s、docker部署,还有windows、linux、mac二进制部署。
还支持多种部署架构:单节点单磁盘、单节点多磁盘、多节点多磁盘。正式环境首选多节点多磁盘部署,具有高可用,企业级性能和可扩展性。由于高可用的需要,需要磁盘的数量需要为4的整数倍。
如下步骤都在centos7.9发行版进行

Read More