Integrating Hive and Spark - datacouch-io/spark-java GitHub Wiki

Integrating Hive and Spark is a powerful combination for large-scale data processing and analytics. This overview provides a high-level explanation of the steps involved in creating a Hive table, accessing it in Spark, and using the Spark Catalog for data manipulation and management.

Step 1: Create a Hive Table

In Hive, create a table named "reports" within a schema named "boa" using the following SQL commands:

CREATE SCHEMA boa;
CREATE TABLE boa.reports (id INT, days INT, YEAR INT);
INSERT INTO TABLE boa.reports VALUES
(121, 232, 2015), (122, 245, 2015), (123, 134, 2014),
(126, 67, 2016), (182, 122, 2016), (137, 92, 2015), (101, 311, 2015);
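If you prefer to stay inside Spark, the same schema, table, and rows can be created through spark.sql once Hive support is enabled. A minimal sketch, assuming the SparkSession "spark" built in Step 2 below:

spark.sql("CREATE SCHEMA IF NOT EXISTS boa");
spark.sql("CREATE TABLE IF NOT EXISTS boa.reports (id INT, days INT, year INT)");
spark.sql("INSERT INTO boa.reports VALUES "
    + "(121, 232, 2015), (122, 245, 2015), (123, 134, 2014), "
    + "(126, 67, 2016), (182, 122, 2016), (137, 92, 2015), (101, 311, 2015)");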

Step 2: Accessing Hive Data in Spark

In a Java Spark application (or from the Spark shell), create a SparkSession with Hive support enabled and build a DataFrame named "hiveReports" by querying the Hive table:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("Hive-Spark Integration")
    .enableHiveSupport()
    .getOrCreate();

Dataset<Row> hiveReports = spark.sql("SELECT * FROM boa.reports");

Step 3: Check DataFrame Schema

To check the schema of the DataFrame "hiveReports," use the following command:

hiveReports.printSchema();

Step 4: Show DataFrame Data

Display the contents of the DataFrame "hiveReports," which is equivalent to running "SELECT * FROM boa.reports" in the Hive CLI:

hiveReports.show();
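Because hiveReports is a regular DataFrame, you can also manipulate it with the DataFrame API instead of SQL. A small sketch, assuming the columns defined in Step 1 (id, days, year):

hiveReports.groupBy("year").count().show();  // Count reports per year
hiveReports.filter("year = 2015").show();    // Keep only the 2015 reports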

Step 5: Accessing Catalog from Spark

The Catalog is available within the Spark session. You can access it as follows:

import org.apache.spark.sql.catalog.Catalog;

Catalog catalog = spark.catalog();

Step 6: Querying Databases

You can use the Catalog to query databases. The following code demonstrates how to list the available databases:

catalog.listDatabases().select("name").show();
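The Catalog can also describe individual tables. For example, listColumns returns the column metadata for a given table; a small sketch using the boa.reports table from Step 1:

catalog.listColumns("boa", "reports").show();  // Shows column name, dataType, nullable, etc.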

Step 7: Registering DataFrame with createTempView

In Spark 2.0 and later, you should register DataFrames using createTempView instead of registerTempTable. For example:

hiveReports.createOrReplaceTempView("sales"); 
spark.catalog().listTables("boa").show();
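A temporary view is scoped to the current SparkSession. If you need a view that is visible across sessions in the same application, a global temporary view (Spark 2.1 and later) is an option; a brief sketch, noting that global views live in the reserved global_temp database:

hiveReports.createGlobalTempView("sales_global");
spark.sql("SELECT * FROM global_temp.sales_global").show();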

Step 8: Checking if a Table is Cached

You can use the Catalog to check whether a table is cached. By default, tables are not cached:

boolean isCached = catalog.isCached("sales");
System.out.println("Is 'sales' table cached: " + isCached);

Step 9: Drop a View

You can use the Catalog to drop temporary views. In Spark SQL this simply deregisters the view from the session; a Hive DROP VIEW, by contrast, removes the view definition from the metastore:

catalog.dropTempView("sales");
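To confirm the view is gone, the Catalog's tableExists check (available in recent Spark versions) can be used; a small sketch:

boolean stillThere = catalog.tableExists("sales");  // false once the view has been dropped
System.out.println("Does 'sales' still exist: " + stillThere);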

Step 10: Query Registered Functions

The Catalog API allows you to query registered functions, including built-in functions:

catalog.listFunctions().select("name", "description", "className", "isTemporary").show(100);

Step 11: Querying Registered Functions

You can use the Catalog API to query registered user-defined functions (UDFs) and built-in functions. The following code demonstrates how to list and access these functions:

catalog.listFunctions().show(100);  // Lists all registered functions
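Since listFunctions returns a Dataset, the listing can also be narrowed with a filter. For example, to look up a single function by name (a sketch using the built-in upper function):

catalog.listFunctions().filter("name = 'upper'").show();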

Step 12: Working with Databases and Tables

You can use the Spark Catalog to interact with databases and tables. For example, to switch to a different database and list tables within it, you can do the following:

spark.sql("USE some_other_database");  // Change to a different 
databasecatalog.listTables().show();  // List tables in the current database`
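The current database can also be read and changed through the Catalog itself rather than a USE statement; a minimal sketch:

System.out.println("Current database: " + catalog.currentDatabase());
catalog.setCurrentDatabase("boa");  // Equivalent to spark.sql("USE boa")
catalog.listTables().show();        // Now lists tables in boa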

Step 13: Caching Tables in Spark

You can cache tables in Spark to speed up access to frequently used data. For instance, to cache the DataFrame behind the "sales" view (re-register it with createOrReplaceTempView first if you dropped it in Step 9):

hiveReports.cache();   // Mark the DataFrame for caching
hiveReports.count();   // Trigger an action so the data is actually cached
boolean isCached = catalog.isCached("sales");
System.out.println("Is 'sales' table cached: " + isCached);

Step 14: Unpersisting Cached Tables

You can also remove a table from the cache when it's no longer needed:

hiveReports.unpersist();  // Unpersist the cached DataFrame
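If the table was cached through the Catalog, it can be evicted by name as well, and clearCache removes every cached table and view at once; a brief sketch:

catalog.uncacheTable("sales");  // Evict just the "sales" view
catalog.clearCache();           // Or evict everything from the cache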

Step 15: Listing Views

To list views created using createOrReplaceTempView, you can use the Catalog:

catalog.listTables("boa").show();  // List all views in the "boa" schema`

Step 16: Listing Databases and Tables

You can continue to query databases and list tables within Spark:

catalog.listDatabases().select("name").show(); 
catalog.listTables().show();

Step 17: Exploring Built-in Functions

You can also explore Spark's built-in functions using the spark.catalog:

catalog.listFunctions().filter("isTemporary = false").show(100);  // Lists built-in functions

These additional steps and explanations provide further insight into working with Hive and Spark integration. By following these steps, you can effectively query, manage, and interact with data stored in Hive using Spark's capabilities.

Let's execute the code

package com.sparkTutorial.sparkSql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JavaHiveAPI {
    public static void main(String[] args) {

        SparkSession mySparkSession = SparkSession.builder()
                .master("local")
                .appName("Spark-SQL Hive Integration")
                .enableHiveSupport()
                .config("spark.sql.warehouse.dir", "your path")
                .getOrCreate();

        // Create the Hive table if it does not already exist
        mySparkSession.sql("CREATE TABLE IF NOT EXISTS "
                + "CDRs (callingNumber STRING, calledNumber STRING, "
                + "origin STRING, dest STRING, callDtTm STRING, callCharge INT) "
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','");

        // Load the local CSV file into the Hive table
        mySparkSession.sql("LOAD DATA LOCAL INPATH '/C://Datasets/cdrs.csv' "
                + "INTO TABLE CDRs");

        // Top 5 origin/destination pairs by number of calls
        mySparkSession.sql("SELECT origin, dest, count(*) AS cnt "
                + "FROM CDRs "
                + "GROUP BY origin, dest "
                + "ORDER BY cnt DESC "
                + "LIMIT 5").show();
    }
}

Explanation

The provided code is an example of using Apache Spark SQL and Hive integration for working with structured data. It's written to run as a standalone Java Spark application. Let's break down what this code does:

  1. Package Declaration and Imports:
package com.sparkTutorial.sparkSql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

This section defines the package and imports necessary classes and libraries for Spark SQL and Hive integration.

  2. Main Class: JavaHiveAPI
public class JavaHiveAPI {

This class contains the main method, serving as the entry point for the Spark application.

  3. SparkSession Initialization:
SparkSession mySparkSession = SparkSession.builder()
        .master("local")
        .appName("Spark-SQL Hive Integration")
        .enableHiveSupport()
        .config("spark.sql.warehouse.dir", "your path")
        .getOrCreate();

Here, a SparkSession is created. This is the entry point to programming Spark with the Dataset and DataFrame API. The key points in this code block are:

  • master("local"): It specifies that Spark will run locally, which is useful for development and testing. In a production environment, you would set this to a cluster manager (e.g., YARN). - appName("Spark-SQL Hive Integration "): This sets the application name. - enableHiveSupport(): It enables Hive support, allowing Spark to interact with Hive tables and metadata. - config("spark.sql.warehouse.dir", "your path"): You should replace "your path" with the path to your Hive warehouse directory. This is where Hive stores table data.
  4. Hive Table Creation:
mySparkSession.sql("CREATE TABLE IF NOT EXISTS "
        + "CDRs (callingNumber STRING, calledNumber STRING, "
        + "origin STRING, dest STRING, callDtTm STRING, callCharge INT) "
        + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','");

This code block uses the SparkSession to execute a SQL command that creates a table named "CDRs" with the specified column names and data types. The ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' clause tells Hive that each row is stored as comma-separated text, and the IF NOT EXISTS clause ensures the table is created only if it doesn't already exist.

  5. Data Loading:
mySparkSession.sql("LOAD DATA LOCAL INPATH '/C://Datasets/cdrs.csv' "
        + "INTO TABLE CDRs");

This SQL command loads data from a local CSV file (specified by the INPATH) into the Hive table "CDRs."

  6. Data Querying:
mySparkSession.sql("SELECT origin, dest, count(*) AS cnt "
        + "FROM CDRs "
        + "GROUP BY origin, dest "
        + "ORDER BY cnt DESC "
        + "LIMIT 5").show();

This SQL query retrieves data from the "CDRs" table. It selects the "origin" and "dest" columns and calculates the count of records (aliasing it as "cnt"). The results are grouped by "origin" and "dest," ordered in descending order of "cnt," and limited to the top 5 records. Finally, the .show() method is used to display the results.

In summary, this Java Spark application demonstrates the creation of a Hive table, data loading, and data querying. It showcases the integration of Spark SQL with Hive, making it possible to perform SQL-like operations on structured data stored in Hive tables.
