ServiceDiscovery - minbox-projects/message-pipe GitHub Wiki
ServiceDiscovery
提供了获取通过负载均衡策略筛选后的客户端实例。
默认实现类ClientServiceDiscovery
内部实现了监听ServiceEvent
服务事件,缓存客户端列表以及处理客户端的上下线状态,过期设置等。
ServiceDiscovery
接口仅对外提供了一个获取客户端的方法#lookup(String pipeNamePattern)
,通过该方法可以健康在线的客户端,如果并有则会返回 null .
/**
* Use regular expressions to obtain ClientIds
*
* @param pipeName The {@link MessagePipe} specific name
* @return The {@link MessagePipe} binding clientIds
*/
protected Set<String> regexGetClientIds(String pipeName) {
Iterator<String> iterator = PIPE_CLIENTS.keySet().iterator();
while (iterator.hasNext()) {
// PipeName when the client is registered,May be a regular expression
String pipeNamePattern = iterator.next();
boolean isMatch = Pattern.compile(pipeNamePattern).matcher(pipeName).matches();
if (isMatch) {
return PIPE_CLIENTS.get(pipeNamePattern);
}
}
return null;
}
由于消息管道(MessagePipe
)名称支持正则表达式的方式配置,所以我们通过管道名称获取绑定的客户端时需要支持正则表达式匹配验证通过后进行筛选,
/**
* Listen for {@link ServiceEvent}
* <p>
* Process client services according to different event types
*
* @param event The {@link ServiceEvent} instance
*/
@Override
public void onApplicationEvent(ServiceEvent event) {
ServiceEventType eventType = event.getServiceEventType();
List<ClientInformation> clients = event.getClients();
switch (eventType) {
case REGISTER:
this.handingRegister(clients);
break;
case HEART_BEAT:
this.handingHeartBeat(clients);
break;
case RESET_INSTANCE:
this.handingResetInstances(clients);
break;
case EXPIRE:
this.handingExpired();
break;
}
}
ServiceEvent
事件监听是在ClientServiceDiscovery
类内实现的具体业务逻辑处理,所支持的ServiceEventType
详见:ServiceEventType
/**
* Register a service
* <p>
* Cache client information in a local collection
* The relationship between the binding pipeline and the client
*
* @param information The client information
*/
protected void registerService(ClientInformation information) {
information.setStatus(ClientStatus.ON_LINE);
this.CLIENTS.put(information.getClientId(), information);
String[] bindingPipeNames = information.getBindingPipeNames();
if (!ObjectUtils.isEmpty(bindingPipeNames)) {
for (String pipeName : bindingPipeNames) {
Set<String> pipeBindClientIds = PIPE_CLIENTS.get(pipeName);
pipeBindClientIds = Optional.ofNullable(pipeBindClientIds).orElse(new HashSet<>());
pipeBindClientIds.add(information.getClientId());
PIPE_CLIENTS.put(pipeName, pipeBindClientIds);
log.info("Client, Pipe: {}, IP: {}, Port: {}, registration is successful.",
pipeName, information.getAddress(), information.getPort());
}
}
}
首先新注册的客户端的状态为设置为上线(ON_LINE
)。
然后并将客户端的基本信息缓存到本地内存集合中(CLIENTS
),方便其他方法获取使用。
最后根据注册客户端时携带的已绑定的消息管道名称列表进行关系映射绑定,存放到PIPE_CLIENTS
二级集合内,提供给#lookup
方法使用。
/**
* Update the last heartbeat time of the client
* <p>
* When receiving the heartbeat, if the client is not registered, perform registration
*
* @param clients List of clients waiting to update their heartbeat time
*/
protected void handingHeartBeat(List<ClientInformation> clients) {
Long currentTime = System.currentTimeMillis();
clients.stream().forEach(client -> {
log.debug("Receiving client: {}, heartbeat sent.", client.getClientId());
ClientInformation cacheClient = CLIENTS.get(client.getClientId());
if (ObjectUtils.isEmpty(cacheClient)) {
client.setLastReportTime(currentTime);
this.registerService(client);
} else {
cacheClient.setLastReportTime(currentTime);
}
});
}
客户端(Client
)如果在线,定时会向服务端(Server
)发送心跳包,服务端接收后会将本次上报时间(毫秒)更新到内存集合的客户端实例(ClientInformation
)中,最后上报时间是剔除过期客户端的重要依据。
/**
* Handling reset client instance collection
*
* @param clients Client list after reset
* @see ServiceEventType#RESET_INSTANCE
*/
protected void handingResetInstances(List<ClientInformation> clients) {
this.CLIENTS.clear();
this.PIPE_CLIENTS.clear();
clients.stream().forEach(client -> this.registerService(client));
log.info("Client collection, reset instance list is complete.");
}
该处理逻辑只有在ServerServiceType#NACOS
配置方式时才会被调用,服务端(Server
)启动后会订阅NamingService的服务变动,如果出现变动就在第一时间通知到消息管道的服务端,收到通知后就会调用该方法来重置本地缓存的客户端列表。
/**
* Dealing with client expiration
*/
protected void handingExpired() {
if (!ObjectUtils.isEmpty(CLIENTS)) {
Long currentTime = System.currentTimeMillis();
CLIENTS.values().stream().forEach(client -> {
String clientId = client.getClientId();
long intervalSeconds = (currentTime - client.getLastReportTime()) / 1000;
if (intervalSeconds > serverConfiguration.getExpiredExcludeThresholdSeconds()
&& ClientStatus.ON_LINE.equals(client.getStatus())) {
client.setStatus(ClientStatus.OFF_LINE);
log.info("MessagePipe Client:{},status updated to offline.", clientId);
} else if (intervalSeconds <= serverConfiguration.getExpiredExcludeThresholdSeconds()
&& ClientStatus.OFF_LINE.equals(client.getStatus())) {
client.setStatus(ClientStatus.ON_LINE);
log.info("MessagePipe Client:{},status updated to online.", clientId);
}
});
}
}
如果通过ServerServiceType#GRPC
方式启动服务端(Server
),会在GRpcServerApplicationService
启动服务入口类内创建一个定时调度线程池,默认每间隔10秒执行一次过期的客户端服务剔除,届时就会发布ServiceEventType#EXPIRE
类型的服务事件,从而调用#handingExpired
方法来处理剔除过期客户端逻辑。