Introduction to DataFrames - ignacio-alorre/Spark GitHub Wiki

First let's create two dataframes from two sequences

import spark.implicits._

val scoresSector1Df = Seq(
  (3, "Aby"),
  (5, "Barney"),
  (8, "Charly"),
  (9, "Dubby")
).toDF("score", "name")

val scoresSector2Df = Seq(
  (6, "Zoey"),
  (2, "Wilson"),
  (7, "Vicky"),
  (5, "Trevor")
).toDF("score", "name")

Union()

Now let's union those dataframes

val scoresRegionDf = scoresSector1Df.union(scoresSector2Df)
//org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [score: int, name: string]

scoresRegionDf.show

+-----+------+
|score|  name|
+-----+------+
|    3|   Aby|
|    5|Barney|
|    8|Charly|
|    9| Dubby|
|    6|  Zoey|
|    2|Wilson|
|    7| Vicky|
|    5|Trevor|
+-----+------+

Select()

Selects a subset of the dataframe columns

scoresRegionDf.select("score").show

+-----+
|score|
+-----+
|    3|
|    5|
|    8|
|    9|
|    6|
|    6|
|    2|
|    7|
|   12|
|    5|
+-----+

Filter()

Select a subset of the dataframe rows

scoresRegionDf.filter($"name" === "Barney" || $"name" === "Vicky").show

+-----+------+
|score|  name|
+-----+------+
|    5|Barney|
|    7| Vicky|
+-----+------+

Where()

This clause is equivalent to filter()

scoresRegionDf.where($"name" === "Barney" || $"name" === "Vicky").show

+-----+------+
|score|  name|
+-----+------+
|    5|Barney|
|    7| Vicky|
+-----+------+

Sort()

Sort the rows in the dataframe based on the values of a given column

scoresRegionDf.sort($"name".asc).show

+-----+------+
|score|  name|
+-----+------+
|    3|   Aby|
|    5|Barney|
|    9|Charly|
|    8|Charly|
|    6|Dwight|
|    5|Trevor|
|   12|  Uson|
|    7| Vicky|
|    2|Wilson|
|    6|  Zoey|
+-----+------+

Agg()

Used to sum values. It will return a single row dataframe

val scoreSum = scoresRegionDf.agg("score" -> "sum")
//org.apache.spark.sql.DataFrame = [sum(score): bigint]

scoreSum.show
+----------+
|sum(score)|
+----------+
|        63|
+----------+

createOrReplaceTempView()

Consist on registering a DataFrame as a temp view, so we can query it using SQL

scoresRegionDf.createOrReplaceTempView("scoresRegionVw")

val filterScore = spark.sql("""
  SELECT *
  FROM scoresRegionVw
  WHERE name in ("Barney", "Vicky")
""")

filterScore.show()

+-----+------+
|score|  name|
+-----+------+
|    5|Barney|
|    7| Vicky|
+-----+------+

Explode() List

It is used to flatten columns in a dataframe which type is a sequence:

import org.apache.spark.sql.functions._

val tripsDf = Seq(
  ("green", Seq("Gijon", "Leon", "Madrid")),
  ("red", Seq("Gijon", "Vegadoe", "Vigo")),
  ("blue", Seq("Gijon", "Santander", "Bilbao"))
).toDF("line", "stations")

tripsDf.show(20, false)
+-----+--------------------------+
|line |stations                  |
+-----+--------------------------+
|green|[Gijon, Leon, Madrid]     |
|red  |[Gijon, Vegadoe, Vigo]    |
|blue |[Gijon, Santander, Bilbao]|
+-----+--------------------------+

val expldStationsDf = tripsDf.select($"line",explode($"stations").alias("station"))
// exploded: org.apache.spark.sql.DataFrame = [line: string, station: string]

expldStationsDf.show()
+-----+---------+
| line|  station|
+-----+---------+
|green|    Gijon|
|green|     Leon|
|green|   Madrid|
|  red|    Gijon|
|  red|  Vegadoe|
|  red|     Vigo|
| blue|    Gijon|
| blue|Santander|
| blue|   Bilbao|
+-----+---------+

Group() and Agg()

Collecting in a single column values from columnB which have in common columnA

val grpStationsDf = expldStationsDf.groupBy("line").agg(collect_list($"station") as "stations")
// grpStationsDf: org.apache.spark.sql.DataFrame = [line: string, stations: array<string>]

