MessagePipeDistributor - minbox-projects/message-pipe GitHub Wiki

消息分发器(MessagePipeDistributor),顾名思义,它担任了消息分发的工作,是发送消息到客户端的中间桥梁。

1. 查找健康的客户端

分发消息之前需要获取分发消息的目标客户端,而客户端(Client)在项目启动时会将绑定的消息管道名称一并上报,服务端(Server)的ServiceDiscovery会将客户端与消息管道绑定的关系进行缓存。

ClientInformation client = serviceDiscovery.lookup(pipeName);

通过ServiceDiscovery#lookup方法就可以获取指定消息管道(MessagePipe)所绑定的客户端,并且是经过负载均衡策略(ClientLoadBalanceStrategy)过滤后健康的客户端信息。

2. Grpc发送消息

由于客户端(Client)在启动时会创建一个Grpc的服务,用于接收分发的消息,所以我们只需要在分发消息时建立与客户端的连接通道(ManagedChannel)即可。

2.1 建立消息通道

建立连接通道由ClientChannelManager类的establishChannel方法负责,源码如下所示:

/**
 * Establish a client channel
 *
 * @param information The {@link ClientInformation} instance
 * @return {@link ManagedChannel} instance
 */
public static ManagedChannel establishChannel(ClientInformation information) {
    String clientId = information.getClientId();
    if (ObjectUtils.isEmpty(information)) {
        throw new MessagePipeException("Client: " + clientId + " is not registered");
    }
    ManagedChannel channel = CLIENT_CHANNEL.get(clientId);
    if (ObjectUtils.isEmpty(channel) || channel.isShutdown()) {
        channel = ManagedChannelBuilder.forAddress(information.getAddress(), information.getPort())
                .usePlaintext()
                .build();
        CLIENT_CHANNEL.put(clientId, channel);
    }
    return channel;
}

grpc的消息通道建立完成后会被缓存到CLIENT_CHANNEL集合内进行临时缓存,如果连接分发消息时遇到了一些连接问题会通过调用ClientChannelManager#removeChannel方法进行删除,如果需要则再次创建一个新的通道。

3. 分发结果确认

消息分发到客户端(Client)后,客户端需要进行执行一系列的业务逻辑,最终根据业务判断返回执行的结果,如果响应状态为MessageResponseStatus#SUCCESS则表示消息处理成功(不过判断是否从阻塞队列删除消息并不是分发器决定,可以查看MessagePipe 内的handleFirst、handleToLast方法。)