Lesson 20 Distributed Processing with Spark Tutorial
This lesson is based on the AMPCamp Tutorial.
Apache Spark is a distributed data processing system written in Scala. It builds on the ideas of map-reduce (fault-tolerant distributed data processing on commodity machines) with the addition of caching data between processing operations and smart construction of job dependencies for scheduling and error recovery.
The Resilient Distributed Dataset (RDD) is the core abstraction of Apache Spark. It is a collection of records partitioned across the machines of the cluster, and it supports many common data manipulation operations.
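For example, here is a minimal PySpark sketch of working with an RDD (the numbers are illustrative and not part of the tutorial data; sc is the SparkContext that the pyspark shell, introduced later, provides):
nums = sc.parallelize(range(1, 1001), 4)      # an RDD split into 4 partitions
squares = nums.map(lambda x: x * x)           # transformations are lazy
squares.filter(lambda x: x % 2 == 0).count()  # actions trigger the computation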
Original instructions are here
You can create an account at Amazon AWS and navigate to EC2 -> Key Pairs to create a new key pair. Once you've downloaded the key pair's private key (.pem file), you will need to change the permissions on that file:
$ chmod 600 <keypair_file>
# Set Amazon keys (these are available under Account > IAM > Users > Create User)
$ export AWS_ACCESS_KEY_ID=<>
$ export AWS_SECRET_ACCESS_KEY=<>
$ git clone git://github.com/amplab/training-scripts.git -b ampcamp4
$ cd training-scripts
$ ./spark-ec2 -i <key_file> -k <name_of_key_pair> --copy launch amplab-training
Here <name_of_key_pair> is the name of your EC2 key pair (the name you gave it when you created it) and <key_file> is the private key file for that key pair. amplab-training is just our name for the cluster; you can change it to whatever you want.
I add a couple of other options: wait 600 seconds for all the machines to boot up, and launch 19 slave machines:
$ ./spark-ec2 -i ~/aws/SparkKeyPair.pem -k SparkKeyPair -s 19 -w 600 --copy launch amplab-training
The whole thing takes 30 - 40 minutes. This launches a total of 20 machines, one master and 19 slaves, sets up the Berkeley Data Analytics Stack, and copies files from Wikipedia and other training documents into HDFS.
Note that by default, Amazon limits the number of instances you can run, depending on instance type. For me, I can only launch a total of 20 m1.xlarge instances.
To get the hostnames of the Spark masters I'm running, I do:
$ ./get-masters
amplab-training ec2-184-73-150-82.compute-1.amazonaws.com
Once the cluster has been launched, you can log in through ssh:
$ ssh -i <keypair_file> root@<master_address.com>
In my case, I do:
$ ssh -i ~/aws/SparkKeyPair.pem root@ec2-184-73-150-82.compute-1.amazonaws.com
Most importantly, remember to kill the cluster when you are done with it; otherwise you will keep accruing charges:
$ ./spark-ec2 -i <keypair_file> destroy <cluster_name>
More complete instructions are available at the AMPLab Tutorials.
If you want to access files on S3, you'll need to set your AWS access key and secret key in the core-site.xml configuration file, which lives at ephemeral-hdfs/hadoop/conf/core-site.xml on the master. Open this file and add the following lines (substituting your own ACCESS_KEY and SECRET):
<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>SECRET</value>
</property>
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>SECRET</value>
</property>
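With those keys in place you should be able to read S3 paths directly from Spark. A sketch, assuming the bucket and path below (which are placeholders, not real data) exist and are readable with your credentials:
logs = sc.textFile("s3n://your-bucket/path/to/data.txt")
logs.take(5)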
Among other datasets, your HDFS cluster should come preloaded with 20 GB of Wikipedia traffic statistics data obtained from http://aws.amazon.com/datasets/4182. To make the analysis feasible within the short timeframe of the exercise, we took three days' worth of data (May 5 to May 7, 2009; roughly 20 GB and 329 million entries). You can list the files:
$ ephemeral-hdfs/bin/hadoop fs -ls /wiki/pagecounts
There are 74 files (2 of which are intentionally left empty).
The data are partitioned by date and time. Each file contains traffic statistics for all pages in a specific hour. Let's take a look at one of the files:
$ ephemeral-hdfs/bin/hadoop fs -cat /wiki/pagecounts/part-00148 | less
The first few lines of the file are copied here:
20090507-040000 aa ?page=http://www.stockphotosharing.com/Themes/Images/users_raw/id.txt 3 39267
20090507-040000 aa Main_Page 7 51309
20090507-040000 aa Special:Boardvote 1 11631
20090507-040000 aa Special:Imagelist 1 931
Each line, delimited by a space, contains stats for one page. The schema is:
<date_time> <project_code> <page_title> <num_hits> <page_size>
The <date_time> field specifies the date in YYYYMMDD format (year, month, day) followed by a hyphen and the time in HHmmSS format (hour, minute, second); the minutes and seconds carry no information. The <project_code> field identifies the language of the page; for example, project code "en" indicates an English page. The <page_title> field gives the title of the Wikipedia page. The <num_hits> field gives the number of page views in the hour-long time slot starting at <date_time>. The <page_size> field gives the size in bytes of the Wikipedia page.
To quit less, stop viewing the file, and return to the command line, press q.
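If you want to poke at this schema from pyspark before jumping into the AMPCamp exercises, a small sketch (not part of the original lesson) is:
# Load the page counts and split each line on the space delimiter
pagecounts = sc.textFile("/wiki/pagecounts")
fields = pagecounts.map(lambda line: line.split(" "))
fields.take(2)   # each element is [date_time, project_code, page_title, num_hits, page_size]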
Jump to AmpCamp's Interactive Analysis to play with Wikipedia traffic data.
First, get the MovieLens dataset ("10 million ratings and 100,000 tag applications applied to 10,000 movies by 72,000 users"):
$ wget http://files.grouplens.org/datasets/movielens/ml-10m.zip
$ unzip ml-10m.zip
Next, copy the ml-10M100K directory (the zip extracts to that name) to HDFS:
$ ephemeral-hdfs/bin/hadoop fs -copyFromLocal ml-10M100K/ .
Start the PySpark shell on the master:
$ /root/spark/bin/pyspark
Load the data:
movie_data = sc.textFile('ml-10M100K/movies.dat')
ratings_data = sc.textFile('ml-10M100K/ratings.dat')
Let's answer some simpler questions first ...
# How many movies do we have?
movie_data.???
# How many ratings do we have?
ratings_data.????
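If you get stuck on the two ??? placeholders above, one possible answer is count(), which forces the lazy RDD to be evaluated:
movie_data.count()    # number of lines in movies.dat, i.e. number of movies
ratings_data.count()  # number of lines in ratings.dat, i.e. number of ratings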
# How many movies are tagged with Horror?
# First we need to retrieve the tags
tags = movie_data.map( lambda x : x.split("::")[-1] )
tags.filter( lambda tag: 'Horror' in tag )
# Did anything happen on that last command?
tags.filter( lambda tag: 'Horror' in tag ).count()
# How can I keep the movie names with the tags?
tags = movie_data.map( lambda x : (x.split("::")[1], x.split("::")[-1]) )
tags.filter( lambda (movie, tag): 'Horror' in tag ).take(20)
Exercise: Find all movies that are tagged with both Horror and Comedy.
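One possible solution sketch, since the genres field is pipe-delimited (e.g. "Comedy|Horror"):
# Keep only movies whose genre string mentions both Horror and Comedy
tags.filter( lambda (movie, tag): 'Horror' in tag and 'Comedy' in tag ).take(20)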
Exercise: Using the IP example above, how can we count the number of movies per category?
#TODO(you)
HINT: You can't ... what's missing?
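What's missing is a way to turn one movie row into one record per tag; flatMap does exactly that (it also reappears in the ratings-per-tag example below). A possible sketch, if you want a spoiler:
# Each movie contributes one record per pipe-delimited tag
tags.flatMap( lambda (movie, tag): tag.split("|") ).countByValue()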
Spark also supports a join operation, but first we need to key our datasets on something to join on.
keyed_ratings = ratings_data.keyBy(lambda x: x.split("::")[1])
keyed_movies = movie_data.keyBy(lambda x: x.split('::')[0])
Then we can join our datasets. Joining two keyed datasets returns a single keyed dataset where each row is a tuple: the first value is the key and the second value is another tuple. In that inner tuple, the first value is the matched row from dataset 1 and the second is the matched row from dataset 2.
row = ( <key> , (dataset_1_row, dataset_2_row))
joined = keyed_movies.join(keyed_ratings) # This will take some time
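To see the row structure described above, you can peek at a single joined record:
joined.first()   # ( movie_id, (movies.dat line, ratings.dat line) )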
We only care about a small amount of this data: the movie title, the tags, the user, and the rating. So let's create a simpler dataset that stores just those fields:
def createMovieRow( joined_row ):
    movie_row = joined_row[1][0]
    ratings_row = joined_row[1][1]
    title = movie_row.split("::")[1]
    tags = movie_row.split("::")[-1]
    user = ratings_row.split("::")[0]
    rating = float(ratings_row.split("::")[2])
    return (title, tags, user, rating)
data = joined.map(createMovieRow)
# How many reviews per movie
#TODO(you)
# How many reviews per tag
#TODO(you)
# On average, how many users rated each movie?
#TODO(you)
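Possible solution sketches for the three TODOs above (one approach among several; each row of data is (title, tags, user, rating)):
# Reviews per movie
data.map( lambda x: (x[0], 1) ).reduceByKey( lambda a, b: a + b ).take(10)
# Reviews per tag (expand each row into one record per pipe-delimited tag)
data.flatMap( lambda x: [(tag, 1) for tag in x[1].split("|")] ).reduceByKey( lambda a, b: a + b ).take(10)
# Average number of reviewing users per movie
data.count() / float(data.map( lambda x: x[0] ).distinct().count())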
# Compute the average rating per tag
tag_ratings = data.flatMap( lambda x: [(tag, x[-1]) for tag in x[1].split("|")] )
tag_ratings.groupBy(lambda x : x[0]).map( lambda (tag, ratings): (tag, sum(r[1] for r in ratings)/len(ratings))).collect()
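groupBy collects every rating for a tag on a single machine before averaging; for larger data, keeping a running (sum, count) pair with reduceByKey scales better. A sketch of that alternative:
# Same averages as above, computed without materializing all ratings per tag
sums_counts = tag_ratings.map( lambda (tag, rating): (tag, (rating, 1)) ) \
                         .reduceByKey( lambda a, b: (a[0] + b[0], a[1] + b[1]) )
sums_counts.map( lambda (tag, (s, n)): (tag, s / n) ).collect()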
Spark has a machine learning library called MLlib (http://spark.apache.org/docs/0.9.0/mllib-guide.html); however, its functionality is still sparse. Additionally, there have been efforts to integrate scikit-learn and Spark (https://gist.github.com/MLnick/4707012, https://github.com/ogrisel/spylearn).
Spark has some advantages for machine learning in that it supports multiple in-memory passes over a dataset, which is vital for machine learning algorithms that require iteration.
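As a small illustration (not part of the original lesson), MLlib's ALS recommender can be trained directly on the MovieLens ratings RDD from above. This assumes pyspark.mllib.recommendation ships with your Spark build (it appeared around Spark 0.9), and the user/movie IDs in the last line are arbitrary:
from pyspark.mllib.recommendation import ALS
# ALS expects (user, movie, rating) tuples
als_input = ratings_data.map( lambda x: x.split("::") ) \
                        .map( lambda f: (int(f[0]), int(f[1]), float(f[2])) )
model = ALS.train(als_input, 10, 5)   # rank 10, 5 iterations
model.predict(1, 122)                 # predicted rating of movie 122 by user 1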