grpStationsDf.show(20,false)
+-----+--------------------------+
|line |stations                  |
+-----+--------------------------+
|green|[Gijon, Leon, Madrid]     |
|red  |[Gijon, Vegadoe, Vigo]    |
|blue |[Gijon, Santander, Bilbao]|
+-----+--------------------------+

Explode() Map

val sensorsMapDf = Seq(
  ("sensor1", Map("T" -> 26, "H" -> 30, "B" -> 1000)),
  ("sensor2", Map("T" -> 25, "H" -> 30, "B" -> 1100)),
  ("sensor3", Map("T" -> 26, "H" -> 30, "B" -> 1000))
).toDF("sensor", "measures")

+-------+--------------------------------+
|sensor |measures                        |
+-------+--------------------------------+
|sensor1|Map(T -> 26, H -> 30, B -> 1000)|
|sensor2|Map(T -> 25, H -> 30, B -> 1100)|
|sensor3|Map(T -> 26, H -> 30, B -> 1000)|
+-------+--------------------------------+

val explSensorDf = sensorsMapDf.select($"sensor", explode_outer($"measures"))
// explSensorDf: org.apache.spark.sql.DataFrame = [sensor: string, key: string ... 1 more field]

explSensorDf.show(false)

+-------+---+-----+
|sensor |key|value|
+-------+---+-----+
|sensor1|T  |26   |
|sensor1|H  |30   |
|sensor1|B  |1000 |
|sensor2|T  |25   |
|sensor2|H  |30   |
|sensor2|B  |1100 |
|sensor3|T  |26   |
|sensor3|H  |30   |
|sensor3|B  |1000 |
+-------+---+-----+

explSensorDf.groupBy($"sensor").pivot("key").agg(first("value")).show(false)

+-------+----+---+---+
|sensor |B   |H  |T  |
+-------+----+---+---+
|sensor1|1000|30 |26 |
|sensor3|1000|30 |26 |
|sensor2|1100|30 |25 |
+-------+----+---+---+

Merge columns into a Map

val sensorsDf = Seq(
  ("sensor1", 26, 30, 1000),
  ("sensor2", 25, 30, 1100),
  ("sensor3", 26, 30, 1000),
  ("sensor1", 28, 32, 1200),
  ("sensor2", 29, 35, 1500),
  ("sensor3", 27, 33, 1300)
).toDF("sensor", "temperature", "humidity", "brightness")

sensorsDf.show

+-------+-----------+--------+----------+
| sensor|temperature|humidity|brightness|
+-------+-----------+--------+----------+
|sensor1|         26|      30|      1000|
|sensor2|         25|      30|      1100|
|sensor3|         26|      30|      1000|
|sensor1|         28|      32|      1200|
|sensor2|         29|      35|      1500|
|sensor3|         27|      33|      1300|
+-------+-----------+--------+----------+

val columnsToMap = List("temperature", "humidity", "brightness").flatMap(colName =>  List(lit(colName) , col(colName) ))
val colsAsMap = sensorsDf.withColumn("measures", map(colToMap: _ *))

// Another option, more complex but can drop used columns 
def mergeMapUdf= udf((map1: Map[String, Int], map2: Map[String, Int])=> map1 ++ map2)
val colsAsMap = columnsToMap.foldLeft(sensorsDf.withColumn("measures", lit(null)))((df,column) => df.withColumn("measures", when(col("measures").isNull,map(lit(column), col(column))).otherwise(mergeMapUdf( col("measures"), map(lit(column), col(column))))).drop(col(column)))

colsAsMap.show(false)

+-------+----------------------------------------------------------+
|sensor |measures                                                  |
+-------+----------------------------------------------------------+
|sensor1|Map(temperature -> 26, humidity -> 30, brightness -> 1000)|
|sensor2|Map(temperature -> 25, humidity -> 30, brightness -> 1100)|
|sensor3|Map(temperature -> 26, humidity -> 30, brightness -> 1000)|
|sensor1|Map(temperature -> 28, humidity -> 32, brightness -> 1200)|
|sensor2|Map(temperature -> 29, humidity -> 35, brightness -> 1500)|
|sensor3|Map(temperature -> 27, humidity -> 33, brightness -> 1300)|
+-------+----------------------------------------------------------+
⚠️ **GitHub.com Fallback** ⚠️