Spark Joins - datacouch-io/spark-java GitHub Wiki

This documentation provides a detailed overview of Spark Joins, including the different types of joins, how to perform joins using the Spark SQL API, and examples of how to use joins in real-world scenarios.

What is a Join?

A join is a data operation that combines two or more datasets based on a common field between them. Joins are commonly used in data analysis and machine learning to combine data from different sources into a single dataset for analysis.

Types of Joins

There are different types of joins, each with its own specific behavior. The most common types of joins are:

  1. Inner Join:

    • An inner join returns only the rows that have matching values in both datasets. If there's no match for a particular row in one of the datasets, it will be excluded from the result.
  2. Left Join (Left Outer Join):

    • A left join returns all the rows from the left dataset and the matching rows from the right dataset. If there is no match in the right dataset for a row in the left dataset, the result will contain null values for the right dataset.
  3. Right Join (Right Outer Join):

    • Similar to the left join, a right join returns all rows from the right dataset and the matching rows from the left dataset. Rows from the right dataset without a match in the left dataset will have null values for the left dataset.
  4. Full Join (Outer Join):

    • A full join, also known as an outer join, returns all rows when there is a match in either the left or the right dataset. It includes all rows from both datasets, and where there is no match, the result will contain null values.
  5. Cross Join:

    • A cross join, also known as a Cartesian join, combines every row from the first dataset with every row from the second dataset. It results in a Cartesian product of the two datasets, which can be computationally expensive if the datasets are large.
  6. Left Anti Join:

    • A left anti join returns all rows from the left dataset that do not have a matching row in the right dataset. In other words, it provides rows from the left dataset that are not found in the right dataset.
  7. Left Semi Join:

    • A left semi join returns all the rows from the left dataset where there is a match in the right dataset. It doesn't return the actual matching rows from the right dataset, only the rows from the left dataset.
  8. Self Join:

    • A self join is when you join a dataset with itself. This is often used to find relationships within a single dataset. For example, you might use a self join to find all pairs of items in a product catalog that have the same category.

Performing Joins in Spark SQL

To perform a join in Spark SQL, you can use the join() method. The join() method takes two DataFrames as input and returns a new DataFrame that is the result of the join operation.

The following example shows how to perform an inner join on two DataFrames:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions.*;

// Create two DataFrames
Dataset<Row> df1 = spark.createDataFrame(
    Arrays.asList(
        (1, "Alice"),
        (2, "Bob"),
        (3, "Carol")
    ),
    Arrays.asList(
        "id", "name"
    )
);

Dataset<Row> df2 = spark.createDataFrame(
    Arrays.asList(
        (1, "Alice's company"),
        (2, "Bob's company"),
        (4, "David's company")
    ),
    Arrays.asList(
        "id", "company"
    )
);

// Perform an inner join on the two DataFrames
Dataset<Row> joinedDf = df1.join(df2, df1.col("id").equalTo(df2.col("id")));

// Print the joined DataFrame
joinedDf.show();

Output:

+---+----+-------+
| id|name|company|
+---+----+-------+
|  1|Alice|Alice's company|
|  2|  Bob| Bob's company|
+---+----+-------+

Examples of Spark Joins in Real-World Scenarios

Spark joins can be used in a variety of real-world scenarios. For example, you could use a join to:

  • Combine customer data from different sources to create a single customer profile.
  • Combine product data from different sources to create a single product catalog.
  • Combine sensor data from different sources to create a unified view of the state of a system.

Examples

The list of joins provided by Spark SQL is:

Inner Join

An inner join returns only the rows that have matching values in both datasets. Rows that do not have a match in the other dataset are excluded from the result.

Dataset<Row> innerJoinResult = dataset1.join(dataset2, "common_column");

Left / Left Outer Join

A left join returns all rows from the left dataset and the matching rows from the right dataset. If there's no match in the right dataset, the result will contain null values for the right dataset.

Dataset<Row> leftJoinResult = dataset1.join(dataset2, "common_column", "left");

Right / Right Outer Join

A right join is similar to the left join, but it returns all rows from the right dataset and the matching rows from the left dataset. If there's no match in the left dataset, the result will contain null values for the left dataset.

Dataset<Row> rightJoinResult = dataset1.join(dataset2, "common_column", "right");

Outer / Full Join

An outer join, also known as a full join, returns all rows when there is a match in either the left or the right dataset. If there's no match in either dataset, the result will contain null values for the missing dataset.

Dataset<Row> fullJoinResult = dataset1.join(dataset2, "common_column", "outer");

Cross Join

A cross join returns the Cartesian product of rows from both datasets. It's not a common join type and can result in a very large output.

Dataset<Row> crossJoinResult = dataset1.crossJoin(dataset2);

Left Anti Join

A left anti join returns all rows from the left dataset that do not have a matching row in the right dataset.

Dataset<Row> leftAntiJoinResult = dataset1.join(dataset2, "common_column", "left_anti");

Left Semi Join

A left semi join returns all the rows from the left dataset where there is a match in the right dataset but doesn't return the actual matching rows from the right dataset.

javaCopy code

Dataset<Row> leftSemiJoinResult = dataset1.join(dataset2, "common_column", "left_semi");

Self Join

A self-join is when you join a dataset with itself. It can be useful for comparing rows within the same dataset.

Dataset<Row> leftSemiJoinResult = dataset1.join(dataset2, "common_column", "left_semi");

In all the join examples, dataset1 and dataset2 represent the datasets you want to join, and "common_column" is the column used as the join key. The join type is specified as an optional argument, which can be "inner," "left," "right," "outer," "cross," "left_anti," or "left_semi."

These are the common join types available in Spark SQL, and you can choose the one that best fits your data processing requirements.

