websocket proxy用法 - housekeeper-software/tech GitHub Wiki

用途

websocket proxy由c++语言编写,底层基于libwebsockets、libzeromq、protocolbuffer 三个库构建。其目的是接受客户端的websocket连接,并将消息通过 zeromq转发给后端。后端也可以通过zeromq向 websocket proxy发送消息,最终消息会写入客户端。
websocket proxy 自定义了一种基于Protocol Buffer协议,此协议在zeromq上进行传输。websocket proxy和后端都必须按照此种协议进行交换信息。

架构图

自定义 Protocol Buffer协议


syntax = "proto3";

package protocol;

option optimize_for = LITE_RUNTIME;

message MQMessage{
	string id = 1;  //连接标识(参考下文)
	uint32 type = 2;  //消息类型,如下定义
        bool ack = 3;  //是否是回复消息,这个由zeromq定义的消息模式,我们使用 REQ/REP简单模型,回复包置ack=true
	string extension = 4; //传输扩展消息,一般打包成json,websocket proxy第一个包会打包额外的信息给后端,见下文
	bytes payload = 5; //这里透传客户端的消息内容。
}
其中 type 定义如下:  
enum ZMQProtocol {
 ZMQ_TYPE_MESSAGE,
 ZMQ_TYPE_BROADCAST,
 ZMQ_TYPE_KILL,
 ZMQ_TYPE_OFFLINE
};

websocket proxy转给后端的第一个包中的 extension: 
{"address":"","time":double}
address:是设备端真实ip ,time:是上线时间,unix时间格式。

ZMQProtocol 详解

enum ZMQProtocol {
 ZMQ_TYPE_MESSAGE,
 ZMQ_TYPE_BROADCAST,
 ZMQ_TYPE_KILL,
 ZMQ_TYPE_OFFLINE
};
websocket proxy传给后端的消息中只有如下两种类型:  
ZMQ_TYPE_MESSAGE:消息包,正文在 payload中  
ZMQ_TYPE_OFFLINE:客户端掉线通知,没有正文

后端给websocket proxy的消息类型如下:  
ZMQ_TYPE_MESSAGE:正常的消息,websocket proxy将转发给客户端 
ZMQ_TYPE_BROADCAST:广播消息,websocket proxy将转发给所有在线的客户端
ZMQ_TYPE_KILL:后端希望踢掉某个客户端连接
ZMQ_TYPE_RESTART:java端通知 proxy 重启网络

java端重启之后的处理流程

因为 websocket proxy有消息缓存功能,所以,java端重启之后,需要先开启 pushish,然后发送ZMQ_TYPE_RESTART,此后等待几秒钟再启动consumer

配置容器

配置docker-compose 如下: 
      - PROXY_PRODUCER=tcp://127.0.0.1:50001
      - PROXY_CONSUMER=tcp://*:50000
      - PROXY_HANDLER=/entrance
      - PROXY_WEBSOCKET_OPTIONS=ws,15888,0,30,100,,,

连接标识

连接标识类似: 12123-01-122121,分为三段,中间以中横线分割。具体含义如下:   
连接的序号-线程编号-服务器时间戳  
连接序号是一个uint64的数字,一个不会重复的id用以标识一个客户端连接。由客户端连接创建之后websocket proxy分配。  
线程编号:标识这个连接在哪个线程中处理。 
服务器时间戳:用于判断服务器是否重启过。  
这个字符串可以精确标识一个连接,直到其生命周期结束。  后端在收到一个包(通常是登录包之类的),应该用这个id标识这个连接,当后端需要发送消息给此连接时,
需要设置消息的id字段。  
websocket proxy会通过服务器时间戳来判断这个消息是否属于过期消息(比如websocket proxy重启过),如过期,则直接抛弃。否则将根据id查找到这个连接,并将消息转发给客户端。  
有个例外是: ZMQ_TYPE_BROADCAST,此刻无需设置id。  

java端用法

java 在pom中引入: 

   <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.5.3</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.17.3</version>
        </dependency>
还要添加 ZMQMessage.java (https://github.com/housekeeper-software/tech/blob/main/ZMQMessage.java)。  
以下是消费端的示例:其他的参考: https://github.com/zeromq/jeromq

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import protocol.ZMQMessage;


public class ServerApplication {

    public class Consumer implements Runnable {
        public void run() {

        }
    }

    public static void main(String[] args) throws Exception {
        try (ZContext context = new ZContext()) {
            // Socket to talk to clients
            ZMQ.Socket socket = context.createSocket(SocketType.REP);
            socket.bind("tcp://*:50001");

            while (!Thread.currentThread().isInterrupted()) {
                // Block until a message is received
                byte[] data = socket.recv(0);
                ZMQMessage.MQMessage message = ZMQMessage.MQMessage.parseFrom(data);

                // Print the message
                System.out.println(
                        "Received: [" + message.getPayload().toStringUtf8() + message.getExtension() + "]"
                );

                // Send a response
                ZMQMessage.MQMessage.Builder builder = ZMQMessage.MQMessage.newBuilder();
                builder.setId(message.getId());
                builder.setAck(true);
                ZMQMessage.MQMessage reply = builder.build();
                byte[] rdata = reply.toByteArray();
                socket.send(rdata, 0);
            }
        }
    }
}

性能测试

websocket proxy 单机可支持10万左右的连接  
zeromq 在4核测试 10万条消息/s (消息长度 30 bytes)
⚠️ **GitHub.com Fallback** ⚠️