Joins - ignacio-alorre/Spark GitHub Wiki
Core Spark Joins
Joins in general are expensive since they require that corresponding keys from each RDD are located at the same partition so that they can be combined locally. If the RDDs do not have known partitioners, they will need to be shuffled so that both RDDs share a partitioner, and data with the same keys lives in the same partitions
If they have the same partitioner, the data may be colocated as in the figure below, to avoid network transfer.
Regardless of whether the partitioners are the same, if one (or both) of the RDDs have a known partitioner only a narrow dependency is created, as in figure below.
As with most key/value operations, the cost of the join increases with the number of keys and the distance the records have to travel in order to get to their correct partition
-
Two RDDs will be colocated if they have the same partitioner and were shuffled as part of the same action
-
Core Spark joins are implemented using the
cogroup
function.
Choosing a Join Type
-
The default join operation in Spark includes only values for keys present in both RDDs, and in the case of multiple values per key, provides all permutations of the key/value pair.
-
The best scenario for a standard join is when both RDDs contain the same set of distinct keys. With duplicate keys, the size of the data may expand dramatically causing performance issues, and if one key is not present in both RDDs you will lose that row of data. Here are a few guidelines:
-
When both RDDs have duplicate keys, the join can cause the size of the data to expand dramatically. It may be better to perform a
distinct
orcombineByKey
operation to reduce the key space or to usecogroup
to handle duplicate keys instead of producing the full cross product. By using smart partitioning during the combine step, it is possible to prevent a second shuffle in the join. -
If keys are not present in both RDDs you risk losing your data unexpectedly. It can be safer to use an outer join, so that you are guaranteed to keep all the data in either the left or the right RDD, then filter the data after the join.
-
If one RDD has some easy-to-define subset of the keys, in the other you may be better off filtering or reducing before the join to avoid a big shuffle of data, which will ultimately throw away anyway
-
-
Join is one of the most expensive operations you will commonly use in Spark, so it is worth doing what you can to shrink your data before performing a join
For example, suppose you have one RDD with some data in the form (id
, score
) and another RDD with (id
, address
), and you want to have some id with an email and its best score. You can then join the RDDs on id
and then compute the best score for each address.
//Basic RDD join
def joinScoresWithAddress1( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
val joinedRDD = scoreRDD.join(addressRDD)
joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}
This approach is not as fast as first reducing the score data, so that the first dataset contains only one row for each id
with the best score, and then joining the data with the address data. Like in the example below:
// Pre-filter before join
def joinScoresWithAddress2(scoreRDD : RDD[(Long, Double)],
addressRDD: RDD[(Long, String)]) : RDD[(Long, (Double, String))]= {
val bestScoreData = scoreRDD.reduceByKey((x, y) => if(x > y) x else y)
bestScoreData.join(addressRDD)
}
It could be possible also to perform a left outer join to keep all keys for processing even those missing in the right RDD by using leftOuterJoin
in place of join
, as in the next example. Spark also has fullOuterJoin
and rightOuterJoin
depending on which records we wish to keep. Any missing values are None
and present values are Some('x')
// Basic RDD left outer join
def outerJoinScoresWithAddress(scoreRDD : RDD[(Long, Double)],
addressRDD: RDD[(Long, String)]) : RDD[(Long, (Double, Option[String]))]= {
val joinedRDD = scoreRDD.leftOuterJoin(addressRDD)
joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}
Choosing an Execution Plan
-
In order to join data, Spark needs the data that is to be joined (the data based on each key) to live on the same partition. The default implementation of a join in Spark is a shuffled hash join.
-
The shuffled hash join ensures that data on each partition will contain the same keys by partitioning the second dataset with the same default partitioner as the first, so that the keys with the same hash value from both datasets are in the same partition. While this approach always works, it can be more expensive than necessary because it requires a shuffle. The shuffle can be avoided if:
- Both RDDs have a known partitioner
- One of the datasets is small enough to fit in memory, in which case we can do a broadcast hash join
- If the RDDs are colocated, the network transfer can be avoided, along with the shuffle.
Speeding up joins by assigning a known partitioner
-
If you have to do an operation before the join that requires a shuffle, such as
aggregateByKey
orreduceByKey
, you can prevent the shuffle by adding a hash partitioner with the same number of partitions as an explicit argument to the first operation and persisting the RDD before the join. -
You could reproduce the example in the previous section even faster by using the partitioner for the address data as an argument for the
reduceByKey
step as in the example below:
// Known partitioner join
def joinScoresWithAddress3(scoreRDD: RDD[(Long, Double)],
addressRDD: RDD[(Long, String)]) : RDD[(Long, (Double, String))]= {
// If addressRDD has a known partitioner we should use that,
// otherwise it has a default hash parttioner, which we can reconstruct by
// getting the number of partitions.
val addressDataPartitioner = addressRDD.partitioner match {
case (Some(p)) => p
case (None) => new HashPartitioner(addressRDD.partitions.length)
}
val bestScoreData = scoreRDD.reduceByKey(addressDataPartitioner,
(x, y) => if(x > y) x else y)
bestScoreData.join(addressRDD)
}
- Always persist after repartitioning
Speeding up joins using a broadcast hash join
-
A broadcast hash join pushes one of the RDDs (the smaller one) to each of the worker nodes. Then it does a
map-side
combine with each partition of the larger RDD -
If one of your RDD can fit in memory, it is always beneficial to do a broadcast hash join, since it doesn't require a shuffle
-
Sometimes Spark SQL will be smart enough to configure the broadcast join itself; in Spark SQL this is controlled with
spark.sql.autoBroadcastJoinThreshold
andspark.sql.broadcastTimeout
-
Note: Spark Core does not have an implementation of the broadcast hash join. Instead, we can manually implement a version of the broadcast hash join by collecting the smaller RDD to the driver as a map, then broadcasting the result, and using
mapPartitions
to combine the elements. -
The script below is a general function that could be used to join a larger and smaller RDD. Its behavior mirrors the default “join” operation in Spark. We exclude elements whose keys do not appear in both RDDs.
// Manual broadcast hash join
def manualBroadCastHashJoin[K : Ordering : ClassTag, V1 : ClassTag,
V2 : ClassTag](bigRDD : RDD[(K, V1)], smallRDD : RDD[(K, V2)])= {
val smallRDDLocal: Map[K, V2] = smallRDD.collectAsMap()
bigRDD.sparkContext.broadcast(smallRDDLocal)
bigRDD.mapPartitions(iter => {
iter.flatMap{
case (k,v1 ) =>
smallRDDLocal.get(k) match {
case None => Seq.empty[(K, (V1, V2))]
case Some(v2) => Seq((k, (v1, v2)))
}
}
}, preservesPartitioning = true)
}
Partial manual broadcast hash join
-
If all of our smaller RDD will fit into memory, but some keys are so overrepresented in the large dataset that you want to broadcast just the most common keys. This is especially useful if one key is so large that it can’t fit on a single partition.
-
In this case you can use
countByKeyApprox
on the large RDD to get an approximate idea of which keys would most benefit from a broadcast. -
You then filter the smaller RDD for only these keys, collecting the result locally in a HashMap. Using
sc.broadcast
you can broadcast theHashMap
so that each worker only has one copy and manually perform the join against the HashMap. -
Using the same HashMap you can then filter your large RDD down to not include the large number of duplicate keys and perform your standard join, unioning it with the result of your manual join. This approach is quite convoluted but may allow you to handle highly skewed data you couldn’t otherwise process.
Spark SQL Joins
-
Spark SQL supports the same basic join types as core Spark, but the optimizer is able to do more of the heavy lifting for you—although you also give up some of your control. For example, Spark SQL can sometimes push down or reorder operations to make your joins more efficient.
-
On the other hand, you don’t control the partitioner for
DataFrames
orDatasets
, so you can’t manually avoid shuffles as you did with core Spark joins.
DataFrame Joins
-
Joinig data between DataFrames is one of the most common multi-DataFrame transformations. The standard SQL join types are all supported and can be specific as the
joinType
indf.join(otherDf, sqlCondition, joinType)
when performing a join. -
As with joins between RDDs, joining with nonunique keys will result in the cross product (so if the left table has R1 and R2 with key1 and the right table has R3 and R5 with key1, you will get (R1,R3), (R1,R5), (R2, R3), (R2, R5)) in the output
-
While self joins are supported, you must alias the fields you are interested in to different names beforehand, so they can be accessed.
Below are the tables which are going to be used for the following join examples
-
Spark's supported join types are inner, left_outer, left_anti, right_outer, full_outer and left_semi. With the exception of left_semi these join types all join the two tables, but they behave differently when handling rows that do not have keys in both tables
-
The Inner Join is both the default. It requires that the key be present in both tables, or the result is dropped.
//Simple inner join
// Inner join implicit
df1.join(df2, df1("name") === df2("name"))
// Inner join explicit
df1.join(df2, df1("name") === df2("name"), "inner")
- Left Outer Join will produce a table with all of the keys from the left table, and any rows without matching keys in the right table will have null values in the fields that would be populated by the right table. Right outer joins are the same, but with the requirements reversed.
// Left outer join explicit
df1.join(df2, df1("name") === df2("name"), "left_outer")
// Right outer join explicit
df1.join(df2, df1("name") === df2("name"), "right_outer")
- To keep all records from both tables you can use the fill outer join, which results in
-
Left semi joins and left anti joins are the only kinds of joins that only have values from the left table. A left semi join is the same as filtering the left table for only rows with keys present in the right table.
-
The left anti join also only returns data from the left table, but instead only returns records that are not present in the right table
// Left semi join explicit
df1.join(df2, df1("name") === df2("name"), "left_semi")
Self Joins
Self joins are supported on DataFrames, but we end up with duplicated columns names. So that you can access the results, you need to alias the DataFrames to different names. Otherwise you will be unable to select the columns due to name collision. Once you've aliased each DataFrame, in the result you can access the individual columns for each DataFrame with dfName.colName
// Self join
val joined = df.as("a").join(df.as("b")).where($"a.name" === $"b.name")
Broadcast hash joins
- You can see the type of join being performed by calling
queryExecution.executedPlan
- As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DF should be broadcast for join by calling broadcast on the DataFrame before joining it
df1.join(broadcast(df2), "key")
- Spark also automatically uses the
spark.sql.conf.autoBroadcastJoinThreshold
to determine if a table should be broadcast
Dataset Joins
-
Joining Datasets is done with
joinWith
and this behaves similarly to a regular relational join, except the result is a tuple of the different record type as shown below. -
This is somewhat more awkward to work with after the join, but also does make self joins, much easier as you don't need to alias the columns first
// Joining two Datasets
val result: Dataset[(RawPanda, CoffeeShop)] = pandas.joinWith(coffeeShops, $"zip" === $"zip")
// Self Join a Dataset
val result: Dataset[(RawPanda, RawPanda)] = pandas.joinWith(pandas, $"zip" === $"zip")
-
Using a self join and a lit(true), you can produce the cartesian product of your Dataset, which can be useful but also illustrates how joins (especially self joins) can easily result in unworkable data sizes.
-
As with DataFrames you can specify the type of join desired (
inner
,left_outer
,right_outer
,left_semi
)