Creating a Sink Module - garyrussell/spring-xd GitHub Wiki
As outlined in the modules document, Spring XD currently supports four types of modules: source, sink, and processor for stream processing and job for batch procesing. This document walks through implementing a custom sink module.
The last module in a stream is always a sink. A sink module is built with Spring Integration to consume messages on its input channel and send them to an external resource to terminate the stream.
Spring Integration provides a number of outbound channel adapters to integrate with various transports such as TCP, AMQP, JMS, Kafka, HTTP, web services, mail, or data stores such as file, Redis, MongoDB, JDBC, Splunk, Gemfire, and more. It is straightforward to create a sink module using an existing outbound channel adapters. Such outbound channel adapters are typically used to integrate streams with external data stores or legacy systems. Alternately, you may need to invoke a third party Java API to provide data to an external system. In this case, the sink can easily invoke a Java method using a Service Activator.
Here, we will demonstrate step-by-step how to create and install a sink module using the Spring Integration Redis Store Outbound Channel Adapter. The complete code for this example is redis-store-sink sample project.
Configure the outbound channel adapter in an xml bean definition file under the config resource directoy:
<beans ...>
<int:channel id="input" />
<int-redis:store-outbound-channel-adapter
id="redisListAdapter" collection-type="LIST" channel="input" key="${collection}" auto-startup="false"/>
<beans:bean id="redisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<beans:property name="hostName" value="${host}" />
<beans:property name="port" value="${port}" />
</beans:bean>
</beans>The adapter, as required by Spring XD, is configured as an endpoint on a channel named input. When a message is consumed, the Redis Store outbound channel adapter will write the payload to a Redis list with a key given by the ${collection} property. By default, the Redis Store outbound channel adapter uses a bean named redisConnectionFactory to connect to the Redis server. Here the connection factory is configured with property placeholders ${host}, ${port} which will be provided as module options in stream definitions that use this sink. Note that auto-startup is set to false. This is a requirement for Spring XD modules. When a stream is deployed, the Spring XD runtime will create and start the modules in the correct order to ensure that everything is initialized before the stream starts processing messages.
|
Note
|
By default, the adapter uses a StringRedisTemplate. Therefore, this module will store all payloads directly as Strings. You may configure a RedisTemplate with a different value Serializer to serialize other data types, such as Java objects, to the Redis collection. |
Spring XD will automatically register a PropertyPlaceholderConfigurer to your application context, so there is no need to declare one here. These properties correspond to module options defined for this module (discussed below). Users supply option values when creating a stream using the DSL.
The module’s properties file in the config resource directory contains Module Options Metadata including a description, type, and optional default value for each property. The metadata supports features like auto-completion in the Spring XD shell and option validation:
options.collection.description = the name of the list
options.collection.default= ${xd.stream.name}
options.collection.type = java.lang.String
#
options.host.description = the host name for the Redis store
options.host.default= localhost
options.host.type = java.lang.String
#
options.port.description = the port for the Redis store
options.port.default= 6379
options.port.type = java.lang.Integer
Note that the collection defaults to the stream name, referencing a common property provided by Spring XD.
Alternately, you can write a POJO to define the metadata. Using a Java class provides better validation along with additional features and requires that the class be packaged as part of the module.
This section covers creating the module as a standalone project containing some code to test the module. This example uses Maven but Spring XD supports Gradle as well
Take a look at the pom file for this example. You will see it declares spring-xd-module-parent as its parent and declares a dependency on spring-integration-redis which provides the outbound channel adapter. The parent pom provides everything else you need. We also need to configure repositories to access the parent pom and any other dependencies. The xml file containing the bean definitions and the properties file are located in src\main\resources\config.
The main objective of the test is to ensure that messages are stored in a Redis list once the module’s Application Context is loaded. In order to test the module stand-alone, we need to enhance the module context with property values and a RedisTemplate to retrieve the stored messages.
<beans...>
<context:property-placeholder properties-ref="props"/>
<util:properties id="props">
<prop key="collection">mycollection</prop>
<prop key="host">localhost</prop>
<prop key="port">6379</prop>
</util:properties>
<import resource="classpath:config/spring-module.xml"/>
<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
</beans>Next, create and run the RedisStoreSinkTest:
package org.springframework.xd.samples;
import ...
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class RedisStoreSinkTest {
@Autowired
ConfigurableApplicationContext applicationContext;
@Autowired
MessageChannel input;
@Autowired
RedisTemplate<String,String> redisTemplate;
@Test
public void test() {
applicationContext.start();
input.send(new GenericMessage<String>("hello"));
assertEquals("hello", redisTemplate.boundListOps("mycollection").leftPop(5, TimeUnit.SECONDS));
}
@After
public void cleanUp() {
redisTemplate.delete("mycollection");
}
}The test will load the module application context using our test context and send a message to the module’s input channel. It will fail if the input payload "hello" is not added to the Redis list within 5 seconds.
The test requires a running Redis server. See Getting Started for information on installing and starting Redis.
Another test you may want to include is one to verify the module options metadata, as defined in spring-module.properties Here is an example ModuleOptionsTest that uses Spring XD’s DefaultModuleOptionsMetadataResolver
package org.springframework.xd.samples;
import ...
/**
* Tests expected module properties are present.
*/
public class ModuleOptionsTest {
@Test
public void testModuleOptions() {
ModuleOptionsMetadataResolver moduleOptionsMetadataResolver = new DefaultModuleOptionsMetadataResolver();
String resource = "classpath:/";
ModuleDefinition definition = ModuleDefinitions.simple("redis-store", sink, resource);
ModuleOptionsMetadata metadata = moduleOptionsMetadataResolver.resolve(definition);
assertThat(
metadata,
containsInAnyOrder(moduleOptionNamed("collection"), moduleOptionNamed("host"),
moduleOptionNamed("port")));
for (ModuleOption moduleOption : metadata) {
if (moduleOption.getName().equals("collection")) {
assertEquals("${xd.stream.name}", moduleOption.getDefaultValue());
}
if (moduleOption.getName().equals("port")) {
assertEquals("6379", moduleOption.getDefaultValue());
}
if (moduleOption.getName().equals("host")) {
assertEquals("localhost", moduleOption.getDefaultValue());
}
}
}
public static Matcher<ModuleOption> moduleOptionNamed(String name) {
return hasProperty("name", equalTo(name));
}
}The next step is to package the module as an uber-jar using maven:
$mvn package
This will build an uber-jar in target/redis-store-sink-1.0.0.BUILD-SNAPSHOT.jar. If you inspect the contents of this jar, you will see it includes the module configuration files and dependent jars (spring-integration-redis in this case).
Fire up the Spring XD runtime if it is not already running and,
using the Spring XD Shell, install the module as a sink named redis-store using the module upload command:
xd:>module upload --file [path-to]/redis-store-sink/target/redis-store-sink-1.0.0.BUILD-SNAPSHOT.jar --name redis-store --type sink
See registering a module for more details.
Once the XD server is running, create a stream to test your new module. This stream will write tweets containing the word "java" to Redis as a JSON string:
xd:> stream create --name javasearch --definition "twittersearch --consumerKey=<your_key> --consumerSecret=<your_secret> --query=java | redis-store --collection=javatweets" --deploy
Note that you need to have a consumer key and secret to use the twittersearch module. See the description in the sources section for more information.
Fire up the redis-cli and verify that tweets are being stored:
$ redis-cli
redis 127.0.0.1:6379> lrange javatweets 0 -1
1) "{\"id\":342386150738120704,\"text\":\"Now Hiring: Senior Java Developer\",\"createdAt\":1370466194000,\"fromUser\":\"jencompgeek\",...\"}"If you prefer a simpler test, you can create a stream using the http source and manually post data to it:
xd:> stream create --name redisTest --definition "http | redis-store" --deploy xd:> http post --target http://localhost:9000 --data hello
redis 127.0.0.1:6379> lrange redisTest 0 -1 1) "hello"