Seeing Catalyst at Work
Welcome to the "Seeing Catalyst at Work" lab. In this lab, we will explore several data transformations and delve into the fascinating world of Catalyst, the query optimizer used in Apache Spark. Catalyst is responsible for optimizing query plans in Spark, making your data processing tasks more efficient and performant.
This lab builds upon the knowledge gained in previous labs, particularly those focused on data transformations. We will apply various transformations to our data and closely examine the optimizations that Catalyst performs under the hood.
The estimated runtime for this lab is approximately 20-30 minutes. The actual time may vary based on your familiarity with Spark and the complexity of the transformations.
Before starting this lab, ensure that you have completed the previous labs related to data transformations. Basic knowledge of Spark's DataFrame API and transformations will be helpful.
In this lab, we will perform a series of transformations on DataFrames and analyze the optimizations carried out by Catalyst. The tasks will help you gain insights into how Spark optimizes query plans for better performance. Let's dive into the tasks.
We'll first work with the Wikimedia view data, and see how Catalyst helps to optimize queries involving filtering.
Tasks
You've already worked with the data file (spark-labs/data/wiki-pageviews.txt). In previous labs you should already have done the following; if you haven't, do it now.
- Loaded the data, then split each line on whitespace.
- Created a new DataFrame with a finer-grained schema.
We illustrate doing that below.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class SparkJavaExample {
    public static void main(String[] args) {
        // Create a SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("SparkJavaExample")
                .getOrCreate();

        // Load the file into Spark
        Dataset<Row> viewsDF = spark.read().text("spark-labs/data/wiki-pageviews.txt");

        // Split each line on whitespace
        Dataset<Row> splitViewsDF = viewsDF.select(split(viewsDF.col("value"), "\\s+").alias("splitLine"));

        // Apply a finer-grained schema
        Dataset<Row> viewsWithSchemaDF = splitViewsDF.select(
                splitViewsDF.col("splitLine").getItem(0).alias("domain"),
                splitViewsDF.col("splitLine").getItem(1).alias("pageName"),
                splitViewsDF.col("splitLine").getItem(2).cast("integer").alias("viewCount"),
                splitViewsDF.col("splitLine").getItem(3).cast("long").alias("size")
        );

        // Show the resulting DataFrame
        viewsWithSchemaDF.show();

        // Stop the SparkSession
        spark.stop();
    }
}
Tasks
- First, write a transformation to order viewsWithSchemaDF by viewCount (a sketch follows this step).
- Call explain() on the above transformation.
- Note that there is shuffling involved (an Exchange in the physical plan).
- Spark needs to shuffle data across partitions to sort the rows.
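A minimal sketch of these two steps, assuming viewsWithSchemaDF and the static import of org.apache.spark.sql.functions.* from the listing above are in scope:

// Order by viewCount; sorting is a wide transformation
Dataset<Row> orderedDF = viewsWithSchemaDF.orderBy(col("viewCount"));

// Print the physical plan and look for an Exchange (shuffle) operator
orderedDF.explain();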
- Next, filter viewsWithSchemaDF so you only view rows where the domain starts with "en".
- Put the filter at the end of the transformation chain in your code (after the ordering), as in the sketch below.
- You can use startsWith("en") on the domain column to accomplish this (note the capital W; the PySpark equivalent is startswith).
- Call explain() on this transformation.
- You should see the filtering happening before the shuffle for ordering.
- Catalyst has pushed the filter down to improve efficiency.
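One way to write it, with the filter deliberately placed after the ordering (same assumptions as above):

// Filter placed at the end of the chain, after the ordering
Dataset<Row> filteredDF = viewsWithSchemaDF
        .orderBy(col("viewCount"))
        .filter(col("domain").startsWith("en"));

// In the physical plan the Filter appears before the Exchange:
// Catalyst has pushed the predicate down past the sort
filteredDF.explain();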
- View the steps that Catalyst took with the detailed version of explain(), which also prints the parsed, analyzed, and optimized logical plans.
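In Java, the detailed version takes a boolean argument:

// true prints the parsed, analyzed, and optimized logical plans
// in addition to the physical plan
filteredDF.explain(true);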
- Optionally, try the transformation with the filter placed before the ordering, as in the last sketch below.
- It should give you exactly the same plan.
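For comparison, a sketch with the filter written first (same assumptions as above):

// Same query, with the filter before the ordering
Dataset<Row> filteredFirstDF = viewsWithSchemaDF
        .filter(col("domain").startsWith("en"))
        .orderBy(col("viewCount"));

// The optimized logical plan and the physical plan
// should match the previous version exactly
filteredFirstDF.explain(true);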