Narrow Vs Wide Transformations - ignacio-alorre/Spark GitHub Wiki

Main point: wide transformations are those that require a shuffle, while narrow transformations are those that do not.

  • Narrow transformations: Those in which each partition of the parent RDD is used by at most one partition of the child RDD.
  • Wide transformations: Those in which multiple child partitions may depend on each partition of the parent RDD.
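
The two definitions can be checked against the dependency objects that Spark attaches to every RDD. The following is a minimal sketch, assuming a local Spark installation; the object name DependencyKinds and the helper describe are hypothetical names chosen for illustration, not part of Spark.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object DependencyKinds {
  // Label each dependency of `rdd` on its direct parents as narrow or wide.
  def describe(rdd: RDD[_]): Seq[String] = rdd.dependencies.map {
    case _: ShuffleDependency[_, _, _] => "wide"
    case _: NarrowDependency[_]        => "narrow"
    case other                         => other.getClass.getSimpleName
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("dependency-kinds")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(1 to 100, 4)
      val rdd2 = rdd1.map(x => (x % 10, 1)) // no shuffle needed
      val rdd3 = rdd2.groupByKey()          // rdd2 has no partitioner, so a shuffle is needed
      println(s"map:        ${describe(rdd2)}")
      println(s"groupByKey: ${describe(rdd3)}")
    } finally sc.stop()
  }
}
```

Under these assumptions the map step should report a narrow dependency and the groupByKey step a wide one. If the parent RDD were already partitioned by key with the same partitioner, groupByKey would not need a shuffle.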

Because Spark's evaluation engine (the "DAG") builds an execution plan in reverse, from the output (the last action) back to the input RDD, the creators of Spark define narrow and wide transformations by how child partitions depend on parent partitions. This approach has two advantages:

  • It rules out the case of one parent partition having multiple children in a narrow dependency. This explains why coalesce is a narrow transformation only when it reduces, rather than increases, the number of partitions.
  • It clarifies why the number of tasks used to complete a computation corresponds to the number of output partitions rather than input partitions. The tasks needed to compute a transformation are computed on the child partitions.

The diagram below shows the dependencies between parent and child partitions for narrow and wide transformations in Spark.

  • Each line of squares in the diagram represents the same executors at different points in time. The arrows denote dependencies between partitions. Repartitioning data does not necessarily require data movement across machines, since partitions may reside on the same executor.

  • When changing the partition of a record does require data movement between executors, the records do not pass through the driver; they are transferred directly between the executors.

// Narrow dependency: map each element of rdd1 to a tuple (x, 1)
val rdd2 = rdd1.map(x => (x, 1))
// Wide dependency: groupByKey moves records so that equal keys share a partition
val rdd3 = rdd2.groupByKey()

Arrows represent partition dependencies. Each child partition has arrows pointing to the parent partitions upon which it depends; if an arrow points from partition y to partition x, that means that x depends on y. Blue arrows represent narrow dependencies and red arrows represent wide dependencies.

We assume the RDD has four partitions.

To compute the map step, each child partition depends on just one parent, since the data doesn't need to be moved between partitions for the operation to be computed. In the groupByKey step, however, Spark needs to move an arbitrary number of records so that those with the same key end up in the same partition. Only then can it combine the records corresponding to a single key into one iterator (an iterator is a local, rather than a distributed, collection). Thus, the child partitions depend on many partitions in the parent RDD.
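
The movement of records described above can be observed by counting, for each key, how many partitions hold at least one record with that key before and after groupByKey. This is a minimal sketch, assuming a local Spark installation and keys of the form x % 10 so that every key occurs in every input partition; KeyPlacement and partitionsPerKey are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object KeyPlacement {
  // For each key, the number of distinct partitions that hold a record with that key.
  def partitionsPerKey[V](rdd: RDD[(Int, V)]): Map[Int, Int] =
    rdd
      .mapPartitionsWithIndex((idx, records) => records.map { case (k, _) => (k, idx) })
      .distinct()
      .collect()
      .groupBy(_._1)
      .map { case (k, pairs) => (k, pairs.length) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("key-placement")
    val sc = new SparkContext(conf)
    try {
      val rdd2 = sc.parallelize(1 to 100, 4).map(x => (x % 10, 1))
      val rdd3 = rdd2.groupByKey()
      println(s"before groupByKey: ${partitionsPerKey(rdd2)}") // keys spread over partitions
      println(s"after groupByKey:  ${partitionsPerKey(rdd3)}") // one partition per key
    } finally sc.stop()
  }
}
```

With this input, each key should appear in all four partitions before the shuffle and in exactly one partition afterwards.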

Implications for Performance

Narrow dependencies do not require data to be moved across partitions. Consequently, narrow transformations don't require communication with the driver node, and an arbitrary number of narrow transformations can be executed on any subset of records given one set of instructions from the driver. In Spark terminology, we say that each series of narrow transformations can be computed in the same "stage" of the execution plan.

In contrast, a shuffle associated with a wide dependency marks a new stage in the RDD's evaluation. Because each task must be computed on a single partition, and the data needed to compute each partition of a wide dependency may be spread across machines, downstream computations cannot begin before the shuffle finishes.

Except in the case of multi-RDD operations like join, the stages associated with one RDD must be executed in sequence. Thus, not only are shuffles expensive because they require data movement and potential disk I/O, they also limit parallelisation.
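
One way to see where stage boundaries fall is to walk an RDD's lineage and count its shuffle dependencies. The sketch below assumes a local Spark installation and a linear lineage, for which the job runs in (shuffles + 1) stages; StageSketch and shuffleCount are hypothetical names, and the simple recursion would count a shared parent more than once in a branching lineage.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object StageSketch {
  // Number of shuffle dependencies reachable from `rdd` through its lineage.
  def shuffleCount(rdd: RDD[_]): Int = rdd.dependencies.map { dep =>
    val here = if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) 1 else 0
    here + shuffleCount(dep.rdd)
  }.sum

  // A linear lineage with several narrow steps and two wide steps.
  def pipeline(sc: SparkContext): RDD[(Int, Iterable[Int])] =
    sc.parallelize(1 to 1000, 4)
      .map(x => (x % 10, 1))         // narrow
      .filter(_._1 != 0)             // narrow, same stage as the map
      .reduceByKey(_ + _)            // wide: first shuffle
      .map { case (k, v) => (v, k) } // narrow, drops the partitioner
      .groupByKey()                  // wide: second shuffle

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("stage-sketch")
    val sc = new SparkContext(conf)
    try {
      val rdd = pipeline(sc)
      println(s"shuffles in lineage: ${shuffleCount(rdd)}")
      println(rdd.toDebugString) // indentation changes mark shuffle boundaries
    } finally sc.stop()
  }
}
```

For this pipeline the count should be two, which corresponds to three stages executed in sequence.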

Implications for Fault Tolerance

  • The cost of failure for a partition with wide dependencies is much higher than for one with narrow dependencies, since it requires more partitions to be recomputed.

  • If one partition in the parent of a mapped RDD fails, only one of its children must be recomputed, and the tasks needed to recompute that child partition can be distributed across the executors to make this recomputation faster.

  • In contrast, if the parent of a sorted RDD loses a partition, it is possible (in the worst case) that all the child partitions will need to be recomputed.

Note: Chaining together transformations with wide dependencies increases the risk of a very expensive recomputation, particularly if any of the wide transformations has a high probability of causing memory errors. In some instances, the cost of recomputation may be high enough that it is worth checkpointing an RDD so that the intermediate results are saved.
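
Checkpointing as mentioned in the note might look like the following. This is a minimal sketch, assuming a local Spark installation and a temporary local directory as the checkpoint location (on a cluster a reliable shared file system would be the usual choice); CheckpointSketch and buildAndCheckpoint are hypothetical names.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object CheckpointSketch {
  // Builds an RDD behind two shuffles and checkpoints it, so that a later failure
  // can reload the saved partitions instead of replaying both shuffles.
  def buildAndCheckpoint(sc: SparkContext): RDD[(Int, Iterable[Int])] = {
    // Assumption: a temporary local directory is acceptable for this demo.
    sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)
    val expensive = sc.parallelize(1 to 1000, 4)
      .map(x => (x % 10, 1))
      .reduceByKey(_ + _)            // first shuffle
      .map { case (k, v) => (v, k) }
      .groupByKey()                  // second shuffle
    expensive.cache()      // avoids computing the RDD a second time for the checkpoint
    expensive.checkpoint() // only marks the RDD; nothing is written yet
    expensive.count()      // the first action writes the checkpoint
    expensive
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch")
    val sc = new SparkContext(conf)
    try {
      val rdd = buildAndCheckpoint(sc)
      println(s"checkpointed: ${rdd.isCheckpointed}")
    } finally sc.stop()
  }
}
```

Whether the extra write is worthwhile depends on how costly the upstream shuffles are relative to saving the intermediate result.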

The Special Case of coalesce

  • The coalesce operation is used to change the number of partitions in an RDD.

  • When coalesce reduces the number of output partitions, each parent partition is used in exactly one child partition, since the child partitions are the union of several parents. We can therefore consider coalesce a narrow transformation even though it changes the number of partitions in the RDD. Since tasks are executed on the child partitions, the number of tasks executed in a stage that includes a coalesce operation is equal to the number of partitions in the resulting RDD.

  • However, using coalesce to increase the number of partitions is a wide transformation: it requires a shuffle (in the RDD API, calling coalesce without shuffle = true and asking for more partitions leaves the partition count unchanged). In that case coalesce prioritizes evenly distributing the data across the child partitions. The location of records in the output cannot be determined at design time, because it depends on how many records are stored on each input partition.

Note: Coalesce causes the upstream partitions in the entire stage to execute with the level of parallelism assigned by coalesce. This may be undesirable. Avoid this behavior at the cost of a shuffle by setting the shuffle argument of coalesce to true or by using the repartition function instead.
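
The behaviour of coalesce with and without a shuffle can be compared by looking at the resulting partition counts and at whether the lineage contains a shuffle dependency. The following is a minimal sketch, assuming a local Spark installation and an input RDD with eight partitions; CoalesceSketch, hasShuffle and summarize are hypothetical names.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object CoalesceSketch {
  // True if any dependency in the lineage of `rdd` is a shuffle dependency.
  def hasShuffle(rdd: RDD[_]): Boolean = rdd.dependencies.exists { dep =>
    dep.isInstanceOf[ShuffleDependency[_, _, _]] || hasShuffle(dep.rdd)
  }

  // Partition count and shuffle flag for each way of changing the partitioning.
  def summarize(sc: SparkContext): Map[String, (Int, Boolean)] = {
    val base = sc.parallelize(1 to 1000, 8)
    Map(
      "coalesce(2)"                  -> base.coalesce(2),
      "coalesce(16)"                 -> base.coalesce(16),
      "coalesce(16, shuffle = true)" -> base.coalesce(16, shuffle = true),
      "repartition(16)"              -> base.repartition(16)
    ).map { case (label, rdd) => (label, (rdd.getNumPartitions, hasShuffle(rdd))) }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("coalesce-sketch")
    val sc = new SparkContext(conf)
    try {
      summarize(sc).foreach { case (label, (parts, shuffled)) =>
        println(s"$label: partitions=$parts, shuffle=$shuffled")
      }
    } finally sc.stop()
  }
}
```

Under these assumptions, coalesce(2) should yield two partitions with no shuffle, coalesce(16) should leave the RDD at eight partitions, and both coalesce(16, shuffle = true) and repartition(16) should yield sixteen partitions at the cost of a shuffle.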