# The Anatomy of a Spark Job
- Due to Spark lazy evaluation, a Spark application doesn't "do anything" until the driver program calls an action (see the sketch after the lists below).
- With each action, the Spark scheduler builds an execution graph and launches a **Spark job**.
- Each job consists of stages, which are steps in the transformation of the data needed to materialize the final RDD.
- Each stage consists of a collection of tasks that represent each parallel computation and are performed on the executors.
The figure below shows a tree of the different components of a Spark application and how they correspond to the API calls.
- An application corresponds to starting a `SparkContext`/`SparkSession`.
- Each application may contain many jobs, each corresponding to one RDD action.
- Each job may contain several stages, each corresponding to a wide transformation.
- Each stage is composed of one or many tasks that correspond to a parallelizable unit of computation done in that stage.
- There is one task for each partition in the resulting RDD of that stage.
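As a minimal sketch of this hierarchy (the app name, input path, and RDD names are made-up placeholders, not from the wiki): the transformations only build up lineage, and the action at the end is what launches a job.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch; all names and the input path are hypothetical.
val spark = SparkSession.builder().appName("anatomy-demo").getOrCreate()
val sc = spark.sparkContext                 // the application

val pairs = sc.textFile("input.txt")        // nothing is computed yet
  .flatMap(_.split(" "))                    // narrow transformation
  .map(word => (word, 1))                   // narrow transformation
val counts = pairs.reduceByKey(_ + _)       // wide transformation -> new stage

val numWords = counts.count()               // action -> the scheduler launches one job here
```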
### The DAG
- Spark's high-level scheduling layer uses RDD dependencies to build a *Directed Acyclic Graph* (DAG) of stages for each Spark job.
- Errors related to connecting to your cluster, your configuration parameters, or launching a Spark job show up as DAG Scheduler errors.
### Jobs
A job is the highest element of the Spark execution hierarchy. Each Spark job corresponds to one action, and each action is called by the driver program of a Spark application. One way to conceptualize an action is as something that brings data out of the RDD world of Spark into some other storage system (usually by bringing data to the driver or writing to some stable storage system).
The edges of the Spark execution graph are based on dependencies between the partitions in RDD transformations. Thus, an operation that returns something other than an RDD cannot have any children (actions are "leaves" of the graph). An arbitrarily large set of transformations may be associated with one execution graph.
As soon as an action is called, Spark can no longer add to the graph. The application launches a job that includes the transformations needed to evaluate the final RDD on which the action was called.
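As a hedged sketch of this (the input RDD and names are hypothetical): each action on a lineage launches its own job, recomputing the transformations unless the RDD is cached.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical input RDD; each action below launches a separate job.
def twoJobs(nums: RDD[Int]): Unit = {
  val evens = nums.filter(_ % 2 == 0)   // transformation only: no job yet

  val howMany = evens.count()           // action -> job 1
  val values  = evens.collect()         // action -> job 2 (re-runs the filter
                                        //            unless evens is cached)
  println(s"$howMany even values, e.g. ${values.take(3).mkString(", ")}")
}
```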
### Stages
The job triggered by an action may include one or several transformations, and wide transformations define the breakdown of a job into stages.
Each stage corresponds to a shuffle dependency created by a wide transformation in the Spark program. One stage can be thought of as the set of computations (tasks) that can each be computed on one executor without communication with other executors or with the driver. In other words, a new stage begins whenever network communication between workers is required; for instance, in a shuffle.
These dependencies that create stage boundaries are called *ShuffleDependencies*. Shuffles are caused by wide transformations such as `sort` or `groupByKey`, which require the data to be redistributed across the partitions. Several transformations with narrow dependencies can be grouped into one stage.
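One way to see where these ShuffleDependencies fall is `RDD.toDebugString`, which prints the lineage with an indentation step at each shuffle boundary. A rough sketch (the pair RDD is hypothetical and the printed output is only indicative):

```scala
import org.apache.spark.rdd.RDD

// Hypothetical pair RDD; groupByKey introduces a ShuffleDependency, so the
// lineage printed by toDebugString shows a new indentation level (a new stage).
def showLineage(pairs: RDD[(String, Int)]): Unit = {
  val grouped = pairs.groupByKey()
  println(grouped.toDebugString)
  // Roughly:
  // (N) ShuffledRDD[...] at groupByKey ...
  //  +-(N) MapPartitionsRDD[...] at map ...   <- narrow ancestors, previous stage
}
```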
Spark can combine the `flatMap`, `map`, and `filter` steps into one stage, since none of those transformations requires a shuffle.
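A minimal sketch of such a single-stage pipeline (the input RDD is hypothetical): all three operations have narrow dependencies, so each task applies all of them to one partition with no shuffle in between.

```scala
import org.apache.spark.rdd.RDD

// flatMap, map and filter are all narrow, so Spark pipelines them into a
// single stage; each task processes one partition end to end.
def singleStage(lines: RDD[String]): RDD[Int] =
  lines.flatMap(_.split(" "))
    .map(_.length)
    .filter(_ > 3)
```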
Spark keeps track of how an RDD is partitioned, so that it does not need to partition the same RDD by the same partitioner more than once. The same operations on RDDs with known partitioners and on RDDs without a known partitioner can result in different stage boundaries, because there is no need to shuffle an RDD with a known partitioner.
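A hedged sketch of the known-partitioner case (the RDD and the partition count are assumptions): after `partitionBy`, a `reduceByKey` that uses the same partitioner needs no further shuffle, so no new stage boundary is introduced there.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical pair RDD; the single shuffle happens in partitionBy.
def knownPartitioner(pairs: RDD[(String, Int)]): Long = {
  val partitioner = new HashPartitioner(8)
  val byKey = pairs.partitionBy(partitioner)        // shuffle once; the partitioner is recorded

  // Same partitioner -> the data is already laid out correctly, so this is a
  // narrow dependency: no second shuffle, no extra stage boundary.
  val sums = byKey.reduceByKey(partitioner, _ + _)
  sums.count()
}
```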
Because the stage boundaries require communication with the driver, the stages associated with one job generally have to be executed in sequence rather than in parallel.
It is possible to execute stages in parallel if they are used to compute different RDDs that are combined in a downstream transformation such as a join. However, the wide transformations needed to compute one RDD have to be computed in sequence.
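A sketch of that situation (inputs hypothetical): the two wide transformations below feed a join, so their stages are independent of each other and may run in parallel, while the join stage waits for both.

```scala
import org.apache.spark.rdd.RDD

// The stages computing `left` and `right` do not depend on each other and can
// be scheduled in parallel; the join stage depends on both of them.
def joinOfTwoBranches(a: RDD[(Int, String)], b: RDD[(Int, Double)]): Long = {
  val left  = a.groupByKey()          // wide -> its own stage
  val right = b.reduceByKey(_ + _)    // wide -> an independent stage
  left.join(right).count()            // join stage; count() triggers the job
}
```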
### Tasks
- A stage consists of tasks. The task is the smallest unit in the execution hierarchy, and each one represents one local computation.
- All of the tasks in one stage execute the same code on a different piece of the data. One task cannot be executed on more than one executor. However, each executor has a dynamically allocated number of slots for running tasks and may run many tasks concurrently throughout its lifetime. The number of tasks per stage corresponds to the number of partitions in the output RDD of that stage.
```scala
import org.apache.spark.rdd.RDD

def simpleSparkProgram(rdd: RDD[Double]): Long = {
  // stage 1
  rdd.filter(_ < 1000.0)
    .map(x => (x, x))
    // stage 2
    .groupByKey()
    .map { case (value, groups) => (groups.sum, value) }
    // stage 3
    .sortByKey()
    .count()
}
```
The stages (blue boxes) are bounded by the shuffle operations `groupByKey` and `sortByKey`. Each stage consists of several tasks, one for each partition in the result of the RDD transformations (shown as red rectangles), which are executed in parallel.
A cluster cannot necessarily run every task of a stage in parallel. Each executor has a number of cores; the number of cores per executor is configured at the application level, but likely corresponds to the physical cores on the cluster. Spark can run no more tasks at once than the total number of executor cores allocated to the application. We can calculate this number from the Spark configuration: total number of executor cores = number of cores per executor × number of executors.
If there are more partitions (and thus more tasks) than the number of slots for running tasks, the extra tasks will be allocated to the executors as the first round of tasks finishes and resources become available.
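A back-of-the-envelope sketch of that arithmetic (all numbers below are assumptions, not defaults):

```scala
// Assumed values, e.g. from spark.executor.cores and the number of executors.
val coresPerExecutor = 4
val numExecutors     = 10
val taskSlots        = coresPerExecutor * numExecutors          // 40 tasks at once

// One task per partition of the stage's output RDD.
val numPartitions = 200
val waves = math.ceil(numPartitions.toDouble / taskSlots).toInt // 5 rounds of tasks
println(s"$taskSlots slots -> the stage runs in $waves waves")
```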
In most cases, all the tasks for one stage must be completed before the next stage can start. The process of distributing these tasks is done by the `TaskScheduler` and varies depending on whether the fair scheduler or the FIFO scheduler is used.
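For reference, a hedged configuration sketch: the scheduling policy is controlled by the `spark.scheduler.mode` setting, which defaults to FIFO.

```scala
import org.apache.spark.SparkConf

// Switch from the default FIFO policy to the fair scheduler
// (the app name is a placeholder).
val conf = new SparkConf()
  .setAppName("anatomy-demo")
  .set("spark.scheduler.mode", "FAIR")
```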
One stage can be computed without moving data across the partitions. Within one stage, the tasks are the units of work done for each partition of the data.