Home - aegisql/conveyor GitHub Wiki

RACE - Reactive Aggregation and Creation for Enterprise

New in 1.6.6

  • Filters for persistent parts
  • Dependencies updates
  • Bugfixes and refactoring

Other releases

NOTE:

This documentation might not reflect all current changes and features.

Current version is 1.6.x-SNAPSHOT

About RACE

RACE is a scaleable asynchronous enterprise integration and creational Java framework. It brings re-activeness to well know Object Oriented creational design patterns, such as Builder, Factory, Prototype, and others. Unlike other reactive technologies, it is focused more on data integrity and time management, than on data transmission mechanisms. RACE can be easily integrated with any concurrent technology, including Java threads, streams, messages, events or actors. Consider it as a complimentary "missing link", that makes joining and aggregation of asynchronous data much easier. It is compact and has minimal dependencies.

Why you might want to use RACE?

  • You need to aggregate, group, and/or transform data from independent sources with unpredictable order
  • You need detailed time management for all building parts and the final result
  • You need high level of control over the process. "Black Box" is great as a concept, but not for your project.
  • You want to be reactive and have all above mentioned features in a non blocking manner.
  • Your current aggregator cannot suggest a solution to all aggregation challenges you have

Lets consider few examples of real applications where conveyors can be useful:

Internet Auction

Auction service collects bids from many participants. Auction time span is defined by auction policy. Its expiration and completion rules can be very tricky and can change dynamically. Every bidding event in the auction triggers lots of sub-events, such as sending notification for participant who has just lost his bid. They all though should respect timespan of the auction.

Log aggregator

You have to group log records by certain event, order them by time and then transform records to a form suitable for analysis. Multiple events happen at the same time and leave their traces in multiple log files on multiple computers. Event started in one log can be finished in another, or, sometimes, never finished. Some records can be corrupted. Some can have different formats. It may happen when you have to aggregate logs from third party or legacy applications.

There is a reason why log aggregators are so popular. With the RACE you probably do not need them.

Searching in multiple sources

Task with very strict timeout requirements. Usually, user expects an answer in below one second interval. You send parallel asynchronous search requests to multiple data sources, and combine responses in one. If some of the sources take too much time or failed to answer, you send response on timeout with all results you have collected from faster supplies.

Report building

Similar to searching in multiple sources, but, instead of strict timing out it requires persistence on obtaining results form all sources included in the report. If some source became unavailable for a short time, you system should be able to reschedule the request, while keeping valid data from other sources. If it has to give up then clear error path should be defined.

Exactly one event

Sometimes processing the same event twice is unacceptable. Payments with credit cards or accumulators for data and statistics require that every request must be processed exactly one time.

Smart cache

Smart cache provides rapid access to a very complex data structure, which parts has to be periodically updated from multiple independent sources; changes become visible to clients only when they complete, consistent and validated.

Synchronizer

The goal of synchronizer is to produce a simple synchronization message and put it in the output stream. Synchronizer usually does not care about actual data content. It only needs to know that all required sub-goals were accomplished, and only then release the message. The message itself can be empty or contain just enough information for tracking and debugging purposes.

Requirements

RACE requires Java 17 or higher.

Last version supporting Java 11 is 1.5.3

Last version supporting Java 8 is 1.4.4

Aggregators

RACE provides implementation of the Enterprise Integration Aggregator Pattern, but it go's beyond it. With the RACE you can achieve many of the Enterprise Integration goals with less efforts. You can find more details in this Article. RACE Conveyors can be used in stand-alone applications, or in distributed Integration systems. Meaning of the Aggregator in the Integration world is to keep state in a changing stream of data, and then provide some "aggregated" result, when ready. The the best way to keep state in OOP and to produce results from it is to use Creational Design Patterns, specifically...

Builders

Builder Pattern is a popular object creation software design pattern. You can find more details on the Wikipedia with code examples, diagrams and use cases.

Schema below shows a typical time line for the builder pattern in a single thread app. Single Thread  Builder

It starts with creation of an instance of the Builder. Builder can be empty or provide some reasonable default values for its building parts. Building parts show up in certain order defined by developer of the app. As soon as all necessary building parts arrived, actual building process starts. Result of the building process is some Product object. Builder object after that is usually discard. If you decide to re-use the builder, then it technically becomes a Factory

Classic Builder schema is usually insensitive to order of its building parts. However, decision when exactly call building process is critically important to obtain correct and consistent results. Knowledge about that exact moment is implicitly present in code which is using the Builder and providing building parts to it.

Some reasons why developers can decide to use builders:

  • Avoid “constructor hell”
  • Building parts require complex validation
  • Building parts require non trivial processing
  • Avoid unnecessary dependencies in the Product
  • Create complex immutable objects
  • Object prototyping
  • Flexible and configurable defaults
  • Product Polymorphism
  • Lazy instantiation. You create the Product only when you need it from earlier available parts.

Reactive Builder

Reactive Programming is a programming paradigm around data flows. Some features of reactive systems were summarized by authors of the Reactive Manifesto. Properly designed reactive system is:

  • Responsive: The system responds in a timely manner if at all possible.
  • Resilient: The system stays responsive in the face of failure.
  • Elastic: The system stays responsive under varying workload.
  • Message Driven: Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency.
  • Maintainable
  • Extensible

When a reactive approach is embedded in an Object Oriented programming language, such as Java, developers face certain challenges caused by conflicts between mutability of Java objects and pure functional nature of reactive expressions.

RACE Framework project, thus, is an attempt to create a framework that will take care about most of concerns developers face when trying to combine reactive non-blocking data flows and classic, familiar to all Java developers, object creational patterns. Imagine that you have to build more than one Product object and that different building parts arrive asynchronously, from independent sources and parallel threads. It means that at every moment you have several instances of the Builder waiting for their parts in different state of incompletion. Builder knows which parts have already arrived, but it cannot make any predictions about size, order and pace of other data. It has to make a decision and build the Product as soon as all necessary parts became available or discard the build after certain awaiting time.

Here is an incomplete wish list for the RACE Framework

  • Independent from any data transportation technology
  • Accepts building parts of any type in their natural order
  • Life circle of the Builder is data driven
  • Waits for all building parts necessary to build the Product
  • Provides Thread safety
  • Store data, if necessary, in a persistent storage, until the build is complete. Restore build context from this storage.
  • Provide, if necessary, synchronization mechanisms for building parts and entire Product
  • Do not create unnecessary synchronization or blocking
  • Flexible build expiration and timeout algorithms
  • Error handling
  • Easy to test
  • Scalability
  • Integration in existing monitoring infrastructure.
Reactive Builder

We can expect some features from the Reactive Builder that will make it useful and convenient. It is quite obvious that each Builder and its Building parts must possess some Correlation ID to bind incoming data with proper instance of the Builder. We can also expect that any failure or interruption in flow of some of the building parts should not affect other good data. Building process will have some well defined life span AKA Time To Live (TTL). There should be no memory leaks. i.e. if builder has not received all necessary data in certain period of time, then the process should be canceled and all allocated resources properly released. At the same time, user should be able to make his own decision what to do with incomplete builds and its building parts. Data should not be lost if they still have some value for the application. It should be relatively easy to scale the Build process. Also, dependency between reactive part and actual Builder implementation should be reduced to absolute minimum.

The project is relatively compact and should be easy to start using right out of the box. External dependencies limited to the SLF4J Logger library.

Let's begin

Make sure you using Java 17 or higher

If you use Maven - add this dependency:

<dependency>
  <groupId>com.aegisql</groupId>
  <artifactId>conveyor-core</artifactId>
  <version>1.6.6</version>
</dependency>

check for the latest release in the Maven Repository or check out code from GitHub for the latest snapshot.

For Java 8 use version 1.4.4

Check list

Reactive Builder is easy to use, but before you can start using it you need to answer some questions.

Don't be afraid. The list is pretty long, but most of the answers you already know, or thought about. Each step will be discussed in details in next sections, here is just a summary.

