SQL Tables and Views - datacouch-io/spark-java GitHub Wiki

Apache Spark provides Spark SQL, a module for structured data processing that lets you run SQL queries on structured data in Spark. In addition to standard SQL, Spark SQL offers a programmatic interface and support for various data sources, including Hive, Parquet, Avro, ORC, and more. This page provides a detailed guide to working with Spark SQL tables and views using Java.
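
Before diving into tables, here is a minimal, self-contained sketch of the basic pattern this page builds on: load structured data, register it as a temporary view, and query it with spark.sql(). The file path and view name below are hypothetical placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SqlQuickStart {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SqlQuickStart")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical CSV path; substitute any structured data source
        Dataset<Row> flights = spark.read()
                .option("header", "true")
                .csv("/tmp/data/flights.csv");

        // Register a temporary view so the data can be queried with SQL
        flights.createOrReplaceTempView("flights_view");
        spark.sql("SELECT origin, COUNT(*) AS cnt FROM flights_view GROUP BY origin").show();

        spark.stop();
    }
}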

CREATING A MANAGED TABLE

Tables reside within a database. By default, Spark creates tables under the default database. To create your own database, you can issue a SQL command from your Spark application or notebook. Using the US flight delays dataset, let’s create both a managed and an unmanaged table. To begin, we’ll create a database called training_spark_db and tell Spark we want to use that database:

Example code

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;

public class JavaSparkSQLExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JavaSparkSQLExample")
                .master("local[*]")
                .getOrCreate();

        // List available databases
        spark.catalog().listDatabases().show(false);

        // Create the "training_spark_db" database if it doesn't already exist, then switch to it
        spark.sql("CREATE DATABASE IF NOT EXISTS training_spark_db");
        spark.sql("USE training_spark_db");

        // Create a managed table (Spark manages both the metadata and the data)
        spark.sql("CREATE TABLE IF NOT EXISTS managed_us_delay_flights_tbl " +
                "(date STRING, delay INT, distance INT, origin STRING, destination STRING)");

        // Define CSV file path and schema
        String csvFile = "file:///home/cloudera/Desktop/SW/in/Data/departuredelays.csv";
        String schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING";

        // Read data from CSV and apply the schema (note: the schema must be set
        // with .schema(), not .option("schema", ...), which Spark would ignore)
        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", "true")
                .schema(schema)
                .load(csvFile);

        // Write the DataFrame to the managed table
        df.write()
                .mode(SaveMode.Overwrite)
                .saveAsTable("managed_us_delay_flights_tbl");

        // Stop the Spark session
        spark.stop();
    }
}

This code provides a complete example of using Spark SQL from Java to interact with structured data. It covers the key operations: creating and selecting a database, creating a table, importing data, and writing the DataFrame back as a managed table, which you can then analyze and query within the Spark ecosystem.
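
Assuming the session and table created above, the managed table can now be queried with standard SQL. The delay threshold below is purely illustrative:

// Query the managed table; the result comes back as a DataFrame
Dataset<Row> longDelays = spark.sql(
        "SELECT date, delay, origin, destination " +
        "FROM managed_us_delay_flights_tbl " +
        "WHERE delay > 120 ORDER BY delay DESC");
longDelays.show(10, false);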

Explanation

  1. Creating a SparkSession:

    SparkSession spark = SparkSession.builder()
            .appName("JavaSparkSQLExample")
            .master("local[*]")
            .getOrCreate();
    

    In this section, a SparkSession is initialized. It's the entry point for working with Spark SQL. The appName method sets the name of the Spark application, and master specifies where it runs; "local[*]" runs Spark locally using all available cores.

  2. Listing Available Databases:

    spark.catalog().listDatabases().show(false);
    

    The code lists available databases using the spark.catalog().listDatabases() method. The .show(false) method is used to display the results without truncating long strings.

  3. Creating and Using a Database:

    spark.sql("CREATE DATABASE IF NOT EXISTS training_spark_db");
    spark.sql("USE training_spark_db");
    

    The code creates the training_spark_db database if it doesn't already exist, then tells Spark to use it for subsequent table operations via the spark.sql() method.

  4. Creating a Managed Table:

    spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)");
    

    The code creates a managed table named "managed_us_delay_flights_tbl" with specified columns and data types. This is done using the spark.sql() method.

  5. Reading Data from CSV:

    String csvFile = "file:///home/cloudera/Desktop/SW/in/Data/departuredelays.csv";
    String schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING";
    
    Dataset<Row> df = spark.read()
        .format("csv")
        .option("header", "true")
        .schema(schema)
        .load(csvFile);
    

    This section defines the path to a CSV file and a DDL-style schema string. It then uses the spark.read() method to read the data from the CSV file, applying the schema via .schema(). Note that .option("schema", ...) would be silently ignored; the schema must be set with the .schema() method. (A variant that builds the schema programmatically with StructType is sketched after this list.)

  6. Saving Data to a Managed Table:

    df.write()
        .mode(SaveMode.Overwrite)
        .saveAsTable("managed_us_delay_flights_tbl");
    

    After reading the data into a DataFrame (df), the code writes the DataFrame to the managed table "managed_us_delay_flights_tbl." The .mode(SaveMode.Overwrite) ensures that any existing data in the table is overwritten.

  7. Stopping the Spark Session:

    spark.stop();
    

    Finally, the Spark session is stopped to release resources and terminate the Spark application.
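
As noted in step 4, you can confirm that the table was created as a managed table by inspecting the catalog. This is a sketch assuming the session and database from the example above; the tableType column reports MANAGED or EXTERNAL:

// List tables in the database; tableType distinguishes managed from unmanaged
spark.catalog().listTables("training_spark_db").show(false);

// DESCRIBE EXTENDED also reports the table type and storage location
spark.sql("DESCRIBE EXTENDED managed_us_delay_flights_tbl").show(100, false);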
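
As noted in step 5, the DDL-style schema string can also be built programmatically with StructType. Here is a minimal sketch of the equivalent definition, assuming the same csvFile variable as above:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType flightSchema = new StructType(new StructField[]{
        DataTypes.createStructField("date", DataTypes.StringType, true),
        DataTypes.createStructField("delay", DataTypes.IntegerType, true),
        DataTypes.createStructField("distance", DataTypes.IntegerType, true),
        DataTypes.createStructField("origin", DataTypes.StringType, true),
        DataTypes.createStructField("destination", DataTypes.StringType, true)
});

// Equivalent to .schema(schema) with the DDL string used earlier
Dataset<Row> df = spark.read()
        .format("csv")
        .option("header", "true")
        .schema(flightSchema)
        .load(csvFile);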

CREATING AN UNMANAGED TABLE

By contrast, you can create unmanaged tables from your own data sources—say, Parquet, CSV, or JSON files stored in a file store accessible to your Spark application.

To create an unmanaged table from a data source such as a CSV file, you can use either SQL or the DataFrame API, as shown below:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;

public class JavaSparkSQLExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JavaSparkSQLExample")
                .master("local[*]")
                .getOrCreate();

        // Create a DataFrame
        String csvFile = "file:///home/{$USER}/Desktop/SW/in/Data/departuredelays.csv";
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .csv(csvFile);

        // Register the DataFrame as a temporary view for ad hoc SQL queries
        df.createOrReplaceTempView("us_delay_flights_tbl");

        // Option 1: create an unmanaged table in SQL by pointing it at an external path
        spark.sql("CREATE TABLE IF NOT EXISTS us_delay_flights_tbl " +
                "(date STRING, delay INT, distance INT, origin STRING, destination STRING) " +
                "USING csv OPTIONS (PATH '/tmp/data/us_flights_delay')");

        // Option 2: the DataFrame API equivalent; supplying an explicit path
        // makes Spark register the table as unmanaged
        df.write()
                .mode(SaveMode.Overwrite)
                .option("path", "/tmp/data/us_flights_delay")
                .saveAsTable("us_delay_flights_tbl");

        // Stop the Spark session
        spark.stop();
    }
}

In this code:

  1. A SparkSession is created as the entry point to interact with Spark SQL.

  2. Data is loaded from a CSV file into a DataFrame (df) using the .read() method.

  3. The DataFrame is registered as a temporary view (us_delay_flights_tbl) to make it available for SQL queries.

  4. The spark.sql() method is used to create an unmanaged table. The USING csv OPTIONS (PATH '/tmp/data/us_flights_delay') clause tells Spark that the table's data lives at an external location that Spark does not manage.

  5. The DataFrame is saved as a table using the .write() method. Because .option("path", "/tmp/data/us_flights_delay") supplies an explicit location, Spark registers the result as an unmanaged table; this is the DataFrame API equivalent of the SQL statement in step 4. (A sketch of what "unmanaged" means in practice follows this list.)

  6. Finally, the Spark session is stopped to release resources and terminate the Spark application.
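
To make the distinction concrete, here is a sketch (reusing the path from the example above) of what "unmanaged" means in practice: dropping the table removes only the catalog metadata, while the underlying files remain on disk and can be re-registered later. Dropping a managed table, by contrast, deletes the data as well.

// Dropping an unmanaged table removes only its metadata from the catalog;
// the CSV files under /tmp/data/us_flights_delay are left untouched
spark.sql("DROP TABLE IF EXISTS us_delay_flights_tbl");

// The same data can be re-registered by pointing a new table at the same path
spark.sql("CREATE TABLE us_delay_flights_tbl " +
        "(date STRING, delay INT, distance INT, origin STRING, destination STRING) " +
        "USING csv OPTIONS (PATH '/tmp/data/us_flights_delay')");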
