Aggregations - hazelcast/hazelcast-scala GitHub Wiki
The Hazelcast IMap
is a partitioned (sharded) KV store. The Scala API adds support for distributed parallel aggregations, with the same syntax as the native Scala collections, except for the return type, which is a Future
, making it fully non-blocking.
Let's say we have a collection of events, with timestamps, and we'd like to count events by month:
val events: IMap[Key, Event] = ???
val countByMonth: Future[Map[Month, Int]] =
events.map(_.value).aggregate(Map.empty[Month, Int])({
// seqop:
case (byMonth, event) =>
val month = event.ts.getMonth
val count = byMonth.getOrElse(month, 0) + 1
byMonth.updated(month, count)
}, {
// compop:
case (byMonthX, byMonthY) =>
val months = (byMonthX.keySet ++ byMonthY.keySet)
months.toSeq.map(_ -> 0).toMap.transform {
case (month, _) =>
byMonthX.getOrElse(month, 0) + byMonthY.getOrElse(month, 0)
}
})
This is virtually identical to Scala's parallel collections, but scales horizontally.
We can make the above aggregation a lot cleaner by leveraging the built-in groupBy
function, instead making it much simpler:
events.map(_.value).groupBy(_.ts.getMonth).aggregate(0)({
// seqop:
case (count, _) => count + 1
}, {
// combop:
case (countX, countY) => countX + countY
})
It's also possible to aggregate a result into another IMap
, which is useful for simply storing the result, but also to keep an aggregation constant for use in multiple derivative aggregations:
val byMonth: IMap[Month, Int] = ???
events.map(_.value).groupBy(_.ts.getMonth).aggregateInto(byMonth)(0)({
// seqop:
case (count, _) => count + 1
}, {
// combop:
case (countX, countY) => countX + countY
})
Built-in aggregations
There are a number of built-in aggregations, which are accessible depending on the type. To run the above count
aggregation, we can simply do this:
val countByMonth: Future[Map[Month, Int]] =
events.map(_.value).groupBy(_.ts.getMonth).count()