AsyncMailbox - laforge49/Asynchronous-Functional-Programming GitHub Wiki
AsyncFP operates on a single thread, because that is generally faster. But sometimes it is better to do things in parallel. That's when you need an asynchronous mailbox.
As an exercise, we will use Pause to represent a work load that you want to be able to run in parallel. Here's the code, including a convenience companion object:
case class Pause()
class Worker extends Actor {
bind(classOf[Pause], pause)
def pause(msg: AnyRef, rf: Any => Unit) {
Thread.sleep(200)
rf(null)
}
}
object Pause {
def apply(rf: Any => Unit)
(implicit srcActor: ActiveActor) {
val worker = new Worker
worker.setExchangeMessenger(srcActor.bindActor.newAsyncMailbox)
worker(Pause())(rf)
}
}
Note the call to newAsyncMailbox in the Pause companion actor. This is what forces messages sent to the worker actor to be run on a different thread... which is the default for Scala actors.
Next we need a driver, an actor which will launch any number of worker actors. Once they have all completed, we print out the elapsed time and return a null result:
case class Doit(c: Int)
class Driver extends Actor {
bind(classOf[Doit], doit)
var rem = 0
var c = 0
var rf: Any => Unit = null
var t0 = 0L
def doit(msg: AnyRef, _rf: Any => Unit) {
c = msg.asInstanceOf[Doit].c
rem = c
rf = _rf
t0 = System.currentTimeMillis
var i = 0
while(i < c) {
i += 1
Pause(r)
}
}
def r(rsp: Any) {
rem -= 1
if (rem == 0) {
val t1 = System.currentTimeMillis
println("total time for "+c+" messages = "+(t1 - t0)+" milliseconds")
rf(null)
}
}
}
Now we need a bit of test code:
val systemServices = SystemServices()
try {
val driver = new Driver
driver.setExchangeMessenger(systemServices.newSyncMailbox)
Future(driver, Doit(10))
Future(driver, Doit(20))
} finally {
systemServices.close
}
A SystemServices object has any number of uses. It manages the threads--which is why it is important to do the close if you are running multiple tests. It also creates the mailboxes. Later we will see how SystemServices is used in IOC.
The test code runs two tests, the first with 10 simulated work loads and the second with 20. As workloads take 200 millisconds and are run in parallel, we would expect to see the total time to be around 200 milliseconds. Only we may run out of threads, in which case the total time will be some multiple of 200 milliseconds, depending on the number of simulated workloads and the number of available threads. Here's the output:
total time for 10 messages = 208 milliseconds
total time for 20 messages = 401 milliseconds