diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 0e4d86f30d4880c9b17ca933be6afdb1a854c238..54ffc2fae8ad45ccf25dd4cd87b97e3db009b55d 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,9 +2,8 @@ - - + @@ -36,42 +35,18 @@ - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + @@ -91,6 +66,8 @@ @@ -136,10 +113,10 @@ - @@ -164,8 +141,8 @@ - + @@ -407,19 +384,6 @@ - - - @@ -448,6 +412,19 @@ + + + - - + + - + @@ -549,15 +526,15 @@ - + - - + + @@ -594,6 +571,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -654,9 +677,6 @@ - - - @@ -754,9 +774,6 @@ - - - @@ -770,9 +787,17 @@ + + + + + + + + - + @@ -780,27 +805,41 @@ - - + + - - + + - + - - + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/fr/eurecom/dsg/mapreduce/Pair.java b/src/main/java/fr/eurecom/dsg/mapreduce/Pair.java index bda9e918b4f3cee15af4ad1981ff0260912aa948..fcf61b6ca2495ad5da9540ff24495f94541b0535 100644 --- a/src/main/java/fr/eurecom/dsg/mapreduce/Pair.java +++ b/src/main/java/fr/eurecom/dsg/mapreduce/Pair.java @@ -94,7 +94,7 @@ public class Pair extends Configured implements Tool { // TODO: set job input format Configuration conf = this.getConf(); - Job job = new Job(conf,"Word Count"); + Job job = new Job(conf,"PAIR"); job.setInputFormatClass(TextInputFormat.class); diff --git a/src/main/java/fr/eurecom/dsg/mapreduce/StringToIntMapWritable.java b/src/main/java/fr/eurecom/dsg/mapreduce/StringToIntMapWritable.java new file mode 100644 index 0000000000000000000000000000000000000000..8bbb272c87b4e71324e3f99f447f83728547b122 --- /dev/null +++ b/src/main/java/fr/eurecom/dsg/mapreduce/StringToIntMapWritable.java @@ -0,0 +1,91 @@ +package fr.eurecom.dsg.mapreduce; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.TreeSet; + + 
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable associative array from String to Integer.
+ * Wire format: an int entry count, then one (writeUTF key, writeInt value) pair per entry.
+ */
+public class StringToIntMapWritable implements Writable {
+
+    // The real associative array; emptied and refilled by every readFields() call.
+    public final Map<String, Integer> counts;
+
+    /** Wraps {@code counts} directly; the map is not copied. */
+    public StringToIntMapWritable(Map<String, Integer> counts) {
+        this.counts = counts;
+    }
+
+    /** Creates an empty map; Hadoop needs a no-argument constructor to instantiate Writables. */
+    public StringToIntMapWritable() {
+        this(new HashMap<String, Integer>());
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((counts == null) ? 0 : counts.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        StringToIntMapWritable other = (StringToIntMapWritable) obj;
+        if (counts == null) {
+            return other.counts == null;
+        }
+        return counts.equals(other.counts);
+    }
+
+    /**
+     * Replaces the current contents with the entries read from {@code in}.
+     * Hadoop re-uses Writable instances between records, so stale entries are cleared first.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        counts.clear();
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            String word = in.readUTF();
+            counts.put(word, in.readInt());
+        }
+    }
+
+    /**
+     * Writes the entry count followed by each (word, count) pair.
+     * The length prefix lets readFields() consume exactly one record from the stream.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(counts.size());
+        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
+            out.writeUTF(entry.getKey());
+            out.writeInt(entry.getValue());
+        }
+    }
+
+    /** Renders the entries as space-terminated "word-count" tokens, sorted by word. */
+    @Override
+    public String toString() {
+        StringBuilder s = new StringBuilder();
+        for (String key : new TreeSet<String>(counts.keySet())) {
+            s.append(key).append('-').append(counts.get(key)).append(' ');
+        }
+        return s.toString();
+    }
+}
diff --git a/src/main/java/fr/eurecom/dsg/mapreduce/Stripes.java b/src/main/java/fr/eurecom/dsg/mapreduce/Stripes.java
index 7d1392bca5633956c77bcc66fe13c8b6ad3348a6..516b1446cec668401dabc5ef471f764245b4d0eb 100644
--- a/src/main/java/fr/eurecom/dsg/mapreduce/Stripes.java
+++ b/src/main/java/fr/eurecom/dsg/mapreduce/Stripes.java
@@ -5,9 +5,16 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -22,16 +29,28 @@ public class Stripes extends Configured implements Tool {
   public int run(String[] args) throws Exception {
 
     Configuration conf = this.getConf();
-    Job job = null; // TODO: define new job instead of null using conf e setting a name
-
-    // TODO: set job input format
+    // Build the job from the tool's configuration and give it a recognisable name.
+    Job job = new Job(conf, "STRIPES");
+    // Input is read line by line: key = byte offset, value = line text.
+    job.setInputFormatClass(TextInputFormat.class);
     // TODO: set map class and the map output key and value classes
+    job.setMapperClass(StripesMapper.class);
+    job.setMapOutputKeyClass(TextPair.class); // must match the key type StripesMapper emits
+    job.setMapOutputValueClass(IntWritable.class);
     // TODO: set reduce class and the reduce output key and value classes
+    job.setReducerClass(StripesReducer.class);
+    job.setOutputKeyClass(Text.class); // NOTE(review): should match StripesReducer's output key type - confirm
+    job.setOutputValueClass(IntWritable.class);
     // TODO: set job output format
+    job.setOutputFormatClass(TextOutputFormat.class);
     // TODO: add the input file as job input (from HDFS) to the variable inputFile
+    FileInputFormat.addInputPath(job, this.inputPath);
     // TODO: set the output path for the job results (to HDFS) to the variable outputPath
+    FileOutputFormat.setOutputPath(job, this.outputDir);
     // TODO: set the number of reducers using variable numberReducers
+    job.setNumReduceTasks(this.numReducers);
     // TODO: set the jar class
+    job.setJarByClass(Stripes.class); // locate the jar through this job's own driver class
 
     return job.waitForCompletion(true) ? 0 : 1;
   }
@@ -53,29 +72,41 @@ public class Stripes extends Configured implements Tool {
 }
 
 class StripesMapper
-extends Mapper { // TODO: change Object to output value type
+extends Mapper<LongWritable, Text, TextPair, IntWritable> {
 
   @Override
-  public void map(Object key, // TODO: change Object to input key type
-      Object value, // TODO: change Object to input value type
+  public void map(LongWritable key, // byte offset of the line (unused)
+      Text value, // one line of input text
       Context context)
       throws java.io.IOException, InterruptedException {
-
-    // TODO: implement map method
+    // Emit ((w_i, w_j), 1) for every ordered pair of distinct word positions on the line.
+    String line = value.toString();
+    line = line.replaceAll("[^a-zA-Z0-9_]+", " ");
+    line = line.replaceAll("^\\s+", "");
+    String[] words = line.split("\\s+");
+    for (int i = 0; i < words.length; i++) {
+      String first = words[i];
+      for (int j = 0; j < words.length; j++) {
+        if (i != j) {
+          String second = words[j];
+          context.write(new TextPair(first, second), new IntWritable(1));
+        }
+      }
+    }
   }
 }
class StripesReducer -extends Reducer { // TODO: change Object to output value type +extends Reducer { // TODO: change Object to output value type @Override - public void reduce(Object key, // TODO: change Object to input key type - Iterable values, // TODO: change Object to input value type + public void reduce(TextPair key, // TODO: change Object to input key type + Iterable values, // TODO: change Object to input value type Context context) throws IOException, InterruptedException { // TODO: implement the reduce method