RDD vs DataFrame vs Datasets

Given the same data, each of the three abstractions will compute and return the same results to the user, but they differ in performance and in how they compute. We could say that RDDs let us decide how we want to do something, which limits the optimisation Spark can apply underneath, whereas DataFrames/Datasets let us declare what we want to do and leave it entirely to Spark to decide how to carry out the computation.

DataFrames came as a major performance improvement over RDDs, but not without some downsides. This led to the development of Datasets, an effort to unify the best of RDDs and DataFrames.

Resilient Distributed Dataset (RDD)

Properties

  • It is the building block of Spark: no matter which abstraction we use, DataFrame or Dataset, internally the final computation is done on RDDs
  • An RDD is a lazily evaluated, immutable, parallel collection of objects, partitioned across the nodes of your cluster and manipulated through lambda functions
  • The best part of RDDs is that they are simple. We can load data from any source, convert it into an RDD and keep it in memory to compute results. An RDD can easily be cached when the same set of data needs to be recomputed (see the sketch after this list)
  • Offers transformations and actions on the data
  • Can deal with almost any kind of data: unstructured, semi-structured and structured. Often the data is not ready to fit into a DataFrame (even JSON); RDDs can then be used to preprocess the data so that it fits into a DataFrame
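
A minimal sketch of these points, assuming an existing SparkContext sc; the file path and field layout are hypothetical:

// Load a plain-text file as an RDD of lines (the path is hypothetical)
val lines = sc.textFile("/data/events.log")

// Transformations are lazy: nothing is computed yet
val errorCodes = lines
  .filter(_.contains("ERROR"))
  .map(_.split("\t")(0))   // keep only the first field

// Cache the RDD so that repeated actions reuse the in-memory data
errorCodes.cache()

// Actions trigger the actual computation
val total = errorCodes.count()
val sample = errorCodes.take(10)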

Disadvantages

  • Performance limitations. Being in-memory JVM objects, RDDs involve the overhead of Garbage Collection and Java serialization, which become expensive as data grows.

When to use RDDs

  • You want low-level transformations and actions and control over your dataset
  • Your data is unstructured, such as media streams or streams of text
  • You need to manipulate your data with functional programming constructs instead of domain specific expressions
  • You don't care about imposing a schema, such as columnar format, while processing or accessing data attributes by name or column
  • You can forgo some optimization and performance benefits available with DataFrames and Datasets for structured and semi-structured data
  • If you need end-to-end type safety, or you work a lot with types which don't have built-in encoders, the RDD API is a natural choice
  • You may prefer RDDs when the order of execution is important (you can create your own planner rules with SQL, but it is much more effort) or when you need low-level control, like user-defined Partitioners
  • You need fine-grained control over the physical distribution of data, i.e. custom partitioning (see the sketch after this list)
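
A minimal sketch of a user-defined Partitioner, assuming a hypothetical pair RDD keyed by country code:

import org.apache.spark.Partitioner

// A user-defined Partitioner: records sharing the same key always
// land in the same partition (country codes are a hypothetical example)
class CountryPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int =
    ((key.hashCode % numPartitions) + numPartitions) % numPartitions
}

// eventsByCountry is assumed to be an RDD[(String, String)] keyed by country code
val partitioned = eventsByCountry.partitionBy(new CountryPartitioner(8))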

DataFrames

  • A DataFrame is an abstraction which gives a schema view of the data: it presents the data as columns with a name and type information, so we can think of the data in a DataFrame like a table in a database
  • Like RDDs, DataFrame execution is lazily triggered
  • Offers a huge performance improvement over RDDs thanks to 2 powerful features:
    • Custom Memory Management (aka Project Tungsten)
      • Data is stored in off-heap memory in binary format, which saves a lot of memory space
      • There is no Garbage Collection overhead involved
      • By knowing the schema of the data in advance and storing it efficiently in binary format, expensive Java serialization is also avoided
    • Optimized Execution Plans (aka Catalyst Optimizer)
      • Query plans are created for execution using the Spark Catalyst optimiser
      • After an optimised execution plan has been prepared through a number of steps, the final execution still happens internally on RDDs, but that is completely hidden from the users


As an example of this optimisation, consider the query below:

users.join(events, users("id") === events("uid"))
     .filter(events("date") > "2015-01-01")

Catalyst can push the filter on events down below the join, so that only events newer than 2015-01-01 are joined against users and far less data has to be shuffled. The sketch below shows how to inspect the plan Catalyst produces.
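
A minimal sketch, assuming users and events already exist as DataFrames:

// explain(true) prints the parsed, analyzed, optimized and physical plans.
// In the optimized plan the filter on events typically appears below the join,
// i.e. it is applied before the rows are shuffled for the join.
users.join(events, users("id") === events("uid"))
     .filter(events("date") > "2015-01-01")
     .explain(true)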

Limitations

  • Lack of type safety. Referring to attributes by string names means no compile-time safety; things can fail at runtime. For example, if we try to access a column which is not present in the schema, we only find out at run time (see the sketch after this list).

  • The DataFrame API is very Scala-centric, even though it does support Java. For example, when creating a DataFrame from an existing RDD of Java objects, Spark's Catalyst optimizer cannot infer the schema and assumes that the objects in the DataFrame implement the scala.Product interface. Scala case classes work out of the box because they implement this interface.
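
A minimal sketch of the type-safety limitation, assuming a hypothetical DataFrame df with the columns id and name and an existing SparkSession spark:

import spark.implicits._

// The typo below compiles fine, because to the compiler a column is just a string,
// but it fails at runtime with an AnalysisException (cannot resolve 'naem')
val broken = df.select("naem")

// With a Dataset of a case class the same mistake is caught at compile time
case class User(id: Long, name: String)
val users = df.as[User]
// users.map(_.naem)   // does not compile: value naem is not a member of User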

Like an RDD, a DataFrame is an immutable distributed collection of data. Unlike an RDD, the data is organized into named columns, like a table in a relational database. Designed to make processing of large data sets even easier, DataFrames allow developers to impose a structure onto a distributed collection of data, enabling a higher-level abstraction, and provide a domain-specific language API to manipulate the distributed data.

Datasets

  • It is an extension of the DataFrame API, the latest abstraction, which tries to provide the best of both RDDs and DataFrames

  • Comes with an OOP style and developer-friendly compile-time safety like RDDs, as well as the performance-boosting features of DataFrames: the Catalyst optimiser and custom memory management.

  • Encoders are a new feature introduced with Datasets. They act as the interface between JVM objects and the off-heap custom binary memory format: they generate bytecode to interact with the off-heap data and provide on-demand access to individual attributes without having to deserialize an entire object.

  • A case class is used to define the structure of the data schema in a Dataset. Using case classes, it is very easy to work with Datasets: the names of the attributes in the case class are directly mapped to the field names in the Dataset. It feels like working with an RDD, but underneath it works the same as a DataFrame

  • A DataFrame is treated as a Dataset of generic Row objects: DataFrame = Dataset[Row]. So we can always convert a DataFrame into a Dataset at any point by calling the 'as' method on the DataFrame (see the sketch after this list)
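
A minimal sketch of the relationship between the two abstractions, assuming an existing SparkSession spark:

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._   // brings the encoders for common Scala types into scope

// Hypothetical schema used only for illustration
case class Person(name: String, age: Long)

// A Dataset built directly from typed objects
val people: Dataset[Person] = Seq(Person("Ana", 34L), Person("Luis", 28L)).toDS()

// A DataFrame is just a Dataset[Row]; convert it back to a typed Dataset with as[T]
val df: DataFrame = people.toDF()
val typedAgain: Dataset[Person] = df.as[Person]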

Limitations

  • Requires type casting to String: querying the data from a Dataset currently requires us to specify the fields as strings. Once we have queried the data, we are forced to cast the column to the required data type (see the sketch after this list).
  • If we use a map operation on a Dataset, it will not use the Catalyst optimizer.
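
A minimal sketch of both limitations, reusing the hypothetical people: Dataset[Person] from the sketch above:

// Columns are still referenced by string name, so we have to cast back to a type
val names: Dataset[String] = people.select($"name").as[String]

// The lambda passed to map is opaque to Catalyst: Spark cannot look inside it,
// so this part of the query loses most of the optimisation
val upperCased = people.map(p => p.name.toUpperCase)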

Important point: all these optimizations are possible because the data is structured and Spark knows the schema of the data in advance. So it can apply all the powerful features, like Tungsten's custom off-heap binary memory storage, the Catalyst optimiser and encoders, to achieve a performance that would not be possible if users were working directly on RDDs.

Benefits of Dataset APIs

1. Static-typing and runtime type-safety

Consider static-typing and runtime safety as a spectrum, with SQL the least restrictive and Dataset the most restrictive. For instance, in your Spark SQL string queries you won't know about a syntax error until runtime (which could be costly), whereas in DataFrames and Datasets you can catch errors at compile time (which saves developer time and costs). Note: if you invoke a function in DataFrame that is not part of the API, the compiler will catch it. However, it won't detect a non-existent column name until runtime.

At the far end of the spectrum is the Dataset, the most restrictive. Since Dataset APIs are all expressed as lambda functions and JVM typed objects, any mismatch of typed parameters will be detected at compile time. Analysis errors can also be detected at compile time when using Datasets, hence saving developer time and costs (see the sketch below).

(Figure: whether syntax errors and analysis errors are caught at compile time or at runtime for SQL, DataFrames and Datasets)
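
A minimal sketch of where the errors surface (peopleDF, peopleDS and the people_table view are hypothetical, and spark.implicits._ is assumed to be imported):

// SQL string: a misspelled column is only discovered when the query runs
val fromSql = spark.sql("SELECT nmae FROM people_table")   // fails at runtime

// DataFrame: API misuse is caught by the compiler,
// but a wrong column name is still only detected at runtime
val fromDf = peopleDF.filter($"age" > 30)

// Dataset: lambdas are type-checked, so a type mismatch does not even compile
// peopleDS.filter(p => p.age > "30")   // compile error: Long cannot be compared to String
val fromDs = peopleDS.filter(p => p.age > 30)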

2. High-level abstraction and custom view into structured and semi-structured data

DataFrames, as a collection of Dataset[Row], render a structured custom view into your semi-structured data. For instance, let's say you have a huge IoT device event dataset, expressed as JSON. Since JSON is a semi-structured format, it lends itself well to employing a Dataset as a collection of strongly typed objects, Dataset[DeviceIoTData].

{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, "lcd": "red", "timestamp" :1458081226051}

You could express each JSON entry as DeviceIoTData, a custom object, with a Scala case class

case class DeviceIoTData (battery_level: Long, c02_level: Long, cca2: String, cca3: String, cn: String, device_id: Long, device_name: String, humidity: Long, ip: String, latitude: Double, lcd: String, longitude: Double, scale:String, temp: Long, timestamp: Long)

Next, we can read the data from a JSON file:

// read the json file and create the dataset from the 
// case class DeviceIoTData
// ds is now a collection of JVM Scala objects DeviceIoTData
val ds = spark.read.json("/databricks-public-datasets/data/iot/iot_devices.json").as[DeviceIoTData]

Three things happen here under the hood in the code above:

  • Spark reads the JSON, infers the schema and creates a DataFrame
  • At this point, Spark converts your data into DataFrame = Dataset[Row], a collection of generic Row objects, since it does not know the exact type
  • Now, Spark converts Dataset[Row] -> Dataset[DeviceIoTData], a type-specific Scala JVM object, as dictated by the class DeviceIoTData

With Dataset[ElementType] typed objects, you seamlessly get both compile-time safety and custom view for strongly-typed JVM objects.

3. Ease-of-use of APIs with structure

Although structure may limit the control your Spark program has over the data, it introduces rich semantics and an easy set of domain-specific operations that can be expressed as high-level constructs. Most computations can be accomplished with the Dataset's high-level APIs. For example, it's much simpler to perform agg, select, sum, avg, map, filter or groupBy operations by accessing the fields of a typed DeviceIoTData object than by working with the data fields of RDD rows (see the sketch below).
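
A minimal sketch of such operations on the ds Dataset of DeviceIoTData defined above (the thresholds are arbitrary, and spark.implicits._ is assumed to be imported):

import org.apache.spark.sql.functions.avg

// Typed, lambda-style operations
val hotDevices = ds
  .filter(d => d.temp > 30 && d.humidity > 70)
  .map(d => (d.device_name, d.cca3, d.temp))

// Relational-style operations mix in naturally as well
val avgTempPerCountry = ds
  .groupBy($"cn")
  .agg(avg($"temp").alias("avg_temp"))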

4. Performance and Optimization

Along with all the above benefits, you cannot overlook the space efficiency and performance gains of using the DataFrame and Dataset APIs, for two reasons:

  • 1 The DataFrame and Dataset APIs are built on top of the Spark SQL engine, which uses Catalyst to generate an optimized logical and physical query plan. Independently of the language (R/Python/Scala/Java), all relational queries undergo the same code optimizer, providing the space and speed efficiency. Whereas the Dataset[T] typed API is optimized for data engineering tasks, the untyped Dataset[Row] is even faster and suitable for interactive analysis.

  • 2 Since Spark, as a compiler, understands your Dataset's typed JVM objects, it maps them to Tungsten's internal memory representation using encoders. As a result, Tungsten encoders can efficiently serialize/deserialize JVM objects as well as generate compact bytecode that can execute at superior speeds (see the sketch below).
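
A minimal sketch of inspecting the encoder Spark derives for the case class (using the DeviceIoTData class from above):

import org.apache.spark.sql.Encoders

// The encoder generated for the case class carries the exact schema,
// which is what lets Tungsten lay the data out in its compact binary format
val encoder = Encoders.product[DeviceIoTData]
encoder.schema.printTreeString()   // prints the field names and types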

When should I use DataFrames or Datasets?

  • If you want rich semantics, high-level abstractions, and domain specific APIs, use DataFrame or Dataset
  • If your processing demands high-level expressions, filters, maps, aggregations, averages, sums, SQL queries, columnar access and the use of lambda functions on semi-structured data, use DataFrame or Dataset.
  • If you want a higher degree of type-safety at compile time, want typed JVM objects, and want to take advantage of Catalyst optimization and benefit from Tungsten's efficient code generation, use Dataset
  • If you want unification and simplification of APIs across Spark libraries, use DataFrame or Dataset
  • If you are a Python user, use DataFrames and resort to RDDs if you need more control

Bringing It All Together

In summary, the choice between RDDs and DataFrames/Datasets seems obvious: while the former offer you low-level functionality and control, the latter allow a custom view and structure, offer high-level and domain-specific operations, save space, and execute at superior speeds.

As we examined the lessons we learned from early releases of Spark—how to simplify Spark for developers, how to optimize and make it performant—we decided to elevate the low-level RDD APIs to a high-level abstraction as DataFrame and Dataset and to build this unified data abstraction across libraries atop Catalyst optimizer and Tungsten.
