XD Distributed Runtime - garyrussell/spring-xd GitHub Wiki

Introduction

This document describes what’s happening "under the hood" of the Spring XD Distributed Runtime (DIRT) and in particular, how the runtime architecture achieves high availability and failover in a clustered production environment. See Running in Distributed Mode for more information on installing and running Spring XD in distributed mode.

This discussion focuses on Spring XD’s core runtime components and the role of ZooKeeper in managing the state of the Spring XD cluster and enabling automatic recovery from failures.

Configuring Spring XD for High Availabilty(HA)

A production Spring XD environment is typically distributed among multiple hosts in a clustered environment. Spring XD scales horizontally when you add container instances. In the simplest case, all containers are replicas, that is each instance is running on an identically configured host and modules are deployed to any available container in a round-robin fashion. However, this simplifying assumption does not address real production scenarios in which more control is requred in order to optimize resource utilization. To this end, Spring XD supports a flexible algorithm which allows you to match module deployments to specific container configurations. The container matching algorithm will be covered in more detail later, but for now, let’s assume the simple case. Running multiple containers not only enables horizontal scalability, but enables failure recovery. If a container becomes unavailable due to an unrecoverable connection loss, any modules currently deployed to that container will be deployed automatically to the other available instances.

Spring XD requires that a single active Admin server handle interactions with the containers, such as stream deployment requests, as these types of operations must be processed serially in the order received. Without a backup, the Admin server becomes single point of failure. Therefore, two (or more for the risk averse) Admin servers are recommended for a production environment. Note that every Admin server can handle all requests via REST endpoints but only one instance, the "Leader", will actually perform requests that update the runtime state. If the Leader goes down, another available Admin server will assume the leader role. Leader Election is an example of a common feature for distributed systems provided by the Curator Framework which sits on top of ZooKeeper.

An HA Spring XD installation also requires that external servers - ZooKeeper, messaging middleware, and data stores needed for running Spring XD in distributed mode must be configured for HA as well. Please consult the product documentation for specific recommendations regarding each of these external components. Also see Message Bus Configuration for tips on configuring the MessageBus for HA, error handling, etc.

ZooKeeper Overview

In the previous section, we claimed that if a container goes down, Spring XD will redeploy any modules deployed on that instance to another available container. We also claimed that if the Admin Leader goes down, another Admin server will assume that role. ZooKeeper is what makes this all possible. ZooKeeper is a widely used Apache project designed primarily for distributed system management and coordination. This section will cover some basic concepts necessary to understand its role in Spring XD. See The ZooKeeper Wiki for a more complete overview.

ZooKeeper is based on a simple hierarchical data structure, formally a tree, and conceptually and semantically similar to a file directory structure. As such, data is stored in nodes. A node is referenced via a path, for example, /xd/streams/mystream. Each node can store additional data, serialized as a byte array. In Spring XD, all data is a java.util.Map serialized as JSON. The following figure shows the Spring XD schema:

ZooKeeper XD Schema

A ZooKeeper node is either ephemeral or persistent. An ephemeral node exists only as long as the session that created it remains active. A persistent node is, well, persistent. Ephemeral nodes are appropriate for registering Container instances. When an Spring XD container starts up it creates an ephemeral node, /xd/containers/<container-id>, using an internally generated container id. When the container’s session is closed due to a connection loss, for example, the container process terminates, its node is removed. The ephemeral container node also holds metadata such as its hostname and IP address, runtime metrics, and user defined container attributes. Persistent nodes maintain state needed for normal operation and recovery. This includes data such as stream definitions, job definitions, deployment manifests, module deployments, and deployment state for streams and jobs.

Obviously ZooKeeper is a critical piece of the Spring XD runtime and must itself be HA. ZooKeeper itself supports a distributed architecture, called an ensemble. The details are beyond the scope of this document but for the sake of this discussion it is worth mentioning that there should be at least three ZooKeeper server instances running (an odd number is always recommended) on dedicated hosts. The Container and Admin nodes are clients to the ZooKeeper ensemble and must connect to ZooKeeper at startup. Spring XD components are configured with a zk.client.connect property which may designate a single <host>:<port> or a comma separated list. The ZooKeeper client will attempt to connect to each server in order until it succeeds. If it is unable to connect, it will keep trying. If a connection is lost, the ZooKeeper client will attempt to reconnect to one of the servers. The ZooKeeper cluster guarantees consistent replication of data across the ensemble. Specifically, ZooKeeper guarantees:

  • Sequential Consistency - Updates from a client will be applied in the order that they were sent.

  • Atomicity - Updates either succeed or fail. No partial results.

  • Single System Image - A client will see the same view of the service regardless of the server that it connects to.

  • Reliability - Once an update has been applied, it will persist from that time forward until a client overwrites the update.

  • Timeliness - The clients view of the system is guaranteed to be up-to-date within a certain time bound.

