Assignment ‐ Joins and Broadcast
Welcome to the DataFrame Transformations Lab! In this lab, we will dive deeper into DataFrame transformations, specifically focusing on joins and broadcast optimizations in Apache Spark.
This lab is designed to provide you with a hands-on experience with more complex queries, including alternatives using joins. We will also explore how Catalyst's broadcast optimizations work during joins and assess the performance impact of these optimizations.
Prerequisite: This lab assumes you have a basic understanding of Apache Spark and DataFrames.
This lab is expected to take approximately 30-40 minutes to complete.
By the end of this lab, you should be able to:
- Perform more complex DataFrame transformations using joins.
- Understand and utilize Catalyst's broadcast optimizations for improved performance.
- Compare the performance of joins with and without broadcast optimizations.
We want to find out the 20 contributors who have the most entries in the github data. Let's first do this in a straightforward way, as done in an earlier lab.
Tasks
- Find the top 20 contributors as follows.
```java
// Perform the query: group by "actor.login", count, order by "count"
// in descending order, and limit to 20 rows
Dataset<Row> scanQuery = githubDF.groupBy("actor.login")
    .count()
    .orderBy(org.apache.spark.sql.functions.desc("count"))
    .limit(20);

// Show the result of the query
scanQuery.show();
```
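If you don't still have `githubDF` from the earlier lab, here is a minimal sketch to recreate it. It assumes a running `SparkSession` named `spark` and that the data file lives at `spark-labs/data/github.json` (the same data directory as the top-20 file supplied later in this lab); adjust the path to your setup.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Load the GitHub event data into a DataFrame
// (path is an assumption based on the lab's data directory)
Dataset<Row> githubDF = spark.read().json("spark-labs/data/github.json");
```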
- Open the Web UI (localhost:4040) and look at the jobs tab.
- Note the execution time (0.6s on our system)
- Drill down on this job, and note the shuffle data (333KB on our system)
We've decided that we want to track 20 specific contributors (the top 20 contributors from the previous month), instead of the top 20 contributors at any given moment. We keep the login values of these contributors-of-interest in another data file we supply (`spark-labs/data/github-top20.json`).
Write a join query that gives the actor login and count from the entries in github.json for the login values in github-top20.json.
Tasks (Assignment)
- Review the data in github-top20.json.
- Accomplish our query needs (one possible sketch follows this list) by:
- Loading github-top20.json into a DataFrame (githubTop20DF)
- Joining githubDF and githubTop20DF in topContributorsJoinedDF
- Grouping by actor.login
- Counting the results, and ordering by descending count.
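A minimal sketch of one way to write this query, assuming `githubDF` is already loaded and that `github-top20.json` exposes the logins in a column named `login` (check the file in the first task above and adjust the column name if it differs):

```java
import static org.apache.spark.sql.functions.desc;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Load the contributors-of-interest file
Dataset<Row> githubTop20DF = spark.read().json("spark-labs/data/github-top20.json");

// Join the full data set against the small list of logins,
// then aggregate as before
Dataset<Row> topContributorsJoinedDF = githubDF
    .join(githubTop20DF,
          githubDF.col("actor.login").equalTo(githubTop20DF.col("login")))
    .groupBy("actor.login")
    .count()
    .orderBy(desc("count"));

topContributorsJoinedDF.show();
```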
Let's review the performance of this join query, and the optimizations that are being done by Catalyst.
Tasks
- Open a browser window on the Web UI - localhost:4040.
- Note the time taken for the last show performed above, drill down on the job for it, and note the shuffle data.
- On our system it took 0.6s and shuffled 7.9K of data.
- This is much less data shuffling than the scanQuery done earlier (40 times reduction).
- Execute `topContributorsJoinedDF.explain()` to see what is going on.
- In the physical plan, note the broadcast operators (`BroadcastExchange` and `BroadcastHashJoin`).
- Catalyst has optimized the join to use a broadcast.
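In Java this is a plain method call; passing `true` additionally prints the parsed, analyzed, and optimized logical plans:

```java
// Physical plan only; look for BroadcastHashJoin / BroadcastExchange
topContributorsJoinedDF.explain();

// Logical plans plus physical plan
topContributorsJoinedDF.explain(true);
```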
Let's disable the Catalyst broadcast optimization and look at what happens.
Tasks
- Disable automatic broadcasting by setting the appropriate configuration property as shown below.
- Next, create and show the topContributorsJoinedDF query again.
- You must create the DataFrame after setting the config property, to make sure it conforms to the new setting.
```java
import org.apache.spark.sql.SparkSession;

public class SparkConfigExample {
    public static void main(String[] args) {
        // Setting spark.sql.autoBroadcastJoinThreshold to -1
        // disables automatic broadcast joins
        SparkSession spark = SparkSession.builder()
            .appName("SparkConfigExample")
            .config("spark.sql.autoBroadcastJoinThreshold", -1)
            .getOrCreate();

        // Your Spark application logic goes here...

        spark.stop();
    }
}
```
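If you already have a running session (as in this lab), you don't need to build a new `SparkSession`; the same property can be changed at runtime. A minimal sketch, assuming your session variable is named `spark`:

```java
// Disable automatic broadcast joins on the existing session
spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1");
```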
- Observe the execution time and shuffled data. This is almost 100 times the amount of shuffle data!
- Re-enable automatic broadcasting by setting the appropriate configuration property as shown below.
```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

SparkConf conf = new SparkConf().setAppName("YourAppName");

// 10 MB is the default value of spark.sql.autoBroadcastJoinThreshold
conf.set("spark.sql.autoBroadcastJoinThreshold", String.valueOf(1024 * 1024 * 10));

SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
```
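You can confirm the active value before re-running the query; a small sketch, again assuming the session variable is named `spark`:

```java
// Should print 10485760 (10 MB) once broadcasting is re-enabled
System.out.println(spark.conf().get("spark.sql.autoBroadcastJoinThreshold"));
```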
Joins are powerful tools, but distributed joins can be expensive in terms of the amount of data shuffled. We've worked with them here, and seen how Catalyst can optimize by broadcasting a small data set.
Note that we've used a common strategy here - we've replaced a query that was resource intensive (our straight scan query) with a different query (querying on a specific group of logins that are provided). The new query is not exactly equivalent, but it fills our needs and is much less resource intensive.