Commit 0d382f44 authored by desrouss's avatar desrouss

exo 2

parent f8b014cd
This diff is collapsed.
package fr.eurecom.dsg.mapreduce;
/**
* Created by desrouss on 10/29/15.
*/
public class Pair {
}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Pair extends Configured implements Tool {
public static class PairMapper
extends Mapper<LongWritable, Text, TextPair, LongWritable> {
@Override
protected void map(LongWritable key,
Text value,
Context context) throws IOException, InterruptedException {
StringTokenizer lineTokenizer = new StringTokenizer(value.toString(), ".?!");
while (lineTokenizer.hasMoreElements()) {
String line = lineTokenizer.nextToken();
line = line.trim();
StringTokenizer tokenizer = new StringTokenizer(line, " ");
List<String> wordList = new ArrayList<String>();
while (tokenizer.hasMoreElements()) {
String word = tokenizer.nextToken();
word = word.toLowerCase();
wordList.add(word);
}
LongWritable one = new LongWritable(1);
for (String word : wordList) {
for (String pairWord : wordList) {
if (!word.equals(pairWord)) {
TextPair pair = new TextPair(word, pairWord);
context.write(pair, one);
}
}
}
}
}
}
public static class PairReducer
extends Reducer<TextPair, // TODO: change Object to input key type
LongWritable, // TODO: change Object to input value type
TextPair, // TODO: change Object to output key type
LongWritable> { // TODO: change Object to output value type
@Override
protected void reduce(TextPair pair,
Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable currentInt : values) {
sum += currentInt.get();
}
context.write(pair, new LongWritable(sum));
}
}
private final int numReducers;
private final Path inputPath;
private final Path outputDir;
public Pair(String[] args) {
if (args.length != 3) {
System.out.println("Usage: Pair <num_reducers> <input_path> <output_path>");
System.exit(0);
}
this.numReducers = Integer.parseInt(args[0]);
this.inputPath = new Path(args[1]);
this.outputDir = new Path(args[2]);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = new Job(conf); // TODO: define new job instead of null using conf e setting a name
job.setJobName("Pair");
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(PairMapper.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(PairReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.setInputPaths(job, inputPath);
TextOutputFormat.setOutputPath(job, outputDir);
job.setNumReduceTasks(numReducers);
job.setJarByClass(Pair.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Pair(args), args);
System.exit(res);
}
}
package fr.eurecom.dsg.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
/**
* Created by desrouss on 10/29/15.
* TextPair is a Pair of Text that is Writable (Hadoop serialization API)
* and Comparable to itself.
*
*/
public class TextPair {
}
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second; // TODO: add the pair objects as TextPair fields
public void set(Text first, Text second) {
this.first = first;
this.second = second;// TODO: implement the set method that changes the Pair content
}
public Text getFirst() {
return first ;
}
public Text getSecond() {
return second;
}
public TextPair() {
}
public TextPair(String first, String second) {
this.set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
this.set(first, second);
}
@Override
public void write(DataOutput out) throws IOException {
this.first.write(out);
this.second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
this.first = new Text();
this.first.readFields(in);
this.second = new Text();
this.second.readFields(in);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((first == null) ? 0 : first.hashCode());
result = prime * result + ((second == null) ? 0 : second.hashCode());
return result;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null)
return false;
if (getClass() != o.getClass())
return false;
TextPair other = (TextPair) o;
if (first == null) {
if (other.first != null)
return false;
} else if (!first.equals(other.first))
return false;
if (second == null) {
if (other.second != null)
return false;
} else if (!second.equals(other.second))
return false;
return true;
}
@Override
public int compareTo(TextPair tp) {
// TODO: implement the comparison between this and tp
if (first.toString().equals(tp.first.toString())) {
return second.toString().compareTo(tp.second.toString());
} else {
return first.toString().compareTo(tp.first.toString());
}
}
@Override
public String toString() {
return String.format("%s-%s",
first, second);
}
// DO NOT TOUCH THE CODE BELOW
/** Compare two pairs based on their values */
public static class Comparator extends WritableComparator {
/** Reference to standard Hadoop Text comparator */
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public Comparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
static {
WritableComparator.define(TextPair.class, new Comparator());
}
/** Compare just the first element of the Pair */
public static class FirstComparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public FirstComparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a instanceof TextPair && b instanceof TextPair) {
return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
}
return super.compare(a, b);
}
}
}
\ No newline at end of file
No preview for this file type
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment