Creating a Source Module - garyrussell/spring-xd GitHub Wiki

Introduction

As outlined in the modules document, Spring XD currently supports four types of modules: source, sink, and processor for stream processing and job for batch processing. This document walks through the creation of a custom source module.

The first module in a stream is always a source. Source modules are built with Spring Integration and are responsible for producing messages originating from an external data source on its output channel. These message can then be processed by the downstream modules in a stream. A source module is often fed data by a Spring Integration inbound channel adapter, configured with a poller.

Spring Integration provides a number of adapters out of the box to integrate with various transports and data stores, such as JMS, File, HTTP, Web Services, Mail, and more. Typically, it is straightforward to create a source module using an existing inbound channel adapter.

Here we walk through an example demonstrating how to create and register a source module using the Spring Integration Feed Inbound Channel Adapter. The complete code for this example is in the rss-feed-source sample project.

Create the module Application Context file

Configure the inbound channel adapter using an xml bean definition file in the config resource directory:

<beans...>

	<int-feed:inbound-channel-adapter id="xdFeed" channel="output" url="${url}" auto-startup="false" >
		<int:poller fixed-rate="${fixedRate}" max-messages-per-poll="${maxMessagesPerPoll}" />
	</int-feed:inbound-channel-adapter>

	<int:channel id="output"/>
</beans>

The adapter is configured to poll an RSS feed at a fixed rate (e.g., every 5 seconds). Note that auto-startup is set to false. This is a requirement for Spring XD modules. When a stream is deployed, the Spring XD runtime will create and start stream modules in reverse order to ensure that all modules are initialized before the source starts emiting messages. When an RSS Entry is retreived, it will create a message with a com.rometools.rome.feed.synd.SyndEntry payload type and send it to a message channel called output. The name output is a Spring XD convention indicating the module’s output channel. Any messages on the output channel will be consumed by the downstream processor or sink in a stream used by this module.

The module is configurable so that it may pull data from any feed URL, such as http://feeds.bbci.co.uk/news/rss.xml. Spring XD will automatically register a PropertyPlaceholderConfigurer in the module’s application context. These properties correspond to module options defined for this module (discussed below). Users supply option values when creating a stream using the DSL.

Users must provide a url option value when creating a stream that uses this source. The polling rate and maximum number of entries retrieved for each poll are also configurable and for these properties we should provide reasonable default values. The module’s properties file in the config resource directory contains Module Options Metadata including a description, type, and optional default value for each property. The metadata supports features like auto-completion in the Spring XD shell and option validation:

options.url.description = the URL of the RSS feed
options.url.type = java.lang.String

options.fixedRate.description = the fixed rate polling interval specified in milliseconds
options.fixedRate.default = 5000
options.fixedRate.type = int

options.maxMessagesPerPoll.description = the maximum number of messages per poll
options.maxMessagesPerPoll.default = 100
options.maxMessagesPerPoll.type = int

Alternately, you can write a POJO to define the metadata. Using a Java class provides better validation along with additional features and requires that the class be packaged as part of the module.

Create a Module Project

This section covers the setup of a standalone project containing the module configuration and some code for testing the module. This example uses Maven but Spring XD supports Gradle as well.

Take a look at the pom file for this example. You will see it declares spring-xd-module-parent as its parent and declares a dependency on spring-integration-feed which provides the inbound channel adapter. The parent pom provides everything else you need. We also need to configure repositories to access the parent pom and any other dependencies. The required xml file containing the bean definitions and properties file are located in src\main\resources\config. In this case, we have elected to use a custom transformer to convert the output of the feed inbound adapter to a JSON string.

<beans ...>
	<int-feed:inbound-channel-adapter id="xdFeed" channel="to.json" url="${url}" auto-startup="false">
		<int:poller fixed-rate="${fixedRate}" max-messages-per-poll="${maxMessagesPerPoll}" />
	</int-feed:inbound-channel-adapter>

	<int:transformer input-channel="to.json" output-channel="output">
		<bean class="com.acme.SyndEntryJsonTransformer"/>
	</int:transformer>

	<int:channel id="output"/>
</beans>

The project README contains a detailed explanation of why this transformer is needed, but such things are easily accomplished with Spring Integration.

Create a Spring Integration test

The first level of testing should ensure that the module’s Application Context is loaded and that the message flow works as expected independent of Spring XD. In this case, we need to wrap the module application context in a test context that provides a property placeholder (the Spring XD runtime does this for you). In addition, it is convenient to override the module’s output channel with a queue channel so that the test will block until a message is received from the feed.

Add the following configuration in the appropriate location under src/test/resources/:

<beans ...>

	<context:property-placeholder properties-ref="props"/>
	<util:properties id="props">
		<prop key="url">http://feeds.bbci.co.uk/news/rss.xml</prop>
		<prop key="fixedRate">5000</prop>
		<prop key="maxMessagesPerPoll">100</prop>
	</util:properties>

	<import resource="classpath:config/spring-module.xml"/>

	<!-- Override direct channel with a queue channel so the test will block until a message is received -->
	<int:channel id="output">
		<int:queue/>
	</int:channel>
</beans>

Next, create and run the test:

package com.acme;