Step Comment
What I build? Define class of the Product. As soon as you know it, you know that your Builder must implement the Supplier<Product> interface. Then tell the conveyor how exactly it should create and initialize the Builder.
How I identify each build? Decide which class to use for the unique Correlation IDs. Good keys are immutable, implement equals, hashCode and naturally comparable. Strings, UUIDs or Integers make good keys.
Build steps labeling Create list of steps in which you can build the product. If your list has limited number of named steps, you most likely can create constant labels from it. If not, conveyor has solutions for fluent labels too.
Building parts and their types This will tell you basic interface for the Builder implementation. Think if building parts require some non-trivial and resource consuming processing. List of parts and List of steps can be often combined
Mandatory, optional and default parts Is there any default values that can be used if some parts are not available? Where these defaults coming from?
When the build is ready? Readiness algorithm is a Predicate that returns true only when Builder is ready to build the Product. Some common frequently used readiness algorithms provided by the framework
What's the time span of the build? This framework provides reach timing configuration. Even if you think you do not need it, defining some default timeout and idle heartbeat is almost always a good idea to prevent memory leak in heavily loaded applications.
What to do when errors happen? What really? Errors will happen. This can be a tough question, with as many answers as many different errors may happen. Conveyor will provide you mechanisms to handle errors.
What to do if timeout happened? This is your last chance to prepare the build, using reasonable defaults, or fail.
What do I do with the results? Do you need synchronous or asynchronous access to the result? Use CompletableFuture<Product> for synchronous and ResultConsumer for asynchronous result access.
How to scale performance? Use ParallelConveyor. Should it be balanced by key, or by label, or combination of both? Really, the LAST thing to worry about. Make it work first, then you will know how to scale.
What if I need persistence Ligt weight PersistentConveyor package simplifies creation of resilient applications
Configuration Comprehensive Conveyor configuration can be taken from property or YAML files.

Conveyor Configuration

Instantiating the AssemblingConveyor

Main class of the RACE framework is AssemblingConveyor.

The AssemblingConveyor constructor asks you to define three generic types.

  • Class of a unique Build ID
  • Class that will be used to label building parts
  • Class of the Product

For the Build IDs or keys you can use any type you find appropriate. It can be a numeric type, it can be a String, Session ID, UUID, or your own Class. In this case it must follow recommendations for Java HashMap keys. i.e. has implemented equals and hashCode methods.

Labels also can be of any type. Since number of different building parts is usually limited, it is convenient to use enum's or final static fields.

As soon as you know the answers you can create an instance of the conveyor.

Example.

  • Every new build will have a unique incremental Integer ID
  • Building parts will be identified by String labels
  • Product class is MyProduct

With this in mind you can create an instance of the AssemblingConveyor:

Conveyor<Integer, String, MyProduct> conveyor = new AssemblingConveyor<>();

You also can pass to the constructor a Supplier for alternative inner Queue implementation.

Conveyor<Integer, String, MyProduct> conveyor = 
   new AssemblingConveyor<>( () -> new ArrayBlockingQueue(100) );

Example above creates conveyor with a Queue which capacity is limited by only 100 elements. You can chose from any Queue implementation, which you think is appropriate for your task. Default queue is ConcurrentLinkedQueue.

Assembling conveyor is now created, however, it is not very useful at this point, because it is not configured. You have to tell it how exactly you going to build the MyProduct, when to build it, and what to do with the result.

Builder Supplier

1 - Tell the conveyor how to build the Builder.

Builder is a class that should be able to create an instance of the MyProduct. Java8 has an interface Supplier with a method get() that returns you a required object. So, your builder must implement the Supplier interface. e.g.

public class MyProductBuilder implements Supplier<MyProduct> {
   public MyProductBuilder() {
      //Constructor with no parameters
   }
   public MyProduct get() {
      //Build the MyProduct object here
   }
}

Then you can tell your conveyor how to create new builder for every new build.

conveyor.setBuilderSupplier(MyProductBuilder::new);

MyProductBuilder::new is itself a Supplier of MyProductBuilder. So, if construction of your builder requires some complex initialization then you can provide any Supplier<Supplier<? extends OUT>> you need to create and activate your builder. <? extends OUT> means that you are not limited with a single OUT class. Your builders can produce OUT class and any class derived from it. Why Conveyor requires a Supplier, and not a Builder instance? Because in this case user has no access to the Builder object itself. It only deals with Building Parts and final Product.

BuilderSupplier is a functional interface with some additional features. You can find more details here: https://github.com/aegisql/conveyor/wiki/Builder-Supplier

Labels

2 - Tell the conveyor how to identify and use Building Parts

Building Parts can have absolutely any type and value, including null. Builder must provide an interface for each building part it expects. This type of interface is called a Consumer. All conventional POJO setters are consumers because they take one value parameter and return nothing. When you know how to put Building Parts into your builder you need to tell the Conveyor about that.

The second generic parameter we gave to the Conveyor was for Labels. Lets consider the following example. We need to build an object of type Person. Person has tree mandatory fields: First Name, Last Name and Date of birth.

public class Person {
   private final String firstName; // First building part
   private final String lastName;  // Second building part
   private final Date dateOfBirth; // Third building part
 
   Person(String firstName, String lastName, Date dateOfBirth) {
      this.firstName = firstName;
      this.lastName = lastName;
      this.dateOfBirth = dateOfBirth;
   }
   // more methods to access data. not shown
}

Note that Person is immutable (all fields declared as final) and has no public constructor. Lets create a builder for it.

public class PersonBuilder implements Supplier<Person>{
   private String firstName;
   private String lastName;
   private Date dateOfBirth;
 
   public PersonBuilder() {
   }

   public void setFirstName(String firstName) {
      this.firstName = firstName;
   }
   public void setLastName(String lastName) {
      this.lastName = lastName;
   }
   public void setDateOfBirth(Date dateOfBirth) {
      this.dateOfBirth = dateOfBirth;
   }

   @Override
   public Person get() {
      //Create and return a new instance of the Person
      return new Person(firstName,lastName,dateOfBirth); 
   }

}

PersonBuilder has three setters and for this example we will assume that each of them is a separate building part. Imagine multiple users filling up GUI SignUp forms with three mandatory fields on a first page. Session starts as soon as user put some data in any of those fields in any order. And as soon as all three fields have data user can be sent to the next stage.

Now we must tell the conveyor how to find right destinations for building parts.

There are several methods for that. Both use Labels to bind data with corresponding Builders's API

First Method - Also, the easiest one. Allow Builder to guess using the Java Reflection API, Use Strings as labels, that match setter or field names of the Builder class.

SimpleConveyor<Integer,Person> conveyor = new SimpleConveyor<>();
conveyor.setBuilderSupplier(PersonBuilder::new);
conveyor.setReadinessEvaluator(
   Conveyor.getTesterFor(conveyor).accepted("setFirstName","setLastName","setDateOfBirth"));
			conveyor
				.part()
				.value("John")
				.id(1)
				.label("setFirstName")
				.place();
			conveyor
				.part()
				.value("Silver")
				.id(1)
				.label("setLastName")
				.place();
			conveyor
				.part()
				.id(1)
				.value(format.parse("1695-11-10"))
				.label("setDateOfBirth")
				.place();

Second method - Use simple descriptive labels, like Strings or simple enum's with speaking for themselves texts (setter method or field names work well) or numbers or any other type you think will clearly identify your data. Then call the conveyor.setDefaultCartConsumer method that takes a LabeledValueConsumer for three parameters. (Java 8 does not have a TriConsumer, so we provided one). Expected parameters are - label, value, and an instance of Builder.

AssemblingConveyor<Integer, String, Person> conveyor = new AssemblingConveyor<>();

conveyor.setBuilderSupplier(PersonBuilder::new);
LabeledValueConsumer<String, ?, PersonBuilder > lvc = (l, v, b) -> throw new RuntimeException("Unknown label " + l);
conveyor.setDefaultCartConsumer(lvc
                                   .<String>when("FirstName", PersonBuilder::setFirstName)
                                   .<String>when("LastName", PersonBuilder::setLastName)
                                   .<Date>when("DateOfBirth", PersonBuilder::setDateOfBirth)
);

First method works well if you have small number of Building Parts or when number of different labels is unknown and there are use cases for that. They will be discussed later.

Third method - use Smart Labels.

Smart Label is any class that implements the SmartLabel interface.

Let's illustrate how to use the Smart Labels.

public enum  PersonBuilderLabel implements SmartLabel<PersonBuilder> {
	FIRST_NAME(PersonBuilder::setFirstName),
	LAST_NAME(PersonBuilder::setLastName),
	DATE_OF_BIRTH(PersonBuilder::setDateOfBirth);

	BiConsumer<PersonBuilder, Object> setter;

	<T> PersonBuilderLabel(BiConsumer<PersonBuilder,T> setter) {
		this.setter = (BiConsumer<PersonBuilder, Object>) setter;
	}
	@Override
	public BiConsumer< PersonBuilder, Object> getSetter() {
		return setter;
	}
}

We created a really simple enum class which is very easy to use, but each label now contains an exact method that has to be applied to the object marked with this label. You do not always have to implement Smart Labels as enums, but enums obviously have a lot of benefits. You do not have to create new instance every time, they easy to use and naturally immutable. More details about SmartLabel interface usage can be found in this article

