Conveyor and Enterprise Integration Patterns

RACE and Enterprise Integration Patterns

Cookbook

Public Draft. Please send your comments and feedback to [email protected]

Introduction

RACE provides an implementation of the Enterprise Integration Aggregator Pattern, but it goes far beyond it. With the Conveyor you can achieve many of the Enterprise Integration goals with less effort. It can be used in any stand-alone application on the JVM platform, or combined with other Integration solutions, playing the role of a "missing link". An Aggregator in the Integration world is responsible for keeping state in a permanently changing stream of data, and then providing an "aggregated" result when it is ready.

This article shows how to use conveyors with all Enterprise Integration Patterns and styles. It does not substitute for the documentation. Some patterns are described with detailed examples; others with just a discussion of approaches that can be used.

Integration Styles

Conveyors can be useful with all Integration Styles.

How can I integrate multiple applications so that they work together and can exchange information?

Although file transfer is arguably the oldest and simplest method of integration, doing it right can be a challenge. You should be able to control such things as keeping a manageable file size, both in absolute numbers and in the maximum number of records in each file, and closing a file and opening a new one when the limits are reached. You may want to control the content of the file by grouping data in it based on multiple and variable requirements, e.g. 'all data for the previous hour', or 'data for the same transaction should be placed in one file'. Conversely, on the recipient's side it may happen that different parts of the same data can be found in different files. This mostly happens with log files, so you need a mechanism to combine those records properly. What to do if some records in the file are corrupted? What if the data format varies from file to file, even from record to record? And many other concerns.

Many of them can be addressed by using Conveyors. Some of the useful features:

  • Batch conveyors. Send records with an ID uniquely representing each open file. Labels can be used to establish different processing algorithms for different types of data, or to parametrize the batch. Readiness of the batch and its timeout strategy can be explicitly defined and managed at run time.
  • Think about Conveyor Persistence. An SQLite database (one of the supported options) produces one file per schema, so you can easily transfer it when the job is done and open it on the recipient's side. Conveyor persistence is schema-less; all data will be restored for you automatically.
  • Process multiple data formats using different builders or conveyors
  • Control broken records and notify about any issues by defining ScrapConsumers
  • Send notifications from the ResultConsumer when a file is closed and ready to be sent to or downloaded by recipients. Metadata properties supported by conveyors will hold all destination addresses.
  • Use conveyors to set up post-processing tasks, e.g. removing temporary files after receiving confirmation messages from recipients.

How can I integrate multiple applications so that they work together and can exchange information?

  • As in the case of the File Transfer pattern, a Conveyor can be a great intelligent buffer between the database and your data stream: processing, validating and normalizing data, preparing batches, notifying about issues or specific data events, holding data if you occasionally need to restart your database, aggregating data to minimize updates, and so on.
  • With conveyor persistence you can completely eliminate some of the tasks that are usually solved by shared databases. Specifically, if the only reason you want to use a database is to store incoming data temporarily for reliability, then conveyor persistence provides a schema-less, (almost) maintenance-free solution out of the box.

How can I integrate multiple applications so that they work together and can exchange information?

Remote procedure invocation is usually associated with synchronous calls. Although the conveyor allows you to synchronize with its output, that is not the main way to use it. However, some tasks can be performed better with the conveyor. One of the problems with remote calls is that sometimes you have to pass all data in one very bulky piece, and/or receive back a huge result. Conveyors placed as buffers on the input and output on both sides can solve some of these problems. Conveyors can parametrize the call asynchronously with manageable chunks of data and repeat it a few times if necessary; the response can be persisted, and the task runner can be released from the obligation of sending it back to the client. The result can be split into smaller pieces and re-assembled on the client's side.

How can I integrate multiple applications so that they work together and can exchange information?

Conveyor is totally ready for messaging! Its input and output are non-blocking, unless you explicitly decide to synchronize with them. Yes, you can do both! A special data container, called a "CART", holds all user data and metadata, priority, full time management and a Future for the data part. All of this can be transferred to the next stages without losing any context, on both success and failure.

Most integration patterns here deal with messaging. You will find more details in the next sections.

Messaging Systems

How does one application communicate with another using messaging?

Conveyor is not a messaging system per se. However, you can think of it as an intra-process message channel. Every conveyor has a name and can be accessed by this name from any part of your application. Its capacity and throughput can be scaled according to your needs.

Conveyor is also a great interface where you can plug in your external messaging bus. With the conveyor you can finally keep messaging simple and independent from its actual content. The conveyor will take care of collecting and processing data chunks for you.
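Below is a minimal sketch of addressing a conveyor by its registered name from another part of the application, using the PartLoader.byConveyorName(...) factory that appears later in this article; the conveyor name "orders", the label and the value are illustrative.

// Obtain a loader for the named conveyor and place a part into it.
PartLoader<Integer, String> orders = PartLoader.byConveyorName("orders");
orders.id(1).label("ITEM").value("first item").place();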

How can two applications connected by a message channel exchange a piece of information?

Conveyor receives data in containers called CARTs. Carts require that each message be identified by a number of parameters (combined in the loader sketch after this list):

  • id - correlation identifier, in terms of this pattern language, or a correlation ID matching predicate, if the message is addressed to many recipients
  • label - identifier of the building part
  • creation time. The default creation time is the time when the cart was created; however, the client can count "creation" from a different moment in time and specify it explicitly.
  • exact expiration time, or TTL counted from the current time
  • priority of the message, in case it has to compete with other messages in the prioritized input queue
  • properties - a map of named metadata objects
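The following sketch combines the parameters above in a single part loader call, using only loader methods that appear elsewhere in this article; the ID, label, property and values are illustrative.

conveyor.part()
    .id(42)                            // correlation identifier
    .label("ITEM")                     // identifier of the building part
    .ttl(Duration.ofSeconds(30))       // or .expirationTime(...) for an absolute time
    .priority(10)                      // competes in the prioritized input queue
    .addProperty("SOURCE", "billing")  // named metadata
    .value("some value")
    .place();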

Additionally, carts can be used to deliver commands - special types of messages to manage the conveyor.

Carts are serializable. The Persistence module provides efficient schema-less tools for serialization and de-serialization.

How can we perform complex processing on a message while maintaining independence and flexibility?

Use chained Scalar conveyors for every processing step you have to perform. Forward results from the output of one conveyor to the input of the next one. Preserve metadata and timing settings.

Example: Find all unique words in some text

Builders

abstract class ScalarHolder<T,R> implements Testing, Supplier<R> {
	T value; 
	@Override
	public boolean test() {
		return true; // Scalar conveyors immediately ready
		             // after receiving any labeled message
	}
}
// First, convert text to lower case
class ToLower extends ScalarHolder<String,String> {
	public String get() {
		return value.toLowerCase();
	}
}
// Second, tokenize the text and put tokens in a set
class ToSet extends ScalarHolder<String, Set<String>> {
	public Set<String> get() {
		return Arrays
		    .stream(value.split("\\s+"))
		    .collect(Collectors.toSet());
	}
}

Conveyors and usage example

SimpleConveyor<Integer, String> toLower 
    = new SimpleConveyor<>(ToLower::new);
toLower.setName("toLower");

SimpleConveyor<Integer, Set<String>> toSet 
    = new SimpleConveyor<>(ToSet::new);
toSet.setName("toSet");
toSet.resultConsumer(LogResult.debug(toSet)).set();
// Set up forwarding
ForwardResult.from(toLower).to(toSet).label("value").bind();
// Send message to the first conveyor
toLower
	.part()
	.ttl(Duration.ofMillis(1000))
	.id(1)
	.label("value")
	.value("to Be Or not TO be")
	.place();

Output: Check the transformation

DEBUG:108 [toLower] - Forward value from toLower to toSet ProductBin [key=1, product='to be or not to be', expirationTime=1551987825866, status=READY, properties={}]
DEBUG:76 [toSet] - ProductBin [key=1, product='[not, be, or, to]', expirationTime=1551987825866, status=READY, properties={FORWARDED=toLower}]

How can you decouple individual processing steps so that messages can be passed to different filters depending on a set of conditions?

The family of Parallel Conveyors allows you to route messages based on ID, Label, Properties, or their combination; or you can always customize routing to your own needs.

How can systems using different data formats communicate with each other using messaging?

Use different builders for different input and output formats. If the product classes are compatible, you can use the same conveyor. If not, create and configure a conveyor for every product type and route messages accordingly.

How does an application connect to a messaging channel to send and receive messages?

At the endpoint you need to unload or load data quickly. You don't want to make any complex decisions or data transformations there. Everything must be prepared, even if something went wrong.

  • Asynchronous immutable part loaders are a perfect endpoint for the messaging input.
  • Cart is a serializable container, supporting meta-data, priorities and time management for messages.
  • ResultConsumer - connect it to asynchronous outgoing message channel.
  • ScrapConsumer - connect it to asynchronous outgoing error channel.

Messaging Channels

How can the caller be sure that exactly one receiver will receive the document or perform the call?

Builders produce and consume their results in a single internal thread, based on decisions made by the readiness predicate. Thus, a builder can be safely parametrized with an available receiver from a pool. You can pass the destination point object as a message or in metadata.

// Keep receiver as a labeled part of the build.
// Although you can send the RECEIVER instance as many times as you wish,
// only one will be effective at the moment the result is produced.
// You can choose to use the first available, the last, or any other criteria.
// The readiness algorithm must include a requirement for RECEIVER availability.
conveyor.part().key(1).label(RECEIVER).value(receiverInstance).place();
// Keep receiver in metadata.
// Each time you add the property with name "RECEIVER"
// the stored instance will be updated.
// The last processed will be applied.
conveyor.part().key(1).label(LABEL).value("Some Value").addProperty("RECEIVER",receiverInstance).place();

How can the sender broadcast an event to all interested receivers?

It is possible to change result/scrap consumers dynamically, adding more forwarding as needed. It is even possible to define an individual result consumer for some builds, which can be very useful for test message tracing and will not disturb other consumers.

You can also treat the "receiver" as an ID in some other conveyor. Then you can send a broadcast message to this conveyor this way:

conveyor.part().foreach().label("MAX_VALUE").value(100).place();

All active processes will receive the value. Combine it with a static part update in order to make this change available to future recipients.

conveyor.staticPart().label("MAX_VALUE").value(100).place();

How can the application send a data item such that the receiver will know how to process it?

  • Each conveyor knows the Class of the builder it should use. It can be individual for each set of data that needs to be processed
  • Each part has a label, which binds this part to certain builder APIs
  • Smart Labels can aggregate multiple consumers for different types of data, usually to process different classes that represent the same or very similar entities, e.g. Date and Instant types for a current timestamp; see the sketch below
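A sketch of such a Smart Label, reusing the SmartLabel.of(...) and intercept(...) calls shown later in this article; the Builder class and its setDate/setInstant setters are hypothetical.

// One label, two accepted value types; values of other types go to the default consumer.
SmartLabel<Builder> TIMESTAMP = SmartLabel.of("TIMESTAMP", Builder::setDate);
TIMESTAMP = TIMESTAMP.intercept(Instant.class, Builder::setInstant);
conveyor.part().id(1).label(TIMESTAMP).value(new Date()).place();
conveyor.part().id(1).label(TIMESTAMP).value(Instant.now()).place();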

How can a messaging receiver gracefully handle receiving a message that makes no sense?

  • Add additional interceptors before the cart placement filter. The rejected cart will be passed to the ScrapConsumer.
  • Throw an exception from the builder's setter. The build will be canceled and all its unfinished data passed to the ScrapConsumer.
  • Throw KeepRunningConveyorException from the builder's setter. The build will NOT be canceled, but the rejected data will be passed to the ScrapConsumer.

Example: Conveyor allows sending null values, but that may be unacceptable in your case:

conveyor.addCartBeforePlacementValidator(cart->Objects.requireNonNull(cart.getValue(),"Empty value in "+cart));

What will the messaging system do with a message it cannot deliver?

Undeliverable messages will be passed to a scrap consumer. You can send messages to an alternative route from there, if necessary, or just report an issue. The default ScrapConsumer puts the message into the Conveyor error log.

How can the sender make sure that a message will be delivered, even if the messaging system fails?

  • Use persistence to make sure data will not be lost even if you have to restart your system; a short sketch follows this list.
  • Shift the expiration time and implement a retry-on-timeout action. Do it until it succeeds or exceeds the maximum number of attempts.
  • Re-route output to an alternative path, where results can wait until your main channel is fixed. There is a simple scheduler, called Delayed Conveyor, which can be useful in some circumstances.
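A minimal persistence sketch, reusing the JdbcPersistenceBuilder and wrapConveyor(...) calls from the duplicate-message example later in this article; the "derby" preset, key type and generic parameters are illustrative.

Persistence<Integer> persistence = JdbcPersistenceBuilder
    .presetInitializer("derby", Integer.class)
    .autoInit(true)
    .build();
// Every cart is written to the database before placement,
// so unfinished builds survive a restart.
PersistentConveyor<Integer, String, String> durable = persistence.wrapConveyor(conveyor);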

How can you connect an application to the messaging system so that it can send and receive messages?

Data Loaders, Result and Scrap consumers form a layer of ADAPTERS between your application and the conveyor.

How can multiple messaging systems be connected so that messages available on one are also available on the others?

Conveyor makes a perfect message transformer. Since all metadata are always preserved, at the output you can not only transform your target product from one Class to another, but you can also change the format and value of the correlation IDs and labels, keep or modify timing settings, add or edit properties, and then forward the resulting message to the next channel.

What is an architecture that enables separate applications to work together, but in a decoupled fashion such that applications can be easily added or removed without affecting the others?

Conveyors work WITH a message bus, but they also form a messaging network inside your application. Whenever you want to pass results somewhere asynchronously, load them into the appropriate conveyor and forget about them.

Message Construction

How can messaging be used to invoke a procedure in another application?

Create a SmartLabel wrapping a Runnable closure.

SmartLabel commandLabel = SmartLabel.of("commandLabel",()->{/*Do something*/});

Don't mix this up with conveyor commands - a special type of message used to manage conveyors and builds at a high level. Conveyor commands have higher priority compared to regular messages and cannot be suspended.

With conveyor commands you can (a short sketch follows the list):

  • create customizable builds
  • cancel builds
  • force build timeout event
  • re-schedule build (change its expiration time)
  • check if a build with the corresponding key is present
  • force asynchronous evaluation of the get() method of the supplier and return the product future
  • create a memento wrapper for the build in progress, which can later be used to...
  • restore a build from an existing memento object with all metadata
  • suspend the conveyor - the command returns a future, which is completed when the conveyor completes its current processing
  • resume a suspended (not stopped) conveyor
  • stop the conveyor immediately, or wait until it completes all current builds
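The sketch below shows the fluent command loader; it assumes the command loader mirrors the part loader style used throughout this article, and only the cancel command from the list above is shown. The exact method names may differ in your version of the library.

// Cancel the build with correlation ID 1.
conveyor.command().id(1).cancel();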

How can messaging be used to transfer data between applications?

Every document or part of it passed to the conveyor must be identified by two parameters:

  • id or id matching predicate
  • label identifying type of the part
conveyor.part().id(1000).label("MAX_VALUE").value(100).place();
conveyor.part().foreach().label("MAX_VALUE").value(100).place();
conveyor.part().foreach(id -> id <= 100000 ).label("MAX_VALUE").value(100).place();

For builds with a well-defined list of building parts it is recommended to use SmartLabels. If you need more flexibility, use the LabeledValueConsumer functional interface.

Example showing the usage of regular expressions to match string labels:

LabeledValueConsumer<String, String, StringBuilder> lvc = (l,v,b) -> {
	System.out.println("default action "+b+":"+v);
};
StringBuilder sb = new StringBuilder();
lvc = lvc.<String>match("a+", v->{
	System.out.println("appending:"+v);
	sb.append(v.toLowerCase());			
});

Static parts require only a label. They will be applied automatically to all newly created builds.

conveyor.staticPart().label("MAX_VALUE").value(100).place();

How can messaging be used to transmit events from one application to another?

If a message has to be addressed to all active builds in the conveyor, use a multi-key predicate instead of an ID when you load the part:

//This will send the value to all active builds
conveyor.part().foreach().label("event").value("FINISHED").place();
//or, for a subset of builds
conveyor.part().foreach(id -> id < 100000).label("event").value("FINISHED").place();

When an application sends a message, how can it get a response from the receiver?

Use a second conveyor to keep the response non-blocking. Example of a ResultConsumer:

res->{
    responseConveyor.part()
       .id(res.key)
       .label(RESULT)
       .value(res.product)
       .addProperties(res.properties)
       .expirationTime(res.expirationTime)
       .place();
}

Or, you can just use a helper result consumer, provided by the library:

ForwardResult.from(sourceConveyor).to(responseConveyor).label(RESULT).bind();

Then, responseConveyor should keep results until a confirmation message is received:

responseConveyor.part().key(resId).label(CONFIRMED).place();

How does a replier know where to send the reply?

Keep the return address in the Cart properties.

conveyor.part()
    .id(100)
    .label(REPORT)
    .value("Balance=$100.01")
    .addProperty("mailTO","[email protected]")
    .place();

Then, from the ResultConsumer you can access this information in the properties Map:

res->{
    emailClient.sendAsynchronously(
            res.properties.get("mailTO"),"Report",res.product);
}

How does a requester that has received a reply know which request this is the reply for?

Conveyors require correlation identifiers to place building parts properly. The ID or KEY is backed by a HashMap, so the requirements for conveyor identifiers are the same as for hash map keys in Java: they must have a good hashCode method and implement equals. Optionally, if you plan to keep huge amounts of data in the conveyor, they should be naturally sortable. To keep carts serializable, IDs must be serializable too. Good classes for IDs are:

  • Numeric (Integer, Long, etc)
  • Strings of reasonable length
  • UUIDs
  • Enumerated Types (for some specific types of builds with IDs that can be recycled or are naturally limited)

Of course, you can create your own class for IDs following the above recommendations.
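A sketch of such a custom ID class; the OrderId record and its fields are hypothetical and only illustrate the recommendations above (equals/hashCode, natural ordering, serializability).

// Java records provide equals() and hashCode() automatically.
public record OrderId(long accountId, long orderNumber)
        implements Comparable<OrderId>, java.io.Serializable {
    @Override
    public int compareTo(OrderId other) {
        int byAccount = Long.compare(accountId, other.accountId);
        return byAccount != 0 ? byAccount : Long.compare(orderNumber, other.orderNumber);
    }
}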

The ID (key) is available at any stage of data processing: Cart -> placement filter -> Result or Scrap consumer -> Acknowledge event.

The Builder itself, however, does not have access to this information. If you think the builder should be aware of its own ID, pass the ID as a regular part.

How can messaging transmit an arbitrarily large amount of data?

Use the conveyor key as a sequence ID and the Label as a position identifier. In most cases builds have a known and limited number of parts, but for cases like this you can use the LabeledValueConsumer interface.

LabeledValueConsumer<Integer,List<String>,LongListBuilder> lvc = (pos, list, builder) -> {
    builder.insertAt(pos,list);
};
conveyor.setDefaultCartConsumer(lvc);
...
conveyor.part()
    .id(100)
    .label(3)
    .value(values)
    .place();

If order is not important, e.g. you append data to a list or buffer, you can use any label. The readiness algorithm will depend on who actually possesses information about the number of parts in the build. If the number of parts is known at the beginning of aggregation, you can initialize the builder with this information. If the exact number of parts is unknown at the beginning, but you know when the last chunk of data is sent, you can notify the builder with a special message indicating the end of the sequence.
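A minimal sketch of the first variant, assuming the total number of chunks is known up front and delivered as a hypothetical "EXPECTED_COUNT" part routed to expect(); all class and method names here are illustrative.

class ChunkedBuilder implements Supplier<Collection<String>>, Testing {
    final Map<Integer, String> chunks = new TreeMap<>(); // keeps positional order
    int expected = Integer.MAX_VALUE;
    public void expect(int n)              { expected = n; }        // "EXPECTED_COUNT" part
    public void chunk(int pos, String val) { chunks.put(pos, val); } // positional chunks
    @Override public Collection<String> get() { return chunks.values(); }
    @Override public boolean test()        { return chunks.size() >= expected; }
}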

How can a sender indicate when a message should be considered stale and thus shouldn't be processed?

Conveyor has extremely flexible and detailed life span controls. You can control:

  • No expiration time
  • Absolute Expiration time
  • TTL, counted from the beginning of the event. (see next)
  • Max age for the cart

Time can be controlled from:

  • The Conveyor engine, as a default TTL for all builds
  • The message source. The first message creating the build also sets its expiration time
  • The builder, which in this case must implement the Expireable interface

Start time can be:

  • captured at the moment the cart is created
  • captured at the moment the build is created
  • inherited from the previous stages
  • provided by the client explicitly, so the expiration time can be calculated with greater precision

When a timeout happens, the conveyor can try to execute some special piece of code and, immediately after that, evaluate the readiness of the object. This allows you to increase the yield of good results for products with optional parts.

Expiration time can be re-scheduled using a special command or a configuration of the builder, e.g. when you want to prolong session lifetime for active users and evict sessions on timeout for others.

How can a message’s data format be designed to allow for possible future changes?

How can you tell the conveyor that data has a certain format? It depends on the depth of the differences between the data and your preferences.

  • Use different Labels/Setters for the same ProductSupplier (Builder)
  • Use different builder implementations (probably keeping the same set of labels)
  • Use property metadata to keep a format indicator. It will allow you to convert or route the result according to its format.
  • Combine all of the above in the way you like most

If your data is represented by very different classes, then you probably just have to process them differently. Implement different setters for each class and label them differently, or use the filtering capabilities of the SmartLabel interface; e.g. XML and JSON libraries keep similar entities in their own classes.

Document xmlDoc = builder.parse(input);
conveyor.part().id(10).label("XML_DOC").value(xmlDoc).place();
...
JSONObject jsonDoc = new JSONParser().parse(input);
conveyor.part().id(100).label("JSON_DOC").value(jsonDoc).place();

What if all you have is a String object? You do not want to modify it in any way, but you can add a property with a format indicator to your message:

String doc = request.getBody();
conveyor
    .part()
    .id(10)
    .label("DOC")
    .addProperty("CONTENT-TYPE",request.getHeader("Content-Type"))
    .value(doc)
    .place();

You can then use the PBalancedParallelConveyor to route the messages based on their CONTENT-TYPE property.

Message Routing

How do we handle a situation where the implementation of a single logical function (e.g., inventory check) is spread across multiple physical systems?

If the routing information is available in the form of metadata, then the client can put it in the message properties. Sometimes, however, this information is enclosed in the body of the data. For example, you receive some XML or JSON document as text, and somewhere inside it there is a field called "dataType", containing the name of the class. In this case you can build a transforming scalar conveyor that takes one type and transforms it into another that can later be queried for the required routing information using regular APIs. In the case of this example, String --> Document or JSONObject --> Domain Object.
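A sketch of such a transforming step, reusing the ScalarHolder helper from the scalar conveyor example earlier in this article; the "dataType" field and the use of org.json's JSONObject are illustrative assumptions.

// Parse the raw text once; downstream code can read "dataType" with regular APIs
// and route the document to the right conveyor.
class JsonHolder extends ScalarHolder<String, JSONObject> {
    @Override
    public JSONObject get() {
        return new JSONObject(value);
    }
}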

How can a component avoid receiving uninteresting messages?

Use cart properties to keep filtering metadata. Reject carts or re-route them according to the available information. Throw an exception from the Builder setter or readiness predicate as soon as you discover that you are not interested in that particular information.

How can you avoid the dependency of the router on all possible destinations while maintaining its efficiency?

Loaders are usually obtained from instances of their conveyors, but they can also be instantiated explicitly with pretty complex constructors that require you to define "placing" lambdas. In the current version there is no direct support for routing, but, as you can see from the definition, routing is nothing but "placing" data into the right instance of a conveyor. This is how parallel and persistent conveyors work: they implement the Conveyor interface and then re-define the placement lambdas for loaders.

It is very likely that a combination of existing conveyors that provide routing by ID, Labels and Properties will be enough for all of your practical tasks.

How do we route a message to a list of dynamically specified recipients?

Pass the recipient list in the cart properties. It will be preserved until your build is ready and passed to the ResultConsumer, or to the ScrapConsumer if the build failed and you have to notify the recipients about the failure.
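A sketch of this approach; the property name "RECIPIENTS", the REPORT label and the mailer helper are hypothetical, while addProperty and the properties field of the result bin are used as shown elsewhere in this article.

// The recipient list travels with the cart and is read back in the ResultConsumer.
conveyor.part()
    .id(7)
    .label(REPORT)
    .value("weekly totals")
    .addProperty("RECIPIENTS", Arrays.asList("ops@example.com", "audit@example.com"))
    .place();

conveyor.resultConsumer(res -> {
    List<String> recipients = (List<String>) res.properties.get("RECIPIENTS");
    recipients.forEach(to -> mailer.sendAsynchronously(to, "Report", res.product));
}).set();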

How can we process a message if it contains multiple elements, each of which may have to be processed in a different way?

Conveyor is a perfect aggregator for split messages. Just label each part, so that the builder will later know how to assemble them into one product.

How do we combine the results of individual, but related messages so that they can be processed as a whole?

Conveyor fully implements the Aggregator pattern. To aggregate parts into a whole Product you need:

  • A correlation ID to identify which part belongs to which product
  • Labels to mark and name parts
  • A Builder for the Product, implementing Supplier<? extends Product>, i.e. the Product get(); method
  • Time spans for parts and the entire build, when you need them
  • A readiness algorithm. Some popular readiness algorithms are provided.
  • A timeout action, if applicable
  • A destination for results (Result Consumer)
  • A destination for broken parts or builds (Scrap Consumer)

How can we get a stream of related but out-of-sequence messages back into the correct order?

Use numeric labels to pass the sequence order and a LabeledValueConsumer to place the data in the right order:

class ListBuilder implements Supplier<Collection<String>>, Testing {
    Map<Integer,String> data = new TreeMap<>();
    boolean ready = false;
    @Override
    public Collection<String> get() {
        return data.values();
    }
    @Override
    public boolean test() {
        return ready;
    }
}

Usage:

Integer READY = -1;
AssemblingConveyor<Integer,Integer,Collection<String>> sortingConveyor = new AssemblingConveyor<>();
LabeledValueConsumer<Integer,String,ListBuilder> labeledValueConsumer = (pos,val,listBuilder)->{
    listBuilder.data.put(pos,val);
};
labeledValueConsumer = labeledValueConsumer.when(READY,(listBuilder,s)->{listBuilder.ready = true;});
sortingConveyor.setDefaultCartConsumer(labeledValueConsumer);
sortingConveyor.setName("resequencer");
sortingConveyor.setBuilderSupplier(ListBuilder::new);
sortingConveyor.resultConsumer(LogResult.debug(sortingConveyor)).set();

sortingConveyor.part().id(1).label(2).value("two").place();
sortingConveyor.part().id(1).label(1).value("one").place();
sortingConveyor.part().id(1).label(3).value("three").place();
sortingConveyor.part().id(1).label(READY).place().join();

Output:

DEBUG:76 [resequencer] - ProductBin [key=1, product='[one, two, three]', unexpireable, status=READY, properties={}]

How can you maintain the overall message flow when processing a message consisting of multiple elements, each of which may require different processing?

Use the LBalancedParallelConveyor.

A label-balanced conveyor will route messages according to their labels, and then aggregate them all into one common result.

How do you maintain the overall message flow when a message needs to be sent to multiple recipients, each of which may send a reply?

Scatter-Gather is easy to implement with the conveyor.

  • It implements the Aggregator pattern directly
  • Has flexible timeout support
  • Flexible readiness algorithm that can be customized for any task
  • You can provide default answers in case some of the recipients have not provided any answer or have timed out.

How do we route a message consecutively through a series of processing steps when the sequence of steps is not known at design-time and may vary for each message?

Keep the routing slip in the cart properties and preserve it at every step. You can even enrich it with additional details at the end of every step.

How do we route a message through multiple processing steps when the required steps may not be known at design-time and may not be sequential?

Implement the Process Manager as a conveyor Builder; build and forward new messages to child conveyors. Its readiness algorithm should take into account all completed, uncompleted and pending steps. The managing Builder can keep all intermediate data, settings, defaults and the timeout algorithm.

How can you decouple the destination of a message from the sender and maintain central control over the flow of messages?

A simple Scalar Conveyor placed at the input stream of your message bus can be used as a message broker. It should do minimal data transformation, just enough to decide where to place a message, and take care of such things as correlation IDs, labeling and message metadata, e.g. return addresses, expiration time and priorities.

Message Transformation

How can existing systems participate in a messaging exchange that places specific requirements on the message format, such as message header fields or encryption?

Conveyors accept data wrapped into a special container called a Cart. This container, obviously, can hold the user's data, but it also usually contains the correlation ID of the build, a label, the expected time span for the build or a specific piece of data, creation time, properties, and priority. Carts can also be used to deliver special types of data, such as conveyor commands, builder suppliers and futures.

In most cases you do not have to work with Carts directly. Conveyors provide convenient factories called Loaders. Loaders are immutable and support a chainable DSL for loading data. Example:

conveyor
    .part()
    .id(1000)
    .label(LABEL)
    .value("value")
    .ttl(Duration.ofMillis(10000))
    .addProperty("version",1)
    .priority(1)
    .place();

Identical code using a Cart directly will look like:

Map<String,Object> properties = new HashMap<>();
properties.put("version",1);
Cart<Integer,?,String> cart = new ShoppingCart(1000,"value",LABEL,currentTime,expirationTime,properties,LoadType.PART,1);
conveyor.place(cart);

This is shown in case you need to implement your own conveyor accessing its low-level features. As you can see, Loaders are much easier to use.

Carts are serializable, but if you want to serialize them, make sure that the IDs, labels, parts and properties you use are serializable too.

How do we communicate with another system if the message originator does not have all the required data items available?

Conveyor is a perfect tool to build the Content Enricher. From the builder code you can trigger multiple asynchronous enrichment requests. The container will hold the data for you until it is all collected, will provide reasonable defaults if some of it is not available, or will report about any issues found. After all that, it will gracefully clean up after itself.

One of the convenient ways to provide defaults is to use static parts. Static parts use the same labels as regular parts, but they don't have correlation IDs. Instead, they are applied to all newly created builds.

conveyor.staticPart().label("MAX_VALUE").value(100).place();

You can change or delete a static part at any time.

How do you simplify dealing with a large message, when you are interested only in a few data items?

Extract and keep in the builder only the data you need. Pass the reduced data forward, if necessary. Note that if the conveyor is wrapped in Persistence, it will keep the full message.

How can we reduce the data volume of message sent across the system without sacrificing information content?

Restore the build from the persistence created at the previous Content Filter stage, but with a different type or configuration of the Builder class, such that instead of clipping information, it will be fully aggregated.

How do you process messages that are semantically equivalent, but arrive in a different format?

  1. Use different labels and builder consumer methods for data in different formats. It will not affect the product, since it contains only normalized data.
conveyor.part().id(1).label("DATE_ISO").value("2008-09-15T15:53:00").set();
conveyor.part().id(2).label("DATE_UNIX_DATE").value("Wed Feb 27 22:36:53 EST 2019").set();

If you use Smart Labels and the different data formats are represented by different classes, you can group semantically close parts under the same label using its "intercept" method:

SmartLabel<Builder> DATE = SmartLabel.of("DATE",Builder::setDateIsoString);
DATE = DATE.intercept(Date.class,Builder::setDate);
...
conveyor.part().id(1).label(DATE).value("2008-09-15T15:53:00").set();
conveyor.part().id(2).label(DATE).value(new Date()).set();
  2. If option one becomes too messy because of the number of methods you have to override and labels to support, create a different implementation for the entire builder. Put common data consumers in an abstract implementation, and all differences in descendant classes. In this case, semantically equivalent messages can stay under the same label. If you use more than one Builder with the same conveyor, the default BuilderSupplier cannot be used. You have to provide a builder for each build explicitly.
conveyor.build().id(3).supplier(BuilderVersion1::new).create();
conveyor.build().id(4).supplier(BuilderVersion2::new).create();
...
conveyor.part().id(3).label("DATE").value("2008-09-15T15:53:00").set();
conveyor.part().id(4).label("DATE").value("Wed Feb 27 22:36:53 EST 2019").set();
  3. If creating a new build instance explicitly complicates things too much, create separate conveyors for each type and group them with the PBalancedParallelConveyor. You must provide a format version property with each message:
AssemblingConveyor<Integer, String, Product> c1 = new AssemblingConveyor<>();
AssemblingConveyor<Integer, String, Product> c2 = new AssemblingConveyor<>();
c1.setBuilderSupplier(BuilderVersion1::new);
c2.setBuilderSupplier(BuilderVersion2::new);
ConveyorAcceptor<Integer, String, Product> t1 = new ConveyorAcceptor<>(c1);
t1.expectsValue("version", "iso");
ConveyorAcceptor<Integer, String, Product> t2 = new ConveyorAcceptor<>(c2);
t2.expectsValue("version", "unix_date");
PBalancedParallelConveyor<Integer, String, Product> conveyor = new PBalancedParallelConveyor<>(t1,t2);

...
conveyor.part().id(3).label("DATE").value("2008-09-15T15:53:00").addProperty("version", "iso").set();
conveyor.part().id(4).label("DATE").value("Wed Feb 27 22:36:53 EST 2019").addProperty("version", "unix_date").set();

Messaging Endpoints

How do you encapsulate access to the messaging system from the rest of the application?

When a conveyor is configured and running, the client should almost never touch its configuration. In most cases the client should only put parts in it. I strongly recommend using Part Loaders. They are immutable and safe.

PartLoader<Integer,OrderOperations> orders = PartLoader.byConveyorName("orders");
...

orders.id(1).label(CANCEL_ORDER).place();

How do you move data between domain objects and the messaging infrastructure while keeping the two independent of each other?

Conveyors explicitly separate the Product class from its Builder. All messages are always delivered to the Builder, and the Builder knows at which moment and how to build the product. This explicit separation allows you to build extremely flexible systems. You can always have more than one builder for the same domain Object, and choose whichever one you need based on the available metadata.

How can a client control its transactions with the messaging system?

By implementing a transactional builder supporting START/COMPLETE/ROLLBACK messages. The builder should be able to keep two sets of data: one current, and one in-work, which is copied from the current set on a START command. When a COMPLETE message is received, the in-work dataset becomes current. On ROLLBACK the whole in-work set should just be deleted. Parts should always be placed in the in-work dataset, while the get() method must take data only from the current dataset. Use Persistence if your messaging system does not support it.
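A minimal sketch of such a builder; the class, the List-based datasets and the label-to-method mapping are illustrative assumptions, not a library API.

// START copies current into in-work, COMPLETE promotes in-work, ROLLBACK discards it.
class TransactionalBuilder implements Supplier<List<Object>> {
    private List<Object> current = new ArrayList<>();
    private List<Object> inWork  = new ArrayList<>();
    public void start(Object ignored)    { inWork = new ArrayList<>(current); } // copy current
    public void complete(Object ignored) { current = inWork; }                  // promote in-work
    public void rollback(Object ignored) { inWork = new ArrayList<>(current); } // discard changes
    public void add(Object part)         { inWork.add(part); }                  // regular parts
    @Override
    public List<Object> get()            { return current; }                    // committed data only
}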

How can an application consume a message when the application is ready?

Conveyor is a polling consumer. It constantly waits for new messages in its input queue. It can be temporarily suspended when you need it to be, and then resume reading the data flow. Conveyor supports different implementations of the input balancing queue.

How can an application automatically consume messages as they become available?

Conveyors are asynchronous receivers. They take a message and put it into the input queue, then notify the inner processing thread that a message is available. The synchronization penalty is low, and the overall throughput of conveyors is very high. If the input data flow is getting out of control, you can add occasional synchronization to check the status of the input queue, or limits on its size.

How can a messaging client process multiple messages concurrently?

Use Parallel Conveyors. They can balance traffic by correlation ID, Labels or Cart properties.

How can multiple consumers on a single channel coordinate their message processing?

Use Parallel Conveyors. They can balance traffic by correlation ID, Labels or Cart properties.

If you need to build a dispatcher at the conveyor output, use the filtering capabilities of the ResultConsumer functional interface to process the output.

How can a message consumer select which messages it wishes to receive?

ResultConsumer and ScrapConsumer are flexible functional interfaces. Their default methods, such as

consumer.andThen(otherConsumer);

or filtering

consumer.filterStatus(status->Status.TIMED_OUT==status);

and others, allow you to build selective consumers from simpler blocks.

How can a subscriber avoid missing messages while it’s not listening for them?

Use Persistence. It uses a write-before-placement strategy. Once a message is stored in the persistence database, it will be delivered, even if you had to restart your system.

How can a message receiver deal with duplicate messages?

Dealing with duplicate messages is a tough question, and the conveyor can help you find the right approach.

  1. Simple aggregator with no side effects.

This is a simple case. However, a duplicate message can be a sign of some issue in the system. The only question you have to ask yourself is how you should treat duplicated parts. Here are some options (a sketch of the third option follows the list):

  • accept and update the value every time it arrives
  • accept only the first, ignore others
  • accept the first and reject others. A KeepRunningConveyorException thrown from the setter of the builder will pass the rejected cart to the Scrap Consumer, but the build itself will keep running.
  • accept the first and reject others. An exception thrown from the setter of the builder will terminate the build. Obviously, something went wrong.
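A sketch of the "accept the first, reject others without terminating the build" option; the class and setter are illustrative.

class SingleValueBuilder implements Supplier<String>, Testing {
    String value;
    public void setValue(String v) {
        if (value != null) {
            // The duplicate goes to the ScrapConsumer; the build keeps running.
            throw new KeepRunningConveyorException("Duplicate value rejected: " + v);
        }
        value = v;
    }
    @Override public String get()   { return value; }
    @Override public boolean test() { return value != null; }
}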
  2. Summator with no side effects.

Option one does not work in some cases. For example, if you build some kind of budget, you can put money into it and withdraw money from it many times, in any order. Nobody wants any of those messages being processed more than once. The solution here is to mark every message with some unique ID. The conveyor should be able to recognize messages that were already processed. One of the possible solutions is provided by the conveyor Persistence.

  • Add a unique property to any part you send, e.g. TRANSACTION_ID
  • Instruct persistence to create a unique index on this property, or on a combination of it with some other message attributes

Example:

Builder and operations

class BalanceBuilder implements Supplier<Double> {
    Double summ = 0.00;
    @Override
	public Double get() {
        return summ;
    }
    public void add(Double m) {
        summ += m;
    }
    public void withdraw(Double m) {
        if(m > summ) {
            throw new KeepRunningConveyorException("Withdraw of $"+m+" rejected. Balance is low");
        }
        summ -= m;
    }
}

enum BALANCE_OPERATION implements SmartLabel<BalanceBuilder>{
    ADD{{ setter = (bb,val)->bb.add((Double)val); }},
    WITHDRAW{{ setter = (bb,val)->bb.withdraw((Double)val); }},
    CLOSE{{ setter = (bb,val)->{}; }};
    BiConsumer<BalanceBuilder, Object> setter;
    @Override
    public BiConsumer<BalanceBuilder, Object> get() { 
        return setter;
    }
}

Usage:

LastResultReference<Integer,Double> result = new LastResultReference();

JdbcPersistenceBuilder<Integer> jpb = JdbcPersistenceBuilder.presetInitializer("mysql", Integer.class)
		.autoInit(true)
		.partTable("BALANCE")
		.completedLogTable("BALANCE_LOG")
		.user("root")
		.addField(Long.class, "TRANSACTION_ID")
		.addUniqueFields("TRANSACTION_ID");

AssemblingConveyor<Integer, BALANCE_OPERATION, Double> balance = new AssemblingConveyor<>();

PersistentConveyor<Integer, BALANCE_OPERATION, Double> persistentBalance = jpb.build().wrapConveyor(balance);
persistentBalance.setReadinessEvaluator(Conveyor.getTesterFor(balance).accepted(BALANCE_OPERATION.CLOSE));
persistentBalance.setBuilderSupplier(BalanceBuilder::new);
persistentBalance.resultConsumer(result).set();
persistentBalance.scrapConsumer(LogScrap.stdErr(balance)).set();

PartLoader<Integer, BALANCE_OPERATION> loader = persistentBalance
		.part()
		.id(1);

loader
		.label(BALANCE_OPERATION.ADD)
		.value(100.00)
		.addProperty("TRANSACTION_ID",1)
		.place();
loader
		.label(BALANCE_OPERATION.ADD)
		.value(100.00)
		.addProperty("TRANSACTION_ID",1) //duplicate
		.place();
loader
		.label(BALANCE_OPERATION.ADD)
		.value(200.00)
		.addProperty("TRANSACTION_ID",2)
		.place();
loader
		.label(BALANCE_OPERATION.WITHDRAW)
		.value(50.00)
		.addProperty("TRANSACTION_ID",3)
		.place();
loader
		.label(BALANCE_OPERATION.WITHDRAW)
		.value(500.00) //over the limit
		.addProperty("TRANSACTION_ID",4)
		.place();
loader
		.label(BALANCE_OPERATION.CLOSE)
		.addProperty("TRANSACTION_ID",5)
		.place().join();
System.out.println(result);

Output:

ScrapBin [DATA_REJECTED key=1: Site Processor failed; BuildingSite [builder=AcknowledgeBuilder [persistence=MYSQL Persistence [schema=null partsTable=BALANCE completedTable=BALANCE_LOG archiveStrategy=DELETE encryption=OFF], cartIds=[1552103002000000000], forward=AssemblingConveyor [name=Persistent<AssemblingConveyor 15>, thread=15], complete=false], initialCart=PART [key=1, value=PART [key=1, value=100.0, label=ADD, expirationTime=0, properties={#CART_ID=1552103002000000000, #TIMESTAMP=1135628495694494, TRANSACTION_ID=1, 1552103002000000000=#CART_ID}], label=CART, expirationTime=0, properties={TRANSACTION_ID=1}], acceptCount=3, builderCreated=1552103002069, builderExpiration=9223372036854775807, status=INVALID, delay=9223370484751773533, lastError=com.aegisql.conveyor.persistence.core.PersistenceException: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'BALANCE_TRANSACTION_ID_IDX', eventHistory={CART=1, MIN_COMPACT=1, MODE=1}] error=java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'BALANCE_TRANSACTION_ID_IDX']
ScrapBin [KEEP_RUNNING_EXCEPTION key=1: Site Processor failed. Keep running; PART [key=1, value=500.0, label=WITHDRAW, expirationTime=0, properties={#CART_ID=1552103002000000004, #TIMESTAMP=1135628706524376, 1552103002000000004=#CART_ID, TRANSACTION_ID=4}] error=Withdraw of 500.0 rejected. Balance is low]
ResultReference [250.0]
  3. Builds with side effects.

In some cases you need to do something with the results that requires access to some external resource. It can be a database, a service on the internet, or even writing into a file, which is usually considered a reliable operation. And you don't want to do it twice. In general, this problem cannot be solved, because there will always be a tiny gap between the moment you addressed your external resource, successfully or not, and the moment you applied the acknowledgement to ensure completeness of the transaction. Your system can crash right in the middle.

What you can do is build a system that will be able to discover situations with potentially unfinished transactions and allow you to implement some kind of reconciliation algorithm or protocol. For that, you have to create a builder with slightly higher involvement in the message processing than just simply aggregating parts and building a result.

  • Reserve special messages to mark the beginning and end of unsafe events.
  • Keep the order of events inside the builder.
  • Take a part loader from the persistent conveyor and send it to the build as a regular part. The builder, of course, must be able to hold this additional information. Since the Loader for the same conveyor never changes, you can use a Static Part to keep it for all builds.
  • The builder should know its correlation ID. It should be sent to the builder as any other part, but before the "unsafe" part. A good idea is to send the ID as the first message, which will also initialize the build.
  • Before starting the sensitive operation, send a "STARTED" message to itself with a higher priority than regular events.
  • After finishing the operation, send a "COMPLETED" message to itself with a priority even higher than the "STARTED" message.
  • Configure the persistent conveyor to restore builds with high-priority messages first. Thus, all "COMPLETED" messages will be processed before other events, and the builds will know they don't have to do anything else. It is possible that one build per conveyor will receive the "STARTED" message without a matching "COMPLETED" one. It means the crash happened somewhere between your action and persisting the "COMPLETED" message. This is a potential issue. What you should do in this case depends completely on the task. It can be something simple, like checking if the target file or record in the database exists, or it may require making a phone call to your client and asking him to check whether some events happened or not. In any case, this is something you can deal with and control.
Persistence<Integer> p = JdbcPersistenceBuilder
    .presetInitializer("derby", Integer.class)
    .autoInit(true)
    .setArchived()
    .restoreOrder(RestoreOrder.BY_PRIORITY_AND_ID) //put higher priority parts first
    .build();

How can an application design a service to be invoked both via various messaging technologies and via non-messaging techniques?

An essential part of the Conveyor technology is implementing one or more Builders for the Product(s). This means you can use and test builders absolutely independently from any messaging, including conveyors themselves. Conveyor labels are nothing but a named builder API!

And vice versa, if you have already implemented some kind of builders for your products, then you are almost ready to start using conveyors! There is only one requirement for the Builders: they must implement the Supplier<? extends Product> interface.

Builder is a very potent creational design pattern. It has very few limitations and can be adopted in almost any situation.

System Management

How can we effectively administer a messaging system that is distributed across multiple platforms and a wide geographic area?

Conveyors are easy to control. They are accessible by their names from any part of your code. Basic management commands and conveyor health status are available via JMX. Builds can be parametrized by using static parts for new builds, and placement predicates for active builds. You can interrupt or cancel existing builds, dynamically change default result and scrap consumers, timeout settings, and many other things.

The Configurator package allows you to instantiate and configure conveyors from properties or YAML files, environment variables or JVM properties.

How can you route a message through intermediate steps to perform validation, testing or debugging functions?

Parametrize the builder to do all the additional functionality for you. Manage it via static or regular labeled values.

conveyor.staticPart().label("LOG_LEVEL").value(Logger.ERROR).place();
...
//then, later, for a specific build, increase log level
conveyor.part().id(100).label("LOG_LEVEL").value(Logger.DEBUG).place();

How do you inspect messages that travel on a point-to-point channel?

Add any number of placement validators. If you need asynchronous processing, put the cart in some other conveyor.

conveyor.addCartBeforePlacementValidator(cart->{
    wire_tap_conveyor.place(cart);
});

How can we effectively analyze and debug the flow of messages in a loosely coupled system?

You can use Conveyor Persistence to keep the message history permanently. Explicit control of expiration time and flexible archiving strategies will help to keep the size of the conveyor database manageable.

How can we report against message information without disturbing the loosely coupled and transient nature of a messaging system?

If you need to store a duplicate of a message, you can use a second conveyor, persistent or not. The storage conveyor has the same ID and Labels as the main one, but its builder, readiness algorithm and timeout settings serve a different purpose.

How can you track messages on a service that publishes reply messages to the Return Address specified by the requestor?

Use conveyor cart properties to keep the result destination.

conveyor.part().id(1).label("test").value(100).addProperty("return_to","conveyor_a").place();

What happens, though, if a component is actively processing messages, but garbles outgoing messages due to an internal fault?

Conveyor has multiple tools to monitor its health status:

  • Detailed logging at TRACE or DEBUG level
  • In case of failure, precise failure status is available, including processing stage and exception object
  • JMX provides basic statistics about successful and failed builds.
  • You can add your own filters in front of the input queue.
conveyor.addCartBeforePlacementValidator(cart->{
    // do something with the cart
    // if you throw any exception here, cart will be rejected and passed to ScrapConsumer
});
  • Result and Scrap consumers can be chained, so you can collect your own information about results of failures without disturbing actual recipient
  • Use metadata (properties) to help identify test message
ResultConsumer<Integer,String> mainConsumer = bin->{
    //do something with the result
};
ResultConsumer<Integer,String> testingConsumer = bin->{
    //do something with test
};
//only for test_message
testingConsumer = testingConsumer.filter(bin-> bin.properties.containsKey("test_message"));

conveyor.first(testingConsumer).andThen(mainConsumer).set();

conveyor.part()
   .id(100)
   .label("report")
   .value("balance = $100.00")
   .addProperty("test_message",true).place();
  • Finally, right before the build is completely evicted from the conveyor, you can specify the Acknowledge event:
conveyor.addBeforeKeyEvictionAction(acknowledgeStatus->{
    //do something on acknowledge
});

Note that Result and Scrap consumers, before-placement filters and Acknowledge events all have access to the properties metadata.

How can you keep 'left-over' messages on a channel from disturbing tests or running systems?

When a conveyor is stopped, all remaining messages in the input queue and all unfinished builds will be passed to the Scrap Consumers. A stopped conveyor will immediately start rejecting any new incoming messages. A stopped conveyor cannot be restored; you have to create a new instance of the conveyor if you want to restore the channel.

conveyor.stop();