Interview Questions - ignacio-alorre/Spark GitHub Wiki

How to Repartition by a Column name? What is the default number of partitions it will create?

Using the repartition() method. By default it creates as many partitions as spark.sql.shuffle.partitions, which is 200. The example below produces 200 partitions: 3 of them contain data (one per distinct name) and the remaining 197 are empty.

# Create a small DataFrame with three distinct values in the "name" column
df = spark.createDataFrame([(11, "Deepak", 5), (12, "Leena", 1), (13, "Deepak", 7), (14, "Ram", 9)], ["id", "name", "bday"])

# Hash-partition by "name"; the partition count defaults to spark.sql.shuffle.partitions (200)
partitionedDf = df.repartition("name")

# Returns 200
partitionedDf.rdd.getNumPartitions()
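
If you want column-based partitioning without the 200 default, repartition() also accepts an explicit partition count together with the column. A minimal sketch (the count of 3 is purely an illustrative choice):

# Repartition by "name" into exactly 3 partitions instead of the default 200
partitionedDf3 = df.repartition(3, "name")
partitionedDf3.rdd.getNumPartitions()   # returns 3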

What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism?

  • spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations on DataFrames/Datasets. Default value is 200.

  • spark.default.parallelism is the default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set explicitly by the user. Its default depends on the deployment: roughly the total number of cores across all executors (in local mode, the number of cores on the machine), which is why it often shows up as a small number such as 8 on a laptop (see the sketch below).
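
A minimal sketch of inspecting and overriding these settings in a PySpark session, assuming an existing or newly created SparkSession (the value 64 is purely illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-config-demo").getOrCreate()

# Shuffle partitions used for DataFrame joins/aggregations (default 200)
print(spark.conf.get("spark.sql.shuffle.partitions"))
spark.conf.set("spark.sql.shuffle.partitions", "64")

# Default parallelism for RDD operations (cluster-dependent default)
print(spark.sparkContext.defaultParallelism)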

How to Create DataFrame With & Without Schema specified in Spark?

Without Schema: Use the .createDataFrame method and pass the list of rows. Spark will infer a default schema from the data.

With Schema: Use the .createDataFrame method and pass the list of rows as the first parameter and the schema as the second parameter. See the sketch below.
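
A minimal sketch of both approaches, assuming an existing SparkSession named spark (the column names and sample rows are illustrative only):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [(1, "Alice"), (2, "Bob")]

# Without a schema: Spark infers the column types; default column names are _1, _2, ...
dfInferred = spark.createDataFrame(data)

# With a schema: pass the data first and the StructType second
schema = StructType([
  StructField("id", IntegerType(), False),
  StructField("name", StringType(), False)
])
dfTyped = spark.createDataFrame(data, schema)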

How many Spark Jobs will be generated with the below code & Why?

csvSchema = StructType([
  StructField("timestamp", StringType(), False),
  StructField("site", StringType(), False),
  StructField("requests", IntegerType(), False)
])

csvFile = "example.tsv"

csvDF = (
  spark.read
  .option("header", "true")
  .option("sep", "\t")
  .schema(csvSchema)
  .csv(csvFile)
)

Ans: 0. In this case Spark does not need to physically touch the data in the storage layer, because the user-provided schema already gives it all the information it needs to build the DataFrame.

How many Spark Jobs will be generated with the below code & Why?

csvSchema = StructType([
  StructField("timestamp", StringType(), False),
  StructField("site", StringType(), False),
  StructField("requests", IntegerType(), False)
])

csvFile = "example.tsv"

csvDF = (
  spark.read
  .option("header", "true")
  .option("sep", "\t")
  .option("inferSchema", "true")
  .csv(csvFile)
)

Ans: 2. Spark needs to physically touch the data twice: a first job reads the header line to determine the column names, and a second job scans the full CSV file to infer the data types.

How to get the COUNT of number of records in each Partition of a Dataframe in Spark?

Step 1: Import spark_partition_id

Step 2: Add a new column via the .withColumn() method and assign spark_partition_id() as its value

Step 3: Apply the groupBy() and count() methods to get the number of records in each partition

from pyspark.sql.functions import spark_partition_id

CountPerPartition = csvDF.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count()
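
A self-contained version of the same pattern on a small in-memory DataFrame, assuming an existing SparkSession named spark (the data and the partition count of 4 are illustrative only):

from pyspark.sql.functions import spark_partition_id

# Build a tiny DataFrame and spread it over 4 partitions
demoDf = spark.createDataFrame([(i,) for i in range(100)], ["value"]).repartition(4)

# Tag each row with the id of the partition it lives in, then count rows per partition
demoDf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()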

What is the Exclusion Mechanism in Spark? How has Databricks improved it?

The exclusion mechanism was introduced for task scheduling in Apache Spark 2.2.0 (as “blacklisting”). The motivation for having exclusion is to enhance fault tolerance in Spark, especially against the following problematic scenario:

  1. In a cluster with hundreds or thousands of nodes, there is a decent probability that executor failures happen on one of the nodes during a long-running Spark application, and this can lead to task failures.

  2. When a task failure happens, there is a high probability that the scheduler will reschedule the task to the same node and same executor because of locality considerations. The task will then fail again.

  3. After failing spark.task.maxFailures times on the same task, the Spark job will be aborted (see the configuration sketch below).
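
Exclusion is controlled through configuration. A minimal sketch of turning it on when building a session, assuming the Spark 2.2-3.0 era property names (they were renamed to spark.excludeOnFailure.* in Spark 3.1+); the values shown are illustrative, not tuning advice:

from pyspark.sql import SparkSession

spark = (
  SparkSession.builder
  .appName("exclusion-demo")
  # Exclude executors/nodes that repeatedly fail tasks (legacy name; spark.excludeOnFailure.enabled in Spark 3.1+)
  .config("spark.blacklist.enabled", "true")
  # Abort the job after this many failures of any single task (4 is the usual default)
  .config("spark.task.maxFailures", "4")
  .getOrCreate()
)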

New features introduced in Databricks Runtime 7.3

Feature 1: A configuration called spark.databricks.blacklist.decommissionNode.enabled was added. If it is set to true, then when a node is excluded at the application level it is decommissioned, and a new node is launched to keep the cluster at its desired size.

Feature 2: The behavior of application-level node exclusion can be tuned via spark.blacklist.application.blacklistedNodeThreshold (see the sketch below).
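
A minimal sketch of how these two properties, named above, could be set when building a session (on Databricks they would more typically be set in the cluster's Spark config); the threshold value of 2 is purely illustrative, since the source does not state a recommended value:

from pyspark.sql import SparkSession

spark = (
  SparkSession.builder
  .appName("databricks-exclusion-demo")
  # Decommission nodes that get excluded at the application level and replace them
  .config("spark.databricks.blacklist.decommissionNode.enabled", "true")
  # Tune application-level node exclusion (the value 2 is illustrative only)
  .config("spark.blacklist.application.blacklistedNodeThreshold", "2")
  .getOrCreate()
)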
