Spark SQL
SparkSQL is an extension of the Spark RDD API that supports structured and semi-structured data. This tutorial explains how to set up a project that uses SparkSQL and how to run simple SQL queries with it.
A walkthrough video is available to follow this tutorial.
- Development setup for big data
- A simple Scala project. Java also works, but this tutorial focuses on Scala.
First, add the SparkSQL dependency to your project. Just search for "SparkSQL maven" and choose the latest version or any specific version you want.
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.1</version>
</dependency>
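If your project uses sbt instead of Maven, the equivalent dependency is a single line in build.sbt (assuming the same Spark 3.0.1 release and a Scala 2.12 build as above):
// build.sbt -- equivalent of the Maven dependency above; %% appends the Scala binary version (2.12)
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"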
In your application program, the first step is to initialize the SparkSession which gives you access to SparkSQL functions.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setMaster("local")
val spark = SparkSession
  .builder()
  .appName("BigData SparkSQL Demo")
  .config(conf)
  .getOrCreate()
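Note that setMaster("local") runs Spark locally with a single worker thread, which is enough for this tutorial. If you want to use all the cores on your machine, a common variant is to set the master as follows:
// Variant: use all available local cores instead of a single worker thread
val conf = new SparkConf().setMaster("local[*]")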
Then, load a semi-structured file using the load command.
val input = spark.read.format("csv")
  .option("sep", "\t")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("nasa_header.tsv")
import spark.implicits._
input.show()
You can download the nasa_header.tsv sample file. Notice that this file contains a header line that allows SparkSQL to access attributes by name.
The output will look similar to the following.
+--------------------+-------+---------+------+--------------------+--------+------+-------+---------+
| host|logname| time|method| url|response| bytes|referer|useragent|
+--------------------+-------+---------+------+--------------------+--------+------+-------+---------+
|pppa006.compuserv...| -|807256800| GET|/images/launch-lo...| 200| 1713| null| null|
| vcc7.langara.bc.ca| -|807256804| GET|/shuttle/missions...| 200| 8677| null| null|
|pppa006.compuserv...| -|807256806| GET|/history/apollo/i...| 200| 1173| null| null|
|thing1.cchem.berk...| -|807256870| GET|/shuttle/missions...| 200| 4705| null| null|
| 202.236.34.35| -|807256881| GET| /whats-new.html| 200| 18936| null| null|
|bettong.client.uq...| -|807256884| GET|/history/skylab/s...| 200| 1687| null| null|
| 202.236.34.35| -|807256884| GET|/images/whatsnew.gif| 200| 651| null| null|
| 202.236.34.35| -|807256885| GET|/images/KSC-logos...| 200| 1204| null| null|
|bettong.client.uq...| -|807256900| GET|/history/skylab/s...| 304| 0| null| null|
|bettong.client.uq...| -|807256913| GET|/images/ksclogosm...| 304| 0| null| null|
|bettong.client.uq...| -|807256913| GET|/history/apollo/i...| 200| 3047| null| null|
| hella.stm.it| -|807256914| GET|/shuttle/missions...| 200|513911| null| null|
|mtv-pm0-ip4.halcy...| -|807256916| GET| /shuttle/countdown/| 200| 4324| null| null|
| ednet1.osl.or.gov| -|807256924| GET| /| 200| 7280| null| null|
|mtv-pm0-ip4.halcy...| -|807256942| GET|/shuttle/countdow...| 200| 46573| null| null|
|dd10-046.compuser...| -|807256943| GET|/shuttle/missions...| 200| 10566| null| null|
|ad11-013.compuser...| -|807256944| GET|/history/history....| 200| 1602| null| null|
|dd10-046.compuser...| -|807256946| GET|/shuttle/missions...| 200| 8083| null| null|
|dd10-046.compuser...| -|807256954| GET|/images/KSC-logos...| 200| 1204| null| null|
|dd10-046.compuser...| -|807256954| GET|/history/apollo/i...| 200| 1173| null| null|
+--------------------+-------+---------+------+--------------------+--------+------+-------+---------+
only showing top 20 rows
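Because the header line was parsed, you can refer to attributes by name. For example, the following short snippet projects a few columns and shows the first five rows (the column names come from the header shown above):
// Select a subset of columns by name and show the first five rows
input.select("host", "response", "bytes").show(5)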
You can also print the inferred schema of the data using the following command.
input.printSchema()
The output will look like the following.
root
|-- host: string (nullable = true)
|-- logname: string (nullable = true)
|-- time: integer (nullable = true)
|-- method: string (nullable = true)
|-- url: string (nullable = true)
|-- response: integer (nullable = true)
|-- bytes: integer (nullable = true)
|-- referer: string (nullable = true)
|-- useragent: string (nullable = true)
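Notice that the time column was inferred as an integer; in this dataset it holds Unix timestamps (seconds since the epoch). If you prefer to work with readable timestamps, one possible conversion is sketched below (the datetime column name is just an illustrative choice):
import org.apache.spark.sql.functions.from_unixtime

// Add a human-readable timestamp column derived from the epoch seconds in the time column
val withTimestamp = input.withColumn("datetime", from_unixtime($"time"))
withTimestamp.select("time", "datetime").show(5)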
Next, we can run simple queries by applying relational operators to the loaded data. There are two ways to run a query in SparkSQL. First, you can build your query one operator at a time, such as filter or groupBy. Second, you can directly type a SQL query similar to traditional databases.
For example, let's say that we want to count all the records in a time range by response code. This requires a filter operation, followed by a grouping operation, and finally an aggregate operation. We can do that using this code.
val startTime: Int = 807274014
val endTime: Int = 807283738
val matchedLines = input.filter(s"time BETWEEN ${startTime} AND ${endTime}")
val grouped = matchedLines.groupBy($"response")
val countByCode = grouped.count()
countByCode.show()
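The same pipeline can also be written as one chained expression, which is the more common DataFrame style (shown here under a different variable name to avoid clashing with the code above):
// Equivalent query written as a single chain of transformations
val chainedCount = input
  .filter(s"time BETWEEN ${startTime} AND ${endTime}")
  .groupBy($"response")
  .count()
chainedCount.show()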
Alternatively, you can combine all this logic in one SQL query using this code.
input.createTempView("nasalog")
val groupedCount = spark.sql(s"""SELECT response, count(*)
FROM nasalog
WHERE time BETWEEN ${startTime} AND ${endTime}
GROUP BY response""")
groupedCount.show()
In both cases, the result will look something like the following.
+--------+-----+
|response|count|
+--------+-----+
| 404| 40|
| 200| 5818|
| 304| 467|
| 302| 64|
+--------+-----+
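As one more example, here is a small sketch of a follow-up query over the same nasalog view that sums the bytes transferred per response code (the total_bytes alias is just an illustrative name):
// Total bytes transferred per response code, largest first
val bytesByCode = spark.sql(
  """SELECT response, SUM(bytes) AS total_bytes
     FROM nasalog
     GROUP BY response
     ORDER BY total_bytes DESC""")
bytesByCode.show()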
You can continue from here. Check the SparkSQL documentation to get more familiar with SparkSQL functions. Check also this SQL Primer to refresh your memory of SQL.