FileLoader - laforge49/Asynchronous-Functional-Programming GitHub Wiki

A file loader service can be helpful when you need to read short files. The problem is that I/O blocks, so if too many actors are doing I/O a lot of threads can be blocked. And threads are expensive, as they have a large memory footprint. Here's the message for requesting a file load:

case class LoadFile(file: File)

When a file is loaded, the response is an Array[Byte]. The logic for requesting a file load is straight forward.

systemServices(LoadFile(file)) { ... }

To test this we will create an actor, ShortFileLoader, which loads a file when it receives a DoIt message.

case class DoIt()
class ShortFileLoader extends Actor {
  setMailbox(new ReactorMailbox)
  bind(classOf[DoIt], doit)
  def doit(msg: AnyRef, rf: Any => Unit) {
    val cwd = new File(".")
    println("cwd = " + cwd.getCanonicalPath)
    val file = new File("aShortTestFile.txt")
    println("test file exists = " + file.exists)
    systemServices(LoadFile(file))(rf)
  }
}

We will need to create a SystemService and then inject it into the ShortFileLoader object.

val systemServices = SystemServices(new FileLoaderComponentFactory)
val shortFileLoader = new ShortFileLoader
shortFileLoader.setSystemServices(systemServices)
val bytes = Future(shortFileLoader, DoIt()).asInstanceOf[Array[Byte]]
val bais = new ByteArrayInputStream(bytes)
val isr = new InputStreamReader(bais)
val br = new BufferedReader(isr)
val line = br.readLine
println(line)

Output.

cwd = C:\aw\Mecurial\AgileWiki5\Blip
test file exists = true
A short test file.

FileLoaderTest

Because loading files blocks the thread, we need to implement it in a separate actor with its own mailbox.

class FileLoader
  extends Actor {
  setMailbox(new AsyncReactorMailbox)
  bind(classOf[LoadFile], loadFile)

  def loadFile(msg: AnyRef, rf: Any => Unit) {
    val file = msg.asInstanceOf[LoadFile].file
    val size = file.length
    val fis = new FileInputStream(file)
    val dis = new DataInputStream(fis)
    val bytes = new Array[Byte](size.asInstanceOf[Int])
    dis.readFully(bytes)
    rf(bytes)
  }
}

To invoke the LoadFile service, we will use a SafeForward object. This way the file load is invoked directly from the requesting actor's context, rather than suffering the overhead of going through the SystemServices actor.

class SafeForward(actor: Actor)
  extends Safe {
  def func(msg: AnyRef, rf: Any => Unit)(implicit sender: ActiveActor) {
    actor(msg)(rf)
  }
}

SafeForward is bound to a component.

class FileLoaderComponent(actor: Actor)
  extends Component(actor) {
  bindSafe(classOf[LoadFile], new SafeForward(new FileLoader))
}

Finally, we need a ComponentFactory which we can use when configuring the SystemServices actor and which will be used to add the FileLoaderComponent to the SystemServices composite.

class FileLoaderComponentFactory extends ComponentFactory {
  override def instantiate(actor: Actor) = new FileLoaderComponent(actor)
}

FileLoader

Tutorial