ZooKeeper maintains data primarily in memory backed by a disk cache. Updates are logged to disk for recoverability, and writes are serialized to disk before they are applied to the in-memory database.

In addition to performing basic CRUD operations on nodes, A ZooKeeper client can register a callback on a node to respond to any events or state changes to that node or any of its children. Such node operations and callbacks are the mechanism that control the Spring XD runtime.

Spring XD and ZK Ensemble

The Admin Server Internals

Assuming more than one Admin instance is running, Each instance requests leadership at start up. If there is already a designated leader, the instance will watch the xd/admin node to be notified if the Leader goes away. The instance designated as the "Leader", using the Leader Selector recipe provided by Curator, a ZooKeeper client library that implements some common patterns. Curator also provides some Listener callback interfaces that the client can register on a node. The AdminServer creates the top level nodes, depicted in the figure above:

  • /xd/admins - children are ephemeral nodes for each available Admin instance and used for Leader Selector

  • /xd/containers - children are ephemeral nodes containing runtime attributes including hostname,process id, ip address, and user defined attributes for each container instance.

  • /xd/streams - children are persistent nodes containing the definition (DSL) for each stream.

  • /xd/jobs - children are persistent nodes containing the definition (DSL) for each job.

  • /xd/taps - children are persistent nodes describing each deployed tap.

  • /xd/deployments/streams - children are nodes containing stream deployment status (leaf nodes are ephemeral).

  • /xd/deployments/jobs - children are nodes containing job deployment status (leaf nodes are ephemeral).

  • /xd/deployments/modules/requested - stores module deployment requests including deployment criteria.

  • /xd/deployments/modules/allocated - stores information describing currently deployed modules.

The admin leader creates a DeploymentSupervisor which registers listeners on /xd/deployments/modules/requested to handle module deployment requests related to stream and job deployments, and xd/containers/ to be notified when containers are added and removed from the cluster. Note that any Admin instance can handle user requests. For example, if you enter the following commands via XD shell,

xd>stream create ticktock --definition "time | log"

This command will invoke a REST service on its connected Admin instance to create a new node /xd/streams/ticktock

xd>stream deploy ticktock

Assuming the deployment is successful, This will result in the creation of several nodes used to manage deployed resources, for example, /xd/deployments/streams/ticktock. The details are discussed in the example below.

If the Admin instance connected to the shell is not the Leader, it will perform no further action. The Leader’s DeploymentSupervisor will attempt to deploy each module in the stream definition, in accordance with the deployment manifest, to an available container, and update the runtime state.

XD Admin Internals

Example

Let’s walk through the simple example above. If you don’t have a Spring XD cluster set up, this example can be easily executed running Spring XD in a single node configuration. The single node application includes an embedded ZooKeeper server by default and allocates a random unused port. The embedded ZooKeeper connect string is reported in the console log for the single node application:

...
13:04:27,016  INFO main util.XdConfigLoggingInitializer - Transport: local
13:04:27,016  INFO main util.XdConfigLoggingInitializer - Hadoop Distro: hadoop22
13:04:27,019  INFO main util.XdConfigLoggingInitializer - Hadoop version detected from classpath: 2.2.0
13:04:27,019  INFO main util.XdConfigLoggingInitializer - Zookeeper at: localhost:31316
...

For our purposes, we will use the ZooKeeper CLI tool to inspect the contents of ZooKeeper nodes reflecting the current state of Spring XD. First, we need to know the port to connect the CLI tool to the embedded server. For convenience, we will assign the ZooKeeper port (5555 in this example) when starting the single node application. From the XD install directory:

$export JAVA_OPTS="-Dzk.embedded.server.port=5555"
$xd/bin/xd-singlenode

In another terminal session, start the ZooKeeper CLI included with ZooKeeper to connect to the embedded server and inspect the contents of the nodes (NOTE: tab completion works) :

$zkCli.sh -server localhost:5555

After some console output, you should see a prompt:

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:5555(CONNECTED) 0]