import ...

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class FeedConfigurationTest {
	@Autowired
	PollableChannel output;

	@Autowired
	ConfigurableApplicationContext applicationContext;

	@Test
	public void test() {
		applicationContext.start();
		Message message = output.receive(10000);
		assertNotNull(message);
		assertTrue(message.getPayload() instanceof String);
	}
}

The test will load an Application Context using our feed and test context files. It will fail if a item is not received on the output channel within 10 seconds.e

Create an in-container test

Now that you have verified that the module is basically correct, you can write a test to use it in a stream deployed to an embedded Spring XD container.

Note

See test a module for some important tips abouts regarding in-container testing.

The spring-xd-module-parent pom provides the necessary dependencies to write such a test:

package com.acme;

import ...

public class FeedSourceModuleIntegrationTest {
	private static SingleNodeApplication application;

	private static int RECEIVE_TIMEOUT = 6000;

	/**
	 * Start the single node container, binding random unused ports, etc. to not conflict with any other instances
	 * running on this host. Configure the ModuleRegistry to include the project module.
	 */
	@BeforeClass
	public static void setUp() {
		RandomConfigurationSupport randomConfigSupport = new RandomConfigurationSupport();
		application = new SingleNodeApplication().run();
		SingleNodeIntegrationTestSupport singleNodeIntegrationTestSupport = new SingleNodeIntegrationTestSupport(application);
		singleNodeIntegrationTestSupport.addModuleRegistry(new SingletonModuleRegistry(ModuleType.source, "feed"));
	}

	@Test
	public void test() {
		String url = "http://feeds.bbci.co.uk/news/rss.xml";
		SingleNodeProcessingChainConsumer chain = chainConsumer(application, "feedStream", String.format("feed --url='%s'", url));

		Object payload = chain.receivePayload(RECEIVE_TIMEOUT);
		assertTrue(payload instanceof String);

		chain.destroy();
	}
}

The above test configures an and starts embedded Spring XD runtime (SingleNodeApplication) to deploy a stream that uses the module under test.

The SingleNodeProcessingChainConsumer can test a stream that does not include a sink. The chain itself provides an in-memory sink to access the stream’s output directly. In this case, we use the chain to test the source in isolation. The above test is equivalent to deploying following stream definition:

feed --url='http://feeds.bbci.co.uk/news/rss.xml' > queue:aNamedChannel

and the chain consumes messages on the named queue channel. At the end of each test method, the chain should be destroyed to destroy these internal resources and restore the initial state of the Spring XD container.

Note

The spring-xd-module-parent Maven pom includes a tasks to install a local message bus implementation under lib in the project root to enable a local transport provider for the embedded Spring XD container. It is necessary to run maven process-resources or a downstream goal (e.g., compile, test, package) once in order for this test to work correctly.

Install the Module

We have implemented and tested the module using Spring Integration directly and also by deploying the module to an embedded Spring XD container. Time to install the module to Spring XD!

The next step is to package the module as an uber-jar using maven:

$mvn package

This will build an uber-jar in target/rss-feed-source-1.0.0.BUILD-SNAPSHOT.jar. If you inspect the contents of this jar, you will see it includes the module configuration files, custom transformer class, and dependent jars. Fire up the Spring XD runtime if it is not already running and, using the Spring XD Shell, install the module as a source named feed using the module upload command:

xd:>module upload --file [path-to]/rss-source-feed/target/rss-source-feed-1.0.0.BUILD-SNAPSHOT.jar --name feed --type source

Also See registering a module for more details.

Test the source module

Once Spring XD is running, create a stream to test it the module. This stream will write SyndEntry objects rendered as JSON to the Spring XD log:

xd:> stream create --name feedtest --definition "feed --url='http://feeds.bbci.co.uk/news/rss.xml' | log" --deploy

You should start seeing messages like the following in the container log:

16:46:41,309 1.1.0.SNAP  INFO xdbus.feedTest.0-1 sink.feedTest - {"uri":"http://www.bbc.co.uk/sport/0/football/30700069","link":"http://www.bbc.co.uk/sport/0/football/30700069","comments":null,"updatedDate":null,"title":"Gerrard to seal move to LA Galaxy","description":{"type":"text/html","value":"Liverpool captain Steven Gerrard is on the brink of finalising an 18-month deal to join MLS side Los Angeles Galaxy.","mode":null,"interface":"com.rometools.rome.feed.synd.SyndContent"},"links":[],"contents":[],"modules":[{"uri":"http://purl.org/dc/elements/1.1/","title":null,"creator":null,"subject":null,"description":null,"publisher":null,"contributors":[],"date":1420580673000,"type":null,"format":null,"identifier":null,"source":null,"language":null,"relation":null,"coverage":null,"rights":null,"sources":[],"types":[],"formats":[],"identifiers":[],"interface":"com.rometools.rome.feed.module.DCModule","creators":[],"titles":[],"descriptions":[],"publishers":[],"contributor":null,"dates":[1420580673000],"languages":[],"relations":[],"coverages":[],"rightsList":[],"subjects":[]}],"enclosures":[],"authors":[],"contributors":[],"source":null,"wireEntry":null,"categories":[],"interface":"com.rometools.rome.feed.synd.SyndEntry","titleEx":{"type":null,"value":"Gerrard to seal move to LA Galaxy","mode":null,"interface":"com.rometools.rome.feed.synd.SyndContent"},"publishedDate":1420580673000,"author":""}
⚠️ **GitHub.com Fallback** ⚠️