Uniform Sampling - MatteoDJon/CloudProgrammingTonellotto GitHub Wiki
package it.unipi.hadoop;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
class UniformSampling {

    /** Configuration key for the number of independent sampling streams. */
    private static final String N_STREAMS_KEY = "uniformSampling.nStreams";

    /**
     * Mapper: replicates each input point into {@code nStreams} independent
     * streams. Each replica is emitted under a key in [0, nStreams-1], so every
     * reduce key later performs its own uniform selection over the whole input.
     */
    public static class UniformMapper extends Mapper<Object, Text, IntWritable, Text> {

        // Reusable output key (Hadoop serializes the value on write(), so reuse
        // is safe). Instance field, not static: static mutable Writables are
        // unsafe if the framework runs mapper instances on multiple threads.
        private final IntWritable keyInt = new IntWritable();

        @Override
        public void map(final Object key, final Text value, final Context context)
                throws IOException, InterruptedException {
            final int nStreams = context.getConfiguration().getInt(N_STREAMS_KEY, 1);
            // Emit the point once per requested stream.
            for (int i = 0; i < nStreams; ++i) {
                keyInt.set(i);
                context.write(keyInt, value);
            }
        }
    }

    /**
     * Combiner: locally reduces one stream's points to a single uniformly
     * selected candidate, cutting shuffle traffic. Emits (streamKey, sample).
     */
    public static class UniformCombiner extends Reducer<IntWritable, Text, IntWritable, Text> {

        // Reusable output value; instance field for the same reason as above.
        private final Text sample = new Text();

        @Override
        public void reduce(final IntWritable key, final Iterable<Text> values, final Context context)
                throws IOException, InterruptedException {
            // values is never empty when Hadoop invokes reduce(), so the
            // selection is never null here.
            sample.set(selectUniformSample(values));
            context.write(key, sample);
        }
    }

    /**
     * Reducer: performs the final uniform selection among the per-combiner
     * candidates of one stream. The key is dropped (NullWritable) because only
     * the sampled point matters in the output.
     */
    public static class UniformReducer extends Reducer<IntWritable, Text, NullWritable, Text> {

        // Reusable output value; instance field for the same reason as above.
        private final Text sample = new Text();

        @Override
        public void reduce(final IntWritable key, final Iterable<Text> values, final Context context)
                throws IOException, InterruptedException {
            sample.set(selectUniformSample(values));
            context.write(NullWritable.get(), sample);
        }
    }

    /**
     * Reservoir sampling with a reservoir of size 1 (Vitter's Algorithm R):
     * the k-th element replaces the current choice with probability 1/k, which
     * yields a uniform selection over the whole iterable in a single pass.
     *
     * <p>Note: combiner + reducer composition stays uniform only if every
     * combiner sees the same number of elements; with one sample per combiner
     * the final pick is uniform over combiners, which is the intended
     * approximation here.
     *
     * @param values non-empty iterable of candidate points
     * @return the string form of the selected element, or {@code null} if the
     *         iterable is empty (never the case when called by Hadoop)
     */
    private static String selectUniformSample(final Iterable<Text> values) {
        String selected = null;
        long k = 0;
        for (final Text t : values) {
            ++k;
            // nextLong(k) == 0 has exactly probability 1/k; avoids the
            // floating-point comparison nextDouble(0,1) < 1.0/k.
            if (ThreadLocalRandom.current().nextLong(k) == 0) {
                selected = t.toString(); // copies: t is reused by the framework
            }
        }
        return selected;
    }

    /**
     * Builds (without submitting) the uniform-sampling job.
     *
     * @param conf      Hadoop configuration; the stream count is stored in it
     *                  before the Job snapshots it
     * @param inputFile input path containing one point per line
     * @param outputDir output directory (must not exist yet)
     * @param nStreams  number of independent samples to draw (one reducer each)
     * @return the configured, unsubmitted Job
     * @throws IOException              if job creation fails
     * @throws IllegalArgumentException if {@code nStreams} is not positive
     */
    static Job createUniformSamplingJob(final Configuration conf, final Path inputFile,
            final Path outputDir, final int nStreams) throws IOException {
        if (nStreams <= 0) {
            throw new IllegalArgumentException("nStreams must be positive, got " + nStreams);
        }
        // Must be set before Job.getInstance(), which copies the configuration.
        conf.setInt(N_STREAMS_KEY, nStreams);
        final Job job = Job.getInstance(conf, "uniformsampling");
        FileInputFormat.addInputPath(job, inputFile);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setJarByClass(UniformSampling.class);
        job.setMapperClass(UniformMapper.class);
        job.setCombinerClass(UniformCombiner.class);
        job.setReducerClass(UniformReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // One reducer per stream => one sampled point per output file.
        job.setNumReduceTasks(nStreams);
        return job;
    }
}
The UniformSampling class has the purpose of returning a single point selected uniformly at random from all the points. The createUniformSamplingJob method will be called k times by the getRandomCentroids method in the Driver, thereby obtaining all k initial centroids.
The job consists of:
- Map operation: this operation will emit as many key-value pairs as the number of streams desired (expressed by the "uniformSampling.nStreams" parameter, which in the driver is set to 1), having as key a value between 0 and nStreams-1 and as value the string representation of the point passed as argument to the function;
- Combiner operation: the combiner will receive as input all the point representations emitted with the same key by the mapper, therefore working on a single stream. Through selectUniformSample it will randomly select just one of the points and emit it as output together with the input key;
- Reducer operation: the reducer will execute the same operation as the combiner, namely a uniform selection among all the point representations with the same key passed as argument. It emits a key-value pair whose key is just a NullWritable (the key doesn't matter in this case) and whose value is the selected representation, which from now on will be part of the initial centroids.
For brevity we consider only a number as value, and not a full point (the idea is the same):
Mapper 1 output: (0,1), (0,2), (0,3), (0,4), (0,5), ...
Mapper 2 output: (0,5), (0,6), (0,8), (0,9), (0,1), ...
Mapper 3 output: (0,4), (0,2), (0,1), (0,2), (0,3), ...
Combiner 1 output: (0,2)
Combiner 2 output: (0,8)
Combiner 3 output: (0,4)
Reducer output: (null, 4) -> new centroid