Now we can use this type in conveyor declaration. No need to define any cart consumers in conveyor configuration.

Readiness Evaluation

3 - Tell the conveyor when the product is ready to be built. Each time a new portion of data arrives, the conveyor should decide if the Product ready to be built or the Builder should continue to wait. We have to give it a hint. We can assume that readiness evaluation can rely either on information about already collected data, or on data stored in the builder itself. The algorithm we inject into the conveyor is an instance of Java 8 BiPredicate. BiPredicate takes two parameters and returns a boolean value. Where true means that the builder can finally build the Product and false that it should keep waiting.

First Method - Let conveyor hold the readiness algorithm for all Builders

conveyor.setReadinessEvaluator((state, builder) -> {
	return state.previouslyAccepted == 3;
});

First parameter is State. This class keeps some basic information about the build progress. Second parameter is builder and in this example it is not used at all. Use either or both of them to make a decision. In most case answer will be as simple as in example above. It means that State's counter received 3 building parts and that is exactly what we expect.

There are some helping pre-defined scenarios for popular use cases.

Ready when received three values. Identical to the previous example.

conveyor.setReadinessEvaluator(
   Conveyor.getTesterFor(conveyor).accepted(3);
);

Ready when received three values with the following labels

conveyor.setReadinessEvaluator(
   Conveyor.getTesterFor(conveyor).accepted("setFirstName","setLastName","setDateOfBirth");
);

ReadinessTester class implements the BiPredicate FUnctional interface, so you can build pretty sophisticated readiness logic purely functionally.

Second method - Implement readiness predicate as a part of the Builder code

This method is obviously more flexible. It can be finely tuned for each individual instance of the Builder. All you need to do when create your Builder is add

implements ..., Testing, ...

or

implements ..., TestingState, ...

The difference between Testing and TestingState is that the second one provides access to the State parameter. This is it. Conveyor will automatically discover when Builder implements one of those interfaces and give it a priority over readiness evaluator set by conveyor.setReadinessEvaluator.

Futures

Conveyor provides you access to futures of every step of your build life span, starting from build creation, delivering of every piece of data, and finally, to the Product itself.

Consumers

You can read more about Result and Scrap consumers in detailed article: https://github.com/aegisql/conveyor/wiki/Consumers Below is a brief illustration of the Consumer ideas.

Result Consumer

4 - Tell the conveyor what to do with the result Just provide a ResultConsumer

conveyor.resultConsumer(result->{
    System.out.println(result.product);
}).set();

result is an instance of ProductBin<K,OUT> class. Which holds the Product itself, as well as its Key, remaining TTL and status. In this example we just print the result, but in your real application this will be the place where the Product meets its destination. Note that if Result Consumer takes a lot of time to process the Product, it will hold all other builds in the queue. If this happens, try to keep the Product consumer simple. Just put the Product in some balancing output queue and process data in some other place, using several parallel threads or services.

Scrap Consumer

5 - Tell the conveyor what to do if something go's wrong

If builder threw an exception or timed out, or data arrived too late, you can tell the conveyor to do something with those unstructured and incomplete data. They will be placed into a ScrapBin object with context explanation, and then a Scrap Consumer code will be called.

conveyor.scrapConsumer((scrapBin)->{
	System.err.println(scrapBin);
}).set();

It is probably a good idea to at least log the rejected data. So, default Scrap Consumer puts Scrap bin into the conveyor's log.

Time control

6 - Control data expiration

Framework provides several methods for full support of timing out and finalizing builds. There ate two common approaches to manage expiration time:

  • You know exact time in the future, after which you no longer interested in data or need some default action.
  • You know acceptable duration of the Build process, relative to the present time.

Obviously, both approaches can be used interchangeably and it is only a matter of convenience which one to use.

You should think about what should be a source of TTL or expiration time. The options are:

  • Let framework apply reasonable TTL to all builds.
  • Let the builder decide when to expire.
  • TTL or expiration time can be a part of the Building Part context. Build duration can be set when building part created, not when it's actual processing started.

Precision of timer, however, is not guaranteed. Queue is checked for expired builders every time it processes next Building Part. You also should tell the conveyor how often it should check for builder expiration when conveyor is idle (i.e. receiving very low traffic with significant intervals between messages).

conveyor.setIdleHeartBeat(500, TimeUnit.MILLISECONDS);

If you care about clock precision the Expiration Collection Idle Interval should not exceed the Timeout interval for your data. If you do not set the Expiration Collection Idle Interval, idle conveyor will never check for expired builds. Default Heart beat is set to 1 second.

With regular traffic and building processes not locking the conveyor for long, timer precision can be reasonably high and suitable for most practical use cases.

Let's look close to those methods.

First method - let conveyor control expiration time.

conveyor.setDefaultBuilderTimeout(1, TimeUnit.SECONDS);

The Easiest method. If after one second the builder is still not ready (as it specified by the readiness algorithm described above), then the builder will be taken out of the conveyor and sent to the Scrap Consumer, or to the ProductConsumer if it is ready.

There is also an option to give the Builder last chance to produce a valid Product. It is sometimes possible that valid Product can be build even when some (or even all) Building Parts were not delivered in time. Delivering of product with missing or default parts can be better than delivering nothing.

conveyor.setOnTimeoutAction((builder)->{
        // We only care about mandatory parts
	if(builder.mandatoryPart != null) {
           builder.ready = true;
        }
});

When timeout happens, conveyor will call the OnTimeoutAction algorithm and immediately after that the Readiness Evaluator. If readiness evaluator return "true" - then the conveyor will build the Product and call the Product Consumer. Otherwise, builder will be evicted and send to the Scrap Consumer with TIME_OUT status.

Second Method - Implement the Expireable interface in the Builder class. If builder implements the Expireable interface this has higher priority than timeout parameters you pass to the conveyor. You can also implement the TimeoutAction interface to make some last moment decisions. Usually in this case Builder should also implement the Test or TestState interface. There ais a small abstract class CommonBuilder you can extend. It suggests a family of constructors, expiration logic and test method.

Note: Value returned by the getExpirationTime() method will be used only once, when the Builder is created and initialized. If you want to change your expiration time, you must explicitly enable both expiration postpone and expiration postpone in timeout block.

Third Method - Building parts can be rejected if they just too old. Previously described methods set Time To Live relative to the moment when the builder was created. However, this could be too late. Data can be accumulated in some message queue for a long tome. When data acquisition is restored it can be too late for the data. It will be just wasting CPU resources and memory to process them. Conveyor provides a method that allows to control expiration base on Building Part creation timestamp.

conveyor.rejectUnexpireableCartsOlderThan(5,TimeUnit.SECONDS);

In this examples, all Building Parts created more than 5 seconds ago will be quickly drained, using Scrap Consumer algorithm.

Fourth Method - Keep expiration data with building parts - see next section for details.

Prolonging TTL By default, conveyor will configure expiration time only once, when the Builder is created. However, you can activate mode in which conveyor can postpone its expiration time after processing of a message. To enable Expiration postpone mode call:

conveyor.enablePostponeExpiration(true);
//In some cases, you may decide to make a decision to cancel timeout right in the OnTimeout method
//You must be careful when enable this feature
//And know why you doing that.
conveyor.enablePostponeExpirationOnTimeout(true);

Special mode required for better performance, when change of TTL is not necessary.

And again, you have three ways to prolong TTL of your builds, from lower to higher priority:

  • Use default TTL increment:
conveyor.setExpirationPostponeTime(100, TimeUnit.MILLISECONDS);
  • Take new expiration time from a cart, if available. Cart must be created explicitly with TTL or Expiration Time. Conveyor will use the longest TTL.
  • When Builder implements the Expireable interface. In this case, developer should implement logic of expiration time changes himself. Based on requirements of the project.

Possible use cases for expandable TTL: Cache, where inactive factories removed, while active factories stay in the cache. Auction, add some time, so that other participant can make their own bids.

Sending Data

7 - Start sending data to the conveyor

Data Loaders

The Conveyor interface has several methods to send data to the conveyor:

Basic method which is used to send all types of messages is

public <V> CompletableFuture<Boolean> place(Cart<K,V,L> cart);

Cart<K,V,L> is a container that holds together key, label, value and timing for each message. In most cases you do not have to work with this method, and know all details about Carts. It can be useful though, if you would like to create your own, extended version of Conveyor, based on existing Conveyor classes.

Structure of a cart

More convenient way to send messages to conveyors provided by Loaders.

public <X> PartLoader<K, L, X, OUT, Boolean> part();