navigate using the ls command. This will reflect the schema shown in the figure above, the unique container ID will be different for you.

[[zk: localhost:5555(CONNECTED) 0] ls /xd
[deployments, containers, admins, taps, streams, jobs]
[zk: localhost:5555(CONNECTED) 1] ls /xd/streams
[]
[zk: localhost:5555(CONNECTED) 2] ls /xd/deployments
[jobs, streams, modules]
[zk: localhost:5555(CONNECTED) 3] ls /xd/deployments/streams
[]
[zk: localhost:5555(CONNECTED) 4] ls /xd/deployments/modules
[requested, allocated]
[zk: localhost:5555(CONNECTED) 5] ls /xd/deployments/modules/allocated
[2ebbbc9b-63ac-4da4-aa32-e39d69eb546b]
[zk: localhost:5555(CONNECTED) 6] ls /xd/deployments/modules/2ebbbc9b-63ac-4da4-aa32-e39d69eb546b
[]
[zk: localhost:5555(CONNECTED) 7] ls /xd/containers
[2ebbbc9b-63ac-4da4-aa32-e39d69eb546b]
[zk: localhost:5555(CONNECTED) 8]

The above reflects the initial state of Spring XD with a running admin and container instance. Nothing is deployed yet and there are no existing stream or job definitions. Note that xd/deployments/modules/allocated has a persistent child corresponding to the id of the container at xd/containers. If you are running in a distributed configuration and connected to one of the ZooKeeper servers in the same ensemble that Spring XD is connected to, you might see multiple nodes under /xd/containers, and xd/admins. Because the external ensemble persists the state of the Spring XD cluster, you will also see any deployments that existed when the Spring XD cluster was shut down.

Start the XD Shell in a new terminal session and create a stream:

$ shell/bin/xd-shell
 _____                           __   _______
/  ___|          (-)             \ \ / /  _  \
\ `--. _ __  _ __ _ _ __   __ _   \ V /| | | |
 `--. \ '_ \| '__| | '_ \ / _` |  / ^ \| | | |
/\__/ / |_) | |  | | | | | (_| | / / \ \ |/ /
\____/| .__/|_|  |_|_| |_|\__, | \/   \/___/
      | |                  __/ |
      |_|                 |___/
eXtreme Data
{appversion} | Admin Server Target: http://localhost:9393
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create ticktock --definition "time | log"
Created new stream 'ticktock'
xd:>

Back to the ZK CLI session:

[zk: localhost:5555(CONNECTED) 8] ls /xd/streams
[ticktock]
[zk: localhost:5555(CONNECTED) 9] get /xd/streams/ticktock
{"definition":"time | log"}
cZxid = 0x31
ctime = Mon Jul 14 10:32:33 EDT 2014
mZxid = 0x31
mtime = Mon Jul 14 10:32:33 EDT 2014
pZxid = 0x31
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 27
numChildren = 0
[zk: localhost:5555(CONNECTED) 10]

using the get command on the new stream node, we can see the stream definition represented as JSON, along with some standard ZooKeeper metadata.

Note
ephemeralOwner = 0x0, indicating this is not an ephemeral node. At this point, nothing else should have changed from the initial state.

Now, Using the Spring XD shell, let’s deploy the stream,

xd>stream deploy ticktock
Deployed stream 'ticktock'

and verify with ZooKeeper:

[zk: localhost:5555(CONNECTED) 10] ls /xd/deployments/streams
[ticktock]
[zk: localhost:2181(CONNECTED) 11] ls /xd/streams/deployments/ticktock
[modules, status]
[[zk: localhost:2181(CONNECTED) 12] get /xd/deployments/streams/ticktock/status
{"state":"deployed"}
....
zk: localhost:2181(CONNECTED) 13] ls /xd/deployments/streams/ticktock/modules
[source.time.1.2ebbbc9b-63ac-4da4-aa32-e39d69eb546b, sink.log.1.2ebbbc9b-63ac-4da4-aa32-e39d69eb546b]

Note the deployment state shown for the stream’s status node is deployed, meaning the deployment request was satisfied. Deployment states are discussed in more detail here.

Spring XD decomposes stream deployment requests to individual module deployment requests. Hence, we see that each module in the stream is associated with a container instance. The container instance in this case is the same since there is only one instance in the single node configuration. In a distributed configuration with more than one instance, the stream source and sink will each be deployed to a separate container. The node name itself is of the form <module_type>.<module_name>.<module_sequence_number>.<container_id>, where the sequence number identifies a deployed instance of a module if multiple instances of that module are requested.

[zk: localhost:2181(CONNECTED) 14] ls /xd/deployments/modules/allocated/2ebbbc9b-63ac-4da4-aa32-e39d69eb546b/ticktock.source.time.1
[metadata, status]

The metadata and status nodes are ephemeral nodes which store details about the deployed module. This information is provided to XD shell queries. For example:

xd:>runtime modules
  Module                  Container Id                          Options                                          Deployment Properties
  ----------------------  ------------------------------------  -----------------------------------------------  ---------------------
  ticktock.sink.log.1     2ebbbc9b-63ac-4da4-aa32-e39d69eb546b  {name=ticktock, expression=payload, level=INFO}  {count=1, sequence=1}
  ticktock.source.time.1  2ebbbc9b-63ac-4da4-aa32-e39d69eb546b  {fixedDelay=1, format=yyyy-MM-dd HH:mm:ss}       {count=1, sequence=1}

Module Deployment

This section describes how the Spring XD runtime manages deployment internally. For more details on how to deploy streams and jobs see Deployment.

To process a stream deployment request, the StreamDeploymentListener invokes its ContainerMatcher to select a container instance for each module and records the module’s deployment properties under /xd/deployments/modules/requested/. If a match is found, the StreamDeploymentListener creates a node for the module under /xd/deployments/modules/allocated/<container_id>. The Container includes a DeploymentListener that monitors the container node for new modules to deploy. If the deployment is successful, the Container writes the ephemeral nodes status and metadata under the new module node.

Module Deployment

When a container departs, the ephemeral nodes are deleted so its modules are now undeployed. The ContainerListener responds to the deleted nodes and attempts to redeploy any affected modules to another instance.

Example: Automatic Redeployment

For this example we start two container instances and deploy and simple stream:

xd:>runtime containers
  Container Id                          Host            IP Address   PID    Groups  Custom Attributes
  ------------------------------------  --------------  -----------  -----  ------  -----------------
  0ddf80b9-1e80-44b8-8c12-ecc5c8c32e11  ultrafox.local  192.168.1.6  19222
  6cac85f8-4c52-4861-a225-cdad3675f6c9  ultrafox.local  192.168.1.6  19244

xd:>stream create ticktock --definition "time | log"
Created new stream 'ticktock'
xd:>stream deploy ticktock
Deployed stream 'ticktock'
xd:>runtime modules
  Module                  Container Id                          Options                                          Deployment Properties
  ----------------------  ------------------------------------  -----------------------------------------------  ---------------------
  ticktock.sink.log.1     0ddf80b9-1e80-44b8-8c12-ecc5c8c32e11  {name=ticktock, expression=payload, level=INFO}  {count=1, sequence=1}
  ticktock.source.time.1  6cac85f8-4c52-4861-a225-cdad3675f6c9  {fixedDelay=1, format=yyyy-MM-dd HH:mm:ss}       {count=1, sequence=1}

Now we will kill one of the container processes and observe that the affect module has been redeployed to the remaining container:

xd:>runtime containers
  Container Id                          Host            IP Address   PID    Groups  Custom Attributes
  ------------------------------------  --------------  -----------  -----  ------  -----------------
  6cac85f8-4c52-4861-a225-cdad3675f6c9  ultrafox.local  192.168.1.6  19244

xd:>runtime modules
  Module                  Container Id                          Options                                          Deployment Properties
  ----------------------  ------------------------------------  -----------------------------------------------  ---------------------
  ticktock.sink.log.1     6cac85f8-4c52-4861-a225-cdad3675f6c9  {name=ticktock, expression=payload, level=INFO}  {count=1, sequence=1}
  ticktock.source.time.1  6cac85f8-4c52-4861-a225-cdad3675f6c9  {fixedDelay=1, format=yyyy-MM-dd HH:mm:ss}       {count=1, sequence=1}

Now if we kill the remaining container, we see warnings in the xd-admin log:

14:36:07,593  WARN DeploymentSupervisorCacheListener-0 server.DepartingContainerModuleRedeployer - No containers available for redeployment of log for stream ticktock
14:36:07,599  WARN DeploymentSupervisorCacheListener-0 server.DepartingContainerModuleRedeployer - No containers available for redeployment of time for stream ticktock
⚠️ **GitHub.com Fallback** ⚠️