20. Actors - RobertMakyla/scalaWiki GitHub Wiki
Key points
Extend the 'Actor' class and provide an 'act' method for each actor.
To send a message to an actor, use: actor ! message.
Message sending is asynchronous: "send and forget."
To receive messages, an actor calls 'receive' or 'react', usually in a loop.
The argument to 'receive/react' is a block of case clauses (technically, a partial function).
Actors should never share state. Always send data using messages.
Don't invoke methods on actors. Communicate by sending messages.
Avoid synchronous messaging - that is, unlink sending a message and waiting for a reply.
Actors can share threads by using 'react' instead of 'receive', provided the control flow of the message handler is simple.
It is OK to let actors crash, provided you have other actors that monitor their demise. Use linking to set up monitoring relationships.
Creating and Starting Actors
import scala.actors.Actor
class HiActor extends Actor {
def act() {
while (true) {
receive {
case "Hi" => println("Hello")
}
}
}
}
The atc() method of many different actors man run concurrently, like java Runnable.run()
val actor1 = new HiActor
actor1.start() // now it runs so I can send some message
The Actor companion object has actor() method for creating actors on the fly:
import scala.actors.Actor._
val actor2 = actor {
while (true) {
receive {
case "Hi" => println("Hello")
}
}
}
When computation is divided among actors and result must be gathered,
we must use some thread safe data structure, e.g. concurrent hash map.
However, Actor's model discourage using shared data.
Instead actor should send result to another actor. E.g. :
1. There may be a number of global actors gathering result data ( not really scalable )
2. Actors may be provided with reference to next actor which should get the result. eg:
actor ! Compute(data, continuation)
'continuation' is a next actor that should be called with the result of the computation
Caution: 'actor' should send the result to continuation and not invoke it's methods directly.
Otherwise there might be concurrent modification at once.
Channels - With ! operator it's ASYNC messaging
Instead of sharing reference to actor, it's better to share channels to actors.
- channels are typesafe (I can send only messages of particular types)
- with channels I can't accidentally invoke actor's method.
There are:
- OutputChannel trait - with ! method
- InputChannel trait - with receive / react methods
The 'Channel' class extends both. There are 2 ways of creating:
val channel = new Channel[String](MyActor) // channel is tied to someActor
actor { val channel = new Channel[String] } // channel is tied to current actor
Typical example:
case class Msg(input: String, result: OutputChannel[String]) // Msg: data and continuation actor (under channel)
class Computer extends Actor { // 1st actor
def act() {
while (true) {
receive {
case Msg(input, out) => { val answer = input.toUpperCase ; out ! answer }
}
}
}
}
actor { // 2nd actor
val channel = new Channel[String] // tied to actor surrounding this line
// val channel = new Channel[String](this) - the same
val computingActor: Computer = new Computer
val input: String = "hello"
computingActor start() // start computing actor
computingActor ! Msg(input, channel) // sending msg to computing actor
channel.receive { // receiving result on channel of current actor
case x => println("received: "+x) // x is known to be an String
}
}
If I wanted to receive on current actor, and not its channel,
I must match an instance of case class with '!' :
receive {
case !(channel, x) => ...
}
Synchronous Messages
val reply = account !? Deposit(1000) // sender is waiting for reply synchronously - and getting it as value
for this to work, the account actor must return a message to sender
receive {
case Deposit(amount) => { balance += amount; sender ! Balance(balance) } // sender is blocked until account completes deposit
...
}
or
receive {
case Deposit(amount) => { balance += amount; reply( Balance(balance) ) }
...
}
Synchronous Actor can lead to deadlocks !
Normally I don't want to wait forever so I can:
actor {
worker ! Task(data, self)
receiveWithin(seconds * 1000) { // will receive response or Actor.TIMEOUT object
case Result(data) => ...
case TIMEOUT => log(...)
}
}
Future - The !! operator synchronously returns Future
Instead of waiting for an answer, I can opt to receive a Future: an object that will yield a result when it's available.
val replyFuture: Future = account !! Deposit(1000)
Getting data out of it (waiting for this data synchronously)
val isAvailable = replyFuture.isSet() // tests whether the future's result is available
if(isAvailable) {
val reply = replyFuture()
}
If I try to get the data out of Future immediately - I might as well use synchronous call.
But as actor, I can put Future aside for a while or pass it to some other actor.
Thread Sharing
Normally each actor has it's own thread (Initially, there are 4 worker threads)
But for a huge number of actors it's expensive to have 1 thread per actor.
Sometimes after receiving 1 msg, we want to wait for another message to process it.
Blocking actor/thread for this awaiting time is very expensive.
BEST PRACTICE: we can have a single thread executing msg handling of many actors.
we can have it without locking out thread between messages !!!
The 'react' method makes possible to wait for a few messages instead of expensive thread-blocking operations
The 'react' method takes a partial function and adds it to the mailbox, then exits.
react { // Partial function f1
case WithdrawDryRun(amount) =>
case Withdraw(amount) =>
reactWithin(seconds * 1000) { // (react/reactWithin) Partial function f2
case Confirm() => println("Confirming " + amount)
}
}
In this actor, first call to react() associates f1 with actor's mailbox, then exits.
When Withdraw msg arrives, f1 is invoked. The partial function f1 calls second react().
This associates f2 with mailbox. When Confirm msg arrives, the f2 is invoked.
f1 never returns a value (return Type is Nothing)
Since react() is of type 'Nothing' (abnormal termination - just like exception), I can't just place it in loop cause it'd jump out of it after first iteration:
def act() {
while (true) {
react { // Partial function f1 --> type 'Nothing' exits infinite loop just like exception !!!!! but WE WANT LOOP TO WORK INFINITELY !!
case Withdraw(amount) => println("Withdrawing " + amount)
}
}
}
correct way:
def act() {
loop {
react { // Partial function f1 --> does not exit loop block: loop {}
case Withdraw(amount) => println("Withdrawing " + amount)
}
}
}
possible to limit the number of message processing:
def act() {
loopWhile(count < max) {
react {
...
}
}
}
The Actor Life Cycle
Life starts with invocation of start() method
Life ends when :
- The act() method returns
- The act() method is terminated because of an exception
- The actor calls the exit method
exit() is a protected method - must be called from inside of subclass of Actor :
val actor1 = actor {
while (true) {
receive {
case "Hi" => println("Hello")
case "Bye" => exit() // or exit('normal)
} // (reason : AnyRef) is reason of exit, default value is 'normal
}
}
Linking Actors - One actor is notified when other terminates.
def act() {
link(masterActor)
...
}
link() method is bidirectional. if A dies, B should know about it. And the other way around.
(Supervisor actor should know if worker dies - to reassign the work)
(Worker actor should know if supervisor dies - to stop working)
link() method is NOT symmetrical - I cannot replace 'link(worker)' with 'worker.link(self)'
By default, an actor terminates when a linked (master) actor exits with a reason other than 'normal
Example of Supervisor actor which deals with failing workers (failing is normal):
override def act() {
trapExit = true // it's a property of actor, means that whenever a linked actor terminates(normally or abnormally)
link(workerActor) // we will receive an Exit message (with reason)
while (...) {
receive {
...
case Exit(linked, UncaughtException(_, _, _, _, cause)) => ...
case Exit(linked, reason) => ...
}
}
}
So with trapExit=true the current actor will receive an Exit message when workerActor terminates.
(assuming that workerActor does not terminate before invocation of link)
Designing with Actors
Minimize replies to the sender. Actors are not intended to be remote procedure calls.
Your work should be distributed so that it flows through a network of actors
that compute parts of an answer, then send them on to other actors that combine the parts.
Avoid synchronous calls. They block, and may well lead to deadlocks.
Use react (not receive) when you can. Actors that use react can share threads.
You can always use react when the message handlers carry out a task and then exit.
Establish failure zones as it is OK for an actor to fail.