RecordLocks - laforge49/Asynchronous-Functional-Programming GitHub Wiki
Unlike queries, updates can only be processed one-at-a-time. Worse, all queries are blocked while an update is being processed. So it is important to keep updates short and sweet. Ideally, you want to query the database, determine exactly what change is to be made, and then update the database. Oops! What happens if someone else is updating the same record? This is why locks are important.
Opportunistic locking is very light-weight. There is no mandated locking order, and no possibility of deadlocks. The downside is that you need to break your database into smaller parts, which we call records, and design your applications so that most updates are not on the same small set of records. Otherwise you will have a lot of conflicts, and that means the updates will frequently need to be reprocessed--sometimes more than once.
We will illustrate the use of record locks with a very simple application which simply adds to a counter in the database.
In Counter.init we did not call NewRecord before UpdateRecord because UpdateRecord creates the record if it is not present... providing we are updating the record itself. I.E. UpdateRecord only creates a record if the pathname is $.
case class Counter(db: Actor) {
def init = {
val counter = IncDesInt(null)
val batch = Batch(db)
val chain = new Chain
chain.op(db, RecordUpdate(batch, "r1", "$", counter))
chain.op(db, TransactionRequest(batch))
chain
}
def get = {
val chain = new Chain
chain.op(db, RecordGet(db, null, "r1", "$"), "counter")
chain.op(Unit => chain("counter").asInstanceOf[IncDesInt], Value())
chain
}
def buildAdd(n: Int) = {
var counter: IncDesInt = null
val batch = Batch(db)
val chain = new Chain
chain.op(db, RecordGet(db, batch, "r1", "$"), "counter")
chain.op(Unit => {
counter = chain("counter").asInstanceOf[IncDesInt]
counter
}, Value(), "value")
chain.op(Unit => counter, Unit => {
val value = chain("value").asInstanceOf[Int]
Set(null, value + n)
})
chain.op(db, Unit => RecordUpdate(batch, "r1", "$", counter))
(chain, TransactionRequest(batch))
}
}
The Counter application has 3 methods:
- init - for initializing the counter record (r1) in the database.
- get - for doing queries on the counter. And
- buildAdd(n: Int) - for adding n to the counter.
The init and get methods return an operation which is subsequently applied to the database to perform the actual operation. buildAdd returns a 2-tuple with two such values, where the first constructs an update operation and the second invokes it. (We've broken the operation into two parts to more easily demonstrate conflicts.)
In the buildAdd method, the second parameter passed to RecordGet is the batch instead of null. RecordGet consequently adds a RecordLock action to batch with a record key of r1 and the timestamp of the last update made to that record. When batch is subsequently executed, it will fail if there has been an intervening update to record r1.
The first test will do the following:
- Initialize the counter. (The default value is 0.)
- Query the counter value.
- Build the update to add 1 to the counter.
- Process the update.
- Query the counter.
Test code.
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(db, Counter(db).init)
chain.op(db, Counter(db).get, "oldCounterValue")
val (b1, t1) = Counter(db).buildAdd(1)
chain.op(db, b1)
chain.op(db, t1)
chain.op(db, Counter(db).get, "newCounterValue")
Future(systemServices, chain)
println(chain.results)
Output.
{newCounterValue=1, oldCounterValue=0}
OK, now what about conflicts? We will create a conflict by building two updates and then processing them. The second update fails with a conflict. To handle the conflict, we just rebuild the failed update and reprocess it.
Test code.
val chain = new Chain
chain.op(systemServices, Register(db))
chain.op(db, Counter(db).get, "oldCounterValue")
val (b1, t1) = Counter(db).buildAdd(1)
val (b2, t2) = Counter(db).buildAdd(1)
chain.op(db, b1)
chain.op(db, b2)
chain.op(db, t1)
Future(systemServices, chain)
println(chain.results)
try {
println(Future(db, t2))
println("can't get here")
} catch {
case ex: TransactionConflictException => {
ex.printStackTrace
println("retrying")
val retry = new Chain
val (b3, t3) = Counter(db).buildAdd(1)
retry.op(db, b3)
retry.op(db, t3)
Future(db, retry)
println("retry was successful")
}
case ex: Exception => throw ex
}
println(Future(db, Counter(db).get))
Output.
{oldCounterValue=1}
org.agilewiki.db.transactions.batch.TransactionConflictException: r1
retrying
retry was successful
3