Pig UDF - cchantra/bigdata.github.io GitHub Wiki

Prerequisite: install Python 3 if you haven't already.

Download pig_util.py (the wget command below saves it as pig_utils.py, which is the module name the UDF script imports):

wget -O pig_utils.py  https://raw.githubusercontent.com/cchantra/bigdata.github.io/master/pig/pig_util.py

Then move pig_utils.py to $PIG_HOME/src/python/streaming/pig_utils.py, where $PIG_HOME is your Pig installation directory.

Add the following to ~/.bashrc:

export PIG_HOME=/home/hadoop/pig
export PYTHONPATH=$PYTHONPATH:$PIG_HOME/src/python/streaming

Then don't forget to reload your shell configuration and move the file:

source ~/.bashrc

mv pig_utils.py $PIG_HOME/src/python/streaming/pig_utils.py
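As a quick sanity check (not part of the original steps, just a hypothetical helper), you can verify that pig_utils.py landed where Pig's embedded Jython interpreter will look for it:

```python
import os

# Hypothetical check: build the path Pig expects and see whether the
# moved pig_utils.py is actually there. Falls back to the tutorial's
# default PIG_HOME if the environment variable is not set.
pig_home = os.environ.get("PIG_HOME", "/home/hadoop/pig")
expected = os.path.join(pig_home, "src", "python", "streaming", "pig_utils.py")
print(expected, "->", "found" if os.path.exists(expected) else "missing")
```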

Create a Python file my_udfs.py in the folder myscripts (under $PIG_HOME, i.e. /home/hadoop/pig/myscripts):

import sys
# Make pig_utils importable when the script runs under Pig's Jython
sys.path.append('/home/hadoop/pig/src/python/streaming')

from pig_utils import outputSchema

# Declare the return schema so Pig knows this UDF yields a chararray
@outputSchema('word:chararray')
def hi_world(s1):
    return "hello world" + s1

Note that the default Python path for Pig UDFs is $PIG_HOME/src/python/streaming, which is why we moved pig_utils.py there.
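The outputSchema decorator's job is just to attach the schema string to the function so Pig can read it at registration time. Here is a minimal stand-in sketch of that idea (an assumption for illustration; the real pig_util.py contains more machinery):

```python
# Simplified stand-in for pig_util.outputSchema (assumption: the real
# implementation does more; the core idea is attaching the schema string
# as a function attribute that Pig inspects when registering the UDF).
def outputSchema(schema):
    def decorate(func):
        func.outputSchema = schema
        return func
    return decorate

@outputSchema('word:chararray')
def hi_world(s1):
    return "hello world" + s1

print(hi_world("chantana"))   # -> hello worldchantana, as in the dump below
print(hi_world.outputSchema)  # -> word:chararray
```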

Register the Python UDF

Assume you have demo2.txt on HDFS. Start Pig (here in local mode for this walkthrough):

pig -x local

grunt>REGISTER '/home/hadoop/pig/myscripts/my_udfs.py' using jython as my_special_udfs;

Load data

**Data must be in HDFS.**

grunt>users = LOAD 'hdfs://localhost:9000/demo2.txt' AS (name: chararray);
grunt>dump users;

Counters:
Total records written : 7
Total bytes written : 17267814
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local1364112294_0003


2018-12-19 15:40:53,340 [main] WARN  org.apache.hadoop.metrics2.impl.MetricsSystemImpl - JobTracker metrics system already initialized!
2018-12-19 15:40:53,342 [main] WARN  org.apache.hadoop.metrics2.impl.MetricsSystemImpl - JobTracker metrics system already initialized!
2018-12-19 15:40:53,344 [main] WARN  org.apache.hadoop.metrics2.impl.MetricsSystemImpl - JobTracker metrics system already initialized!
2018-12-19 15:40:53,349 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2018-12-19 15:40:53,350 [main] WARN  org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2018-12-19 15:40:53,356 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 1
2018-12-19 15:40:53,356 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(chantana)
(anon)
(mudang)
(march)
(nack)
(boss)
(kris)

Use the UDF


grunt>REGISTER '/home/hadoop/pig/myscripts/my_udfs.py' using jython as my_special_udfs;

2020-08-20 16:02:14,825 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2020-08-20 16:02:14,839 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - created tmp python.cachedir=/tmp/pig_jython_8557535746916413601
2020-08-20 16:02:17,587 [main] WARN  org.apache.pig.scripting.jython.JythonScriptEngine - pig.cmd.args.remainders is empty. This is not expected unless on testing.
2020-08-20 16:02:17,893 [MainThread] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: my_special_udfs.hi_world

grunt> hello_users = FOREACH users GENERATE name, my_special_udfs.hi_world(name);
grunt> dump hello_users;

2018-12-19 20:43:16,409 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
2018-12-19 20:43:16,433 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2018-12-19 20:43:16,434 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
....
Total Length = 42
Input split[0]:
   Length = 42
   ClassName: org.apache.hadoop.mapreduce.lib.input.FileSplit
   Locations:

....
2018-12-19 20:43:16,992 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.Task - Final Counters for attempt_local1712236376_0004_m_000000_0: Counters: 15
File System Counters
FILE: Number of bytes read=1044
FILE: Number of bytes written=987813
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=7
Map output records=7
Input split bytes=376
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=0
Total committed heap usage (bytes)=416284672
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
....
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
3.1.1 0.17.0 cchantra 2018-12-19 20:43:16 2018-12-19 20:43:17 UNKNOWN

Success!

Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_local1712236376_0004 1 0 n/a n/a n/a n/a 0 0 0 0 hello_users,users MAP_ONLY file:/tmp/temp168775303/tmp-1332874127,

Input(s):
Successfully read 7 records from: "/Users/admin/Desktop/big_data_code/demo2.txt"

Output(s):
Successfully stored 7 records in: "file:/tmp/temp168775303/tmp-1332874127"

Counters:
Total records written : 7
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

....

2018-12-19 20:43:17,184 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input files to process : 1
2018-12-19 20:43:17,185 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(chantana,hello worldchantana)
(anon,hello worldanon)
(mudang,hello worldmudang)
(march,hello worldmarch)
(nack,hello worldnack)
(boss,hello worldboss)
(kris,hello worldkris)
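The tuples above can be reproduced with a pure-Python replay of the FOREACH ... GENERATE step (a hypothetical stand-in for illustration; in Pig, the Jython UDF is invoked once per record):

```python
# Replays: hello_users = FOREACH users GENERATE name, hi_world(name);
def hi_world(s1):
    return "hello world" + s1

# The seven records loaded from demo2.txt
users = ["chantana", "anon", "mudang", "march", "nack", "boss", "kris"]

# Apply the UDF per record, keeping the original field alongside the result
hello_users = [(name, hi_world(name)) for name in users]
for row in hello_users:
    print(row)
```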

References

https://www.codementor.io/sheena/extending-hadoop-apache-pig-with-python-udfs-du107mj6t