# DataFrame API

- Allows us to work with DataFrames without having to register temporary tables or generate SQL expressions.
- The DataFrame API has both transformations and actions.
- The transformations on DataFrames are more relational in nature, with the Dataset API offering a more functional-style API.
## Transformations

- Similar in concept to RDD transformations, but with a more relational flavor.
- Instead of specifying arbitrary functions, which the optimizer is unable to introspect, you use a restricted expression syntax so the optimizer has more information to work with.
- As with RDDs, we can broadly break down transformations into simple single-DataFrame, multiple-DataFrame, key/value, and grouped/windowed transformations.
- Spark SQL transformations are only partially lazy; the schema is eagerly evaluated.
### Simple DataFrame transformations and SQL expressions

- Simple DataFrame transformations allow us to do most of the standard things one can do when working a row at a time.
- You can do many of the operations defined on RDDs, except using Spark SQL expressions instead of arbitrary functions.
- DataFrame functions, like `filter`, accept Spark SQL expressions instead of lambdas. These expressions allow the optimizer to understand what the condition represents, and with `filter` the optimizer can often use it to skip reading unnecessary records. For example, filtering out all the unhappy pandas in our DataFrame:

```scala
pandaInfo.filter(pandaInfo("happy") !== true)
```

- To look up the column, we can either provide the column name on the specific DataFrame or use the implicit `$` operator for column lookup. This is especially useful when the DataFrame is anonymous. The `!` binary negation function can be used together with `$` to simplify our expression:

```scala
df.filter(!$("happy"))
```

- For accessing other structures inside a DataFrame, like nested structs, keyed maps, and array elements, you can use:

```scala
df("attribute")(0) >= 3
```

- You can compare multiple columns in a filter expression. Complex filters like the one below are more difficult to push down to the storage layer, so you may not see the same speedup over RDDs that you see with simpler filters:

```scala
pandaInfo.filter(
  pandaInfo("happy").and(pandaInfo("attribute")(0) > pandaInfo("attribute")(1)))
```
- Spark SQL's column operators are defined on the column class, so a filter containing the expression `0 >= df.col("friends")` will not compile, since Scala will use the `>=` defined on `0`. Instead you would write `df.col("friends") <= 0` or convert `0` to a column literal with `lit()`.
- Columns use `===` and `!==` for equality to avoid conflict with Scala internals.
- For columns of strings, `startsWith`/`endsWith`, `substr`, `like`, and `isNull` are all available.
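A minimal sketch of these string column functions, assuming a hypothetical `pandas` DataFrame with a nullable string column `name` (the DataFrame and column names are illustrative):

```scala
// Pandas whose name starts with "Gi"
pandas.filter(pandas("name").startsWith("Gi"))

// First three characters of the name, exposed under a new column name
pandas.select(pandas("name").substr(1, 3).as("namePrefix"))

// SQL LIKE pattern matching combined with a null check
pandas.filter(pandas("name").like("%panda%") || pandas("name").isNull)
```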
Reference tables: Spark SQL Scala operators, Spark SQL expression operators, Spark SQL standard functions, Spark SQL common mathematical expressions, and functions for use on Spark SQL arrays.
- Beyond simply filtering out data, you can also produce a DataFrame with new columns or updated values in old columns. Spark uses the same expression syntax we discussed for filter, except instead of having to include a condition, the results are used as values in the new DataFrame.
- Example of the Spark SQL select and explode operators:

```scala
val pandaInfo = pandaPlace.explode(pandaPlace("pandas")) {
  case Row(pandas: Seq[Row]) =>
    pandas.map {
      case Row(
        id: Long,
        zip: String,
        pt: String,
        happy: Boolean,
        attrs: Seq[Double]) =>
        RawPanda(id, zip, pt, happy, attrs.toArray)
    }
}

pandaInfo.select(
  (pandaInfo("attributes")(0) / pandaInfo("attributes")(1))
    .as("squishyness"))
```
- With the `as()` or `alias()` operators you can specify the name of the resulting column.
- While all of these operations are quite powerful, sometimes the logic you wish to express is easier to encode with `if/else` semantics. The example below encodes the different types of panda as a numeric value.

```scala
// Requires: import org.apache.spark.sql.functions.when

/**
 * Encodes pandaType to Integer values instead of String values.
 *
 * @param pandaInfo the input DataFrame
 * @return Returns a DataFrame of pandaId and integer value for pandaType.
 */
def encodePandaType(pandaInfo: DataFrame): DataFrame = {
  pandaInfo.select(pandaInfo("id"),
    (when(pandaInfo("pt") === "giant", 0).
      when(pandaInfo("pt") === "red", 1).
      otherwise(2)).as("encodedType")
  )
}
```
### Specialized DataFrame transformations for missing and noisy data

- By using `isNaN` or `isNull` along with filters, you can create conditions for the rows you want to keep.
- If you have a number of different columns, perhaps with different levels of precision (some of which may be null), you can use `coalesce(c1, c2, ...)` to return the first non-null column. Similarly, for numeric data, `nanvl` returns the first non-NaN value (e.g., `nanvl(0/0, sqrt(-2), 3)` results in `3`).
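A minimal sketch of these helpers, assuming a hypothetical `pandas` DataFrame with nullable numeric columns `pandaSizeMeasured` and `pandaSizeEstimate` (all names are illustrative):

```scala
import org.apache.spark.sql.functions.{coalesce, isnan, lit, nanvl}

// Keep only rows where the measured size is present and is a real number
val clean = pandas.filter(
  pandas("pandaSizeMeasured").isNotNull && !isnan(pandas("pandaSizeMeasured")))

// Prefer the measured size, fall back to the estimate, then to a default of 0.0
val withSize = pandas.select(
  pandas("name"),
  coalesce(pandas("pandaSizeMeasured"), pandas("pandaSizeEstimate"), lit(0.0))
    .as("pandaSize"))

// For computations that may produce NaN, take the first non-NaN value
val safeRatio = nanvl(
  pandas("pandaSizeMeasured") / pandas("pandaSizeEstimate"), lit(1.0))
```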
### Beyond row-by-row transformations

- You can select the unique rows by calling `dropDuplicates`, but as with the similar operation on RDDs (`distinct`), this can require a shuffle, so it is often much slower than `filter`.
- Unlike with RDDs, `dropDuplicates` can optionally drop rows based on only a subset of the columns, such as an ID field:

```scala
df.dropDuplicates(List("id"))
```
### Aggregates and groupBy

- Thanks to its optimizer, Spark SQL can easily combine many aggregates into one single action/query.
- `groupBy` returns special objects on which we can ask for certain aggregations to be performed. `min`, `max`, `avg`, and `sum` are all implemented as convenience functions directly on `GroupedData`, and more can be specified by providing the expression to `agg`.
- The example below shows how to compute the maximum panda size by zip code. Once you specify the aggregates you want to compute, you can get the result back as a DataFrame.

```scala
def maxPandasSizePerZip(pandas: DataFrame): DataFrame = {
  pandas.groupBy(pandas("zip")).max("pandaSize")
}
```
- These aggregates can also be applied over the entire DataFrame or all numeric columns in a DataFrame. This is often useful when trying to collect some summary statistics for the data with which you are working. With `describe` you can extract some stats from a DataFrame:

```scala
// Compute the count, mean, stddev, min, max summary stats for all
// of the numeric fields of the provided panda infos. Non-numeric
// fields (such as string (name) or array types) are skipped.
val df = pandas.describe()
// Collect the summary back locally
println(df.collect())
```

Output:

```
Array([count,3,3], [mean,1.3333333333333333,5.0],
  [stddev,0.5773502691896258,4.358898943540674], [min,1,2], [max,2,10])
```
- For computing multiple different aggregations, or more complex aggregations, you should use the `agg` API on the `GroupedData` instead of directly calling `count`, `mean`, etc.
- For the `agg` API, you either supply a list of aggregate expressions, a string representing the aggregates, or a map of column names to aggregate function names (see the sketch after this list).
- Once we've called `agg` with the requested aggregates, we get back a regular DataFrame with the aggregated results.
- Below we have a list of some common and useful aggregates.
- Computing multiple aggregates with Spark SQL can be much simpler than doing the same task with the RDD API.
- For the examples we will consider a DataFrame with a name field (string) and a nullable age field (integer), with the values: ({"ikea", null}, {"tube", 6}, {"real", 30}).
- If the built-in aggregation functions don't meet your needs, you can extend Spark SQL using UDFs.
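A minimal sketch of the different ways to call `agg`, assuming the name/age DataFrame described above is bound to a variable `people` (the variable name is illustrative):

```scala
import org.apache.spark.sql.functions.{avg, max, min}

// A list of aggregate expressions over the whole DataFrame
people.agg(min("age"), max("age"), avg("age"))

// The same via the GroupedData returned by an empty groupBy()
people.groupBy().agg(min("age"), max("age"))

// A map of column names to aggregate function names
people.agg(Map("age" -> "max"))

// Pairs of column name and aggregate function name
people.agg("age" -> "min", "age" -> "avg")
```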
### Windowing

- When creating a window you specify what columns the window is over, the order of the rows within each partition/group, and the size of the window.
- The figure below shows a sample window and its results.
- Using this specification, each input row is related to some set of rows, called a frame, that is used to compute the resulting aggregate.

```scala
// Defining a window on the +/-10 closest (by age) pandas in the same zip code
val windowSpec = Window
  .orderBy(pandas("age"))
  .partitionBy(pandas("zip"))
  .rowsBetween(start = -10, end = 10) // can use rangeBetween for range instead
```

- Once you've defined a window specification, you can compute a function over it:

```scala
val pandaRelativeSizeCol = pandas("pandaSize") -
  avg(pandas("pandaSize")).over(windowSpec)

pandas.select(pandas("name"), pandas("zip"), pandas("pandaSize"), pandas("age"),
  pandaRelativeSizeCol.as("panda_relative_size"))
```
### Sorting

- Sorting supports multiple columns in ascending or descending order, with ascending as the default. These sort orders can be intermixed, as shown in the example below. Spark SQL has some extra benefits for sorting, as some serialized data can be compared without deserialization.

```scala
pandas.orderBy(pandas("pandaSize").asc, pandas("age").desc)
```

- When limiting results, sorting is often used to bring back only the top or bottom K results. For example, `limit(threshold)` will return just the `threshold` top/bottom rows. If you just want a subset of the DataFrame independently of the row order, you can instead use `sample`.
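A minimal sketch of both approaches, assuming a `pandas` DataFrame with a `pandaSize` column (names are illustrative):

```scala
// The ten largest pandas
val topTen = pandas.orderBy(pandas("pandaSize").desc).limit(10)

// Roughly 10% of the rows, with no ordering guarantee
val aSample = pandas.sample(withReplacement = false, fraction = 0.1)
```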
## Multi-DataFrame Transformations

### Set-like operations

- These operations allow us to perform many of the operations that are most commonly thought of as set operations.
- These operations don't have the restriction of unique elements.
- In the table below you can see the cost of these operations. A sketch of the operators themselves follows.
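A minimal sketch of the common set-like operators, assuming two DataFrames `pandas1` and `pandas2` with the same schema (names are illustrative):

```scala
// Union keeps duplicates (no uniqueness restriction)
val allPandas = pandas1.unionAll(pandas2)

// Rows present in both DataFrames
val commonPandas = pandas1.intersect(pandas2)

// Rows present in pandas1 but not in pandas2
val onlyInFirst = pandas1.except(pandas2)
```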
## Plain Old SQL Queries and Interacting with Hive Data

- Sometimes it's better to use regular SQL queries instead of building up our operations on DataFrames.
- If you are connected to a Hive Metastore, we can directly write SQL queries against the Hive tables and get the results as a DataFrame.
- If you have a DataFrame you want to write SQL queries against, you can register it as a temporary table.
- Datasets can also be converted back to DataFrames and registered for querying against.

```scala
// Registering/saving tables
def registerTable(df: DataFrame): Unit = {
  df.registerTempTable("pandas")
  df.write.saveAsTable("perm_pandas")
}
```

- Querying tables is the same, regardless of whether it is a temporary table, an existing Hive table, or a newly saved Spark table:

```scala
// Querying a table (permanent or temporary)
def querySQL(): DataFrame = {
  sqlContext.sql("SELECT * FROM pandas WHERE size > 0")
}
```

- In addition to registering tables, you can also write queries directly against a specific file path:

```scala
// Querying a raw file directly
def queryRawFile(): DataFrame = {
  sqlContext.sql("SELECT * FROM parquet.`path_to_parquet_file`")
}
```
## Data Representation in DataFrames and Datasets

- DataFrames are more than RDDs of Row objects.
- DataFrames and Datasets have a specialized representation and columnar cache format.
- The specialized representation is not only more space efficient, but can also be much faster to encode than even Kryo serialization.
- Like RDDs, DataFrames and Datasets are generally lazily evaluated and build up a lineage of their dependencies (except in DataFrames this is called a logical plan and contains more information).
### Tungsten

- Tungsten is a new Spark SQL component that provides more efficient Spark operations by working directly at the byte level.
- Tungsten includes specialized in-memory data structures tuned for the types of operations required by Spark, improved code generation, and a specialized wire protocol.
- Tungsten's representation is substantially smaller than objects serialized using the Java or even Kryo serializers. As Tungsten does not depend on Java objects, both on-heap and off-heap allocations are supported.
- Not only is the format more compact, but serialization times can be substantially faster than with native serialization.
- Tungsten's data structures are also designed with the kind of processing for which they are used in mind. For example, with sorting, which is typically an expensive operation, the on-wire representation is implemented so that sorting can be done without having to deserialize the data again.
## Data Loading and Saving Functions

`DataFrameWriter` and `DataFrameReader` cover writing to and reading from external data sources.

### Formats

When reading or writing, you specify the format by calling `format(formatName)` on the `DataFrameWriter`/`DataFrameReader`.
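A minimal sketch of the reader/writer API, assuming a `SparkSession` bound to `session` and illustrative file paths:

```scala
// Read CSV data with an explicit format and options
val df = session.read
  .format("csv")
  .option("header", "true")
  .load("input/pandas.csv")

// Write the same data back out as JSON
df.write.format("json").save("output/pandas-json")
```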
#### JSON

- Loading and writing JSON is supported directly in Spark SQL, and despite the lack of schema information in JSON, Spark is able to infer a schema for us by sampling the records.
- Loading JSON data is more expensive than loading many other data sources, since Spark needs to read some of the records to determine the schema information.
- If the schema between records varies widely (or the number of records is very small), you can increase the percentage of records read to determine the schema by setting `samplingRatio` to a higher value:

```scala
val df2 = session.read.format("json")
  .option("samplingRatio", "1.0").load(path)
```

- Since our input may contain some invalid JSON records, we can also take in an RDD of strings. This allows us to load the input as a standard text file, filter out our invalid records, and then load the data as JSON:

```scala
val rdd: RDD[String] = input.filter(_.contains("panda"))
val df = session.read.json(rdd)
```
#### JDBC

- The JDBC data source represents a natural Spark SQL data source.
- While Spark supports many different JDBC sources, it does not ship with the JARs required to talk to all of these databases.
- If you are submitting your Spark job with `spark-submit`, you can download the required JARs to the host you are launching from and include them by specifying `--jars`, or supply their Maven coordinates with `--packages`.
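A minimal sketch of a JDBC read, with an illustrative connection URL, table name, and credentials:

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

// Load the (hypothetical) "pandas" table from a MySQL database
val df = session.read.jdbc(
  "jdbc:mysql://localhost:3306/pandabase", "pandas", props)
```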
#### Parquet

- Parquet files are very space-efficient.
- They can easily be split across multiple files, offer good compression, and allow nested types.
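A minimal sketch of reading and writing Parquet, with illustrative paths:

```scala
// Write a DataFrame out as Parquet
df.write.format("parquet").save("output/pandas.parquet")

// Read it back; the schema is stored alongside the data
val pandasFromParquet = session.read.parquet("output/pandas.parquet")
```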
#### Hive Tables

- One option for bringing in data from a Hive table is writing a SQL query against it and getting the result back as a DataFrame; alternatively, the table can be loaded directly:

```scala
// Load a Hive table
def loadHiveTable(): DataFrame = {
  session.read.table("pandas")
}
```

- When loading a Hive table, Spark SQL will convert the metadata and cache the result. If the underlying metadata has changed, you can use `sqlContext.refreshTable("tablename")` to update the metadata, or the caching can be disabled by setting `spark.sql.parquet.cacheMetadata` to `false`.
- Saving a managed table is a bit different:

```scala
def saveManagedTable(df: DataFrame): Unit = {
  df.write.saveAsTable("pandas")
}
```
#### RDDs

- Spark SQL DataFrames can be easily converted to RDDs of Row objects, and can also be created from RDDs of Row objects as well as Scala case classes and tuples.
- Datasets of type T can be easily converted to RDDs of type T.
- RDDs are a special-case data source, since when going to/from RDDs the data remains inside of Spark without writing out to or reading from an external system.
- Converting a DataFrame to an RDD is a transformation (not an action); however, converting an RDD to a DataFrame or Dataset may involve computing the input RDD.
- When you create a DataFrame from an RDD, Spark SQL needs to add schema information. If you are creating the DataFrame from an RDD of case classes, Spark SQL is able to use reflection to automatically determine the schema:
```scala
// Creating DataFrames from RDDs
def createFromCaseClassRDD(input: RDD[PandaPlace]) = {
  // Create DataFrame explicitly using session and schema inference
  val df1 = session.createDataFrame(input)

  // Create DataFrame using session implicits and schema inference
  val df2 = input.toDF()

  // Create a Row RDD from our RDD of case classes
  val rowRDD = input.map(pm => Row(pm.name,
    pm.pandas.map(pi => Row(pi.id, pi.zip, pi.happy, pi.attributes))))

  val pandasType = ArrayType(StructType(List(
    StructField("id", LongType, true),
    StructField("zip", StringType, true),
    StructField("happy", BooleanType, true),
    StructField("attributes", ArrayType(FloatType), true))))

  // Create DataFrame explicitly with specified schema
  val schema = StructType(List(StructField("name", StringType, true),
    StructField("pandas", pandasType)))

  val df3 = session.createDataFrame(rowRDD, schema)
}
```
- Case classes defined inside another class can sometimes cause problems.
- Converting a DataFrame to an RDD is incredibly simple; however, you get an RDD of Row objects. Since a row can contain anything, you need to specify the type (or cast the result) as you fetch the values for each column in the row:

```scala
def toRDD(input: DataFrame): RDD[RawPanda] = {
  val rdd: RDD[Row] = input.rdd
  rdd.map(row => RawPanda(row.getAs[Long](0), row.getAs[String](1),
    row.getAs[String](2), row.getAs[Boolean](3), row.getAs[Array[Double]](4)))
}
```

- If you know that the schema of your DataFrame matches that of another, you can use the existing schema when constructing your new DataFrame. One common place where this occurs is when an input DataFrame has been converted to an RDD for functional filtering and then back.
### Save Modes

- In core Spark, saving RDDs always requires that the target directory does not exist, which can make appending to existing tables challenging.
- With Spark SQL, you can specify the desired behavior when writing out to a path that may already have data.
- The default behavior is `SaveMode.ErrorIfExists` (matching the behavior of RDDs): Spark will throw an exception if the target already exists. The other save modes are `SaveMode.Append` (add the new data to any existing data), `SaveMode.Overwrite` (replace any existing data), and `SaveMode.Ignore` (silently skip the write if the target already exists).

```scala
// Specify save mode of append
def writeAppend(input: DataFrame): Unit = {
  input.write.mode(SaveMode.Append).save("output/")
}
```
### Partitions (Discovery and Writing)

- Partitioned data is an important part of Spark SQL since it powers one of the key optimizations: reading only the required data.
- If you know how your downstream consumers will access your data (for example, partitioning by zip code), it is beneficial to use that information to partition your output when you write your data.
- When reading the data, it's useful to understand how partition discovery functions, so you can have a better understanding of whether your filter can be pushed down.
- Filter push-down can make a huge difference when working with large datasets by allowing Spark to access only the subset of data required for your computation instead of doing effectively a full scan.
- When reading partitioned data, you point Spark to the root path of your data and it will automatically discover the different partitions.
- Not all data types can be used as partition keys; currently only strings and numeric data are supported.
- If your data is all in a single DataFrame, the DataFrameWriter API makes it easy to specify the partition information while you are writing the data out.
- The `partitionBy` function takes a list of columns to partition the output on:

```scala
// Save partitioned by zip code
def writeOutByZip(input: DataFrame): Unit = {
  input.write.partitionBy("zipcode").format("json").save("output/")
}
```

- In addition to splitting the data by a partition key, it can be useful to make sure the resulting file sizes are reasonable, especially if the results will be used downstream; one way to do this is sketched below.
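A minimal sketch (an assumption about one possible approach, not a prescription from the source) of keeping output files from becoming too numerous and too small by repartitioning on the write key before writing:

```scala
// Hash-partition by zip code first so each zip code lands in a single task,
// yielding one larger output file per zip code directory instead of many
// small ones.
def writeOutByZipCompact(input: DataFrame): Unit = {
  input
    .repartition(input("zipcode"))
    .write
    .partitionBy("zipcode")
    .format("json")
    .save("output/")
}
```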