//add management command bypassing main input queue
public <V> CompletableFuture<Boolean> command(GeneralCommand<K, V> command);
public CommandLoader<K, OUT> command();

//createBuild interfaces create instances of builder, but do not apply any data to it.
//Instead, it takes as a parameter a supplier for the builder. If this supplier is present
//it will be used instead of default supplier.
//This allows ultimately customize builder initialization at any moment.
public BuilderLoader<K, OUT, Boolean> build();

Loaders are immutable and safe to use. If you need to hide the conveyor instance from the client software, you can use a loader instead. Loaders have right enough functionality to serve common user needs.

Note return type of the place/create methods of Loaders - CompletableFuture<Boolean>. CompletableFuture allows to trace message until it is accepted by the builder, or rejected by any reason. Returning true means cart has been accepted, false or exception - rejected. Check Java documentation for details. CompletableFuture is extremely powerful tool to build asynchronous application, and now it is available for Conveyor developers! Warning: do not over-use this mechanism. Checking future for every cart will, in fact, destroy all benefits of multi-threading.

/**
 * Creates a CompletableFuture for the OUT product and sends to the Conveyor using standard Cart message
 * When processed, Building site will register the Future in its future collection.
 * getFuture can be called multiple times from different threads. Each time new future will be created
 * but at the end all won them will get their completion or other appropriate status.
 */
public FutureLoader<K, OUT> future();

CompletableFuture, as expected, provides synchronized access to the final product.


So, to send data to the conveyor you need at least three things.

  • Key to identify the unique build
  • Label to identify building part
  • Building part object

Simple example

conveyor.part().id(123).label("FirstName").value("John").place();

Optional, but important parameter is expiration time. Expiration time can be defined as a timestamp in milliseconds of the certain moment in the future. Since calculating such timestamp is not always convenient, conveyor supports additional API that allows to define expiration time in more appropriate to the task units. Instant, TimeUnit or Duration.

conveyor.part().id(123).label("FirstName").value("John").ttl(1,TimeUnit.SECONDS).place();

The following examples illustrates different methods to set the same expiration time:

//this object is unexpireable
PartLoader loader = conveyor.part().id(123).label("FirstName").value("John");
long current = loader.creationTime;
//Adding one second expiration time
PartLoader loader1 = loader.expirationTime(current + 1000);
PartLoader loader2 = loader.expirationTime(Instant.ofEpochMilli(current+1000));

//The next two loaders will have same expiration time
//because the inherit creation time from their parent! 
PartLoader loader3 = loader.ttl(1000, TimeUnit.MILLISECONDS);
PartLoader loader4 = loader.ttl(Duration.ofMillis(1000));

You can use any methods of the Conveyor interface, but you should understand that conveyor accepts building parts wrapped into a container implementing Cart interface.

You can instantiate Cart directly and use the place(Cart<K,V,L> cart) method. You can even use your own implementation of the Cart. All provided implementations are Serializable. But, if you would like to use this in your code, remember that Keys, Labels and Values must be Serializable as well.

Framework provides several versions of the Cart. Most common is the ShoppingCart It is used when you call the place() method of the Part and MultiPart loaders.

There are some special types of cart

7.1 - Creating Cart

Above we described how you set the Builder Supplier. If Builder Supplier is available then new Builder will be created each time conveyor detects a new key. You can chose to create new instance of builder without any building parts, before they actually start arriving.

You can use the CreatingCart It is very similar to the ShoppingCart. The difference is - it's value must be the Builder Supplier.

CreatingCart is sent when you call methods create or createFuture of the BuildLoader

CompletableFuture<Boolean> createFuture = 
    conveyor.build().id(1).supplier(UserBuilder::new).create();
//OR
CompletableFuture<User> productFuture = 
    conveyor.build().id("1").supplier(UserBuilder::new).createFuture();

7.2 - Commands

In some cases you can decide to bypass the main input queue. Conveyor interface provides a method

public <V> CompletableFuture<Boolean> command(GeneralCommand<K, V> command);

Again, you do not have to use the GeneralCommand (Which is a Cart itself), there is a CommandLoader that should be used instead.

The following commands are available:

  • cancel -Immediately remove existing builder and drain unfinished data.
  • reschedule - Changes expiration time for the key which is still in waiting state.
  • timeout - Removes builder from processing, but try to apply onTimeout action as described above, if available. Otherwise works as CancelCommand
  • create - same as CreatingCart, but bypassing the main input queue. This command is not supported in multi-key version
  • check - Returns a CompletableFuture which returns true if build exists and false if not. This command is not supported in multi-key version
conveyor.command().id(1).cancel();

7.3 - Future Cart Get Product future for already running build. Be careful, as any other message it can create new build if you have specified the default supplier.

CompletableFuture<User> uf1 = conveyor.future().id("1").get();

7.4 - Multi Key values Part and Command loaders provide some multi-key operations. It means, that data you sent to the conveyor can be applied to multiple builds. Instead of using the id() method of loaders you should use foreach() or foreach(predicate). Method without parameter will apply data to all active builds, the second one will apply data to builds with IDs matching the provided filtering predicate.

conveyor.part().foreach().label(MIN_BID).value(100.00).place();
//OR for all keys > 1000000
conveyor.part().foreach( k -> k > 1000000 ).label(MIN_BID).value(100.00).place();

7.5 - Static values Some values can be treated as constant values, same for all build on a long time intervals, e.g. until they changed in global configuration of your app. In this case, you can keep values for such labels statically. It means, they will be stored on the conveyor level and will be applied to all new builds when they created. But, you still can send normal messages with corresponding labels, if you need to override static defaults.

conveyor.staticPart().label(MAX_BID).value(10000.00).place();

\\Then later
conveyor.part().id("VIP_USER").label(MAX_BID).value(20000.00).place();

Properties

Consider the following use case. You need to build a report and send it by e-mail to the customer. Report has the following format:

Date: {date} 
Dear {prefix} {lastName},
You spent ${spent} since last report 
Remaining balance on your account is ${balance}

User Info, such as Last Name, Gender and E-MAil, can be found the Users database, financial reporting can be requested from the billing system. Both use unique user ID to access information. So, userID (Integer) is a great candidate for our build key. Report itself is a simple text, so, we going to build a String class. Required fields are: Date (Current timestamp on the billing DB side), Prefix (Mr. or Mrs., depending on customers gender field), lastName, spent and balance. This gives us a list of labels and builder API.

ResultConsumer for our build will be an input Queue of the Mailing client.

But wait, where is our E-Mail? Mail Client requires it, but, there is no E-Mail in our report. It's a single String. We could change our product class, so that it can hold both e-mail and report Strings, but, this can't be called an elegant solution, needless to say, it requires more coding from you, and makes whole code less clear, compromising the whole idea of the Builder pattern.

In fact, here we face a situation, where we need to deliver some piece information not to the Builder/Product per se, but to something that happens after the build. To achieve this, you can use Properties

conveyor.resultConsumer( bin->{
    emailClient.sendEmail(  bin.properties.get("E_MAIL"), bin.product )
}).set();
....
....
conveyor.part()
    .id(1)
    .label("lastName")
    .addProperty("E_MAIL",email)
    .value(lastName)
    .place();

Property with name "E_MAIL" will be stored with the build, until its done, and then will be passed to Consumers.

You can add as many properties as you need. Just keep in mind, they stored in a Map, thus later property with the same name will override the previous one. Property can be of any class.

Configuration Files

This is a short introduction. Check detailed Configuration File Documentation

Include configurator dependency into your project pom.xml

<dependency>
  <groupId>com.aegisql</groupId>
  <artifactId>conveyor-configurator</artifactId>
  <version>1.6.6</version>
</dependency>

Now you can create and configure conveyors without writing a single line of code in Java

Example: First, add to your property file lines. e.g. file name is application.yml

conveyor:
  my_conveyor:
    builderSupplier:  new org.myproject.product.ProductBuilder();
    defaultBuilderTimeout:  1 SECONDS
    firstResultConsumer:  new com.aegisql.conveyor.consumers.result.LogResult()
    firstScrapConsumer:  new com.aegisql.conveyor.consumers.scrap.LogScrap()
    readyWhenAccepted:  
      - com.aegisql.conveyor.config.harness.NameLabel.FIRST
      - com.aegisql.conveyor.config.harness.NameLabel.LAST
    persistence:
      derby.my_conveyor.parts:
        keyClass: java.lang.Integer
        archiveStrategy:
          path: ./
          maxFileSize: 20KB
          bucketSize: 50
          zip: true

Then, use this code to initialize conveyors

ConveyorConfiguration.build("classpath:application.yml");

Now, from any place of your code,

