Reducer - MatteoDJon/CloudProgrammingTonellotto GitHub Wiki
Code
```java
public static class KMeansReducer
        extends Reducer<IntWritable, WritableWrapper, IntWritable, WritableWrapper> {

    private IntWritable outputKey = new IntWritable(); // reuse
    private WritableWrapper outputValue = new WritableWrapper(); // reuse
    private int d;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        this.d = context.getConfiguration().getInt("kmeans.d", 7);
    }

    @Override
    public void reduce(IntWritable key, Iterable<WritableWrapper> values, Context context)
            throws IOException, InterruptedException {
        int totCount = 0;
        Point sum = new Point(d); // init with 0, 0, ..., 0

        // sum components and counts of all points assigned to this cluster
        for (WritableWrapper wrapper : values) {
            Point p = wrapper.getPoint();
            sum.sum(p);
            totCount += wrapper.getCount();
        }

        // the mean is NOT computed here; the driver performs the division:
        // sum.divide(totCount);

        // emit <clusterId, (sum point, count)>
        outputKey.set(key.get()); // copy the value instead of replacing the reused instance
        outputValue.setPoint(sum).setCount(totCount);
        context.write(outputKey, outputValue);
    }
}
```
Explanation
Setup
The setup function of the Reducer only has to read the point dimension from the job configuration (key "kmeans.d", defaulting to 7).
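For this to work, the dimension would typically be set on the driver side before the job is submitted. A minimal sketch, assuming the same "kmeans.d" configuration key used above (the job name and surrounding driver code are assumptions):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical driver fragment: only the property name "kmeans.d" comes
// from this wiki; it must match the key read back in the reducer's setup().
Configuration conf = new Configuration();
conf.setInt("kmeans.d", 7); // point dimension, read in setup()
Job job = Job.getInstance(conf, "kmeans");
```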
Reduce
The reduce function receives as input an Iterable&lt;WritableWrapper&gt; which references all the points assigned by the Mapper to the IntWritable key (which represents the cluster id). The function creates a default point, with all components set to 0, and by iterating over the values it sums, component by component and count by count, all the points assigned to the cluster, obtaining at the end a single point whose components and count are the totals over the whole cluster. This point is then emitted as-is as the value of the output key-value pair. The task of calling divide on the output point, to obtain the average of the components and therefore the new centroid, is left to the driver.
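The arithmetic shared between the reducer and the driver can be sketched in plain Java. The Point class below is a hypothetical stand-in for the project's own Point (only sum and divide, with the same roles as in the code above), and the input points are made-up sample data:

```java
// Hypothetical stand-in for the project's Point class: component-wise
// sum, plus the divide the driver uses to turn a sum into a centroid.
class Point {
    final double[] comp;

    Point(int d) { comp = new double[d]; } // init with 0, 0, ..., 0

    void sum(Point p) {
        for (int i = 0; i < comp.length; i++) comp[i] += p.comp[i];
    }

    void divide(int n) {
        for (int i = 0; i < comp.length; i++) comp[i] /= n;
    }
}

public class ReduceSketch {
    public static void main(String[] args) {
        // three 2-d points assigned to the same cluster (sample data)
        double[][] assigned = { {1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0} };

        int totCount = 0;
        Point sum = new Point(2);
        for (double[] coords : assigned) {  // mirrors the reduce loop
            Point p = new Point(2);
            p.comp[0] = coords[0];
            p.comp[1] = coords[1];
            sum.sum(p);
            totCount += 1;                  // each wrapper contributes its count
        }
        // the reducer would emit (sum, totCount); the driver computes the mean:
        sum.divide(totCount);
        System.out.println(sum.comp[0] + " " + sum.comp[1]); // prints "3.0 4.0"
    }
}
```

Keeping the division out of the reduce step means the emitted value stays a plain (sum, count) pair, which is why summing is safe to repeat across stages.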