使用 Netty 作为java 程序的消息循环 - housekeeper-software/tech GitHub Wiki

场景

一般,如果在java服务器实现除了http协议以外的通讯连接,一般可以使用netty的框架即可。但是,有时候,我们可能选用其他的java通讯库来实现tcp/udp/websocket等协议。此刻,我们可以借助 netty的异步事件循环的能力提高程序的可靠性,并尽量减少锁的使用。

方法举例

我们结合Netty NIO和 java_websocket实现一个简单的websoket 客户端。此例中完全抛弃了锁的使用,并且是线程安全的。

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SocketLayer {
    private final EventLoopGroup eventLoopGroup;
    private final int pingInterval;
    private final SocketHandler handler;
    private Future<?> pingTimer = null;
    private WebSocketClient client = null;

    public interface SocketHandler {
        void onSocketOpen(ServerHandshake serverHandshake);

        void onSocketClosed(int code, String reason, boolean remote);

        void onSocketMessage(byte[] data);

        void onSocketMessage(String data);
    }

    public SocketLayer(SocketHandler handler, int pingInterval) {
        this.handler = handler;
        this.eventLoopGroup = new NioEventLoopGroup(1);
        this.pingInterval = pingInterval;
    }

    public void start(URI uri) {
        if (client != null)
            return;
        client = new WebSocketClient(uri) {
            @Override
            public void onOpen(ServerHandshake handshakedata) {
                System.out.println("onOpen");
                handler.onSocketOpen(handshakedata);

                pingTimer = eventLoopGroup.next().scheduleAtFixedRate(() -> {
                    client.sendPing();
                }, pingInterval, pingInterval, TimeUnit.SECONDS);
            }

            @Override
            public void onMessage(ByteBuffer blob) {
                handler.onSocketMessage(blob.array());
            }

            @Override
            public void onMessage(String s) {
                handler.onSocketMessage(s);
            }

            @Override
            public void onClose(int code, String reason, boolean remote) {
                System.out.printf("websocket onClose(%d,%s,%b)\r\n", code, reason, remote);
                eventLoopGroup.next().execute(() -> stopPing());
                handler.onSocketClosed(code, reason, remote);
            }

            @Override
            public void onError(Exception ex) {
                ex.printStackTrace();
            }

            @Override
            public void onWebsocketPong(WebSocket conn, Framedata f) {
                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss z");
                Date date = new Date(System.currentTimeMillis());
                System.out.println(formatter.format(date) + ":pong");
            }
        };
        client.setConnectionLostTimeout(pingInterval * 3);
        client.connect();
    }

    public void stop() {
        try {
            eventLoopGroup.next().execute(() -> {
                if (client != null) {
                    try {
                        stopPing();
                        client.closeBlocking();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    client = null;
                }
            });
            eventLoopGroup.shutdownGracefully().sync();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public boolean send(byte[] data) {
        if (!client.isOpen())
            return false;
        try {
            client.send(data);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    public void reconnect() {
        eventLoopGroup.next().execute(() -> {
            if (client != null) {
                stopPing();
                client.reconnect();
            }
        });
    }

    private void stopPing() {
        if (pingTimer != null && !pingTimer.isCancelled()) {
            pingTimer.cancel(false);
            pingTimer = null;
        }
    }
}

分析

EventLoopGroup group = new NioEventLoopGroup(1);
此句即可创建[一个]Nio线程。其实线程并没有创建,当第一次使用的时候才会创建。如果是简单的应用,比如只有一个连接,那么一条线程即可。  
如果不指定线程数,应该是CPU核心数*2的线程数。如果线程多了,对于串行任务还是有问题的。    
EventLoopGroup有几个方法:  
group.next().execute(闭包),表示立即执行,如果NIO线程和调用线程不是同一个的话,闭包不会在调用线程执行,而是在group指定的NIO线程中执行。
group.next().schedule(闭包),表示执行单次定时任务,其他同上  
group.next().scheduleAtFixedRate(闭包),表示周期性定时任务,其他同上。  
group.next().submit(闭包,return value),带返回值的 excute()。

每个方法都可以返回一个 Future<?> ,可以获取闭包执行状态,获取取消任务。

如果有多个线程交互,我们可以通过其他线程的group,将任务投递到其他线程中执行。

与其他语言对比

类似android 的Handler。  
与chromium中的 MessageLoop相同。 group.next().execute(闭包),相当于 task_runner()->PostTask(Clouser)
尤其在单线程应用中,通过这种方法将外部调用,定时器,IO事件都可以归一化到单一线程中,极大的减少锁使用。
将任务序列化之后,程序的逻辑控制变得非常简单。  
其他语言中的UI单线程也有类似的实现。