Conveyor<Integer,String,String> myConveyor = Conveyor.byName("my_conveyor");
...

Configurator supports properties and YAML files. More Detailed Documentation.

Some use cases

Utility classes can be used as is, or be a template for your own implementation. They created to demonstrate wide variety of possible applications.

Scalars

If readiness evaluation algorithm always return true

(state,builder) -> true;

then the builder will produce results after processing first Cart with data. It can be useful in some cases. For example, input can be a very bulky and complex structured document, like XML or JSON, but it actually contains all necessary data to build the Product. Second use case - competitive algorithms or services. The faster wins. For the second case you should use CreatingCart to initialize the builder and do not set the Builder Supplier in the Conveyor. This will guarantee that only first message will be processed, an the second will be sent to the Scrap consumer. The Scalar package helps to make those tasks easier.

Scalar

Example with a simple Comma Separated Value (CSV) line parser:

static class StringToUserBuilder extends ScalarConvertingBuilder<String, Person > {
	@Override
	public Person get() {
		DateFormat df = new SimpleDateFormat ("yyyy-MM-dd");
		String[] fields = scalar.split(",");
		return new Person(fields[0], fields[1], df.parse(fields[2]));
	}
	...
}
...
ScalarConvertingConveyor<String, String, Person> sc = new ScalarConvertingConveyor<>();
sc.setBuilderSupplier(StringToUserBuilder::new);
AtomicReference<Person> usr = new AtomicReference<>(null);
sc.setResultConsumer(u->{
	usr.set(u.product);
});
String csv = "John,Dow,1990-06-21";
sc.part().id("test").value(csv).place();
//Note that Scalar part loader is pre-labeled for you!
//You do not have to define any label

Collections

Requires two types of labels. One for collection values. And one label for collection completion. Avoid using "magic values", including null. Remember that in Java null is a valid value for most common collections.

Collection
CollectionConveyor<Integer,Integer> b = new CollectionConveyor<>();

b.setBuilderSupplier( () -> new CollectionBuilder<>(100, TimeUnit.MILLISECONDS) );
b.setScrapConsumer((obj)->{
	System.out.println(obj);
});
b.setResultConsumer((list)->{
	System.out.println(list);
});
b.setIdleHeartBeat(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 100; i++) {
	b.part().id(1).value(i).place();
}
b.part().id(1).label(b.COMPLETE).place();

Maps

Map Conveyor uses labels as keys to keep values in a map. Useful when you have an open list of possible labels. Label class should meet expectations of the Map implementation you use in the Builder. Default is HashMap, but you can specify any other Map implementation using appropriate MapBuilder constructor

MapConveyor<Integer, String, String> mc = new MapConveyor<>();
		
mc.setBuilderSupplier( ()-> new MapBuilder<>() );
		
mc.setResultConsumer(bin->{
	System.out.println(bin);
});
		
PartLoader<Integer, String,?,?,?> pl = mc.part().id(1);
pl.label("FIRST").value("ONE").place();
pl.label("SECOND").value("TWO").place();
pl.label("THIRD").value("THREE").place();
CompletableFuture<Boolean> last = (CompletableFuture<Boolean>) pl.label(null).value(null).place();
last.get();

Result of this code is Map<String,String>

Batches

Interesting use case. Similar to Collection Conveyor, but it does not need a termination cart. Readiness algorithm returns true after collecting certain number of values or on timeout. Batch

BatchConveyor<Integer> b = new BatchConveyor<>();
		
b.setBuilderSupplier( () -> new BatchCollectingBuilder<>(10, 10, TimeUnit.MILLISECONDS) );
b.setScrapConsumer((obj)->{
	System.out.println(obj);
});
b.setResultConsumer((list)->{
	System.out.println(list);
});
b.setIdleHeartBeat(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 102; i++) {
	b.part().value(i).place();
}

In this example, 10 batches of 10 values will be printed, and the last two elements will be also processed after timeout. So, no values will be lost. Use case: collect more data before writing them into a database or a file.

Chaining conveyors

Converts output of the first conveyor to a shopping cart of the second conveyor.

Caching

Very interesting, and a little unexpected aspect - caching. If readiness algorithm always return false

(state,builder) -> false;

then, builder can be evicted only by timeout (or never, if timeout not set). AssemblingConveyor does not provide any access to the builder, so, we have to extend its functionality. CachingConveyor instead of providing access to builder, which can compromise data integrity, provides access to the Supplier of the Product.

public Supplier<? extends OUT> getProductSupplier(K key)

Then, you can obtain the Product by calling Supplier's get() method. Though it is not required, it is good idea to return a new copy of the Product each time.

This allows to solve several problems:

  • Builder cannot be accessed directly and its data cannot be modified. You still need to use conveyor interfaces and carts to modify data
  • Supplier life span is the same as the life span of the builder. If builder expires - supplier expires too. Instead of Product getProductSupplier() will throw an exception. In this can you can try to retrieve a new supplier.
  • Unlike common caches, CachingConveyor allows to update building parts separately, using additional logic, like data validation.
  • Provides much more flexible and precise expiration policy, which can be, as you already know, controlled by input carts, conveyor defaults and builder itself.
  • Synchronization only when and where it is required.
  • Naturally suggests using immutable Products.

Some notes: If you decide to use the Building Framework as a caching tool and keep hundreds of thousands or millions of records in it, you should pay more attention to the Key class. In addition to hashCode and equals it ought to implement the Comparable interface - for better mapping. Java Strings and Integer types already implement the Comparable interface. If you use those classes as keys - you good.

Delay Line

DelayLineConveyor Inserts programmable delay before result released. Can be useful for modeling data sources with different delay behavior. For example, allows to implement the Sleep Sort algorithm

	List<Integer> res = new ArrayList<>();
	
	DelayLineConveyor<Integer, Integer> c = new DelayLineConveyor<>();
	c.setResultConsumer(bin->{
		System.out.println("++ "+bin);
		res.add(bin.product);
	});
	c.setScrapConsumer(bin->{
		System.out.println("-- "+bin);
	});
	c.setIdleHeartBeat(50, TimeUnit.MILLISECONDS);
	
	c.part().id(1).value(1).ttl(Duration.ofMillis(11)).place();
	c.part().id(4).value(4).ttl(Duration.ofMillis(14)).place();
	c.part().id(2).value(2).ttl(Duration.ofMillis(12)).place();
	c.part().id(3).value(3).ttl(Duration.ofMillis(13)).place();
	c.part().id(5).value(5).ttl(Duration.ofMillis(15)).place();

This example will order numbers incrementally in the result array list.

Performance optimization

ParallelConveyor launches several instances of the Conveyor. Each in its own thread. This allows to scale building on multi-core computers in cases when building takes significant time and causes significant growth of the input queue.

You need to include this dependency in your project:

<dependency>
  <groupId>com.aegisql</groupId>
  <artifactId>conveyor-parallel</artifactId>
  <version>1.6.6</version>
</dependency>

ParallelConveyor is an abstract class and has three implementations, balanced differently. ParallelConveyor implements the Conveyor interface, as the AssemblingConveyor does. There are few additional methods providing access to properties of individual conveyors.

public int getNumberOfConveyors() 
public int getCollectorSize(int idx)
public int getInputQueueSize(int idx)
public int getDelayedQueueSize(int idx) 
public boolean isRunning(int idx)

There are two methods that can be used to scale performance. Scaling by key and scaling by label (and combination of both if necessary)

Scaling by key

KBalancedParallelConveyor This is a simple method. KBalancedParallelConveyor takes a key from a cart and decides to which internal conveyor it should be sent. For that it uses a balancing function.

cart -> { 
   int index = (key.hashCode() & 0x0fffffff) % pf;
   return this.conveyors.subList(index, index+1);
}

KBalancedParallelConveyor has two constructor. One with number of parallel AssemblingConveyors to be launched, the second with additional conveyor supplier, in case you need to run in parallel any different implementation of the Conveyor interface.

Parallel Conveyor

Scaling by Label

LBalancedParallelConveyor Consider the following scenario. Some external process initiates many events of certain type (A). Processing each of them takes a lot of CPU time, so they all stay in the input queue. Then, the same, or other process starts sending events of second type (B). Receiving B should complete the build. But, because processing of event A takes a lot of time, input queue is full of A events, waiting for their turn. So, conveyor will not start processing B events until all A events are processed. This problem can be only partially solved by simple increasing of number of conveyors, as described above.

How to solve the problem? Well, we can create two separate conveyors. Send all events A to the first, and all events B to the second. Then, we should change readiness algorithm for both of them. If original readiness algorithm returned true when both A and B events were processed, then readiness algorithm of the First should return true when A is ready, ignoring B, and readines algorithm of the Second should check B and ignore A

