MessageBus - garyrussell/spring-xd GitHub Wiki

Introduction

This section contains additional information about configuring the Message Bus, including High Availability, SSL, and Error handling.

Rabbit Message Bus High Availability (HA) Configuration

The RabbitMessageBus can be made highly available using standard RabbitMQ HA configuration.

First, use the addresses property in servers.yml to include the host/port for each server in the cluster. See Application Configuration.

By default, queues and exchanges declared by the bus are prefixed with "xdbus." (this prefix can be changed as described in Application Configuration).

To configure the entire bus for HA, create a policy:

rabbitmqctl set_policy ha-xdbus "^xdbus\." '{"ha-mode":"all"}'
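To see which names a policy pattern such as the one above would cover, the pattern can be tried against a few candidate queue and exchange names. The following is only a rough sketch: it uses Python's re module rather than the broker's own regular expression engine, and the helper name policy_applies and the sample names are hypothetical.

```python
import re

# Pattern copied from the set_policy command above.
HA_POLICY_PATTERN = r"^xdbus\."


def policy_applies(name: str, pattern: str = HA_POLICY_PATTERN) -> bool:
    """Return True if the policy pattern matches the queue or exchange name."""
    # Unanchored search; the '^' in the pattern pins the match to the start.
    return re.search(pattern, name) is not None


if __name__ == "__main__":
    for name in ["xdbus.foo.0", "xdbus.DLX", "xdbusfoo", "other.queue"]:
        print(name, policy_applies(name))
```

Because the dot is escaped in the pattern, a name such as xdbusfoo (no dot after the prefix) is not covered by the policy.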

Error Handling (Message Delivery Failures)

RabbitMQ Message Bus

Note
The following applies to normally deployed streams. When direct binding between modules is being used, exceptions thrown by the consumer are thrown back to the producer.

When a consuming module (processor, sink) fails to handle a message, the bus will retry delivery based on the module (or default bus) retry configuration. The default configuration will make 3 attempts to deliver the message. The retry configuration can be modified at the bus level (in servers.yml), or for an individual stream/module using the deployment manifest.

When retries are exhausted, messages are discarded by default. However, with RabbitMQ you can configure such messages to be routed to a dead letter exchange and on to a dead letter queue. See the RabbitMQ Documentation for more information.

Note
The following configuration examples assume you are using the default bus prefix used for naming rabbit elements: "xdbus."

Consider a stream: stream create foo --definition "source | processor | sink"

The first pipe (by default) will be backed by a queue named xdbus.foo.0, the second by xdbus.foo.1. Messages are routed to these queues using the default exchange (with routing keys equal to the queue names).
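The naming rule described above can be sketched as a small helper. This is an illustration only, not the bus's actual implementation: the function name pipe_queue_names is hypothetical, and the naive split on "|" assumes no pipe characters appear inside quoted module options.

```python
from typing import List


def pipe_queue_names(stream: str, definition: str, prefix: str = "xdbus.") -> List[str]:
    """Return the default queue name backing each pipe in a stream definition."""
    # Naive split: assumes '|' never appears inside a quoted module option.
    modules = [m.strip() for m in definition.split("|")]
    # N modules are joined by N - 1 pipes, numbered from 0.
    return [f"{prefix}{stream}.{index}" for index in range(len(modules) - 1)]


if __name__ == "__main__":
    print(pipe_queue_names("foo", "source | processor | sink"))
```

Since messages are routed through the default exchange, each of these names is also the routing key for its queue.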

To enable dead lettering just for this stream, first configure a policy:

rabbitmqctl set_policy foo.DLX "^xdbus\.foo\..*" '{"dead-letter-exchange":"foo.dlx"}' --apply-to queues

To configure dead-lettering for all streams:

rabbitmqctl set_policy DLX "^xdbus\..*" '{"dead-letter-exchange":"dlx"}' --apply-to queues

The next step is to declare the dead letter exchange, and bind dead letter queues with the appropriate routing keys.

For example, for the second "pipe" in the stream above we might bind a queue foo.sink.dlq to exchange foo.dlx with a routing key xdbus.foo.1 (remember, the original routing key was the queue name).

Now, when the sink fails to handle a message, after the configured retries are exhausted, the failed message will be routed to foo.sink.dlq.
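The routing described above can be modeled in a few lines to show why the dead letter queue must be bound with the original queue name as its routing key. This is a simplified simulation, not broker code: it assumes foo.dlx behaves like a direct exchange (exact routing key match), uses Python's re module in place of the broker's pattern matching, and the function name dead_letter_destination is hypothetical.

```python
import re
from typing import Dict, Optional, Tuple

# Policy from the example above: matching queues dead-letter to foo.dlx.
POLICY_PATTERN = r"^xdbus\.foo\..*"
DEAD_LETTER_EXCHANGE = "foo.dlx"

# Bindings on the dead letter exchange: (exchange, routing key) -> queue.
BINDINGS: Dict[Tuple[str, str], str] = {
    ("foo.dlx", "xdbus.foo.1"): "foo.sink.dlq",
}


def dead_letter_destination(queue: str) -> Optional[str]:
    """Return the DLQ receiving a message rejected from `queue`, or None."""
    if re.search(POLICY_PATTERN, queue) is None:
        return None  # no policy applies, so the message is discarded
    # The message arrived via the default exchange, so its routing key is
    # the original queue name; that key is reused when it is dead-lettered.
    routing_key = queue
    return BINDINGS.get((DEAD_LETTER_EXCHANGE, routing_key))


if __name__ == "__main__":
    print(dead_letter_destination("xdbus.foo.1"))  # foo.sink.dlq
    print(dead_letter_destination("xdbus.foo.0"))  # None
```

In this model a failure on xdbus.foo.0 matches the policy but finds no binding, so a separate queue would have to be bound with routing key xdbus.foo.0 to capture failures from the processor.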

There is no automated mechanism provided to move dead lettered messages back to the bus queue.

Automatic Dead Lettering Queue Binding

Starting with version 1.1, the dead letter queue and binding can be automatically configured by the system. A new property autoBindDLQ has been added; it can be set at the bus level (in servers.yml) or using deployment properties, e.g. --properties module.*.consumer.autoBindDLQ=true for all modules in the stream. When true, the dead letter queue will be declared (if necessary) and bound to a dead letter exchange named xdbus.DLX (again, assuming the default prefix).

In the above example, where we have queues xdbus.foo.0 and xdbus.foo.1, the system will also create two dead letter queues: xdbus.foo.0.dlq, bound to xdbus.DLX with routing key xdbus.foo.0, and xdbus.foo.1.dlq, bound to xdbus.DLX with routing key xdbus.foo.1.
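The names produced by automatic binding follow a simple pattern, sketched below under the default prefix. The type DlqBinding and the function auto_dlq_binding are hypothetical names used for illustration; they are not part of the bus API.

```python
from typing import NamedTuple


class DlqBinding(NamedTuple):
    queue: str        # name of the dead letter queue
    exchange: str     # dead letter exchange the queue is bound to
    routing_key: str  # routing key used for the binding


def auto_dlq_binding(bus_queue: str, prefix: str = "xdbus.") -> DlqBinding:
    """Return the DLQ name and binding expected for a bus queue."""
    return DlqBinding(
        queue=f"{bus_queue}.dlq",
        exchange=f"{prefix}DLX",
        routing_key=bus_queue,  # the original queue name is the routing key
    )


if __name__ == "__main__":
    for name in ["xdbus.foo.0", "xdbus.foo.1"]:
        print(auto_dlq_binding(name))
```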

Note
This only sets up the DLQ and binding; you still need to set a policy to enable dead lettering on the queues, routing failed messages to xdbus.DLX:

rabbitmqctl set_policy DLX "^xdbus\..*" '{"dead-letter-exchange":"xdbus.DLX"}' --apply-to queues

Redis Message Bus

When Redis is the transport, the failed messages (after retries are exhausted) are LPUSHed to a LIST named ERRORS:<stream>.n (e.g. ERRORS:foo.1 in the above example in the RabbitMQ Message Bus section).

This is unconditional. The data in the ERRORS LIST is in "bus" format; again, some external mechanism would be needed to move the data from the ERRORS LIST back to the bus’s foo.1 LIST.
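One possible shape for such an external mechanism is sketched below. It is not provided by the bus and is only a starting point: the function requeue_errors and the in-memory stand-in InMemoryLists are hypothetical, and the key names passed in are assumptions that must be checked against the keys actually present in your Redis instance. The function accepts any client exposing rpoplpush(src, dst), which the redis-py client provides.

```python
from typing import Dict, List, Optional


def requeue_errors(client, errors_key: str, bus_key: str, limit: int = 100) -> int:
    """Move up to `limit` entries from the errors list back to the bus list."""
    moved = 0
    while moved < limit:
        # RPOPLPUSH atomically pops the tail of the source list and pushes
        # the value onto the head of the destination list.
        if client.rpoplpush(errors_key, bus_key) is None:
            break  # errors list is empty
        moved += 1
    return moved


class InMemoryLists:
    """Tiny stand-in for a Redis client, used only to demonstrate the helper."""

    def __init__(self) -> None:
        self.lists: Dict[str, List[str]] = {}

    def lpush(self, key: str, value: str) -> int:
        self.lists.setdefault(key, []).insert(0, value)
        return len(self.lists[key])

    def rpoplpush(self, src: str, dst: str) -> Optional[str]:
        source = self.lists.get(src)
        if not source:
            return None
        value = source.pop()
        self.lists.setdefault(dst, []).insert(0, value)
        return value


if __name__ == "__main__":
    fake = InMemoryLists()
    fake.lpush("ERRORS:foo.1", "m1")
    fake.lpush("ERRORS:foo.1", "m2")
    print(requeue_errors(fake, "ERRORS:foo.1", "foo.1"))
```

The limit bounds each run so that a message which fails again, and is pushed back onto the errors list, cannot keep the loop running indefinitely.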

Rabbit Message Bus Secure Sockets Layer (SSL)

If you wish to use SSL for communications with the RabbitMQ server, consult the RabbitMQ SSL Support Documentation.

First, configure the broker as described there. The message bus is a client of the broker and supports both of the described configurations for connecting clients (SSL without certificate validation and with certificate validation).

To use SSL without certificate validation, simply set

spring:
  rabbitmq:
    useSSL: true

in application.yml (and set the port(s) in the addresses property appropriately).

To use SSL with certificate validation, set

spring:
  rabbitmq:
    useSSL: true
    sslProperties: file:path/to/secret/ssl.properties

The sslProperties property is a Spring resource (file:, classpath: etc.) that points to a properties file. Typically, this file would be secured by the operating system (and readable by the XD container) because it contains security information. Specifically:

keyStore=file:/secret/client/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

Here, the PKCS12 keystore contains the client certificate and the truststore contains the server’s certificate, as described in the RabbitMQ documentation. The key/trust store properties are Spring resources.

Note
By default, the rabbit source and sink modules inherit their default configuration from the container, but it can be overridden, either using modules.yml or with specific module definitions.

Rabbit Message Bus Batching and Compression

See RabbitMQ Message Bus Properties for information about batching and compressing messages passing through the bus.
