AloFlux Transforms

For the vast majority of use cases, regardless of the source or destination of the messages in your pipeline, AloFlux is the reactive type that Atleon users operate on. Let's go over some of the most common patterns.

Computationally-Bound Message Transforms

One of the simplest message pipelines you can have is one that transforms each message and sends the result somewhere. In this case, the application service we delegate to has a method like String transform(String message);. The following stream maps the values of Kafka ConsumerRecords to some other value, filters for non-empty results, and sends the results to a destination Kafka topic.

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .map(record -> config.getService().transform(record.value()))
            .filter(result -> result.length() > 0)
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}
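
Throughout this page, MyStreamConfig is assumed to supply the receiver, the sender, topic names, and the application service being delegated to. Its exact shape is up to you; the following is only a rough sketch, where the accessor names simply mirror their usage in the examples and MyService is a hypothetical interface:

import io.atleon.core.AloStreamConfig;
import io.atleon.kafka.AloKafkaReceiver;
import io.atleon.kafka.AloKafkaSender;

public interface MyService {
    String transform(String message); // signature evolves throughout this page
}

public interface MyStreamConfig extends AloStreamConfig {

    // Factories for the Kafka receiver/sender used by the stream
    AloKafkaReceiver<String, String> buildKafkaMessageReceiver();

    AloKafkaSender<String, String> buildKafkaMessageSender();

    MyService getService();

    String getSourceTopic();

    String getDestinationTopic();

    // Later sections also use getConcurrency(), getBatchSize(), and getBatchDuration()
}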

I/O-Bound Message Transforms with Non-Blocking Types

A slightly more complex transform is one that requires I/O. In this case, the transformation is (typically) I/O-bound, and the service we delegate to will reflect that by updating its method to the form Mono<String> transform(String message); (note that we'll assume our transform produces non-empty results from now on, so the .filter is removed).

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .concatMap(record -> config.getService().transform(record.value()))
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}

In this case, we have switched from .map to .concatMap to account for the fact that the application service now returns a reactive type. Since .concatMap subscribes to each resulting Mono one at a time, upstream ordering is preserved.
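
For illustration, a non-blocking implementation of the updated signature might look like the following minimal sketch, where the delay simply stands in for a real non-blocking I/O call:

import java.time.Duration;
import reactor.core.publisher.Mono;

public class DelayingTransformService {

    // The delay stands in for e.g. a non-blocking HTTP or database client
    public Mono<String> transform(String message) {
        return Mono.delay(Duration.ofMillis(10)).thenReturn(message.trim());
    }
}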

Other non-blocking types, like CompletableFuture, can be wired in the same way. Let's say our service's method is updated to the form CompletableFuture<String> transform(String message);:

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .concatMap(record -> Mono.fromFuture(config.getService().transform(record.value())))
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}
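
One subtlety worth noting: a CompletableFuture typically starts executing as soon as it is created, whereas a Mono does nothing until subscribed. If you want the work to begin only upon subscription (so that, for example, a retry re-invokes the service), recent Reactor versions offer a supplier-based overload of fromFuture. A sketch of the changed line:

            .concatMap(record -> Mono.fromFuture(() -> config.getService().transform(record.value())))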

I/O-Bound Message Transforms with Blocking Types

In many cases, the code we delegate to may be I/O-bound but not modeled with non-blocking types. These transforms can also be wired into our stream; however, the processing must be isolated on a Scheduler (from Reactor's Schedulers) suited to blocking operations.

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();
        Scheduler scheduler = Schedulers.newBoundedElastic(1, Integer.MAX_VALUE, config.name());

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .publishOn(scheduler)
            .map(record -> config.getService().transform(record.value()))
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .doFinally(__ -> scheduler.dispose())
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}
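
For reference, a blocking implementation of the delegate might look like the sketch below (the endpoint is hypothetical). The important detail is that HttpClient.send blocks the calling thread until the response arrives, which is exactly why the stream above isolates it on a dedicated Scheduler:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BlockingHttpTransformService {

    private final HttpClient httpClient = HttpClient.newHttpClient();

    public String transform(String message) {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://transformer.example.com/transform")) // hypothetical endpoint
            .POST(HttpRequest.BodyPublishers.ofString(message))
            .build();
        try {
            // send(...) blocks the calling thread until the response arrives
            return httpClient.send(request, HttpResponse.BodyHandlers.ofString()).body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while transforming", e);
        }
    }
}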

Concurrency

With I/O-bound pipelines, it is frequently desirable to increase processing concurrency. In the previous examples, we were effectively processing with a concurrency of one. With Atleon, processing concurrency can be increased using "grouping":

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .groupByStringHash(ConsumerRecord::key, config.getConcurrency())
            .innerConcatMap(record -> config.getService().transform(record.value()))
            .flatMapAlo(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}

The grouping in our case is done through groupByStringHash. This operator extracts the String value of each ConsumerRecord's key and hashes it into one of a configurable number (config.getConcurrency()) of processing "rails". Grouping results in a GroupFlux, which provides convenience methods, prefixed with inner, for applying operators to each inner grouped flux.
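
Conceptually, rail assignment is a modulo over the key's hash: records with equal keys always land on the same rail, and therefore stay ordered relative to one another, while distinct keys spread across rails. A rough illustration of the idea (not necessarily Atleon's exact hash function):

static int assignRail(String key, int concurrency) {
    // Equal keys always map to the same rail, preserving per-key ordering
    // while allowing parallelism across different keys
    return Math.floorMod(key == null ? 0 : key.hashCode(), concurrency);
}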

The previous example assumed an application service that returns a reactive type. The following shows what's necessary when the transform method instead has a blocking I/O signature (String transform(String message);):

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();
        Scheduler scheduler = Schedulers.newBoundedElastic(config.getConcurrency(), Integer.MAX_VALUE, config.name());

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .groupByStringHash(ConsumerRecord::key, config.getConcurrency())
            .innerPublishOn(scheduler)
            .innerMap(record -> config.getService().transform(record.value()))
            .flatMapAlo(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .doFinally(__ -> scheduler.dispose())
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}

Isolating the blocking transform call is especially important in this case, so that the main receiver thread does not get blocked while emitting messages on any given processing rail. Note that the Scheduler's thread cap matches the configured concurrency, so that rails do not contend for a single thread.

Microbatching

Another common way to make message processing pipelines more efficient and elastic is to apply "microbatching": buffer messages up to size N or within time window T, whichever comes first, and emit each batch for processing. With Atleon, microbatching is accomplished with the .bufferTimeout operator. The following example assumes our application service's signature has been updated for batch processing (Flux<String> transformMany(List<String> messages);):

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .mapNotNull(ConsumerRecord::value)
            .bufferTimeout(config.getBatchSize(), config.getBatchDuration())
            .concatMap(batch -> config.getService().transformMany(batch), Integer.MAX_VALUE)
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}

Note that it is recommended to explicitly set the prefetch of the operator that follows a .bufferTimeout to Integer.MAX_VALUE. bufferTimeout's time-driven emissions do not cooperate well with limited downstream demand, so a small prefetch can result in overflow errors under bursty traffic.
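
To see the size-or-time behavior in isolation, here is a self-contained Reactor snippet (independent of Atleon): a fast burst is closed out by the size limit, while a slow trickle is closed out by the timeout.

import java.time.Duration;
import reactor.core.publisher.Flux;

public class BufferTimeoutDemo {

    public static void main(String[] args) {
        Flux.concat(
                Flux.range(0, 5), // fast burst: the size limit (3) closes the first batch
                Flux.range(5, 2).delayElements(Duration.ofMillis(300))) // slow trickle: the timeout closes batches
            .bufferTimeout(3, Duration.ofMillis(200))
            .doOnNext(batch -> System.out.println("Batch: " + batch))
            .blockLast();
        // Prints batches like: [0, 1, 2], [3, 4], [5], [6]
    }
}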

For completeness, here's an example where the application service has a blocking I/O signature (List<String> transformMany(List<String> messages);):

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();
        Scheduler scheduler = Schedulers.newBoundedElastic(1, Integer.MAX_VALUE, config.name());

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .mapNotNull(ConsumerRecord::value)
            .bufferTimeout(config.getBatchSize(), config.getBatchDuration())
            .publishOn(scheduler, Integer.MAX_VALUE)
            .flatMapCollection(batch -> config.getService().transformMany(batch))
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .doFinally(__ -> scheduler.dispose())
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}

Concurrency + Microbatching

In some cases, it may be desirable to combine concurrency with microbatching. The following example again assumes the reactive batch signature (Flux<String> transformMany(List<String> messages);); note that values are extracted from the records before buffering, just as in the previous section:

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .groupByStringHash(ConsumerRecord::key, config.getConcurrency())
            .innerMap(ConsumerRecord::value)
            .innerBufferTimeout(config.getBatchSize(), config.getBatchDuration())
            .innerConcatMap(batch -> config.getService().transformMany(batch), Integer.MAX_VALUE)
            .flatMapAlo(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}

The stream can likewise delegate to an I/O-bound application service with a blocking batch signature (List<String> transformMany(List<String> messages);). Since each result is a plain List, it is flattened back into the stream with Flux.fromIterable:

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();
        Scheduler scheduler = Schedulers.newBoundedElastic(config.getConcurrency(), Integer.MAX_VALUE, config.name());

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .groupByStringHash(ConsumerRecord::key, config.getConcurrency())
            .innerMap(ConsumerRecord::value)
            .innerBufferTimeout(config.getBatchSize(), config.getBatchDuration())
            .innerPublishOn(scheduler, Integer.MAX_VALUE)
            .innerConcatMap(batch -> Flux.fromIterable(config.getService().transformMany(batch)), Integer.MAX_VALUE)
            .flatMapAlo(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .doFinally(__ -> scheduler.dispose())
            .subscribe(new DefaultAloSenderResultSubscriber<>());
    }
}

Terminal (Non-Sending) Transforms

In many cases, the transform you want to apply does not require sending any form of "result" to another message broker. This is typically the case if/when the transform just needs to interact with some other resource, like another application, a data store, etc., and the processing is done once that interaction completes. The signature of such an application service would return nothing, or a nil type like Void.

Let's update the service method we delegate to such that it has the signature Mono<Void> handle(String message);.

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        return config.buildKafkaMessageReceiver()
            .receiveAloValues(config.getSourceTopic())
            .concatMap(message -> config.getService().handle(message))
            .resubscribeOnError(config.name())
            .subscribe();
    }
}

The updated stream requires neither a sender nor a specific Subscriber, since processing produces nothing further to act on.
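
A matching implementation of the handler wraps its side effect in a deferred Mono that completes empty. A minimal sketch (the in-memory set stands in for a real external resource):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Mono;

public class RecordingService {

    private final Set<String> seen = ConcurrentHashMap.newKeySet(); // stand-in for a real data store

    public Mono<Void> handle(String message) {
        // Defer the side effect until subscription; the Mono completes without emitting a value
        return Mono.fromRunnable(() -> seen.add(message));
    }
}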

We can likewise delegate to an I/O-bound application service with the blocking signature void handle(String message);:

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        Scheduler scheduler = Schedulers.newBoundedElastic(1, Integer.MAX_VALUE, config.name());

        return config.buildKafkaMessageReceiver()
            .receiveAloValues(config.getSourceTopic())
            .publishOn(scheduler)
            .consume(message -> config.getService().handle(message))
            .resubscribeOnError(config.name())
            .doFinally(__ -> scheduler.dispose())
            .subscribe();
    }
}

For blocking consumption, we replace .concatMap with .consume. This results in an AloFlux of Void, which emits no values.