///Before
builder -> {builder.a!=null && builder.b!=null}

//After
//First
builder -> {builder.a!=null}

//Second
builder -> {builder.b!=null}

As a result, two new conveyors will start producing incomplete Product. With missing corresponding parts (assuming your builder allows this. If not, it should be fixed first)

Next, we can wrap partial results in new carts with labels A-PARTIAL and B-PARTIAL and send both to a third conveyor. It's builder should be able to merge two partial products into one final product.

Parallel Conveyor

LBalancedParallelConveyor has an API to simplify this task. But it still requires some planning and thinking ahead.

It is easier to start with creating a single Conveyor with all necessary configurations, and then start creating more conveyors, using the first one as a prototype.

What you need to do:

  • Decide how exactly labels should be split among conveyors. Write those labels down.
  • Check that your builder can build the Product with incomplete data.
  • Check that Product's interfaces have access to it's internal parts or allow merging several partial products in one complete Product.
  • Create new labels that will be used to hold your partial results. One new label per conveyor sounds like sufficient. Give them clear names, describing what exactly they for.
  • Specify readiness evaluation algorithm for each of the new conveyors.
  • Write Builder code that will merge partial Products.
  • Think which instance of conveyors will be a destination point for partial Products. It can be a separate conveyor designated to this only task, or you can reuse a less loaded conveyor.

Note that in some cases labels can be Overlapped. In this case same cart will be delivered to all designated conveyors. Obvious use case for that is "creating cart". Its only job is to create a new instance of the Builder and it should be created in all Conveyors.

Now some examples. Familiar classes UserBuilder which builds User Lets split conveyor in three parallel tasks:

  1. Conveyor to process labels UserBuilderEvents.SET_FIRST and UserBuilderEvents.SET_LAST
  2. Conveyor to process label UserBuilderEvents.SET_YEAR
  3. Conveyor to merge both
  4. Create labels and merging code UserBuilderEvents.MERGE_A and UserBuilderEvents.MERGE_B
// Add two new Smart Labels to support partial results
...
MERGE_A(UserBuilder::mergeChannelA),
MERGE_B(UserBuilder::mergeChannelB),
...
// Code in the UserBuilder class applying partial results
public static void mergeChannelA(UserBuilder builder, User user) {
	builder.first = user.getFirst();
	builder.last  = user.getLast();
}
public static void mergeChannelB(UserBuilder builder, User user) {
	builder.yearOfBirth = user.getYearOfBirth();
}

We start with already familiar Assembling Conveyor configuration

AtomicReference<User> user = new AtomicReference<User>(null);
AssemblingConveyor<Integer, UserBuilderEvents, User> ac = new AssemblingConveyor<>();
ac.setName("main");
ac.setBuilderSupplier(UserBuilder::new);
ac.setDefaultBuilderTimeout(50, TimeUnit.MILLISECONDS);
ac.setScrapConsumer(bin->{
	System.out.println("rejected: "+bin);
});
ac.setResultConsumer(bin->{
	System.out.println("AC result: "+bin);
	user.set(bin.product);
});
//This is a full readiness evaluator
//Requires that all three parts were available
ac.setReadinessEvaluator(b->{
	UserBuilder ub = (UserBuilder)b;
	return ub.first != null && ub.last != null && ub.yearOfBirth != null;
});

Now, we have to create a new instances of Conveyor, using the first one as a prototype. Of course, we can do it individually, as we just did above, but cloning from a prototype will create a copy which already has all configurations we just did (and yes, you still can change any of them in a copy)

//1. Clone original conveyor using the detach() method
AssemblingConveyor<Integer, UserBuilderEvents, User> ch1 = ac.detach();
//2. Tell new conveyor how to label partial result and where to send it
ch1.forwardPartialResultTo(UserBuilderEvents.MERGE_A, ac);
//3. Readiness evaluator requires that only 
//first and last name must present
ch1.setReadinessEvaluator(b->{
	UserBuilder ub = (UserBuilder)b;
	return ub.first != null && ub.last != null;
});
//4. Give conveyor a distinctive name that will help to trace events
ch1.setName("CH1");

//1. Clone original conveyor using the detach() method
AssemblingConveyor<Integer, UserBuilderEvents, User> ch2 = ac.detach();
//2. Tell new conveyor how to label partial result and where to send it
ch2.forwardPartialResultTo(UserBuilderEvents.MERGE_B, ac);
//3. Readiness evaluator requires that only yearOfBirth must present
ch2.setReadinessEvaluator(b->{
	UserBuilder ub = (UserBuilder)b;
	return ub.yearOfBirth != null;
});
//4. Give conveyor a distinctive name that will help to trace events
ch2.setName("CH2");

Tell each conveyor which labels it can accept. Labels can overlap, if you want to deliver some values to all conveyors.

ac.acceptLabels(UserBuilderEvents.MERGE_A,UserBuilderEvents.MERGE_B);
ch1.acceptLabels(UserBuilderEvents.SET_FIRST,UserBuilderEvents.SET_LAST);
ch2.acceptLabels(UserBuilderEvents.SET_YEAR);

After this step any attempt to send UserBuilderEvents.SET_FIRST to ch2 or to ac will fail!!!

Finally, lets create a LBalancedParallelConveyor and use it, as usual

Conveyor<Integer, UserBuilderEvents, User> pc = new LBalancedParallelConveyor<>(ac,ch1,ch2);

PartLoader<Integer,UserBuilderEvents,?,?,?> loader = 
   pc.part().id(1).ttl(100,TimeUnit.MILLISECONDS);

loader.label(UserBuilderEvents.SET_FIRST).value("John").place();
loader.label(UserBuilderEvents.SET_LAST).value("Silver").place();
loader.label(UserBuilderEvents.SET_YEAR).value(1695).place();

...

Method

public void acceptLabels(L... labels) 

plays an important role. If you should know some details of how it works.

  • Conveyor sets internal filter, rejecting all other labels.
  • method isLBalanced() returns true.

When you create new LBalancedParallelConveyor it checks this flag for all internal it's internal conveyors. TBC

You can call acceptLabels several times, if you need. Once added labels cannot be deleted.

Even more scaling

You can combine K an L balanced conveyors. In example above any of Assembling Conveyors can be replaced by a KBalancedParallelConveyor with corresponding parallelism factor.

Balancing By Methadata

There is a new ParallelConveyor available since 1.4.2 - PBalancedParalelConveyor. Its balancing method is based on methadata extracted from Cart properties. Thus, "P" is for "Properties". It makes easier to implement such things as versioning, A-B testing, etc. It requires, that all carts sent to conveyors MUST have same set of defined properties. To make a decision to which conveyor data should be sent, each conveyor has to be configured with corresponding set of predicates. As soon as all predicates will match values extracted from cart properties, cart will be immediately passed to that conveyor. So, set of predicates must provide single value for any practically possible set of properties. Consistency of balancing decision is not validated, and must be evaluated bu the developer. Example:

Two versions of demo service produce strings, concatenated from two pieces with a delimiter character, provided by configuration

// Abstract implementation
abstract class AbstractConcatBuilder implements Supplier<String> {
		String del = "";
		String first;
		String second;
}

Version 1: Joins first with second

class StringConcatBuilder1 extends AbstractConcatBuilder { 
    @Override
    public String get() {
	    return first+del+second; 
    }
}

Version 2: Joins second with first

class StringConcatBuilder1 extends AbstractConcatBuilder { 
    @Override
    public String get() { 
        return second+del+first; 
    }
}

A-B testing will be in applying different delimiters: A - space " "; B - hyphen "-"

