Locality scheduling notes

Spark task/RDD locality and scheduling

Details on the various locality- and scheduling-related components

Stack trace of the locality call

Call stack where the parquet block location information is read:

	at com.ibm.crail.hdfs.CrailHadoopFileSystem._getFileBlockLocations(CrailHadoopFileSystem.java:270)
	at com.ibm.crail.hdfs.CrailHadoopFileSystem.getFileBlockLocations(CrailHadoopFileSystem.java:260)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$listLeafFiles$3.apply(PartitioningAwareFileIndex.scala:429)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$listLeafFiles$3.apply(PartitioningAwareFileIndex.scala:412)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$listLeafFiles(PartitioningAwareFileIndex.scala:412)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles$1.apply(PartitioningAwareFileIndex.scala:302)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles$1.apply(PartitioningAwareFileIndex.scala:301)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:301)
	at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:253)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:397)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425)

InMemoryFileIndex implements the abstract class PartitioningAwareFileIndex. Roughly, both represent the general idea that data is partitioned across multiple files, which need to be enumerated. Upon allocating a new InMemoryFileIndex, it calls refresh0, which calls PartitioningAwareFileIndex.listLeafFiles, which in turn calls PartitioningAwareFileIndex.bulkListLeafFiles. Here the driver can decide whether to launch a new set of tasks to enumerate all files or to handle everything at the driver (configurable by spark.sql.sources.parallelPartitionDiscovery.threshold, default 32).

bulkListLeafFiles calls listLeafFiles, and it is here that the location information is extracted from the file system. It first enumerates the directory with fs.listStatus, and then for each file calls fs.getFileBlockLocations(f, 0, f.getLen). The statuses variable contains all the file names, some of which are eventually filtered out (like _SUCCESS, temp files, etc.); the filtering is done by the shouldFilterOut() function.
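To make this concrete, here is a minimal standalone sketch of what listLeafFiles does per directory, written against the standard Hadoop FileSystem API. This is not the Spark source; the filter rule and the printing are illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object ListLeafFilesSketch {
  // Mirrors the spirit of shouldFilterOut(): drop bookkeeping files
  // such as _SUCCESS and hidden/temporary output.
  def shouldFilterOut(name: String): Boolean =
    name.startsWith("_") || name.startsWith(".")

  def main(args: Array[String]): Unit = {
    val path = new Path(args(0)) // e.g., a directory of parquet part-files
    val fs = path.getFileSystem(new Configuration())
    val statuses = fs.listStatus(path).filterNot(s => shouldFilterOut(s.getPath.getName))
    for (f <- statuses if f.isFile) {
      // One call per file: which hosts hold which byte ranges of it.
      for (b <- fs.getFileBlockLocations(f, 0, f.getLen)) {
        println(s"${f.getPath} offset=${b.getOffset} len=${b.getLength} " +
          s"hosts=${b.getHosts.mkString(",")}")
      }
    }
  }
}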

Encoding location in RDD partitions

DataSourceScanExec, which is the top-level physical operator when reading a data source (e.g., parquet), creates an RDD[InternalRow]. The input for this RDD is a FileScanRDD, which is created in the createNonBucketedReadRDD function. It is the same function where the partitioning logic is calculated, i.e., how file splits are packed into RDD partitions.
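A minimal sketch of that packing idea, assuming Spark splits files into chunks of at most maxSplitBytes and then greedily bins them largest-first (the helper name packPartitions is illustrative; Spark derives maxSplitBytes from spark.sql.files.maxPartitionBytes, spark.sql.files.openCostInBytes, and the available parallelism):

// Greedy bin-packing of (path, length) splits into partitions.
def packPartitions(files: Seq[(String, Long)],
                   maxSplitBytes: Long,
                   openCostInBytes: Long): Seq[Seq[(String, Long)]] = {
  val partitions = scala.collection.mutable.ArrayBuffer[Seq[(String, Long)]]()
  val current = scala.collection.mutable.ArrayBuffer[(String, Long)]()
  var currentSize = 0L
  def closePartition(): Unit = {
    if (current.nonEmpty) partitions += current.toList
    current.clear()
    currentSize = 0L
  }
  for (file <- files.sortBy(-_._2)) { // largest splits first pack tighter
    if (currentSize + file._2 > maxSplitBytes) closePartition()
    // Each file is charged an open cost so that many tiny files
    // do not end up over-filling one partition.
    currentSize += file._2 + openCostInBytes
    current += file
  }
  closePartition()
  partitions.toList
}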

FileScanRDD.scala has case class FilePartition extends RDDPartition, which captures the RDD partition index and, as a sequence of PartitionedFile, where the data for this index should come from. A PartitionedFile is also a case class that captures low-level, per-file information: file path, start offset, length, and location details. It also has an InternalRow object where the partition information should go (I am not completely clear how this is used). It is passed during the initialization of a parquet reader in ParquetFileFormat.buildReader(); I suspect it is allocated as buffer space there but does not have an immediate use.
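Simplified shapes of the two case classes, paraphrased from Spark 2.x FileScanRDD.scala (the real PartitionedFile carries a catalyst InternalRow, stubbed out here so the sketch stands alone):

trait RDDPartition { def index: Int }
case class InternalRowStub() // stand-in for org.apache.spark.sql.catalyst.InternalRow

case class PartitionedFile(
    partitionValues: InternalRowStub, // partition-column values for this file
    filePath: String,
    start: Long,                      // byte offset where this split begins
    length: Long,                     // number of bytes in this split
    locations: Array[String])         // hosts that hold this byte range

case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition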

FileScanRDD.filePartitions are then returned when RDD.getPartitions is called, since FileScanRDD is itself an RDD and thus carries the partition information.

FileScanRDD.getPreferredLocations is actually an interesting function. For a given distribution of block information, it calculates where to schedule the task: it computes how much of the partition's data each host holds and then takes the top 3 hosts as the preferred locations.
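A minimal sketch of that top-3-hosts-by-bytes logic, reusing the PartitionedFile shape from above (again, not the Spark source):

def preferredLocations(files: Seq[PartitionedFile]): Seq[String] = {
  val bytesPerHost = scala.collection.mutable.HashMap.empty[String, Long]
  for (file <- files; host <- file.locations) {
    // Credit every host that holds a replica with this split's length.
    bytesPerHost(host) = bytesPerHost.getOrElse(host, 0L) + file.length
  }
  // Keep the 3 hosts that hold the most bytes of this partition.
  bytesPerHost.toSeq.sortBy(-_._2).take(3).map(_._1)
}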


NOTE: Partition index 'n' results in task 'n' in a stage. For example,

TaskSetManager: Starting task 3.0 in stage 0.0 (TID 0, flex15.zurich.ibm.com, executor 4, partition 3, NODE_LOCAL, 6608 bytes)

task 3.0 corresponds to partition 3 of the RDD (here FileScanRDD), but TID 0 says it was the first one to be scheduled. That is all.


Task Scheduling details

The main entry point in the DAGScheduler is

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] 

which is called from SparkContext.submitJob. It passes an RDD with the necessary locality information about its partitions. When a job is submitted, an event is generated and doOnReceive is called. A job can be submitted as a map stage or a result stage. From submission it comes to the handleJobSubmitted function, and from there to submitStage, which calculates whether the current stage has missing parents or missing tasks. Since for a simple IO job there is just one stage with missing tasks, the code branches off to submitMissingTasks. Here it calculates the preferred locations as:

val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} catch {
  case NonFatal(e) =>
    stage.makeNewStageAttempt(partitionsToCompute.size)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}

This location information is then passed to the stage variable, which is used to build the sequence of Tasks that will be executed. These tasks are then submitted to the TaskScheduler (implemented as TaskSchedulerImpl):

taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

The TaskScheduler allocates a new TaskSetManager, which is responsible for scheduling the tasks within a single TaskSet in the TaskSchedulerImpl. This class (1) keeps track of each task and retries tasks if they fail (up to a limited number of times), and (2) handles locality-aware scheduling for this TaskSet via delay scheduling. The TaskSetManager then calls var myLocalityLevels = computeValidLocalityLevels(), which calculates the valid locality levels for the tasks based upon multiple factors. For a given RDD partition location, it checks with TaskSchedulerImpl.hasExecutorsAliveOnHost(hostname) whether the host has an executor. If yes, that host is used to schedule the computation for that particular RDD partition, and the preferred level is NODE_LOCAL. In the case of no match, one usually gets RACK_LOCAL or ANY.
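A simplified sketch of the idea behind computeValidLocalityLevels, with the executor and rack queries modeled as plain sets (the function and parameter names below are illustrative, not the Spark source):

object LocalityLevel extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
}

def validLevels(
    preferredHosts: Set[String],          // hosts named by the RDD's preferred locations
    aliveHosts: Set[String],              // hosts that currently run an executor
    hostToRack: String => Option[String], // rack lookup, if the cluster defines one
    aliveRacks: Set[String]): Seq[LocalityLevel.Value] = {
  val levels = scala.collection.mutable.ArrayBuffer[LocalityLevel.Value]()
  // NODE_LOCAL is only valid if some preferred host actually has a live executor.
  if (preferredHosts.exists(aliveHosts.contains)) levels += LocalityLevel.NODE_LOCAL
  // RACK_LOCAL is only valid if some preferred host's rack has a live executor.
  if (preferredHosts.flatMap(hostToRack).exists(aliveRacks.contains))
    levels += LocalityLevel.RACK_LOCAL
  levels += LocalityLevel.ANY             // always allowed as the final fallback
  levels.toSeq
}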

Task/Stage and location related information

The abstract class Stage is implemented by ResultStage and ShuffleMapStage - these are the two types of stages that run in Spark. A stage is a collection of completely parallel tasks.

ResultStage and ShuffleMapStage result in ResultTask and ShuffleMapTask respectively, both of which implement the Task abstract class. Both task types have preferredLocs as a sequence of TaskLocation. A TaskLocation is a location where a task should run: either a host or a (host, executorID) pair.
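In sketch form (the real hierarchy in org.apache.spark.scheduler also includes an HDFS-cache variant; this simplified version is only for illustration):

sealed trait TaskLocation { def host: String }
case class HostTaskLocation(host: String) extends TaskLocation
case class ExecutorCacheTaskLocation(host: String, executorId: String) extends TaskLocation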
