Updating values - hazelcast/hazelcast-scala GitHub Wiki
Hazelcast supports a number of ways to update existing entries in an IMap. The most naive approach is to simply get an entry, update the value, and set it back:
/** Increment and return number. */
def updateNumber(key: String, delta: Int): Option[Int] = {
  Option(map.get(key)) map { current =>
    val updated = current.copy(number = current.number + delta)
    map.set(key, updated)
    updated.number
  }
}
This obviously isn't thread-safe, so we could fix that by using the java.util.concurrent.ConcurrentMap methods and a (recursive) CAS loop:
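def updateNumber(key: String, delta: Int): Option[Int] = {
  // flatMap, because the retry branch already returns an Option[Int]
  Option(map.get(key)) flatMap { current =>
    val updated = current.copy(number = current.number + delta)
    // Succeeds only if the stored value still equals `current`
    val success = map.replace(key, current, updated)
    if (success) Some(updated.number)
    else updateNumber(key, delta)
  }
}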
This works AND is thread-safe. However, it still moves the entire object across the network at least three times: once on get and at least twice on replace (both the old and the new value must be passed). If the objects are small and the updates infrequent, this may not be an issue.
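The retry behaviour of the CAS loop can be tried out locally without a cluster. The following is a minimal sketch, assuming a plain java.util.concurrent.ConcurrentHashMap as a stand-in for the IMap and a hypothetical Counter case class as the value type; neither name comes from the Hazelcast API.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec

object CasLoopSketch {
  // Hypothetical value type; the examples only assume a `number` field.
  final case class Counter(number: Int)

  // Local stand-in for an IMap[String, Counter] (assumption, not a Hazelcast map).
  val map = new ConcurrentHashMap[String, Counter]()

  /** Increment and return number, retrying until the replace succeeds. */
  @tailrec
  def updateNumber(key: String, delta: Int): Option[Int] =
    Option(map.get(key)) match {
      case None => None // no entry, nothing to update
      case Some(current) =>
        val updated = current.copy(number = current.number + delta)
        // replace(key, old, new) succeeds only if the stored value still equals `current`
        if (map.replace(key, current, updated)) Some(updated.number)
        else updateNumber(key, delta) // lost the race, retry with a fresh read
    }
}
```

Writing the loop as a pattern match keeps the retry in tail position, so heavy contention does not grow the stack.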
But then again, why go through all that when there is a much better way that is both lock-free and CAS-free? The Scala API for Hazelcast supports the following delta update functions:
update
upsert
upsertAndGet
updateAndGet
updateAndGetIf
updateIf
getAndUpsert
getAndUpdate
getAndUpdateIf
This can simplify our code to this:
def updateNumber(key: String, delta: Int): Option[Int] = {
  map.updateAndGet(key) { current =>
    current.copy(number = current.number + delta)
  } map (_.number)
}
This is thread-safe and needs no CAS retries, as the update happens on the partition thread, ensuring serial access. The network cost is the delta value itself going out, and the full object coming back. If using update instead, the object would not be sent back.
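As a rough local analogy for "the update function runs where the data lives", ConcurrentHashMap.computeIfPresent applies a function atomically to a single key. The sketch below is only an illustration under that assumption: it uses a plain ConcurrentHashMap and a hypothetical Counter type, and its updateAndGet helper merely mirrors the name of the Hazelcast method; it is not the library's implementation.

```scala
import java.util.concurrent.ConcurrentHashMap

object InPlaceUpdateSketch {
  // Hypothetical value type; the examples only assume a `number` field.
  final case class Counter(number: Int)

  // Local stand-in for the distributed map (assumption).
  val map = new ConcurrentHashMap[String, Counter]()

  /** Local analogue of updateAndGet: apply `f` atomically to the entry, if it exists. */
  def updateAndGet(key: String)(f: Counter => Counter): Option[Counter] =
    // computeIfPresent runs the function atomically for this key
    // and returns null when the key is absent
    Option(map.computeIfPresent(key, (_: String, current: Counter) => f(current)))

  def updateNumber(key: String, delta: Int): Option[Int] =
    updateAndGet(key)(current => current.copy(number = current.number + delta)).map(_.number)
}
```

No retry loop is needed here, because the map serializes access to the key while the function runs.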
Using updateAndGet is still inefficient here, because we're clearly not interested in the full value, only the updated number. In that case, let's grab the more flexible execute method, which gives us much more freedom. Using execute is generally a batch operation, and will go through all entries that match the filter. However, if the filter is a single distinct key, you will always get a single callback of Entry[K, Option[V]], where the optional value indicates existence of the entry.
So, in this example, it would look like this:
def updateNumber(key: String, delta: Int): Option[Int] = {
  map.execute(OnKey(key)) { entry =>
    entry.value = entry.value map { current => current.copy(number = current.number + delta) }
    entry.value map (_.number)
  }
}
This is slightly more verbose than the last example, but it achieves our purpose, namely passing only Int values over the network: first the delta going to the partition, and then the updated number coming back.
If we are not interested in any return value, it becomes much simpler, and we can just use the update method:
def updateNumber(key: String, delta: Int): Boolean = {
  map.update(key) { current =>
    current.copy(number = current.number + delta)
  }
}
Non-blocking
All the examples shown use the blocking API. To use a non-blocking approach, replace map with map.async, e.g.:
def updateNumber(key: String, delta: Int): Future[Boolean] = {
  map.async.update(key) { current =>
    current.copy(number = current.number + delta)
  }
}
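A caller would typically compose on the returned Future rather than block for the result. The following sketch shows one way to do that with standard scala.concurrent combinators. The updateNumber here is a hypothetical local stand-in backed by a ConcurrentHashMap, not the Hazelcast call, and the Counter type and describe helper are likewise made up for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{ExecutionContext, Future}

object AsyncUsageSketch {
  // Hypothetical value type; the examples only assume a `number` field.
  final case class Counter(number: Int)

  implicit val ec: ExecutionContext = ExecutionContext.global

  // Local stand-in for the distributed map (assumption).
  private val store = new ConcurrentHashMap[String, Counter]()
  store.put("a", Counter(1))

  // Hypothetical stand-in for the non-blocking updateNumber:
  // completes with true if the entry existed and was updated.
  def updateNumber(key: String, delta: Int): Future[Boolean] = Future {
    store.computeIfPresent(key, (_: String, c: Counter) => c.copy(number = c.number + delta)) != null
  }

  // Compose on the Future instead of blocking for the result.
  def describe(key: String, delta: Int): Future[String] =
    updateNumber(key, delta).map { updated =>
      if (updated) s"updated $key" else s"no entry for $key"
    }
}
```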
Conditional updates
Sometimes we only want to update if a given condition is met, and often that condition depends on the data we want to update. Does that mean we are back to using CAS replacement?
/** Update number, if status is active. */
def updateNumber(key: String, delta: Int): Future[Boolean] = {
  map.async.get(key) flatMap {
    case None => Future successful false
    case Some(current) =>
      if (current.isActive) {
        val updated = current.copy(number = current.number + delta)
        val success = map.replace(key, current, updated)
        if (success) Future successful true
        else updateNumber(key, delta)
      } else {
        Future successful false
      }
  }
}
No, we can use one of the three conditional update methods, updateIf, updateAndGetIf, or getAndUpdateIf:
def updateNumber(key: String, delta: Int): Future[Boolean] = {
  map.async.updateIf(_.isActive, key) { active =>
    active.copy(number = active.number + delta)
  }
}
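The point of a conditional update is that the condition is checked and the update applied in one atomic step. That can be sketched locally as follows, assuming a plain ConcurrentHashMap in place of the IMap and a hypothetical Account value type. The updateIf helper only mirrors the name and argument order used above, and its Boolean result (true when an update was applied) is an assumption about the intended semantics, not a statement about the library.

```scala
import java.util.concurrent.ConcurrentHashMap

object ConditionalUpdateSketch {
  // Hypothetical value type with the two fields the examples rely on.
  final case class Account(number: Int, isActive: Boolean)

  // Local stand-in for the distributed map (assumption).
  val map = new ConcurrentHashMap[String, Account]()

  /** Local analogue of updateIf: true only if the entry exists, passes `cond`, and was updated. */
  def updateIf(cond: Account => Boolean, key: String)(f: Account => Account): Boolean = {
    var updated = false
    // Check and update happen inside one atomic computeIfPresent call
    map.computeIfPresent(key, (_: String, current: Account) =>
      if (cond(current)) { updated = true; f(current) }
      else current // condition not met, keep the entry unchanged
    )
    updated
  }
}
```

Because the check and the write share one atomic step, no other writer can change the entry between them, which is what makes the CAS retry unnecessary.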