Loops - laforge49/Asynchronous-Functional-Programming GitHub Wiki
Loops are quite common, but difficult to implement asynchronously. It is then fortunate that Functional Programming is becoming more popular, as the best approach to looping is to use actors which implement map, filter, fold, exists and find rather than implementing loops yourself. We will start by looking at simple looping and its limits. Later we will cover the conditional tailrec pattern--it is a bit more complicated but much more powerful.
Simple looping can only be used when (a) each iteration through the loop is independent of the others and (b) the number of iterations is not too large. Effectively, you just generate a message for each iteration and you are done when you have received all the responses. Lets look at some code that prints the numbers from 1 to n.
We will use two types of messages. The first invokes the loop; the second is used for each iteration.
case class Loop(n: Int)
case class I(i: Int)
We will need an actor, P, which prints i when it receives an I message.
class P extends Actor {
bind (classOf[I], ifunc)
def ifunc(msg: AnyRef, rf: Any => Unit) {
println(msg.asInstanceOf[I].i)
rf(null)
}
}
OK, now we need the actor, L, which receives a Loop message and sends the I messages to some actor, a.
class L(a: Actor) extends Actor {
bind(classOf[Loop], lfunc)
def lfunc(msg: AnyRef, rf: Any => Unit) {
val n = msg.asInstanceOf[Loop].n
if (n < 1) {
rf(null)
return
}
var i = 0
var r = 0
while (i < n) {
i += 1
a(I(i)) { rsp =>
r += 1
if (r == n) rf(null)
}
}
}
}
L.ifunc first validates Loop.n and returns a null result if n is less than 1. It then uses two counters, i and r. The i counter is used to count the messages sent to the a actor. The r counter is used to count the responses. L.ifunc sends Loop.n messages and on receipt of the last response it returns a null result.
Testing is pretty straight forward.
val mb = new ReactorMailbox
val p = P
p.setMailbox(mb)
println("synchronous test")
val sl = new L(p)
sl.setMailbox(mb)
Future(sl, Loop(3))
println("asynchronous test")
val al = new L(p)
al.setMailbox(new ReactorMailbox)
Future(al, Loop(3))
And here's the output.
synchronous test
1
2
3
asynchronous test
1
2
3
The problem with this pattern occurs when n is large and the messages sent by L are processed asynchronously. The problem is that L may generate messages faster than the receiving actor can process them. And if n is large enough, you may run out of memory.