Dataset Typed API
In this lab, we will explore the Dataset Typed API in Apache Spark. The Dataset Typed API provides compile-time type safety, making it a powerful tool for working with structured data in Spark applications. Unlike DataFrames, which are untyped at compile time (each row is a generic Row), Datasets allow you to work with data in a strongly typed manner. This lab will guide you through the usage of Datasets in Spark and demonstrate their benefits in terms of type safety.
Please note that the Dataset Typed API is available in Scala and Java, but it is not available in Python. Python, being a dynamically typed language, does not support strongly typed Datasets. As a result, this lab will focus on Scala or Java implementations.
This lab is expected to take approximately 30-40 minutes to complete. The runtime may vary depending on your familiarity with Spark and the complexity of the tasks you perform.
Before starting this lab, ensure that you have the following prerequisites in place:
- Apache Spark environment set up and configured.
- Basic knowledge of Apache Spark and its concepts.
- Familiarity with Scala or Java, depending on which language you choose for the lab.
In this lab, we will explore the Dataset Typed API by performing various tasks using Spark in Scala or Java. These tasks will demonstrate the advantages of compile-time type safety offered by Datasets. Let's dive into the tasks.
Music Data File
We've provided a small, simple JSON data file (data/music.json) containing data about music items. We'll use it for our Dataset work in this lab.
Each record has four fields:
- title: Title of the item.
- artist: Artist who released the item.
- price: Price of the item.
- category: The music category (e.g. Pop).
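For reference, Spark's default JSON reader expects one JSON object per line (JSON Lines format), so a record in data/music.json has this shape (the values here are invented for illustration):

```json
{"title": "Some Title", "artist": "Some Artist", "price": 9.99, "category": "Pop"}
```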
Tasks
- Create a DataFrame by reading in the music data in data/music.json.
- View the schema of this DataFrame.
- Display the data in the DataFrame (show).
Simple Dataset Usage Tasks
- Define a MusicItem class (a case class in Scala, a bean class in Java) suitable for our music.json data. Using this class, create a typed Dataset from the musicDF DataFrame created in the previous task.
- What type is musicDS? Display the data in the Dataset.
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MusicDataExample {

    // Java equivalent of a Scala case class. Encoders.bean() requires a public
    // class with a no-arg constructor and getters/setters, so it cannot be a
    // local class defined inside main().
    public static class MusicItem {
        private String title;
        private String artist;
        private String category;
        private double price;

        public MusicItem() {}

        public String getTitle() { return title; }
        public void setTitle(String title) { this.title = title; }
        public String getArtist() { return artist; }
        public void setArtist(String artist) { this.artist = artist; }
        public String getCategory() { return category; }
        public void setCategory(String category) { this.category = category; }
        public double getPrice() { return price; }
        public void setPrice(double price) { this.price = price; }
    }

    public static void main(String[] args) {
        // Create a SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("MusicDataExample")
                .getOrCreate();

        // Read the JSON file into a DataFrame (a Dataset<Row>)
        Dataset<Row> musicDF = spark.read().json("data/music.json");

        // Display the schema of musicDF
        musicDF.printSchema();

        // Show the data in musicDF
        musicDF.show();

        // Create a typed Dataset from musicDF using the MusicItem bean class
        Encoder<MusicItem> musicItemEncoder = Encoders.bean(MusicItem.class);
        Dataset<MusicItem> musicDS = musicDF.as(musicItemEncoder);

        // Display the data in musicDS
        musicDS.show();

        // Stop the SparkSession
        spark.stop();
    }
}
```
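After the as(...) call, musicDS is a Dataset&lt;MusicItem&gt; rather than a Dataset&lt;Row&gt; - that answers the "what type is musicDS?" task, and it is exactly what buys us compile-time checking. As a quick illustration (a sketch you can drop into main() above, assuming the MusicItem bean class), the compiler now knows the element type, so bean getters can be called directly:

```java
import org.apache.spark.api.java.function.MapFunction;

// Extract just the titles: the compiler verifies that getTitle() exists on MusicItem.
// The cast picks the Java MapFunction overload of map() over the Scala one.
Dataset<String> titles = musicDS.map(
        (MapFunction<MusicItem, String>) MusicItem::getTitle,
        Encoders.STRING());
titles.show();
```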
We'll perform some operations on our DataFrame and Dataset representations to get a feel for the difference.
Tasks
- Filter on category.
  - Using musicDF, get all the items in the "Pop" category.
  - Using musicDS, get all the items in the "Pop" category.
- Get the lowest-priced item in each category.
  - You'll need to group, then do an aggregation.
  - Using musicDF and untyped transformations, get the lowest-priced item in each category.
  - Using musicDS and typed transformations, get the lowest-priced item in each category.
- Transform the data so that the price is reduced by 10%. (You can do this by multiplying it by 0.9.)
  - Using musicDF (the DataFrame), transform it to a DataFrame where the price is reduced by 10%.
    - Hint: use a select and a literal to create the new price values.
  - Using musicDS (the Dataset), transform it to a Dataset where the price is reduced by 10% (see the map() sketch after the code below).
    - Use map().
    - Make sure you know how your case class is defined (i.e. in what order the fields appear).
- Make a mistake - watch how errors are caught (a side-by-side sketch appears at the end of this section).
  - Using musicDF (the DataFrame), make a mistake in transforming it when you modify the price: try multiplying the price by a literal string, e.g. lit("A"). What happens?
  - Using musicDS (the Dataset), make a mistake in transforming it when you modify the price: try multiplying the price by a string (e.g. "A"). What happens?
```java
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;

public class DataFrameVsDatasetExample {
    public static void main(String[] args) {
        // Create a SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("DataFrameVsDatasetExample")
                .getOrCreate();

        // Read the music data both ways: as an untyped DataFrame and as a typed
        // Dataset. MusicItem is the bean class from the previous example.
        Dataset<Row> musicDF = spark.read().json("data/music.json");
        Dataset<MusicItem> musicDS = musicDF.as(Encoders.bean(MusicItem.class));

        // Filter on category using the DataFrame (untyped - the predicate is a string)
        musicDF.filter("category = 'Pop'").show();

        // Filter on category using the Dataset (typed - the lambda sees MusicItem objects)
        musicDS.filter((FilterFunction<MusicItem>) mi -> mi.getCategory().equals("Pop")).show();

        // Lowest price per category, untyped: group by the column, then aggregate
        musicDF.groupBy("category").min("price").show();

        // Lowest-priced item per category, typed: group by key, then reduce each group
        musicDS.groupByKey((MapFunction<MusicItem, String>) MusicItem::getCategory, Encoders.STRING())
                .mapGroups((MapGroupsFunction<String, MusicItem, MusicItem>) (category, items) -> {
                    MusicItem cheapest = items.next();
                    while (items.hasNext()) {
                        MusicItem candidate = items.next();
                        if (candidate.getPrice() < cheapest.getPrice()) {
                            cheapest = candidate;
                        }
                    }
                    return cheapest;
                }, Encoders.bean(MusicItem.class))
                .show();

        // Reduce the price by 10% using the DataFrame (select with a column expression)
        musicDF.select(col("title"), col("artist"), col("category"),
                col("price").multiply(0.9).as("price")).show();

        // Make a mistake - watch how errors are caught. The line below would not even
        // compile: javac rejects double * String, so the error is caught immediately.
        // musicDS.map((MapFunction<MusicItem, Double>) mi -> mi.getPrice() * "A", Encoders.DOUBLE());

        // Stop the SparkSession
        spark.stop();
    }
}
```
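The task list also asks for the typed version of the 10% price reduction, using map(). A minimal sketch that can be dropped into main() above, assuming the MusicItem bean class from the first example (with a bean class we rebuild each item through setters rather than a copy constructor):

```java
import org.apache.spark.api.java.function.MapFunction;

// Build a new MusicItem per row with the discounted price. Because map() operates on
// MusicItem objects, every field access and the arithmetic are checked at compile time.
Dataset<MusicItem> discountedDS = musicDS.map(
        (MapFunction<MusicItem, MusicItem>) mi -> {
            MusicItem discounted = new MusicItem();
            discounted.setTitle(mi.getTitle());
            discounted.setArtist(mi.getArtist());
            discounted.setCategory(mi.getCategory());
            discounted.setPrice(mi.getPrice() * 0.9);
            return discounted;
        },
        Encoders.bean(MusicItem.class));
discountedDS.show();
```

In Scala the same transformation is a one-liner, musicDS.map(mi => mi.copy(price = mi.price * 0.9)), because case classes provide copy() out of the box.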
In the DataFrameVsDatasetExample code above:
- We create a SparkSession to establish a connection to Spark.
- We read the music data from data/music.json and convert it into a typed Dataset using Encoders.bean().
- We demonstrate equivalent operations for filtering, aggregation, and transformation on both the DataFrame and the Dataset.
- Note that in Java, type safety is enforced at compile time, so attempts to perform operations with incompatible types result in compilation errors.
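To make that last point concrete, here is what the two deliberate mistakes from the task list look like side by side; a sketch, assuming musicDF and musicDS from the example above (drop the lines into main(), with the matching imports):

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

// DataFrame version: this compiles. Depending on your Spark version and ANSI settings,
// it either fails when the query actually runs or silently produces nulls - either
// way, the mistake only surfaces at runtime.
musicDF.select(col("price").multiply(lit("A"))).show();

// Dataset version: this never compiles. javac rejects double * String outright, so
// the mistake is caught before the job is ever submitted.
// Dataset<Double> broken = musicDS.map(
//         (MapFunction<MusicItem, Double>) mi -> mi.getPrice() * "A",
//         Encoders.DOUBLE());
```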
We've practiced some fairly simple transformations with Datasets, and done the same operations with DataFrames to get a feel for the difference. DataFrames are a little easier to program against. However, as we saw, Datasets can catch errors earlier than DataFrames, which matters when debugging and maintaining a large system.