Dataframe

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R.
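
As a quick sketch of a few of these sources (the paths, table name and connection string below are hypothetical, and spark is the SparkSession created in the next step):

// Each of these calls returns a DataFrame
val csvDF  = spark.read.option("header", "true").csv("data/people.csv")
val hiveDF = spark.sql("SELECT * FROM some_table") // requires Hive support in the session
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db")
  .option("dbtable", "public.people")
  .load()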

In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row], while in the Java API users need to use Dataset<Row> to represent a DataFrame.
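
Because it is only a type alias, Scala code can treat the two types interchangeably; a minimal sketch (assuming the SparkSession spark created below):

import org.apache.spark.sql.{DataFrame, Dataset, Row}

def describe(df: DataFrame): Unit = df.printSchema()

val ds: Dataset[Row] = spark.range(3).toDF("n") // a DataFrame is a Dataset[Row]
describe(ds)                                    // compiles: same type under the alias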

The first step is to create a SparkSession:

import org.apache.spark.sql.SparkSession

object SparkDataframeTest extends App {

  val spark = SparkSession
    .builder()
    .appName("Dummy Script")
    .config("spark.master", "local")
    .getOrCreate()

  // Once we have a SparkSession we can start creating DataFrames

}  
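
Note that getOrCreate() returns the existing session if one is already running in the application. When the work is done, resources can be released with spark.stop().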

1- Creating a new DataFrame from a given sequence

  import spark.implicits._ 

  val namesDF = Seq(
    (1, "Roger", 22),
    (2, "Jon", 22),
    (3, "Robert", 48),
    (4, "Ned", 48),
    (5, "Arthur", 36),
    (6, "Trevor", 32),
    (7, "Franklin", 22),
    (8, "Michael", 36)
  ).toDF("id", "name", "age")

namesDF.show

// Output

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|   Roger| 22|
|  2|     Jon| 22|
|  3|  Robert| 48|
|  4|     Ned| 48|
|  5|  Arthur| 36|
|  6|  Trevor| 32|
|  7|Franklin| 22|
|  8| Michael| 36|
+---+--------+---+
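
As an alternative sketch for the same result, a case class can supply the column names and types (Person here is hypothetical, not part of the original example):

// Typically defined at the top level so Spark can derive an encoder
case class Person(id: Int, name: String, age: Int)

// With spark.implicits._ in scope, column names come from the field names
val peopleDF = Seq(Person(1, "Roger", 22), Person(2, "Jon", 22)).toDF()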

2- Creating a DataFrame from a Seq of Rows with a complex schema

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StringType, IntegerType}

val data = Seq(Row(Row("Roger","","Rabbit"),"M",200),
  Row(Row("Jon","Jackson",""),"M",300),
  Row(Row("Trevor","","Tantelli"),"M",1000),
  Row(Row("Maria","Mine","Miller"),"F",120),
  Row(Row("Franklin","Fred","Ferdinan"),"F",80)
)

val schema = new StructType()
  .add("name",new StructType()
    .add("firstname",StringType)
    .add("middlename",StringType)
    .add("lastname",StringType))
  .add("gender",StringType)
  .add("score",IntegerType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)

df.printSchema()  

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- score: integer (nullable = true)

df.show(false)
+------------------------+------+-----+
|name                    |gender|score|
+------------------------+------+-----+
|[Roger,,Rabbit]         |M     |200  |
|[Jon,Jackson,]          |M     |300  |
|[Trevor,,Tantelli]      |M     |1000 |
|[Maria,Mine,Miller]     |F     |120  |
|[Franklin,Fred,Ferdinan]|F     |80   |
+------------------------+------+-----+
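
Since name is a struct column, its nested fields can be addressed with dot notation; for example:

df.select("name.firstname", "name.lastname", "score").show()

// Or as column expressions (requires import spark.implicits._)
df.select($"name.firstname".alias("first"), $"gender").show()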

3- Creating a DataFrame from a JSON file

val jsonDF = spark.read.json("examples/src/main/resources/people.json")
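
By default Spark expects one JSON object per line. For a pretty-printed, multi-line file the multiLine option is needed; a sketch with a hypothetical path:

val multiLineDF = spark.read
  .option("multiLine", "true")     // whole file is a single JSON document/array
  .json("data/people_pretty.json") // hypothetical path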

Untyped DataFrame Operations

Printing the DataFrame Schema

namesDF.printSchema()

//Output
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
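
The schema can also be inspected programmatically:

namesDF.columns.foreach(println) // column names: id, name, age
namesDF.dtypes.foreach(println)  // (name, type) pairs, e.g. (id,IntegerType)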

Selecting one column from the DataFrame

namesDF.select("name").show

// Output
+--------+
|    name|
+--------+
|   Roger|
|     Jon|
|  Robert|
|     Ned|
|  Arthur|
|  Trevor|
|Franklin|
| Michael|
+--------+
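
select also takes column expressions, which combine naturally with filter and groupBy; a short sketch using the $ syntax from spark.implicits._:

namesDF.select($"name", $"age" + 1).show() // computed column
namesDF.filter($"age" > 30).show()         // rows with age over 30
namesDF.groupBy("age").count().show()      // number of rows per age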

Running SQL Queries against a DataFrame

// Register the DataFrame as a SQL temporary view
namesDF.createOrReplaceTempView("names")

val sqlDF = spark.sql("SELECT * FROM names where id > 4")
sqlDF.show()

// Output
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  5|  Arthur| 36|
|  6|  Trevor| 32|
|  7|Franklin| 22|
|  8| Michael| 36|
+---+--------+---+
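
Temporary views are scoped to the current SparkSession. To share a view across sessions within the same application, a global temporary view can be used instead; a minimal sketch:

namesDF.createGlobalTempView("names_global")

// Global temp views live in the reserved database global_temp
spark.sql("SELECT name FROM global_temp.names_global WHERE age = 22").show()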
