Mapper - MatteoDJon/CloudProgrammingTonellotto GitHub Wiki
// Excerpt: KMeansMapper is a static nested class of an enclosing class not shown here.
// Point and WritableWrapper are classes of this project; ParseException is
// assumed to be java.text.ParseException.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class KMeansMapper extends Mapper<Object, Text, IntWritable, WritableWrapper> {

    private int d; // dimension every point must have
    private int k; // number of clusters
    private List<Point> centroids;

    private IntWritable outputKey = new IntWritable(); // reuse
    private WritableWrapper outputValue = new WritableWrapper(); // reuse

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        this.d = context.getConfiguration().getInt("kmeans.d", 7);
        this.k = context.getConfiguration().getInt("kmeans.k", 13);
        String outputName = context.getConfiguration().get("kmeans.output", "centroids.txt");

        centroids = new ArrayList<>(k);

        Path cache = new Path(outputName);
        FileSystem fs = cache.getFileSystem(context.getConfiguration());

        // read centroids from cache
        try (FSDataInputStream stream = fs.open(cache);
                BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
            // read all lines from cache, use as centroids
            String line;
            while ((line = reader.readLine()) != null) {
                Point newPoint = Point.parseString(line);
                if (newPoint.getDimension() != this.d) {
                    // fail the task with a message instead of killing the JVM
                    throw new IllegalStateException("Wrong centroids dimension");
                }
                centroids.add(newPoint);
            }
        } catch (IOException | ParseException e) {
            // do not swallow the error: report why the centroids could not be read
            throw new IOException("Cannot read centroids from " + outputName, e);
        }

        // check that exactly k centroids were read
        if (centroids.size() != this.k) {
            throw new IllegalStateException("Wrong centroids number");
        }
    }

    @Override
    public void map(final Object key, final Text value, final Context context)
            throws IOException, InterruptedException {
        try {
            Point point = Point.parseString(value.toString());

            // ignore, i.e. do not emit, points with dimension different from d
            if (point.getDimension() != d)
                return;

            int nearestClusterId = -1;
            double minDistance = -1d; // sentinel: no distance computed yet

            // find the nearest centroid
            int id = 0;
            for (Point centroid : centroids) {
                double distance = point.computeSquaredDistance(centroid);
                if (minDistance == -1 || distance < minDistance) {
                    minDistance = distance;
                    nearestClusterId = id;
                }
                id++;
            }

            // emit <clusterId, point>
            outputKey.set(nearestClusterId);
            outputValue.setPoint(point).setCount(1);
            context.write(outputKey, outputValue);
        } catch (ParseException e) {
            // ignore malformed points
            // TODO print something
        }
    }
}
Setup
In the setup function, the Mapper reads everything it needs to execute the map logic. First, it reads three values from the job configuration: d ("kmeans.d"), the dimension every point must have; k ("kmeans.k"), the number of clusters the application has to find; and outputName ("kmeans.output"), the path of the cached file that lists the current centroids. It then opens that file and reads it line by line, creating an instance of the class Point for each line and collecting the instances in a list for later use. Setup fails if a centroid does not have dimension d or if the number of centroids read is not k.
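The reading and validation steps can be sketched without Hadoop. This is a minimal illustration, not the project's code: the class name CentroidLoader, its methods, and the comma-separated line format are assumptions made for the example (the real format is whatever Point.parseString accepts), and the file is replaced by an in-memory string.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, used only to illustrate the setup logic.
final class CentroidLoader {

    // Parses one line of comma-separated coordinates (assumed format).
    public static double[] parsePoint(String line) {
        String[] parts = line.trim().split(",");
        double[] coords = new double[parts.length];
        for (int i = 0; i < parts.length; i++) {
            coords[i] = Double.parseDouble(parts[i].trim());
        }
        return coords;
    }

    // Reads every line as a centroid, checking the dimension d of each one
    // and, at the end, that exactly k centroids were read.
    public static List<double[]> loadCentroids(BufferedReader reader, int d, int k) {
        List<double[]> centroids = new ArrayList<>(k);
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                double[] centroid = parsePoint(line);
                if (centroid.length != d) {
                    throw new IllegalArgumentException("Wrong centroid dimension: " + centroid.length);
                }
                centroids.add(centroid);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (centroids.size() != k) {
            throw new IllegalArgumentException("Wrong number of centroids: " + centroids.size());
        }
        return centroids;
    }

    public static void main(String[] args) {
        String file = "0.0,0.0\n1.0,1.0\n5.0,5.0\n";
        List<double[]> centroids =
                loadCentroids(new BufferedReader(new StringReader(file)), 2, 3);
        System.out.println(centroids.size() + " centroids loaded"); // prints "3 centroids loaded"
    }
}
```

Validating once in setup means the map function can assume a well-formed list of k centroids for every input record.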
Map
The goal of the Map function is to assign each point to the cluster whose centroid is nearest to it.
The Map function receives as input a line of text containing the coordinates of a point and parses that string into an instance of the class Point. Lines that cannot be parsed, and points whose dimension differs from d, are skipped without emitting anything.
Then, for each centroid in the list filled during Setup, it computes the squared distance between the point and the centroid: if that distance is less than the shortest distance found so far, the minimum distance and the current cluster assignment are updated.
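The search for the nearest centroid can be sketched in plain Java as follows. The class NearestCentroid and the use of double[] in place of the project's Point class are assumptions made for the example. Comparing squared distances gives the same nearest centroid as comparing Euclidean distances, because the square root is monotonic, so it can be skipped.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, used only to illustrate the assignment step.
final class NearestCentroid {

    // Squared Euclidean distance between two points of equal dimension.
    public static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return sum;
    }

    // Returns the index of the centroid closest to the point,
    // or -1 if the list of centroids is empty.
    public static int nearest(double[] point, List<double[]> centroids) {
        int nearestId = -1;
        double minDistance = Double.POSITIVE_INFINITY;
        for (int id = 0; id < centroids.size(); id++) {
            double distance = squaredDistance(point, centroids.get(id));
            if (distance < minDistance) {
                minDistance = distance;
                nearestId = id;
            }
        }
        return nearestId;
    }

    public static void main(String[] args) {
        List<double[]> centroids = Arrays.asList(
                new double[] {0, 0}, new double[] {5, 5}, new double[] {10, 0});
        // Squared distances from (4, 6): 52, 2 and 72, so centroid 1 is nearest.
        System.out.println(nearest(new double[] {4, 6}, centroids)); // prints 1
    }
}
```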
At the end, the mapper emits a key-value pair: the key is an IntWritable containing the clusterId to which the point has been assigned, and the value is a WritableWrapper holding the Point with its count set to 1.
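A likely reason for carrying a count next to each point is that pairs sharing a clusterId can then be merged by adding coordinates and counts, and the new centroid is the merged sum divided by the merged count. The sketch below illustrates that idea under this assumption; the class PartialSum is hypothetical and is not the project's WritableWrapper.

```java
import java.util.Arrays;

// Hypothetical stand-in for a (point, count) value, used only for illustration.
final class PartialSum {
    final double[] sum; // coordinate-wise sum of the merged points
    long count;         // number of points merged so far

    // A single point starts with count 1, as emitted by the mapper.
    PartialSum(double[] point) {
        this.sum = point.clone();
        this.count = 1;
    }

    // Merges another partial sum of the same dimension into this one.
    void add(PartialSum other) {
        for (int i = 0; i < sum.length; i++) {
            sum[i] += other.sum[i];
        }
        count += other.count;
    }

    // Mean of all merged points, i.e. the candidate new centroid.
    double[] mean() {
        double[] result = new double[sum.length];
        for (int i = 0; i < sum.length; i++) {
            result[i] = sum[i] / count;
        }
        return result;
    }

    public static void main(String[] args) {
        PartialSum cluster = new PartialSum(new double[] {1.0, 2.0});
        cluster.add(new PartialSum(new double[] {3.0, 6.0}));
        System.out.println(Arrays.toString(cluster.mean())); // prints [2.0, 4.0]
    }
}
```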