Working With Key Value Data
Spark relies heavily on the key/value pair paradigm to define and parallelize operations, particularly wide transformations, which require data to be redistributed between machines.

Spark has its own `PairRDDFunctions` class containing operations defined on RDDs of tuples. The `PairRDDFunctions` class, made available through implicit conversion, contains most of Spark's methods for joins and custom aggregations. The `OrderedRDDFunctions` class contains the methods for sorting.
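As a minimal sketch (assumed example, not from the wiki text), any RDD of 2-tuples picks up these methods through implicit conversion, so `reduceByKey` and `sortByKey` can be called on it directly:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def pairMethodsExample(sc: SparkContext): Unit = {
  val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  val summed = pairs.reduceByKey(_ + _)   // from PairRDDFunctions
  val sorted = summed.sortByKey()         // from OrderedRDDFunctions
  sorted.collect().foreach(println)
}
```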
Despite their utility, key/value operations can lead to a number of performance issues. Most expensive operations in Spark fit into the key/value pair paradigm because most wide transformations are key/value transformations.
Operations on key/value pairs can cause:
- Out-of-memory errors in the driver
- Out-of-memory errors on the executor nodes
- Shuffle failures
- "Straggler tasks" or partitions, which are especially slow to compute
The first problem (memory errors in the driver) is usually caused by actions. The last three performance issues (out-of-memory errors on the executors, shuffle failures, and straggler tasks) are most often caused by the shuffles associated with wide transformations.
Shuffle less often

- There are several techniques to minimize the number of shuffles needed to complete a complex computation. One way to minimize the number of shuffles in a computation that requires several transformations is to preserve partitioning across narrow transformations, so the data does not have to be reshuffled.
- In some instances, we can use the same partitioner on a sequence of wide transformations. This can be useful to avoid shuffles during joins and to reduce the number of shuffles required to compute a sequence of wide transformations (see the sketch below).
- Another option is to leverage custom partitioners to distribute the data most effectively for downstream computations, and to push computational work into the shuffle stage to make a complicated computation more efficient.
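For example, here is a minimal sketch (the key and value types are assumed for illustration, not taken from the wiki) of reusing a known partitioner so that a subsequent join only shuffles the smaller RDD:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Partition the large RDD once with an explicit partitioner and cache it;
// the join can then reuse that layout instead of reshuffling the large side.
def joinWithKnownPartitioner(
    large: RDD[(Int, Double)],
    small: RDD[(Int, String)]): RDD[(Int, (Double, String))] = {
  val partitioner = new HashPartitioner(large.partitions.length)
  val partitionedLarge = large.partitionBy(partitioner).persist()
  // Only `small` is shuffled to match the partitioner of `partitionedLarge`
  partitionedLarge.join(small)
}
```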
Shuffle better

- Sometimes the computation cannot be completed without a shuffle.
- Prefer wide transformations such as `reduceByKey` and `aggregateByKey` that can perform map-side reductions and that do not require loading all the records for one key into memory. This prevents memory errors on the executors and speeds up wide transformations, particularly for aggregation operations (see the sketch below).
- Shuffling data in which records are distributed evenly throughout the keys, and which contains a high number of distinct keys, prevents out-of-memory errors on the executors and "straggler tasks".
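As a minimal sketch (assumed example, not from the wiki text), both functions below compute a per-key sum, but the `reduceByKey` version combines values within each partition before shuffling, while the `groupByKey` version ships every record across the network:

```scala
import org.apache.spark.rdd.RDD

// groupByKey materializes all values for a key on one executor before summing
def sumWithGroupByKey(pairs: RDD[(Int, Double)]): RDD[(Int, Double)] =
  pairs.groupByKey().mapValues(_.sum)

// reduceByKey performs a map-side combine, shuffling only one partial sum
// per key per partition instead of every record
def sumWithReduceByKey(pairs: RDD[(Int, Double)]): RDD[(Int, Double)] =
  pairs.reduceByKey(_ + _)
```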
The Goldilocks Example
The example consists of a project that computes arbitrary rank statistics on high-dimensionality, high-volume data using complex key/value transformations.
[img]
They want to start by designing an application that lets them input an arbitrary list of integers n1...nk and returns the nth-best element in each column. For example, if the inputs are 8, 1000, and 20 million, the function should return the 8th, 1000th, and 20 millionth best-ranking panda for each attribute column.

From the image above, let's imagine we would like to find the 2nd and 4th elements of each column, so our function would return something like:
[img]
This task is inherently expensive, since it requires sorting all the values in each column in some way.

Since the data is columnar, we could consider using Spark SQL to solve this problem. However, early Spark SQL did not have any support for rank statistics. It may be possible to write a UDF/UDAF to solve the problem, but that would be complicated. The best solution is to leverage Spark Core.
Version 0: Iterative Solution
One intuitive solution is to loop through each column, mapping each row to a single value, then use Spark's `sortBy` and `zipWithIndex` functions on each column, and finally filter for the indices that correspond to the desired rank statistics.
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

def findRankStatistics(
    dataFrame: DataFrame,
    ranks: List[Long]): Map[Int, Iterable[Double]] = {
  require(ranks.forall(_ > 0))

  val numberOfColumns = dataFrame.schema.length
  var i = 0
  var result = Map[Int, Iterable[Double]]()

  while (i < numberOfColumns) {
    // Extract a single column as an RDD of doubles
    val col = dataFrame.rdd.map(row => row.getDouble(i))
    // Sort the column and pair each value with its (zero-based) index
    val sortedCol: RDD[(Double, Long)] = col.sortBy(v => v).zipWithIndex()
    val ranksOnly = sortedCol.filter {
      // rank statistics are indexed from one, i.e. the first element is rank 1
      case (colValue, index) => ranks.contains(index + 1)
    }.keys
    val list = ranksOnly.collect()
    result += (i -> list)
    i += 1
  }
  result
}
```
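A hypothetical usage of this function (the SparkSession setup, column names, and data are assumed for illustration) could look like this; with ranks 2 and 4, each column should yield its 2nd- and 4th-smallest values:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("goldilocks").master("local[*]").getOrCreate()
import spark.implicits._

// A tiny DataFrame of two double-valued attribute columns
val inputDf = Seq(
  (1.0, 10.0), (3.0, 30.0), (2.0, 20.0), (5.0, 50.0), (4.0, 40.0)
).toDF("attr1", "attr2")

// Ranks 2 and 4 of each column: column 0 -> 2.0, 4.0; column 1 -> 20.0, 40.0
val rankStats = findRankStatistics(inputDf, List(2L, 4L))
```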
- This solution is robust, but very slow, since it has to sort the data once for each column, iteratively. If we have 1,000 columns, we have to perform 1,000 sorts.
- Since each sort can be done without knowledge of the others, our intuition should be that it is possible to parallelize this computation, using each column as the unit of parallelization.
- We can represent the data as one long list of key/value pairs where the keys are the column indices. Then we can perform our computation in parallel for each key.
[img]
- If we read in our data as a DataFrame, we can do this mapping with a simple function like this:
```scala
// Goldilocks version 1, mapping to column index/value pairs
def mapToKeyValuePairs(dataFrame: DataFrame): RDD[(Int, Double)] = {
  val rowLength = dataFrame.schema.length
  dataFrame.rdd.flatMap(
    // Emit one (columnIndex, value) pair for every column of every row
    row => Range(0, rowLength).map(i => (i, row.getDouble(i)))
  )
}
```
- Spark's `flatMap` operation mimics the behavior of the `flatMap` operation defined on iterators and collections in Scala. `flatMap` is a very versatile narrow transformation, but it can be a bit confusing. `flatMap` lets the user define a mapping from each record to a collection of elements and then combines the resulting collections together. Here the mapping is defined from a Spark SQL Row object to a sequence of elements, in this case `(columnIndex, value)` pairs. The resulting RDD will have more records than the previous RDD, each of them a (columnIndex, value) pair. Increasing the total number of records is not a requirement of the `flatMap` operation: unlike `map`, `flatMap` allows us to return an empty collection for a record, so the operator can be used to both filter and transform the elements in one pass. A `map` and a `filter` on the same `RDD` or collection can always be combined into one `flatMap` step, as the sketch below illustrates.
- After applying this function, we can perform the computation in parallel by column index (in this case the column index is the key of each record).
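As a minimal sketch (assumed example, not from the wiki text), the two functions below produce the same result; the second fuses the `filter` and the `map` into a single `flatMap` pass by returning an empty collection for filtered-out records:

```scala
import org.apache.spark.rdd.RDD

// filter then map: two narrow transformations
def positivesDoubledTwoSteps(numbers: RDD[Double]): RDD[Double] =
  numbers.filter(_ > 0).map(_ * 2)

// the same logic in one flatMap: emit nothing for filtered-out records
def positivesDoubledOneStep(numbers: RDD[Double]): RDD[Double] =
  numbers.flatMap(n => if (n > 0) Seq(n * 2) else Seq.empty)
```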
How to use PairRDDFunctions and OrderedRDDFunctions
- For `PairRDDFunctions`, K and V can be of any type, but for the `OrderedRDDFunctions` methods (`sortByKey`, `repartitionAndSortWithinPartitions`, `filterByRange`) K must have an implicit ordering. Most common types, like the numeric types or strings, already have their ordering defined in Scala. For a custom type, you may have to define the ordering yourself.
- When writing a function that uses the `OrderedRDDFunctions` with a generic key type, you may need to include code defining an implicit val of type `Ordering`.
```scala
// Defining an implicit ordering to work with OrderedRDDFunctions
implicit def orderByLocationAndName[A <: PandaKey]: Ordering[A] = {
  Ordering.by(pandaKey => (pandaKey.city, pandaKey.zip, pandaKey.name))
}

// For a generic key type, an Ordering can be built from the implicit
// orderings of its components (here, for a tuple key of types K and S)
implicit val ordering: Ordering[(K, S)] = Ordering.Tuple2
```
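Here is a minimal, self-contained sketch (the `PandaKey` fields are assumed for illustration) of defining an `Ordering` for a custom key type so that `sortByKey` becomes available on an RDD keyed by it:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical key type, assumed for illustration
case class PandaKey(city: String, zip: String, name: String)

object PandaKey {
  // With this implicit in scope, OrderedRDDFunctions methods such as
  // sortByKey and filterByRange can be used on an RDD[(PandaKey, V)]
  implicit val ordering: Ordering[PandaKey] =
    Ordering.by(k => (k.city, k.zip, k.name))
}

def sortPandas(rdd: RDD[(PandaKey, Double)]): RDD[(PandaKey, Double)] =
  rdd.sortByKey()
```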
Actions on Key/Value Pairs
- Most key/value actions (including `countByKey`, `countByValue`, `lookup`, and `collectAsMap`) return data to the driver. In most instances they return unbounded data, since the number of keys and the number of values are unknown. For example, `countByKey` returns a data point for each key, and thus it may cause memory errors if there are more distinct keys than fit in memory on the driver.
- Conversely, `lookup` returns all the values for a given key, so it will cause memory problems if one key has more data than will fit in memory on the driver. The `lookup` operation is also expensive because it triggers a shuffle if the RDD doesn't have a known partitioner.
- In addition to the number of records, the size of each record is an important factor in causing memory errors.
- In general we want to design key/value problems so that the keys fit into memory on the driver. The values should be, at a minimum, well distributed by key and, ideally, distributed so that each key has no more records than will fit in memory on each executor.
- As with all Spark programs, we should try to perform transformations that reduce the size of the data before calling actions that move results to the driver (see the sketch below).
- Key/value transformations can also cause memory errors, most often in the executors, if they require all the data associated with one key to be kept in memory on one partition. Avoiding memory errors and optimizing transformations for fewer shuffles is a bit more complicated than avoiding problems with actions.
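A minimal sketch (assumed example, not from the wiki text) of shrinking the data per key before moving anything to the driver:

```scala
import org.apache.spark.rdd.RDD

// Aggregate to one small record per key before collecting, so the driver only
// has to hold one count per distinct key rather than every value.
def countPerKey(pairs: RDD[(String, Double)]): Map[String, Long] = {
  pairs
    .mapValues(_ => 1L)
    .reduceByKey(_ + _)       // map-side combine also shrinks the shuffle
    .collectAsMap()           // still assumes the distinct keys fit on the driver
    .toMap
}
```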
What's so Dangerous About the groupByKey Function
- There is a big warning against the scalability of the `groupByKey` function, which returns an iterator of the elements for each key. This section attempts to explain the cases in which `groupByKey` causes problems at scale.
Goldilocks Version 1: groupByKey Solution
- One simple solution to the Goldilocks problem is to use `groupByKey` to group the elements in each column. `groupByKey` returns an iterator of the elements for each key, so to sort the elements by key we have to convert the iterator to an array. After converting the iterator to an array, we can sort the array and filter for the elements that correspond to our rank statistics.
- Below is an implementation of the `groupByKey` solution. For consistency, this function also takes a DataFrame and a list of element positions as long values. It calls the function that creates the key/value pairs described above in "Goldilocks version 1, mapping to column index/value pairs".
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
// collectAsMap returns a scala.collection.Map, so use that Map type here
import scala.collection.Map

// Goldilocks version 1, groupByKey solution
def findRankStatistics(
    dataFrame: DataFrame,
    ranks: List[Long]): Map[Int, Iterable[Double]] = {
  require(ranks.forall(_ > 0))

  // Map to (column index, value) pairs
  val pairRDD: RDD[(Int, Double)] = mapToKeyValuePairs(dataFrame)

  val groupColumns: RDD[(Int, Iterable[Double])] = pairRDD.groupByKey()
  groupColumns.mapValues(
    iter => {
      // convert to an array and sort
      val sortedIter = iter.toArray.sorted

      sortedIter.toIterable.zipWithIndex.flatMap({
        case (colValue, index) =>
          if (ranks.contains(index + 1)) {
            Iterator(colValue)
          } else {
            Iterator.empty
          }
      })
    }
  ).collectAsMap()
}

def findRankStatistics(
    pairRDD: RDD[(Int, Double)],
    ranks: List[Long]): Map[Int, Iterable[Double]] = {
  assert(ranks.forall(_ > 0))
  pairRDD.groupByKey().mapValues(iter => {
    val sortedIter = iter.toArray.sorted

    sortedIter.zipWithIndex.flatMap(
      {
        case (colValue, index) =>
          if (ranks.contains(index + 1)) {
            // this is one of the desired rank statistics
            Iterator(colValue)
          } else {
            Iterator.empty
          }
      }
    ).toIterable // convert to a more generic iterable type to match our spec
  }).collectAsMap()
}
```
- This solution is easy to understand. It leverages out-of-the-box Spark and Scala functions, so it introduces few edge cases and is relatively easy to test. On small data it is actually relatively efficient, because it only requires one shuffle in the `groupByKey` step and because the sorting step can be computed as a narrow transformation on the executors.
- However, this solution uses `collectAsMap`, which can have the same memory issues as `collect`. So it may work for small datasets, but with millions of rows it will fail with memory exceptions.
Why groupByKey Fails

- The reason is that the groups created by `groupByKey` are always iterators, which can't be distributed. This causes an expensive "shuffled read" step, in which Spark has to read all of the shuffled data from disk into memory.
- As a consequence of partitioning by the hash value of the keys and pulling the results into memory to group them as iterators, `groupByKey` often leads to out-of-memory errors on the executors if there are many duplicate records per key.
- Each record whose key has the same hash value must live in memory on a single machine. Thus, if just one of your keys contains too many records to fit in memory on one executor, the entire operation will fail.
- This issue is illustrated in the image below, where there are more records corresponding to the 94110 zip code than to the others. So even if the records fit on the executors when evenly distributed, all the records associated with the 94110 zip code will not fit on a single executor after the `groupByKey` step.
[img]
- It is better to choose aggregation operations that can do some map-side reduction to decrease the number of records per key before the shuffle (e.g. `aggregateByKey` or `reduceByKey`). If this is not possible, use a wide transformation that does not require all the values associated with one key to be kept in memory.
- If you must use `groupByKey`, it is best if the next operation is an iterator-to-iterator transformation.
Choosing an Aggregation Operation
Shuffling the records to combine those with the same key is a common use case for key/value Spark operations, and Spark provides a number of such aggregation operations. Most of them are built atop the generic `combineByKey` operation, but they differ widely in performance.
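As a minimal sketch (assumed example, not from the wiki text), here is a per-key average computed with the generic `combineByKey` operation, using a running (sum, count) pair as the combiner:

```scala
import org.apache.spark.rdd.RDD

def averageByKey(pairs: RDD[(String, Double)]): RDD[(String, Double)] = {
  pairs
    .combineByKey(
      (v: Double) => (v, 1L),                                          // createCombiner
      (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L),   // mergeValue
      (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners
    )
    .mapValues { case (sum, count) => sum / count }
}
```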
Dictionary of Aggregation Operations with Performance Considerations
Aggregation operations have specific performance considerations
[img]
To avoid memory allocation in `aggregateByKey`, modify the accumulator rather than returning a new one (see the sketch below).
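A minimal sketch (assumed example, not from the wiki text) of collecting values per key with `aggregateByKey` while mutating the accumulator in place, so no new buffer is allocated for every record:

```scala
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

def collectValuesByKey(pairRDD: RDD[(Int, Double)]): RDD[(Int, ArrayBuffer[Double])] = {
  pairRDD.aggregateByKey(ArrayBuffer[Double]())(
    // seqOp: append the record to the existing buffer and return the same buffer
    (buf, value) => { buf += value; buf },
    // combOp: merge one buffer into the other instead of building a new one
    (buf1, buf2) => { buf1 ++= buf2; buf1 }
  )
}
```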
Preventing out-of-memory errors with aggregation operations
- `combineByKey` and all of the aggregation operators built on top of it (`reduceByKey`, `foldByKey`, `aggregateByKey`) are no better than `groupByKey` in terms of memory errors if they cause the accumulator for one key to become too large. In fact, `groupByKey` is actually implemented using `combineByKey` with an accumulator that is an iterator holding all the data, so the accumulator is the size of all the data for that key.
- These operations are unlikely to cause memory errors as long as the combining steps make the data smaller. However, if the accumulator grows with the addition of each new record, it will eventually cause memory errors if there are many records associated with one key.
- Beyond being less likely to run out of memory than `groupByKey`, the following four functions (`reduceByKey`, `treeAggregate`, `aggregateByKey`, and `foldByKey`) are implemented to use map-side combinations, meaning that records with the same key are combined before they are shuffled.
This can greatly reduce the shuffle read. Below there is a comparison between the DAG for
groupByKey
andreduceByKey
img
img
Multiple RDD Operations
Some transformations can operate on multiple RDD inputs. The most obvious of these are join type operations.
Co-Grouping
- Much in the same way that all of the accumulator operations (`reduceByKey`, `aggregateByKey`, `foldByKey`) are implemented using `combineByKey`, all of the join operations are implemented using the `cogroup` function, which uses the `CoGroupedRDD` type. A `CoGroupedRDD` is created from a sequence of key/value RDDs, each with the same key type.
- `cogroup` shuffles each of the RDDs so that the items with the same key from each of the RDDs end up on the same partition, grouped into a single RDD by key.
```scala
// Cogroup example
val cogroupedRDD: RDD[(Long, (Iterable[Double], Iterable[String]))] =
  scoreRDD.cogroup(foodRDD)
```
- `cogroup` can be useful as an alternative to join when joining one RDD with several others. Rather than performing several joins of one RDD against the others, it is more performant to co-partition the RDDs, since that prevents Spark from shuffling the RDD that is being repeatedly joined.
- If we needed to join the panda score data with both the address and the favorite-food data, it would be better to use `cogroup` than two join operations.
```scala
// Cogroup to avoid multiple joins on the same RDD
val addressScoreFood = addressRDD.cogroup(scoreRDD, foodRDD)
```
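A minimal self-contained sketch (the RDD key/value types are assumed for illustration) showing the shape of the three-way co-group:

```scala
import org.apache.spark.rdd.RDD

// One cogroup replaces two successive joins: each input is shuffled at most
// once, and all the values for a given panda id end up grouped together.
def cogroupPandaData(
    addressRDD: RDD[(Long, String)],
    scoreRDD: RDD[(Long, Double)],
    foodRDD: RDD[(Long, String)])
  : RDD[(Long, (Iterable[String], Iterable[Double], Iterable[String]))] = {
  addressRDD.cogroup(scoreRDD, foodRDD)
}
```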
- `cogroup` will cause memory errors for the same reason as `groupByKey`: if one key in either RDD, or in both combined, is associated with more data than will fit on a single partition. In particular, `cogroup` requires that all the records in all of the co-grouped RDDs for one key be able to fit on one partition.