MapReduce教學 - twilighthook/BigDataNote GitHub Wiki

MapReduce主要分成兩部分,Mapper和Reducer

Mapper

Mapper主要的功能就是將資料做前置處理切割,把他變成我們所需要的前置資料 例如下面所示:

這是一組key-value的資料
(0,0067011990999991950051507004.....9999999N9+00001+9999999999....)
(106,0067011990999991950051507004.....9999999N9+00221+9999999999....)
(212,0067011990999991949051507004.....9999999N9-00111+9999999999....)
(318,0067011990999991949051507004.....9999999N9+01111+9999999999....)

我們要在裡面找到我們所需資料年份和溫度 在裡面value的部分有看到 ...1950...+0000...的地方我們將他每一筆的此部分都擷取下來會變成

(1950,0)
(1950,22)
(1949,-11)
(1949,111)

而map函數在進行reduce之前會先用key-value的方式分群

(1949,[-11,111])
(1950,[0,22])

在此教學開始前,可以先配置hadoop的開發環境或是下載Apache-Hadoop的jar檔匯入封包 之後就可以開始寫code了


在實作MapReduce前,我們需要一個測試文件 這是從NCDC抓的資料,可以從這邊抓關於氣象的資料 抓好之後就可以開始實作MapReduce


MapReduce

MapReduce的程式碼主要分成三個:Mapper、Redcuer、MapReduce 我們先來實作Mapper

public class DoTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private static final int MISSINGDATA = -9999;

	@Override
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		String line = value.toString();
		String year = line.substring(68, 73);

		String aitTemperatureStr = line.substring(78, 83).trim();
		int airTemperature = MISSINGDATA;

		if (isInteger(aitTemperatureStr))
			airTemperature = Integer.parseInt(aitTemperatureStr);

		if (airTemperature != MISSINGDATA) {
			context.write(new Text(year), new IntWritable(airTemperature));
		}

	}

	public static boolean isInteger(String value) {
		Pattern pattern = Pattern.compile("^[-+]?\\d+$");
		return pattern.matcher(value).matches();
	}

}

Mapper的部分主要是在做資料取樣,取得我們要進行Map的資料


接下來是Reduce的部分,要將多個相同key值的資料作處理

public class DoTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
		
		int maxValue = Integer.MIN_VALUE;
		for(IntWritable value : values) {
			maxValue = Math.max(maxValue, value.get());
		}
		
		context.write(key, new IntWritable(maxValue));
	}
	
}

要在每一組key-value之中找最大值的


最後是將兩個合併

public class DoTempMapReduce {
	
	public static void main(String[] args) throws Exception{
		if(args.length != 2) {
			System.out.println("");
			System.exit(-1);
		}
		
		Job job = Job.getInstance();
		job.setJarByClass(DoTempMapReduce.class);
		job.setJobName("Max Temperature");
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setMapperClass(DoTempMapper.class);
		job.setReducerClass(DoTempReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}

}

這邊的arg[0]和arg[1]兩個參數是要在hadoop的指令下運行的,第一個是hdfs之下的input第二個則是output 在執行時需要把它包裝成jar檔,才能在hadoop環境下執行