Commit c88246c1 authored by dangq

Order Inversion

parent 0eed8a42
package fr.eurecom.dsg.mapreduce;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class OrderInversion extends Configured implements Tool {
private final static String ASTERISK = "\0";
private final static Text ASTERISK_TEXT = new Text(ASTERISK);
private int numReducers;
private Path inputPath;
private Path outputDir;
/**
 * Partitioner that routes map output by the first element of the
 * {@code TextPair} key only, so every key sharing the same first element is
 * assigned the same partition number.
 */
public static class PartitionerTextPair extends
        Partitioner<TextPair, IntWritable> {

    /**
     * Clears the sign bit of {@code val}, yielding a non-negative int.
     * Examples: {@code toUnsigned(10) == 10},
     * {@code toUnsigned(-1) == 2147483647}.
     *
     * @param val
     *            value to convert
     * @return {@code val} with its lower 31 bits kept and the sign bit
     *         cleared
     */
    public static int toUnsigned(int val) {
        return val & Integer.MAX_VALUE;
    }

    /**
     * Chooses the partition from the hash of the key's first element.
     *
     * @param key
     *            map output key; only {@code key.getFirst()} is consulted
     * @param value
     *            map output value (ignored)
     * @param numPartitions
     *            total number of partitions
     * @return partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(TextPair key, IntWritable value,
            int numPartitions) {
        // Mask the hash first so the remainder can never be negative.
        final int nonNegativeHash = toUnsigned(key.getFirst().hashCode());
        return nonNegativeHash % numPartitions;
    }
}
/**
 * Mapper that tokenizes each input line and, for every pair of adjacent
 * words {@code (w[i], w[i+1])}, emits two records with value 1:
 * the pair itself and the marginal {@code (w[i], ASTERISK)}.
 *
 * <p>The regular expressions and the constant value {@code 1} are created
 * once per class instead of once per record; the emitted records are
 * unchanged.
 */
public static class PairMapper extends
        Mapper<LongWritable, Text, TextPair, IntWritable> {

    /** Runs of characters other than ASCII letters, digits and '_'. */
    private static final Pattern NON_WORD_RUN = Pattern
            .compile("[^a-zA-Z0-9_]+");

    /** Whitespace at the very start of the line. */
    private static final Pattern LEADING_WHITESPACE = Pattern
            .compile("^\\s+");

    /** Word separator used for splitting. */
    private static final Pattern WHITESPACE_RUN = Pattern.compile("\\s+");

    /**
     * Shared count value. It is never mutated here, so a single instance
     * can be handed to every {@code context.write} call.
     */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Emits the pair and marginal records for one line of text.
     *
     * @param key
     *            input key supplied by the input format (unused)
     * @param value
     *            one line of input text
     * @param context
     *            sink for the emitted records
     * @throws java.io.IOException
     *             if writing to the context fails
     * @throws InterruptedException
     *             if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        // Replace punctuation and other non-word characters with spaces.
        String line = NON_WORD_RUN.matcher(value.toString()).replaceAll(" ");
        // Drop leading whitespace so split() yields no empty first token.
        line = LEADING_WHITESPACE.matcher(line).replaceAll("");
        String[] words = WHITESPACE_RUN.split(line);

        // The last word has no successor, so it starts no pair.
        for (int i = 0; i < words.length - 1; i++) {
            String first = words[i];
            String second = words[i + 1];
            context.write(new TextPair(first, second), ONE);
            context.write(new TextPair(first, ASTERISK), ONE);
        }
    }
}
/**
 * Reducer that turns pair counts into relative frequencies.
 *
 * <p>A key whose second element equals {@code ASTERISK_TEXT} carries the
 * marginal for its first element: its values are summed and remembered in
 * {@link #occurence}. Any other key has its values summed and is written
 * out as {@code sum / occurence}.
 *
 * <p>NOTE(review): this relies on the marginal key reaching the reducer
 * before the other keys with the same first element; that depends on the
 * job's sort comparator and partitioner — confirm against
 * {@code TextPair.Comparator}.
 *
 * <p>The accumulators are {@code long} so that sums above
 * {@code Integer.MAX_VALUE} do not wrap around.
 */
public static class PairReducer extends
        Reducer<TextPair, IntWritable, TextPair, DoubleWritable> {

    /** Sum of the values of the most recently reduced non-marginal key. */
    long count;

    /** Sum of the values of the most recently reduced marginal key. */
    long occurence;

    /**
     * Sums the values of one key and either stores the sum as the current
     * marginal or emits the relative frequency of the pair.
     *
     * @param key
     *            pair key; a second element equal to {@code ASTERISK_TEXT}
     *            marks a marginal
     * @param values
     *            partial counts for {@code key}
     * @param context
     *            sink for {@code (pair, relative frequency)} records
     * @throws IOException
     *             if writing to the context fails
     * @throws InterruptedException
     *             if the task is interrupted while writing
     */
    @Override
    protected void reduce(TextPair key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        long sum = 0L;
        for (IntWritable value : values) {
            sum += value.get();
        }

        if (key.getSecond().compareTo(ASTERISK_TEXT) == 0) {
            // Marginal: remember it as the denominator for later pairs.
            occurence = sum;
        } else {
            count = sum;
            // Cast before dividing so the division is done in double.
            context.write(key, new DoubleWritable((double) count
                    / occurence));
        }
    }
}
/**
 * Configures and runs the OrderInversion job, blocking until it finishes.
 *
 * <p>Input path, output directory and reducer count come from the fields
 * set by the constructor; {@code args} is not read here.
 *
 * @param args
 *            command-line arguments passed on by {@code ToolRunner}
 *            (unused)
 * @return 0 if the job completed successfully, 1 otherwise
 * @throws Exception
 *             if job setup or execution fails
 */
@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = new Job(conf, "OrderInversion");

    // Locate the job jar through this class rather than an unrelated one.
    job.setJarByClass(OrderInversion.class);

    // Map and reduce stages with their output types.
    job.setMapperClass(PairMapper.class);
    job.setReducerClass(PairReducer.class);
    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(TextPair.class);
    job.setOutputValueClass(DoubleWritable.class);

    // Plain-text input and output.
    TextInputFormat.addInputPath(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Partition on the first word of the pair; sort with TextPair's comparator.
    job.setPartitionerClass(PartitionerTextPair.class);
    job.setSortComparatorClass(TextPair.Comparator.class);
    job.setNumReduceTasks(numReducers);

    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Builds the tool from its command-line arguments.
 *
 * <p>Expects exactly three arguments: number of reducers, input file and
 * output directory. On any other argument count the usage message is
 * printed to standard error and the JVM exits with status 1, so calling
 * scripts can detect the failure.
 *
 * @param args
 *            {@code <num_reducers> <input_file> <output_dir>}
 * @throws NumberFormatException
 *             if {@code args[0]} is not a valid integer
 */
public OrderInversion(String[] args) {
    if (args.length != 3) {
        System.err
                .println("Usage: OrderInversion <num_reducers> <input_file> <output_dir>");
        // Non-zero status: wrong usage is a failure, not a success.
        System.exit(1);
    }
    this.numReducers = Integer.parseInt(args[0]);
    this.inputPath = new Path(args[1]);
    this.outputDir = new Path(args[2]);
}
/**
 * Command-line entry point: runs the tool through {@code ToolRunner} and
 * exits the JVM with the status it returns.
 *
 * @param args
 *            {@code <num_reducers> <input_file> <output_dir>}
 * @throws Exception
 *             if the tool fails with an exception
 */
public static void main(String[] args) throws Exception {
    // Create the configuration before the tool, as the original call did.
    final Configuration configuration = new Configuration();
    final Tool tool = new OrderInversion(args);
    final int exitCode = ToolRunner.run(configuration, tool, args);
    System.exit(exitCode);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment