Reference - krasserm/scalaz-camel GitHub Wiki

Reference

This reference is work in progress.

Router

TODO

Route Application

TODO

Endpoint Consumers

TODO

Error Handling

Error handling is done with the

attempt {
  // route that may fail
} fallback {
  // error handler ...
}

expression. It is of type MessageProcessor (and can therefore be composed with other message processors). The attempt block may contain a single message processor or a message processing route that may fail. The error handler is a partial function of type AttempHandler1 which returns error handling routes based on matched exceptions. Please note that the attempt { ... } fallback { ... } construct does not catch exceptions thrown by message processors, it rather relies on Exception objects stored in MessageContext.

TODO: describe failWith (see also this test)

Message Redelivery

Message redelivery is related to error handling and can be done with the

attempt(n) {
  // route that may fail
} fallback {
  // retry handler ...
}

expression. It is of type MessageProcessor (and can therefore be composed with other message processors). The first attempt parameter defines the number of routing attempts. The second parameter is a single message processor or a message processing route that may fail. The retry handler is a partial function of type AttempHandlerN which returns message processing routes based on matched (exception, retry-state) pairs. Here’s an example.

val p: MessageProcessor = attempt(3) {
  someProcessor1 >=> someProcessor2
} fallback {
  case (e: ExceptionA, s) => retry(s)
  case (e: ExceptionB, s) => orig(s) >=> retry(s)
  case (e: ExceptionC, s) => orig(s) >=> someProcessor3 >=> retry(s)
  case (e: ExceptionD, _) => failWith(e)
}

This expression defines a message processor that makes at most 3 attempts to route an input message along the message processing chain someProcessor1 >=> someProcessor2. Let’s say someProcessor2 may fail with any of the exceptions of type ExceptionA, ExceptionB, ExceptionC or ExceptionD (the exception object is stored in the message’s MessageContext, see also Error Handling section).

  • If the failure was caused by ExceptionA then another attempt is made to route the message. The number of remaining routing attempts is tracked by the retry-state object s which must be passed as argument to the retry processor. retry finally triggers the next routing attempt. The message used for next routing attempt is the message that was used as input to the failed processor. In this example, it is the response of someProcessor1 because someProcessor2 reported a failure of type ExceptionA.
  • If the failure was caused by ExceptionB then retry attempts are made as described before but now with the original message (i.e. the input to someProcessor1). The original message is obtained with the orig processor from the retry-state object s.
  • If the failure was caused by ExceptionC then retry attempts are made with the original message transformed by someProcessor3.
  • If the failure was caused by ExceptionD then no retry attempt is made and the whole processor p fails with that exception.

Please note that the orig and retry processors do not depend on each other and can also be used at any position in the retry handling routes. For example, having the retry processor not on the last position allows to do some retry post-processing.

Integration Patterns

Here’s an (incomplete) list of supported integration patterns.

Pattern
Content-Based Router Implementation Example
Static Recipient List Implementation Example
Scatter-Gather Implementation Example
Filter Implementation Example
Splitter Implementation Example
Aggregator Implementation (core)
Implementation (akka)
Example (core)
Example (akka)
Endpoint Implementation (consumer)
Implementation (producer)
Example (consumer)
Example (producer)
..

More patterns will be supported in upcoming versions but applications can easily define their own patterns using

  • Scala functions of type Message => Message or
  • Scala functions of type MessageProcessor or
  • Camel Processor or AsyncProcessor instances

Immutable Messages

TODO

Concurrency Strategies

TODO

  • See also CamelLoadTest for an example how to configure scalaz-camel for high message load scenarios.
  • See also this presentation on concurrent functional programming with Scalaz.

Akka Integration

scalaz-camel supports message exchanges with Akka actors. For example, this allows for an easier implementation of stateful integration patterns such as aggregator or resequencer. The scalaz-camel-akka dependency can be added to an sbt project definition as explained in section Project Setup. Applications additionally need to extend the Akka trait as shown in the following example.

import scalaz._
import scalaz.camel.akka._
import scalaz.camel.core._

class MyApp extends Camel with Akka {
  // ...
  val context: CamelContext = ...

  implicit val router = new Router(context) with ActorMgnt
  // ...
}

A Router extended with the ActorMgnt trait can be used to couple the lifecycle of actors to that of the router, so that applications don’t have to explicitly start and stop their actors (see below).

An Akka actor can be converted to a MessageProcessor with the to method which defines an ActorRef as parameter. This way, the converted actor can be composed with other message processors as shown in the following example.

import akka.actor.Actor
// other imports omitted ...

class SomeActor extends Actor {
  // ...
}

val actor = Actor.actorOf[SomeActor].start
val route: MessageRoute = someProcessor1 >=> to(actor) >=> someProcessor2

The above example starts the actor explicitly with the start method. If you want to couple the actor’s lifecycle to that of the current router, you can create a managed actor with actor.manage. The actor is then started and stopped with the router (recommended approach).

val route: MessageRoute = someProcessor1 >=> to(actor.manage) >=> someProcessor2

Any reply sent by the actor will be passed to someProcessor2. The actor can also reply multiple times which causes someProcessor2 to receive multiple messages. The message received by the actor is of type scalaz.camel.core.Message. If you want to interact with Akka actors via Akka’s Camel integration (and receive messages of type akka.camel.Message) you’ll need to provide the actor’s URI instead of the actor reference which can be done as follows.

val route: MessageRoute = someProcessor1 >=> to(actor.uri) >=> someProcessor2

or

val route: MessageRoute = someProcessor1 >=> to(actor.manage.uri) >=> someProcessor2

Here, the interaction with the actor is mediated by the Akka actor component (a Camel component that is part of the Akka distribution). This approach, however, only supports single replies from the actor. The scalaz-camel-akka component also comes with a simple implementation of an actor-based aggregator. This aggregator actor can be created with the

aggregate using <aggregation-function> until <completion-predicate>

expression where <aggregation-function> and <completion-predicate> are application-defined functions that define how to combine two messages into a single message and when the aggregation is done (completed). A simple example is shown in this test.

DSL Extensions

TODO

⚠️ **GitHub.com Fallback** ⚠️