Reusing RDDs - ignacio-alorre/Spark GitHub Wiki

All kinds of persistence and checkpointing have some cost and are unlikely to improve performance for operations that are performed only once. Furthermore, on large datasets the cost of persisting or checkpointing can be so high that recomputing is more desirable.

Cases for Reuse

Iterative Computations

For transformations that use the same parent RDD multiple times, reusing an RDD forces evaluation of that RDD and so can help avoid repeated computations. For example, if you are performing a loop of joins against the same dataset, persisting that dataset could lead to large performance improvements, since it ensures that the partitions of the RDD will be available in memory for each join.

Let's see an example where we compute the RMSE on a number of different RDDs representing predictions from different models:

val testSet: Array[RDD[(Double, Int)]] = 
	Array(
		validationSet.mapValues(_ + 1),
		validationSet.mapValues(_ + 1),
		validationSet
	)
validationSet.persist() // persist since we are using this RDD several times
val errors = testSet.map( rdd => {
	rmse(rdd.join(validationSet).values)
})
  • In this example the validationSet RDD does not have to be reloaded to complete each join, since it stays in memory on the executors across runs of the algorithm. Avoiding the repartitioning as well requires that the RDD has a known partitioner when it is persisted.
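
The loop-of-joins case can be sketched as follows. This is a minimal illustration rather than the original author's code: the object name LoopOfJoinsSketch, the made-up data, the local[2] master, and the choice of a HashPartitioner with 4 partitions are all assumptions. The lookup RDD is partitioned once and persisted, and every join in the loop then reuses the stored partitions.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical sketch: names, data, and partition counts are assumptions.
object LoopOfJoinsSketch {
  def run(): Seq[Long] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("loop-of-joins-sketch")
    val sc = new SparkContext(conf)
    try {
      // Made-up lookup data, partitioned once and marked for reuse.
      val lookup: RDD[(Int, String)] =
        sc.parallelize(1 to 100)
          .map(i => (i, s"v$i"))
          .partitionBy(new HashPartitioner(4))
          .persist()

      // Three made-up datasets that are each joined against the same RDD.
      val batches: Seq[RDD[(Int, Int)]] =
        Seq(1 to 10, 20 to 50, 90 to 120).map(r => sc.parallelize(r).map(i => (i, i * 2)))

      // The first join materializes `lookup`; later joins read the stored partitions.
      batches.map(batch => batch.join(lookup).count())
    } finally {
      sc.stop()
    }
  }
}
```

Calling LoopOfJoinsSketch.run() returns the number of matching keys per batch. Whether persisting pays off on real data depends on how expensive it is to rebuild the reused RDD.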

Multiple actions on the same RDD

  • If you do not reuse an RDD, each action called on it will launch its own Spark job with the full lineage of RDD transformations. Persisting or checkpointing lets later jobs start from the stored partitions, so the series of transformations preceding the persist or checkpoint call will be executed only once. (Checkpointing truncates the lineage; persisting keeps it so that lost partitions can be recomputed.)

  • Since persisting or checkpointing an RDD lasts for the duration of a Spark application, an RDD persisted during one Spark job will be available in a subsequent job executed with the same SparkContext.

  • As an example, suppose we call sortByKey() on an RDD, then count the result, and finally take 10% of the records. If we do not persist the sorted RDD, the sort needed to create it runs twice: once in the job launched by count and again in the job launched by take. The application launches two jobs, and each one includes a sort stage.

  • However, if we add a persist or checkpoint call before the actions, the transformation will only be executed once.

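val sorted = rddA.sortByKey()
sorted.persist() // mark the sorted RDD for reuse before the first action
val count = sorted.count() // first job: runs the sort and stores its partitions
val sample: Long = count / 10
sorted.take(sample.toInt) // second job: reads the stored partitions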
  • To reuse data between Spark applications, use checkpointing with the same directory
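
One way to observe the effect described above is to count how often a transformation actually runs. The sketch below is an illustration under assumptions: the object name MultipleActionsSketch, the made-up data, and the local[2] master are hypothetical, and a map with a counter stands in for the sort. It uses a LongAccumulator purely as a diagnostic; accumulator updates inside transformations can be applied more than once if tasks are retried, so the counts hold only for a clean local run where the persisted partitions fit in memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sketch: names and data are assumptions.
object MultipleActionsSketch {
  /** Runs two actions on the same RDD and returns how many times the map function ran. */
  def mapCalls(persist: Boolean): Long = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("multiple-actions-sketch")
    val sc = new SparkContext(conf)
    try {
      val calls = sc.longAccumulator("mapCalls")
      val mapped = sc.parallelize(1 to 1000, 4).map { x => calls.add(1); x * 2 }
      if (persist) mapped.persist() // lazy: partitions are stored by the first action
      mapped.count()                // first job
      mapped.reduce(_ + _)          // second job
      calls.sum
    } finally {
      sc.stop()
    }
  }
}
```

Without persisting, each of the two jobs re-runs the map over all 1000 elements; with persisting, the second job reads the stored partitions instead.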

If the cost to compute each partition is very high

  • Persisting or checkpointing can be particularly useful if the cost of computing one partition is very high because they ensure that the entire expensive operation will not need to be recomputed in the case of downstream failures.

  • If all of the narrow transformations together create more GC overhead or memory strain than your cluster's executors can handle, then checkpointing or persisting with OFF_HEAP can be particularly useful. Both allow the RDD to be stored outside of the Spark executor memory, leaving space to compute.

  • Some individual narrow transformations, such as training a model per partition or working with very wide rows, can be expensive. In these cases, reusing an RDD after the expensive computation, so that it is not recomputed, may improve performance.

Deciding if Recompute Is Inexpensive Enough

  • Persisting data in memory is space intensive and will take time to serialize and deserialize. Persisting in memory and in-memory computations are both done in the Spark executor JVM. Thus, persisting in memory may take space that could be used for downstream computations or increase the risk of memory failures.

  • Caching with Java-based memory structures can incur a much higher garbage collection cost than recomputing.

  • Persisting to disk or checkpointing has the disadvantages of MapReduce, causing expensive write and read operations, which also need to be taken into account.

  • For relatively simple operations the cost of the read operation needed to load the RDD far outweighs the others, so persisting is most useful when it prevents triggering another read operation or in the case of many iterative computations.

  • Breaking an RDD's lineage by forcing evaluation through persisting or checkpointing prevents transformations with narrow dependencies from being combined into a single task. Breaking lineage between narrow transformations is only desirable in the most extreme cases.

  • In general it is worth reusing an RDD rather than recomputing it if the computation is large relative to your cluster and the rest of your job.

  • The best way to tell if you need to reuse your RDD is to run a job. If your job runs very slowly, see if persisting the RDDs helps.

  • If a job is failing with GC or out-of-memory errors, checkpointing or persisting with OFF_HEAP may allow the job to complete, particularly if the cluster is noisy. If you were already persisting with one of the in-memory options, consider removing the persist call or switching to checkpointing or OFF_HEAP persistence.

Types of Reuse: Cache, Persist, Checkpoint, Shuffle Files

  • Caching: Equivalent to persisting with the default in-memory storage level (MEMORY_ONLY).
  • Persisting: Most useful to avoid recomputation during one Spark job or to break RDDs with long lineages, since it keeps an RDD on the executors during a Spark job.
  • Checkpointing: Most useful to prevent failures and a high cost of recomputation by saving intermediate results. Similar to persisting, but the data is written to external storage.

Persist and cache

  • Persisting an RDD means materializing it (usually by storing it in memory on the executors) for reuse during the current job.

  • Spark remembers a persisted RDD's lineage so that it can recompute it for the duration of a Spark job if one of the persisted partitions is lost.

  • Spark provides a number of different storage levels as constants, but each one is built from five attributes that describe how to store the RDD: useDisk, useMemory, useOffHeap, deserialized and replication. Calling toString on a storage level reveals which options it contains. Each option is explained in more detail here:

    • useDisk: Partitions that do not fit in memory will be written to disk. By default, if partitions do not fit in memory, they will simply be evicted and will need to be recomputed when the persisted RDD is used. Persisting to disk can ensure that recomputation of those additional large partitions is avoided. However, reading from disk can be time-intensive, so persistence to disk is only worthwhile if the cost of recomputation is rather high.

    • useMemory: Determines whether the RDD is stored in memory or written directly to disk. The DISK_ONLY storage levels are the only options that mark this as false. Most of the speed benefits of caching come from keeping RDDs in memory, for fast access when reusing them for repeated computations. However, there are some cases where disk-only persistence makes sense, e.g., when the computation is more expensive than reading from local disk, or when the network filesystem is especially slow.

    • useOffHeap: RDD will be stored outside of the Spark executor in an external system such as Tachyon. The OFF_HEAP storage level enables this property. If memory is a serious issue, or a cluster is noisy and partitions are evicted, this option may be compelling.

    • deserialized: If true, the RDD will be stored as deserialized Java objects. Setting it to false (serialized storage) can make storing RDDs more space efficient, especially when using a faster serializer, but incurs some performance overhead. If your RDD is too large to persist in memory, first try to serialize it with the MEMORY_ONLY_SER option. This will keep the RDD fast to access, but will decrease the memory needed to store it.

    • replication: An integer that controls the number of copies of the persisted data to be stored in the cluster. Use this option to ensure fast fault tolerance. However, be aware that keeping two copies incurs double the space and speed costs of persistence without replication. Replication is usually only necessary on a noisy cluster or with a bad connection, where failures are unusually likely. It might also be useful if you do not have time to recompute in case of failure, such as when serving a live web application.
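
The five attributes above can be inspected directly on the StorageLevel constants. The sketch below is a small illustration: the object name StorageLevelSketch and its helper attrs are hypothetical, while the StorageLevel constants, fields, and five-argument factory come from Spark's org.apache.spark.storage package. OFF_HEAP is left out on purpose because its flags differ between Spark versions.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical sketch: the object and helper names are assumptions.
object StorageLevelSketch {
  /** (useDisk, useMemory, useOffHeap, deserialized, replication) for a storage level. */
  def attrs(level: StorageLevel): (Boolean, Boolean, Boolean, Boolean, Int) =
    (level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, level.replication)

  // A level built from the five attributes: serialized, memory plus disk, two copies.
  val custom: StorageLevel = StorageLevel(true, true, false, false, 2)

  def main(args: Array[String]): Unit = {
    val levels = Seq(
      "MEMORY_ONLY" -> StorageLevel.MEMORY_ONLY,
      "MEMORY_ONLY_SER" -> StorageLevel.MEMORY_ONLY_SER,
      "MEMORY_AND_DISK_2" -> StorageLevel.MEMORY_AND_DISK_2,
      "DISK_ONLY" -> StorageLevel.DISK_ONLY,
      "custom" -> custom
    )
    // Print each level's toString next to its attribute tuple.
    levels.foreach { case (name, level) => println(s"$name: $level ${attrs(level)}") }
  }
}
```

A level chosen this way is passed to persist, for example rdd.persist(StorageLevel.MEMORY_ONLY_SER), where rdd stands for any RDD in your application.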

Checkpointing

  • Checkpointing writes the RDD to an external storage system such as HDFS or S3 and, in contrast to persisting, forgets the RDD's lineage. Since checkpointing requires writing the RDD outside of Spark, it forces evaluation of the RDD, and the checkpointed data survives beyond the duration of a single Spark application.

  • Checkpointing takes up more space in external storage and may be slower than persisting since it requires potentially costly write operations. However, it does not use any Spark memory and will not incur recomputation if a Spark worker fails.

  • It is best to use checkpointing when the cost of failure and recomputation is of more concern than additional space in external storage.

  • Prefer persisting when jobs are slow and checkpointing when they are failing.

  • If a Spark job is failing due to out-of-memory errors, checkpointing will reduce the cost and likelihood of failure without using up memory on the executors.

  • If your jobs are failing due to network errors or preemption on a noisy cluster, checkpointing can reduce the likelihood of failure by breaking up a long-running job into smaller segments.
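
A minimal checkpointing sketch follows, under stated assumptions: the object name CheckpointSketch, the made-up data, and the local[2] master are hypothetical, and a local temporary directory stands in for HDFS or S3. The RDD is also persisted before it is checkpointed, so that the checkpoint write can read the stored partitions instead of computing the RDD a second time.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sketch: names, data, and the checkpoint location are assumptions.
object CheckpointSketch {
  /** Returns whether the RDD was checkpointed, and the result of the action. */
  def run(): (Boolean, Long) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch")
    val sc = new SparkContext(conf)
    try {
      // Assumption: a local temp directory replaces a reliable store such as HDFS.
      sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)

      // Stand-in for an expensive computation.
      val expensive = sc.parallelize(1 to 1000, 4).map(x => x.toLong * x)
      expensive.persist()    // lets the checkpoint write reuse the computed partitions
      expensive.checkpoint() // must be called before the first action on this RDD

      val total = expensive.reduce(_ + _) // the action triggers the checkpoint write
      (expensive.isCheckpointed, total)
    } finally {
      sc.stop()
    }
  }
}
```

After the action completes, the RDD's lineage is replaced by a reference to the checkpoint files, which can be confirmed with toDebugString on the checkpointed RDD.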