Spark - BKJackson/BKJackson_Wiki GitHub Wiki

Spark api
Visualization of Spark Architecture (from Spark API)

What is Spark?

Spark is a general-purpose distributed data processing engine.

RDD = Resilient Distributed Dataset. RDDs hide the complexity of transforming and distributing your data: when you run on a cluster, a scheduler automatically spreads the data and the work across multiple nodes.

PySpark

In a Python context, think of PySpark as a way to handle parallel processing without the need for the threading or multiprocessing modules. All of the complicated communication and synchronization between threads, processes, and even different CPUs is handled by Spark. PySpark Intro

SparkContext for a local machine

The entry point of any PySpark program is a SparkContext object. This object allows you to connect to a Spark cluster and create RDDs. The local[*] string is a special master string denoting that you’re using a local cluster, which is another way of saying you’re running in single-machine mode. The * tells Spark to create as many worker threads as there are logical cores on your machine.

import pyspark
sc = pyspark.SparkContext('local[*]')  

SparkContext for a cluster

Creating a SparkContext can be more involved when you’re using a cluster. To connect to a Spark cluster, you might need to handle authentication and a few other pieces of information specific to your cluster. You can set up those details similarly to the following:

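import pyspark

conf = pyspark.SparkConf()
conf.setMaster('spark://head_node:56887')  # URL of the cluster's master node
conf.set('spark.authenticate', 'true')  # configuration values are passed as strings
conf.set('spark.authenticate.secret', 'secret-key')
sc = pyspark.SparkContext(conf=conf)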

filter(), map(), and reduce()

filter() filters items out of an iterable based on a condition, typically expressed as a lambda function. filter() takes an iterable, calls the lambda function on each item, and returns an iterator over the items for which the lambda returned True. In Python 3 the result is lazy, which is why the example wraps it in list().

>>> x = ['Python', 'programming', 'is', 'awesome!']
>>> print(list(filter(lambda arg: len(arg) < 8, x)))
['Python', 'is']  

map() is similar to filter() in that it applies a function to each item in an iterable, but it always produces a 1-to-1 mapping of the original items. The new iterable that map() returns will always have the same number of elements as the original iterable, which was not the case with filter(). map() automatically calls the lambda function on all the items, effectively replacing a for loop.

>>> x = ['Python', 'programming', 'is', 'awesome!']
>>> print(list(map(lambda arg: arg.upper(), x)))
['PYTHON', 'PROGRAMMING', 'IS', 'AWESOME!']  

reduce() applies a function to elements in an iterable. However, reduce() doesn’t return a new iterable. Instead, reduce() uses the function to combine the elements pairwise until the iterable is reduced to a single value. There is no call to list() here because reduce() already returns a single item. Note: Python 3.x moved the built-in reduce() function into the functools module.

>>> from functools import reduce
>>> x = ['Python', 'programming', 'is', 'awesome!']
>>> print(reduce(lambda val1, val2: val1 + val2, x))
Pythonprogrammingisawesome!  
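
The three functions compose naturally, and Spark RDDs expose methods with the same names. A minimal plain-Python sketch that chains them on the word list used above; the variable names short_words, shouted, and joined are illustrative only:

```python
from functools import reduce

x = ['Python', 'programming', 'is', 'awesome!']

# Keep only the words shorter than 8 characters.
short_words = filter(lambda arg: len(arg) < 8, x)

# Upper-case each surviving word (one output per input).
shouted = map(lambda arg: arg.upper(), short_words)

# Collapse the words into a single string, separated by spaces.
joined = reduce(lambda val1, val2: val1 + ' ' + val2, shouted)

print(joined)  # PYTHON IS
```

Because filter() and map() are lazy, no work happens until reduce() consumes the chain.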

Spark SQL

Reading CSV files into a Spark dataframe

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data_path = '/MyPath/data'
file_path = os.path.join(data_path, 'location_temp.csv')
# header=true takes column names from the first row; without inferSchema, every column is read as a string
df1 = spark.read.format("csv").option("header", "true").load(file_path)

Reading JSON files

json_df2_path = data_path + "/utilization.json"
df2 = spark.read.format("json").load(json_df2_path)  

Display dataframe data

df2.head(3)   

Output: 
[Row(cpu_utilization=0.57, event_datetime='03/05/2019 08:06:14', free_memory=0.51, server_id=100, session_count=47),
 Row(cpu_utilization=0.47, event_datetime='03/05/2019 08:11:14', free_memory=0.62, server_id=100, session_count=43),
 Row(cpu_utilization=0.56, event_datetime='03/05/2019 08:16:14', free_memory=0.57, server_id=100, session_count=62)]

Display dataframe data as a table

df2.show(3)

Output:
+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
+---------------+-------------------+-----------+---------+-------------+
only showing top 3 rows  

Print schema

df2.printSchema()

Output:
root
 |-- cpu_utilization: double (nullable = true)
 |-- event_datetime: string (nullable = true)
 |-- free_memory: double (nullable = true)
 |-- server_id: long (nullable = true)
 |-- session_count: long (nullable = true)  

Create dataframe manually

from pyspark.sql import Row

# Build an RDD of Row objects, then convert it to a dataframe
df_dup = sc.parallelize([Row(server_name='101 Server', cpu_utilization=85, session_count=80),
                         Row(server_name='101 Server', cpu_utilization=80, session_count=90),
                         Row(server_name='102 Server', cpu_utilization=85, session_count=80),
                         Row(server_name='102 Server', cpu_utilization=85, session_count=80)]).toDF()

Sort dataframe by column

df2_sort = df2_sample.sort('event_datetime')  

Show summary statistics

df2.describe().show()  


Output:  
+-------+-------------------+-------------------+-------------------+------------------+------------------+
|summary|    cpu_utilization|     event_datetime|        free_memory|         server_id|     session_count|
+-------+-------------------+-------------------+-------------------+------------------+------------------+
|  count|             500000|             500000|             500000|            500000|            500000|
|   mean| 0.6205177400000123|               null| 0.3791280999999977|             124.5|          69.59616|
| stddev|0.15875173872912837|               null|0.15830931278376217|14.430884120553255|14.850676696352865|
|    min|               0.22|03/05/2019 08:06:14|                0.0|               100|                32|
|    max|                1.0|04/09/2019 01:22:46|               0.78|               149|               105|
+-------+-------------------+-------------------+-------------------+------------------+------------------+
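
To make the summary rows concrete, here is a plain-Python sketch that computes the same five statistics for a single numeric column. It uses only the three cpu_utilization values visible in the head(3) output above, so the numbers will not match the 500,000-row table. It assumes the stddev row is the sample standard deviation (dividing by n - 1), which is what describe() reports.

```python
import statistics

# First three cpu_utilization values from the df2.head(3) output above.
cpu_utilization = [0.57, 0.47, 0.56]

summary = {
    'count': len(cpu_utilization),
    'mean': statistics.mean(cpu_utilization),
    'stddev': statistics.stdev(cpu_utilization),  # sample standard deviation (n - 1)
    'min': min(cpu_utilization),
    'max': max(cpu_utilization),
}

for name, value in summary.items():
    print(f'{name:>6}: {value}')
```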

Calculate correlation between columns

df_util.stat.corr('free_memory', 'cpu_utilization')  
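
stat.corr() returns the Pearson correlation coefficient of the two columns. As a rough illustration of what that number means, the sketch below computes it by hand in plain Python for the three rows shown in the head(3) output; the helper name pearson is hypothetical and is not part of the Spark API.

```python
import math

# First three rows from the df2.head(3) output above.
free_memory = [0.51, 0.62, 0.57]
cpu_utilization = [0.57, 0.47, 0.56]


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


r = pearson(free_memory, cpu_utilization)
print(round(r, 3))
```

A value near -1 means the two columns move in opposite directions, a value near +1 means they move together, and a value near 0 means there is no linear relationship.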

Calculate item frequency

df_util.stat.freqItems(('server_id','session_count')).show()  

Get number of rows

df2.count()  

Group by one column and calculate mean of another column

df1.groupby('location_id').agg({'temp_celcius': 'mean'}).show()  
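
What groupby(...).agg({...: 'mean'}) computes can be sketched in plain Python: bucket the rows by key, then average each bucket. The rows below are hypothetical stand-ins for df1, since the contents of location_temp.csv are not shown here.

```python
from collections import defaultdict

# Hypothetical rows standing in for df1; the real CSV contents are not shown.
rows = [
    {'location_id': 'loc0', 'temp_celcius': 29.0},
    {'location_id': 'loc0', 'temp_celcius': 31.0},
    {'location_id': 'loc1', 'temp_celcius': 25.0},
]

# Collect the temperatures seen for each location_id.
temps_by_location = defaultdict(list)
for row in rows:
    temps_by_location[row['location_id']].append(row['temp_celcius'])

# One output row per group, holding the mean of that group's values.
mean_by_location = {
    location: sum(temps) / len(temps)
    for location, temps in temps_by_location.items()
}

print(mean_by_location)  # {'loc0': 30.0, 'loc1': 25.0}
```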

Save dataframe to csv

df1.write.csv('df1.csv')  

Save dataframe to json

df1.write.json('df1.json')  

Working with null values

Add a string column in which every value is null

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

df_na = df2.withColumn('na_col', lit(None).cast(StringType()))

Drop rows with null values

df2.na.drop()  
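
With its default settings, na.drop() removes every row that contains at least one null and returns a new dataframe; the original is left unchanged. A plain-Python sketch of that rule, using hypothetical rows in which None plays the role of null:

```python
# Hypothetical rows standing in for df2; None plays the role of a null.
rows = [
    {'server_id': 100, 'session_count': 47},
    {'server_id': 101, 'session_count': None},
    {'server_id': None, 'session_count': 62},
]

# Keep a row only if none of its values is null (the default how='any' behaviour).
complete_rows = [row for row in rows if all(v is not None for v in row.values())]

print(complete_rows)  # [{'server_id': 100, 'session_count': 47}]
```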

Run SQL queries on dataframe

Start by giving the table a name "utilization":

df2.createOrReplaceTempView("utilization")

# Select all columns
df_sql = spark.sql("SELECT * FROM utilization LIMIT 10")  

# Select two columns  
df_sql = spark.sql("SELECT server_id, session_count FROM utilization LIMIT 10")
  
# Select two columns and give them alias names  
df_sql = spark.sql("SELECT server_id as sid, session_count as sc FROM utilization")
  
# Select all where server_id = 120  
df_sql = spark.sql("SELECT * FROM utilization WHERE server_id = 120")

# Writing longer SQL statements 
df_sql = spark.sql("SELECT server_id, session_count \
                   FROM utilization \
                   WHERE session_count > 70 AND server_id = 120 \
                   ORDER BY session_count DESC")

# Select unique server_id values with DISTINCT  
df_count = spark.sql("SELECT DISTINCT server_id FROM utilization ORDER BY server_id")

# Get count of number of rows in the table    
df_count = spark.sql("SELECT count(*) FROM utilization")   

# Get count of rows where session_count > 70 and group by server_id and order by descending count   
df_sql = spark.sql("SELECT server_id, count(*) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id \
                    ORDER BY count(*) DESC")

# Create some summary statistics and round the output to 2 decimal places  
df_sql = spark.sql("SELECT server_id, min(session_count), round(avg(session_count),2), max(session_count) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id \
                    ORDER BY count(*) DESC")  

# Join two tables by server_id (assumes a second temp view named server_name has been registered)
df_join = spark.sql("SELECT u.server_id, sn.server_name, u.session_count \
                     FROM utilization u \
                     INNER JOIN server_name sn \
                     ON sn.server_id = u.server_id")  

df_sql.show()
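
To check the logic of the filter, group, and order query without a Spark session, the same steps can be sketched in plain Python. The (server_id, session_count) pairs below are hypothetical and stand in for the utilization table.

```python
from collections import Counter

# Hypothetical (server_id, session_count) pairs standing in for the utilization table.
rows = [(120, 75), (120, 68), (120, 90), (121, 71), (122, 50), (121, 40)]

# WHERE session_count > 70, then GROUP BY server_id with count(*).
counts = Counter(server_id for server_id, session_count in rows if session_count > 70)

# ORDER BY count(*) DESC: most_common() sorts the groups by descending count.
result = counts.most_common()

print(result)  # [(120, 2), (121, 1)]
```

Server 122 does not appear in the result because none of its rows pass the WHERE filter, so there is no group to count.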

Alternative way to query and view data

sql_window2 = "SELECT event_datetime, server_id, cpu_utilization,  \
         avg(cpu_utilization) OVER (PARTITION BY server_id) avg_server_util, \
         cpu_utilization - avg(cpu_utilization) OVER (PARTITION BY server_id) delta_server_util \
         FROM utilization"  

spark.sql(sql_window2).show()
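
The OVER (PARTITION BY server_id) clause differs from GROUP BY in that it keeps every input row and attaches the per-partition average to each one. A plain-Python sketch of that behaviour, using hypothetical (server_id, cpu_utilization) rows:

```python
from collections import defaultdict

# Hypothetical (server_id, cpu_utilization) rows standing in for the utilization table.
rows = [(100, 0.50), (100, 0.70), (101, 0.25), (101, 0.75)]

# PARTITION BY server_id: gather each server's values to compute its average.
values_by_server = defaultdict(list)
for server_id, cpu in rows:
    values_by_server[server_id].append(cpu)
avg_by_server = {sid: sum(vals) / len(vals) for sid, vals in values_by_server.items()}

# Unlike GROUP BY, every input row is kept and annotated with its partition's average
# (avg_server_util) and its difference from that average (delta_server_util).
windowed = [
    (server_id, cpu, avg_by_server[server_id], cpu - avg_by_server[server_id])
    for server_id, cpu in rows
]

for row in windowed:
    print(row)
```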

Spark for Machine Learning and AI

Driver host parameter setting for pyspark.ml

spark = SparkSession.builder.config("spark.driver.host","127.0.0.1").config("spark.driver.bindAddress","127.0.0.1").getOrCreate()

Articles

A Neanderthal’s Guide to Apache Spark in Python - Tutorial on Getting Started with PySpark for Complete Beginners, June 14, 2019
First Steps With PySpark and Big Data Processing - Real Python, July 31 2019