RDD shuffling - ignacio-alorre/Spark GitHub Wiki

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Background

To understand what happens during the shuffle, we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.
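As a minimal Scala sketch of the reduceByKey case described above (the sample data and the SparkContext variable sc are illustrative assumptions, e.g. as provided by spark-shell):

// Values for "apples" may start out on different partitions and must be shuffled together
// before the reduce function can produce a single result per key.
val sales = sc.parallelize(Seq(("apples", 3), ("pears", 2), ("apples", 5), ("pears", 1)), 4)
val totals = sales.reduceByKey(_ + _)   // e.g. ("apples", 8), ("pears", 3)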

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:

  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD
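The three options above could be sketched roughly as follows in Scala, assuming a pair RDD named pairs (for example an RDD[(String, Int)]); the variable names and partition count are illustrative only:

import org.apache.spark.HashPartitioner

val sortedWithinPartitions = pairs.mapPartitions(iter => iter.toSeq.sorted.iterator)            // sort each partition locally
val repartitionedAndSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(8))   // repartition and sort by key in one pass
val globallySorted = pairs.sortBy(_._1)                                                         // globally ordered RDD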

Operations which can cause a shuffle include:

  • repartition operations like repartition and coalesce
  • *ByKey operations (except for counting) like groupByKey and reduceByKey
  • join operations like cogroup and join

Performance Impact

The shuffle is an expensive operation since it involves:

  • Disk I/O
  • Data serialization
  • Network I/O

Certain shuffle operations can consume significant amounts of heap memory, since they employ in-memory data structures to organize records before or after transferring them; reduceByKey and aggregateByKey are examples. Besides memory, the shuffle also generates a large number of intermediate files on disk.

To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations. Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Example

Scenario: The data consists of 167 CSV files totalling 6.5GB. It will be processed on a two-node cluster, each node with 4GB of RAM and 3 CPUs. We want to separate the data into two sets: weekends and weekdays.

This means that Spark is unable to hold the entire dataset in memory at once and therefore must write data to disk, or pass it across the network.

In Theory

Partitioning

To distribute work across the cluster and reduce the memory requirements of each node, Spark will split the data into smaller parts called partitions. Each of these partitions is sent to an executor to be processed. Only one partition is computed per executor thread at a time, so the size and number of partitions passed to an executor are directly proportional to the time it takes to complete its work.
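A quick way to inspect this in practice (illustrative Scala, assuming the data set has already been loaded into a DataFrame named data and sc is the SparkContext):

val partitionCount = data.rdd.getNumPartitions   // how many partitions Spark created for the input
val availableSlots = sc.defaultParallelism       // roughly one running task per available core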

Problem 1: Data Skew

Often the data is split into partitions based on a key, for instance the first letter of a name or a date. If values are not evenly distributed across this key, then more data will be placed in one partition than another. For example:

{Adam, Alex, Anja, Beth, Claire}
A: {Adam, Alex, Anja}
B: {Beth}
C: {Claire}

In this example the A partition is three times larger than the other two, and therefore will take approximately three times longer to compute. As the next stage of processing cannot begin until all three partitions are evaluated, the overall results from the stage will be delayed.
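A hedged Scala sketch of this skew, reusing the names above and keying them by first letter (the partition count and the inspection code are illustrative only):

val names = sc.parallelize(Seq("Adam", "Alex", "Anja", "Beth", "Claire"))
val byFirstLetter = names.keyBy(_.head).partitionBy(new org.apache.spark.HashPartitioner(3))

// Count the records per partition: the partition holding the "A" names gets three
// records while the others get one each, so its task takes roughly three times longer.
val partitionSizes = byFirstLetter.mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size))).collect()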

Problem 2: Scheduling

There may also be too few partitions to make proper use of the executors available. Let's imagine there are 2 executors and 3 partitions. Executor 1 has an extra partition to compute and therefore takes twice as long as Executor 2, so Executor 2 will be idle for half of the job.

Solution to Problems 1 and 2

The simplest solution to the above two problems is to increase the number of partitions used for computations. This reduces the effect of skew concentrated in a single partition and also allows the scheduler to better match partitions to the available CPUs.

A common recommendation is to have 4 partitions per CPU; however, settings related to Spark performance are very case dependent, so this value should be fine-tuned for your given scenario.
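As a rough sketch of that rule of thumb applied to this example's two-node, 3-CPU-per-node cluster (the multiplier of 4 and the RDD named records are assumptions for illustration):

val numExecutors = 2                                        // one executor per node in this scenario
val coresPerExecutor = 3                                    // CPUs available to each executor
val targetPartitions = numExecutors * coresPerExecutor * 4  // ~4 partitions per core
val rebalanced = records.repartition(targetPartitions)      // shuffles the data into evenly sized partitions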

Problem 3: Shuffling

A shuffle occurs when data is rearranged between partitions. This is required when a transformation needs information from other partitions, such as summing all the values in a column. Spark will gather the required data from each partition and combine it into a new partition, likely on a different executor.

During a shuffle, data is written to disk and transferred across the network, halting Spark’s ability to do processing in-memory and causing a performance bottleneck. Consequently we want to try to reduce the number of shuffles being done or reduce the amount of data being shuffled.

Map-Side Reduction

When aggregating data during a shuffle, rather than passing all of the data, it is preferable to combine the values within the current partition first and pass only the result of that combination into the shuffle. This process is known as map-side reduction and improves performance by reducing the quantity of data transferred during the shuffle.
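A minimal Scala comparison, assuming a hypothetical pair RDD wordPairs of (word, 1) records:

// groupByKey ships every (word, 1) record across the network before summing,
// while reduceByKey combines values inside each partition first (map-side reduction)
// and shuffles only one partial sum per key per partition.
val summedWithoutMapSideReduction = wordPairs.groupByKey().mapValues(_.sum)
val summedWithMapSideReduction = wordPairs.reduceByKey(_ + _)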

In Practice

We are going to enrich the data set with a new column, isWeekend, which will be true for weekends and false for weekdays.

data = data.withColumn("isWeekend",
         data.col("Weekday").equalTo("Saturday").or(data.col("Weekday").equalTo("Sunday")));

Finally we will repartition the data based on the isWeekend column and then save it in Parquet format.

data.repartition(data.col("isWeekend")).write().parquet("cycle-data-results" + Time.now());

Round 1

When the job is run, we see that the repartition command does a shuffle and produces 200 partitions (the Spark default), which should offer excellent levels of parallelism. Below is the execution timeline:

It is clear the timeline is not balanced. There are only two partitions taking up any significant execution time, amongst many tiny ones. Even between the two larger ones, the processing is not equally split. This is indicative of data skew, as the partitions are taking different lengths of time to process. The image above also demonstrates the scheduling issues mentioned before, with the second executor being idle for the last 60 seconds.

Why?

This occurred because calling repartition moves all values for the same key into a single partition on one executor. In this case our key, isWeekend, is a boolean value, meaning that only two partitions will be populated with data. Spark therefore creates 198 other partitions with no data. Even if we had more than two executors available, they would receive empty partitions and would be idle throughout the process.

Grouping in this fashion is also a common source of memory exceptions as, with a large data set, a single partition can easily be given multiple GBs of data and quickly exceed the allocated RAM.
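One way to make this visible is to group the rows by the shuffle partition they end up in (a Scala sketch rather than the Java used in the example above; spark_partition_id and the 200-partition default come from Spark SQL):

import org.apache.spark.sql.functions.{col, spark_partition_id}

// After repartitioning on the boolean column, only two partitions report any rows;
// the remaining 198 stay empty and so do not appear in the output at all.
data.repartition(col("isWeekend"))
    .groupBy(spark_partition_id().as("partition"))
    .count()
    .show()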

Round 2

Another way of writing the query is to delegate the repartitioning to the write method.

data.write().partitionBy("isWeekend").parquet("cycle-data-results" + Time.now())

In the initial solution, Spark loaded the CSV files into 69 partitions, split these based on isWeekend and shuffled the results into 200 new partitions for writing. With this new approach Spark still loads the CSVs into 69 partitions; however, it is then able to skip the shuffle stage, realising that it can split the existing partitions based on the key and write that data directly to Parquet files. Looking at the execution timeline, we can see a much healthier spread between the partitions and the nodes, and no shuffle occurring.
