
GSIP 119

Jody Garnett edited this page Aug 25, 2015 · 1 revision

GSIP 119 - WPS clustering of asynchronous requests

Overview

Proposed By

Andrea Aime

Assigned to Release

GeoServer 2.7.0

State

  • Under Discussion
  • In Progress
  • Completed
  • Rejected
  • Deferred

Motivation

Most OGC service requests are synchronous and stateless, which makes clustering GeoServer easy from a services point of view, since none of the instances needs to know what the other nodes in the cluster are doing.

WPS is the notable exception to this rule, as it allows asynchronous requests with the following workflow:

  • The client submits an Execute request asking for status tracking
  • The WPS server returns a callback URL for the status document
  • The client polls that URL to get status updates and, ultimately, to retrieve the results

For the above to work, the client currently needs to poll the same GeoServer instance that is running the request. This limits the use of asynchronous WPS requests to clusters using sticky sessions, which in turn limits scalability and the ability to distribute the load evenly.

Proposal overview

The proposal aims to remove the above limitation and allow status requests to be answered by nodes other than the one running the request. This requires a shared repository of process execution status.

Moreover, any node needs to be able to return the final WPS response, which is returned in reply to a status request once all the outputs and the response itself are ready. Since the response files can be very large, a shared large-object store is also needed to handle all of the artifacts associated with a given process execution.

While the status store could be the same as the process artifacts store, we propose two separate interfaces due to the significant difference in access patterns and stored object size:

  • The status store will store small structured objects (the execution statuses) and will be accessed frequently both in read and write modes
  • The artifacts store will contain potentially very large binary/text files, each of which will normally be written once and read once.

Detailed presentation of changes

The following diagram shows the proposed changes; please follow this link for a full resolution image:

[Diagram: overview of the proposed changes]

ProcessListener

The ProcessListener interface provides a way for plugins to listen to the progress of a process through its lifecycle:

/**
 * An interface to monitor a process execution.
 * The methods will be called by different threads when working on asynchronous processes,
 * so the usage of thread locals to keep state about an execution is discouraged,
 */
public interface ProcessListener {

    /**
     * Called right before the process is submitted into the {@link ProcessManager}
     * 
     * @param event
     */
    void submitted(ProcessEvent event) throws WPSException;

    /**
     * Reports progress of the process. Not to be confused with the Java based process implementation 
     * progress tracking, as this also includes input parsing and output encoding
     */
    void progress(ProcessEvent event) throws WPSException;

    /**
     * Called when the process executed successfully and its output was written out to
     * the caller (or stored on disk, for asynchronous calls)
     * 
     * @param event
     */
    void completed(ProcessEvent event) throws WPSException;

    /**
     * Called when the process is getting cancelled by the client/administrator
     * 
     * @param event
     */
    void cancelled(ProcessEvent event) throws WPSException;

    /**
     * Called when the process failed to execute. This method should not throw further exceptions.
     */
    void failed(ProcessEvent event);
}

The above interface was already discussed on geoserver-devel in 2012, and has been resurrected in this proposal because tracking and exposing the status of a process across a cluster requires some form of process status awareness (plus, of course, some way to store that information).

The ProcessEvent object bundles simple status information with other details that may interest listeners, such as access to the original request, inputs, outputs and exceptions (which are filled in as the process moves through its lifecycle):

/**
 * The context of an event triggered in a {@link ProcessListener}
 */
public class ProcessEvent implements Cloneable {

    ExecutionStatus status;

    Map<String, Object> inputs;

    Map<String, Object> outputs;

    Throwable error;
    
    ...
}

The ExecutionStatus class is already available in the current code base, and it will also be the information stored in the shared status repository. To make it more useful, some extra information will be added to it, such as a GeoServer node identifier, the user that initiated the call, and whether the process execution is asynchronous or chained.

One of the advantages of introducing this interface is that it will also be available for other uses, such as monitoring, or adding vendor extensions to the protocol, for example notifying the user via e-mail when an asynchronous process completes.

Node identifier

Several clustering-related efforts depend on the notion of a node identifier for each GeoServer in the cluster. This is often combined with a shared data directory, which means the node identifier cannot be stored in the standard configuration.

This proposal will use the GEOSERVER_NODE_OPTS system variable, already used by the GUI to display a node id on every web interface page, and will fall back to the host name/host address if that variable is missing.
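A minimal sketch of that lookup order follows. It assumes GEOSERVER_NODE_OPTS holds semicolon-separated `key:value` pairs with the identifier under an `id` key (an assumption about the format made for illustration), and it checks the Java system property before the environment variable. NodeIdResolver and its methods are hypothetical names, not GeoServer API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical helper resolving this node's identifier (illustrative sketch only). */
final class NodeIdResolver {

    static final String NODE_OPTS = "GEOSERVER_NODE_OPTS";

    /** Reads the node options as a system property first, then as an environment variable. */
    static String resolve() {
        String opts = System.getProperty(NODE_OPTS);
        if (opts == null) {
            opts = System.getenv(NODE_OPTS);
        }
        String id = parseId(opts);
        return id != null ? id : hostFallback();
    }

    /** Extracts the "id" entry from assumed "key:value;key:value" options, or null if absent. */
    static String parseId(String opts) {
        if (opts == null) {
            return null;
        }
        for (String entry : opts.split(";")) {
            int sep = entry.indexOf(':');
            if (sep > 0 && entry.substring(0, sep).trim().equals("id")) {
                String value = entry.substring(sep + 1).trim();
                return value.isEmpty() ? null : value;
            }
        }
        return null;
    }

    /** Falls back to the host name, then to the host address. */
    static String hostFallback() {
        try {
            InetAddress local = InetAddress.getLocalHost();
            String name = local.getHostName();
            return (name != null && !name.isEmpty()) ? name : local.getHostAddress();
        } catch (UnknownHostException e) {
            return "unknown-node"; // hypothetical last-resort label, not part of the proposal
        }
    }
}
```

Checking the system property first lets a single JVM override an environment shared by several containers; the real precedence is for the implementation to decide.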

Status tracking and storage

The status will be tracked by a ProcessStatusTracker class, which will implement the ProcessListener interface to be notified of process progress. The tracker will autonomously create and update the status of the various processes, and will offer query methods to the parties interested in getting information about them.

/**
 * Autonomously tracks the status of processes running in this node, and allows querying for 
 * status of processes running either on this node, or on other nodes
 */
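public class ProcessStatusTracker implements ProcessListener {

  // ProcessListener callbacks (submitted, progress, completed, cancelled, failed) omitted

  /**
   * Retrieves the status of a particular process by executionId
   */
  public ExecutionStatus get(String executionId) { ... }

  /**
   * Lists all the statuses matching the specified OGC filter (use the ExecutionStatus bean properties to
   * build filters)
   */
  public List<ExecutionStatus> list(Filter filter) { ... }
}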
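To illustrate how a tracker can build statuses purely from listener callbacks, here is a self-contained sketch. TrackedStatus, TrackerListener and SketchStatusTracker are hypothetical, much simplified stand-ins for ExecutionStatus, ProcessListener and ProcessStatusTracker (only three callbacks are modeled), and a java.util.function.Predicate replaces the OGC Filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Simplified, hypothetical stand-in for ExecutionStatus. */
final class TrackedStatus {
    enum State { QUEUED, RUNNING, SUCCEEDED }

    final String executionId;
    final String nodeId;
    final State state;
    final float progress;

    TrackedStatus(String executionId, String nodeId, State state, float progress) {
        this.executionId = executionId;
        this.nodeId = nodeId;
        this.state = state;
        this.progress = progress;
    }
}

/** Hypothetical subset of the ProcessListener callbacks, keyed by execution id. */
interface TrackerListener {
    void submitted(String executionId);
    void progress(String executionId, float progress);
    void completed(String executionId);
}

/** Sketch of a tracker: each callback overwrites the status held in an in-memory map. */
final class SketchStatusTracker implements TrackerListener {
    private final String nodeId;
    private final Map<String, TrackedStatus> store = new ConcurrentHashMap<>();

    SketchStatusTracker(String nodeId) {
        this.nodeId = nodeId;
    }

    @Override
    public void submitted(String id) {
        store.put(id, new TrackedStatus(id, nodeId, TrackedStatus.State.QUEUED, 0f));
    }

    @Override
    public void progress(String id, float progress) {
        store.put(id, new TrackedStatus(id, nodeId, TrackedStatus.State.RUNNING, progress));
    }

    @Override
    public void completed(String id) {
        store.put(id, new TrackedStatus(id, nodeId, TrackedStatus.State.SUCCEEDED, 100f));
    }

    TrackedStatus get(String executionId) {
        return store.get(executionId);
    }

    List<TrackedStatus> list(Predicate<TrackedStatus> filter) {
        List<TrackedStatus> result = new ArrayList<>();
        for (TrackedStatus status : store.values()) {
            if (filter.test(status)) {
                result.add(status);
            }
        }
        return result;
    }
}
```

In the proposed design the private map would be replaced by the pluggable status store, so that statuses saved on one node become visible to status requests hitting any other node.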

The status tracker will use a pluggable storage implementation to store and share the state among the various nodes:

/**
 * Stores and allows retrieval of execution status information for the various running and recently 
 * completed processes
 */
public interface ProcessStatusStore {
   /**
    * Saves or updates a given process status 
    */
   void save(ExecutionStatus status);

   /**
    * Clears process statuses matching a certain condition
    */
   void remove(Filter filter);

   /**
    * Retrieves process statuses based on a given condition
    */
   List<ExecutionStatus> list(Filter filter);
}

Given the interfaces above, a simple way to implement a ProcessStatusStore is to wrap a GeoTools DataStore. This is the likely path for both an in-memory implementation, used for stand-alone WPS servers, and a database-based implementation, used across a cluster.
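The proposal suggests wrapping a GeoTools DataStore; the sketch below deliberately does not, and only illustrates the save/list/remove contract with a concurrent map keyed by execution id. StoredStatus, StatusStore and MemoryStatusStore are hypothetical names, and a java.util.function.Predicate stands in for the OGC Filter.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Simplified, hypothetical execution status holding only the fields this sketch needs. */
final class StoredStatus {
    final String executionId;
    final String nodeId;
    final boolean asynchronous;

    StoredStatus(String executionId, String nodeId, boolean asynchronous) {
        this.executionId = executionId;
        this.nodeId = nodeId;
        this.asynchronous = asynchronous;
    }
}

/** Mirrors the proposed ProcessStatusStore, with Predicate standing in for an OGC Filter. */
interface StatusStore {
    void save(StoredStatus status);
    void remove(Predicate<StoredStatus> filter);
    List<StoredStatus> list(Predicate<StoredStatus> filter);
}

/** In-memory store for a stand-alone server: save() replaces any earlier status with the same id. */
final class MemoryStatusStore implements StatusStore {
    private final Map<String, StoredStatus> statuses = new ConcurrentHashMap<>();

    @Override
    public void save(StoredStatus status) {
        statuses.put(status.executionId, status);
    }

    @Override
    public void remove(Predicate<StoredStatus> filter) {
        statuses.values().removeIf(filter);
    }

    @Override
    public List<StoredStatus> list(Predicate<StoredStatus> filter) {
        return statuses.values().stream().filter(filter).collect(Collectors.toList());
    }
}
```

Keying by execution id makes save() behave as the "saves or updates" operation described in the interface comment.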

ProcessArtifactsStore

The ProcessArtifactsStore interface will be used to store and retrieve any artifact related to a given process. The interface will look as follows:

/** 
 * Handles all the artifacts for a given process
 */
public interface ProcessArtifactsStore {
   /**
    * Retrieves a Resource matching the specified artifactName. The resource will be UNDEFINED if the 
    * artifact is missing, and will be created on demand at the first access
    */
   Resource getArtifact(String executionId, String artifactName);

   /**
    * Clears all the artifacts associated to a given executionId
    */
   void clear(String executionId);
   
}

The default implementation will be based on a FileSystemResourceStore pointing at the $GEOSERVER_DATA_DIR/tmp/wps folder, with an option to point it at a different folder (specified as a new WPSInfo metadata entry). This suits clustering against a shared data directory, but also a setup where each node has its own data directory, provided the location of a shared folder is configured in WPSInfo.
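A minimal sketch of a one-folder-per-execution layout follows, assuming java.nio paths in place of GeoServer's Resource API and FileSystemResourceStore; the class name and directory layout are hypothetical. As with the UNDEFINED resource described above, getArtifact only computes a location, and the file comes into existence when it is first written.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/**
 * Hypothetical file-system artifacts store: one sub-directory per execution id under a
 * configurable (possibly shared) base folder.
 */
final class FileArtifactsStore {
    private final Path baseDir;

    FileArtifactsStore(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** Returns the artifact location; nothing is created on disk until a caller writes to it. */
    Path getArtifact(String executionId, String artifactName) {
        return baseDir.resolve(executionId).resolve(artifactName);
    }

    /** Recursively deletes every artifact stored for the given execution, if any. */
    void clear(String executionId) {
        Path dir = baseDir.resolve(executionId);
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Pointing baseDir at a folder every node can reach is what allows any node to serve the final response for an execution it did not run.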

Status/artifact expiration

The current code already expires artifacts related to asynchronous processes after a configurable timeout, assuming everything is on the file system. The new version will expire artifacts based on the same timeout, but will also clean up the execution status store accordingly, to keep the two consistent.
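The following is a hedged sketch of expiring both stores with a single timeout. ExpirationCleaner is hypothetical, and both stores are modeled as plain maps keyed by execution id, with only completed executions carrying a completion time. Removing the status before the artifacts is a design choice of this sketch, not something the proposal specifies.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical cleaner expiring statuses and artifacts together with one timeout. */
final class ExpirationCleaner {
    final Map<String, Instant> completionTimes = new ConcurrentHashMap<>(); // status store stand-in
    final Map<String, byte[]> artifacts = new ConcurrentHashMap<>();       // artifacts store stand-in
    private final Duration timeout;

    ExpirationCleaner(Duration timeout) {
        this.timeout = timeout;
    }

    /** Removes every execution completed more than the timeout before now; returns their ids. */
    List<String> expire(Instant now) {
        Instant cutoff = now.minus(timeout);
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : completionTimes.entrySet()) {
            if (entry.getValue().isBefore(cutoff)) {
                expired.add(entry.getKey());
            }
        }
        for (String id : expired) {
            // Drop the status first: a poller then sees an unknown execution rather than
            // a status whose result files have already been deleted.
            completionTimes.remove(id);
            artifacts.remove(id);
        }
        return expired;
    }
}
```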

Tracking input decoding/output encoding

The overall time spent running a process can be subdivided into three stages:

  • Decoding the inputs
  • Executing the process
  • Encoding the outputs

Each of the three phases can take significant time, and it's hard to predict, process by process, what percentage of the total time each phase will take. Regardless, the progress method of ProcessListener will have to report an overall progress percentage to be shown to the WPS client and the admin.

The subdivision will necessarily be arbitrary, and will be set up as follows:

  • 30% to input decoding if the inputs contain anything expensive to parse (in particular, if any input is complex and needs to run through a PPIO to be decoded), 1% otherwise
  • 30% to output encoding if the outputs contain any complex output that needs a PPIO to be encoded, 1% otherwise
  • The remaining percentage will be associated to the actual process execution
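The weighting above can be sketched as a small helper that maps progress within a phase onto the overall 0 to 100 scale. ProgressWeights and its Phase enum are hypothetical names used only for illustration. Note that with complex outputs the encoding phase always starts at 70%, whether the inputs are complex (30 + 40) or not (1 + 69).

```java
/** Hypothetical helper mapping per-phase progress onto the overall 0-100 scale. */
final class ProgressWeights {

    enum Phase { DECODING, EXECUTING, ENCODING }

    private final float inputWeight;
    private final float outputWeight;
    private final float executionWeight;

    ProgressWeights(boolean complexInputs, boolean complexOutputs) {
        // 30% for a phase that needs a PPIO, 1% otherwise; execution gets the remainder.
        this.inputWeight = complexInputs ? 30f : 1f;
        this.outputWeight = complexOutputs ? 30f : 1f;
        this.executionWeight = 100f - inputWeight - outputWeight;
    }

    /** @param fraction progress within the current phase, from 0 to 1 */
    float overall(Phase phase, float fraction) {
        switch (phase) {
            case DECODING:
                return inputWeight * fraction;
            case EXECUTING:
                return inputWeight + executionWeight * fraction;
            case ENCODING:
            default:
                return inputWeight + executionWeight + outputWeight * fraction;
        }
    }
}
```

For example, a process with complex inputs and outputs that is halfway through execution reports 30 + 40 × 0.5 = 50%.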

Tracking input decoding will be easy, since we need to pass a map of parsed Java objects to the process anyway; if the process has any raw input, its decoding will be tracked as part of the process execution itself. Tracking output encoding is trickier, as it can be done in three different ways:

  • Raw output, where the output is streamed directly back to the client. This mode can be used only for synchronous requests, but for administration purposes it is useful to see the state of synchronous processes as well
  • Each output saved into a separate file and back-linked from the actual WPS response; in this case the outputs are written
  • Output embedded in the XML response, which is performed by using XML encoder delegates as part of the normal XML encoding of the EMF object describing the response

Given the complexities of this phase, in case of complex outputs the progress will be set to 70% when the encoding starts, and will jump to 100% when the full response encoding is complete.

Changes to the ProcessManager interface

In the current implementation the ProcessManager has two submit methods, and one method to get the status of requests:

public interface ProcessManager {

    ...

    Map<String, Object> submitChained(String executionId, Name processName,
            Map<String, Object> inputs) throws ProcessException;


    void submit(String executionId, Name processName, Map<String, Object> inputs, boolean background)
            throws ProcessException;

    ExecutionStatus getStatus(String executionId);
}

This follows the WPS model, where progress is not automatically pushed to the client, but is instead polled by the client using an executionId. However, in order to share the information between nodes, we have to move from a polling model to a notification one, so the above methods will be modified to receive a ProgressListener instead:

public interface ProcessManager {

    ...

    Map<String, Object> submitChained(String executionId, Name processName,
            Map<String, Object> inputs, ProgressListener listener) throws ProcessException;


    void submit(String executionId, Name processName, Map<String, Object> inputs, boolean background,
                ProgressListener listener) throws ProcessException;
}
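To contrast the two models, here is a hedged sketch of the push style: the manager invokes a callback as work advances, and the callback writes into shared state that any node could read. ProgressSink and PushingProcessManager are hypothetical and much simpler than the ProgressListener taken by the real methods; the ten-step loop stands in for actual process work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical minimal progress callback standing in for the listener passed to submit(). */
interface ProgressSink {
    void progress(String executionId, float percent);
}

/** Sketch of a process manager that pushes progress instead of answering getStatus() polls. */
final class PushingProcessManager {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Runs a fake ten-step "process" in the background, notifying the listener after each step. */
    Future<?> submit(String executionId, ProgressSink listener) {
        return executor.submit(() -> {
            for (int step = 1; step <= 10; step++) {
                // real work would happen here
                listener.progress(executionId, step * 10f);
            }
        });
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

A listener such as `(id, p) -> sharedStatus.put(id, p)` turns each notification into a write to shared storage, which is what lets a node that is not running the process answer status requests.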

ProcessStatePage

The ProcessStatePage will be used to monitor the state of the processes currently running in the server, reporting most of the information available in the execution status class as a sortable and filterable table:

  • state (possibly as an icon, otherwise as text)
  • node
  • user
  • process name
  • synchronous or asynchronous
  • progress (status message will be available as a hover on the progress percentage)
  • whether the process is performing chained execution, or not

The filtering will be based on a text match just like any other GeoServer table.
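GeoServer tables have their own filtering support; the sketch below does not use it, and only illustrates a generic case-insensitive substring match over the columns listed above. StatusRow and StatusTableFilter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical, simplified row of the process status table. */
final class StatusRow {
    final String state;
    final String node;
    final String user;
    final String processName;
    final boolean asynchronous;
    final float progress;
    final boolean chained;

    StatusRow(String state, String node, String user, String processName,
              boolean asynchronous, float progress, boolean chained) {
        this.state = state;
        this.node = node;
        this.user = user;
        this.processName = processName;
        this.asynchronous = asynchronous;
        this.progress = progress;
        this.chained = chained;
    }

    /** The cell texts the free-text filter searches. */
    List<String> cells() {
        return Arrays.asList(state, node, user, processName,
                asynchronous ? "asynchronous" : "synchronous",
                progress + "%", chained ? "chained" : "");
    }
}

/** Keeps the rows where any cell contains the filter text, ignoring case. */
final class StatusTableFilter {
    static List<StatusRow> filter(List<StatusRow> rows, String text) {
        if (text == null || text.trim().isEmpty()) {
            return rows;
        }
        String needle = text.trim().toLowerCase(Locale.ROOT);
        List<StatusRow> result = new ArrayList<>();
        for (StatusRow row : rows) {
            for (String cell : row.cells()) {
                if (cell != null && cell.toLowerCase(Locale.ROOT).contains(needle)) {
                    result.add(row);
                    break;
                }
            }
        }
        return result;
    }
}
```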

Feedback

Backwards Compatibility

Voting

Project Steering Committee:

  • Alessio Fabiani +1
  • Andrea Aime +1
  • Ben Caradoc-Davies
  • Christian Mueller
  • Gabriel Roldán
  • Jody Garnett +1
  • Jukka Rahkonen +0
  • Justin Deoliveira
  • Phil Scadden +0
  • Simone Giannecchini +0

Links
