Pyspark Interactive (v1)
Run pyspark
Download the MySQL connector jar:
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.31/mysql-connector-j-8.0.31.jar -O mysql.jar
mv mysql.jar /home/hadoop/spark/jars
Then start the shell with the jar (assuming you are in the Spark home directory):
./bin/pyspark --jars jars/mysql.jar
Python 3.8.5 (default, Jan 27 2021, 15:41:15)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/03/22 20:25:51 WARN Utils: Your hostname, bigdata resolves to a loopback address: 127.0.1.1; using 10.3.133.231 instead (on interface ens3)
21/03/22 20:25:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/03/22 20:25:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.1.1
/_/
Using Python version 3.8.5 (default, Jan 27 2021 15:41:15)
Spark context Web UI available at http://10.3.133.231:4040
Spark context available as 'sc' (master = local[*], app id = local-1616419554570).
SparkSession available as 'spark'.
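With the shell up, the SparkContext (sc) and SparkSession (spark) shown in the banner are ready to use. A quick sanity check (a minimal sketch; the values are arbitrary):
>>> sc.parallelize(range(1, 11)).sum()
55
>>> spark.range(5).count()
5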
Spark Transformation
map()
flatMap()
filter()
sample()
union()
intersection()
distinct()
join()
Spark Action
reduce()
collect()
count()
first()
takeSample(withReplacement, num, [seed])
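Transformations are lazy: they only describe a new RDD, and nothing executes until an action is called. A minimal sketch combining a few of the operations listed above (the numbers are made up):
>>> nums = sc.parallelize([1, 2, 3, 4, 5, 6])
>>> evens = nums.filter(lambda x: x % 2 == 0)    # transformation, nothing runs yet
>>> doubled = evens.map(lambda x: x * 2)         # transformation
>>> doubled.collect()                            # action: triggers the computation
[4, 8, 12]
>>> doubled.reduce(lambda a, b: a + b)           # action
24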
===========
# Read in an RDD
# Make sure you copy demo2.txt to HDFS first:
hdfs dfs -put demo2.txt /user1/demo2.txt
Then call
pyspark
Type at the Spark shell:
RDDread = sc.textFile("hdfs://localhost:9000/user1/demo2.txt")
Then call
RDDread.collect()
[u'chantana', u'ploy', u'arnon', u'putchong', u'mudang', u'bundit', u'chantana', u'ploy', u'wandee', u'new', u'pisit']
RDDread.first()
u'chantana'
RDDread.take(4)
[u'chantana', u'anon', u'mudang', u'march']
RDDread.takeSample(False, 4, 2)  # (withReplacement, num, [seed])
[u'march', u'anon', u'mudang', u'nack']
RDDread.count()
7
exit()
Install MySQL 8.0
wget https://dev.mysql.com/get/mysql-apt-config_0.8.12-1_all.deb
sudo dpkg -i mysql-apt-config_0.8.12-1_all.deb
In the configuration dialog, select:
Bionic -> MySQL Server and Cluster -> mysql-8.0 -> OK
sudo apt-get update
run
sudo apt install mysql-server
Start it as a service:
sudo systemctl start mysql.service
Run
mysql_secure_installation
to configure the root password. In this example the root password is set to "password".
Connect to MySQL and create a database (on this machine the root password is "password"):
$ mysql -u root -p
Enter password:
Enter your password when prompted, then create a database and table:
mysql> create database demo;
Query OK, 1 row affected (0.02 sec)
mysql> use demo
Database changed
mysql> create table demotable(id int,name varchar(20),age int);
Query OK, 0 rows affected (0.13 sec)
mysql> use demo;
Database changed
mysql> insert into demotable values(1,"abhay",25),(2,"banti",25);
Query OK, 2 rows affected (0.10 sec)
Records: 2 Duplicates: 0 Warnings: 0
mysql> insert into demotable values(3,"due",30),(4,"joy",45);
Query OK, 2 rows affected (0.14 sec)
Records: 2 Duplicates: 0 Warnings: 0
mysql> select * from demotable;
+------+-------+------+
| id | name | age |
+------+-------+------+
| 1 | abhay | 25 |
| 2 | banti | 25 |
| 3 | due | 30 |
| 4 | joy | 45 |
+------+-------+------+
4 rows in set (0.00 sec)
mysql> exit
NOTE:
Download the MySQL connector jar, which contains the classes needed to connect to a MySQL database. You can also get it from the official site (make sure you select version 8.x):
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.31/mysql-connector-j-8.0.31.jar
mv mysql-connector-j-8.0.31.jar mysql.jar
Then move mysql.jar to a directory that is on your CLASSPATH:
mv mysql.jar /home/hadoop/spark/jars/
Make sure CLASSPATH is set in your .bashrc:
export CLASSPATH=$CLASSPATH:/home/hadoop/spark/jars
Then run pyspark with the jar (assuming you are under /home/hadoop/spark):
pyspark --jars jars/mysql.jar
>>> user = sqlContext.read.format("jdbc").options( url="jdbc:mysql://localhost:3306/demo",driver = "com.mysql.jdbc.Driver",dbtable = "demotable",user="root",password="password").load()
>>> user.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|abhay| 25|
| 2|banti| 25|
| 3| due| 30|
| 4| joy| 45|
+---+-----+---+
>>> user.persist()
DataFrame[id: int, name: string, age: int]
>>> user.columns
['id', 'name', 'age']
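Once the table is loaded as a DataFrame, the usual DataFrame operations apply as well. A minimal sketch against the user DataFrame above (results assume the four rows shown):
>>> user.select("name", "age").filter(user.age > 26).show()
+----+---+
|name|age|
+----+---+
| due| 30|
| joy| 45|
+----+---+
>>> user.agg({"age": "avg"}).show()   # average age of the four rows: 31.25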
Troubleshooting: Access Denied Error for 'root'@'localhost'
If you follow the steps above and encounter this specific error when running the .load() command:
py4j.protocol.Py4JJavaError: An error occurred while calling oXX.load. : java.sql.SQLException: Access denied for user 'root'@'localhost'
Reason: This often happens because default MySQL/MariaDB installations on Linux might configure the root user for local connections (localhost) to use the auth_socket plugin. This plugin checks the OS user instead of a password. Spark's JDBC connection, however, typically requires password authentication.
Solution: Configure the MySQL root@localhost user to use password authentication and set a password (in this example, we'll use "password").
- Connect to MySQL:
sudo mysql
- Run SQL Commands: Once inside the MySQL prompt (mysql>), execute the following:
-- Set 'root'@'localhost' to use password authentication with 'password'
-- Using mysql_native_password for broad compatibility
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
-- Apply the changes
FLUSH PRIVILEGES;
-- Exit MySQL client
EXIT;
⚠️ Using password as the password for the root user is extremely insecure. This password is very easy to guess. Use it only for temporary, isolated testing where security is not a concern. For any other scenario, use a strong, unique password and preferably create a dedicated, less-privileged MySQL user for your application.
Then go back to MySQL and update the table:
mysql -u root -p
mysql> use demo;
Database changed
mysql> ALTER TABLE demotable add nationality int;
Query OK, 0 rows affected (0.05 sec)
Records: 0 Duplicates: 0 Warnings: 0
mysql> update demotable set nationality = 1 where id = 1 or id = 2;
Query OK, 2 rows affected (0.11 sec)
Rows matched: 2 Changed: 2 Warnings: 0
mysql> update demotable set nationality = 2 where id = 3 or id = 4;
Query OK, 2 rows affected (0.17 sec)
Rows matched: 2 Changed: 2 Warnings: 0
mysql> select * from demotable;
+------+-------+------+-------------+
| id | name | age | nationality |
+------+-------+------+-------------+
| 1 | abhay | 25 | 1 |
| 2 | banti | 25 | 1 |
| 3 | due | 30 | 2 |
| 4 | joy | 45 | 2 |
+------+-------+------+-------------+
4 rows in set (0.00 sec)
mysql> create table national (nationality int, nation_name varchar(50));
Query OK, 0 rows affected (0.14 sec)
mysql> insert into national values (1,"Thailand"), (2,"China");
Query OK, 2 rows affected (0.06 sec)
Records: 2 Duplicates: 0 Warnings: 0
mysql> select * from national;
+-------------+-------------+
| nationality | nation_name |
+-------------+-------------+
| 1 | Thailand |
| 2 | China |
+-------------+-------------+
2 rows in set (0.00 sec)
mysql>
========
Now invoke pyspark again with mysql jar
./bin/pyspark --jars jars/mysql.jar
>>> user = sqlContext.read.format("jdbc").options( url="jdbc:mysql://localhost:3306/demo",driver = "com.mysql.jdbc.Driver",dbtable = "demotable",user="root",password="password").load()
>>> user.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|abhay| 25|
| 2|banti| 25|
| 3| due| 30|
| 4| joy| 45|
+---+-----+---+
>>> nation = sqlContext.read.format("jdbc").options( url="jdbc:mysql://localhost:3306/demo?characterEncoding=utf8",driver = "com.mysql.jdbc.Driver",dbtable = "national",user="root",password="password").load()
>>> nation.show()
+-----------+-----------+
|nationality|nation_name|
+-----------+-----------+
| 1| Thailand|
| 2| China|
+-----------+-----------+
# create pair RDDs (nationality, name) from the DataFrames' underlying RDDs
>>> user_name = user.rdd.map(lambda row: (row[3], row[1]))
>>> nation_name = nation.rdd.map(lambda row: (row[0], row[1]))
# join the pair RDDs on the nationality key
>>> user_name_nation = nation_name.join(user_name)
>>> nation_name_user = user_name.join(nation_name)
>>> user_name_nation.take(5)
[(2, (u'China', u'due')), (2, (u'China', u'joy')), (1, (u'Thailand', u'abhay')), (1, (u'Thailand', u'banti'))]
>>> user_name_nation.count()
4
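The same result can be obtained directly at the DataFrame level, which keeps column names and avoids dropping down to pair RDDs. A minimal sketch, assuming the reloaded user DataFrame includes the nationality column added earlier:
>>> # DataFrame join on the shared nationality column
>>> user.join(nation, on="nationality").select("name", "nation_name").show()
This prints one row per user with the corresponding nation name attached.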
Managing text example:
Download some text file
wget https://raw.githubusercontent.com/cchantra/bigdata.github.io/master/spark/sample.txt
hdfs dfs -put sample.txt /user1/
bin/pyspark
>>> confusedRDD = sc.textFile("hdfs://localhost:9000/user1/sample.txt")
>>> confusedRDD.take(5)
[u'The Project Gutenberg EBook of Ion, by Plato', u'', u'This eBook is for the use of anyone anywhere at no cost and with', u'almost no restrictions whatsoever. You may copy it, give it away or', u're-use it under the terms of the Project Gutenberg License included']
>>> mappedconfusion = confusedRDD.map(lambda line : line.split(" "))
>>>
>>> mappedconfusion.take(5)
[[u'The', u'Project', u'Gutenberg', u'EBook', u'of', u'Ion,', u'by', u'Plato'], [u''], [u'This', u'eBook', u'is', u'for', u'the', u'use', u'of', u'anyone', u'anywhere', u'at', u'no', u'cost', u'and', u'with'], [u'almost', u'no', u'restrictions', u'whatsoever.', u'', u'You', u'may', u'copy', u'it,', u'give', u'it', u'away', u'or'], [u're-use', u'it', u'under', u'the', u'terms', u'of', u'the', u'Project', u'Gutenberg', u'License', u'included']]
#flatten them
>>> flatMappedConfusion = confusedRDD.flatMap(lambda line : line.split(" "))
>>> flatMappedConfusion.take(100)
[u'The', u'Project', u'Gutenberg', u'EBook', u'of', u'Ion,', u'by', u'Plato', u'', u'This', u'eBook', u'is', u'for', u'the', u'use', u'of', u'anyone', u'anywhere', u'at', u'no', u'cost', u'and', u'with', u'almost', u'no', u'restrictions', u'whatsoever.', u'', u'You', u'may', u'copy', u'it,', u'give', u'it', u'away', u'or', u're-use', u'it', u'under', u'the', u'terms', u'of', u'the', u'Project', u'Gutenberg', u'License', u'included', u'with', u'this', u'eBook', u'or', u'online', u'at', u'www.gutenberg.org', u'', u'', u'Title:', u'Ion', u'', u'Author:', u'Plato', u'', u'Translator:', u'Benjamin', u'Jowett', u'', u'Posting', u'Date:', u'October', u'10,', u'2008', u'[EBook', u'#1635]', u'Release', u'Date:', u'February,', u'1999', u'', u'Language:', u'English', u'', u'', u'***', u'START', u'OF', u'THIS', u'PROJECT', u'GUTENBERG', u'EBOOK', u'ION', u'***', u'', u'', u'', u'', u'Produced', u'by', u'Sue', u'Asscher', u'']
#filter only "able"
>>> onlyconfusion = confusedRDD.filter(lambda line : ("able" in line.lower()))
>>> onlyconfusion.count()
27
>>> onlyconfusion.collect() # show these lines
[u'speaker is able to judge of the bad. And poetry is a whole; and he', u'who judges of poetry by rules of art ought to be able to judge of', u'reason would be incapable of understanding them. Reflections of this', u'in his art of rhetoric. Even more than the sophist he is incapable of', u'laborious part of my art; and I believe myself able to speak about', u'SOCRATES: And if you were a prophet, would you not be able to interpret', u'you speak of Homer without any art or knowledge. If you were able to', u'speak of him by rules of art, you would have been able to speak of all', u'incapable of criticizing other painters; and when the work of any other', u'was able to discourse of Olympus or Thamyras or Orpheus, or Phemius the', u'not attained to this state, he is powerless and is unable to utter his', u'was better able to judge of the propriety of these lines?', u'will the art of the fisherman or of the rhapsode be better able to judge', u'Socrates, are able to assign different passages in Homer to their', u'are able to praise Homer, you do not deal fairly with me, and after', u'Gutenberg-tm License (available with this file or online at', u'1.E.8. You may charge a reasonable fee for copies of or providing', u' you already use to calculate your applicable taxes. The fee is', u'1.F.1. Project Gutenberg volunteers and employees expend considerable', u'LIABLE TO YOU FOR ACTUAL, DIRECT, INDIRECT, CONSEQUENTIAL, PUNITIVE OR', u'law of the state applicable to this agreement, the agreement shall be', u'the applicable state law. The invalidity or unenforceability of any', u'electronic works in formats readable by the widest variety of computers', u'remain freely available for generations to come. In 2001, the Project', u'freely distributed in machine readable form accessible by the widest', u'charities and charitable donations in all 50 states of the United', u'considerable effort, much paperwork and many fees to meet and keep up']
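A common next step after flatMap is the classic word count, combining map with the reduceByKey transformation before an action; a minimal sketch on the same file:
>>> words = confusedRDD.flatMap(lambda line: line.split(" "))
>>> counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
>>> counts.takeOrdered(5, key=lambda kv: -kv[1])   # five most frequent tokens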
Union, distinct, intersect operations
>>> joy_grade = [('math',4), ('chemistry',3),('english',4)]
>>> dom_grade = [('math',3), ('chemistry',2),('english',4)]
>>> joy = sc.parallelize(joy_grade)
>>> dom = sc.parallelize(dom_grade)
>>> joy.union(dom).collect() # perform union
[('math', 4), ('chemistry', 3), ('english', 4), ('math', 3), ('chemistry', 2), ('english', 4)]
>>> score_wise_grade = joy.join(dom) # perform join
>>> score_wise_grade.collect()
[('chemistry', (3, 2)), ('math', (4, 3)), ('english', (4, 4))]
>>> best_story = ["movie1","movie3","movie7","movie5","movie8"]
>>> pop_singer = ["madona","taylor smith", "elvis", "chicago","aerosmith", "m2m"]
>>> pop_band = ["chicago","m2m"]
>>> jazz_singer = ["coco_jazz","nancy wilson", "louis armstong", "nat king cole"]
>>> pop = sc.parallelize(pop_singer)
>>> jazz = sc.parallelize(jazz_singer)
>>> popb = sc.parallelize(pop_band)
>>> total = pop.union(popb).union(jazz) #union/ union
>>> total.collect()
['madona', 'taylor smith', 'elvis', 'chicago', 'aerosmith', 'm2m', 'chicago', 'm2m', 'coco_jazz', 'nancy wilson', 'louis armstong', 'nat king cole']
>>> total.distinct().collect() #distinct
['madona', 'aerosmith', 'coco_jazz', 'chicago', 'nat king cole', 'taylor smith', 'm2m', 'louis armstong', 'nancy wilson', 'elvis']
>>> total_ins = pop.intersection(popb)
>>> total_ins.collect()
['m2m', 'chicago']
RDD Partition
Operations are performed by dividing the data into multiple parallel partitions. The same operation runs on the partitions simultaneously, which is what makes data processing in Spark fast. Map and reduce operations can therefore be applied in parallel by splitting the data across partitions. The partitions of an RDD are distributed across several workers running on different nodes of a cluster; if a single worker fails, the lost partitions can be recomputed from the RDD's lineage, so the data remains available.
The degree of parallelism of each operation on an RDD depends on the number of partitions the RDD has. We can specify the number of partitions when creating the RDD, or change it later using the repartition() and coalesce() methods.
>>> partRDD = sc.textFile("hdfs://localhost:9000/user1/sample.txt")
>>> partRDD.getNumPartitions()
2
- mapPartitions(): runs the supplied function once per partition (the function receives an iterator over that partition's elements), instead of once per element as a normal map() does.
- mapPartitionsWithIndex(): works like mapPartitions(), but the function also receives the index of the partition it is processing, as shown in the sketch below.
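A minimal sketch of both, counting how many lines each partition holds (the exact split depends on how HDFS divides the file):
>>> def count_in_partition(iterator):
...     yield sum(1 for _ in iterator)
...
>>> partRDD.mapPartitions(count_in_partition).collect()          # one count per partition
>>> partRDD.mapPartitionsWithIndex(lambda idx, it: [(idx, sum(1 for _ in it))]).collect()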
Caching, Accumulators and UDF
Accumulators in Spark are global variables that can be shared across tasks. The scope of a normal variable is limited to a single task, so it cannot hold updates that need to be aggregated across all tasks. UDFs (User Defined Functions) provide a simple way to add custom functions to Spark that can be used in various transformation stages.
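For example, a UDF can wrap bucketing logic like the age_group function used later on this page so it can be applied to a DataFrame column. A minimal sketch, assuming a DataFrame with an integer age column such as the user DataFrame from the MySQL example:
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType
>>> bucket = udf(lambda age: '20-30' if 20 <= age < 30 else 'other', StringType())
>>> user.withColumn("age_bucket", bucket(user.age)).show()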
Try full program (movielens)
Download the u.user data from http://files.grouplens.org/datasets/movielens/ml-100k/, or get a copy with:
wget https://raw.githubusercontent.com/cchantra/bigdata.github.io/master/spark/movie_len_user
hdfs dfs -put movie_len_user /user1/
bin/pyspark
>>> userRDD = sc.textFile("hdfs://localhost:9000/user1/movie_len_user")
>>> userRDD.count()
943
Type in the following functions:
def parse_N_calculate_age(data):
    userid, age, gender, occupation, zip = data.split("|")
    return userid, age_group(int(age)), gender, occupation, zip, int(age)

def age_group(age):
    if age < 10:
        return '0-10'
    elif age < 20:
        return '10-20'
    elif age < 30:
        return '20-30'
    elif age < 40:
        return '30-40'
    elif age < 50:
        return '40-50'
    elif age < 60:
        return '50-60'
    elif age < 70:
        return '60-70'
    elif age < 80:
        return '70-80'
    else:
        return '80+'
>>> data_with_age_bucket = userRDD.map(parse_N_calculate_age)
>>> RDD_20_30 = data_with_age_bucket.filter(lambda line : '20-30' in line)
>>> freq = RDD_20_30.map(lambda line : line[3]).countByValue()
>>> dict(freq)
{u'administrator': 19, u'lawyer': 4, u'healthcare': 4, u'marketing': 5, u'executive': 7, u'doctor': 2, u'scientist': 8, u'student': 116, u'technician': 12, u'librarian': 11, u'programmer': 30, u'salesman': 2, u'homemaker': 3, u'engineer': 23, u'none': 2, u'artist': 12, u'writer': 14, u'entertainment': 8, u'other': 38, u'educator': 12}
-- count the number of users in the same age group, by gender
>>> age_wise = RDD_20_30.map(lambda line : line[2]).countByValue()
>>> dict(age_wise)
{u'M': 247, u'F': 85}
remove from memory
>>> RDD_20_30.unpersist()
PythonRDD[118] at RDD at PythonRDD.scala:53
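unpersist() only releases storage if the RDD was cached earlier; a minimal sketch of the full cache/use/unpersist cycle:
>>> RDD_20_30.cache()        # mark the RDD for in-memory storage
>>> RDD_20_30.count()        # the first action materializes and caches it
>>> RDD_20_30.count()        # later actions reuse the cached partitions
>>> RDD_20_30.unpersist()    # release the memory when no longer needed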
Use accumulators for outlier detection:
>>> Under_age = sc.accumulator(0)
>>> Over_age = sc.accumulator(0)
Define the under-age and over-age counting function:
def outliers(data):
    global Over_age, Under_age
    age_grp = data[1]
    if age_grp == "70-80":  # use == for comparison
        Over_age += 1
    if age_grp == "0-10":  # use == for comparison
        Under_age += 1
    return data
>>> df = data_with_age_bucket.map(outliers).collect()
Check the totals for under-age and over-age users:
>>> Under_age.value
1
>>> Over_age.value
4
>>>
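Note that Spark only guarantees exactly-once accumulator updates for updates performed inside actions; updates made inside a transformation such as map() can be applied more than once if a task or stage is re-executed. Running the same logic through foreach(), which is an action, avoids that; a minimal sketch reusing the outliers function above:
>>> Under_age = sc.accumulator(0)
>>> Over_age = sc.accumulator(0)
>>> data_with_age_bucket.foreach(outliers)   # action: outliers runs once per record
>>> Under_age.value, Over_age.value
(1, 4)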
Create a demo Python script
Assume the file demo.py, run in standalone mode:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('MyFirstStandaloneApp')
# Set the master in SparkConf or via spark-submit if not running locally,
# e.g. conf.setMaster("local[*]") or conf.setMaster("yarn")
sc = SparkContext(conf=conf)

userRDD = sc.textFile("hdfs://localhost:9000/user1/movie_len_user")

def parse_N_calculate_age(data):
    # Basic error handling for the split and the age conversion.
    try:
        userid, age_str, gender, occupation, zip_code = data.split("|")
        age = int(age_str)
        return userid, age_group(age), gender, occupation, zip_code, age
    except ValueError:
        # Lines that don't split correctly or whose age isn't an int.
        # Returning None means these records must be filtered out later.
        return None

def age_group(age):
    if age < 10:
        return '0-10'
    elif age < 20:
        return '10-20'
    elif age < 30:
        return '20-30'
    elif age < 40:
        return '30-40'
    elif age < 50:
        return '40-50'
    elif age < 60:
        return '50-60'
    elif age < 70:
        return '60-70'
    elif age < 80:
        return '70-80'
    else:
        return '80+'

# Apply the parser and drop records that failed to parse.
data_with_age_bucket = userRDD.map(parse_N_calculate_age).filter(lambda x: x is not None)
# Optionally cache it if it is reused often: data_with_age_bucket.cache()

RDD_20_30 = data_with_age_bucket.filter(lambda line: line[1] == '20-30')  # index [1] is the age group

# Frequency count of occupations in the 20-30 age group (index [3] is occupation)
freq = RDD_20_30.map(lambda line: line[3]).countByValue()

# Use print() for Python 3 compatibility
print("Total user count (successfully parsed): ", data_with_age_bucket.count())
print("Profession frequency (age 20-30): ", dict(freq))

# Initialize accumulators
Under_age = sc.accumulator(0)
Over_age = sc.accumulator(0)

def outliers(data):
    # Update the accumulators defined in the outer scope (no 'global' needed).
    age_grp = data[1]  # index [1] is the age group string
    if age_grp == "70-80":
        Over_age.add(1)
    if age_grp == "0-10":
        Under_age.add(1)
    # The function is used for its side effect (accumulator updates)
    # inside a map, so it still has to return the data element.
    return data

# Set up the map; nothing runs until an action is triggered.
mapped_rdd_for_outliers = data_with_age_bucket.map(outliers)

# Trigger an action to execute the map and update the accumulators.
# count() is usually cheaper than collect() when only the side effects are needed.
mapped_rdd_for_outliers.count()

# Print accumulator values (use the .value attribute)
print("Under age (0-10) users count: ", Under_age.value)
print("Over age (70-80) users count: ", Over_age.value)

# Stop the SparkContext - important in standalone scripts.
sc.stop()
Run it at the shell prompt:
./bin/spark-submit demo.py
hadoop@bigdata:~/spark$ bin/spark-submit demo.py
2020-03-25 15:15:04,298 WARN util.Utils: Your hostname, bigdata resolves to a loopback address: 127.0.1.1; using 10.3.133.100 instead (on interface ens3)
2020-03-25 15:15:04,299 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2020-03-25 15:15:04,811 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-03-25 15:15:05,340 INFO spark.SparkContext: Running Spark version 3.0.0-preview2
2020-03-25 15:15:05,403 INFO resource.ResourceUtils: ==============================================================
2020-03-25 15:15:05,404 INFO resource.ResourceUtils: Resources for spark.driver:
2020-03-25 15:15:05,405 INFO resource.ResourceUtils: ==============================================================
2020-03-25 15:15:05,405 INFO spark.SparkContext: Submitted application: MyFirstStandaloneApp
2020-03-25 15:15:05,485 INFO spark.SecurityManager: Changing view acls to: hadoop
2020-03-25 15:15:05,485 INFO spark.SecurityManager: Changing modify acls to: hadoop
2020-03-25 15:15:05,485 INFO spark.SecurityManager: Changing view acls groups to:
2020-03-25 15:15:05,485 INFO spark.SecurityManager: Changing modify acls groups to:
2020-03-25 15:15:05,485 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); groups with view permissions: Set(); users with modify permissions: Set(hadoop); groups with modify permissions: Set()
2020-03-25 15:15:05,862 INFO util.Utils: Successfully started service 'sparkDriver' on port 32863.
2020-03-25 15:15:05,895 INFO spark.SparkEnv: Registering MapOutputTracker
2020-03-25 15:15:05,932 INFO spark.SparkEnv: Registering BlockManagerMaster
2020-03-25 15:15:05,959 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2020-03-25 15:15:05,960 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
2020-03-25 15:15:05,964 INFO spark.SparkEnv: Registering BlockManagerMasterHeartbeat
2020-03-25 15:15:05,982 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-a2fdbd33-a298-4082-a960-dfdb2a478b35
2020-03-25 15:15:06,010 INFO memory.MemoryStore: MemoryStore started with capacity 366.1 MiB
2020-03-25 15:15:06,030 INFO spark.SparkEnv: Registering OutputCommitCoordinator
2020-03-25 15:15:06,142 INFO util.log: Logging initialized @2798ms to org.sparkproject.jetty.util.log.Slf4jLog
2020-03-25 15:15:06,237 INFO server.Server: jetty-9.4.z-SNAPSHOT; built: 2019-04-29T20:42:08.989Z; git: e1bc35120a6617ee3df052294e433f3a25ce7097; jvm 1.8.0_232-8u232-b09-0ubuntu1~16.04.1-b09
2020-03-25 15:15:06,264 INFO server.Server: Started @2922ms
2020-03-25 15:15:06,301 INFO server.AbstractConnector: Started ServerConnector@1ed04ff{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2020-03-25 15:15:06,301 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
2020-03-25 15:15:06,332 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1418c57{/jobs,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,334 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@169ec77{/jobs/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,335 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a75433{/jobs/job,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,338 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@19a245f{/jobs/job/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,339 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@14864e5{/stages,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,339 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c94d5e{/stages/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,340 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@182776{/stages/stage,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,342 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4a0a9{/stages/stage/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,343 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c5a509{/stages/pool,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,344 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@92f95{/stages/pool/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,344 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@17a0475{/storage,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,345 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11b0347{/storage/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,346 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@b592fd{/storage/rdd,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,346 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@9d29b6{/storage/rdd/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,347 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@84663d{/environment,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,348 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@e27f3b{/environment/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,348 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@84a57e{/executors,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,349 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1d4b425{/executors/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,350 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1843f60{/executors/threadDump,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,351 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1b00da5{/executors/threadDump/json,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,361 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@133787a{/static,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,362 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@17bb6ca{/,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,363 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c6a683{/api,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,364 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@830cf9{/jobs/job/kill,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,365 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@165c892{/stages/stage/kill,null,AVAILABLE,@Spark}
2020-03-25 15:15:06,367 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.3.133.100:4040
2020-03-25 15:15:06,544 INFO executor.Executor: Starting executor ID driver on host 10.3.133.100
2020-03-25 15:15:06,573 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44451.
2020-03-25 15:15:06,574 INFO netty.NettyBlockTransferService: Server created on 10.3.133.100:44451
2020-03-25 15:15:06,575 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2020-03-25 15:15:06,587 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.3.133.100, 44451, None)
2020-03-25 15:15:06,591 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.3.133.100:44451 with 366.1 MiB RAM, BlockManagerId(driver, 10.3.133.100, 44451, None)
2020-03-25 15:15:06,594 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.3.133.100, 44451, None)
2020-03-25 15:15:06,595 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.3.133.100, 44451, None)
2020-03-25 15:15:06,797 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1330975{/metrics/json,null,AVAILABLE,@Spark}
/home/hadoop/spark/python/lib/pyspark.zip/pyspark/context.py:219: DeprecationWarning: Support for Python 2 and Python 3 prior to version 3.6 is deprecated as of Spark 3.0. See also the plan for dropping Python 2 support at https://spark.apache.org/news/plan-for-dropping-python-2-support.html.
DeprecationWarning)
2020-03-25 15:15:07,481 WARN util.SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
2020-03-25 15:15:07,530 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 263.2 KiB, free 365.9 MiB)
2020-03-25 15:15:07,607 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 27.6 KiB, free 365.9 MiB)
2020-03-25 15:15:07,610 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.3.133.100:44451 (size: 27.6 KiB, free: 366.1 MiB)
2020-03-25 15:15:07,614 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:0
2020-03-25 15:15:08,405 INFO mapred.FileInputFormat: Total input files to process : 1
2020-03-25 15:15:08,515 INFO spark.SparkContext: Starting job: countByValue at /home/hadoop/spark/demo.py:36
2020-03-25 15:15:08,531 INFO scheduler.DAGScheduler: Got job 0 (countByValue at /home/hadoop/spark/demo.py:36) with 2 output partitions
2020-03-25 15:15:08,531 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (countByValue at /home/hadoop/spark/demo.py:36)
2020-03-25 15:15:08,532 INFO scheduler.DAGScheduler: Parents of final stage: List()
2020-03-25 15:15:08,533 INFO scheduler.DAGScheduler: Missing parents: List()
2020-03-25 15:15:08,539 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at countByValue at /home/hadoop/spark/demo.py:36), which has no missing parents
2020-03-25 15:15:08,568 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.8 KiB, free 365.9 MiB)
2020-03-25 15:15:08,576 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.0 KiB, free 365.9 MiB)
2020-03-25 15:15:08,577 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.3.133.100:44451 (size: 6.0 KiB, free: 366.1 MiB)
2020-03-25 15:15:08,577 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1206
2020-03-25 15:15:08,602 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (PythonRDD[2] at countByValue at /home/hadoop/spark/demo.py:36) (first 15 tasks are for partitions Vector(0, 1))
2020-03-25 15:15:08,604 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
2020-03-25 15:15:08,682 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.3.133.100, executor driver, partition 0, ANY, 7383 bytes)
2020-03-25 15:15:08,686 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.3.133.100, executor driver, partition 1, ANY, 7383 bytes)
2020-03-25 15:15:08,695 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
2020-03-25 15:15:08,712 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
2020-03-25 15:15:09,104 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/user1/movie_len_user:0+11314
2020-03-25 15:15:09,104 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/user1/movie_len_user:11314+11314
2020-03-25 15:15:09,436 INFO python.PythonRunner: Times: total = 176, boot = 127, init = 46, finish = 3
2020-03-25 15:15:09,436 INFO python.PythonRunner: Times: total = 306, boot = 254, init = 49, finish = 3
2020-03-25 15:15:09,461 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1993 bytes result sent to driver
2020-03-25 15:15:09,461 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2025 bytes result sent to driver
2020-03-25 15:15:09,469 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 782 ms on 10.3.133.100 (executor driver) (1/2)
2020-03-25 15:15:09,471 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 809 ms on 10.3.133.100 (executor driver) (2/2)
2020-03-25 15:15:09,472 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
2020-03-25 15:15:09,476 INFO python.PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 51547
2020-03-25 15:15:09,482 INFO scheduler.DAGScheduler: ResultStage 0 (countByValue at /home/hadoop/spark/demo.py:36) finished in 0.917 s
2020-03-25 15:15:09,487 INFO scheduler.DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
2020-03-25 15:15:09,488 INFO scheduler.TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
2020-03-25 15:15:09,490 INFO scheduler.DAGScheduler: Job 0 finished: countByValue at /home/hadoop/spark/demo.py:36, took 0.974480 s
total user count is 2020-03-25 15:15:09,532 INFO spark.SparkContext: Starting job: count at /home/hadoop/spark/demo.py:38
2020-03-25 15:15:09,534 INFO scheduler.DAGScheduler: Got job 1 (count at /home/hadoop/spark/demo.py:38) with 2 output partitions
2020-03-25 15:15:09,534 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (count at /home/hadoop/spark/demo.py:38)
2020-03-25 15:15:09,534 INFO scheduler.DAGScheduler: Parents of final stage: List()
2020-03-25 15:15:09,534 INFO scheduler.DAGScheduler: Missing parents: List()
2020-03-25 15:15:09,536 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (PythonRDD[3] at count at /home/hadoop/spark/demo.py:38), which has no missing parents
2020-03-25 15:15:09,542 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 7.9 KiB, free 365.8 MiB)
2020-03-25 15:15:09,544 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.8 KiB, free 365.8 MiB)
2020-03-25 15:15:09,545 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.3.133.100:44451 (size: 4.8 KiB, free: 366.1 MiB)
2020-03-25 15:15:09,546 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1206
2020-03-25 15:15:09,547 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (PythonRDD[3] at count at /home/hadoop/spark/demo.py:38) (first 15 tasks are for partitions Vector(0, 1))
2020-03-25 15:15:09,547 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
2020-03-25 15:15:09,549 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 10.3.133.100, executor driver, partition 0, ANY, 7383 bytes)
2020-03-25 15:15:09,550 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 10.3.133.100, executor driver, partition 1, ANY, 7383 bytes)
2020-03-25 15:15:09,550 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 2)
2020-03-25 15:15:09,550 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3)
2020-03-25 15:15:09,557 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/user1/movie_len_user:0+11314
2020-03-25 15:15:09,559 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/user1/movie_len_user:11314+11314
2020-03-25 15:15:09,601 INFO python.PythonRunner: Times: total = 40, boot = -120, init = 159, finish = 1
2020-03-25 15:15:09,603 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 1585 bytes result sent to driver
2020-03-25 15:15:09,604 INFO python.PythonRunner: Times: total = 41, boot = -124, init = 163, finish = 2
2020-03-25 15:15:09,605 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 56 ms on 10.3.133.100 (executor driver) (1/2)
2020-03-25 15:15:09,609 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1585 bytes result sent to driver
2020-03-25 15:15:09,611 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 61 ms on 10.3.133.100 (executor driver) (2/2)
2020-03-25 15:15:09,611 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
2020-03-25 15:15:09,612 INFO scheduler.DAGScheduler: ResultStage 1 (count at /home/hadoop/spark/demo.py:38) finished in 0.073 s
2020-03-25 15:15:09,613 INFO scheduler.DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
2020-03-25 15:15:09,613 INFO scheduler.TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
2020-03-25 15:15:09,613 INFO scheduler.DAGScheduler: Job 1 finished: count at /home/hadoop/spark/demo.py:38, took 0.080571 s
943
total movie users profession wise {u'administrator': 19, u'lawyer': 4, u'healthcare': 4, u'marketing': 5, u'executive': 7, u'doctor': 2, u'scientist': 8, u'student': 116, u'technician': 12, u'librarian': 11, u'programmer': 30, u'salesman': 2, u'homemaker': 3, u'engineer': 23, u'none': 2, u'artist': 12, u'writer': 14, u'entertainment': 8, u'other': 38, u'educator': 12}
2020-03-25 15:15:09,636 INFO spark.SparkContext: Starting job: collect at /home/hadoop/spark/demo.py:54
2020-03-25 15:15:09,637 INFO scheduler.DAGScheduler: Got job 2 (collect at /home/hadoop/spark/demo.py:54) with 2 output partitions
2020-03-25 15:15:09,637 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (collect at /home/hadoop/spark/demo.py:54)
2020-03-25 15:15:09,637 INFO scheduler.DAGScheduler: Parents of final stage: List()
2020-03-25 15:15:09,638 INFO scheduler.DAGScheduler: Missing parents: List()
2020-03-25 15:15:09,639 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (PythonRDD[4] at collect at /home/hadoop/spark/demo.py:54), which has no missing parents
2020-03-25 15:15:09,641 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 7.9 KiB, free 365.8 MiB)
2020-03-25 15:15:09,644 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 4.9 KiB, free 365.8 MiB)
2020-03-25 15:15:09,644 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.3.133.100:44451 (size: 4.9 KiB, free: 366.1 MiB)
2020-03-25 15:15:09,645 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1206
2020-03-25 15:15:09,646 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (PythonRDD[4] at collect at /home/hadoop/spark/demo.py:54) (first 15 tasks are for partitions Vector(0, 1))
2020-03-25 15:15:09,646 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
2020-03-25 15:15:09,650 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, 10.3.133.100, executor driver, partition 0, ANY, 7383 bytes)
2020-03-25 15:15:09,650 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, 10.3.133.100, executor driver, partition 1, ANY, 7383 bytes)
2020-03-25 15:15:09,651 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 4)
2020-03-25 15:15:09,651 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 5)
2020-03-25 15:15:09,657 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/user1/movie_len_user:11314+11314
2020-03-25 15:15:09,657 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/user1/movie_len_user:0+11314
2020-03-25 15:15:09,704 INFO python.PythonRunner: Times: total = 44, boot = -55, init = 94, finish = 5
2020-03-25 15:15:09,707 INFO python.PythonRunner: Times: total = 45, boot = -55, init = 96, finish = 4
2020-03-25 15:15:09,713 INFO executor.Executor: Finished task 1.0 in stage 2.0 (TID 5). 23380 bytes result sent to driver
2020-03-25 15:15:09,713 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 4). 23365 bytes result sent to driver
2020-03-25 15:15:09,715 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) in 65 ms on 10.3.133.100 (executor driver) (1/2)
2020-03-25 15:15:09,716 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 67 ms on 10.3.133.100 (executor driver) (2/2)
2020-03-25 15:15:09,716 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
2020-03-25 15:15:09,717 INFO scheduler.DAGScheduler: ResultStage 2 (collect at /home/hadoop/spark/demo.py:54) finished in 0.077 s
2020-03-25 15:15:09,717 INFO scheduler.DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
2020-03-25 15:15:09,718 INFO scheduler.TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
2020-03-25 15:15:09,718 INFO scheduler.DAGScheduler: Job 2 finished: collect at /home/hadoop/spark/demo.py:54, took 0.081711 s
under age users of the movie are 1
over age users of the movie are 4
2020-03-25 15:15:09,737 INFO spark.SparkContext: Invoking stop() from shutdown hook
2020-03-25 15:15:09,744 INFO server.AbstractConnector: Stopped Spark@1ed04ff{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2020-03-25 15:15:09,746 INFO ui.SparkUI: Stopped Spark web UI at http://10.3.133.100:4040
2020-03-25 15:15:09,764 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2020-03-25 15:15:09,801 INFO memory.MemoryStore: MemoryStore cleared
2020-03-25 15:15:09,802 INFO storage.BlockManager: BlockManager stopped
2020-03-25 15:15:09,810 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2020-03-25 15:15:09,814 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2020-03-25 15:15:09,821 INFO spark.SparkContext: Successfully stopped SparkContext
2020-03-25 15:15:09,822 INFO util.ShutdownHookManager: Shutdown hook called
2020-03-25 15:15:09,822 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-d0d12a0d-aa48-4ee4-af01-161820939ed0/pyspark-37cd4109-07a6-42c5-a0bc-a163d731bd5e
2020-03-25 15:15:09,825 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-d0d12a0d-aa48-4ee4-af01-161820939ed0
2020-03-25 15:15:09,828 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-4ff4a383-a4b8-4814-be2d-1eea3f959ed4
References
https://www.dezyre.com/apache-spark-tutorial/pyspark-tutorial