Simple NoLog String Database - laforge49/Asynchronous-Functional-Programming GitHub Wiki

This is the first in a series of databases. SimpleNoLogStringDataStore is only a starting point and is rather simplistic. It consists of a single Block, and if any corruption occurs, the database is reinitialized. There is also no log file for recovery. It is, however, fully transactional, supports multiple queries at the same time, and is largely synchronous, so it should be quite fast. (Everything except disk I/O uses the same mailbox.)

Our approach this time will be top-down, beginning with the test code. Hopefully this will help keep us from getting lost in the complexities while providing the context for each successive class.

## SimpleNoLogStringDataStoreTest

```scala
val systemServices = SystemServices(new ServicesRootComponentFactory)
val dbName = "SimpleNoLogString.db"
val file = new java.io.File(dbName)
file.delete
val properties = new Properties
properties.put("dbPathname", dbName)
val db = Subsystem(
  systemServices,
  new SimpleNoLogStringDataStoreComponentFactory,
  properties = properties,
  actorId = ActorId("db"))
val results = new Results
val chain = new Chain(results)
chain.op(systemServices, Register(db))
chain.op(db, SetRootStringRequest.process(db, "Hello world!"), "timestamp")
chain.op(db, GetRootStringRequest.process(db), "string")
Future(systemServices, chain)
println(results)
systemServices.close
```

The above code does the following:

  1. Create the systemServices actor, using ServicesRootComponentFactory to specify the services to be included.
  2. Delete the database file, so the test starts from an empty database.
  3. Assign the pathname SimpleNoLogString.db to the dbPathname property.
  4. Create db, a subsystem, using SimpleNoLogStringDataStoreComponentFactory to specify the services to be included.
  5. Register the subsystem with the systemServices actor, so that the db file is closed when systemServices is closed.
  6. Add a SetRootStringRequest update with a value of "Hello world!" to the chain, saving its result as "timestamp".
  7. Add a GetRootStringRequest query to the chain, saving its result as "string".
  8. Pass the chain to systemServices with Future, which waits until all of the operations have completed.
  9. Print the results: the timestamp from the update and the string from the query.
  10. Close systemServices and, consequently, the database file.

Output:

```
{string=Hello world!, timestamp=1347585882031104}
```
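The test relies on Chain and Results to sequence its operations and collect their named results. As a rough mental model only, the following hypothetical plain-Scala sketch reduces a Chain to an ordered list of synchronous operations, each of which may store its result in Results under a name that later operations can read. It is not AsyncFP's API: the real Chain sends each operation as a message to a target actor (db or systemServices) and completes asynchronously, and the "timestamp" here is just a stand-in from System.nanoTime().

```scala
import scala.collection.mutable

// Hypothetical stand-ins, not AsyncFP classes.
// Results: named values produced by earlier operations.
type Results = mutable.Map[String, Any]

// Chain: operations run strictly in order; an operation may read earlier
// results and may record its own result under a name.
final class Chain(val results: Results = mutable.Map.empty[String, Any]) {
  private val ops = mutable.ArrayBuffer.empty[(Results => Any, Option[String])]

  def op(f: Results => Any, resultName: String = null): Unit =
    ops += ((f, Option(resultName)))

  def run(): Results = {
    for ((f, name) <- ops) {
      val value = f(results)
      name.foreach(n => results(n) = value)
    }
    results
  }
}

// Mirror of the test: an update followed by a query against one "root".
var root: String = null
val chain = new Chain
chain.op(_ => { root = "Hello world!"; System.nanoTime() }, "timestamp") // stand-in timestamp
chain.op(_ => root, "string")
val results = chain.run()
println(results)
```

Because the operations run in a fixed order, the query always sees the value written by the update.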


## ServicesRootComponentFactory

The only service we use at the top level is actor registration, but the ActorRegistry needs the Instantiate service, which is implemented by the FactoryRegistry service. We get both by building systemServices with the ServicesRootComponentFactory.

```scala
class ServicesRootComponentFactory
  extends ComponentFactory {
  addDependency(classOf[FactoryRegistryComponentFactory])
  addDependency(classOf[ActorRegistryComponentFactory])
}
```
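ServicesRootComponentFactory only declares its dependencies; building systemServices then has to pull in every factory those declarations reach. A hypothetical sketch of that resolution step: a depth-first traversal that lists each factory once, with dependencies before the factories that need them. FactoryDef and resolve are illustrative names, and this page does not show how ComponentFactory actually orders or de-duplicates its dependencies.

```scala
import scala.collection.mutable

// Hypothetical model: a component factory and the factories it depends on.
final case class FactoryDef(name: String, deps: List[String])

// Depth-first resolution: dependencies come before the factories that need
// them, and each factory appears once. Assumes the graph has no cycles.
def resolve(root: String, defs: Map[String, FactoryDef]): List[String] = {
  val ordered = mutable.LinkedHashSet.empty[String]
  def visit(name: String): Unit =
    if (!ordered.contains(name)) {
      defs(name).deps.foreach(visit)
      ordered += name
    }
  visit(root)
  ordered.toList
}

val defs = List(
  FactoryDef("ServicesRootComponentFactory",
    List("FactoryRegistryComponentFactory", "ActorRegistryComponentFactory")),
  FactoryDef("FactoryRegistryComponentFactory", Nil),
  FactoryDef("ActorRegistryComponentFactory", Nil)
).map(d => d.name -> d).toMap

val order = resolve("ServicesRootComponentFactory", defs)
```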


## SimpleNoLogStringDataStoreComponentFactory

The db subsystem is built using the SimpleNoLogStringDataStoreComponentFactory, which adds dependencies on SimpleDataStoreComponentFactory and NullTransactionLogComponentFactory and, in its configure method, registers SetRootStringRequestFactory and GetRootStringRequestFactory with the FactoryRegistryComponentFactory.

```scala
class SimpleNoLogStringDataStoreComponentFactory
  extends ComponentFactory {
  addDependency(classOf[SimpleDataStoreComponentFactory])
  addDependency(classOf[NullTransactionLogComponentFactory])

  override def configure(compositeFactory: Factory) {
    val factoryRegistryComponentFactory =
      compositeFactory.componentFactory(classOf[FactoryRegistryComponentFactory]).
        asInstanceOf[FactoryRegistryComponentFactory]
    factoryRegistryComponentFactory.registerFactory(new SetRootStringRequestFactory)
    factoryRegistryComponentFactory.registerFactory(new GetRootStringRequestFactory)
  }
}
```
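configure registers both request factories with the FactoryRegistryComponentFactory. A hypothetical sketch of what a registry keyed by factory id provides: each factory is registered once under its id, and instantiate, playing the role of the Instantiate service mentioned above, creates a fresh instance from nothing but that id. Factory, FactoryRegistry and Request here are illustrative, not AsyncFP's classes.

```scala
import scala.collection.mutable

// Hypothetical factory: knows its id and how to build a new instance.
trait Factory[A] {
  def id: String
  def newInstance(): A
}

// Hypothetical registry: one factory per id; instantiate works from the id alone.
final class FactoryRegistry[A] {
  private val factories = mutable.Map.empty[String, Factory[A]]

  def registerFactory(f: Factory[A]): Unit = {
    require(!factories.contains(f.id), s"duplicate factory id ${f.id}")
    factories(f.id) = f
  }

  def instantiate(id: String): A =
    factories.getOrElse(id, throw new NoSuchElementException(s"unknown factory $id")).newInstance()
}

final case class Request(kind: String)

def simpleFactory(factoryId: String): Factory[Request] = new Factory[Request] {
  val id = factoryId
  def newInstance() = Request(factoryId)
}

val registry = new FactoryRegistry[Request]
registry.registerFactory(simpleFactory("SetRootStringRequest"))
registry.registerFactory(simpleFactory("GetRootStringRequest"))
val req = registry.instantiate("SetRootStringRequest")
```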


## SetRootStringRequest

We generate a message with the following code and send it to the db subsystem:

```scala
SetRootStringRequest.process(db, "Hello world!")
```

Let's look first at the code that generates this message:

```scala
object SetRootStringRequest {
  def apply() = (new SetRootStringRequestFactory).newActor(null).
    asInstanceOf[IncDesString]

  def process(db: Actor, value: String) = {
    val je = apply()
    val chain = new Chain
    chain.op(je, Set(null, value))
    chain.op(db, TransactionRequest(je))
    chain
  }
}
```

The call to process does the following:

  1. Create a SetRootStringRequest actor, which is a composite built on IncDesString.
  2. Create a chain message which will:
    1. Set the value of the SetRootStringRequest to "Hello world!".
    2. Create a TransactionRequest message holding the SetRootStringRequest and pass that message to the db actor.
  3. Return the chain message.

When this chain message is passed to ANY actor, the result is that the db actor processes a TransactionRequest message containing the SetRootStringRequest.

SetRootStringRequestFactory simply builds the composite actor from an IncDesString actor, adding an UpdateRequestComponent and a SetRootStringRequestComponent:

```scala
class SetRootStringRequestFactory extends Factory(new FactoryId("SetRootStringRequest")) {
  override protected def instantiate = {
    val req = new IncDesString
    addComponent(new UpdateRequestComponent(req))
    addComponent(new SetRootStringRequestComponent(req))
    req
  }
}
```

(The UpdateRequestComponent simply returns false when it receives an IsQuery message.)
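A request is therefore a composite: one IncDesString actor plus components, each of which binds handlers for particular message types. The following hypothetical sketch models only that dispatch idea: a Composite maps message classes to handlers, and two components answer IsQuery differently, which is how an update request and a query request can be told apart. QueryRequestComponent is assumed to answer true; bindSafe, ChainFactory and AsyncFP's Component class are not modeled, and everything here is synchronous.

```scala
import scala.collection.mutable

// Hypothetical message type, named after the one in the text.
final case class IsQuery()

// A composite: components register handlers keyed by message class.
final class Composite {
  private val bindings = mutable.Map.empty[Class[_], AnyRef => Any]

  def bind(cls: Class[_], handler: AnyRef => Any): Unit = {
    require(!bindings.contains(cls), s"${cls.getSimpleName} is already bound")
    bindings(cls) = handler
  }

  def send(msg: AnyRef): Any = {
    val handler = bindings.getOrElse(msg.getClass,
      throw new IllegalArgumentException(s"no binding for ${msg.getClass.getSimpleName}"))
    handler(msg)
  }
}

// Each component adds its binding to the composite it is constructed with.
final class UpdateRequestComponent(c: Composite) { c.bind(classOf[IsQuery], _ => false) }
final class QueryRequestComponent(c: Composite) { c.bind(classOf[IsQuery], _ => true) }

val setRequest = new Composite
new UpdateRequestComponent(setRequest)
val getRequest = new Composite
new QueryRequestComponent(getRequest)
```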

The SetRootStringRequestComponent, like all requests, handles a Process message which is sent when the request is being processed.

```scala
class SetRootStringRequestComponent(actor: Actor)
  extends Component(actor) {
  bindSafe(classOf[Process], ChainFactory(process))

  private def process(msg: AnyRef, chain: Chain) {
    val transactionContext = msg.asInstanceOf[Process].transactionContext
    chain.op(actor, Value(), "value")
    chain.op(systemServices, DbRoot(), "dbRoot")
    chain.op(Unit => chain("dbRoot"),
      MakeSet(transactionContext, INC_DES_STRING_FACTORY_ID), "incDesString")
    chain.op(Unit => chain("incDesString"), Unit => Set(transactionContext, chain("value")))
  }
}
```

When a Process message is received (which occurs within the context of an update transaction), a SetRootStringRequest does the following:

  1. It extracts the value of the IncDesString at the heart of the SetRootStringRequest. In this case, the value is "Hello world!".
  2. It gets the root Block of the database by sending a DbRoot message.
  3. It uses MakeSet to get the value of the block or, if that value is null, to create a new IncDesString and assign it to the block.
  4. It sets the IncDesString which is (presumably) the value of the block to "Hello world!".
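Steps 2 to 4 amount to "get the root, get-or-create its IncDesString, then set the string." Here is a hypothetical, synchronous sketch of the get-or-create semantics attributed to MakeSet. Slot, StringHolder and makeSet stand in for the root Block, IncDesString and MakeSet; transactions and messaging are left out.

```scala
// Hypothetical stand-in for the root Block: it may or may not hold a value.
final class Slot[A] { var value: Option[A] = None }

// Hypothetical stand-in for IncDesString: a mutable string holder.
final class StringHolder { var value: String = null }

// MakeSet-like get-or-create: return the slot's value, first creating
// and storing one if the slot is empty.
def makeSet[A](slot: Slot[A])(create: => A): A = slot.value match {
  case Some(existing) => existing
  case None =>
    val created = create
    slot.value = Some(created)
    created
}

val root = new Slot[StringHolder]
val holder = makeSet(root)(new StringHolder) // first call creates the holder
holder.value = "Hello world!"                 // step 4: set the string
```

A second makeSet call on the same slot returns the existing holder rather than replacing it, so repeated updates overwrite the string in place.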

Now, why are we passing an IncDesString composite to perform this request? Because it is serializable. This means that we can easily create a log file of all transactions and use it for recovery if and when needed.
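To illustrate why serializability matters for a transaction log, here is a hedged sketch: a request is reduced to its factory id plus its data, written as bytes, and read back into an equal record that could then be re-instantiated and replayed. LogRecord and its layout (DataOutputStream.writeUTF plus a presence flag) are assumptions made for the example; this is not the IncDes serialization format, and this particular data store keeps no log.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical log record: the factory id says which request to re-create,
// the payload carries its data (None for requests that carry no data).
final case class LogRecord(factoryId: String, payload: Option[String])

def serialize(record: LogRecord): Array[Byte] = {
  val bytes = new ByteArrayOutputStream
  val out = new DataOutputStream(bytes)
  out.writeUTF(record.factoryId)
  out.writeBoolean(record.payload.isDefined)
  record.payload.foreach(out.writeUTF)
  out.flush()
  bytes.toByteArray
}

def deserialize(data: Array[Byte]): LogRecord = {
  val in = new DataInputStream(new ByteArrayInputStream(data))
  val factoryId = in.readUTF()
  val payload = if (in.readBoolean()) Some(in.readUTF()) else None
  LogRecord(factoryId, payload)
}

val record = LogRecord("SetRootStringRequest", Some("Hello world!"))
val restored = deserialize(serialize(record))
```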


## GetRootStringRequest

GetRootStringRequest is almost the same as SetRootStringRequest. The two major differences are:

  1. The request contains no data, so it is an IncDes composite rather than an IncDesString composite.
  2. As a query, GetRootStringRequest includes the QueryRequestComponent rather than the UpdateRequestComponent.

Generation of the request is done as follows:

```scala
GetRootStringRequest.process(db)
```

And here's the code:

```scala
object GetRootStringRequest {
  def apply() = (new GetRootStringRequestFactory).newActor(null).
    asInstanceOf[IncDes]

  def process(db: Actor) = {
    val je = apply()
    val chain = new Chain
    chain.op(db, TransactionRequest(je))
    chain
  }
}

class GetRootStringRequestFactory extends Factory(new FactoryId("GetRootStringRequest")) {
  override protected def instantiate = {
    val req = new IncDes
    addComponent(new QueryRequestComponent(req))
    addComponent(new GetRootStringRequestComponent(req))
    req
  }
}

class GetRootStringRequestComponent(actor: Actor)
  extends Component(actor) {
  bindSafe(classOf[Process], ChainFactory(process))

  private def process(msg: AnyRef, chain: Chain) {
    val transactionContext = msg.asInstanceOf[Process].transactionContext
    chain.op(systemServices, DbRoot(), "dbRoot")
    chain.op(Unit => chain("dbRoot"), Value(), "incDesString")
    chain.op(Unit => chain("incDesString"), Value(), "value")
  }
}
```


## SimpleDataStoreComponent

The SimpleDataStoreComponent implements our first database, a somewhat simplistic data store. It runs under the TransactionProcessorComponent, which provides the transactional context and which accepts transaction requests in the form of IncDes actors.

The SimpleDataStoreComponent has two dependencies: the TransactionProcessorComponent and the RandomIOComponent.

```scala
class SimpleDataStoreComponentFactory extends ComponentFactory {
  addDependency(classOf[TransactionProcessorComponentFactory])
  addDependency(classOf[RandomIOComponentFactory])

  override def instantiate(actor: Actor) = new SimpleDataStoreComponent(actor)
}
```

The SimpleDataStoreComponent handles four types of messages:

```scala
case class Commit(update: Block)
case class Abort(exception: Exception)
case class DbRoot()
case class DirtyBlock(block: Block)
```

The first two are sent by the TransactionProcessorComponent, the third by transaction requests such as SetRootStringRequest and GetRootStringRequest, and the fourth by the root Block of the database.

  • Commit - The processing of a request is complete and successful.
  • Abort - An exception occurred while processing a request.
  • DbRoot - A request for a reference to the root Block of the database.
  • DirtyBlock - A block in the database has been updated.
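Before digging in, a hypothetical, fully synchronous model of how these four messages interact may help. The root block is reduced to a mutable string and the disk to an Option[String]; StoreMsg, SimpleStore and this Block class are illustrative stand-ins, not the AsyncFP types. The sketch assumes that DirtyBlock simply sets the dirty flag (the dirtyBlock method is not shown on this page); Commit, Abort and DbRoot follow the behavior of the commit, abort and dbRoot methods described below.

```scala
// Hypothetical messages modeled on Commit, Abort, DbRoot and DirtyBlock.
sealed trait StoreMsg
case object Commit extends StoreMsg
final case class Abort(exception: Exception) extends StoreMsg
case object DbRoot extends StoreMsg
case object DirtyBlock extends StoreMsg

// Stand-in for the root Block: just a mutable string value.
final class Block(var value: String)

final class SimpleStore {
  var disk: Option[String] = None // last committed root value
  private var dirty = false       // root block updated since last commit?
  private var block: Block = null // root block, if present in memory

  def handle(msg: StoreMsg): Any = msg match {
    case DbRoot =>
      if (block == null) block = new Block(disk.orNull) // "read", or start empty
      block
    case DirtyBlock =>
      dirty = true
    case Commit =>
      if (dirty) {
        disk = Some(block.value) // "write" the root block
        dirty = false
      }
    case Abort(e) =>
      if (dirty) {
        block = null // discard uncommitted changes
        dirty = false
      }
      throw e
  }
}

val store = new SimpleStore
val root = store.handle(DbRoot).asInstanceOf[Block]
root.value = "Hello world!"
store.handle(DirtyBlock)
store.handle(Commit)

// A failed transaction: modify, mark dirty, then abort.
val again = store.handle(DbRoot).asInstanceOf[Block]
again.value = "discard me"
store.handle(DirtyBlock)
try store.handle(Abort(new RuntimeException("boom"))) catch { case _: RuntimeException => () }
```

After the abort, the next DbRoot rebuilds the block from the committed value, so the discarded change never becomes visible.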

Time to dig in...

```scala
class SimpleDataStoreComponent(actor: Actor)
  extends Component(actor) {
  var dirty = false
  var block: Block = null
  ...
}
```

Variable usage:

  • The dirty flag indicates whether the root Block of the database has been updated since the last commit. This affects the behavior of commit and abort.
  • The block variable holds a reference to the root block of the database, if present in memory.

Message Bindings:

```scala
bind(classOf[Commit], commit)
bind(classOf[Abort], abort)
bind(classOf[DirtyBlock], dirtyBlock)
bind(classOf[DbRoot], {
  (msg, rf) => exceptionHandler(msg, rf, dbRoot) {
    ex => {
      init(rf)
    }
  }
})
```

Binding message types to methods is straightforward except for DbRoot. Here we make the naive assumption that if an error occurs when accessing the root Block, then the database has not been initialized. The downside is that if a real error occurs, the database gets reinitialized and the next query returns a null value. This is not the most robust approach, but we will do much better going forward.

The commit method is called when a transaction request is complete. This method does nothing when the root Block is not dirty (has not been updated).

```scala
private def commit(msg: AnyRef, rf: Any => Unit) {
  if (!dirty) {
    rf(null)
    return
  }
  val blockLength = IncDesInt(null)
  val results = new Results
  val chain = new Chain(results)
  chain.op(block, Bytes(), "bytes")
  chain.op(blockLength,
    Unit => {
      val blkLen = results("bytes").asInstanceOf[Array[Byte]].length
      Set(null, blkLen)
    })
  chain.op(blockLength, Bytes(), "header")
  chain.op(systemServices,
    Unit => WriteBytes(0L, results("header").asInstanceOf[Array[Byte]]))
  chain.op(systemServices,
    Unit => WriteBytes(4L, results("bytes").asInstanceOf[Array[Byte]]))
  chain.op(block, Unit => {
    dirty = false
    Clean()
  })
  actor(chain)(rf)
}
```

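The above code writes the root Block only if it was marked dirty: a 4-byte length header at offset 0, followed by the block's bytes at offset 4. It then clears the component's dirty flag and sends a Clean message to the block.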
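Commit and dbRoot agree on a simple file layout: a 4-byte length header at offset 0 and the serialized root Block starting at offset 4. The hedged sketch below reproduces that layout with java.io.RandomAccessFile. writeRoot and readRoot are hypothetical helpers, and the big-endian header produced by writeInt is an assumption, since this page does not show how IncDesInt encodes its bytes.

```scala
import java.io.{File, RandomAccessFile}

// Write the header (block length) at offset 0, then the block at offset 4.
// Assumes a big-endian header, as RandomAccessFile.writeInt produces.
def writeRoot(file: File, bytes: Array[Byte]): Unit = {
  val raf = new RandomAccessFile(file, "rw")
  try {
    raf.seek(0L)
    raf.writeInt(bytes.length)
    raf.seek(4L)
    raf.write(bytes)
  } finally raf.close()
}

// Read the header, then exactly that many bytes starting at offset 4.
def readRoot(file: File): Array[Byte] = {
  val raf = new RandomAccessFile(file, "r")
  try {
    raf.seek(0L)
    val bytes = new Array[Byte](raf.readInt())
    raf.seek(4L)
    raf.readFully(bytes)
    bytes
  } finally raf.close()
}

val f = File.createTempFile("SimpleNoLogString", ".db")
f.deleteOnExit()
writeRoot(f, "Hello world!".getBytes("UTF-8"))
val first = new String(readRoot(f), "UTF-8")
writeRoot(f, "Hi".getBytes("UTF-8")) // shorter block: old tail bytes remain in the file
val second = new String(readRoot(f), "UTF-8")
```

The length header is what lets the file be rewritten in place without truncation: a shorter block leaves stale bytes at the end of the file, but a reader only ever reads as many bytes as the header announces.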

The abort method is called when an exception occurs while processing a transaction request. If the block is dirty, it drops the in-memory block reference and clears the dirty flag, so the next DbRoot reloads the root block from disk; it then rethrows the exception.

```scala
private def abort(msg: AnyRef, rf: Any => Unit) {
  if (dirty) {
    block = null
    dirty = false
  }
  throw msg.asInstanceOf[Abort].exception
}
```

The dbRoot method is called when a transaction request sends a DbRoot message. It returns the root Block if it is already in memory; otherwise it reads the 4-byte length header and then the block itself from disk.

```scala
private def dbRoot(msg: AnyRef, rf: Any => Unit) {
  if (block != null) {
    rf(block)
    return
  }
  val blockLength = IncDesInt(null)
  val results = new Results
  val chain = new Chain(results)
  chain.op(systemServices, ReadBytes(0L, 4), "header")
  chain.op(blockLength,
    Unit => {
      blockLength.load(results("header").asInstanceOf[Array[Byte]])
      Value()
    }, "length")
  chain.op(systemServices,
    Unit => {
      val blkLen = results("length").asInstanceOf[Int]
      ReadBytes(4L, blkLen)
    }, "bytes")
  actor(chain) {
    rsp => {
      if (block == null) {
        val bytes = results("bytes").asInstanceOf[Array[Byte]]
        block = Block(mailbox)
        block.setSystemServices(systemServices)
        block.load(bytes)
      }
      rf(block)
    }
  }
}
```

The above code does handle one race condition: if the root block was loaded while this read was in progress, the results of the read are ignored.
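The race arises because two DbRoot requests can both find no cached block and both start a read. Here is a hypothetical sketch of the completion logic (RootCache and RootBlock are illustrative names): the first completed read installs the block, and any later completion discards its bytes and replies with the cached block, so every caller shares one root block. Since all of this runs on a single mailbox, no locking is needed; only the disk reads themselves are asynchronous.

```scala
// Hypothetical model of dbRoot's completion step. All callbacks run on one
// thread (as with a shared mailbox), so no locking is needed.
final class RootCache[B](load: Array[Byte] => B) {
  private var block: Option[B] = None

  // Invoked when an asynchronous read of the root block's bytes completes.
  def onReadComplete(bytes: Array[Byte], reply: B => Unit): Unit = {
    if (block.isEmpty) block = Some(load(bytes)) // first completed read wins
    reply(block.get)                              // later reads discard their bytes
  }
}

final class RootBlock(val text: String)

val cache = new RootCache[RootBlock](bytes => new RootBlock(new String(bytes, "UTF-8")))
var first: RootBlock = null
var second: RootBlock = null
// Two DbRoot requests both found no cached block and started reads;
// their completions are now processed one after the other.
cache.onReadComplete("from read 1".getBytes("UTF-8"), b => first = b)
cache.onReadComplete("from read 2".getBytes("UTF-8"), b => second = b)
```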

The init method is called when dbRoot throws an exception. It simply creates an empty root block.

```scala
private def init(rf: Any => Unit) {
  block = Block(mailbox)
  block.setSystemServices(systemServices)
  rf(block)
}
```
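The exception handler bound to DbRoot, together with init, amounts to "load, or start empty on any failure." A hedged plain-Scala sketch of that policy follows; loadOrInit is a hypothetical helper that reads the same length-prefixed layout as dbRoot. It shows both the convenience and the weakness discussed above: a missing file and a damaged one both yield an empty root.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.file.Files
import scala.util.Try

// Hypothetical: load the root block's bytes (4-byte length header, then the
// block), treating ANY non-fatal failure as "database not initialized" and
// falling back to an empty root, like the DbRoot exception handler plus init.
def loadOrInit(file: File): Array[Byte] =
  Try {
    val raf = new RandomAccessFile(file, "r") // throws if the file is missing
    try {
      val bytes = new Array[Byte](raf.readInt()) // EOF if the header is short
      raf.readFully(bytes)                        // EOF if the block is short
      bytes
    } finally raf.close()
  }.getOrElse(Array.emptyByteArray)

val missing = new File(System.getProperty("java.io.tmpdir"), s"no-such-root-${System.nanoTime()}.db")
val truncated = File.createTempFile("truncated-root", ".db")
truncated.deleteOnExit()
Files.write(truncated.toPath, Array[Byte](0, 0)) // only half a header
```

scala.util.Try captures only non-fatal exceptions, so, for example, an OutOfMemoryError caused by an absurd length header would still propagate.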


