Combiner函數 - twilighthook/BigDataNote GitHub Wiki

Hadoop除了Mapper和Reducer之外也提供了一個Combiner函數,來讓MapReduce達到最佳化。 Combiner是介在Mapper和Reducer之間的函數,可以處理map的輸出,而combiner的輸出會變成reduce的輸入。 而其中Hadoop不保證會對combiner呼叫特定次數,有可能為0~N次。


  • 平時的MapReduce

假如第一個map產生輸出為:

(1950, 0)
(1950, 20)
(1950, 10)

第二個為:

(1950, 25)
(1950, 15)

reduce函數會用到map的所有值

(1950, [0, 20, 10, 25, 15])

最後輸出

(1950, max(0, 20, 10, 25, 15)) = (1950, 25)

  • 加入combiner的MapReduce

加入combiner的MapReduce會在合併前就做一次max,合併後再做一次max

(1950, max(0, 20, 10)) = (1950, 20)
(1950, max(25, 15)) = (1950, 25)
(1950, max(20, 25)) = (1950, 25)

combiner在某些情況下可以大大減少mapper和reducer之間的交換比對次數 而有些時候就不適用,例如算平均數的時候 這邊就來接續MapReduce的實作,來加入combiner試試

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DoTempMapReduce {
	
	public static void main(String[] args) throws Exception{
		if(args.length != 2) {
			System.out.println("");
			System.exit(-1);
		}
		
		Job job = Job.getInstance();
		job.setJarByClass(DoTempMapReduce.class);
		job.setJobName("Max Temperature");
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setMapperClass(DoTempMapper.class);
		job.setCombinerClass(DoTempReducer.class);
		job.setReducerClass(DoTempReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}

}

整體只多了一行

job.setCombinerClass(DoTempReducer.class);

之後再打包一次jar檔,上到Hadoop執行一次,可以看到結果是一樣的