Lets execute the below code

package com.sparkTutorial.part1dataframe;  
  
import org.apache.log4j.Level;  
import org.apache.log4j.Logger;  
import org.apache.spark.sql.Column;  
import org.apache.spark.sql.Dataset;  
import org.apache.spark.sql.Row;  
import org.apache.spark.sql.SparkSession;  
import static org.apache.spark.sql.functions.expr;  
  
public class Joins {  
  
    public static void main(String[] args) throws Exception {  
  
        Logger.getLogger("org").setLevel(Level.ERROR);  
  SparkSession session = SparkSession.builder().appName("Joins")  
                .master("local[1]").getOrCreate();  
  
  Dataset<Row> guitarsDF = session.read()  
                .option("inferSchema", "true")  
                .json("src/main/resources/data/guitars.json");  
  
  Dataset<Row>  guitaristsDF = session.read()  
                .option("inferSchema", "true")  
                .json("src/main/resources/data/guitarPlayers.json");  
  
  Dataset<Row>  bandsDF = session.read()  
                .option("inferSchema", "true")  
                .json("src/main/resources/data/bands.json");  
  
  // inner joins  
  Column joinCondition = guitaristsDF.col("band")  
                .equalTo(bandsDF.col("id"));  
  
  
  Dataset<Row> guitaristsBandsDF = guitaristsDF.join(bandsDF,  
  guitaristsDF.col("band").equalTo(bandsDF.col("id")),  
  "inner");  
  
  
  guitaristsBandsDF.show(5);  
  
  guitaristsBandsDF.select("id", "band").show ();//what will be the output?  
  
 //m-1#drop the dupe column  guitaristsBandsDF.drop(bandsDF.col("id"));  
  
  //renaming the column  
  guitaristsDF.join(bandsDF.withColumnRenamed("id","band")  
                ,"band");  
  
  
  
  
  // outer joins  
 // left outer = everything in the inner join + all the rows in the LEFT table, with nulls in where the data is missing  guitaristsDF.join(bandsDF, joinCondition, "left_outer");  
  
  // right outer = everything in the inner join + all the rows in the RIGHT table, with nulls in where the data is missing  
  guitaristsDF.join(bandsDF, joinCondition, "right_outer");  
  
  // outer join = everything in the inner join + all the rows in BOTH tables, with nulls in where the data is missing  
  guitaristsDF.join(bandsDF, joinCondition, "outer");  
  
  // semi-joins = everything in the left DF for which there is a row in the right DF satisfying the condition  
  guitaristsDF.join(bandsDF, joinCondition, "left_semi");  
  
  // anti-joins = everything in the left DF for which there is NO row in the right DF satisfying the condition  
  guitaristsDF.join(bandsDF, joinCondition, "left_anti");  
  
  // things to bear in mind  
  
  
 // option 1 - rename the column on which we are joining  guitaristsDF.join(bandsDF.withColumnRenamed("id", "band"),  
  "band");  
  
  // option 2 - drop the dupe column  
  guitaristsBandsDF.drop(bandsDF.col("id"));  
  
  // option 3 - rename the offending column and keep the data  
  Dataset<Row> bandsModDF = bandsDF.withColumnRenamed("id", "bandId");  
  guitaristsDF.join(bandsModDF, guitaristsDF.col("band").  
                equalTo(bandsModDF.col("bandId")));  
  
  // using complex types  
  guitaristsDF.join(guitarsDF.withColumnRenamed("id", "guitarId")  
                , expr("array_contains(guitars, guitarId)"));  
  
  }  
  
}

The Above code demonstrates various operations related to joining DataFrames in Apache Spark using the Spark SQL API. It performs the following tasks:

  1. Logger Configuration:

    • It configures the logger to set the log level of "org" to Level.ERROR, which suppresses unnecessary log messages.
  2. SparkSession Initialization:

    • It creates a SparkSession, which is the entry point for using Spark SQL. The session is configured with the application name "Joins" and set to run in local mode using one thread.
  3. Loading Data:

    • It loads three DataFrames from JSON files: guitarsDF, guitaristsDF, and bandsDF. The option("inferSchema", "true") is used to automatically infer the schema of the JSON data.
  4. Inner Join:

    • It performs an inner join between the guitaristsDF and bandsDF DataFrames based on the condition that the "band" column in guitaristsDF matches the "id" column in bandsDF. The result is stored in the guitaristsBandsDF DataFrame.
  5. Displaying Data:

    • It displays the first 5 rows of the guitaristsBandsDF using the .show(5) method.
  6. Dropping Duplicate Column and Renaming:

    • It attempts to drop the duplicate "id" column from guitaristsBandsDF. However, it doesn't actually modify the DataFrame, and the result is not stored or displayed.
    • It also demonstrates renaming the "id" column to "band" in bandsDF during the join, but the result is not stored or displayed.
  7. Other Types of Joins:

    • It outlines various types of joins such as left outer join, right outer join, full outer join, left semi join, and left anti join. However, it does not store or display the results of these operations.
  8. Using Complex Types:

    • It performs a join between guitaristsDF and guitarsDF where it checks if the "guitarId" is present in the "guitars" array column of guitaristsDF. The expr function is used to specify a complex condition for the join, but the result is not stored or displayed.

In summary, this code provides a basic demonstration of Spark DataFrame operations, including loading data, performing joins, and some common operations. However, it does not display or store the results of many of these operations, and it includes comments for exercises that are not fully implemented.

Conclusion

Spark joins are a powerful tool for combining data from different sources into a single dataset for analysis. By understanding the different types of joins and how to perform them in Spark SQL, you can use joins to solve a wide variety of data analysis problems.

⚠️ **GitHub.com Fallback** ⚠️