AloStreamConfig and AloStream - atleon/atleon GitHub Wiki
The main entry point into Atleon is through AloStreamConfig and AloStream.
- AloStreamConfig is the interface used to configure and build the resources needed to define a message processing pipeline
- AloStream is to be extended and contain the message processing pipeline definition
Separating the pipeline definition from its configuration allows defining a reusable stream that can be dynamically configured. It can also separate the pipeline definition from how the resources it needs are sourced, e.g. through dependency injection. However, when using certain frameworks (like Spring), it is possible to inject dependencies into the AloStream itself, and it can therefore be more convenient and intuitive to use SelfConfigurableAloStream, which removes the need to define an AloStreamConfig separately from your AloStream.
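To make the separation concrete, here is a minimal plain-Java sketch of the idea. The types SketchConfig, SketchStream and SketchConfigs are hypothetical stand-ins invented for this illustration and are not part of Atleon; they only show how one pipeline definition can be run with differently built configurations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a stream config: it names the stream and builds its resources.
interface SketchConfig {
    String name();

    List<String> buildSource();

    UnaryOperator<String> buildTransform();
}

// Hypothetical stand-in for a stream: it defines the processing steps only,
// and knows nothing about where the resources come from.
final class SketchStream {

    List<String> run(SketchConfig config) {
        UnaryOperator<String> transform = config.buildTransform();
        List<String> results = new ArrayList<>();
        for (String message : config.buildSource()) {
            results.add(config.name() + ":" + transform.apply(message));
        }
        return results;
    }
}

// Small factory so the same stream can be configured in different ways.
final class SketchConfigs {

    static SketchConfig of(String name, List<String> source, UnaryOperator<String> transform) {
        return new SketchConfig() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public List<String> buildSource() {
                return source;
            }

            @Override
            public UnaryOperator<String> buildTransform() {
                return transform;
            }
        };
    }
}
```

Running the same SketchStream with two different configs (for example one that upper-cases and one that trims) reuses the pipeline definition unchanged, which is the property the separation is meant to provide.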
We'll first walk through the case where AloStreamConfig is defined separately from AloStream. An example AloStreamConfig based on Kafka might look like the following:
import io.atleon.core.AloStreamConfig;
import io.atleon.kafka.AloKafkaReceiver;
import io.atleon.kafka.AloKafkaSender;
import io.atleon.kafka.KafkaConfigSource;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyStreamConfig implements AloStreamConfig {

    private final KafkaConfigSource originKafkaConfig; // Should contain entry for "bootstrap.servers", at minimum

    private final KafkaConfigSource destinationKafkaConfig; // Should contain entry for "bootstrap.servers", at minimum

    private final String originTopic;

    private final String destinationTopic;

    private final MyApplicationService service; // Some application service to delegate to for processing

    public MyStreamConfig(
        KafkaConfigSource originKafkaConfig,
        KafkaConfigSource destinationKafkaConfig,
        String originTopic,
        String destinationTopic,
        MyApplicationService service
    ) {
        this.originKafkaConfig = originKafkaConfig;
        this.destinationKafkaConfig = destinationKafkaConfig;
        this.originTopic = originTopic;
        this.destinationTopic = destinationTopic;
        this.service = service;
    }

    // Sender targets the destination cluster
    public AloKafkaSender<String, String> buildKafkaMessageSender() {
        return destinationKafkaConfig.withClientId(name())
            .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
            .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
            .as(AloKafkaSender::create);
    }

    // Receiver consumes from the origin cluster
    public AloKafkaReceiver<String, String> buildKafkaMessageReceiver() {
        return originKafkaConfig.withClientId(name())
            .with(ConsumerConfig.GROUP_ID_CONFIG, "my-application-my-stream")
            .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
            .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
            .as(AloKafkaReceiver::create);
    }

    public String getOriginTopic() {
        return originTopic;
    }

    public String getDestinationTopic() {
        return destinationTopic;
    }

    public MyApplicationService getService() {
        return service;
    }
}
The above config is constructed with all the parameters our stream needs to build the resources for receiving messages from and sending messages to (possibly different) Kafka clusters.
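For orientation, the ConsumerConfig constants used by the receiver correspond to the standard Kafka client property keys shown in the sketch below. This is plain Java for illustration only: ReceiverPropertiesSketch, bootstrapServers and streamName are hypothetical names, and mapping withClientId(name()) directly onto "client.id" is an assumption about how the client ID ends up in the final properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: spells out the plain Kafka property keys behind the constants used above.
final class ReceiverPropertiesSketch {

    // `bootstrapServers` and `streamName` are hypothetical inputs for this sketch
    static Map<String, Object> build(String bootstrapServers, String streamName) {
        Map<String, Object> properties = new LinkedHashMap<>();
        // Expected to be supplied by the KafkaConfigSource passed to the config
        properties.put("bootstrap.servers", bootstrapServers);
        // Assumed effect of withClientId(name())
        properties.put("client.id", streamName);
        // ConsumerConfig.GROUP_ID_CONFIG
        properties.put("group.id", "my-application-my-stream");
        // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
```

The sender side would look analogous, with "key.serializer" and "value.serializer" in place of the deserializer entries and no "group.id".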
We'll next define the message processing pipeline by extending AloStream:
import io.atleon.core.AloStream;
import io.atleon.core.DefaultAloSenderResultSubscriber;
import io.atleon.kafka.AloKafkaSender;
import reactor.core.Disposable;

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender(); // (1)

        return config.buildKafkaMessageReceiver() // (2)
            .receiveAloRecords(config.getOriginTopic()) // (3)
            .map(record -> config.getService().transform(record.value())) // (4)
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1))) // (5)
            .resubscribeOnError(config.name()) // (6)
            .doFinally(sender::close) // (7)
            .subscribe(new DefaultAloSenderResultSubscriber()); // (8)
    }
}
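One detail of the key extractor in step (5) is worth noting: String.substring(0, 1) throws StringIndexOutOfBoundsException for an empty message, and calling it on a null message throws NullPointerException. The following is a minimal sketch of a more defensive extractor. FirstLetterKey and its FALLBACK_KEY are hypothetical names chosen for this example, and using an empty string as the fallback key is an assumption, not something Atleon prescribes.

```java
// Hypothetical helper for illustration; not part of Atleon.
final class FirstLetterKey {

    // Assumed fallback key for messages that have no first letter
    static final String FALLBACK_KEY = "";

    private FirstLetterKey() {
    }

    static String of(String message) {
        if (message == null || message.isEmpty()) {
            return FALLBACK_KEY;
        }
        // Take one whole code point so that a surrogate pair is not split in half
        int end = message.offsetByCodePoints(0, 1);
        return message.substring(0, end);
    }
}
```

Assuming the key extractor parameter accepts any function from the message to its key, FirstLetterKey::of could be passed in place of the lambda in step (5).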
The above stream definition does the following:
1. Build a Sender instance which will be used by the subsequent stream definition
2. Build a Receiver from which we can receive messages
3. Create an AloFlux (implementation of Publisher) of Kafka ConsumerRecord from the Receiver
4. Transform the messages by delegating the received record values to an application service
   - The transform in this case is assumed to be a non-blocking, computationally-bound transform
   - Ideally, transforms that require I/O will return reactive types (Publisher, Flux, Mono) or types that can be wrapped as such (CompletableFuture, etc.), and the stream should use .concatMap or .flatMap instead of .map
   - Blocking I/O transforms can also be applied, but must be isolated on their own Scheduler. See AloFlux Transforms for more info.
5. Send the transformed values to the destination Kafka topic, using the first letter of each message as the ProducerRecord key
   - Using the first letter of each message is just for demonstration purposes
6. Upon occurrence of any upstream error, resubscribe the whole sequence indefinitely
   - In the case of Kafka, this provides resiliency in the face of rebalances, cluster cycling, etc.
   - Resubscriptions happen with a configurable delay
7. If/when the stream is ever stopped, close the Sender
8. Trigger the pipeline by subscribing with a Subscriber that positively acknowledges each sent result that does not have an error
   - If sending any given message results in an error, the result will be negatively acknowledged ("nacknowledged")
   - Negatively acknowledging a result causes an error to be emitted in the receiver. Coupled with .resubscribeOnError, this will cause previous messages that could not be fully processed to be consumed again
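The interplay between acknowledgement, negative acknowledgement and resubscription described above can be modeled with a small plain-Java simulation. This is a deliberately simplified sketch and not how Atleon is implemented: RedeliverySketch, trySend and maxSubscriptions are hypothetical names, messages are processed strictly one at a time, and the number of subscriptions is bounded so the example terminates (the real stream resubscribes indefinitely).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified, hypothetical model of at-least-once redelivery; not Atleon's implementation.
final class RedeliverySketch {

    private RedeliverySketch() {
    }

    // `trySend` returns true when a message is sent successfully (ack) and false on failure (nack).
    // A nack ends the current subscription; the next subscription resumes from the first
    // message that has not been acknowledged yet. Returns every delivery attempt in order.
    static List<String> run(List<String> messages, Predicate<String> trySend, int maxSubscriptions) {
        List<String> attempts = new ArrayList<>();
        int committed = 0; // index of the first message not yet acknowledged

        for (int subscription = 0; subscription < maxSubscriptions && committed < messages.size(); subscription++) {
            for (int i = committed; i < messages.size(); i++) {
                String message = messages.get(i);
                attempts.add(message);
                if (!trySend.test(message)) {
                    break; // nack: the error terminates this subscription, then we resubscribe
                }
                committed = i + 1; // ack: progress is recorded
            }
        }
        return attempts;
    }
}
```

With messages "a", "b", "c" and a send that fails once for "b", the attempts in this model are "a", "b", "b", "c": the failed message is consumed again after resubscription, while the already acknowledged "a" is not. In a real pipeline more than one in-flight message may be redelivered, so downstream processing should tolerate duplicates.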
If/when it is possible to use SelfConfigurableAloStream, the consolidated AloStream from above looks like this:
import io.atleon.core.DefaultAloSenderResultSubscriber;
import io.atleon.core.SelfConfigurableAloStream;
import io.atleon.kafka.AloKafkaReceiver;
import io.atleon.kafka.AloKafkaSender;
import io.atleon.kafka.KafkaConfigSource;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.Disposable;

public class MyStream extends SelfConfigurableAloStream {

    private final KafkaConfigSource originKafkaConfig; // Should contain entry for "bootstrap.servers", at minimum

    private final KafkaConfigSource destinationKafkaConfig; // Should contain entry for "bootstrap.servers", at minimum

    private final String originTopic;

    private final String destinationTopic;

    private final MyApplicationService service; // Some application service to delegate to for processing

    public MyStream(
        KafkaConfigSource originKafkaConfig,
        KafkaConfigSource destinationKafkaConfig,
        String originTopic,
        String destinationTopic,
        MyApplicationService service
    ) {
        this.originKafkaConfig = originKafkaConfig;
        this.destinationKafkaConfig = destinationKafkaConfig;
        this.originTopic = originTopic;
        this.destinationTopic = destinationTopic;
        this.service = service;
    }

    // The stream is its own config, so no separate config parameter is needed
    @Override
    public Disposable startDisposable() {
        AloKafkaSender<String, String> sender = buildKafkaMessageSender();

        return buildKafkaMessageReceiver()
            .receiveAloRecords(originTopic)
            .map(record -> service.transform(record.value()))
            .transform(sender.sendAloValues(destinationTopic, message -> message.substring(0, 1)))
            .resubscribeOnError(name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber());
    }

    // Sender targets the destination cluster
    private AloKafkaSender<String, String> buildKafkaMessageSender() {
        return destinationKafkaConfig.withClientId(name())
            .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
            .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
            .as(AloKafkaSender::create);
    }

    // Receiver consumes from the origin cluster
    private AloKafkaReceiver<String, String> buildKafkaMessageReceiver() {
        return originKafkaConfig.withClientId(name())
            .with(ConsumerConfig.GROUP_ID_CONFIG, "my-application-my-stream")
            .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
            .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
            .as(AloKafkaReceiver::create);
    }
}