Writing WebSocket Sub Protocol - Atmosphere/atmosphere GitHub Wiki

The Atmosphere Framework makes it easy to implement a custom WebSocket protocol (or sub-protocol). Implementations of WebSocketProtocol are portable and work transparently on any server that supports WebSocket. The interface to implement is called WebSocketProtocol. The simplest way to register your implementation is to annotate it with the WebSocketProtocolService annotation. You can also define it in web.xml or application.xml, or inside your META-INF/services file.

The API is defined as follows:

    /**
     * Allow an implementation to query the AtmosphereConfig of init-param, etc.
     *
     * @param config {@link org.atmosphere.cpr.AtmosphereConfig}
     */
    void configure(AtmosphereConfig config);

    /**
     * Parse the WebSocket message, and delegate the processing to the AtmosphereFramework#asyncSupport or
     * to any existing technology. Invoking AtmosphereFramework#asyncSupport will delegate the request processing
     * to the targeted AtmosphereHandler implementation. Returning null means this implementation will
     * handle the processing/dispatching of the WebSocket's request itself.
     * <br>
     * As an example, this is how WebSocket messages are delegated to the
     * Jersey runtime.
     * <br>
     *
     * @param webSocket The {@link WebSocket} connection
     * @param data      The WebSocket message
     */
    List<AtmosphereRequest> onMessage(WebSocket webSocket, String data);

    /**
     * Parse the WebSocket message, and delegate the processing to the AtmosphereFramework#asyncSupport or
     * to any existing technology. Invoking AtmosphereFramework#asyncSupport will delegate the request processing
     * to the targeted AtmosphereHandler implementation. Returning null means this implementation will
     * handle the processing/dispatching of the WebSocket's request itself.
     * <br>
     * As an example, this is how WebSocket messages are delegated to the Jersey runtime.
     * <br>
     *
     * @param webSocket The {@link WebSocket} connection
     * @param data      The buffer containing the WebSocket message
     * @param offset    offset of the message within the buffer
     * @param length    length of the message.
     */
    List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] data, int offset, int length);

    /**
     * Invoked when a WebSocket is opened
     * @param webSocket {@link WebSocket}
     */
    void onOpen(WebSocket webSocket);

    /**
     * Invoked when a WebSocket is closed
     * @param webSocket {@link WebSocket}
     */
    void onClose(WebSocket webSocket);

    /**
     * Invoked when an error occurs.
     * @param webSocket {@link WebSocket}
     * @param t a WebSocketException
     */
    void onError(WebSocket webSocket, WebSocketProcessor.WebSocketException t);

The Atmosphere Framework will invoke the API in the following order:

  1. configure : invoked when the framework starts, so you can customize your implementation of WebSocketProtocol.
  2. onOpen : invoked when a WebSocket is opened. You can manipulate the associated AtmosphereResource at that time, by suspending the response, adding special headers, etc. However, it is recommended not to manipulate the AtmosphereResource's request/response in that method and to write an AtmosphereInterceptor instead.
  3. onMessage : invoked when bytes or text messages are received. This is where you manipulate the message. You can digest the message yourself or create a list of AtmosphereRequest based on the message's content. If you decide to return a list of AtmosphereRequest, those objects will be dispatched as if they were normal HTTP requests, so they can be consumed by any AtmosphereHandler. As an example, you can dispatch requests to the Atmosphere-Jersey extension, and Jersey will never realize that the underlying communication and message are WebSocket.
  4. onClose : when the WebSocket gets closed.
  5. onError : when an unexpected error occurs.
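
The return-value contract of onMessage described in step 3 can be modelled with a small standalone sketch. It does not use the Atmosphere API: `MessageParser` is a hypothetical stand-in for WebSocketProtocol#onMessage, plain strings stand in for AtmosphereRequest objects, and `process` is an assumed simplification of what the framework does with the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DispatchSketch {

    // Hypothetical stand-in for WebSocketProtocol#onMessage; String replaces AtmosphereRequest.
    interface MessageParser {
        List<String> onMessage(String data);
    }

    // Models the contract: null means the protocol handled the message itself,
    // a list means every element is dispatched like a normal request.
    static List<String> process(MessageParser parser, String data) {
        List<String> dispatched = new ArrayList<>();
        List<String> requests = parser.onMessage(data);
        if (requests == null) {
            return dispatched; // nothing left for the framework to dispatch
        }
        for (String request : requests) {
            dispatched.add("dispatched:" + request);
        }
        return dispatched;
    }

    public static void main(String[] args) {
        MessageParser selfHandling = data -> null;                        // digests the message itself
        MessageParser splitting = data -> Arrays.asList(data.split(",")); // one request per item

        System.out.println(process(selfHandling, "a,b")); // prints []
        System.out.println(process(splitting, "a,b"));    // prints [dispatched:a, dispatched:b]
    }
}
```

A protocol that returns null, like the echo example, takes full responsibility for the message. A protocol that returns a list hands each request back to the framework for regular dispatching.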

Echo Protocol

As an example, below is a simple WebSocketProtocol that simply echoes each WebSocket message it receives:


    @Override
    public List<AtmosphereRequest> onMessage(WebSocket webSocket, String data) {
        webSocket.resource().getBroadcaster().broadcast(data);
        return null;
    }

    @Override
    public List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] data, int offset, int length) {

        byte[] b = new byte[length];
        System.arraycopy(data, offset, b, 0, length);
        webSocket.resource().getBroadcaster().broadcast(b);
        return null;
    }

    @Override
    public void configure(AtmosphereConfig config) {
    }

    @Override
    public void onOpen(WebSocket webSocket) {
    }

    @Override
    public void onClose(WebSocket webSocket) {
    }

    @Override
    public void onError(WebSocket webSocket, WebSocketProcessor.WebSocketException t) {
        // println accepts a single argument, so the status details are concatenated explicitly.
        System.out.println(t.getMessage()
             + " Status " + t.response().getStatus()
             + " Message " + t.response().getStatusMessage());
    }

The complete code can be read here
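
The binary onMessage variant receives a buffer together with an offset and a length, so only that slice belongs to the message. As a minimal standalone sketch (the class and method names are illustrative and not part of Atmosphere), the same copy can be written with Arrays.copyOfRange, which for a valid range yields the same bytes as the System.arraycopy call in the echo example:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class FrameSlice {

    // Copies only the bytes that belong to the current message out of a larger buffer.
    static byte[] copyMessage(byte[] data, int offset, int length) {
        return Arrays.copyOfRange(data, offset, offset + length);
    }

    public static void main(String[] args) {
        // The buffer holds unrelated bytes before and after the 4-byte message "ping".
        byte[] buffer = "xxpingy".getBytes(StandardCharsets.US_ASCII);
        byte[] message = copyMessage(buffer, 2, 4);
        System.out.println(new String(message, StandardCharsets.US_ASCII)); // prints ping
    }
}
```

Broadcasting the copy rather than the original array avoids leaking surrounding bytes if the server reuses or over-allocates its receive buffer.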

Simple HTTP Protocol

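By default, Atmosphere uses the SimpleHttpProtocol to dispatch WebSocket messages to the framework and the application. When a message is received, it is treated as an HTTP POST with a Content-Type of text/html. The content type, the HTTP method, and the path delimiter (default "@@") can each be overridden through init-params.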

    public void configure(AtmosphereConfig config) {
        String contentType = config.getInitParameter(ApplicationConfig.WEBSOCKET_CONTENT_TYPE);
        if (contentType == null) {
            contentType = "text/html";
        }
        this.contentType = contentType;

        String methodType = config.getInitParameter(ApplicationConfig.WEBSOCKET_METHOD);
        if (methodType == null) {
            methodType = "POST";
        }
        this.methodType = methodType;

        String delimiter = config.getInitParameter(ApplicationConfig.WEBSOCKET_PATH_DELIMITER);
        if (delimiter == null) {
            delimiter = "@@";
        }
        this.delimiter = delimiter;

        // Boolean.parseBoolean returns false for null, so no explicit null check is needed.
        destroyable = Boolean.parseBoolean(
                config.getInitParameter(ApplicationConfig.RECYCLE_ATMOSPHERE_REQUEST_RESPONSE));
    }

    public List<AtmosphereRequest> onMessage(WebSocket webSocket, String d) {
        AtmosphereResourceImpl resource = (AtmosphereResourceImpl) webSocket.resource();
        if (resource == null) {
            logger.error("Invalid state. No AtmosphereResource has been suspended");
            return null;
        }
        String pathInfo = resource.getRequest().getPathInfo();
        if (d.startsWith(delimiter)) {
            int delimiterLength = delimiter.length();
            int bodyBeginIndex = d.indexOf(delimiter, delimiterLength);
            if (bodyBeginIndex != -1) {
                pathInfo = d.substring(delimiterLength, bodyBeginIndex);
                d = d.substring(bodyBeginIndex + delimiterLength);
            }
        }
        Map<String,Object> m = new HashMap<String, Object>();
        m.put(FrameworkConfig.WEBSOCKET_SUBPROTOCOL, FrameworkConfig.SIMPLE_HTTP_OVER_WEBSOCKET);
        // Propagate the original attribute to WebSocket message.
        m.putAll(resource.getRequest().attributes());

        List<AtmosphereRequest> list = new ArrayList<AtmosphereRequest>();

        // We need to create a new AtmosphereRequest as WebSocket message may arrive concurrently on the same connection.
        list.add(new AtmosphereRequest.Builder()
                .request(resource.getRequest())
                .method(methodType)
                .contentType(contentType)
                .body(d)
                .attributes(m)
                .pathInfo(pathInfo)
                .destroyable(destroyable)
                .headers(resource.getRequest().headersMap())
                .build());

        return list;
    }

    public List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] d, final int offset, final int length) {
        return onMessage(webSocket, new String(d, offset, length));
    }

    public void onOpen(WebSocket webSocket) {
    }

    public void onClose(WebSocket webSocket) {
    }

    @Override
    public void onError(WebSocket webSocket, WebSocketProcessor.WebSocketException t) {
        logger.warn(t.getMessage() + " Status {} Message {}", t.response().getStatus(), t.response().getStatusMessage());
    }
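
The delimiter handling in onMessage is easier to follow in isolation. The sketch below extracts that logic into a standalone class so it can be run without Atmosphere. `DelimitedMessage` and `parse` are hypothetical names introduced for illustration; the parsing steps mirror the code above, with the resource's original path info passed in as `defaultPathInfo`.

```java
final class DelimitedMessage {
    final String pathInfo;
    final String body;

    DelimitedMessage(String pathInfo, String body) {
        this.pathInfo = pathInfo;
        this.body = body;
    }

    // A message of the form <delimiter><path><delimiter><body> overrides the path info;
    // any other message keeps the default path and is used as the body unchanged.
    static DelimitedMessage parse(String message, String delimiter, String defaultPathInfo) {
        String pathInfo = defaultPathInfo;
        String body = message;
        if (message.startsWith(delimiter)) {
            int delimiterLength = delimiter.length();
            int bodyBeginIndex = message.indexOf(delimiter, delimiterLength);
            if (bodyBeginIndex != -1) {
                pathInfo = message.substring(delimiterLength, bodyBeginIndex);
                body = message.substring(bodyBeginIndex + delimiterLength);
            }
        }
        return new DelimitedMessage(pathInfo, body);
    }

    public static void main(String[] args) {
        DelimitedMessage routed = parse("@@/chat/room1@@hello", "@@", "/default");
        System.out.println(routed.pathInfo + " -> " + routed.body); // prints /chat/room1 -> hello

        DelimitedMessage plain = parse("hello", "@@", "/default");
        System.out.println(plain.pathInfo + " -> " + plain.body);   // prints /default -> hello
    }
}
```

With the default delimiter, a client can therefore route a single message to a different path by sending `@@/some/path@@payload` over the same WebSocket connection. A message with only a leading delimiter and no second one is passed through untouched.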

If you need a more powerful REST over WebSocket protocol, take a look at the SwaggerSocket project.