Pig - cchantra/bigdata.github.io GitHub Wiki
(Source: http://www.rohitmenon.com/index.php/apache-pig-tutorial-part-1/)
Download Pig. We use pig-0.17.0:
wget https://downloads.apache.org/pig/pig-0.17.0/pig-0.17.0.tar.gz
Then untar it:
tar xvzf pig-0.17.0.tar.gz
Rename the folder for easier access:
mv pig-0.17.0 pig
Update .bashrc to add the following line, using any editor (e.g. vi):
export PATH=$PATH:/home/hadoop/pig/bin
Then source it again.
source .bashrc
Get data
cd /home/hadoop/pig
mkdir myscripts
cd myscripts
wget https://raw.githubusercontent.com/cchantra/bigdata.github.io/master/pig/movies_data.csv
wget https://raw.githubusercontent.com/cchantra/bigdata.github.io/master/pig/movies_with_duplicates.csv
Pig can be started in one of the following two modes:
Local Mode
Cluster Mode
Using the '-x local' option starts Pig in local mode, whereas executing the pig command without any options starts Pig in cluster mode. In local mode, Pig can access files on the local file system. In cluster mode, Pig can access files on HDFS.
Note: With Hadoop 3.x (e.g. 3.2.1),
you have to copy the Guava jar from Hadoop to /home/hadoop/pig/lib:
cp ~/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /home/hadoop/pig/lib
and remove the old version of Guava in pig/lib:
rm /home/hadoop/pig/lib/guava-11.0.jar
Execute the pig command in terminal:
To start in Local Mode:
pig -x local
15/04/03 00:19:56 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
15/04/03 00:19:56 INFO pig.ExecTypeProvider: Picked LOCAL as the ExecType
2015-04-03 00:19:57,037 [main] INFO org.apache.pig.Main - Apache Pig version 0.14.0 (r1640057) compiled Nov 16 2014, 18:02:05
2015-04-03 00:19:57,037 [main] INFO org.apache.pig.Main - Logging error messages to: /home/hadoop/pig_1428034797036.log
2015-04-03 00:19:57,299 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/hadoop/.pigbootup not found
2015-04-03 00:19:57,680 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-04-03 00:19:57,681 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2015-04-03 00:19:57,682 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
2015-04-03 00:19:58,715 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
grunt>
To start in Cluster Mode:
pig
15/04/03 00:21:06 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
15/04/03 00:21:08 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
15/04/03 00:21:08 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2015-04-03 00:21:08,549 [main] INFO org.apache.pig.Main - Apache Pig version 0.14.0 (r1640057) compiled Nov 16 2014, 18:02:05
2015-04-03 00:21:08,549 [main] INFO org.apache.pig.Main - Logging error messages to: /home/hadoop/pig_1428034868548.log
2015-04-03 00:21:08,704 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/hadoop/.pigbootup not found
2015-04-03 00:21:10,409 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2015-04-03 00:21:10,409 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-04-03 00:21:10,409 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000
2015-04-03 00:21:11,103 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-04-03 00:21:13,530 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
grunt> quit;
You get a grunt shell. The grunt shell allows you to execute Pig Latin statements so you can quickly test out data flows on your data, step by step, without having to execute complete scripts. Pig is now installed, and we can go ahead and start using it to play with data.
Let's question the data. Before we start asking questions, we need the data to be accessible in Pig.
(Alternatively, the data can be downloaded from https://github.com/rohitsden/pig-tutorial)
Copy the data to HDFS:
hdfs dfs -mkdir /pig_data
hdfs dfs -put /home/hadoop/pig/myscripts/*csv /pig_data
Note: please make sure YARN is up, or you will get an error like:
java.io.IOException: java.net.ConnectException: Your endpoint configuration is wrong;
(https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.6.5/bk_command-line-upgrade/content/start-yarn-mr-23.html)
Optionally, to store job history:
yarn-daemon.sh start timelineserver
mr-jobhistory-daemon.sh start historyserver
Use the following command to load the data (from HDFS):
pig
You get the prompt:
grunt>
type
movies = LOAD 'hdfs://localhost:9000/pig_data/movies_data.csv' USING PigStorage(',') as (id,name,year,rating,duration);
Assume '/pig_data/' is your location of the file.
The above statement is made up of two parts. The part to the left of "=" is called the relation or alias. It looks like a variable, but note that it is not a variable: when this statement is executed, no MapReduce job is run.
Since our dataset has records with fields separated by a comma, we use the keyword USING PigStorage(',').
Another thing we have done in the above statement is give names to the fields using the 'as' keyword.
Now, let’s test to see if the alias has the data we loaded.
DUMP movies;
Once you execute the above statement, you should see a lot of text on the screen (partial output shown below).
2015-04-03 00:26:39,885 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
2015-04-03 00:26:39,943 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2015-04-03 00:26:39,944 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-04-03 00:26:39,944 [main] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2015-04-03 00:26:39,944 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator, GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}
2015-04-03 00:26:39,945 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2015-04-03 00:26:39,946 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2015-04-03 00:26:39,946 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2015-04-03 00:26:39,984 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2015-04-03 00:26:39,984 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-04-03 00:26:39,985 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2015-04-03 00:26:39,986 [main] INFO org.apache.pig.tools.pigstats.mapreduce.MRScriptState - Pig script settings are added to the job
................
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.6.0 0.14.0 hadoop 2015-04-03 00:33:12 2015-04-03 00:33:16 UNKNOWN
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_local392346573_0001 1 0 n/a n/a n/a n/a 0 0 0 0 movies MAP_ONLY file:/tmp/temp1786048656/tmp1383833000,
Input(s): Successfully read 6 records from: "/home/hadoop/pig/myscripts/movies_data.csv"
Output(s): Successfully stored 6 records in: "file:/tmp/temp1786048656/tmp1383833000"
Counters:
Total records written : 6
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG: job_local392346573_0001
................
(49582,Mumfie's White Christmas,1996,2.4,1350)
(49583,Lady Gaga & The Muppets' Holiday Spectacular,2013,3.1,3496)
(49584,Sunset Strip,2012,3.0,5770)
(49585,Silver Bells,2013,3.5,5287)
(49586,Winter Wonderland,2013,2.8,1812)
(49587,Top Gear: Series 19: Africa Special,2013,,6822)
(49588,Fireplace For Your Home: Crackling Fireplace with Music,2010,,3610)
(49589,Kate Plus Ei8ht,2010,2.7,)
(49590,Kate Plus Ei8ht: Season 1,2010,2.7,)
It is only after the DUMP statement that a MapReduce job is initiated. As we see our data in the output we can confirm that the data has been loaded successfully.
Now, since we have the data in Pig, let’s start with the query.
List the movies that have a rating greater than 4
At the prompt
grunt>
type
movies_greater_than_four = FILTER movies BY (float)rating>4.0;
DUMP movies_greater_than_four;
The above statements filter the alias movies and store the results in a new alias, movies_greater_than_four. The movies_greater_than_four alias will have only the records of movies with a rating greater than 4.
The DUMP command only displays information on standard output. If you need to store the data to a file, you can use the following command (this saves to HDFS):
STORE movies_greater_than_four INTO '/movies_greater_than_four';
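PigStorage can also take an explicit delimiter when storing. A minimal sketch (the output path /movies_greater_than_four_csv is illustrative):

```pig
-- store with a comma delimiter instead of the default tab
STORE movies_greater_than_four INTO '/movies_greater_than_four_csv' USING PigStorage(',');
```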
More on Pig
Load command with data types.
We can modify the statement as follows to include the data type of the columns:
movies = LOAD 'hdfs://localhost:9000/pig_data/movies_data.csv' USING PigStorage(',') as (id:int,name:chararray,year:int,rating:double,duration:int);
In the above statement, name is a chararray (string), rating is a double, and the fields id, year and duration are integers. If the data is loaded using the above statement, we no longer need to cast the column during filtering.
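For instance, with this typed schema the earlier rating query no longer needs a cast, since rating is already a double. A sketch:

```pig
-- no (float) cast needed: rating is loaded as a double
movies_greater_than_four = FILTER movies BY rating > 4.0;
DUMP movies_greater_than_four;
```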
More queries
List the movies that were released between 1950 and 1960
movies_between_50_60 = FILTER movies by year>1950 and year<1960;
List the movies whose names start with the letter A
movies_starting_with_A = FILTER movies by name matches 'A.*';
List the movies that have a duration greater than 2 hours
movies_duration_2_hrs = FILTER movies by duration > 7200;
List the movies that have rating between 3 and 4
movies_rating_3_4 = FILTER movies BY rating>3.0 and rating<4.0;
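Filter conditions can also be combined with AND, OR and NOT. A sketch, assuming the typed movies relation above (the alias name is illustrative):

```pig
-- 1990s movies rated above 3, or any movie longer than two hours
nineties_or_long = FILTER movies BY (year >= 1990 AND year < 2000 AND rating > 3.0) OR duration > 7200;
DUMP nineties_or_long;
```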
DESCRIBE
The schema of a relation/alias can be viewed using the DESCRIBE command:
DESCRIBE movies;
The output:
movies: {id: int,name: chararray,year: int,rating: double,duration: int}
Viewing step by step with ILLUSTRATE
To view the step-by-step execution of a sequence of statements you can use the ILLUSTRATE command:
ILLUSTRATE movies_duration_2_hrs;
------------------------------------------------------------------------------------------------------------
| movies                | id:int | name:chararray           | year:int | rating:double | duration:int |
------------------------------------------------------------------------------------------------------------
|                       | 8614   | Bodyguards and Assassins | 2009     | 3.7           | 8328         |
|                       | 1211   | Air Bud Spikes Back      | 2003     | 3.5           | 5243         |
------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------
| movies_duration_2_hrs | id:int | name:chararray           | year:int | rating:double | duration:int |
---------------------------------------------------------------------------------------------------------------------------
|                       | 8614   | Bodyguards and Assassins | 2009     | 3.7           | 8328         |
---------------------------------------------------------------------------------------------------------------------------
DESCRIBE and ILLUSTRATE are really useful for debugging.
Pig supports three different complex types to handle data. It is important that you understand these types properly as they will be used very often when working with data.
- Tuples
A tuple is just like a row in a table. It is a comma-separated list of fields.
(49539,'The Magic Crystal',2013,3.7,4561)
The above tuple has five fields. A tuple is enclosed in parentheses.
- Bags
A bag is an unordered collection of tuples.
{ (49382, 'Final Offer'), (49385, 'Delete') }
The above bag has two tuples. Each tuple has two fields: id and movie name.
- Maps
A map is a <key, value> store. The key and value are joined together using #.
['name'#'The Magic Crystal', 'year'#2013]
The above map has two keys, name and year, with values 'The Magic Crystal' and 2013. The first value is a chararray and the second is an integer.
We will be using the above complex type quite often in our future examples.
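These complex types can also be declared in a LOAD schema. A minimal sketch, assuming a hypothetical file movie_details.txt whose records contain a tuple, a bag and a map:

```pig
-- t is a tuple, b a bag of tuples, m a map with chararray values
details = LOAD 'movie_details.txt' AS (
    t:tuple(id:int, name:chararray),
    b:bag{r:tuple(id:int, name:chararray)},
    m:map[chararray]
);
DESCRIBE details;
```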
- FOREACH
FOREACH gives a simple way to apply transformations based on columns. Let’s understand this with an example.
List the movie names and their duration in minutes
movie_duration = FOREACH movies GENERATE name, (double)duration/60.0;
The above statement generates a new alias that has the list of movies and their duration in minutes. (Casting before dividing avoids integer division truncating the result.)
You can check the results using the DUMP command.
- GROUP
The GROUP keyword is used to group fields in a relation.
List the years and the number of movies released each year.
grouped_by_year = group movies by year;
count_by_year = FOREACH grouped_by_year GENERATE group, COUNT(movies);
You can check the result by dumping the count_by_year relation on the screen.
We know in advance that the total number of movies in the data set is 49590. We can check whether our GROUP operation is correct by verifying the total of the COUNT field. If the sum of the count field is 49590, we can be confident that our grouping has worked correctly.
group_all = GROUP count_by_year ALL;
sum_all = FOREACH group_all GENERATE SUM(count_by_year.$1);
DUMP sum_all;
Of the above three statements, the first, GROUP ... ALL, groups all the tuples into one group. This is very useful when we need to perform aggregation operations on the entire set.
The next statement performs a FOREACH on the grouped relation group_all and applies the SUM function to the field in position 1 (positions start from 0). Here the field in position 1 holds the count of movies for each year. On execution of the DUMP statement the MapReduce job kicks off and gives us the following result:
(49590)
The above value matches our known fact that the dataset has 49590 movies, so we can conclude that our GROUP operation worked successfully.
- ORDER BY
Let us question the data to illustrate the ORDER BY operation.
List all the movies in the ascending order of year.
asc_movies_by_year = ORDER movies BY year ASC;
DUMP asc_movies_by_year;
List all the movies in the descending order of year.
desc_movies_by_year = ORDER movies BY year DESC;
DUMP desc_movies_by_year;
- DISTINCT
The DISTINCT statement is used to remove duplicate records. It works only on entire records, not on individual fields.
Let’s illustrate this with an example:
movies_with_dups = LOAD '/pig_data/movies_with_duplicates.csv' USING PigStorage(',') as (id:int,name:chararray,year:int,rating:double,duration:int);
DUMP movies_with_dups;
(1,The Nightmare Before Christmas,1993,3.9,4568)
(1,The Nightmare Before Christmas,1993,3.9,4568)
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(5,Night Tide,1963,2.8,5126)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
(9,Nosferatu: Original Version,1929,3.5,5651)
You see that there are duplicates in this data set. Now let us list the distinct records present in movies_with_dups:
no_dups = DISTINCT movies_with_dups;
DUMP no_dups;
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
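Since DISTINCT compares entire records, de-duplicating on a single field requires projecting that field first. A sketch using movies_with_dups:

```pig
-- keep only the distinct movie names
names_only = FOREACH movies_with_dups GENERATE name;
distinct_names = DISTINCT names_only;
DUMP distinct_names;
```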
- LIMIT
Use the LIMIT keyword to get only a limited number of results from a relation. For example, to get the first 10 records of the de-duplicated relation:
top_10_movies = LIMIT no_dups 10;
DUMP top_10_movies;
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
- SAMPLE
Use the SAMPLE keyword to get a random sample of your data:
sample_10_percent = SAMPLE movies 0.1;
DUMP sample_10_percent;
Here, 0.1 = 10%.
We already know that the file has 49590 records. We can check the count of records in the relation:
sample_group_all = GROUP sample_10_percent ALL;
sample_count = FOREACH sample_group_all GENERATE COUNT(sample_10_percent.$0);
DUMP sample_count;
The output is approximately 10% of 49590 (SAMPLE is probabilistic, so the exact count varies between runs):
(4930)
- Your Test
Use 4300.txt
(https://github.com/haozeng/exercises/blob/master/bridgewater.txt)
Given an arbitrary text document written in English, write a program that will generate a concordance, i.e. an alphabetical list of all word occurrences, labeled with word frequencies. Bonus: label each word with the sentence numbers in which each occurrence appeared.
a {2:1,1}
all {1:1}
alphabetical {1:1}
an {2:1,1}
appeared {1:2}
arbitrary {1:1}
bonus {1:2}
concordance {1:1}
document {1:1}
each {2:2,2}
english {1:1}
frequencies {1:1}
generate {1:1}
given {1:1}
i.e. {1:1}
in {2:1,2}
label {1:2}
Preprocess:
# lowercase the input, split each sentence onto its own line, and remove punctuation marks
perl -pe '$_=lc($_)' 4300.txt | perl -pe 's/(?<=[.?!])(?<![a-z]\.[a-z]\.)\s+(?=[a-z])/\n/g' | perl -pe 's/[,!.:]//g' > 4300-preproc.txt
hdfs dfs -mkdir /dfs
hdfs dfs -put 4300-preproc.txt /dfs
Pig script:
A = LOAD 'hdfs://localhost:9000/dfs/4300-preproc.txt' USING PigStorage('\n') AS (line:chararray);
B = RANK A;                                       -- prepend a sentence number to each line
C = FOREACH B GENERATE FLATTEN(TOKENIZE((chararray)$1)) AS word, $0;
D = ORDER (GROUP C BY word) BY $0;                -- group by word, in alphabetical order
E = FOREACH D GENERATE $0, COUNT($1), $1.rank_A;  -- word, frequency, sentence numbers
STORE E INTO '/dfs/result.txt' USING PigStorage();
https://pig.apache.org/docs/latest/basic.html
https://www.youtube.com/watch?v=1IUTnaMporU
https://pig.apache.org/docs/r0.7.0/setup.html
http://www.rohitmenon.com/index.php/apache-pig-tutorial-part-1/
http://www.rohitmenon.com/index.php/apache-pig-tutorial-part-2/