// place results here
ResultMap<Integer, String> results = new ResultMap<>();
//V1,A
SimpleConveyor<Integer, String> c1 = new SimpleConveyor<>(StringConcatBuilder1::new);
//V1,B
SimpleConveyor<Integer, String> c2 = new SimpleConveyor<>(StringConcatBuilder2::new);
//V2,A
SimpleConveyor<Integer, String> c3 = new SimpleConveyor<>(StringConcatBuilder1::new);
//V2,B
SimpleConveyor<Integer, String> c4 = new SimpleConveyor<>(StringConcatBuilder2::new);
//assign predicates
ConveyorAcceptor<Integer, String, String> t1 = new ConveyorAcceptor<>(c1);
t1.expectsValue("version", 1).expectsValue("abtest", "A");
ConveyorAcceptor<Integer, String, String> t2 = new ConveyorAcceptor<>(c2);
t2.expectsValue("version", 2).expectsValue("abtest", "A");
ConveyorAcceptor<Integer, String, String> t3 = new ConveyorAcceptor<>(c3);
t3.expectsValue("version", 1).expectsValue("abtest", "B");
ConveyorAcceptor<Integer, String, String> t4 = new ConveyorAcceptor<>(c4);
t4.expectsValue("version", 2).expectsValue("abtest", "B");
// wrap conveyors with PBalancedParallelConveyor
PBalancedParallelConveyor<Integer, String, String> pbc = new PBalancedParallelConveyor<>(t1,t2,t3,t4);
pbc.setName("testPConveyorDouble");
pbc.setReadinessEvaluator(Conveyor.getTesterFor(pbc).accepted("first", "second"));
pbc.resultConsumer(results).set();
// Obtain and set up loaders
StaticPartLoader<String> delLoader = pbc.staticPart().label("del");
PartLoader<Integer, String> v1Loader = pbc.part().addProperty("version", 1);
PartLoader<Integer, String> v2Loader = pbc.part().addProperty("version", 2);
// load constants
delLoader.addProperty("version", 1).addProperty("abtest","A").value(" ").place();
delLoader.addProperty("version", 1).addProperty("abtest","B").value("-").place();
delLoader.addProperty("version", 2).addProperty("abtest","A").value(" ").place();
delLoader.addProperty("version", 2).addProperty("abtest","B").value("-").place();
// load data and metadata
v1Loader.id(1).label("first").addProperty("abtest","A").value("A").place();
v2Loader.id(2).label("first").addProperty("abtest","A").value("X").place();
v1Loader.id(1).label("second").addProperty("abtest","A").value("B").place();
v2Loader.id(2).label("second").addProperty("abtest","A").value("Y").place();

v1Loader.id(3).label("first").addProperty("abtest","B").value("W").place();
v2Loader.id(4).label("first").addProperty("abtest","B").value("R").place();
v1Loader.id(3).label("second").addProperty("abtest","B").value("S").place();
v2Loader.id(4).label("second").addProperty("abtest","B").value("T").place();
// wait and stop
pbc.completeAndStop().join();
System.out.println(results);

Priority management

To enable message priority support you must create AssemblingConveyor with the Priority Queue supplier. corresponding methods priority(long priority) were added to loaders.

Conveyor<Integer,String,String> c = new AssemblingConveyor<>(BlockingPriorityQueue::new));
...
c.part().id(1).label("X").value("Y").priority(100).place();

Please note that setting priority will not guarantee, that certain message will be processed in front of another message. It affects only positioning of carts in the balancing input queue. Input queue can be empty most of the time, in this case, behavior of the conveyor will be undistinguishable from any non priority queue. But, if you have serious data accumulation in the input queue, e.g. you processing many slow messages, then setting up priority will help to place some carts before another and improve conveyor latency time.

Carts with same priority will be ordered by their actual creation timestamp.

Persistence

Current Persistent is backed up by a JDBC supporting database.

Pre-initialized persistence builders provided for

  • derby
  • derby-client
  • derby-memory
  • sqlite
  • sqlite-memory
  • mysql
  • mariadb
  • postgres

Other configurations can be created using provided API. Please contact the author if you are interested in any particular database. We'll try to include it into one of the next releases.

You have to include these dependencies in your project:

<dependency>
  <groupId>com.aegisql.persistence</groupId>
  <artifactId>conveyor-persistence-core</artifactId>
  <version>1.6.6</version>
</dependency>

<dependency>
  <groupId>com.aegisql.persistence</groupId>
  <artifactId>conveyor-persistence-jdbc</artifactId>
  <version>1.6.6</version>
</dependency>

You also must provide dependency for the JDBS driver for the database you have chosen.

Features

  • Works with all existing Conveyors
  • Data stored in the persistence as byte arrays (BLOB). If no data converter defined, data expected to be Serializable
  • Effective converters provoded for all common Java classes, such as Strings, Numbers, arrays, Collections and Maps of common classes.
  • User can provide his own data converters for complex data structures.
  • Automatic and manual Acknowledge process
  • Unload build and free memory - for builds with significant time intervals between building parts
  • Optional encryption of stored data
  • Choice of algorithms to archive building part records after the build is complete.
  • Automatic re-play
  • Flexible storage configuration

Limitations

  • Although there is no limitations on Classes that can be set as Cart properties, In Persistence they Converted to JSON strings. For successful conversion values must satisfy requirements of the Jackson library.
  • Futures are not persisted. You can retrieve new futures if you know build IDs.
  • Commands are not persisted. (may change in the future)
  • You can persist the BuilderSupplier and ResultConsumer messages, but, this is the most dangerous feature. Remember, that context enclosed into a lambda can be unavailable at the time you restore it. Best practice - parametrize BuilderSupplier or ResultConsumer function with immutable parameters, so that the context can be restored independently at the evaluation time. e.g. instead of passing opened input stream, pass file name, and open file inside the method or constructor. But, even in this case, file could be removed when the conveyor was offline.

Persistence Interface

First, you need to create and configure an instance of the Persistence interface. Example below shows how you can do it.

Persistence<Integer> persitence = JdbcPersistenceBuilder
	.presetInitializer("derby", Integer.class)
	.autoInit(true);
    .schema("userConv")
    .partTable("user")
    .completedLogTable("user_completed")
    .labelConverter(UserBuilderEvents.class)
    .build();

Here you can also set parameters for DB connection, data encryption, size and time for the archiving batch, archiving strategy. For now, three archiving strategies implemented. Doing nothing (rely on external process for archiving), delete completed build data, keep completed build data, but mark them with "archived" flag in the same part's table.

Using Persistent Conveyor

This is really very easy. You just wrap existing conveyor in persistence, and use it the same way. Before Persistence

AssemblingConveyor<Integer, UserBuilderEvents, User> conv = new AssemblingConveyor<>();

With Persistence

PersistentConveyor<Integer, UserBuilderEvents, User> persistentConv = 
    new PersistentConveyor(persistence, conv);

or

PersistentConveyor<Integer, UserBuilderEvents, User> persistentConv = 
    new PersistentConveyor(persistence, AssemblingConveyor::new);

Other features.

Set conveyor name.

conveyor.setName("Person Builder Conveyor");

This name will be also assigned to the Thread, where conveyor is executed and MBean with conveyor details. Clear thread name helps to read thread dumps and it can be included in the Log output. ParallelConveyor will add actual thread number to the name you provided. Knowing conveyor name is mandatory if you plan to access conveyor by it's name.

Check conveyor health status and basic metrics

//Method isRunning() returns false if conveyor stopped
if( conveyor.isRunning() ) {
   //do something
} else {
   throw new RuntimeException("Conveyor is not running");
}

//Size of balancing input buffer. Should be reasonably low almost all the time
//If it is growing faster than builder can process builds
//consider using parallel builder
int inputQueueSize = conveyor.getInputQueueSize();

//collector size shows how many builders currently waiting for the build. 
int collectorSize = conveyor.getCollectorSize();

//delay queue size can be zero even when collector size is > 0, in case 
//when none of the builds have expiration time
//delay queue size shows how mane different expiration timestamps were found
//collector size can have millions of builds in it, but delay queue still can be equal to 1
//if all builders spire at the same time
int delayedQueueSize = conveyor.getDelayedQueueSize();

Conveyor failure

If Builder throws any Exception (Runtime or not), it will be removed from the conveyor and sent to the Scrap Consumer.

However, if Builder throws an Error, the conveyor will stop! All remaining unfinished builds will be sent to the Scrap Consumer. Method isRunning() will start returning false. Conveyor cannot recover from this condition. You will have to create a new instance of it.

Suspending Conveyor

If you want to temporarily suspend reading from the input queue, you can use the suspend() method. It will not affect any currently processed data or commands.

conveyor.suspend();

To continue running, use the resume() method

conveyor.resume();

Both methods are asynchronous. They also available can be called via JMX. Boolean flag isSuspended shows current status.

If you want to make sure, that conveyor is in suspended mode and not processing anything, use the suspend command instead. Th command returns completable future, so you can synchronize your code with conveyor being idle.

CompletableFuture<Boolean> suspendFuture = conveyor.command().suspend();

Stopping Conveyor

You can stop conveyor by calling method stop() explicitly.

conveyor.stop();

When conveyor is stopped, it will throw the IllegalStateException on any attempt to add new Data to it and content of internal collections will be drained the the Scrap Consumer. Which can be more appropriate solution than just garbage collecting internal data.

If you would like to let conveyor finish all jobs pending completion before stopping it, use method

CompletableFuture<Boolean> stopFuture = conveyor.completeAndStop();

