Taps

Introduction

A Tap allows you to "listen" to data while it is processed in an existing stream and process that data in a separate stream. The original stream is unaffected by the tap and isn’t aware of its presence, similar to a phone wiretap. (WireTap is included in the standard catalog of EAI patterns and is implemented in the Spring Integration EAI framework used by Spring XD.)

Simply put, a Tap is a stream that uses a point in another stream as a source.

Example

The following XD shell commands create a stream foo1 and a tap named foo1tap:

xd:> stream create --name foo1 --definition "time | log" --deploy
xd:> stream create --name foo1tap --definition "tap:stream:foo1 > log" --deploy

Since a tap is a type of stream, use the stream create command to create the tap. The tap source is specified using the named channel syntax and always begins with tap:. In this case, we are tapping the stream named foo1, specified by stream:foo1.

Note
stream: is required in this case as it is possible to tap alternate XD targets such as jobs. This tap consumes data at the source of the target stream.

A tap can consume data from any point along the target stream’s processing pipeline. XD provides a few ways to tap a stream after a given processor has been applied:

Example - tap after a processor has been applied

If the module name is unique in the target stream, use tap:stream:<stream_name>.<module_name>

If you have a stream called mystream, defined as

http | filter --expression=payload.startsWith('A') | transform --expression=payload.toLowerCase() | file

Create a tap after the filter is applied using

tap:stream:mystream.filter > ....
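Written out as complete shell commands, this might look like the following sketch. The tap name mystreamtap and the log sink are placeholders chosen for illustration and are not part of the original example:

xd:> stream create --name mystream --definition "http | filter --expression=payload.startsWith('A') | transform --expression=payload.toLowerCase() | file" --deploy
xd:> stream create --name mystreamtap --definition "tap:stream:mystream.filter > log" --deploy

With these definitions, the tap should log each payload that passes the filter, before it is lowercased, while mystream continues to write to its file sink.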

Example - using a label

You may also use labels to create an alias for a module and reference the label in the tap.

If you have a stream called mystream, defined as

http | transform --expression=payload.toLowerCase() | flibble: transform --expression=payload.substring(3) | file

Create a tap after the second transformer is applied using

tap:stream:mystream.flibble > ....
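As a sketch of the full commands, assuming a hypothetical tap name flibbletap and a log sink chosen only for illustration:

xd:> stream create --name mystream --definition "http | transform --expression=payload.toLowerCase() | flibble: transform --expression=payload.substring(3) | file" --deploy
xd:> stream create --name flibbletap --definition "tap:stream:mystream.flibble > log" --deploy

Here the tap should receive the output of the module labeled flibble, that is, the payload after both transforms have been applied.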

A primary use case for a Tap is to perform real-time analytics while data is being ingested via its primary stream. For example, consider a stream that consumes Twitter search results and writes them to HDFS. A tap can be created before the data is written to HDFS, and the data piped from the tap to counters that track the number of times specific hashtags were mentioned in the tweets.
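A rough sketch of that scenario follows. The module names twittersearch, hdfs, and field-value-counter, their options, the stream names tweets and tweettap, and the counter name hashtags are assumptions for illustration; check the Analytics section and the module documentation for the names and options your release supports:

xd:> stream create --name tweets --definition "twittersearch --consumerKey=<your_key> --consumerSecret=<your_secret> --query=<search_terms> | hdfs" --deploy
xd:> stream create --name tweettap --definition "tap:stream:tweets > field-value-counter --fieldName=entities.hashtags.text --name=hashtags" --deploy

Because the tap attaches at the source of tweets, it would see each tweet before the hdfs sink writes it, and the primary stream is not slowed or altered by the counting.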

Creating a tap on a named channel, a stream whose source is a named channel, or a label is not yet supported. This is planned for a future release.

You’ll find specific examples of creating taps on existing streams in the Analytics section.

Note
In cases where multiple modules in a stream have the same module name, a label must be specified on the module to be tapped. For example, if you want to tap the second transform: http | transform --expression=payload.toLowerCase() | tapMe: transform --expression=payload.substring(3) | file

Tap Lifecycle

A side effect of a stream being unaware of any taps on its pipeline is that deleting the stream will not automatically delete its taps. The taps have to be deleted separately. However, if the tapped stream is re-created, the existing tap will continue to function.
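For the foo1 example above, cleaning up both the stream and its tap might look like the sketch below. It assumes that stream destroy accepts --name in the same way as stream create; the exact syntax may differ between releases:

xd:> stream destroy --name foo1
xd:> stream destroy --name foo1tap

If the second command is skipped, foo1tap remains defined and should resume receiving data once a stream named foo1 is created again.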
