Commit 8b41320a authored by YUSHIQIAN

Modified co-occurrence

parent 4701db3c
package fr.eurecom.dsg.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
......@@ -23,53 +21,57 @@ import org.apache.hadoop.util.ToolRunner;
public class Pair extends Configured implements Tool {
public static class PairMapper
extends Mapper<LongWritable, Text, TextPair, IntWritable> {
private final int numReducers;
private final Path inputPath;
private final Path outputDir;
private final static IntWritable one = new IntWritable(1);
private static Text word0 = new Text();
private static Text word1 = new Text();
private String pattern = "[^a-zA-Z0-9-']";
public static class PairMapper
extends Mapper<LongWritable, Text, TextPair, LongWritable> {
@Override
public void map(LongWritable inKey, Text inValue, Context context)throws IOException, InterruptedException {
String line = inValue.toString();
//line = line.replaceAll(pattern, " ");
//line = line.toLowerCase()
//String[] str = line.split(" +");
String[] str = line.split("\\s+");
for(int i=0; i< str.length-1; i++)
{
word0.set(str[i]);
word1.set(str[i+1]);
TextPair pair = new TextPair(word0, word1);
context.write(pair, one);
protected void map(LongWritable key,
Text value,
Context context) throws IOException, InterruptedException {
StringTokenizer lineTokenizer = new StringTokenizer(value.toString(), ".?!");
while (lineTokenizer.hasMoreElements()) {
String line = lineTokenizer.nextToken();
line = line.trim();
StringTokenizer tokenizer = new StringTokenizer(line, " ");
List<String> wordList = new ArrayList<String>();
while (tokenizer.hasMoreElements()) {
String word = tokenizer.nextToken();
word = word.toLowerCase().replaceAll("[\\(\\),;\\.:\"\'“”—’]", "");
wordList.add(word);
}
LongWritable one = new LongWritable(1);
for (String word: wordList) {
for (String pairWord: wordList) {
if (!word.equals(pairWord)){
TextPair pair = new TextPair(word, pairWord);
context.write(pair, one);
}
}
}
}
}
}
public static class PairReducer
extends Reducer<TextPair, IntWritable, TextPair, IntWritable> {
private IntWritable result = new IntWritable();
extends Reducer<TextPair, LongWritable, TextPair, LongWritable> {
@Override
public void reduce(TextPair inKey, Iterable<IntWritable> inValues, Context context) throws IOException, InterruptedException {
protected void reduce(TextPair pair,
Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : inValues) {
sum += val.get();
for (LongWritable currentInt: values) {
sum += currentInt.get();
}
result.set(sum);
context.write(inKey, result);
context.write(pair, new LongWritable(sum));
}
}
private int numReducers;
private Path inputPath;
private Path outputDir;
public Pair(String[] args) {
if (args.length != 3) {
System.out.println("Usage: Pair <num_reducers> <input_path> <output_path>");
......@@ -83,39 +85,24 @@ public class Pair extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = new Job(conf,"Word co-occurrence");
// TODO: set job input format
// TODO: set map class and the map output key and value classes
// TODO: set reduce class and the reduce output key and value classes
// TODO: set job output format
// TODO: add the input file as job input (from HDFS) to the variable inputFile
// TODO: set the output path for the job results (to HDFS) to the variable outputPath
// TODO: set the number of reducers using variable numberReducers
// TODO: set the jar class
Job job = new Job(conf);
job.setJobName("Pair");
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(PairMapper.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(PairReducer.class);
job.setOutputKeyClass(TextPair.class);
job.setOutputValueClass(IntWritable.class);
job.setReducerClass(PairReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setNumReduceTasks(Integer.parseInt(args[0]));
TextInputFormat.setInputPaths(job, inputPath);
TextOutputFormat.setOutputPath(job, outputDir);
job.setNumReduceTasks(numReducers);
job.setJarByClass(Pair.class);
......
......@@ -19,11 +19,10 @@ public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
private String deli = ",";
/**
 * Replaces both elements of the pair.
 *
 * <p>The given objects are stored by reference, not copied, so this also works
 * when the fields have not been initialized yet.
 *
 * @param first  new first element
 * @param second new second element
 */
public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
}
public Text getFirst() {
......@@ -35,13 +34,10 @@ public class TextPair implements WritableComparable<TextPair> {
}
/**
 * Creates a pair of two empty {@code Text} values; intended for deserialization,
 * where the contents are filled in afterwards.
 */
public TextPair() {
    this.first = new Text();
    this.second = new Text();
}
/**
 * Creates a pair from two strings by wrapping each in a new {@code Text}.
 *
 * @param first  first element
 * @param second second element
 */
public TextPair(String first, String second) {
    // Constructor delegation must be the only (first) statement.
    this(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
......@@ -50,55 +46,71 @@ public class TextPair implements WritableComparable<TextPair> {
/**
 * Serializes the pair as its first element followed by its second element.
 * {@link #readFields(DataInput)} reads the fields back in the same order.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing fails
 */
@Override
public void write(DataOutput out) throws IOException {
    // Each field is written exactly once so the format stays symmetric with readFields.
    this.first.write(out);
    this.second.write(out);
}

/**
 * Restores the pair from the form produced by {@link #write(DataOutput)}.
 *
 * @param in source of the serialized form
 * @throws IOException if reading fails
 */
@Override
public void readFields(DataInput in) throws IOException {
    // Fresh Text instances are used because set() stores caller-owned objects by
    // reference; reading into those would overwrite the caller's data.
    this.first = new Text();
    this.first.readFields(in);
    this.second = new Text();
    this.second.readFields(in);
}
/**
 * Combines the hash codes of both elements; a {@code null} element contributes 0.
 * Equal pairs produce equal hash codes.
 *
 * @return hash code of this pair
 */
@Override
public int hashCode() {
    final int prime = 31;
    int result = prime + ((first == null) ? 0 : first.hashCode());
    return prime * result + ((second == null) ? 0 : second.hashCode());
}
@Override
public boolean equals(Object o) {
boolean isEqual = false;
if (o instanceof TextPair) {
TextPair iPair = (TextPair)o;
isEqual = first.equals(iPair.first) && second.equals(iPair.second);
}
return isEqual;
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TextPair other = (TextPair) obj;
if (first == null) {
if (other.first != null)
return false;
} else if (!first.equals(other.first))
return false;
if (second == null) {
if (other.second != null)
return false;
} else if (!second.equals(other.second))
return false;
return true;
}
@Override
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.getFirst());
if (0 == cmp) {
cmp = second.compareTo(tp.getSecond());
if (first.toString().equals(tp.first.toString())) {
return second.toString().compareTo(tp.second.toString());
} else {
return first.toString().compareTo(tp.first.toString());
}
return cmp;
}
/**
 * Sets the delimiter stored in this pair's {@code deli} field.
 *
 * @param deli new delimiter
 */
public void setDeli(String deli) {
    this.deli = deli;
}
/**
 * Renders the pair as its first element, the configured delimiter, then its second element.
 *
 * @return textual form of this pair
 */
@Override
public String toString() {
    // Uses the deli field so that setDeli() actually affects the output.
    return first + deli + second;
}
// DO NOT TOUCH THE CODE BELOW
// DO NOT TOUCH THE CODE BELOW
/** Compare two pairs based on their values */
public static class Comparator extends WritableComparator {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment