Datasets - ignacio-alorre/Spark GitHub Wiki

  • An extension of Spark SQL that provides additional compile-time type checking.

  • DataFrames are now a specialized version of Datasets that operate on generic Row objects and therefore lack the compile-time type checking that Datasets normally provide

  • Datasets can be used when your data can be encoded for Spark SQL and you know the type information at compile time

  • The Dataset API is a strongly typed collection with a mixture of relational (DataFrame) and functional (RDD) transformations

  • Datasets are represented by a logical plan that the Catalyst optimizer can work with, and when cached the data is stored in Spark SQL's internal encoding format

Interoperability with RDDs, DataFrames and Local Collections

  • To convert a DataFrame to a Dataset you can use the as[ElementType] function on the DataFrame to get a Dataset[ElementType]
def fromDF(df: DataFrame): Dataset[RawPanda] = {
  df.as[RawPanda]
}
  • The ElementType must be a case class, or something similar such as a tuple, consisting of types Spark SQL can represent

  • To create Datasets from local collections, createDataset() is provided on the SQLContext and the toDS() implicit function is provided on Seqs

  • For converting from RDD to Dataset you can first convert from RDD to DataFrame and then convert it to a Dataset

  • For loading data into a Dataset you can first load your data into a DataFrame and then convert it to a Dataset. Since the conversion to the Dataset simply adds information, you do not have the problem of eagerly evaluating, and future filters and similar operations can still be pushed down to the data store

  • Converting from a Dataset back to an RDD or DataFrame can be done in similar ways as when converting DataFrames

// Illustrate converting a Dataset to an RDD
def toRDD(ds: Dataset[RawPanda]): RDD[RawPanda] = {
  ds.rdd
}

// Illustrate converting a Dataset to a DataFrame
def toDF(ds: Dataset[RawPanda]): DataFrame = {
  ds.toDF()
}
  • toDF simply copies the logical plan used in the Dataset into a DataFrame so you don't need to do any schema inference or conversion as you do when converting from RDDs.

  • Converting a Dataset of type T to an RDD of type T can be done by calling .rdd, which unlike calling toDF does involve converting the data from the internal SQL format to the regular types
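The conversions into a Dataset described in this section can be sketched as follows. This is a minimal sketch, not the original author's code: it assumes Spark 2.x or later (so it uses SparkSession rather than the SQLContext mentioned above), the RawPanda case class here is a simplified, hypothetical stand-in for the one used in these notes, and the Parquet path passed to fromParquet is whatever location your data happens to live in.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical, simplified stand-in for the RawPanda case class used in these notes.
case class RawPanda(id: Long, zip: String, happy: Boolean, attributes: Array[Double])

object DatasetConversions {
  // Local collection -> Dataset via the toDS() implicit on Seq.
  def fromSeq(spark: SparkSession, pandas: Seq[RawPanda]): Dataset[RawPanda] = {
    import spark.implicits._
    pandas.toDS()
  }

  // Local collection -> Dataset via createDataset.
  def fromSeqExplicit(spark: SparkSession, pandas: Seq[RawPanda]): Dataset[RawPanda] = {
    import spark.implicits._
    spark.createDataset(pandas)
  }

  // RDD -> DataFrame -> Dataset.
  def fromRDD(spark: SparkSession, rdd: RDD[RawPanda]): Dataset[RawPanda] = {
    import spark.implicits._
    rdd.toDF().as[RawPanda]
  }

  // Load into a DataFrame first, then attach the type with as[...].
  // Nothing is read eagerly here; the plan is only extended with type information.
  def fromParquet(spark: SparkSession, path: String): Dataset[RawPanda] = {
    import spark.implicits._
    spark.read.parquet(path).as[RawPanda]
  }
}
```

Importing spark.implicits._ inside each helper brings the encoders and the toDS()/toDF() conversions into scope; in application code a single import next to the session is usually enough.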

Compile-Time Strong Typing

  • One of the reasons to use Datasets over traditional DataFrames is their compile-time strong typing.

  • DataFrames have runtime schema information but lack compile-time information about the schema. This strong typing is especially useful when making libraries, because you can specify the requirements of your inputs and your return types
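To make the difference concrete, here is a small sketch of the same query written against a Dataset and against a DataFrame. The RawPanda case class is a simplified, hypothetical version of the one used in these notes, and the function names are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical, simplified stand-in for the RawPanda case class used in these notes.
case class RawPanda(id: Long, zip: String, happy: Boolean, attributes: Array[Double])

object TypedVsUntyped {
  // Typed: the signature documents the expected input and output, and a
  // misspelled field such as _.hapy would be rejected by the compiler.
  def happyIds(ds: Dataset[RawPanda]): Dataset[Long] = {
    val spark = ds.sparkSession
    import spark.implicits._
    ds.filter(_.happy).map(_.id)
  }

  // Untyped: column names are plain strings, so a misspelled name is only
  // detected at runtime, when Spark fails to resolve the column.
  def happyIdsUntyped(df: DataFrame): DataFrame =
    df.filter(df("happy")).select("id")
}
```

A caller of happyIds cannot pass a Dataset of the wrong element type, whereas happyIdsUntyped accepts any DataFrame and fails only when it runs against one without the expected columns.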

Easier Functional (RDD "like") Transformations

  • One of the key advantages of the Dataset API is easier integration with custom Scala code.

  • Datasets expose filter, map, mapPartitions and flatMap with similar function signatures as RDDs, with the requirement that your return ElementType also be understandable by Spark SQL

// Functional query on Dataset
def funMap(ds: Dataset[RawPanda]): Dataset[Double] = {
  ds.map{rp => rp.attributes.filter(_ > 0).sum}
}

Relational Transformations

Datasets introduce a typed version of select for relational-style transformations. When specifying an expression for this you need to include the type information by calling as[ReturnType] on the expression/column

// Simple relational select on Dataset
def squishyPandas(ds: Dataset[RawPanda]): Dataset[(Long, Boolean)] = {
  ds.select($"id".as[Long], ($"attributes"(0) > 0.5).as[Boolean])
}
  • Some operations, such as select, have both typed and untyped implementations. If you supply a Column rather than a TypedColumn you will get a DataFrame back instead of a Dataset
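The two flavors of select can be compared side by side. This is an illustrative sketch with a simplified, hypothetical RawPanda case class; the only difference between the two functions is whether as[Long] is applied to the column.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical, simplified stand-in for the RawPanda case class used in these notes.
case class RawPanda(id: Long, zip: String, happy: Boolean, attributes: Array[Double])

object SelectFlavors {
  // A TypedColumn (created with as[Long]) keeps the result a Dataset.
  def typedIds(ds: Dataset[RawPanda]): Dataset[Long] = {
    val spark = ds.sparkSession
    import spark.implicits._
    ds.select($"id".as[Long])
  }

  // A plain Column selects the untyped overload, so the result is a DataFrame.
  def untypedIds(ds: Dataset[RawPanda]): DataFrame = {
    val spark = ds.sparkSession
    import spark.implicits._
    ds.select($"id")
  }
}
```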

Multi-Dataset Relational Transformations

  • There are also transformations for working with multiple Datasets. The standard set operations intersect, union and subtract (exposed as except in the Dataset API) are available, and joining Datasets is also supported
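A short sketch of these multi-Dataset operations follows. The PandaPlace and ZipRegion case classes and the function names are hypothetical and exist only for this example; joinWith is used for the join because it keeps both sides typed.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case classes used only for this illustration.
case class PandaPlace(id: Long, zip: String)
case class ZipRegion(zip: String, region: String)

object MultiDatasetOps {
  // Rows present in both Datasets (result is de-duplicated).
  def inBoth(a: Dataset[PandaPlace], b: Dataset[PandaPlace]): Dataset[PandaPlace] =
    a.intersect(b)

  // All rows from both Datasets; duplicates are kept, like SQL UNION ALL.
  def inEither(a: Dataset[PandaPlace], b: Dataset[PandaPlace]): Dataset[PandaPlace] =
    a.union(b)

  // Rows of a that do not appear in b (the "subtract" operation).
  def onlyInFirst(a: Dataset[PandaPlace], b: Dataset[PandaPlace]): Dataset[PandaPlace] =
    a.except(b)

  // Typed inner join: each result element is a pair of the original objects.
  def withRegion(p: Dataset[PandaPlace], r: Dataset[ZipRegion]): Dataset[(PandaPlace, ZipRegion)] =
    p.joinWith(r, p("zip") === r("zip"))
}
```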

Grouped Operations on Datasets

  • Similar to grouped operations on DataFrames, grouping a Dataset returns a GroupedDataset or a KeyValueGroupedDataset when grouped with an arbitrary function (groupByKey), and a RelationalGroupedDataset when grouped with a relational/Dataset DSL expression (groupBy)

  • As with the expression in Relational Transformations you need to use typed expressions so the result can also be a Dataset

  • The convenience functions found on GroupedData (e.g., min, max) are missing, so all of our aggregate expressions need to be specified through agg

// Compute the max panda size per zip code typed
def maxPandaSizePerZip(ds: Dataset[RawPanda]): Dataset[(String, Double)] = {
  ds.map(rp => MiniPandaInfo(rp.zip, rp.attributes(2)))
  .groupByKey(mp => mp.zip).agg(max("size").as[Double])
}
  • Beyond applying typed SQL expressions to aggregated columns, you can easily use arbitrary Scala code with mapGroups on grouped data
// Compute the max panda size per zip code using map groups
def maxPandaSizePerZipScala(ds: Dataset[RawPanda]): Dataset[(String, Double)] = {
  ds.groupByKey(rp => rp.zip).mapGroups{ case (g, iter) =>
    (g, iter.map(_.attributes(2)).reduceLeft(Math.max(_, _)))
  }
}

Extending with User-Defined Functions and Aggregate Functions (UDFs, UDAFs)

  • User-defined functions and user-defined aggregate functions provide you with ways to extend the DataFrame and SQL API with your own custom code while still benefiting from the Catalyst optimizer.

  • The Dataset API is another performant option for much of what you can do with UDFs and UDAFs. This is quite useful for performance, since otherwise you would need to convert the data to an RDD to perform arbitrary functions, which is quite expensive

  • UDFs and UDAFs can also be accessed from inside of regular SQL expressions, making them accessible to analysts or others more comfortable with SQL

  • When using UDFs or UDAFs written in non-JVM languages, such as Python, it is important to note that you lose much of the performance benefit, as the data must still be transferred out of the JVM. If most of your work is in Python but you want to access some UDFs without the performance penalty, you can write your UDFs in Scala and register them for use in Python

  • Writing nonaggregate UDFs for Spark SQL is very simple: you write a regular function and register it using sqlContext.udf.register

// String length UDF
def setupUDFs(sqlCtx: SQLContext) = {
  sqlCtx.udf.register("strLen", (s: String) => s.length())
}
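Once registered, a UDF like strLen can be called from a plain SQL expression. The sketch below assumes Spark 2.x or later (SparkSession instead of SQLContext); the view name pandas and the column name are made up for the example.

```scala
import org.apache.spark.sql.SparkSession

object StrLenFromSql {
  // Registers the UDF, exposes a tiny table as a temp view, and queries it with SQL.
  def nameLengths(spark: SparkSession): Array[(String, Int)] = {
    import spark.implicits._
    spark.udf.register("strLen", (s: String) => s.length)
    Seq("bao", "mei xiang").toDF("name").createOrReplaceTempView("pandas")
    spark.sql("SELECT name, strLen(name) AS len FROM pandas")
      .as[(String, Int)]
      .collect()
  }
}
```

Because the function is looked up by its registered name, anyone who can write SQL against the session can use it without touching the Scala code.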
  • UDAFs are trickier to write. Instead of writing a regular Scala function, you extend the UserDefinedAggregateFunction and implement a number of different functions, similar to the functions one might write for aggregateByKey on an RDD, except working with different data structures.

  • UDAFs can be quite performant compared with options like mapGroups on Datasets or even a straightforward aggregateByKey on RDDs

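// UDAF for computing the average
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

def setupUDAFs(sqlCtx: SQLContext) = {
  class Avg extends UserDefinedAggregateFunction {

    // Input type
    def inputSchema: StructType =
      StructType(StructField("value", DoubleType) :: Nil)

    // Intermediate buffer: running count and running sum
    def bufferSchema: StructType = StructType(
      StructField("count", LongType) ::
      StructField("sum", DoubleType) :: Nil
    )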
    
    // Return type
    def dataType: DataType = DoubleType
    def deterministic: Boolean = true
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 0.0
    }
    
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer(0) = buffer.getAs[Long](0) + 1
      buffer(1) = buffer.getAs[Double](1) + input.getAs[Double](0)
    }
    
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
      buffer1(1) = buffer1.getAs[Double](1) + buffer2.getAs[Double](1)
    }
    
    def evaluate(buffer: Row): Any = {
      buffer.getDouble(1) / buffer.getLong(0)
    }
  }
  // Optionally register
  val avg = new Avg
  sqlCtx.udf.register("ourAvg", avg)
}
  • Start by specifying what the input type is, then specify the schema of the buffer you will use for storing the in-progress work. The rest of the functions implement the same steps you write for aggregateByKey on an RDD, but instead of taking arbitrary Scala objects you work with Row and MutableAggregationBuffer. The final evaluate function takes the Row representing the aggregation data and returns the final result
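For comparison, the same average can be written against the typed Dataset API with an Aggregator, which is one way to act on the earlier point that Datasets cover much of what UDAFs do. This is a sketch rather than a drop-in replacement for the code above: TypedAvg and TypedAvgExample are names invented here, and it is worth knowing that UserDefinedAggregateFunction has been deprecated since Spark 3.0 in favor of Aggregator.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Typed average: input Double, buffer (count, sum), output Double.
object TypedAvg extends Aggregator[Double, (Long, Double), Double] {
  // Empty buffer, the counterpart of initialize.
  def zero: (Long, Double) = (0L, 0.0)

  // Fold one input value into the buffer, the counterpart of update.
  def reduce(b: (Long, Double), a: Double): (Long, Double) = (b._1 + 1, b._2 + a)

  // Combine two partial buffers, the counterpart of merge.
  def merge(b1: (Long, Double), b2: (Long, Double)): (Long, Double) =
    (b1._1 + b2._1, b1._2 + b2._2)

  // Produce the final result, the counterpart of evaluate.
  def finish(r: (Long, Double)): Double = r._2 / r._1

  def bufferEncoder: Encoder[(Long, Double)] =
    Encoders.tuple(Encoders.scalaLong, Encoders.scalaDouble)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object TypedAvgExample {
  // toColumn turns the Aggregator into a TypedColumn usable in a typed select.
  def average(ds: Dataset[Double]): Double = ds.select(TypedAvg.toColumn).head()
}
```

The four methods line up with initialize, update, merge and evaluate, but they operate on ordinary Scala values instead of Row and MutableAggregationBuffer.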

Query Optimizer

  • Catalyst is the Spark SQL query optimizer

  • It takes the query plan and transforms it into an execution plan that Spark can run

  • Spark SQL builds up a tree representing our query plan, called the logical plan. Spark is able to apply a number of optimizations on the logical plan and can also choose between multiple physical plans for the same logical plan using a cost-based model

Logical and Physical Plans

  • The logical plan you construct through transformations on DataFrames/Datasets starts out as an unresolved logical plan

  • The Spark optimizer is multiphased, and before any optimizations can be performed it needs to resolve the references and types of the expressions

  • This resolved plan is referred to as the logical plan, and Spark applies a number of simplifications directly on the logical plan, producing an optimized logical plan

  • Once the logical plan has been optimized, Spark will produce a physical plan. The physical plan stage has both rule-based and cost-based optimizations to produce the optimal physical plan
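The phases described above can be inspected on any DataFrame or Dataset. The sketch below uses explain(true) and the queryExecution field; the object name, the helper names and the sample query are invented for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PlanInspection {
  // A small hypothetical query to have something to inspect.
  def sampleQuery(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1L, "94110"), (2L, "10504")).toDF("id", "zip")
      .filter($"id" > 1)
      .select("zip")
  }

  // Returns the plan at each phase as text:
  // plan as written, resolved (analyzed), optimized logical, and physical.
  def plans(df: DataFrame): Seq[String] = {
    val qe = df.queryExecution
    Seq(
      qe.logical.toString,
      qe.analyzed.toString,
      qe.optimizedPlan.toString,
      qe.executedPlan.toString
    )
  }

  // Prints the parsed, analyzed and optimized logical plans plus the physical plan.
  def printPlans(df: DataFrame): Unit = df.explain(true)
}
```

Comparing the analyzed and optimized entries for a query is a quick way to see which simplifications Catalyst applied.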

Large Query Plans and Iterative Algorithms

  • While the Catalyst optimizer is quite powerful, one of the cases where it runs into challenges is with very large query plans

  • These query plans tend to be the result of iterative algorithms, like graph algorithms or machine learning algorithms

  • One simple workaround for this is converting the data to an RDD and back to DataFrame/Dataset at the end of each iteration
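A minimal sketch of that workaround is shown below. The helper names cutPlan and iterate are hypothetical; the idea is that rebuilding the DataFrame from its RDD and schema gives Catalyst a fresh, small plan to start from on the next iteration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PlanTruncation {
  // Round-trip through an RDD: the new DataFrame's logical plan no longer
  // contains the chain of transformations that produced df.
  def cutPlan(df: DataFrame): DataFrame =
    df.sparkSession.createDataFrame(df.rdd, df.schema)

  // Apply `step` repeatedly, cutting the plan at the end of each iteration.
  def iterate(start: DataFrame, steps: Int)(step: DataFrame => DataFrame): DataFrame =
    (1 to steps).foldLeft(start)((df, _) => cutPlan(step(df)))
}
```

The round trip is not free, since rows are converted out of and back into Spark SQL's internal format, so it tends to pay off only when the plan would otherwise grow with every iteration.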