发送可以跟随目前的LBS,在多个主机上向MQ发布消息,提高分布的效率。
接收也需要如此,但不需要LBS支持。因为,目前只打算部署一台MQ+eproxy,但是消费端可以有多个。
而且,为了尽量保证消息消费的时序性,我们可以多开几个消费队列,发布端(eproxy)可以保证来自于同一个用户的消息放置到同一个队列 。
如下所示:
std::string str = message->SerializeAsString();
const utils::ConsistentHash::Node *node = to_web_hash_.GetNode(message->user().uid());
if (!node) {
producer_list_[0]->Publish(std::move(str), base::Time::Now());
} else {
producer_list_[node->index]->Publish(std::move(str), base::Time::Now());
}
eproxy的消费端(后端是发送端)的逻辑也可以确保同一个用户的消息一定由同一个工程师发送出去:
void MainApp::OnRabbitMessageArrival(amqp_message_t *amqp_message) {
std::unique_ptr<protocol::CooCareMessage> message(
new protocol::CooCareMessage());
if (!message->ParseFromArray(amqp_message->body.bytes, amqp_message->body.len)) {
return;
}
DLOG(INFO) << "send message to user from WEB: " << message->payload();
if (message->to().empty()) {
client_list_[0]->SendMessage(std::move(message), amqp_message->properties.timestamp, base::Time::Now());
} else {
const utils::ConsistentHash::Node *node = to_user_hash_.GetNode(message->to());
if (!node) {
client_list_[0]->SendMessage(std::move(message), amqp_message->properties.timestamp, base::Time::Now());
} else {
client_list_[node->index]->SendMessage(std::move(message), amqp_message->properties.timestamp, base::Time::Now());
}
}
}
一般而言,某个工程师不会无法工作。要么全部无法工作,比如通讯服务器出现异常,要么都能工作。所以,这种分配不会造成问题。
经过分离之后,将双向通讯的负载能力分布到不同的服务器上,提高系统的吞吐能力。
首先,我们需要使用容器部署,每个jar包运行于单独的容器。且,每个容器专注消费若干队列中的一个。
实际上,spring的 async 注解有一定概率导致消息乱序。但可能影响不大,出现的概率较低。因为人工输入消息总是很慢的。
目前,可以使用一台物理服务器克隆几个镜像,每个容器消费一个队列。如果单机出现资源紧张,可以使用多台物理主机,每台主机跑一个容器。
如果这样还出现资源紧张,可以增加消费队列,并再次扩展物理主机。
消费端注意两个地方:
1)是否手工ack
2)qos取多少合适
手工ack,qos=1可以实现不丢消息。但是严重影响性能,这个需要测试才知道是否能满足现在的业务场景需求。
后端每个节点都可以同时连接三个生产队列(如果不够还可以增加),可以顺序使用生产队列,也可以使用一致性哈希算法。
这里也需要注意一点:生产端是否需要confirm。这个也会严重影响生产效率。
publish消息的时候可以加一个当前时间作为timestamp,精确到微秒
单机 rabbitmq在不设置confirm和ack的情况下,qos=100,吞吐可以达到1万条/秒。要想确保消息不丢失,还需要做rabbitmq集群,但应该没必要,
消息的价值不高。
单机 通讯服务器消息处理能力应该不低于3000条/秒。(通过观察eproxy的生产效率大概可知)
eproxy通过几个工程师连接与servicecenter通讯,每个连接中消息是被缓存的,突然断线可能会造成部分消息丢失(比如已经丢进内核缓冲区,但没有发送成功的消息会丢失),断线后延迟10毫秒立即重连,然后继续发送缓存消息。目前最大缓存数量为1万条消息,超出将被存入数据库做记录,然后丢弃。
为了解决消息丢失的问题,可以配置成tcp连接消息确认(如同rabbitmq的confirm机制),只有收到服务器ack的消息才被认为发送成功。这个功能可以在docker-compose中配置:
EPROXY_MESSAGE_NO_ACK=false。
因为通讯服务器处理完一条消息总会发送一个ack回复。
这样可以确保消息不会丢失,只有收到ack之后,消息才会从队列中删除,并且发送下一条消息。
缺点就是发送效率大大的降低。
这个队列中的消息有过期时间(1小时)。
消息来源主要有:
1)eproxy消费端的事件(对应于后端的生产)
2)eproxy生产端的事件(对应于后端的消费)
3)工程师连接的事件 (从service center收发消息)
事件为json格式,有固定的发送周期。
可以通过scheme判断消息源,有: producer, consumer, communication三种
可以通过id进一步判断发送者:producer,consumer就是队列名称,communication就是工程师通讯id。
其中 status(boolean) :true/false, 当为false的时候需要注意,这是一个系统出现错误的报告,需要预警。
再者就是观察 : event字段,判断事件类型:
communication:
1) commconnect:通讯连接事件,status表示成功失败
2) commlogin:通讯登录事件
3) weblogin:web登录事件,出错了后台需要解决问题了。
4)discard:消息缓冲区满导致丢消息事件。json中包含消息内容(已经转为json格式)这个也需要马上响应
5) report:周期性报告:包含
{
"maxMessageCount":最大缓存长度,比如10000
"discardCount":丢失的消息总数
"successCount" : 发送成功的总数
"messageList":目前缓存的消息总数。这个数 大于一定数目,或者长期大于一丁数目,我们就需要考虑扩容了。
}
consumer:
1)connect:连接rabbitmq的情况
2)report: 收到了多少个消息,如果服务器一直持续运行,这个数字是服务器上线以来收到的消息总数
producer:
1)discard:消息缓冲区满导致丢消息事件。json中包含消息内容(已经转为json格式)这个也需要马上响应
2)connect:连接rabbitmq事件
3)timeout:rabbitmq连接超时断开
4)report:周期报告:
{
"maxMessageCount":最大缓存长度,比如10000
"discardCount":丢失的消息总数
"successCount" : 发送成功的总数
"messageList":目前缓存的消息总数。这个数 大于一定数目,或者长期大于一丁数目,我们就需要考虑扩容了。
}
通讯服务器为了支持docker-compose做了一些配置上的优化,与核心业务无关。
同时处理了SIGTERM事件(docker stop的时候docker会发送这个信号给容器内进程),实现GracefulShutdown。
如果主机CPU占用过高,可以考虑将 servicecenter单独部署。同时增加 usercenter 容器数量。
如图片,文件等,可以通过http接口先通过后端上传到HP S3,并返回mediaid,然后客户端再发消息,并打包进mediaid。后端就不用再传给HP了。
如果根据用户id将消息均衡分布到不同队列中,需要用到一致性哈希算法。