Exam Notes - ScalaRocks/SparkHacks GitHub Wiki
Exam The exam has a duration of approximately 180 minutes and mine had 20 multiple-choice questions and 19 coding challenges. Since the exam is done within the Databricks platform, it's important that you become familiar with their environment.
You will have access to the Databricks, Spark and Scala (or Python) documentations, but remember that you only have 180 minutes, thus knowing beforehand where each class / function resides it's crucial to finish the assessment on time.
The assessment is composed of 4 main notebooks:
Getting started - contains some basic examination information Tutorial - briefly shows how to use the Databricks platform (how to attach a cluster, run a cell on a notebook, run all the notebook cells, etc.) Multiple-choice questions - page with 20 links pointing to each of the multiple-choice question. Each question is a separate notebook. Coding challenges - same as previous Multiple-choice questions range from very easy (e.g which of the following is a narrow transformation) to quite hard (e.g about Catalyst optimizer or Tungsten row format), but I would say the most part of questions are medium level ones.
Some questions may cover:
identify actions and transformations differentiate between narrow and wide transformations know how caching works and Spark's LRU partition eviction know how to increase or decrease the partition number, without or without shuffling reason about the pros and cons, GC, execution time and OOM errors when there is data skew in multiple setups, e.g it is better to have a setup with 1 executor with 100GB of RAM and 20 cores or 10 executors with 10GB of RAM and 2 cores each, etc. know how shuffle works know what is a DAG and Spark's units of execution: Job, Stage and Task know the relation between the RDD partition count and the number of spawned Tasks All the coding challenges notebooks were composed by 4 or 5 cells in total:
Description Environment setup cell - may execute some bash script that prepares the environment Data format cell - just a cell with display(df) so that you can check the structure of the DataFrame to be transformed (present in some exercises) Coding cell - skeleton of the to-be-implemented function that usually receives a DataFrame and may return one or multiple results, e.g DataFrame or a tuple with a DataFrame and its StructType schema. Result cell - surprisingly you can check if the previously implemented function passes the basic tests, resulting in a table with PASSED or FAILED column value. This cell has a description with something like "Run this to check if you are on track", however there are no guarantees that the same tests will be used for the final evaluation. Since the Environment setup cell may take some time to execute, I would recommend to run this cell before reading the Description.
Be careful with the imports, since none of them are included! Just memorize all the important imports like import org.apache.spark.sql.functions._ or import spark.implicits._ over the SparkSession.
Unfortunately the Databricks platform was getting slower with each notebook I opened and I was afraid of refreshing or closing the browser with the risk of somehow ending the assessment session. It got to the point of writing the whole import org.apache.spark.sql.functions._ and waiting some additional 5 seconds until the written text appeared. Almost after implementing all the coding challenges I finally refreshed the tab, cause it was just impossible to continue, and the platform became normal again.
Also the copy paste was not working. I was unable to paste anything from the documentation tabs.
I waited approximately 3 business days until I received the grading result. Expect at least 72 hours considering business days.
Cheat Sheet This section contains all the topics included in the certification page. As you can see I've already completed some code blocks and others are marked with // TODO. You can use this section as a studying guide / cheat sheet. Feel free to contribute to the remaining topics :)
The dataset is available here.
Good luck on your preparation!!!
Spark Architecture Components Candidates are expected to be familiar with the following architectural components and their relationship to each other:
Driver Executor Cores/Slots/Threads Partitions Spark Execution Candidates are expected to be familiar with Sparkβs execution model and the breakdown between the different elements:
Jobs Stages Tasks Spark Concepts Candidates are expected to be familiar with the following concepts:
Caching Shuffling Partitioning Wide vs. Narrow Transformations DataFrame Transformations vs. Actions vs. Operations High-level Cluster Configuration DataFrames API SparkContext Candidates are expected to know how to use the SparkContext to control basic configuration settings such as spark.sql.shuffle.partitions.
// TODO SparkSession Candidates are expected to know how to:
Create a DataFrame/Dataset from a collection (e.g. list or set) import org.apache.spark.sql._ import org.apache.spark.sql.types._
val schema = StructType( StructField("id", StringType) :: StructField("value", IntegerType) :: Nil )
val data = Seq( Row("id1", 1), Row("id2", 2), Row("id3", 3) )
val df = spark.createDataFrame( spark.sparkContext.parallelize(data), schema )
// Or using .toDF(...) import spark.implicits._
val df2 = Seq( ("id1", 1), ("id2", 2), ("id3", 3) ).toDF("id", "value")
// Or using Datasets case class Record(id: String, value: Int)
val ds = Seq( Record("id1", 1), Record("id2", 2), Record("id3", 3) ).toDS() Create a DataFrame for a range of numbers val ds = spark.range(100).toDF("number") Access the DataFrameReaders // See https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader
spark.read.csv(...) spark.read.jdbc(...) spark.read.json(...) spark.read.orc(...) spark.read.parquet(...) spark.read.table(...) spark.read.text(...) spark.read.textFile(...) Register User Defined Functions - UDFs // See https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html
// Use in Spark SQL val squared = (s: Long) => s * s
spark.udf.register("square", squared)
spark.range(1, 20).registerTempTable("test")
val df = spark.sql("select id, square(id) as id_squared from test") df.show()
// Use with DataFrames import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
val df = spark.range(1, 20).select(col("id"), squared(col("id")) as "id_squared") df.show() DataFrameReader Candidates are expected to know how to:
Read data for the "core" data formats (CSV, JSON, JDBC, ORC, Parquet, text and tables) // See // https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader // https://spark.apache.org/docs/2.4.0/sql-data-sources-load-save-functions.html
// CSV val csvDF = spark.read .option("header", true) .option("inferSchema", true) .csv("data/flight-data/csv/2015-summary.csv") csvDF.show()
// CSV alternative way val csvDS = spark.createDataset(Seq( """DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count""", """United States,Romania,15""", """United States,Croatia,1""" )) val csvDF = spark.read .option("header", true) .csv(csvDS) csvDF.show()
// JSON val jsonDF = spark.read.json("data/flight-data/json/2015-summary.json") jsonDF.show()
// JSON alternative way val jsonDS = spark.createDataset(Seq( """{"ORIGIN_COUNTRY_NAME":"Romania","DEST_COUNTRY_NAME":"United States","count":15}""", """{"ORIGIN_COUNTRY_NAME":"Croatia","DEST_COUNTRY_NAME":"United States","count":1}""" )) val jsonDF = spark.read.json(jsonDS) jsonDF.show()
// JSON handle malformed records val jsonDS = spark.createDataset(Seq( """{"id":1, "name":"bob"}""", """{"id":2, "name":"tom"}""", """{"id":3, "name":broken}""", """{"id":4, "name":"derrick"}""" )) val jsonDF = spark.read .schema("id INT, name STRING, _corrupt_record STRING") .json(jsonDS) jsonDF.show()
/* Default read mode "Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record"
+----+-------+--------------------+ | id| name| _corrupt_record| +----+-------+--------------------+ | 1| bob| null| | 2| tom| null| |null| null|{"id":3, "name":b...| | 4|derrick| null| +----+-------+--------------------+
Other read modes: dropMalformed => Drops the row that contains malformed records failFast => Fails immediately upon encountering malformed records */
val jsonDF = spark.read .schema("id INT, name STRING, _corrupt_record STRING") .option("mode", "dropMalformed") .json(jsonDS) jsonDF.show()
// JDBC val jdbcDF = spark.read .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .jdbc()
// JDBC alternative way val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Parquet val parquetDF = spark.read.parquet("data/flight-data/parquet/2010-summary.parquet") parquetDF.show()
// Text val textDF = spark.read.text("data/flight-data/csv/2015-summary.csv") // Returns a DataFrame textDF.show()
val textDS = spark.read.textFile("data/flight-data/csv/2015-summary.csv") // Returns a Dataset textDS.show()
// Table val tableDF = spark.read.table("some_table") How to configure options for specific formats // See above How to read data from non-core formats using format() and load() val csvDF = spark.read.format("csv") .option("header", true) .option("inferSchema", true) .load("data/2015-summary.csv") csvDF.show() How to specify a DDL-formatted schema spark.read.schema("a INT, b STRING, c DOUBLE").csv("...") How to construct and specify a schema using the StructType classes val schema = StructType( StructField("id", StringType) :: StructField("value", IntegerType) :: Nil ) DataFrameWriter Candidates are expected to know how to:
Write data to the "core" data formats (csv, json, jdbc, orc, parquet, text and tables) // See // https://spark.apache.org/docs/2.4.0/sql-data-sources-load-save-functions.html
// JDBC jdbcDF.write .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .jdbc()
// JDBC alternative way val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password")
jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// TODO write to other data formats Overwriting existing files val textDF = spark.read.text("data/flight-data/csv/2015-summary.csv") textDF.write.mode("overwrite").save("/tmp/deleteme")
// Other options are: // overwrite: overwrite the existing data. // append: append the data. // ignore: ignore the operation (i.e. no-op). // error or errorifexists: default option, throw an exception at runtime. How to configure options for specific formats // See above How to write a data source to 1 single file or N separate files val df = spark.read.parquet("data/flight-data/parquet/2010-summary.parquet")
// Creates a single /tmp/output/part-00000... file // Should only be used if the data fits in a single partition df.repartition(1).write.save("/tmp/output") How to write partitioned data val df = spark.read.parquet("data/flight-data/parquet/2010-summary.parquet")
df.write.partitionBy("ORIGIN_COUNTRY_NAME").save("/tmp/output") How to bucket data by a given set of columns // See https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html
val df = spark.read.parquet("data/flight-data/parquet/2010-summary.parquet")
df.write .bucketBy(6, "ORIGIN_COUNTRY_NAME") .sortBy("DEST_COUNTRY_NAME") .saveAsTable("test") DataFrame Have a working understanding of every action such as take(), collect(), and foreach() // Actions def collect(): Array[T] def collectAsList(): List[T] def count(): Long def first(): T def foreach(f: (T) β Unit): Unit def foreachPartition(f: (Iterator[T]) β Unit): Unit def head(): T def head(n: Int): Array[T] def reduce(func: (T, T) β T): T def take(n: Int): Array[T] def takeAsList(n: Int): List[T] def toLocalIterator(): Iterator[T] Have a working understanding of the various transformations and how they work such as producing a distinct set, filtering data, repartitioning and coalescing, performing joins and unions as well as producing aggregates. // Typed transformations def alias(alias: String): Dataset[T] def as(alias: Symbol): Dataset[T] def coalesce(numPartitions: Int): Dataset[T] def distinct(): Dataset[T] def dropDuplicates(): Dataset[T] def dropDuplicates(col1: String, cols: String*): Dataset[T] def except(other: Dataset[T]): Dataset[T] def exceptAll(other: Dataset[T]): Dataset[T] def filter(func: (T) β Boolean): Dataset[T] def filter(conditionExpr: String): Dataset[T] def filter(condition: Column): Dataset[T] def flatMap[U](func: (T) β TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] def groupByKey[K](func: (T) β K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T] def intersect(other: Dataset[T]): Dataset[T] def intersectAll(other: Dataset[T]): Dataset[T] def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] def limit(n: Int): Dataset[T] def map[U](func: (T) β U)(implicit arg0: Encoder[U]): Dataset[U] def mapPartitions[U](func: (Iterator[T]) β Iterator[U])(implicit arg0: Encoder[U]): Dataset[U] def orderBy(sortExprs: Column*): Dataset[T] def orderBy(sortCol: String, sortCols: String*): Dataset[T] def randomSplit(weights: Array[Double]): Array[Dataset[T]] def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] def randomSplitAsList(weights: Array[Double], seed: Long): List[Dataset[T]] def repartition(partitionExprs: Column*): Dataset[T] def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] def repartition(numPartitions: Int): Dataset[T] def repartitionByRange(partitionExprs: Column*): Dataset[T] def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
def sort(sortExprs: Column*): Dataset[T] def sort(sortCol: String, sortCols: String*): Dataset[T] def sortWithinPartitions(sortExprs: Column*): Dataset[T] def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] def transform[U](t: (Dataset[T]) β Dataset[U]): Dataset[U] def union(other: Dataset[T]): Dataset[T] def unionByName(other: Dataset[T]): Dataset[T] def where(conditionExpr: String): Dataset[T] def where(condition: Column): Dataset[T] Know how to cache data, specifically to disk, memory or both spark.catalog.cacheTable("tableName")
// Persist this Dataset/DataFrame with the default storage level (MEMORY_AND_DISK) ds.cache() ds.count() // Because cache are lazily evaluated
import org.apache.spark.storage.StorageLevel
ds.persist(StorageLevel.MEMORY_AND_DISK_SER) ds.count() // Because cache are lazily evaluated
/* Possible Storage Formats: NONE DISK_ONLY DISK_ONLY_2 MEMORY_ONLY MEMORY_ONLY_2 MEMORY_ONLY_SER MEMORY_ONLY_SER_2 MEMORY_AND_DISK MEMORY_AND_DISK_2 MEMORY_AND_DISK_SER MEMORY_AND_DISK_SER_2 OFF_HEAP */ Know how to uncache previously cached data spark.catalog.uncacheTable("tableName")
ds.unpersist() Converting a DataFrame to a global or temp view // Global temporary view is cross-session. // Its lifetime is the lifetime of the Spark application, // i.e. it will be automatically dropped when the application terminates df1.createGlobalTempView("temp1")
// Local temporary view is session-scoped. // Its lifetime is the lifetime of the session that created it, // i.e. it will be automatically dropped when the session terminates df2.createTempView("temp2") Applying hints import org.apache.spark.sql.functions.broadcast
val df1 = spark.table("table1") val df2 = spark.table("table2")
// Broadcast Hash Join df1.join(broadcast(df2), "key")
// Or df1.join(df2.hint("broadcast")) Row & Column Candidates are expected to know how to work with row and columns to successfully extract data from a DataFrame Spark SQL Functions When instructed what to do, candidates are expected to be able to employ the multitude of Spark SQL functions. Examples include, but are not limited to:
Aggregate functions: getting the first or last item from an array or computing the min and max values of a column. // TODO Collection functions: testing if an array contains a value, exploding or flattening data. // TODO Date time functions: parsing strings into timestamps or formatting timestamps into strings // TODO Math functions: computing the cosign, floor or log of a number // See: // https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.sql.functions$ // https://spark.apache.org/docs/2.4.0/api/sql/
// See rounding modes: https://docs.oracle.com/javase/7/docs/api/java/math/RoundingMode.html def bround(e: Column, scale: Int): Column // Uses HALF_EVEN, which rounds up if the number is odd and down if even def round(e: Column, scale: Int): Column // Uses HALF_UP def ceil(columnName: String): Column def floor(columnName: String): Column Misc functions: converting a value to crc32, md5, sha1 or sha2 // TODO Non-aggregate functions: creating an array, testing if a column is null, not-null, nan, etc // TODO Sorting functions: sorting data in descending order, ascending order, and sorting with proper null handling import spark.implicits._ import org.apache.spark.sql.functions._
val df = Seq[(Int, Integer)]( (1, 100), (2, 200), (3, null), (4, 50) ).toDF("row", "value")
df.sort(
df.sortWithinPartitions($"value".desc_nulls_last).show() String functions: applying a provided regular expression, trimming string and extracting substrings. // TODO UDF functions: employing a UDF function. // See: // https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html // https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs-blackbox.html
// It is always better to perform the logic using the existing functions on Columns val dateDiff: (Column, Column) => Column = (x, y) => { datediff(to_date(y), to_date(x)) } df = df.withColumn("date_diff", dateDiff(col("start_date"), col("end_date")))
val df = Seq( (0, "hello"), (1, "world") ).toDF("id", "text")
// Define a UDF that wraps the upper Scala function import org.apache.spark.sql.functions.udf val upperUDF = udf((x: String) => if(x != null) x.toUpperCase else null)
// Apply the UDF to change the source dataset df.withColumn("upper", upperUDF('text)).show() +---+-----+-----+ | id| text|upper| +---+-----+-----+ | 0|hello|HELLO| | 1|world|WORLD| +---+-----+-----+
// Alternativaly register de UDFs to use in SQL-based query expressions spark.udf.register("myUpper", (x: String) => if(x != null) x.toUpperCase else null)
// Check all the standard and user-defined functions with listFunctions over the Catalog spark.catalog.listFunctions.filter('name like "%upper%").show(false) +-----+--------+-----------+-----------------------------------------------+-----------+ |name |database|description|className |isTemporary| +-----+--------+-----------+-----------------------------------------------+-----------+ |upper|null |null |org.apache.spark.sql.catalyst.expressions.Upper|true | +-----+--------+-----------+-----------------------------------------------+-----------+
// Register the DF to be used in plain SQL df.createTempView("tempTable")
// Use the registered UDF spark.sql("SELECT id, text, myUpper(text) as upper FROM tempTable").show() Window functions: computing the rank or dense rank. // TODO Other Working with missing data in DataFrames types: by using the DataFrameNaFunctions class import spark.implicits._
val df = Seq( (1, null), (2, "lol") ).toDF("row","value")
df.show() +---+-----+ |row|value| +---+-----+ | 1| null| | 2| lol| +---+-----+
// Drop rows with null values df.na.drop(Seq("value")).show() +---+-----+ |row|value| +---+-----+ | 2| lol| +---+-----+
// Replace null values from all string type columns df.na.fill("lolada").show() +---+------+ |row| value| +---+------+ | 1|lolada| | 2| lol| +---+------+
// Replace null values in specified columns df.na.fill("asd", Seq("value")).show() +---+-----+ |row|value| +---+-----+ | 1| asd| | 2| lol| +---+-----+
// Replace null values in specified columns df.na.fill(Map("value" -> "lolada")).show() +---+------+ |row| value| +---+------+ | 1|lolada| | 2| lol| +---+------+
// Replace values in specified columns df.na.replace(Seq("value"), Map("lol" -> "lolada")).show() +---+------+ |row| value| +---+------+ | 1| null| | 2|lolada| +---+------+ File system utility dbutils: see Databricks Utilities dbutils.fs.rm("/tmp/dataframe_sample.csv", true) dbutils.fs.put("/tmp/dataframe_sample.csv", """ id|end_date|start_date|location 1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF 2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD 3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY 4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY 5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-LA """, true)
// List all Databricks datasets display(dbutils.fs.ls("/databricks-datasets")) Convert DataFrame to JSON using the Dataset's toJSON method val df = Seq( (1, null, 1.23, 66), (2, "lol", 3.45, 77), (3, "strings", 5.6789, 88), (4, "another", 0.12345, 99) ).toDF("row","str_alue", "dbl_Value", "int_value")
df.toJSON.show(false) +-----------------------------------------------------------------+ |value | +-----------------------------------------------------------------+ |{"row":1,"dbl_Value":1.23,"int_value":66} | |{"row":2,"str_alue":"lol","dbl_Value":3.45,"int_value":77} | |{"row":3,"str_alue":"strings","dbl_Value":5.6789,"int_value":88} | |{"row":4,"str_alue":"another","dbl_Value":0.12345,"int_value":99}| +-----------------------------------------------------------------+ Manage Databases and Tables // See https://docs.databricks.com/user-guide/tables.html
// Update the table metadata refresh table Create a DataFrame from an external file %sh
// The file will actually be saved to file:/databricks/driver/flight_data.parquet wget -O flight_data.csv https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/flight-data/csv/2010-summary.csv