Shuffling: What it is and why it's important (Coursera)
Since we have our data distributed among several nodes, some data transfer will occur when we perform a groupBy() or groupByKey(). For example:
val pairs = sc.parallelize(List((1, "one"), (2, "two"), (3, "three")))
pairs.groupByKey()
// res2: org.apache.spark.rdd.RDD[(Int, Iterable[String])]
// = ShuffledRDD[16] at groupByKey at <console>:37
Moving data from one node to another so that it can be grouped with its key is called shuffling. Shuffling can be an enormous hit to performance, since Spark must send data from one node to another over the network, which introduces a lot of latency.
case class CFFPurchase(customerId: Int, destination: String, price: Double)
Assume we have an RDD of the purchases that users of the Swiss train company's (CFF's) mobile app have made in the past month.
val purchasesRDD: RDD[CFFPurchase] = sc.textFile(...)
Goal: calculate two things for each individual customer: how many trips they made and how much money they spent over the course of the month.
// Returns: Array[(Int, (Int, Double))]
val purchasesPerMonth =
  purchasesRDD.map(p => (p.customerId, p.price)) // Pair RDD
    .groupByKey() // groupByKey returns RDD[(K, Iterable[V])]
    .map(p => (p._1, (p._2.size, p._2.sum)))
    .collect() // Calling an action to kick off the computation
Let's see how an example dataset could be distributed in the cluster. The original data looks like:

Data distributed among three nodes:

Distributed data after executing the initial map() function to convert the initial data into a pair RDD:

Now comes the groupByKey(). We will now create collections of values, where each value goes to its corresponding unique key. This implies moving (key, value) pairs across the network. Since we have three nodes and three keys, each node will be the home of one single key.

We can try to reduce the data before the shuffle, reducing in this way the amount of data we have to send over the network. We can use reduceByKey(), which can be thought of as a combination of first doing groupByKey() and then reducing all the values grouped per key. It is, however, more efficient than using the two separately. We'll see how in the following example.
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
Let's apply this new method to the original script. Notice that the function passed to map has changed. Now it is p => (p.customerId, (1, p.price))
// Returns: Array[(Int, (Int, Double))]
val purchasesPerMonth =
  purchasesRDD.map(p => (p.customerId, (1, p.price))) // Pair RDD of (customerId, (tripCount, price))
    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) // Sum trip counts and amounts per customer
    .collect() // Calling an action to kick off the computation
Now each node first applies the reduce operation locally to its own data, as shown below. The partially reduced results are then shuffled across the network so that values with the same key meet on one node, but this time the amount of data sent is much smaller.

Benefits
- By reducing the dataset first, the amount of data sent over the network during the shuffle is greatly reduced.
- This can result in non-trivial gains in performance when the data sets are really big.
Benchmark on a real cluster: groupByKey vs reduceByKey

For the previous example:
val purchasesPerMonth =
  purchasesRDD.map(p => (p.customerId, p.price)) // Pair RDD
    .groupByKey()
Grouping all values of key-value pairs with the same key requires collecting all key-value pairs with the same key on the same machine.
How does Spark know which key to put in which machine?
By default, Spark uses hash partitioning to determine which key-value pair should be sent to which machine.
The data within an RDD is split into several partitions.
Properties of partitions:
- Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine
- Each machine in the cluster contains one or more partitions
- The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes. For example, if you have 4 machines and each machine has 6 cores, the number of partitions will be 24 (see the sketch below).
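A quick way to check this on a running cluster (a minimal sketch, assuming a SparkContext sc is already available):

```scala
// Inspect how many partitions Spark actually created for an RDD
val rdd = sc.parallelize(1 to 1000000)
rdd.getNumPartitions  // by default equals sc.defaultParallelism, e.g. 24 for 4 machines x 6 cores
sc.defaultParallelism // the cluster's configured default parallelism
```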
Two kinds of partitioning available in Spark
- Hash partitioning
- Range partitioning
Customizing a partitioning is only possible on Pair RDDs, since partitioning is based on keys.
Using again the same example:
val purchasesPerCust =
  purchasesRDD.map(p => (p.customerId, p.price)) // Pair RDD
    .groupByKey()
groupByKey first computes, for each tuple (k, v), its partition p:
p = k.hashCode() % numPartitions // The number of partitions by default is numMachines * numCoresPerMachine
Then, all tuples in the same partition p are sent to the machine hosting p.
Intuition: hash partitioning attempts to spread data evenly across partitions based on the key
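As a rough sketch of that computation (the helper name below is ours, for illustration; Spark's HashPartitioner does essentially the same, using a non-negative modulo so negative hash codes still map to a valid partition index):

```scala
// Minimal sketch of hash partitioning: map a key to one of numPartitions partitions
def hashPartition(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  rawMod + (if (rawMod < 0) numPartitions else 0) // keep the result non-negative
}

hashPartition(100, 3) // 1, since 100.hashCode == 100 and 100 % 3 == 1
```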
Pair RDDs may contain keys that have an ordering defined. For example: Int, Char, String...
For such RDDs, range partitioning may be more efficient.
Using a range partitioner, keys are partitioned according to:
- An ordering for keys
- A set of sorted ranges of keys
Property: tuples with keys in the same range appear on the same machine.
Consider...
- a Pair RDD, with keys [8, 96, 240, 400, 401, 800] and a desired number of partitions of 4.
- suppose that hashCode() is the identity (n.hashCode() == n), which means:
p = k % 4
In this case, hash partitioning distributes the keys as follows among the partitions:
partition 0: [8, 96, 240, 400, 800]
partition 1: [401]
partition 2: []
partition 3: []
The result is a very unbalanced distribution which hurts performance.
Using range partitioning the distribution can be improved significantly:
- Assumptions: (a) keys non-negative, (b) 800 is biggest key in the RDD.
- Set of ranges: [1, 200], [201, 400], [401, 600], [601, 800]
In this case, range partitioning distributes the keys as follows among the partitions:
partition 0: [8, 96]
partition 1: [240, 400]
partition 2: [401]
partition 3: [800]
The resulting partitioning is much more balanced.
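The same idea can be reproduced in code; the snippet below is a minimal sketch (assuming a SparkContext sc is available). Note that RangePartitioner samples the RDD to choose its range boundaries, so the boundaries it picks may differ slightly from the hand-chosen ranges above:

```scala
import org.apache.spark.RangePartitioner

val keyed = sc.parallelize(List(8, 96, 240, 400, 401, 800)).map(k => (k, k.toString))
val ranged = keyed.partitionBy(new RangePartitioner(4, keyed))

// Inspect which keys ended up in which partition
ranged.glom().map(_.map(_._1).toList).collect().foreach(println)
```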
How do we set a partitioning for our data?
There are two ways to create RDDs with specific partitionings:
- Call partitionBy on an RDD, providing an explicit Partitioner
- Use transformations that return RDDs with specific partitioners
Invoking partitionBy creates an RDD with a specified partitioner. Example:
val pairs = purchasesRDD.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs) // We want to have 8 partitions
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
Notice the persist(). The reason is Spark's semantics and its tendency to re-evaluate chains of transformations again and again: without it, this data would be shuffled over the network and partitioned again and again. With persist() we are telling Spark: once you have moved the data around the network and repartitioned it, keep it where it is, persisted in memory. Otherwise you could easily find yourself accidentally re-partitioning your data in every iteration, completely unknowingly.
Creating a RangePartitioner requires:
- Specifying the desired number of partitions
- Providing a Pair RDD with ordered keys. This RDD is sampled to create a suitable set of sorted ranges.
AGAIN!!: The result of partitionBy should be persisted. Otherwise, the partitioning is repeatedly applied (involving shuffling!) each time the partitioned RDD is used.
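As a quick sanity check (a sketch, reusing pairs and partitioned from the example above), an RDD exposes its partitioner as an Option:

```scala
pairs.partitioner       // None: the plain map() result carries no partitioner
partitioned.partitioner // Some(<RangePartitioner>): the partitioning is remembered
```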
Partitioner from parent RDD:
Pair RDDs that are the result of a transformation on a partitioned Pair RDD are typically configured to use the hash partitioner that was used to construct the parent.
Automatically-set partitioners:
Some operations on RDDs automatically result in an RDD with a known partitioner, when it makes sense. For example, by default, when using sortByKey, a RangePartitioner is used. Further, the default partitioner when using groupByKey is a HashPartitioner, as we saw earlier.
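This can be checked in the same way (a small sketch, reusing the pairs RDD from above):

```scala
pairs.sortByKey().partitioner  // Some(<RangePartitioner>) set automatically
pairs.groupByKey().partitioner // Some(<HashPartitioner>) set automatically
```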
Operations on Pair RDDs that hold to (and propagate) a partitioner:
- cogroup
- groupWith
- join
- leftOuterJoin
- rightOuterJoin
- groupByKey
- reduceByKey
- foldByKey
- combineByKey
- partitionBy
- sort
- mapValues (if parent has a partitioner)
- flatMapValues (if parent has a partitioner)
- filter (if parent has a partitioner)
Notice!!: if you apply a map() or flatMap() on a partitioned RDD, the resulting RDD will lose its partitioner and the data may be moved all around the cluster again.
...all other operations will produce a result without a partitioner. Why?
Consider again the map transformation. Given that we have a hash partitioned Pair RDD, why would it make sense for map to lose the partitioner in its result RDD?
Because it's possible for map to change the key. For example:
rdd.map { case (k: String, v: Int) => ("doh!", v) }
In this case, if the map transformation preserved the partitioner in the result RDD, it would no longer make sense, since the keys may have changed and the old key-to-partition assignment would no longer be valid.
However, mapValues() enables us to still do map-like transformations without changing the keys, thereby preserving the partitioner. For this reason, if we are working with Pair RDDs, it is better to try mapValues first instead of map (see the sketch below).
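The following minimal sketch contrasts the two, reusing the partitioned RDD created with partitionBy above:

```scala
// map may change the key, so Spark drops the partitioner of the result
val viaMap = partitioned.map { case (id, price) => (id, price * 2) }
viaMap.partitioner        // None

// mapValues cannot touch the key, so the parent's partitioner is preserved
val viaMapValues = partitioned.mapValues(price => price * 2)
viaMapValues.partitioner  // Some(<RangePartitioner>)
```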
Partitioning can bring substantial performance gains, especially by reducing or avoiding shuffles. Using range partitioners we can optimize our earlier use of reduceByKey so that it does not involve any shuffling over the network at all. Here is how to do it:
val pairs = purchasesRDD.map(p => (p.customerId, p.price)) // Pair RDD
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()

// The second part is almost as before, except that we use mapValues instead of map,
// so the partitioner set by partitionBy is preserved
val purchasesPerCust = partitioned.mapValues(price => (1, price))
val purchasesPerMonth = purchasesPerCust
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
  .collect()
Let's see how this will improve performance:

Consider an application that keeps a large table of user information in memory:
- userData [big] - containing (UserID, UserInfo) pairs, where UserInfo contains a list of topics the user is subscribed to
The application periodically combines this big table with a smaller file representing events that happened in the past five minutes:
- events [small] - containing (UserID, LinkInfo) pairs for users who have clicked a link on a website in those five minutes
For example, we may wish to count how many users visited a link that was not to one of their subscribed topics. We can perform this combination with Spark's join operation, which can be used to group the UserInfo and LinkInfo for each UserID by key.
This is how we would implement this case:
val sc = new SparkContext()
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo))
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
The code above will be very inefficient: the join operation, called each time processNewLogs is invoked, does not know anything about how the keys are partitioned in the datasets. By default, this operation will hash all the keys of both datasets, sending elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine, even though userData doesn't change between calls.

To fix this issue we need to use partitionBy on the big userData RDD at the start of the program:
val sc = new SparkContext()
// New: partition userData by key once, and persist the result
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
  .partitionBy(new HashPartitioner(100)) // Create 100 partitions
  .persist()

def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo))
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
Since we called partitionBy when building userData, Spark will now know that it is hash-partitioned, and calls to join on it will take advantage of this information. In particular, when we call userData.join(events), Spark will shuffle only the events RDD (which is much smaller than userData), sending events with each particular UserID to the machine that contains the corresponding hash partition of userData. Visually:

Using a previous example to understand what is happening under the hood:
val purchasesPerCust =
  purchasesRDD.map(p => (p.customerId, p.price)) // Pair RDD
    .groupByKey()
Grouping all values of the key-value pairs with the same key requires collecting all key-value pairs with the same key on the same machine.
Grouping is done using a hash partitioner with default parameters
The result RDD, purchasesPerCust, is configured to use the hash partitioner that was used to construct it.
How do I know a shuffle will occur?
Rule of thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.
You can also figure out whether a shuffle has been planned/executed via:
- The return type of certain transformations, e.g. org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[366]
- The function toDebugString, which shows an RDD's execution plan:
partitioned.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.toDebugString

Operations that might cause a shuffle:
- cogroup
- groupWith
- join
- leftOuterJoin
- rightOuterJoin
- groupByKey
- reduceByKey
- foldByKey
- combineByKey
- distinct
- intersection
- repartition
- coalesce
However, a shuffle does not always have to happen for these operations (see the sketch after this list):
- reduceByKey running on a pre-partitioned RDD will cause the values to be computed locally, requiring only the final reduced values to be sent from the workers to the driver.
- join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machines will cause the join to be computed locally, with no shuffling across the network.
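As a sketch of the second point (reusing userData and events from the log-processing example above; the variable names here are illustrative):

```scala
import org.apache.spark.HashPartitioner

// Partition both sides with the same partitioner and persist them, so the
// subsequent join is a narrow dependency on both parents and needs no shuffle
val partitioner = new HashPartitioner(100)
val usersPart  = userData.partitionBy(partitioner).persist()
val eventsPart = events.partitionBy(partitioner).persist()

val joined = usersPart.join(eventsPart) // computed locally, partition by partition
```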
How your data is organized on the cluster, and what operations you are doing with it matters!