Hadoop Map Reduce Python - cchantra/bigdata.github.io GitHub Wiki

Prerequisite: Install Hadoop 3.2.1 under /home/hadoop/hadoop.

Start Hadoop with start-all.sh, then run jps to verify that all five daemons (NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager) are running.
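
A typical jps listing on a single-node setup looks like this (the process IDs will differ):

12345 NameNode
12389 DataNode
12467 SecondaryNameNode
12590 ResourceManager
12688 NodeManager
12801 Jps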

Disable IPv6: https://itsfoss.com/disable-ipv6-ubuntu-linux/

Install Python 3 on your VM (Python 3.8.5 is preferred).

Resource guides:

https://phoenixnap.com/kb/how-to-install-python-3-ubuntu
https://linuxize.com/post/how-to-install-python-3-8-on-ubuntu-18-04/
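
For example, on Ubuntu the following is usually enough (a minimal sketch; exact package names can vary by release):

sudo apt update
sudo apt install -y python3 python3-pip
python3 --version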

Create mapper.py and reducer.py in your home directory with your favorite editor, e.g. nano. Copy the following into reducer.py:

nano reducer.py
#!/usr/bin/env python3
"""reducer.py"""

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py;
    # skip malformed lines that do not contain a tab
    try:
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # the line was malformed or count was not a number,
        # so silently ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word
# do not forget to output the last word if needed!

if current_word:
    print('%s\t%s' % (current_word, current_count))

Then save it.
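
To see why the current_word logic in the reducer works, note that Hadoop sorts the mapper output by key before it reaches the reducer. For the input "foo foo bar", the reducer does not see the raw mapper output but the sorted stream

bar	1
foo	1
foo	1

so equal words arrive on consecutive lines, and the reducer prints bar 1 followed by foo 2.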

Create mapper.py

nano mapper.py

Copy the following into it:

#!/usr/bin/env python3
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

Then save it. Make both files executable:

chmod +x mapper.py reducer.py

Obtain the test text file:

 wget https://www.gutenberg.org/files/27045/27045.txt

Copy the text file to HDFS (hdfs dfs is the current replacement for the deprecated hadoop dfs):

hdfs dfs -put 27045.txt /

Check that the file arrived:

hdfs dfs -ls /

Test mapper.py

echo "foo foo quux labs foo bar quux" | ./mapper.py 

or

cat 27045.txt | ./mapper.py

You should see tab-delimited (word, 1) pairs such as:

https://www.gutenberg.org/GUTINDEX.ALL	1
***	1
END:	1
FULL	1
LICENSE	1
***	1
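
You can also test the whole pipeline locally before involving Hadoop; piping through sort simulates the shuffle-and-sort phase between map and reduce:

cat 27045.txt | ./mapper.py | sort -k1,1 | ./reducer.py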

Run MapReduce with Hadoop Streaming. First check that mapred-site.xml contains the following properties; if not, add them:

<property>
 <name>yarn.app.mapreduce.am.env</name>
 <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
<property>
 <name>mapreduce.map.env</name>
 <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
<property>
 <name>mapreduce.reduce.env</name>
 <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
Then submit the streaming job:

hadoop jar /home/hadoop/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
    -file ./mapper.py -mapper ./mapper.py \
    -file ./reducer.py -reducer ./reducer.py \
    -input /27045.txt -output /gutenberg
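
Note that -file still works in Hadoop 3.2.1 but is deprecated; the generic -files option is the current equivalent. A sketch of the same job using it:

hadoop jar /home/hadoop/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
    -files mapper.py,reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input /27045.txt -output /gutenberg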

The output should look like this:


....
2021-08-28 22:49:38,193 INFO mapreduce.Job: Running job: job_1628560123673_0004
2021-08-28 22:49:45,325 INFO mapreduce.Job: Job job_1628560123673_0004 running in uber mode : false
2021-08-28 22:49:45,326 INFO mapreduce.Job:  map 0% reduce 0%
2021-08-28 22:49:50,402 INFO mapreduce.Job:  map 100% reduce 0%
2021-08-28 22:49:55,452 INFO mapreduce.Job:  map 100% reduce 100%
2021-08-28 22:49:56,466 INFO mapreduce.Job: Job job_1628560123673_0004 completed successfully
2021-08-28 22:49:56,588 INFO mapreduce.Job: Counters: 54
	File System Counters
		FILE: Number of bytes read=137805
		FILE: Number of bytes written=966713
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=90096
		HDFS: Number of bytes written=35474
		HDFS: Number of read operations=11
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		HDFS: Number of bytes read erasure-coded=0
	Job Counters
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=13798
		Total time spent by all reduces in occupied slots (ms)=11320
		Total time spent by all map tasks (ms)=6899
		Total time spent by all reduce tasks (ms)=2830
		Total vcore-milliseconds taken by all map tasks=6899
		Total vcore-milliseconds taken by all reduce tasks=2830
		Total megabyte-milliseconds taken by all map tasks=14129152
		Total megabyte-milliseconds taken by all reduce tasks=11591680
	Map-Reduce Framework
		Map input records=1695
		Map output records=13575
		Map output bytes=110649
		Map output materialized bytes=137811
		Input split bytes=166
		Combine input records=0
		Combine output records=0
		Reduce input groups=3587
		Reduce shuffle bytes=137811
		Reduce input records=13575
		Reduce output records=3587
		Spilled Records=27150
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=278
		CPU time spent (ms)=2860
		Physical memory (bytes) snapshot=840380416
		Virtual memory (bytes) snapshot=9983832064
		Total committed heap usage (bytes)=752877568
		Peak Map Physical memory (bytes)=323481600
		Peak Map Virtual memory (bytes)=2769162240
		Peak Reduce Physical memory (bytes)=234467328
		Peak Reduce Virtual memory (bytes)=4445544448
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=89930
	File Output Format Counters
		Bytes Written=35474
2021-08-28 22:49:56,588 INFO streaming.StreamJob: Output directory: /gutenberg
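
When the job completes, view the word counts in the output directory (the exact part-file name can vary, so a wildcard is safest):

hdfs dfs -cat /gutenberg/part* | head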

Monitor your job in a web browser via the YARN ResourceManager UI, as described in the previous lab (see that lab for the port numbers), e.g. in my case http://10.3.133.231:8088/

(Screenshot: the YARN ResourceManager web UI)
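
You can also check job status from the command line:

yarn application -list -appStates ALL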

If you want to run the job again, you must first delete the output directory, e.g. /gutenberg:

hdfs dfs -rm -r -f /gutenberg

For more details, see: https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

For Hadoop Streaming options, see: https://hadoop.apache.org/docs/r1.2.1/streaming.html

Video demo: https://youtu.be/oMgUyDsjMmY
