Sources - garyrussell/spring-xd GitHub Wiki

Introduction

In this section we will show some variations on input sources. As a prerequisite start the XD Container as instructed in the Getting Started page.

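The sources covered include HTTP, SFTP, Tail, File, Mail, Twitter Search, Twitter Stream, GemFire, GemFire Continuous Query, Syslog, TCP, TCP Client, Reactor IP, and RabbitMQ.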

Future releases will provide support for other currently available Spring Integration Adapters. For information on how to adapt an existing Spring Integration Adapter for use in Spring XD see the section Creating a Source Module.

The following sections show a mix of Spring XD shell and plain Unix shell commands, so if you are trying them out, you should open two separate terminal prompts, one running the XD shell and one to enter the standard commands for sending HTTP data, creating directories, reading files and so on.

HTTP

To create a stream definition in the server using the XD shell

xd:> stream create --name httptest --definition "http | file" --deploy

Post some data to the http server on the default port of 9000

xd:> http post --target http://localhost:9000 --data "hello world"

See if the data ended up in the file

$ cat /tmp/xd/output/httptest

To send binary data, set the Content-Type header to application/octet-stream

$ curl --data-binary @foo.zip -H 'Content-Type: application/octet-stream' http://localhost:9000

HTTP with options

The http source has the following options:

https

true for https:// (boolean, default: false)

maxContentLength

the maximum allowed content length (int, default: 1048576)

messageConverterClass

the name of a custom MessageConverter class, to convert HttpRequest to Message; must have a constructor with a 'MessageBuilderFactory' parameter (String, default: org.springframework.integration.x.http.NettyInboundMessageConverter)

port

the port to listen to (int, default: 9000)

sslPropertiesLocation

location (resource) of properties containing the location of the pkcs12 keyStore and pass phrase (String, default: classpath:httpSSL.properties)

Here is an example

xd:> stream create --name httptest9020 --definition "http --port=9020 | file" --deploy

Post some data to the new port

xd:> http post --target http://localhost:9020 --data "hello world"
$ cat /tmp/xd/output/httptest9020
hello world
Note
When using https, you need to provide a properties file that references a pkcs12 key store (containing the server certificate(s)) and its passphrase. Setting --https=true enables https:// and the module looks for the SSL properties in resource classpath:httpSSL.properties. This location can be overridden with the --sslPropertiesLocation property. For example:
xd:> stream create --name https9021 --definition "http --port=9021 --https=true --sslPropertiesLocation=file:/secret/ssl.properties | file" --deploy
$ cat /secret/ssl.properties
keyStore=file:/secret/httpSource.p12
keyStore.passPhrase=secret

Since this properties file contains sensitive information, it will typically be secured by the operating system with the XD container process having read access.
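
As a rough illustration of the point about operating system permissions, the following shell sketch writes the two properties shown above into a file that only its owner can read. The function name write_ssl_properties and its argument order are hypothetical and not part of Spring XD. It also assumes that the XD container runs as the user who owns the file.

```shell
#!/bin/sh
# Write an SSL properties file that is readable and writable by its owner only.
# Arguments: <properties file> <keystore resource URL> <pass phrase>
# NOTE: the function name and argument order are illustrative assumptions.
write_ssl_properties() (
  umask 077          # files created from here on get mode 0600
  printf 'keyStore=%s\nkeyStore.passPhrase=%s\n' "$2" "$3" > "$1"
  chmod 600 "$1"     # also tighten the mode if the file already existed
)
```

The function body runs in a subshell, so the umask change does not leak into the calling shell. Running write_ssl_properties /secret/ssl.properties file:/secret/httpSource.p12 secret would reproduce the file shown above.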

SFTP

This source module transfers files over the SFTP protocol. Files are first copied from the remote directory to a local directory; the module then creates messages from the files in that local directory.

Options

The sftp source has the following options:

autoCreateLocalDir

whether to create the local directory automatically if it does not exist (boolean, default: true)

deleteRemoteFiles

delete remote files after transfer (boolean, default: false)

fixedDelay

fixed delay in SECONDS to poll the remote directory (int, default: 1)

host

the remote host to connect to (String, default: localhost)

localDir

set the local directory the remote files are transferred to (String, default: /tmp/xd/output)

passPhrase

the passphrase to use (String, default: ``)

password

the password for the provided user (String, default: ``)

pattern

simple filename pattern to apply to the filter (String, no default)

port

the remote port to connect to (int, default: 22)

privateKey

the private key location (a valid Spring Resource URL) (String, default: ``)

regexPattern

filename regex pattern to apply to the filter (String, no default)

remoteDir

the remote directory to transfer the files from (String, no default)

tmpFileSuffix

extension to use when downloading files (String, default: .tmp)

user

the username to use (String, no default)

Tail

Make sure the default input directory exists

$ mkdir -p /tmp/xd/input

Create an empty file to tail (this is not needed on some platforms such as Linux)

$ touch /tmp/xd/input/tailtest

To create a stream definition using the XD shell

xd:> stream create --name tailtest --definition "tail | file" --deploy

Send some text into the file being monitored

$ echo blah >> /tmp/xd/input/tailtest

See if the data ended up in the file

$ cat /tmp/xd/output/tailtest

Tail with options

The tail source has the following options:

delay

how often (ms) to poll for new lines (forces use of the Apache Tailer, requires nativeOptions='') (long, no default)

fileDelay

on platforms that don't wait for a missing file to appear, how often (ms) to look for the file (long, default: 5000)

fromEnd

whether to tail from the end (true) or from the start (false) of the file (forces use of the Apache Tailer, requires nativeOptions='') (boolean, no default)

lines

the number of lines prior to the end of an existing file to tail; does not apply if 'nativeOptions' is provided (int, default: 0)

name

the absolute path of the file to tail (String, default: /tmp/xd/input/<stream name>)

nativeOptions

options for a native tail command; leave unset and use 'fromEnd', 'delay', and/or 'reOpen' to use the Apache Tailer instead (String, no default)

reOpen

whether to reopen the file each time it is polled (forces use of the Apache Tailer, requires nativeOptions='') (boolean, no default)

Here is an example

xd:> stream create --name tailtest --definition "tail --name=/tmp/foo | file --name=bar" --deploy
$ echo blah >> /tmp/foo

$ cat /tmp/xd/output/bar

Tail Status Events

Some platforms, such as Linux, send status messages to stderr. The tail module sends these events to a logging adapter at WARN level, for example:

[message=tail: cannot open `/tmp/xd/input/tailtest' for reading: No such file or directory, file=/tmp/xd/input/tailtest]
[message=tail: `/tmp/xd/input/tailtest' has become accessible, file=/tmp/xd/input/tailtest]

File

The file source provides the contents of a File as a byte array by default but may be configured to provide the file reference itself.

To log the contents of a file create a stream definition using the XD shell

xd:> stream create --name filetest --definition "file | log" --deploy

The file source by default will look into a directory named after the stream, in this case /tmp/xd/input/filetest

Note the above will log the raw bytes. For text files, it is normally desirable to output the contents as plain text. To do this, set the outputType parameter:

xd:> stream create --name filetest --definition "file --outputType=text/plain | log" --deploy

For more details on the use of the outputType parameter see Type Conversion

Copy a file into the directory /tmp/xd/input/filetest and observe its contents being logged in the XD Container.

File with options

The file source has the following options:

dir

the absolute path to the directory to monitor for files (String, default: /tmp/xd/input/<stream name>)

fixedDelay

the fixed delay polling interval specified in seconds (int, default: 5)

pattern

a filter expression (Ant style) to accept only files that match the pattern (String, default: *)

preventDuplicates

whether to prevent the same file from being processed twice (boolean, default: true)

ref

set to true to output the File object itself (boolean, default: false)

The ref option is useful in some cases in which the file contents are large and it would be more efficient to send the file path.

Mail

Spring XD provides a source module for receiving emails, named mail. Depending on the protocol used, it can work by polling or receive mails as they become available.

Let’s see an example:

xd:> stream create --name mailstream --definition "mail --host=imap.gmail.com --username=<username> --password=secret | file" --deploy

Then send an email to yourself and you should see it appear inside a file at /tmp/xd/output/mailstream

The mail source has the following options:

charset

the charset used to transform the body of the incoming emails to Strings (String, default: UTF-8)

delete

whether to delete the emails once they’ve been fetched (boolean, default: true)

expression

a SpEL expression which filters which mail messages will be processed (non polling imap only) (String, default: true)

fixedDelay

the polling interval used for looking up messages (s) (int, default: 60)

folder

the folder to take emails from (String, default: INBOX)

host

the hostname of the mail server (String, default: localhost)

markAsRead

whether to mark emails as read once they’ve been fetched (boolean, default: false)

password

the password to use to connect to the mail server (String, no default)

port

the port of the mail server (int, default: 25)

protocol

the protocol to use to retrieve messages (MailProtocol, default: imap, possible values: imap,imaps,pop3,pop3s)

usePolling

whether to use polling or not (no polling works with imap(s) only) (boolean, default: false)

username

the username to use to connect to the mail server (String, no default)

Warning
Pay special attention to the markAsRead and delete options. By default, emails are deleted once they are consumed. It is hard to come up with a sensible default for this (please refer to the Spring Integration documentation section on mail handling for a discussion), so just be aware that the default for XD is to delete incoming messages.

Twitter Search

The twittersearch source runs a continuous query against Twitter.

The twittersearch source has the following options:

connectTimeout

the connection timeout for making a connection to Twitter (ms) (int, default: 5000)

consumerKey

a consumer key issued by twitter (String, no default)

consumerSecret

consumer secret corresponding to the consumer key (String, no default)

geocode

geo-location given as latitude,longitude,radius. e.g., '37.781157,-122.398720,1mi' (String, default: ``)

includeEntities

whether to include entities such as urls, media and hashtags (boolean, default: true)

language

language code e.g. 'en' (String, default: ``)

query

the query string (String, default: ``)

readTimeout

the read timeout for the underlying URLConnection to the twitter stream (ms) (int, default: 9000)

resultType

result type: recent, popular, or mixed (ResultType, default: mixed, possible values: mixed,recent,popular)

For information on how to construct a query, see the Search API v1.1.

To get a consumerKey and consumerSecret you need to register a twitter application. If you don’t already have one set up, you can create an app at the Twitter Developers site to get these credentials.

Tip
For both twittersearch and twitterstream you can put these keys in a module properties file instead of supplying them in the stream definition. If both sources share the same credentials, it is easiest to configure the required credentials in config/modules/modules.yml. Alternately, each module has its own properties file. For twittersearch, the file would be config/modules/source/twittersearch/twittersearch.properties.

To create and deploy a stream definition in the server using the XD shell:

xd:> stream create --name springone2gx --definition "twittersearch --query='#springone2gx' | file" --deploy

Let the twittersearch run for a little while and then check to see if some data ended up in the file

$ cat /tmp/xd/output/springone2gx
Note
Both twittersearch and twitterstream emit JSON in the native Twitter format.

Twitter Stream

This source ingests data from Twitter’s streaming API v1.1. It uses the sample and filter stream endpoints rather than the full "firehose" which needs special access. The endpoint used will depend on the parameters you supply in the stream definition (some are specific to the filter endpoint).

You need to supply all keys and secrets (both consumer and accessToken) to authenticate for this source, so it is easiest to add them to XD_HOME/config/modules/modules.yml or to the XD_HOME/config/modules/source/twitterstream/twitterstream.properties file.

Stream creation is then straightforward:

xd:> stream create --name tweets --definition "twitterstream | file" --deploy

The twitterstream source has the following options:

accessToken

a valid OAuth access token (String, no default)

accessTokenSecret

an OAuth secret corresponding to the access token (String, no default)

connectTimeout

the connection timeout for making a connection to Twitter (ms) (int, default: 5000)

consumerKey

a consumer key issued by twitter (String, no default)

consumerSecret

consumer secret corresponding to the consumer key (String, no default)

delimited

set to true to get length delimiters in the stream data (boolean, default: false)

discardDeletes

set to discard 'delete' events (boolean, default: true)

filterLevel

controls which tweets make it through to the stream: none, low, or medium (FilterLevel, default: none, possible values: none,low,medium)

follow

comma delimited set of user ids whose tweets should be included in the stream (String, default: ``)

language

language code e.g. 'en' (String, default: ``)

locations

comma delimited set of latitude/longitude pairs to include in the stream (String, default: ``)

readTimeout

the read timeout for the underlying URLConnection to the twitter stream (ms) (int, default: 9000)

stallWarnings

set to true to enable stall warnings (boolean, default: false)

track

comma delimited set of terms to include in the stream (String, default: ``)

Note: The options available are largely the same as those listed in the Twitter API docs and, unless otherwise stated, the accepted formats are the same.

Note
Both twittersearch and twitterstream emit JSON in the native Twitter format.

GemFire Source

This source configures a client cache and a client region with subscriptions enabled in the XD container process, together with a Spring Integration GemFire inbound channel adapter. The adapter is backed by a CacheListener that outputs a message for each external entry event on the region. By default the payload contains the updated entry value, but this can be changed by passing in a SpEL expression that uses the EntryEvent as the evaluation context.

Options

The gemfire source has the following options:

cacheEventExpression

an optional SpEL expression referencing the event (String, default: newValue)

host

host name of the cache server or locator (if useLocator=true). May be a comma delimited list (String, no default)

port

port of the cache server or locator (if useLocator=true). May be a comma delimited list (String, no default)

regionName

the name of the region for which events are to be monitored (String, default: <stream name>)

useLocator

indicates whether a locator is used to access the cache server (boolean, default: false)

Example

Use of the gemfire source requires an external process (or a separate stream) that creates or updates entries in a GemFire region configured for a cache server. Such events may feed a Spring XD stream. To support such a stream, the Spring XD container must join a GemFire distributed client-server grid as a client, creating a client region corresponding to an existing region on a cache server. The client region registers a cache listener via the Spring Integration GemFire inbound channel adapter. The client region and pool are configured for a subscription on all keys in the region.

The following example creates two streams: one to write http messages to a GemFire region named Stocks, and another to listen for cache events and record the updates to a file. This works with the Cache Server and sample configuration included with the Spring XD distribution:

xd:> stream create --name gftest --definition "gemfire --regionName=Stocks | file" --deploy
xd:> stream create --name stocks --definition "http --port=9090 | gemfire-json-server --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy

Now send some messages to the stocks stream.

xd:> http post --target http://localhost:9090 --data {"symbol":"FAKE","price":73}
xd:> http post --target http://localhost:9090 --data {"symbol":"FAKE","price":78}
xd:> http post --target http://localhost:9090 --data {"symbol":"FAKE","price":80}
Note
Avoid spaces in the JSON when using the shell to post data

As updates are posted to the cache you should see them captured in the output file:

$ cat /tmp/xd/output/gftest.out

{"symbol":"FAKE","price":73}
{"symbol":"FAKE","price":78}
{"symbol":"FAKE","price":80}
Note
The useLocator option is intended for integration with an existing GemFire installation in which the cache servers are configured to use locators in accordance with best practice. GemFire supports configuration of multiple locators (or direct server connections), which is specified by supplying comma-delimited values for the host and port options. You may specify a single value for either of these options; otherwise both lists must be the same size. The following examples are valid for multiple connection addresses:
gemfire --host=myhost --port=10334,10335
gemfire --host=myhost1,myhost2 --port=10334
gemfire --host=myhost1,myhost2,myhost3 --port=10334,10335,10336

The last example creates connections to myhost1:10334, myhost2:10335, myhost3:10336
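
The pairing rule can be sketched in plain shell as follows. This is only an illustration of the rule as described above, not the module's actual code. The helper names count_items, nth_item, and pair_addresses are hypothetical. Reusing a single value for every entry of the other list is inferred from the first two examples, since the text only spells out the result for the third.

```shell
#!/bin/sh
# Count the items in a comma-delimited list.
count_items() {
  echo $(( $(printf '%s\n' "$1" | tr ',' '\n' | wc -l) ))
}

# Print the Nth (1-based) item of a comma-delimited list.
nth_item() {
  printf '%s\n' "$1" | cut -d, -f"$2"
}

# Print one host:port line per connection address.
# Arguments: <comma-delimited hosts> <comma-delimited ports>
pair_addresses() (
  hosts=$1
  ports=$2
  nh=$(count_items "$hosts")
  np=$(count_items "$ports")
  # Either list may hold a single value; otherwise the sizes must match.
  if [ "$nh" -ne 1 ] && [ "$np" -ne 1 ] && [ "$nh" -ne "$np" ]; then
    echo "host and port lists must be the same size, or one must be a single value" >&2
    exit 1
  fi
  n=$nh
  if [ "$np" -gt "$n" ]; then n=$np; fi
  i=1
  while [ "$i" -le "$n" ]; do
    # A single value is reused for every entry of the other list.
    h=$(nth_item "$hosts" $(( $nh == 1 ? 1 : $i )))
    p=$(nth_item "$ports" $(( $np == 1 ? 1 : $i )))
    echo "$h:$p"
    i=$(( $i + 1 ))
  done
)
```

For instance, pair_addresses myhost1,myhost2 10334 prints myhost1:10334 and myhost2:10334 on separate lines, while lists of sizes two and three are rejected.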

Note
You may also configure default GemFire connection settings for all gemfire modules in config/modules/modules.yml:
gemfire:
   useLocator: true
   host: myhost1,myhost2
   port: 10334
Tip
If you are deploying on Java 7 or earlier and need to deploy more than 4 GemFire modules, be sure to increase the permsize of the singlenode or container, for example JAVA_OPTS="-XX:PermSize=256m"

Launching the XD GemFire Server

This source requires a cache server to be running in a separate process, and either its host and port or a locator host and port must be configured. The XD distribution includes a GemFire server executable suitable for development and test purposes. This is a Java main class that runs with a Spring configured cache server. The configuration is passed as a command line argument to the server’s main method. The configuration includes a cache server port and one or more configured regions. XD includes a sample cache configuration called cq-demo. This starts a server on port 40404 and creates a region named Stocks. A logging cache listener is configured for the region to log region events.

Run the GemFire cache server by changing to the gemfire/bin directory and executing

$ ./gemfire-server ../config/cq-demo.xml

GemFire Continuous Query

Continuous query allows client applications to create a GemFire query using Object Query Language (OQL) and register a CQ listener which subscribes to the query and is notified every time the query’s result set changes. The gemfire-cq source registers a CQ which will post CQEvent messages to the stream.

Options

The gemfire-cq source has the following options:

host

host name of the cache server or locator (if useLocator=true). May be a comma delimited list (String, no default)

port

port of the cache server or locator (if useLocator=true). May be a comma delimited list (String, no default)

query

the query string in Object Query Language (OQL) (String, no default)

useLocator

indicates whether a locator is used to access the cache server (boolean, default: false)

The example is similar to that presented for the gemfire source above, and requires an external cache server as described in the above section. In this case the query provides a finer filter on data events. In the example below, the cqtest stream will only receive events matching a single ticker symbol, whereas the gftest stream example above will receive updates to every entry in the region.

xd:> stream create --name stocks --definition "http --port=9090 | gemfire-json-server --regionName=Stocks --keyExpression=payload.getField('symbol')" --deploy
xd:> stream create --name cqtest --definition "gemfire-cq --query='Select * from /Stocks where symbol=''FAKE''' | file" --deploy

Now send some messages to the stocks stream.

xd:> http post --target http://localhost:9090 --data {"symbol":"FAKE","price":73}
xd:> http post --target http://localhost:9090 --data {"symbol":"FAKE","price":78}
xd:> http post --target http://localhost:9090 --data {"symbol":"FAKE","price":80}

The cqtest stream is now listening for any stock quote updates for the ticker symbol FAKE. As updates are posted to the cache you should see them captured in the output file:

$ cat /tmp/xd/output/cqtest.out

{"symbol":"FAKE","price":73}
{"symbol":"FAKE","price":78}
{"symbol":"FAKE","price":80}

Syslog

Three syslog sources are provided: reactor-syslog, syslog-udp, and syslog-tcp. The reactor-syslog adapter uses TCP, builds upon the functionality available in the Reactor project, and provides improved throughput over the syslog-tcp adapter.

The reactor-syslog source has the following options:

port

the port on which the system will listen for syslog messages (int, default: 5140)

The syslog-udp source has the following options:

port

the port on which to listen (int, default: 5140)

rfc

the format of the syslog (String, default: 3164)

The syslog-tcp source has the following options:

nio

use nio (recommend false for a small number of senders, true for many) (boolean, default: false)

port

the port on which to listen (int, default: 5140)

rfc

the format of the syslog (String, default: 3164)

To create a stream definition (using shell command)

xd:> stream create --name syslogtest --definition "reactor-syslog --port=5140 | file" --deploy

or

xd:> stream create --name syslogtest --definition "syslog-udp --port=5140 | file" --deploy

or

xd:> stream create --name syslogtest --definition "syslog-tcp --port=5140 | file" --deploy

(--port is not required when using the default 5140)

Send a test message to the syslog

$ logger -p local3.info -t TESTING "Test Syslog Message"

See if the data ended up in the file

$ cat /tmp/xd/output/syslogtest
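
If the local syslog daemon is not yet forwarding to the stream, a message can also be built by hand. The sketch below formats a BSD-style (RFC 3164) line, which is the default rfc setting for the syslog-udp and syslog-tcp sources. The function names syslog_pri and syslog_line are hypothetical. The priority is facility * 8 + severity, so local3.info (facility 19, severity 6) becomes 158. Whether the source accepts a hand-built line exactly as formatted here is an assumption worth verifying against your setup.

```shell
#!/bin/sh
# Compute the syslog priority value: facility * 8 + severity.
# Arguments: <facility number> <severity number>
syslog_pri() {
  echo $(( $1 * 8 + $2 ))
}

# Print one RFC 3164 style line: <PRI>TIMESTAMP HOSTNAME TAG: MESSAGE
# Arguments: <facility number> <severity number> <tag> <message>
syslog_line() {
  printf '<%s>%s %s %s: %s\n' \
    "$(syslog_pri "$1" "$2")" \
    "$(LC_ALL=C date '+%b %e %H:%M:%S')" \
    "$(uname -n)" "$3" "$4"
}

# Example (needs a deployed syslog-udp stream listening on port 5140):
#   syslog_line 19 6 TESTING "Test Syslog Message" | netcat -u -w1 localhost 5140
```

The timestamp uses %e so that single-digit days are padded with a space rather than a zero, which is the layout RFC 3164 describes.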

Refer to your syslog documentation to configure the syslog daemon to forward syslog messages to the stream; some examples are:

UDP - Mac OSX (syslog.conf) and Ubuntu (rsyslog.conf)

*.*	@localhost:5140

TCP - Ubuntu (rsyslog.conf)

$ModLoad omfwd
*.*	@@localhost:5140

Restart the syslog daemon after reconfiguring.

TCP

The tcp source acts as a server and allows a remote party to connect to XD and submit data over a raw tcp socket.

To create a stream definition in the server, use the following XD shell command

xd:> stream create --name tcptest --definition "tcp | file" --deploy

This will create the default TCP source and send data read from it to the tcptest file.

TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.

$ telnet localhost 1234
Trying ::1...
Connected to localhost.
Escape character is '^]'.
foo
^]

telnet> quit
Connection closed.

See if the data ended up in the file

$ cat /tmp/xd/output/tcptest

By default, the TCP module will emit a byte[]; to convert to a String, add --outputType=text/plain to the module definition.

TCP with options

The tcp source has the following options:

bufferSize

the size of the buffer (bytes) to use when encoding/decoding (int, default: 2048)

charset

the charset used when converting from bytes to String (String, default: UTF-8)

decoder

the decoder to use when receiving messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)

nio

whether or not to use NIO (boolean, default: false)

port

the port on which to listen (int, default: 1234)

reverseLookup

perform a reverse DNS lookup on the remote IP Address (boolean, default: false)

socketTimeout

the timeout (ms) before closing the socket when no data is received (int, default: 120000)

useDirectBuffers

whether or not to use direct buffers (boolean, default: false)

Available Decoders

Text Data
CRLF (default)

text terminated by carriage return (0x0d) followed by line feed (0x0a)

LF

text terminated by line feed (0x0a)

NULL

text terminated by a null byte (0x00)

STXETX

text preceded by an STX (0x02) and terminated by an ETX (0x03)

Text and Binary Data
RAW

no structure - the client indicates a complete message by closing the socket

L1

data preceded by a one byte (unsigned) length field (supports up to 255 bytes)

L2

data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)

L4

data preceded by a four byte (signed) length field (up to 2^31-1 bytes)

Examples

The following examples all use echo to send data to netcat which sends the data to the source.

The echo options -en allow echo to interpret escape sequences and suppress the trailing newline.

CRLF Decoder
xd:> stream create --name tcptest --definition "tcp | file" --deploy

This uses the default (CRLF) decoder and port 1234; send some data

$ echo -en 'foobar\r\n' | netcat localhost 1234

See if the data ended up in the file

$ cat /tmp/xd/output/tcptest
LF Decoder
xd:> stream create --name tcptest2 --definition "tcp --decoder=LF --port=1235 | file" --deploy
$ echo -en 'foobar\n' | netcat localhost 1235
$ cat /tmp/xd/output/tcptest2
NULL Decoder
xd:> stream create --name tcptest3 --definition "tcp --decoder=NULL --port=1236 | file" --deploy
$ echo -en 'foobar\x00' | netcat localhost 1236
$ cat /tmp/xd/output/tcptest3
STXETX Decoder
xd:> stream create --name tcptest4 --definition "tcp --decoder=STXETX --port=1237 | file" --deploy
$ echo -en '\x02foobar\x03' | netcat localhost 1237
$ cat /tmp/xd/output/tcptest4
RAW Decoder
xd:> stream create --name tcptest5 --definition "tcp --decoder=RAW --port=1238 | file" --deploy
$ echo -n 'foobar' | netcat localhost 1238
$ cat /tmp/xd/output/tcptest5
L1 Decoder
xd:> stream create --name tcptest6 --definition "tcp --decoder=L1 --port=1239 | file" --deploy
$ echo -en '\x06foobar' | netcat localhost 1239
$ cat /tmp/xd/output/tcptest6
L2 Decoder
xd:> stream create --name tcptest7 --definition "tcp --decoder=L2 --port=1240 | file" --deploy
$ echo -en '\x00\x06foobar' | netcat localhost 1240
$ cat /tmp/xd/output/tcptest7
L4 Decoder
xd:> stream create --name tcptest8 --definition "tcp --decoder=L4 --port=1241 | file" --deploy
$ echo -en '\x00\x00\x00\x06foobar' | netcat localhost 1241
$ cat /tmp/xd/output/tcptest8

Binary Data Example

xd:> stream create --name tcptest9 --definition "tcp --decoder=L1 --port=1242 | file --binary=true" --deploy

Note that we configure the file sink with binary=true so that a newline is not appended.

$ echo -en '\x08foo\x00bar\x0b' | netcat localhost 1242
$ hexdump -C /tmp/xd/output/tcptest9
00000000  66 6f 6f 00 62 61 72 0b                           |foo.bar.|
00000008
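
The length prefixes in the L1, L2, and L4 examples above were typed by hand. As a convenience, the following sketch computes them instead. The helper names emit_byte and frame are hypothetical. The byte order (most significant byte first) is inferred from the L2 and L4 examples, where a six byte payload is preceded by \x00\x06 and \x00\x00\x00\x06.

```shell
#!/bin/sh
# Write one byte with the given numeric value (0-255).
# printf understands octal escapes portably, so convert the value to octal first.
emit_byte() {
  printf "\\$(printf '%03o' "$1")"
}

# Read a payload from stdin and write it to stdout, preceded by a length
# header of WIDTH bytes (1, 2, or 4) for the L1, L2, and L4 decoders.
frame() (
  width=$1
  case $width in
    1) max=255 ;;
    2) max=65535 ;;
    4) max=2147483647 ;;
    *) echo "width must be 1, 2, or 4" >&2; exit 2 ;;
  esac
  tmp=$(mktemp) || exit 1
  cat > "$tmp"                        # buffer the payload so it can be measured
  len=$(( $(wc -c < "$tmp") ))        # byte count; $(( )) strips any padding from wc
  if [ "$len" -gt "$max" ]; then
    echo "payload of $len bytes does not fit a $width byte length field" >&2
    rm -f "$tmp"
    exit 1
  fi
  shift_bits=$(( 8 * ($width - 1) ))
  while [ "$shift_bits" -ge 0 ]; do   # most significant byte first
    emit_byte $(( ($len >> $shift_bits) & 255 ))
    shift_bits=$(( $shift_bits - 8 ))
  done
  cat "$tmp"
  rm -f "$tmp"
)

# Example (needs the tcptest7 stream from above):
#   printf 'foobar' | frame 2 | netcat localhost 1240
```

Reading the payload from stdin rather than from an argument means binary data, including null bytes, passes through unchanged, which a shell variable could not hold.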

TCP Client

The tcp-client source module uses raw TCP sockets, as does the tcp module, but unlike the tcp module it acts as a client. Whereas the tcp module opens a listening socket and waits for connections from a remote party, the tcp-client initiates the connection to a remote server and emits as messages whatever that remote server sends over the wire. As an optional feature, the tcp-client can itself emit messages to the remote server, so that a simple conversation can take place.

TCP Client options

The tcp-client source has the following options:

bufferSize

the size of the buffer (bytes) to use when encoding/decoding (int, default: 2048)

charset

the charset used when converting from bytes to String (String, default: UTF-8)

close

whether to close the socket after each message (boolean, default: false)

decoder

the decoder to use when receiving messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)

encoder

the encoder to use when sending messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)

expression

a SpEL expression used to transform messages (String, default: payload.toString())

fixedDelay

the rate at which stimulus messages will be emitted (seconds) (int, default: 5)

host

the remote host to connect to (String, default: localhost)

nio

whether or not to use NIO (boolean, default: false)

port

the port on the remote host to connect to (int, default: 1234)

propertiesLocation

the path of a properties file containing custom script variable bindings (String, no default)

reverseLookup

perform a reverse DNS lookup on the remote IP Address (boolean, default: false)

script

reference to a script used to process messages (String, no default)

socketTimeout

the timeout (ms) before closing the socket when no data is received (int, default: 120000)

useDirectBuffers

whether or not to use direct buffers (boolean, default: false)

variables

variable bindings as a comma delimited string of name-value pairs, e.g., 'foo=bar,baz=car' (String, no default)

Implementing a simple conversation

The "stimulus" counter concept deserves some explanation. By default, the module emits an incrementing number, starting at 1, at the interval set by fixedDelay. Because the default expression is payload.toString(), the module sends 1, 2, 3, ... to the remote server.

By using another expression, or more likely a script, one can implement a simple conversation, assuming it is time based. As an example, let’s assume we want to join some kind of chat server where one first needs to authenticate, then specify which rooms to join. Lastly, all clients are supposed to send some keepalive commands to make sure that the connection stays open.

The following groovy script could be used to that effect:

def commands = ['', // index 0 is not used
'LOGIN user=johndoe', // first command sent
'JOIN weather',
'JOIN news',
'JOIN gossip'
]


// payload will contain an incrementing counter, starting at 1
if (commands.size() > payload)
  return commands[payload] + "\n"
else
  return "PING\n"  // send keep alive after 4th 'real' command

Reactor IP

The reactor-ip source acts as a server and allows a remote party to connect to XD and submit data over a raw TCP or UDP socket. The reactor-ip source differs from the standard tcp source in that it is based on the Reactor Project and can be configured to use the LMAX Disruptor RingBuffer library allowing for extremely high ingestion rates, e.g. ~ 1M/sec.

To create a stream definition use the following XD shell command

xd:> stream create --name tcpReactor --definition "reactor-ip | file" --deploy

This will create the reactor TCP source and send data read from it to the file named tcpReactor.

The reactor-ip source has the following options:

codec

codec used to transcode data (String, default: string)

dispatcher

type of Reactor Dispatcher to use (String, default: shared)

framing

method of framing the data (String, default: linefeed)

host

host to bind the server to (String, default: 0.0.0.0)

lengthFieldLength

byte precision of the number used in the length field (int, default: 4)

port

port to bind the server to (int, default: 3000)

transport

whether to use TCP or UDP as a transport (String, default: tcp)

RabbitMQ

The "rabbit" source enables receiving messages from RabbitMQ.

The following example shows the default settings.

Configure a stream:

xd:> stream create --name rabbittest --definition "rabbit | file --binary=true" --deploy

This receives messages from a queue named rabbittest and writes them to the default file sink (/tmp/xd/output/rabbittest.out). It uses the default RabbitMQ broker running on localhost, port 5672.

The queue(s) must exist before the stream is deployed. We do not create the queue(s) automatically. However, you can easily create a Queue using the RabbitMQ web UI. Then, using that same UI, you can navigate to the "rabbittest" Queue and publish test messages to it.

Notice that the file sink has --binary=true; this is because, by default, the data emitted by the source will be bytes. This can be modified by setting the content_type property on messages to text/plain. In that case, the source will convert the message to a String; you can then omit the --binary=true and the file sink will then append a newline after each message.

To destroy the stream, enter the following at the shell prompt:

xd:> stream destroy --name rabbittest

RabbitMQ with Options

The rabbit source has the following options:

ackMode

the acknowledge mode (AUTO, NONE, MANUAL) (String, default: AUTO)

addresses

a comma separated list of 'host[:port]' addresses (String, default: ${spring.rabbitmq.addresses})

concurrency

the minimum number of consumers (int, default: 1)

converterClass

the class name of the message converter (String, default: org.springframework.amqp.support.converter.SimpleMessageConverter)

enableRetry

enable retry; when retries are exhausted the message will be rejected; message disposition will depend on dead letter configuration (boolean, default: false)

initialRetryInterval

initial interval between retries (int, default: 1000)

mappedRequestHeaders

request message header names to be propagated to/from the adapter/gateway (String, default: STANDARD_REQUEST_HEADERS)

maxAttempts

maximum delivery attempts (int, default: 3)

maxConcurrency

the maximum number of consumers (int, default: 1)

maxRetryInterval

maximum retry interval (int, default: 30000)

password

the password to use to connect to the broker (String, default: ${spring.rabbitmq.password})

prefetch

the prefetch size (int, default: 1)

queues

the queue(s) from which messages will be received (String, default: <stream name>)

requeue

whether rejected messages will be requeued by default (boolean, default: true)

retryMultiplier

retry interval multiplier (double, default: 2.0)

sslPropertiesLocation

resource containing SSL properties (String, default: ${spring.rabbitmq.sslProperties})

transacted

true if the channel is to be transacted (boolean, default: false)

txSize

the number of messages to process before acking (int, default: 1)

useSSL

true if SSL should be used for the connection (String, default: ${spring.rabbitmq.useSSL})

username

the username to use to connect to the broker (String, default: ${spring.rabbitmq.username})

vhost

the RabbitMQ virtual host to use (String, default: ${spring.rabbitmq.virtual_host})

See the RabbitMQ MessageBus Documentation for more information about SSL configuration.

A Note About Retry

Note
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried indefinitely. Since there is not much processing in the rabbit source, the risk of failure in the source itself is small. However, when using the LocalMessageBus or Direct Binding, exceptions in downstream modules will be thrown back to the source. Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter Exchange/Queue if the broker is so configured). The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and eventually discarded (or dead-lettered) when retries are exhausted. The delivery thread is suspended during the retry interval(s). Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval. Message deliveries failing with a MessageConversionException (perhaps when using a custom converterClass) are never retried, on the assumption that if a message could not be converted on the first attempt, subsequent attempts will also fail. Such messages are discarded (or dead-lettered).
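
To make the retry options concrete, the following sketch computes the waits between delivery attempts. It assumes the options behave like a conventional exponential back-off: each wait is the previous one multiplied by retryMultiplier, capped at maxRetryInterval, with maxAttempts counting the first delivery. The intervals are presumably milliseconds, although the option descriptions do not state a unit. RetrySchedule is a hypothetical class for illustration, not the module's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the retry options; not Spring XD code.
public class RetrySchedule {

    // Returns the wait before each redelivery, in the same unit as the options.
    public static List<Long> waits(int maxAttempts, long initialRetryInterval,
                                   double retryMultiplier, long maxRetryInterval) {
        List<Long> waits = new ArrayList<>();
        double next = initialRetryInterval;
        // maxAttempts deliveries leave maxAttempts - 1 gaps between them.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) next, maxRetryInterval));
            next *= retryMultiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Defaults from the options table: 3 attempts, 1000, x2.0, cap 30000.
        System.out.println(waits(3, 1000, 2.0, 30000)); // [1000, 2000]
        // With more attempts the cap takes effect.
        System.out.println(waits(8, 1000, 2.0, 30000)); // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
    }
}
```

Under these assumptions, the default settings suspend the delivery thread for about three seconds in total before a message is rejected.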

JMS

The "jms" source enables receiving messages from JMS.

The following example shows the default settings.

Configure a stream:

xd:> stream create --name jmstest --definition "jms | file" --deploy

This receives messages from a queue named jmstest and writes them to the default file sink (/tmp/xd/output/jmstest). It uses the default ActiveMQ broker running on localhost, port 61616.

To destroy the stream, enter the following at the shell prompt:

xd:> stream destroy --name jmstest

To test the above stream, you can use something like the following:

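import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.jms.core.JmsTemplate;

public class Broker {

	public static void main(String[] args) throws Exception {
		// Start an embedded ActiveMQ broker on the default port.
		BrokerService broker = new BrokerService();
		broker.setBrokerName("broker");
		String brokerURL = "tcp://localhost:61616";
		broker.addConnector(brokerURL);
		broker.start();
		ConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
		JmsTemplate template = new JmsTemplate(cf);
		// Send one message to the jmstest queue for each byte read from stdin.
		while (System.in.read() >= 0) {
			template.convertAndSend("jmstest", "testFoo");
		}
	}
}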

Run this as a Java application; each time you hit <enter> in the console, it will send a message to queue jmstest. To watch the messages arrive, run tail -f /tmp/xd/output/jmstest in another terminal.

The out-of-the-box configuration is set up to use ActiveMQ. To use another JMS provider you will need to update a few files in the XD distribution. There are sample files for HornetQ in the distribution as an example for you to follow. You will also need to add the appropriate libraries for your provider in the JMS module lib directory or in the main XD lib directory.

JMS with Options

The jms source has the following options:

clientId

an identifier for the client, to be associated with a durable topic subscription (String, no default)

destination

the destination name from which messages will be received (String, default: <stream name>)

durableSubscription

when true, indicates the subscription to a topic is durable (boolean, default: false)

provider

the JMS provider (String, default: activemq)

pubSub

when true, indicates that the destination is a topic (boolean, default: false)

subscriptionName

a name that will be assigned to the topic subscription (String, no default)

Note
The selected broker requires an infrastructure configuration file jms-<provider>-infrastructure-context.xml in modules/common. This is used to declare any infrastructure beans needed by the provider. See the default (jms-activemq-infrastructure-context.xml) for an example. Typically, all that is required is a ConnectionFactory. The activemq provider uses a properties file, jms-activemq.properties, which can be found in the config directory. This contains the broker URL.

Time

The time source emits a String containing the current time at a fixed interval (every second by default).

The time source has the following options:

fixedDelay

how often to emit a message, expressed in seconds (int, default: 1)

format

how to render the current time, using SimpleDateFormat (String, default: yyyy-MM-dd HH:mm:ss)

initialDelay

an initial delay when using a fixed delay trigger, expressed in TimeUnits (seconds by default) (Integer, default: 0)

timeUnit

the time unit for the fixed delay (String, default: SECONDS)
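
Because the format option is a SimpleDateFormat pattern, a candidate pattern can be tried out in plain Java before it is used in a stream definition. The sketch below is only an illustration: TimeFormatDemo is a hypothetical class, and the fixed UTC time zone and root locale are choices made here so the output is reproducible, not something the time source is known to do.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper for trying out values of the format option.
public class TimeFormatDemo {

    // Renders the given instant with a SimpleDateFormat pattern in the given zone.
    public static String render(String pattern, Date when, TimeZone zone) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(zone);
        return formatter.format(when);
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        Date epoch = new Date(0L); // 1 January 1970, midnight UTC
        // The default pattern from the options table.
        System.out.println(render("yyyy-MM-dd HH:mm:ss", epoch, utc)); // 1970-01-01 00:00:00
        // A shorter custom pattern.
        System.out.println(render("HH:mm", epoch, utc)); // 00:00
    }
}
```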

MQTT

The mqtt source connects to an mqtt server and receives telemetry messages.

Configure a stream:

xd:> stream create tcptest --definition "mqtt --url='tcp://localhost:1883' --topics='xd.mqtt.test' | log" --deploy

If you wish to use the MQTT Source defaults you can execute the command as follows:

xd:> stream create tcptest --definition "mqtt | log" --deploy

Options

The mqtt source has the following options:

binary

true to leave the payload as bytes (boolean, default: false)

charset

the charset used to convert bytes to String (when binary is false) (String, default: UTF-8)

cleanSession

whether the client and server should remember state across restarts and reconnects (boolean, default: true)

clientId

identifies the client (String, default: xd.mqtt.client.id.src)

connectionTimeout

the connection timeout in seconds (int, default: 30)

keepAliveInterval

the ping interval in seconds (int, default: 60)

password

the password to use when connecting to the broker (String, default: guest)

persistence

'memory' or 'file' (String, default: memory)

persistenceDirectory

file location when using 'file' persistence (String, default: /tmp/paho)

qos

the qos; a single value for all topics or a comma-delimited list to match the topics (String, default: 0)

topics

the topic(s) (comma-delimited) to which the source will subscribe (String, default: xd.mqtt.test)

url

location of the mqtt broker(s) (comma-delimited list) (String, default: tcp://localhost:1883)

username

the username to use when connecting to the broker (String, default: guest)

Note
The defaults are set up to connect to the RabbitMQ MQTT adapter on localhost.
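
The qos option accepts either one value for every topic or a comma-delimited list that lines up with topics. The sketch below shows one way that pairing could be read. MqttQosMapping is a hypothetical class, the topic names alerts and metrics are made up, and rejecting a list of the wrong length is an assumption made here, not documented module behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of how topics and qos values line up.
public class MqttQosMapping {

    // Pairs each topic with its qos: one shared value, or one value per topic.
    public static Map<String, Integer> pair(String topics, String qos) {
        String[] topicNames = topics.split(",");
        String[] qosValues = qos.split(",");
        if (qosValues.length != 1 && qosValues.length != topicNames.length) {
            // ASSUMPTION: a mismatched list is treated as an error in this sketch.
            throw new IllegalArgumentException(
                    "qos must be a single value or match the number of topics");
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < topicNames.length; i++) {
            String value = qosValues.length == 1 ? qosValues[0] : qosValues[i];
            result.put(topicNames[i].trim(), Integer.parseInt(value.trim()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pair("xd.mqtt.test", "0"));     // {xd.mqtt.test=0}
        System.out.println(pair("alerts,metrics", "2,0")); // {alerts=2, metrics=0}
        System.out.println(pair("alerts,metrics", "1"));   // {alerts=1, metrics=1}
    }
}
```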

Stdout Capture

There isn’t actually a source named "stdin" but it is easy to capture stdin by redirecting it to a tcp source. For example if you wanted to capture the output of a command, you would first create the tcp stream, as above, using the appropriate sink for your requirements:

xd:> stream create tcpforstdout --definition "tcp --decoder=LF | log" --deploy

You can then capture the output from commands using the netcat command:

$ cat mylog.txt | netcat localhost 1234

Kafka

This source module ingests data from a configured Kafka topic.

The kafka source has the following options:

autoOffsetReset

strategy to reset the offset when there is no initial offset in ZK or if an offset is out of range (String, default: smallest)

encoding

string encoder to translate bytes into string (String, default: UTF8)

fetchMaxBytes

max number of bytes of messages to attempt to fetch for each topic-partition in each fetch request (int, default: 307200)

fetchMaxWait

max wait time before answering the fetch request (int, default: 100)

fetchMinBytes

the minimum amount of data the server should return for a fetch request (int, default: 1)

groupId

kafka consumer configuration group id (String, default: <stream name>)

initialOffsets

initial offsets (String, default: ``)

offsetStorage

offset storage strategy (OffsetStorageStrategy, default: kafka, possible values: inmemory,redis,kafka)

partitions

kafka partitions (String, default: ``)

socketBufferBytes

socket receive buffer for network requests (int, default: 65536)

socketTimeout

socket timeout for network requests in milliseconds (int, default: 30000)

streams

number of streams in the topic (int, default: 1)

topic

kafka topic name (String, default: <stream name>)

zkconnect

zookeeper connect string (String, default: localhost:2181/kafka)

zkconnectionTimeout

the max time the client waits to connect to ZK in milliseconds (int, default: 6000)

zksessionTimeout

zookeeper session timeout in milliseconds (int, default: 6000)

zksyncTime

how far a ZK follower can be behind a ZK leader in milliseconds (int, default: 2000)

Configure a stream:

xd:> stream create myKafkaSource --definition "kafka --zkconnect=localhost:2181 --topic=mytopic | log" --deploy

JDBC Source

This source module supports the ability to ingest data directly from various databases. It does this by querying the database and sending the results as messages to the stream.

Configure a stream with a jdbc source using a query:

xd:> stream create foo --definition "jdbc --fixedDelay=1 --split=1 --url=jdbc:hsqldb:hsql://localhost:9101/mydb --query='select * from testfoo' | log" --deploy

The example above polls the testfoo table once a second, retrieving all of its rows each time, until the stream is undeployed or destroyed.

Configure a stream with a jdbc source using a query and update:

xd:> stream create foo --definition "jdbc --fixedDelay=1 --split=1 --url=jdbc:hsqldb:hsql://localhost:9101/mydb --query='select * from testfoo where tag = 0' --update='update testfoo set tag=1 where fooid in (:fooid)' | log" --deploy

The example above polls the testfoo table for rows that have a "tag" of zero. The update then sets tag to 1 for the rows that were retrieved, so rows that have already been retrieved will not be included in future queries.

The jdbc source has the following options:

abandonWhenPercentageFull

connections that have timed out won't get closed and reported up unless the number of connections in use is above the percentage (int, default: 0)

alternateUsernameAllowed

uses an alternate user name if connection fails (boolean, default: false)

connectionProperties

connection properties that will be sent to our JDBC driver when establishing new connections (String, no default)

driverClassName

the JDBC driver to use (String, no default)

fairQueue

set to true if you wish that calls to getConnection should be treated fairly in a true FIFO fashion (boolean, default: true)

fixedDelay

how often to poll for new messages (s) (int, default: 5)

initSQL

custom query to be run when a connection is first created (String, no default)

initialSize

initial number of connections that are created when the pool is started (int, default: 0)

jdbcInterceptors

semicolon separated list of classnames extending org.apache.tomcat.jdbc.pool.JdbcInterceptor (String, no default)

jmxEnabled

register the pool with JMX or not (boolean, default: true)

logAbandoned

flag to log stack traces for application code which abandoned a Connection (boolean, default: false)

maxActive

maximum number of active connections that can be allocated from this pool at the same time (int, default: 100)

maxAge

time in milliseconds to keep this connection (int, default: 0)

maxIdle

maximum number of connections that should be kept in the pool at all times (int, default: 100)

maxRowsPerPoll

max numbers of rows to process for each poll (int, default: 0)

maxWait

maximum number of milliseconds that the pool will wait for a connection (int, default: 30000)

minEvictableIdleTimeMillis

minimum amount of time an object may sit idle in the pool before it is eligible for eviction (int, default: 60000)

minIdle

minimum number of established connections that should be kept in the pool at all times (int, default: 10)

password

the JDBC password (Password, no default)

query

an SQL select query to execute to retrieve new messages when polling (String, no default)

removeAbandoned

flag to remove abandoned connections if they exceed the removeAbandonedTimeout (boolean, default: false)

removeAbandonedTimeout

timeout in seconds before an abandoned connection can be removed (int, default: 60)

split

whether to split the SQL result as individual messages (boolean, default: true)

suspectTimeout

this simply logs the warning after timeout, connection remains (int, default: 0)

testOnBorrow

indication of whether objects will be validated before being borrowed from the pool (boolean, default: false)

testOnReturn

indication of whether objects will be validated before being returned to the pool (boolean, default: false)

testWhileIdle

indication of whether objects will be validated by the idle object evictor (boolean, default: false)

timeBetweenEvictionRunsMillis

number of milliseconds to sleep between runs of the idle connection validation/cleaner thread (int, default: 5000)

update

an SQL update statement to execute for marking polled messages as 'seen' (String, no default)

url

the JDBC URL for the database (String, no default)

useEquals

true if you wish the ProxyConnection class to use String.equals (boolean, default: true)

username

the JDBC username (String, no default)

validationInterval

avoid excess validation, only run validation at most at this frequency - time in milliseconds (long, default: 30000)

validationQuery

sql query that will be used to validate connections from this pool (String, no default)

validatorClassName

name of a class which implements the org.apache.tomcat.jdbc.pool.Validator (String, no default)