Future will return when there are no more messages in the queue of unfinished builds in conveyor. In order to avoid blocking you should specify timeout for each build. All new builds will be rejected, however, builder will allow to pass messages for already created builds, with corresponding key.

Build interruption

It is possible, that some builds take too long time. There are two possible reasons for that.

  • Internal thread in a wait condition. e.g. expecting input, or just sleeping.
  • Long loops

"interrupt" command sent to a thread in a non-wait state will have zero effect on it. e.g. loop

boolean keepRunning = true;

...
while(keepRunning) {
   //very long computations
   ...
}

will continue running forever.

method interrupt(String threadName) of the Conveyor interface allows to address some of those issues. By default, it just sends the interrupt commend to the thread. However, you can add the Interruptble interface to the Builder and implement your own interruption behavior. You can chose to access the inner thread instance, or modify build inner variables, so that it can exit too long computation loops.

boolean keepRunning = true;
...

//Method of the Interruptable interface
@Override
public void interrupt(Thread conveyorThread) {
   keepRunning = false;
}

...
while(keepRunning) {
   // very long computations
   ...
}

Notes: if you send interrupt command to a waiting thread, exception will be thrown, and whole build will be killed.

Interrupt command is always applied to the current build. No locking or synchronization.

Result and Scrap consumers executed by the same thread, but there is no access to their inner state. You still can try to kill them sending interrupt command to the thread. Hint: use builders ready condition to find in which state the build is.

Using conveyors as Services

RACE provides an interface ConveyorInitiatingService that allows to use conveyors as Java Services.

To use conveyor as a Service you have to:

Implement a class that will create and initiate your conveyor with a certain name. This name must be returned by the getInitiatedConveyorNames method.

Example of implementation

public class ExampleConveyorServiceInitializer implements ConveyorInitiatingService {

    private final static String CONVEYOR_NAME = "example";

    public CollectingConveyorServiceInitializer(){
        Conveyor<Integer,SOME_LABELS,Integer> conveyor = new AssemblingConveyor();
        conveyor.setName(CONVEYOR_NAME);
        // More conveyor configurations
        ...
        ...
    }

    @Override
    public List<String> getInitiatedConveyorNames() {
        return Arrays.asList(CONVEYOR_NAME);
    }
}

create a file com.aegisql.conveyor.ConveyorInitiatingService in the src/main/resources/META-INF/services

Each line in this file mast contain absolute path to the ConveyorInitiatingService implementations.

Example:

com.my_company.my_project.ExampleConveyorServiceInitializer

Load all conveyors using command:

Conveyor.loadServices();
// You can also check which conveyor names known in the system:
Set<String> knownConveyorNames = Conveyor.getKnownConveyorNames();

JMX

Conveyors create JMX MBeans under com.aegisql.conveyor domain. For example, K-Balanced conveyor with name "Parallel User Builder" and parallelism factor of 4 will create the following MBeans:

MBeans also created by the PersistentConveyor and its instance of Persistence.

You can access some operations and setters from JMX

Operations:

  • interrupt
  • stop
  • completeAndStop
  • suspend
  • resume
  • rejectUnexpireableCartsOlderThanMsec(long msec)

Setters:

  • idleHeartBeatMsec(long msec)
  • defaultBuilderTimeoutMsec(long msec)
  • expirationPostponeTimeMsec(long msec)

You can tell Conveyor to remove instance from the JMX visibility

Conveyor.unRegister("myConveyor");

Note: If you unregister your conveyor, it also makes it invisible for byName resolution and Lazy Suppliers.

Demo

Some Demos

Best Practice and Implementation ideas

Complex products, simple parts

Lets say, Person class, which we used a lot in our examples, has an Address, sometimes more than one. Address, by itself, is another complex class with many fields, such as Street, zip code, state, country, etc. If we were treating an instance of the Person as a usual POJO, we'll do something like this:

Person p = new Person();
...
Address mailingAddress = new Address(<address fields>);
p.setMailingAddress(mailingAddress);

Address billingAddress = new Address(<address fields>);
p.setBillingAddress()billibgAddress;
...

If you were taking address fields from some DB, you would probably ask your DB access layer to provide fully constructed instances of Address.

But, this is not necessarily the best way to do in case of Conveyor. There are some benefits in passing each field as a separate part.

  • Each part will be documented by the conveyor logging system
  • If you have several sources of data, you will have to pass only available parts; if they have NULL values, you will know it explicitly.
  • Persistence becomes much easier, if you use only trivial parts, such as Strings, Numbers, Dates, etc.
  • Data transport can be abstracted to work with simple named values, rather than changing over time business entities.
  • Transport and serialization of simple objects is also easier.

Exception handling

Since the main purpose of the framework is to provide standard data aggregation mechanism in multi-source and multi-thread environment, it is very common that various sources will come with their distinctive families of exceptions. When exception happens in some thread, then the easiest way to inform conveyor about is is to treat exception as a normal message. There a many different approaches how exceptions can be handled by the Builder (including our own project https://github.com/aegisql/ftry ), but in many practical cases the easiest way is just to re-throw the exception as soon as it received. This will result in termination of the build and call of the Scrap Bin processor, which, in many cases, is sufficient. Corresponding Futures will be terminated exceptionally.

public enum  BuilderLabel implements SmartLabel<MyBuilder> {
...
    FAILURE( (MyBuilder builder, RuntimeException ex) -> { throw ex; }  ),
...
}
try {
...
   ResultSet rs = stmt.executeQuery("SELECT * FROM my_schema.my_table WHERE NAME='John'");
...
} catch(SQLException ex){
   conveyor.part().id(requestKey).value(ex).label(BuilderLabel.FAILURE).place();
}

Please remember, that if there are more threads still pending for results - they know nothing about issues in other threads. If Conveyor has a default builder supplier, then first successful message from other thread will re-create build with the same ID (but different future!!!). If there is no timeout, default or set by cart, it can result In memory leak. If there is no default builder supplier, remaining data will be consumed by the Scrap processor.

Second option, process each data source error individually.

public enum  BuilderLabel implements SmartLabel<MyBuilder> {
...
    SQL_QUERY( MyBuilder::processSqlData  ),
    SQL_QUERY_FAILURE( MyBuilder::processSqlError ),
...
}
try {
...
   ResultSet rs = stmt.executeQuery("SELECT * FROM my_schema.my_table WHERE NAME='John'");
...
   conveyor.part().id(requestKey).value(mySqlData).label(BuilderLabel.SQL_QUERY).place();
} catch(SQLException ex){
   conveyor.part().id(requestKey).value(ex).label(BuilderLabel.SQL_QUERY_FAILURE).place();
}

Third option. Use SmartLabel interception mechanism and use one label to manage several object classes.

SmartLabel<UserBuilder> SET_FIRST = SmartLabel.of(UserBuilder::setFirst);
SET_FIRST = SET_FIRST.intercept(Exception.class, (builder,error)->{
	LOG.error("Intercepted Error: "+error.getMessage());			
});

Then, you can use the same label SET_FIRST to handle two different classes of messages, Strings for user first name and Exeption, if something went wrong.

Smart Labels

We already discussed several approaches to implement Labels. Best way is to implement the SmartLable interface. Most pf examples in this tutorial use Java enum. Enums have a lot of benefits. They are immutable highly efficient type of constants in Java. But Enums are obviously not the only possible way to implement the SmartLabel interface.

Inheritance or composition

TBA

Singleton

If you have to to guarantee uniqueness of an instance of Conveyor, or provide simple access to it from different unconnected units of your code, you, in most cases, do not need to do anything. Each Conveyor you create must have a unique name. In fact, it has, even if you have not assigned it explicitly. Unique name is used by the JMX service to create corresponding MBean. Bonus side effect of this, you can always request an instance of a Conveyor by its know name (another reason why you should always set conveyor name)

Conveyor<Integer,String,MyProduct> conveyor = Conveyor.byName("MyProductConveyor");

Conveyor also provides "Lazy suppliers" for named conveyors. Lazy supplier can be obtained at any time, regardless of, if corresponding conveyor already initiated or not. It makes instantiation of Conveyor easier.

private Supplier<Conveyor> conveyorSupplier = Conveyor.lazySupplier("MyProductConveyor");
...
Conveyor<Integer,String,MyProduct> myProductConveyor = conveyorSupplier.get();

Copyright (C) AEGIS DATA SOLUTIONS,LLC, 2016

Author: Mikhail Teplitskiy LinkedIn Profile

This software is free. Use it at your own risk. Feel free to copy and distribute it. Just keep copyright and author information. The Apache Software License, Version 2.0

⚠️ **GitHub.com Fallback** ⚠️