Spark: The Definitive Guide Notes

Chapter 4 Notes: DataFrames vs Datasets

DataFrames are untyped, whereas Datasets are typed.

  • DataFrames also check the data against their schema, but the checks are done at run time.
  • Datasets check the data against their schema at compile time. Because of this, Datasets are only available in Java virtual machine (JVM) languages, Scala and Java, where we specify types using case classes or Java bean classes.

In Spark (Scala), DataFrames are simply Datasets of type 'Row'. The 'Row' type is Spark's internal representation of its optimized in-memory format for computation. This format makes for highly specialized and efficient computation because, rather than using JVM types, which can cause high garbage-collection and object instantiation costs, Spark can operate on its own internal format without incurring any of those costs.

In PySpark or R, everything is a DataFrame, so we always operate on the optimized format.

Spark's Structured API execution:

The steps followed during execution are:

  1. Write DataFrame/SQL code.
  2. Submit the code to Spark, either interactively from the command line or with spark-submit.
  3. If the code is valid, Spark converts it into a logical plan.
  4. The logical plan is then converted to a physical plan, with optimizations applied along the way.
  5. Spark executes the physical plan and returns the result.

Logical Planning: The first phase of execution takes user code and converts it into a logical plan. The logical plan represents only a set of abstract transformations that do not refer to executors or drivers; its sole purpose is to convert the user's set of expressions into the most optimized version. Spark does this by first converting user code into an unresolved logical plan. This plan is unresolved because, although your code might be valid, the tables or columns that it refers to might or might not exist. Spark uses the catalog, a repository of all table and DataFrame information, to resolve columns and tables in the analyzer. The analyzer might reject the unresolved logical plan if the required table or column name does not exist in the catalog. If the analyzer can resolve it, the result is passed through the Catalyst Optimizer, a collection of rules that attempt to optimize the logical plan by pushing down predicates or selections. Packages can extend Catalyst to include their own rules for domain-specific optimizations.

Physical Planning: After successfully creating an optimized logical plan, Spark begins the physical planning process. The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model.

Finally, Spark executes the selected physical plan as RDD transformations and returns the results.

Chapter 5 Notes: Basic Structured Operations

DataFrame Transformations:

  1. Add a column
  2. Remove a column
  3. Change the order of columns
  4. Rename a column

DataFrame select example: df.select("DEST_COUNTRY_NAME"), where DEST_COUNTRY_NAME is a column of the DataFrame.

Select and Expression:

from pyspark.sql.functions import expr, col, column

A few examples:

df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME")
# in Python
df.selectExpr(
  "*", # all original columns
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")
# in Python
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))")
The SQL equivalent of the above is:
SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) FROM dfTable

Converting to Spark Literals:

from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One"))

Adding Columns:

df.withColumn("numberOne", lit(1))
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))

The SQL equivalent of the first statement is: SELECT *, 1 as numberOne FROM dfTable

Renaming Column:

The following renames the column DEST_COUNTRY_NAME to dest:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest")

Selecting columns with long names or special characters:

Spark uses the backtick (`) to escape spaces and other special characters in a column name. Example in Python, where the DataFrame column name is This Long Column-Name:

dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")\
  .show(2)

Case Sensitivity

By default, Spark is case insensitive. To make it case sensitive, set the following configuration: set spark.sql.caseSensitive true

Removing Columns:

df.drop("ORIGIN_COUNTRY_NAME")

Changing a Column’s Type (cast):

df.withColumn("count2", col("count").cast("long"))

Row filtering examples:

df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)

Getting unique rows:

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

Concatenating and Appending Rows (Union)

DataFrames are immutable, i.e. they cannot be modified in place. To add rows to a DataFrame, you concatenate it with another DataFrame using the union operation. To union two DataFrames, both must have the same number of columns and the same schema.

Sorting Rows:

To specify the sort direction explicitly, use the asc and desc functions when operating on a column. These let you specify the order in which a given column should be sorted:

from pyspark.sql.functions import desc, asc
df.orderBy(desc("count")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)