SmallRecordsDataStore - laforge49/Asynchronous-Functional-Programming GitHub Wiki
SmallRecordsDataStore is a collection of records whose keys are strings. Record is a subclass of IncDesIncDes with the addition of a persistent (Long) timestamp, where the timestamp is the timestamp of the last transaction which created the record or updated its content. This timestamp is used to implement opportunistic locking.
Opportunistic locking works best when conflicts between updates are rare, which means either a lightly loaded system or a database divided into enough records that any two transactions are unlikely to conflict. A conflict only occurs when another transaction updates the database between the time of a database query and the time an update based on that query is processed. Conflicts are detected by recording the timestamp of the last update for every record queried and then validating those timestamps within the context of the transaction which performs the update.
SmallRecordsDataStore supports some of the operations which were supported by the SmallDataStore:
- GetRequest returns the content selected by an absolute pathname.
- SizeRequest returns the size of a collection selected by an absolute pathname.
- DbIntSeq[V], DbLongSeq[V] and DbStringSeq[V] sequence over a collection selected by an absolute pathname.
Several new operations are also supported:
- RecordGet uses a pathname to return a copy of the selected IncDes from a given record.
- RecordExists is similar to RecordGet, except that it returns true if the selected IncDes item is present and false if not.
- RecordsCount returns the number of records.
- RecordSize returns the size of a collection in a record.
- RecordsSeq is a sequence over all the records in the datastore.
- RecordIntSeq[V], RecordLongSeq[V] and RecordStringSeq[V] sequence over a collection in a record, where the collection is selected by a pathname relative to the record.
- Batch executes a series of actions as a single transaction. There are several actions supported by batch:
- NewRecord creates a record with a given key.
- DeleteRecord deletes a record with the given key.
- RecordUpdate updates the contents of a record with the given key, with a relative pathname used to specify the location of the content.
- RecordLock queries the datastore for the timestamp of a given record and records the results in batch. (A -1L is recorded if the record is not present.) When the batch is subsequently processed, it aborts with a TransactionConflictException if any of the recorded timestamps have changed. A TransactionConflictException simply indicates that there was a conflicting update that was processed between the time RecordLock was executed and the time the batch was processed. Corrective action is to re-execute the RecordLock(s), the queries used to rebuild the batch, rebuild the batch and reprocess. This corrective action can be done at the discretion of the end user or redone automatically, but there is no guarantee of success while the system is under load. For this reason it is best to introduce an ever-increasing delay when automatically retrying.
RecordGet, RecordExists and RecordSize all call RecordLock when a batch actor is passed in place of null as the second parameter.
##create an empty small records datastore
Test code.
val systemServices = SystemServices(new ServicesRootComponentFactory)
val dbName = "smallRecords.db"
val logDirPathname = "smallRecords"
val file = new java.io.File(dbName)
file.delete
EmptyLogDirectory(logDirPathname)
val properties = new Properties
properties.put("dbPathname", dbName)
properties.put("logDirPathname", logDirPathname)
val db = Subsystem(
systemServices,
new SmallRecordsComponentFactory,
properties = properties,
actorId = ActorId("db"))
val chain = new Chain
chain.op(systemServices, Register(db))
Future(systemServices, chain)
systemServices.close
The SmallRecordsComponentFactory is used above to define a small datastore that supports Batch updates and opportunistic locks.
##process an empty batch
Test code.
val systemServices = SystemServices(new ServicesRootComponentFactory)
val dbName = "smallRecords.db"
val logDirPathname = "smallRecords"
val properties = new Properties
properties.put("dbPathname", dbName)
properties.put("logDirPathname", logDirPathname)
val db = Subsystem(
systemServices,
new SmallRecordsComponentFactory,
properties = properties,
actorId = ActorId("db"))
val batch = Batch(db)
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(db, TransactionRequest(batch))
println(Future(systemServices, chain))
systemServices.close
Unlike the first test we do not delete the database, nor do we clear the log directory. We do however execute (an empty) batch transaction and print its timestamp.
Output.
1350027673046016
##Create two records
Test code.
val batch = Batch(db)
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(db, NewRecord(batch, "fun"))
chain.op(db, NewRecord(batch, "games"))
chain.op(db, TransactionRequest(batch), "timestamp")
chain.op(db, RecordsCount(db), "records count")
chain.op(db, RecordGet(db, null, "fun", ""), "fun")
chain.op(db, RecordGet(db, null, "games", ""), "games")
Future(systemServices, chain)
println(chain.results)
In this test we added two actions to the batch to create tow new records, fun and games. We then get the number of records in the database and get both records as well before printing the results.
The last parameter passed to GetRecord is the pathname of the content to be retrieved from the record, where an empty string gives you the record itself.
Output.
{fun=org.agilewiki.incDes.records.Record@58f0fa12,
games=org.agilewiki.incDes.records.Record@34b1e15c,
records count=2,
timestamp=1350027673067520}
##Update records
Test code.
val funContent = IncDesInt(null)
val gamesContent = IncDesString(null)
val batch = Batch(db)
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(funContent, Set(null, 42))
chain.op(gamesContent, Set(null, "Checkers"))
chain.op(db, RecordUpdate(batch, "fun", "$", funContent))
chain.op(db, RecordUpdate(batch, "games", "$", gamesContent))
chain.op(db, TransactionRequest(batch), "timestamp")
chain.op(db, RecordsCount(db), "records count")
chain.op(db, RecordGet(db, null, "fun", "$"), "funContent")
chain.op(Unit => chain("funContent"), Value(), "funValue")
chain.op(db, RecordGet(db, null, "games", "$"), "gamesContent")
chain.op(Unit => chain("gamesContent"), Value(), "gamesValue")
Future(systemServices, chain)
println(chain.results)
In this test we use RecordUpdate to set the contents of fun to an IncDesInt with a value of 42 and to set the contents of games to an IncDesString with a value of "Checkers". We then use RecordGet with a pathname of $ to fetch the contents of fun and games.
Output.
{funContent=org.agilewiki.incDes.IncDesInt@ec30f48,
funValue=42,
gamesContent=org.agilewiki.incDes.IncDesString@2820478a,
gamesValue=Checkers, records count=2,
timestamp=1350027673115648}
##RecordIntSeq
Test code.
val gamesContent = IncDesIntIncDesMap(null, db)
val gamesSeq = new RecordIntSeq(db, "games", "$")
val batch = Batch(db)
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(db, NewRecord(batch, "games"))
chain.op(gamesContent, PutString(null, 1, "Chess"))
chain.op(gamesContent, PutString(null, 2, "Checkers"))
chain.op(gamesContent, PutString(null, 3, "Spider"))
chain.op(db, RecordUpdate(batch, "games", "$", gamesContent))
chain.op(db, TransactionRequest(batch))
chain.op(db, RecordSize(db, null, "games", "$"), "gamesSize")
chain.op(db, RecordSize(db, null, "games", "$/4"), "unknownSize")
chain.op(gamesSeq, LoopSafe(PrintIntStringMap()))
Future(systemServices, chain)
println(chain.results)
case class PrintIntStringMap() extends Safe {
override def func(target: Actor, msg: AnyRef, rf: Any => Unit)(implicit sender: ActiveActor) {
val nvPair = msg.asInstanceOf[KVPair[Int, IncDesIncDes]]
nvPair.value(Value()) {
rsp => {
rsp.asInstanceOf[Actor](Value()) {
rsp2 => {
println(nvPair.key + " -> " + rsp2)
rf(true)
}
}
}
}
}
}
In this test we create an IncDesIntIncDesMap with three strings and then use RecordUpdate to update the contents of the games record. We then query the size of the content of the games record and the size of a non-existent collection under games using RecordSize. After that we fetch all the items in the collection held by the games record using RecordIntSeq and print the keys and values.
Output.
fun->org.agilewiki.incDes.records.Record@100e6a4b
games->org.agilewiki.incDes.records.Record@5da1a47
1 -> Chess
2 -> Checkers
3 -> Spider
{gamesSize=3,
unknownSize=null}
##Delete record
Test code.
val batch = Batch(db)
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(db, DeleteRecord(batch, "fun"))
chain.op(db, TransactionRequest(batch), "timestamp")
chain.op(db, RecordsCount(db), "record count")
chain.op(db, RecordExists(db, null, "fun", ""), "funExists")
chain.op(db, RecordExists(db, null, "games", ""), "gamesExists")
Future(systemServices, chain)
println(chain.results)
Here we delete the fun record with the DeleteRecord action, get the number of records using RecordsCount and then use RecordExists to see if the fun and games records exist. When calling RecordExists use use a pathname of "" to check for the presence of the records rather than checking for the presence of some item in their content.
Output.
{funExists=false,
gamesExists=true,
record count=1,
timestamp=1350027673198592}
##recover
Test code.
val systemServices = SystemServices(new ServicesRootComponentFactory)
val dbName = "smallRecords.db"
val logDirPathname = "smallRecords"
val file = new java.io.File(dbName)
file.delete
val properties = new Properties
properties.put("dbPathname", dbName)
properties.put("logDirPathname", logDirPathname)
val db = Subsystem(
systemServices,
new SmallRecordsRecoveryComponentFactory,
properties = properties,
actorId = ActorId("db"))
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(db, Recover())
Future(systemServices, chain)
systemServices.close
SmallRecordsRecoveryComponentFactory is used to recover a small records datastore.
Output.
recovering smallRecords\2011-10-12_02-26-39_452.jnl
recovering smallRecords\2011-10-12_02-26-39_472.jnl
recovering smallRecords\2011-10-12_02-26-39_518.jnl
recovering smallRecords\2011-10-12_02-26-39_575.jnl
recovering smallRecords\2011-10-12_02-26-39_604.jnl
##Check recovery
Test code.
val gamesSeq = new RecordIntSeq(db, "games", "$")
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(db, RecordsCount(db), "record count")
chain.op(db, RecordExists(db, null, "fun", ""), "funExists")
chain.op(db, RecordExists(db, null, "games", ""), "gamesExists")
chain.op(db, RecordSize(db, null, "games", "$"), "gamesSize")
chain.op(gamesSeq, LoopSafe(PrintIntStringMap()))
Future(systemServices, chain)
println(chain.results)
To validate the recovery we use RecordCount to get the number of records, RecordExists to see if the fun and games records are present, RecordSize to get the size of the collection held by the games record and then RecordIntSeq to fetch the contents of the collection held by the games record and print the keys and values.
Output.
1 -> Chess
2 -> Checkers
3 -> Spider
{funExists=false,
gamesExists=true,
gamesSize=3,
record count=1}