Processors - garyrussell/spring-xd GitHub Wiki

Introduction

This section will cover the processors available out-of-the-box with Spring XD. As a prerequisite, start the XD Container as instructed in the Getting Started page.

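The processors covered are Filter, Transform, Script, Splitter, Aggregator, HTTP Client, Shell, JSON to Tuple, and Object to JSON.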

See the section Creating a Processor Module for information on how to create custom processor modules.

Filter

Use the filter module in a stream to determine whether a Message should be passed to the output channel.

The filter processor has the following options:

expression

a SpEL expression used to transform messages (String, default: payload.toString())

propertiesLocation

the path of a properties file containing custom script variable bindings (String, no default)

script

reference to a script used to process messages (String, no default)

variables

variable bindings as a comma delimited string of name-value pairs, e.g., 'foo=bar,baz=car' (String, no default)

Filter with SpEL expression

The simplest way to use the filter processor is to pass a SpEL expression when creating the stream. The expression should evaluate the message and return true or false. For example:

xd:> stream create --name filtertest --definition "http | filter --expression=payload=='good' | log" --deploy

This filter will only pass Messages to the log sink if the payload is the word "good". Try sending "good" to the HTTP endpoint and you should see it in the XD log:

xd:> http post --target http://localhost:9000 --data "good"

Alternatively, if you send the word "bad" (or anything else), you shouldn’t see the log entry.

Filter using jsonPath evaluation

As part of the SpEL expression you can make use of the pre-registered JSON Path function.

This filter example shows how to pass messages to the output channel only if they contain a specific JSON field matching a specific value.

xd:> stream create --name jsonfiltertest --definition "http --port=9002 | filter --expression=#jsonPath(payload,'$.firstName').contains('John') | log" --deploy

Note: The expression contains no space between payload and the JSON path argument of #jsonPath.

This filter will only pass Messages to the log sink if the JSON payload contains the firstName "John". Try sending this payload to the HTTP endpoint and you should see it in the XD log:

xd:> http post --target http://localhost:9002 --data "{\"firstName\":\"John\", \"lastName\":\"Smith\"}"

Alternatively, if you send a different firstName, you shouldn’t see the log entry.

Here is another example of filter usage:

filter --expression=#jsonPath(payload,'$.entities.hashtags[*].text').contains('obama')

This example operates on a JSON payload of tweets, as consumed from the twitter search module.
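To make the hashtag check concrete, here is a minimal Python sketch of roughly what that expression tests. It is not Spring XD code: `hashtag_texts` and `mentions` are hypothetical helper names, and the sketch assumes the JSON Path result is a list, so that contains behaves as exact element membership rather than a substring match.

```python
import json


def hashtag_texts(payload):
    """Collect every entities.hashtags[*].text value from a JSON string."""
    doc = json.loads(payload)
    hashtags = doc.get("entities", {}).get("hashtags", [])
    return [tag["text"] for tag in hashtags if "text" in tag]


def mentions(payload, word):
    """True when one hashtag text equals `word` exactly.

    Assumption: list membership, mirroring contains() on a list result.
    """
    return word in hashtag_texts(payload)
```

For instance, a tweet whose hashtags are "obama" and "news" would pass `mentions(tweet, "obama")`, while a tweet with no hashtags would not.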

Filter with Groovy Script

For more complex filtering, you can pass the location of a Groovy script using the script option. If you want to pass variable values to your script, you can statically bind values using the variables option or pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. Note that payload and headers are implicitly bound to give you access to the data contained in a message.

Note
These features are common to all modules backed by Groovy scripts.

Example:

//custom-filter.groovy
return payload.size() > 4 || shortstrings=='true'

#custom-filter.properties
shortstrings=false

By default, Spring XD will search the classpath for custom-filter.groovy and custom-filter.properties. You can place the script in ${xd.home}/modules/processor/scripts and the properties file in ${xd.home}/config to make them available on the classpath. Alternatively, you can prefix the script and properties-location values with file: to load from the file system.

In the following stream definitions, the filter will pass only the first message:

xd:> stream create --name groovyfiltertest1 --definition "http --port=9001 | filter --script=file:<absolute-path-to>/custom-filter.groovy --variables='shortstrings=false' | log" --deploy
Created and deployed new stream 'groovyfiltertest1'
xd:> http post --target http://localhost:9001 --data hello
xd:> http post --target http://localhost:9001 --data hi
xd:> stream create --name groovyfiltertest2 --definition "http --port=9002 | filter --script=file:<absolute-path-to>/custom-filter.groovy --propertiesLocation=file:<absolute-path-to>/custom-filter.properties | log" --deploy
Created and deployed new stream 'groovyfiltertest2'
xd:> http post --target http://localhost:9002 --data hello
xd:> http post --target http://localhost:9002 --data hi

In the following stream definitions, the filter will pass all messages (provided the payload type supports a size() method):

xd:> stream create --name groovyfiltertest1 --definition "http --port=9001 | filter --script=file:<absolute-path-to>/custom-filter.groovy --variables='shortstrings=true' | log" --deploy
Created and deployed new stream 'groovyfiltertest1'
xd:> stream create --name groovyfiltertest2 --definition "http --port=9002 | filter --script=file:<absolute-path-to>/custom-filter.groovy --variables='shortstrings=true' --propertiesLocation=file:<absolute-path-to>/custom-filter.properties | log" --deploy
Created and deployed new stream 'groovyfiltertest2'

Note that the last example demonstrates that values specified in variables override values from propertiesLocation.
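The precedence between the two binding sources can be sketched in Python as follows. This is an illustration of the behavior described above, not Spring XD's implementation: `parse_pairs`, `resolve_bindings`, and `custom_filter` are hypothetical names, and the properties parsing is deliberately simplified to name=value lines and # comments.

```python
def parse_pairs(text, separator):
    """Parse 'name=value' entries separated by `separator` into a dict."""
    bindings = {}
    for entry in text.split(separator):
        entry = entry.strip()
        if not entry or entry.startswith("#"):
            continue  # skip blank entries and properties-style comments
        name, _, value = entry.partition("=")
        bindings[name.strip()] = value.strip()
    return bindings


def resolve_bindings(properties_text="", variables=""):
    """Merge both sources; entries from `variables` win on conflict."""
    bindings = parse_pairs(properties_text, "\n")
    bindings.update(parse_pairs(variables, ","))
    return bindings


def custom_filter(payload, bindings):
    """Python rendering of the custom-filter.groovy logic.

    Unlike Groovy, an unbound 'shortstrings' is treated here as not 'true'.
    """
    return len(payload) > 4 or bindings.get("shortstrings") == "true"
```

With only the properties file (shortstrings=false), "hello" passes and "hi" is dropped; adding variables='shortstrings=true' overrides the file and lets "hi" through as well.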

Tip
The script is checked for updates every 60 seconds, so it may be replaced in a running system.

Transform

Use the transform module in a stream to convert a Message’s content or structure.

The transform processor has the following options:

expression

a SpEL expression used to transform messages (String, default: payload.toString())

propertiesLocation

the path of a properties file containing custom script variable bindings (String, no default)

script

reference to a script used to process messages (String, no default)

variables

variable bindings as a comma delimited string of name-value pairs, e.g., 'foo=bar,baz=car' (String, no default)

Transform with SpEL expression

The simplest way to use the transform processor is to pass a SpEL expression when creating the stream. The expression should return the modified message or payload. For example:

xd:> stream create --name transformtest --definition "http --port=9003 | transform --expression='FOO' | log" --deploy

This transform will convert all message payloads to the word "FOO". Try sending something to the HTTP endpoint and you should see "FOO" in the XD log:

xd:> http post --target http://localhost:9003 --data "some message"

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')

Transform with Groovy Script

For more complex transformations, you can pass the location of a Groovy script using the script option. If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. Note that payload and headers are implicitly bound to give you access to the data contained in a message. See the Filter example for a more detailed discussion of script variables.

xd:> stream create --name groovytransformtest1 --definition "http --port=9004 | transform --script=custom-transform.groovy --variables='x=foo' | log" --deploy
xd:> stream create --name groovytransformtest2 --definition "http --port=9004 | transform --script=custom-transform.groovy --propertiesLocation=custom-transform.properties | log" --deploy

By default, Spring XD will search the classpath for custom-transform.groovy and custom-transform.properties. You can place the script in ${xd.home}/modules/processor/scripts and the properties file in ${xd.home}/config to make them available on the classpath. Alternatively, you can prefix the script and properties-location values with file: to load from the file system.

Tip
The script is checked for updates every 60 seconds, so it may be replaced in a running system.

Script

The script processor contains a Service Activator that invokes a specified Groovy script. This is a slightly more generic way to accomplish processing logic, as the provided script may simply terminate the stream as well as transform or filter Messages.

The script processor has the following options:

propertiesLocation

the path of a properties file containing custom script variable bindings (String, no default)

script

reference to a script used to process messages (String, no default)

variables

variable bindings as a comma delimited string of name-value pairs, e.g., 'foo=bar,baz=car' (String, no default)

To use the module, pass the location of a Groovy script using the script attribute. If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. Note that payload and headers are implicitly bound to give you access to the data contained in a message. See the Filter example for a more detailed discussion of script variables.

xd:> stream create --name groovyprocessortest --definition "http --port=9006 | script --script=custom-processor.groovy --variables='x=foo' | log" --deploy
xd:> stream create --name groovyprocessortest --definition "http --port=9006 | script --script=custom-processor.groovy --propertiesLocation=custom-processor.properties | log" --deploy

By default, Spring XD will search the classpath for custom-processor.groovy and custom-processor.properties. You can place the script in ${xd.home}/modules/processor/scripts and the properties file in ${xd.home}/config to make them available on the classpath. Alternatively, you can prefix the script and properties-location values with file: to load from the file system.

Tip
The script is checked for updates every 60 seconds, so it may be replaced in a running system.

Splitter

The splitter module builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.

The splitter processor has the following options:

expression

a SpEL expression which would typically evaluate to an array or collection (String, default: payload)

Note
The default value for expression is payload, which does not actually split anything unless the payload is already a collection.

As part of the SpEL expression you can make use of the pre-registered JSON Path function. The syntax is #jsonPath(payload,'<json path expression>')

Extract the value of a specific field

This splitter converts a JSON message payload to the value of a specific JSON field.

xd:> stream create --name jsontransformtest --definition "http --port=9005 | splitter --expression=#jsonPath(payload,'$.firstName') | log" --deploy

Try sending this payload to the HTTP endpoint and you should see just the value "John" in the XD log:

xd:> http post --target http://localhost:9005 --data '{"firstName":"John", "lastName":"Smith"}'
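The splitting behavior can be approximated in a few lines of Python. This is a sketch under stated assumptions rather than the Spring Integration implementation: `split_payload` and `split_json_field` are hypothetical names, only lists and tuples are treated as collections, and a simple top-level field lookup stands in for the JSON Path evaluation.

```python
import json


def split_payload(payload):
    """Return one output message per element for lists/tuples, else the payload alone."""
    if isinstance(payload, (list, tuple)):
        return list(payload)
    return [payload]


def split_json_field(payload, field):
    """Evaluate a top-level field of a JSON string, then split the result."""
    value = json.loads(payload)[field]
    return split_payload(value)
```

Applied to the payload above with the field firstName, the sketch yields a single message containing "John", because the extracted value is a plain string rather than a collection.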

Aggregator

The aggregator module does the opposite of the splitter, and builds upon the concept of the same name found in Spring Integration. By default, it will consider all incoming messages from a stream to belong to the same group:

xd:> stream create --name aggregates --definition "http | aggregator --count=3 --aggregation=T(org.springframework.util.StringUtils).collectionToDelimitedString(#this.![payload],' ') | log" --deploy

This uses a SpEL expression that will basically concatenate all payloads together, inserting a space character in between. As such,

xd:> http post --data Hello
xd:> http post --data World
xd:> http post --data !

would emit a single message whose content is "Hello World !", because the count option tells the aggregator to release a group once it has accumulated 3 messages.
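As a rough mental model, count-based release can be sketched in Python like this. The class is hypothetical and intentionally minimal: it keeps groups in memory only, and it does not model the timeout, release expression, or persistent stores that the real module offers.

```python
class CountingAggregator:
    """Buffer payloads per correlation key and release a group once it holds `count` items."""

    def __init__(self, count, aggregation=list):
        self.count = count
        self.aggregation = aggregation  # builds the output from the list of payloads
        self.groups = {}

    def receive(self, payload, key="stream"):
        """Add a payload; return the aggregated result when the group is complete, else None."""
        group = self.groups.setdefault(key, [])
        group.append(payload)
        if len(group) < self.count:
            return None
        del self.groups[key]  # start a fresh group for the next messages
        return self.aggregation(group)
```

With `CountingAggregator(3, " ".join)`, the first two calls to `receive` return None and the third returns the joined string, mirroring the three posts above. The default key stands in for the default correlation, which places every message of a stream in the same group.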

The aggregator processor has the following options:

aggregation

how to construct the aggregated message (SpEL expression against a collection of messages) (String, default: #this.![payload])

correlation

how to correlate messages (SpEL expression against each message) (String, default: '<stream name>')

count

the number of messages to group together before emitting a group (int, default: 50)

dbkind

which flavor of init scripts to use for the jdbc store (blank to attempt autodetection) (String, no default)

driverClassName

the jdbc driver to use when using the jdbc store (String, no default)

hostname

hostname of the redis instance to use as a store (String, default: localhost)

initializeDatabase

whether to auto-create the database tables for the jdbc store (boolean, default: false)

password

the password to use when using the jdbc or redis store (String, default: empty string)

port

port of the redis instance to use as a store (int, default: 6379)

release

when to release messages (SpEL expression against a collection of messages accumulated so far) (String, no default)

store

the kind of store to use to retain messages (StoreKind, default: memory, possible values: memory,jdbc,redis)

timeout

the delay (ms) after which messages should be released, even if the completion criteria is not met (int, default: 50000)

url

the jdbc url to connect to when using the jdbc store (String, no default)

username

the username to use when using the jdbc store (String, no default)

Note
  • Some of the options are only relevant when using a particular store

  • The default correlation of '<stream name>' actually considers all messages to be correlated, since they all belong to the same stream.

  • Using the release option overrides the count option (which is a simpler approach)

  • The default for aggregation creates a new collection made of the payloads of the accumulated messages

  • About the timeout option: due to the way it is implemented (see MessageGroupStoreReaper in the Spring Integration documentation), the observed delay may fall anywhere between timeout and twice the timeout.

HTTP Client

The http-client processor acts as a client that issues HTTP requests to a remote server, submitting the message payload it receives to that server and in turn emitting the response it receives to the next module down the line.

For example, the following command will result in an immediate fetching of earthquake data and it being logged in the container:

xd:>stream create earthquakes --definition "trigger | http-client --url='''http://earthquake.usgs.gov/earthquakes/feed/geojson/all/day''' --httpMethod=GET | log" --deploy

Note
Please be aware that the url option above is actually a SpEL expression, hence the triple quotes. If you’d like to learn more about quotes, please read the relevant documentation.

The http-client processor has the following options:

charset

the charset to use in the Content-Type header when emitting Strings (String, default: UTF-8)

httpMethod

the http method to use when performing the request (HttpMethod, default: POST, possible values: OPTIONS,GET,HEAD,POST,PUT,PATCH,DELETE,TRACE,CONNECT)

mappedRequestHeaders

request message header names to be propagated to/from the adapter/gateway (String, default: HTTP_REQUEST_HEADERS)

mappedResponseHeaders

response message header names to be propagated from the adapter/gateway (String, default: HTTP_RESPONSE_HEADERS)

replyTimeout

the amount of time to wait (ms) for a response from the remote server (int, default: 0)

url

the url to perform an http request on (String, no default)

Shell

The shell processor forks an external process by running a shell command to launch a process written in any language. The process should implement a continual loop that waits for input from stdin and writes a result to stdout in a request-response manner. The process will be destroyed when the stream is undeployed. For example, it is possible to invoke a Python script within a stream in this manner. Since the shell processor relies on low-level stream processing there are some additional requirements:

  • Input and output data are expected to be Strings; the charset is configurable.

  • The shell process must not write out of band data to stdout, such as a start up message or prompt.

  • Anything written to stderr will be logged as an ERROR in Spring XD but will not terminate the stream.

  • Responses written to stdout must be terminated using the configured encoder (CRLF or "\r\n" is the default) for the module and must not exceed the configured bufferSize.

  • Any external software required to run the script must be installed on the container node to which the module is deployed.

Here is a simple Python example that echoes the input:

#echo.py
import sys

#=====================
# Write data to stdout
#=====================
def send(data):
  sys.stdout.write(data)
  sys.stdout.flush()

#===========================================
# Terminate a message using the default CRLF
#===========================================
def eod():
  send("\r\n")

#===========================
# Main - Echo the input
#===========================

while True:
  try:
    data = raw_input()
    if data:
      send(data)
      eod()
  except EOFError:
    eod()
    break

Note
Spring XD provides additional Python programming support for handling basic stream processing, as shown above; see creating a Python module.

To try this example, copy the above script and save it to echo.py. Start Spring XD and create a stream:

xd:>stream create pytest --definition "time | shell --command='python <absolute-path-to>/echo.py' | log" --deploy
Created and deployed new stream 'pytest'

You should see the time echoed in the log:

09:49:14,856  INFO task-scheduler-5 sink.pytest - 2014-10-10 09:49:14
09:49:15,860  INFO task-scheduler-1 sink.pytest - 2014-10-10 09:49:15
09:49:16,862  INFO task-scheduler-1 sink.pytest - 2014-10-10 09:49:16
09:49:17,864  INFO task-scheduler-1 sink.pytest - 2014-10-10 09:49:17

This script can be easily modified to do some actual work by providing a function that takes the input as an argument and returns a string. Then insert the function call:

while True:
  try:
    data = raw_input()
    if data:
      result = myfunc(data)
      send(result)
      eod()
  except EOFError:
    eod()
    break
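The scripts above use raw_input(), which exists only in Python 2. For readers on Python 3, the same request-response loop might look like the sketch below. It is an adaptation, not code shipped with Spring XD: `process_stream` and `run_on_text` are hypothetical names, the loop iterates over stdin instead of calling raw_input(), and unlike the original it does not write a final terminator at end of input.

```python
import io
import sys


def process_stream(func, stdin=sys.stdin, stdout=sys.stdout, terminator="\r\n"):
    """Read lines from stdin, apply func, and write each result plus the terminator."""
    for line in stdin:
        data = line.rstrip("\r\n")
        if data:  # ignore empty lines, as the original loop does
            stdout.write(func(data) + terminator)
            stdout.flush()  # flush so the caller sees each response immediately


def run_on_text(func, text):
    """Convenience helper for trying a function without launching a process."""
    out = io.StringIO()
    process_stream(func, io.StringIO(text), out)
    return out.getvalue()
```

Passing the streams as parameters keeps the loop easy to exercise in isolation, for example `run_on_text(str.upper, "hello\n")`. A script launched by the shell processor would call `process_stream(myfunc)` at the bottom of the file.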

The shell processor has the following options:

bufferSize

the size of the buffer (bytes) to use when encoding/decoding (int, default: 2048)

charset

the charset used when converting from String to bytes (String, default: UTF-8)

command

the shell command (String, no default)

encoder

the encoder to use when sending messages (Encoding, default: CRLF, possible values: CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)

environment

additional process environment variables as comma delimited name-value pairs (String, no default)

redirectErrorStream

redirects stderr to stdout (boolean, default: false)

workingDir

the process working directory (String, no default)

JSON to Tuple

The json-to-tuple processor is able to transform a String representation of some JSON map into a Tuple.

Here is a simple example:

xd:>stream create tuples --definition "http | json-to-tuple | transform --expression='payload.firstName + payload.lastName' | log" --deploy

xd:>http post --data '{"firstName": "Spring", "lastName": "XD"}'

Note
Transformation to Tuple can be used as an alternative or in addition to Type Conversion, depending on your use case.

The json-to-tuple processor has no options of its own (beyond the options shared by all modules).
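To illustrate why the intermediate structure is convenient, the following Python sketch parses the same JSON into an object with attribute access and then applies the equivalent of the transform expression. It is only an analogy: `json_to_tuple` and `full_name` are hypothetical names, and SimpleNamespace stands in for Spring XD's Tuple type.

```python
import json
from types import SimpleNamespace


def json_to_tuple(payload):
    """Parse a JSON object string into an object whose fields are attributes."""
    return json.loads(payload, object_hook=lambda d: SimpleNamespace(**d))


def full_name(payload):
    """Equivalent of the expression payload.firstName + payload.lastName."""
    record = json_to_tuple(payload)
    return record.firstName + record.lastName
```

For the payload posted above, `full_name` returns the two field values concatenated without a separator.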

Object to JSON

The object-to-json processor can be used to convert any Java Object to a JSON String.

In the following example, notice how the collection of three elements is transformed to JSON (in particular, the three Strings are surrounded by quotes):

xd:>stream create json --deploy --definition "http | aggregator --count=3 | object-to-json | log"

xd:>http post --data hello
xd:>http post --data world
xd:>http post --data !

results in ["hello", "world", "!"] appearing in the log.

The object-to-json processor has no options of its own (beyond the options shared by all modules).
