Error Handling, Delegation, and Deadlettering - atleon/atleon GitHub Wiki

It's important to consider what happens when unhandled processing errors occur. Such errors may be transient or permanent. Transient errors are conventionally handled by retrying processing (resubscribeOnError). Dealing with permanent errors (i.e. "bad" data) may be a little more nuanced. In such cases, without explicitly handling such errors, manual intervention may be required.

In the worst case scenarios, a single message containing unprocessable data would prevent further processing of messages containing processable data.

Atleon provides mechanisms for configuring error handling, specified as "emission" or "delegation".

Error Emission (Default Mode)

The default error handling mode in Atleon is to emit errors into streams. Without any error handling, this would cause the stream to permanently die. This is why it's typically recommended to incorporate resubscribeOnError into your stream definitions.

Skipping Unprocessable Data

The following example shows how to skip invalid data:

import io.atleon.core.AloStream;
import io.atleon.core.DefaultAloSenderResultSubscriber;
import io.atleon.kafka.AloKafkaSender;
import reactor.core.Disposable;

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .map(record -> config.getService().transform(record.value()))
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .onAloErrorEmitUnless(IllegalArgumentException.class::isInstance)
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber());
    }
}

This example makes use of onAloErrorEmitUnless to skip any errors that are IllegalArgumentException. Other errors are emitted.

Error Delegation

Instead of immediately emitting errors, error handling can be "delegated". When errors are delegated, this typically results in invoking an Alo element's "negative acknowledger" (aka "nacknowledger").

Error delegation mode must be explicitly activated using onAloErrorDelegate*. Without any other changes, this will result in invoking a given broker's native negative acknowledger. This default behavior is usually to also emit the error, and therefore, onAloErrorDelegate* must also be supplemented by explicitly configuring delegation behavior.

Depending on the infrastructure you are consuming from, there are different out-of-the box "native" negative acknowledger types available. It is also possible for you to implement your own custom negative acknowledger using the broker-specific factory interface; However, bear in mind that these interfaces do not allow for "ignoring" errors via positive acknowledgement, and therefore "ignoring" errors must be implemented in your stream, such as by explicitly ignoring them in your processing code (i.e. try-catch), using onAloErrorDelegateUnless, or via a delegator provided to addAloErrorDelegation (more on this below).

The following table shows the available native negative acknowledgement types:

Broker Configuration Available Types
Kafka kafka.receiver.nacknowledger.type emit (default): Emits error into stream
<any other value>: Implementation of io.atleon.kafka.NacknowledgerFactory
RabbitMQ rabbitmq.receiver.nacknowledger.type emit (default): Emits error into stream
requeue: Nacks the message with requeue set to true
discard: Nacks the message with requeue set to false
<any other value>: Implementation of io.atleon.rabbitmq.NacknowledgerFactory
AWS SQS sqs.receiver.nacknowledger.type emit (default): Emits error into stream
visibility_reset: Changes message's visibility timeout
<any other value>: Implementation of io.atleon.aws.sqs.NacknowledgerFactory

You can use addAloErrorDelegation in your stream definition to implement more complex error delegation. This method takes a BiFunction<T, Throwable, Publisher<?>> which is invoked when an error occurs, passing the associated data element and the error, and returns a Publisher. If a resulting Publisher completes successfully, then the originating element will be positively acknowledged, else the element will be negatively acknowledged.

Re-sending Unprocessable Data (Dead-lettering)

Rather than skipping bad data (as previously shown), we can send unprocessable data somewhere else for potential further debugging. This is commonly known as "dead-lettering". The following example shows how to implement this behavior:

import io.atleon.core.AloStream;
import io.atleon.core.DefaultAloSenderResultSubscriber;
import io.atleon.core.ErrorDelegator;
import io.atleon.kafka.AloKafkaSender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import reactor.core.Disposable;

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        ErrorDelegator<ConsumerRecord<String, String>> deadletterer = ErrorDelegator.sending(sender::sendRecord)
            .errorMustMatch(IllegalArgumentException.class::isInstance)
            .composeData(record -> new ProducerRecord<>(config.getDeadletterTopic(), record.key(), record.value()));

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .addAloErrorDelegation(deadletterer)
            .map(record -> config.getService().transform(record.value()))
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .onAloErrorDelegate()
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribe(new DefaultAloSenderResultSubscriber());
    }
}

In the above example, we have built an ErrorDelegator that sends errored ConsumerRecord instances to a "dead-letter" topic. This delegation only occurs if the error is an IllegalArgumentException, and other errors will result in emission into the stream. This delegator is provided to addAloErrorDelegation, and lastly, we activate error delegation mode using onAloErrorDelegate.

⚠️ **GitHub.com Fallback** ⚠️