Home - krasserm/scalaz-camel GitHub Wiki
Scalaz-camel is now superseded by the streamz project and its combinators for Apache Camel.
This project provides a domain-specific language (DSL) for Apache Camel that is based on the Scala programming language and the Scalaz library. It applies functional programming (FP) concepts to enterprise integration patterns (EIPs) so that integration solutions can be constructed by composing EIPs with a functional DSL.
The scalaz-camel DSL is an alternative to Camel’s existing Java DSL and Scala DSL (it does not depend on the org.apache.camel.model package). It was redesigned from scratch with the goal of a better and more native integration with the Scala programming language. See also this interview.
- A functional DSL for composing message processors into message processing routes. Message processors can be
  - Scala functions that return the processing result directly
  - Scala functions with continuation-passing style (CPS) for asynchronous, non-blocking processing
  - Existing Camel message processors (both synchronous and asynchronous)
  - Existing Camel endpoint producers (all Camel components can be re-used)
- A message processing route itself is a message processor and can be further composed.
- A message processing route can be created and applied anywhere in a program, not only in special route builders.
- Non-blocking routing of messages along the message processing chain.
- Fault-tolerance via error handling strategies with optional message redelivery.
- Immutability of messages so that they can be shared by concurrent message processors.
- Predefined message processors and enterprise integration patterns (EIPs).
- Configurable concurrency strategies.
- Extensibility of the DSL.
- Akka integration
  - For exchanging messages with Akka actors
  - For implementing stateful EIPs such as aggregator, resequencer etc.
Let’s get started with a simple example. The example source code is part of the project. Instructions on how to download and build scalaz-camel can be found here. To run the example, enter
$ sbt
> project scalaz-camel-samples
> test-only
The following subsections will walk through the example step-by-step. It is assumed that you already have a basic knowledge of Apache Camel and the Scala programming language. A good introduction to Apache Camel is the free Chapter 1 of Camel in Action.
The example is from the order processing domain. It exposes a service that accepts purchase order messages, validates them and queues valid orders for further processing. This is outlined in the following figure.
With scalaz-camel, the above route can be implemented as follows
// order placement route (main route)
val placeOrderRoute = validateOrder >=> oneway >=> to("jms:queue:valid") >=> {
  m: Message => m.setBody("order accepted")
}
// order placement route consuming from direct:place-order endpoint (incl. error handler)
from("direct:place-order") {
  attempt { placeOrderRoute } fallback {
    case e: ValidationException => { m: Message => m.setBody("order validation failed") }
    case e: Exception => { m: Message => m.setBody("general processing error") } >=> failWith(e)
  }
}
// order processing route
from("jms:queue:valid") {
  split { m: Message => for (item <- m.bodyAs[PurchaseOrder].items) yield m.setBody(item) } >=> choose {
    case Message(PurchaseOrderItem(_, "books", _, _), _) => orderItemToTuple >=> to("mock:books")
    case Message(PurchaseOrderItem(_, "bikes", _, _), _) => to("mock:bikes")
  } >=> { m: Message => println("received order item = %s" format m.body); m }
}
where validateOrder and orderItemToTuple are example-specific message processors:
// Continuation-passing style (CPS) message processor that validates order messages
// in a separate thread (created by Strategy.Naive). Validation responses are passed
// (asynchronously) to continuation k.
val validateOrder: MessageProcessor = (m: Message, k: MessageValidation => Unit) => {
  Strategy.Naive.apply(m.body match {
    case order: PurchaseOrder if (!order.items.isEmpty) => k(m.success)
    case _ => k(m.setException(ValidationException("invalid order")).fail)
  })
}
// Direct-style message processor that transforms an order item to a tuple. Synchronous processor.
val orderItemToTuple = (m: Message) => m.transform[PurchaseOrderItem](i => (i.customer, i.name, i.amount))
A purchase order is defined as follows.
case class PurchaseOrder(items: List[PurchaseOrderItem])
case class PurchaseOrderItem(customer: Int, category: String, name: String, amount: Int)
The example starts with the definition of placeOrderRoute, a message processing route that is composed of four processors. The composition operator is >=>, the Kleisli composition operator from Scalaz. The monad used for Kleisli composition is Scala’s Responder, a continuation monad. The details behind Kleisli composition, the Responder monad and the implicit conversion from message processors to Kleisli structures are explained in the Architecture section (TODO). You can safely skip that section, however: it is not needed for understanding this example or for using the scalaz-camel DSL.
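To make the composition operator more concrete, here is a minimal, self-contained sketch in plain Scala. It does not use Scalaz or the scalaz-camel API: Msg, Validation, Processor, ProcessorOps, trimBody, upperBody, reject and run are hypothetical stand-ins, and Either replaces the library’s validation type. It only illustrates how CPS functions can be chained so that a success is handed to the next function while a failure goes straight to the final continuation, assuming every processor calls its continuation exactly once.

```scala
object KleisliSketch {
  // Hypothetical stand-ins, not the scalaz-camel types.
  final case class Msg(body: String)
  type Validation = Either[Msg, Msg]                  // Left = failure, Right = success
  type Processor  = (Msg, Validation => Unit) => Unit // CPS message processor

  // Simplified Kleisli-style composition of two CPS processors.
  implicit class ProcessorOps(first: Processor) {
    def >=>(second: Processor): Processor = (m, k) =>
      first(m, {
        case Right(ok) => second(ok, k) // success: continue with the next processor
        case failure   => k(failure)    // failure: skip the next processor
      })
  }

  val trimBody: Processor  = (m, k) => k(Right(Msg(m.body.trim)))
  val upperBody: Processor = (m, k) => k(Right(Msg(m.body.toUpperCase)))
  val reject: Processor    = (m, k) => k(Left(m))

  val route: Processor    = trimBody >=> upperBody
  val rejected: Processor = reject >=> upperBody

  // Collects the response; sufficient here because all processors call k synchronously.
  def run(p: Processor, m: Msg): Option[Validation] = {
    var out: Option[Validation] = None
    p(m, v => out = Some(v))
    out
  }

  def main(args: Array[String]): Unit = {
    println(run(route, Msg("  hi ")))
    println(run(rejected, Msg("hi")))
  }
}
```

The composed value is itself a Processor, which mirrors the statement that a route is a message processor and can be composed further.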
The first processor in placeOrderRoute, validateOrder, is a message processor written in continuation-passing style (CPS). Its type is MessageProcessor, which is an alias for (Message, MessageValidation => Unit) => Unit. The first parameter is the input Message and the second parameter is a continuation that is called by the message processor with the message processing result (or response). The result type is MessageValidation, which can be either Failure or Success. A Failure object contains the message that caused the failure. The failed message itself contains an exception object describing failure details. A Success object contains a normal response message. The methods success and fail (from Scalaz) are used to create Success and Failure objects.
A CPS processor can call the continuation either synchronously or asynchronously (as done in validateOrder using Scalaz’s Naive concurrency strategy). Calling the continuation causes the response message to be routed to the next processor. Please note that scalaz-camel does not block a thread to wait for the response while this processor is running (non-blocking routing).
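The asynchronous case can be sketched without any library. The example below assumes a plain daemon thread in place of Scalaz’s Naive strategy; Validation, Processor, validateAsync and observe are hypothetical names, and message bodies are reduced to strings. The processor returns immediately and the continuation is invoked later on the worker thread. The observe helper blocks only to make the response visible in a demo.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object AsyncCpsSketch {
  type Validation = Either[String, String]              // hypothetical: Left = failure
  type Processor  = (String, Validation => Unit) => Unit

  // Returns immediately; the continuation k runs later on a new daemon thread.
  val validateAsync: Processor = (body, k) => {
    val task: Runnable = () => k(if (body.nonEmpty) Right(body) else Left("invalid order"))
    val worker = new Thread(task, "validator")
    worker.setDaemon(true)
    worker.start()
  }

  // Demo helper: waits for the continuation and reports (thread name, response).
  def observe(body: String): (String, Validation) = {
    val done     = new CountDownLatch(1)
    val response = new AtomicReference[(String, Validation)]()
    validateAsync(body, v => {
      response.set((Thread.currentThread.getName, v))
      done.countDown()
    })
    done.await(5, TimeUnit.SECONDS)
    response.get
  }

  def main(args: Array[String]): Unit = println(observe("order"))
}
```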
When a message is successfully validated, it is routed to the next processor in the route. On failure, all remaining processors are bypassed and the final response is the failure response generated by the validator. Please note that bypassing remaining processors on failure is not a feature of the example validator; it is done internally by scalaz-camel. Any message processor that generates a Failure response will cause scalaz-camel to skip all remaining processors (except those defined in error handlers).
The second processor in the route, oneway, prepares the message for being sent via JMS to a queue of validated order messages. If we omitted this processor, we’d tell the Camel JMS endpoint to wait for a reply (using a temporary queue), but this is not what we want. The third processor, to("jms:queue:valid"), is an endpoint producer that actually puts validated order messages on a JMS queue.
The last processor in the chain (defined inline) finally generates a response that acknowledges the successful receipt of an order message. This processor is of type Message => Message and is called a direct-style processor. Direct-style processors indicate successful message processing by returning a message. Failures are indicated by throwing exceptions. Internally, direct-style message processors are converted to CPS processors that wait for responses in the calling thread. Alternatively, one can also do a conversion to a CPS processor that executes the direct-style processor asynchronously (not shown). Direct-style message processors of type Message => MessageValidation will be supported in upcoming versions.
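The conversion from direct style to CPS can be pictured with a small wrapper. This is a sketch under stated assumptions, not the library’s actual conversion: Msg, Validation, Processor, cps, accept, broken and run are hypothetical names, and the failed message is assumed to carry the thrown exception, as described for Failure objects above.

```scala
import scala.util.control.NonFatal

object DirectStyleSketch {
  // Hypothetical message type that can carry an exception.
  final case class Msg(body: Any, exception: Option[Throwable] = None)
  type Validation = Either[Msg, Msg]
  type Processor  = (Msg, Validation => Unit) => Unit

  // Wraps a direct-style function: a returned message becomes a success,
  // a thrown exception becomes a failure that carries the input message.
  def cps(f: Msg => Msg): Processor = (m, k) => {
    val outcome: Validation =
      try Right(f(m))
      catch { case NonFatal(e) => Left(m.copy(exception = Some(e))) }
    k(outcome) // called outside the try so that errors raised by k are not swallowed
  }

  val accept: Processor = cps(m => m.copy(body = "order accepted"))
  val broken: Processor = cps(_ => throw new IllegalStateException("boom"))

  // Collects the response; the wrapped processors call k in the calling thread.
  def run(p: Processor, m: Msg): Option[Validation] = {
    var out: Option[Validation] = None
    p(m, v => out = Some(v))
    out
  }

  def main(args: Array[String]): Unit = {
    println(run(accept, Msg("order")))
    println(run(broken, Msg("order")))
  }
}
```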
The placeOrderRoute can now either be applied to messages directly (see below) or it can be connected to an endpoint consumer using the from method, which takes an endpoint URI as parameter. In our example, the route is connected to the direct:place-order endpoint. Any message sent to this endpoint (e.g. via a Camel producer template) will be processed by the placeOrderRoute.
The example also defines an error handler with attempt { ... } fallback { ... }. The attempt block defines the route that may fail, and the fallback block defines a partial function that returns the alternative route to execute, depending on the Exception object contained in the failed message. The example defines error handling for the whole placeOrderRoute, but one can also define error handling for route fragments or even single message processors. The attempt { ... } fallback { ... } expression itself is of type MessageProcessor and can therefore be composed with other message processors or even be nested.
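The way a partial function selects a recovery route can be modelled in a few lines. The sketch below is not the scalaz-camel implementation: attemptWith, Msg, Validation, Processor, rejectAll, validationFailed, guarded and run are hypothetical, and clearing the exception before running the recovery route is an assumption made only for this illustration.

```scala
object FallbackSketch {
  final case class Msg(body: Any, exception: Option[Throwable] = None)
  type Validation = Either[Msg, Msg]
  type Processor  = (Msg, Validation => Unit) => Unit

  final class ValidationException(text: String) extends Exception(text)

  // Runs `route`; on failure, looks up a recovery route for the exception stored
  // in the failed message. Failures without a matching case are passed through.
  def attemptWith(route: Processor)(fallback: PartialFunction[Throwable, Processor]): Processor =
    (m, k) => route(m, {
      case Left(failed) =>
        failed.exception.flatMap(fallback.lift) match {
          case Some(recovery) => recovery(failed.copy(exception = None), k) // assumption: exception cleared
          case None           => k(Left(failed))
        }
      case success => k(success)
    })

  // A route that always fails validation, and a recovery route for that case.
  val rejectAll: Processor =
    (m, k) => k(Left(m.copy(exception = Some(new ValidationException("invalid order")))))
  val validationFailed: Processor =
    (m, k) => k(Right(m.copy(body = "order validation failed")))

  val guarded: Processor = attemptWith(rejectAll) {
    case _: ValidationException => validationFailed
  }

  def run(p: Processor, m: Msg): Option[Validation] = {
    var out: Option[Validation] = None
    p(m, v => out = Some(v))
    out
  }

  def main(args: Array[String]): Unit = println(run(guarded, Msg("wrong")))
}
```

Because attemptWith returns a Processor, the guarded route can be composed or nested like any other processor in this model.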
The order processing route receives validated order messages from a JMS queue. It consumes messages from the jms:queue:valid endpoint. Here, the example doesn’t use a separate val for the order processing route. Instead, the route is defined inline.
The first processor in the route is a splitter. A splitter can be created with the split method, which has a parameter of type Message => Seq[Message]. This is a function that implements the split logic and creates a sequence of messages from a single input message. In our example, we split an order message into n order item messages.
The order item messages are then routed to different destinations depending on the order item details. This is done with a content-based router that can be created with choose. It takes a partial function that matches input messages and returns one of several destinations based on the matching result. In our example, order item messages are routed to different destinations (mock endpoints) based on their order item category values.
If the order item category equals "books", then the order item is additionally transformed to a tuple (before sending it to the mock endpoint). This is done with a custom direct-style message processor named orderItemToTuple (shown above). Finally, all messages (as received by the mock endpoints) are written to stdout by the last processor in the route.
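The split and routing logic can be tried out in isolation with plain functions. In the sketch below, Msg, Order, Item, splitOrder, destination and route are hypothetical stand-ins for the example’s types, destinations are reduced to endpoint names, and dropping items that match no case is an assumption of the sketch rather than documented choose behaviour.

```scala
object SplitChooseSketch {
  final case class Item(customer: Int, category: String, name: String, amount: Int)
  final case class Order(items: List[Item])
  final case class Msg(body: Any)

  // Split logic: one order message becomes one message per order item.
  val splitOrder: Msg => Seq[Msg] = m => m.body match {
    case Order(items) => items.map(item => m.copy(body = item))
    case _            => Seq(m)
  }

  // Content-based routing: a partial function picks a destination name by category.
  val destination: PartialFunction[Msg, String] = {
    case Msg(Item(_, "books", _, _)) => "mock:books"
    case Msg(Item(_, "bikes", _, _)) => "mock:bikes"
  }

  // Splits a message and groups the parts by destination (unmatched parts are dropped).
  def route(m: Msg): Map[String, Seq[Msg]] =
    splitOrder(m).filter(destination.isDefinedAt).groupBy(destination)

  def main(args: Array[String]): Unit = {
    val order = Order(List(
      Item(123, "books", "Camel in Action", 1),
      Item(123, "bikes", "Canyon Torque FRX", 1)))
    route(Msg(order)).foreach(println)
  }
}
```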
The routes are defined, so we can start sending messages. Let’s define a purchase order for two books and a freeride bike.
val order = PurchaseOrder(List(
  PurchaseOrderItem(123, "books", "Camel in Action", 1),
  PurchaseOrderItem(123, "books", "DSLs in Action", 1),
  PurchaseOrderItem(123, "bikes", "Canyon Torque FRX", 1)
))
We can send this purchase order with a Camel producer template to endpoint direct:place-order. The expected answer is that the message has been accepted.
template.requestBody("direct:place-order", order) must equal ("order accepted")
When an invalid order is sent (e.g. a simple String), order validation fails and the error handler generates a response with a custom error message.
template.requestBody("direct:place-order", "wrong") must equal ("order validation failed")
Failed message exchanges matched by the error handler are automatically marked as handled (hence the producer template won’t throw an exception). To mark a message exchange as failed even if it is matched by an error handler, the failWith processor should be appended to the corresponding error handling route (as done in our example for exceptions other than ValidationException).
The following message bodies arrive at the mock endpoints:
- mock:books endpoint: (123, "Camel in Action", 1) and (123, "DSLs in Action", 1) in any order
- mock:bikes endpoint: PurchaseOrderItem(123, "bikes", "Canyon Torque FRX", 1)
There’s no need to set up an endpoint consumer (using from) in order to apply routes to messages. Routes can also be applied to messages directly, as shown in the following.
val orderMessage = Message(order)
// applies placeOrderRoute to orderMessage and waits for the response (process blocks).
placeOrderRoute process orderMessage
// applies placeOrderRoute to orderMessage and returns a response promise
// (submit does not block, promise.get blocks)
val promise = placeOrderRoute submit orderMessage
val result = promise.get // blocks until result is available
// applies placeOrderRoute to orderMessage and provides a continuation for processing the result
placeOrderRoute apply orderMessage.success { mv => ... }
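The relationship between the blocking and the non-blocking variants can be sketched by adapting a CPS function to a promise. This is a simplified model rather than the library’s implementation: ApplySketch, echo, submit and process are hypothetical, message bodies are reduced to strings, and scala.concurrent.Promise stands in for the response promise.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object ApplySketch {
  type Validation = Either[String, String]
  type Processor  = (String, Validation => Unit) => Unit

  // A trivial route that responds with its input.
  val echo: Processor = (body, k) => k(Right(body))

  // Non-blocking: the continuation completes a promise and the future is returned at once.
  def submit(p: Processor, body: String): Future[Validation] = {
    val promise = Promise[Validation]()
    p(body, v => { promise.success(v); () })
    promise.future
  }

  // Blocking: waits (here at most 5 seconds) for the submitted response.
  def process(p: Processor, body: String): Validation =
    Await.result(submit(p, body), 5.seconds)

  def main(args: Array[String]): Unit = println(process(echo, "order"))
}
```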
To use the scalaz-camel DSL, your application needs to extend the Camel trait and make the following imports.
import scalaz._
import scalaz.camel.core._
class MyApp extends Camel {
  import Scalaz._
  // ...
}
scalaz-camel can be configured with different concurrency strategies for
- routing messages from one processor to the next processor in the message processing route (Camel.dispatchConcurrencyStrategy) and
- distributing messages to destinations with multicast and scatter-gather processors (Camel.multicastConcurrencyStrategy)
The default setting for both is Strategy.Sequential, which uses the calling thread for execution. In our example, Strategy.Executor is used with a thread pool of size 3 for Camel.dispatchConcurrencyStrategy. This essentially places a queue (that is part of the thread pool) between processors in a route and uses the threads of the pool to route messages from one processor to the next. With Strategy.Sequential, on the other hand, one could create routes that run synchronously, provided all participating processors run synchronously as well.
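The difference between the two settings can be illustrated with a tiny strategy abstraction. The sketch below does not use the Scalaz Strategy API: Strategy, Sequential, Pooled and threadUsedBy are hypothetical names that only model “run on the calling thread” versus “queue on a thread pool”.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object StrategySketch {
  // Hypothetical minimal strategy: decides where a piece of work runs.
  trait Strategy { def dispatch(work: () => Unit): Unit }

  // Runs the work on the calling thread.
  object Sequential extends Strategy {
    def dispatch(work: () => Unit): Unit = work()
  }

  // Queues the work on a thread pool; the caller returns immediately.
  final class Pooled(pool: ExecutorService) extends Strategy {
    def dispatch(work: () => Unit): Unit = {
      val task: Runnable = () => work()
      pool.execute(task)
    }
  }

  // Reports the name of the thread on which the dispatched work ran.
  def threadUsedBy(strategy: Strategy): String = {
    val done = new CountDownLatch(1)
    val name = new AtomicReference[String]("")
    strategy.dispatch { () =>
      name.set(Thread.currentThread.getName)
      done.countDown()
    }
    done.await(5, TimeUnit.SECONDS)
    name.get
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(3)
    try {
      println(threadUsedBy(Sequential))
      println(threadUsedBy(new Pooled(pool)))
    } finally pool.shutdown()
  }
}
```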
The Camel.multicastConcurrencyStrategy can be ignored in our example because neither multicast nor scatter-gather is used for message processing. Otherwise, this concurrency strategy defines whether to perform sequential or parallel multicast or scatter-gather.
Definition of routes requires the implicit definition of a Router instance. A router must be initialized with a CamelContext instance.
val camelContext: CamelContext = ...
implicit val router = new Router(camelContext)
The router must be started before applying routes to messages.
router.start