Spark groupbyKey vs reduceByKey vs aggregateByKey - veeraravi/Spark-notes GitHub Wiki

ReduceByKey

While both reducebykey and groupbykey will produce the same answer, the reduceByKey example works much better on a large dataset. That’s because Spark knows it can combine output with a common key on each partition before shuffling the data.

On the other hand, when calling groupByKey – all the key-value pairs are shuffled around. This is a lot of unnessary data to being transferred over the network.

Syntax:

sparkContext.textFile("hdfs://") .flatMap(line => line.split(" ")) .map(word => (word,1)) .reduceByKey((x,y)=> (x+y))

Data is combined at each partition , only one output for one key at each partition to send over network. reduceByKey required combining all your values into another value with the exact same type.

GroupByKey – groupByKey([numTasks])

It doesn’t merge the values for the key but directly the shuffle process happens and here lot of data gets sent to each partition, almost same as the initial data.

And the merging of values for each key is done after the shuffle. Here lot of data stored on final worker node so resulting in out of memory issue.

Syntax:

sparkContext.textFile("hdfs://") .flatMap(line => line.split(" ") ) .map(word => (word,1)) .groupByKey() .map((x,y) => (x,sum(y)) )

groupByKey can cause out of disk problems as data is sent over the network and collected on the reduce workers

AggregateByKey – aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) It is similar to reduceByKey but you can provide initial values when performing aggregation.

same as reduceByKey, which takes an initial value.

3 parameters as input i. initial value ii. Combiner logic iii. sequence op logic

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D") val data = sc.parallelize(keysWithValuesList) //Create key value pairs val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache() val initialCount = 0; val addToCounts = (n: Int, v: String) => n + 1 val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2 val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

ouput: Aggregate By Key sum Results bar -> 3 foo -> 5

Comparison between groupByKey, reduceByKey and aggregateByKey groupByKey() is just to group your dataset based on a key.

reduceByKey() is something like grouping + aggregation.

reduceByKey can be used when we run on large data set.

reduceByKey when the input and output value types are of same type over aggregateByKey

aggregateByKey() is logically same as reduceByKey() but it lets you return result in different type. In another words, it lets you have a input as type x and aggregate result as type y. For example (1,2),(1,4) as input and (1,”six”) as output.

Summary Summary is that when you have an RDD of type (K, V)):

if you need to keep the values, then use groupByKey

if you no need to keep the values, but you need to get some aggregated info about each group (items of the original RDD, which have the same K), you have two choices: reduceByKey or aggregateByKey (reduceByKey is kind of particular aggregateByKey)

if you can provide an operation which take as an input (V, V) and returns V, so that all the values of the group can be reduced to the one single value of the same type, then use reduceByKey. As a result you will have RDD of the same (K, V) type.

if you can not provide this aggregation operation, then use aggregateByKey. It happens when you reduce values to another type. So you will have (K, V2) as a result

References

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

https://stackoverflow.com/questions/43364432/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combineb