Commit c3970aba authored by YUSHIQIAN

word co-occurrence pair

parent e1e0795d
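
This commit implements the "pairs" design pattern for counting word co-occurrences: for every two adjacent words in a line, the mapper emits a composite (TextPair, 1) record, and the reducer sums the counts per pair. For example, the line "the quick brown fox" makes the mapper emit:

  (the,quick)   1
  (quick,brown) 1
  (brown,fox)   1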
Pair.java:

package fr.eurecom.dsg.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Pair extends Configured implements Tool {
  public static class PairMapper
      extends Mapper<LongWritable, Text, TextPair, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    // Instance fields (not static): each mapper reuses its own Text objects
    private final Text word0 = new Text();
    private final Text word1 = new Text();
    // Keep letters, digits, '-' and '\''; everything else becomes a space
    private final String pattern = "[^a-zA-Z0-9-']";

    @Override
    public void map(LongWritable inKey, Text inValue, Context context)
        throws IOException, InterruptedException {
      String line = inValue.toString();
      line = line.replaceAll(pattern, " ");
      line = line.toLowerCase();
      String[] str = line.split(" +");
      // Emit a (pair, 1) record for every two adjacent words in the line
      for (int i = 0; i < str.length - 1; i++) {
        // split(" +") can produce an empty leading token; skip it
        if (str[i].isEmpty() || str[i + 1].isEmpty())
          continue;
        word0.set(str[i]);
        word1.set(str[i + 1]);
        TextPair pair = new TextPair(word0, word1);
        context.write(pair, one);
      }
    }
  }
  public static class PairReducer
      extends Reducer<TextPair, IntWritable, TextPair, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(TextPair inKey, Iterable<IntWritable> inValues, Context context)
        throws IOException, InterruptedException {
      // Sum the partial counts for this word pair
      int sum = 0;
      for (IntWritable val : inValues) {
        sum += val.get();
      }
      result.set(sum);
      context.write(inKey, result);
    }
  }
  private int numReducers;
  private Path inputPath;
  private Path outputDir;

  public Pair(String[] args) {
    if (args.length != 3) {
      System.err.println("Usage: Pair <num_reducers> <input_path> <output_path>");
      System.exit(1);
    }
    this.numReducers = Integer.parseInt(args[0]);
    this.inputPath = new Path(args[1]);
    this.outputDir = new Path(args[2]);
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = new Job(conf, "Word co-occurrence");

    // Job input format
    job.setInputFormatClass(TextInputFormat.class);

    // Map class and the map output key and value classes
    job.setMapperClass(PairMapper.class);
    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Reduce class and the reduce output key and value classes
    job.setReducerClass(PairReducer.class);
    job.setOutputKeyClass(TextPair.class);
    job.setOutputValueClass(IntWritable.class);

    // Job output format
    job.setOutputFormatClass(TextOutputFormat.class);

    // Input file (from HDFS), output path (to HDFS) and number of reducers,
    // all parsed from the command line in the constructor
    FileInputFormat.addInputPath(job, this.inputPath);
    FileOutputFormat.setOutputPath(job, this.outputDir);
    job.setNumReduceTasks(this.numReducers);

    job.setJarByClass(Pair.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Pair(args), args);
    System.exit(res);
  }
}
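
For reference, a run of this driver could look like the following; the jar name and the HDFS paths are hypothetical:

  hadoop jar wordcooccurrence.jar fr.eurecom.dsg.mapreduce.Pair 4 /data/books.txt /out/pairs

where 4 is the number of reducers and the last two arguments are the HDFS input and output paths.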
TextPair.java:

package fr.eurecom.dsg.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
/**
 * TextPair is a pair of Text values that is Writable (Hadoop serialization
 * API) and Comparable to itself.
 */
public class TextPair implements WritableComparable<TextPair> {

  // Initialized at declaration (not only in the no-arg constructor) so that
  // set() does not dereference a null field when called from a constructor
  private final Text first = new Text();
  private final Text second = new Text();
  private String deli = ",";

  public void set(Text first, Text second) {
    this.first.set(first);
    this.second.set(second);
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  public TextPair() {
    // empty pair, filled in later by readFields() during deserialization
  }

  public TextPair(String first, String second) {
    this.set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    this.set(first, second);
  }
  @Override
  public void write(DataOutput out) throws IOException {
    // Serialize both fields so the pair can be deserialized later,
    // e.g. when it is written to HDFS or shuffled between tasks
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }
  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    boolean isEqual = false;
    if (o instanceof TextPair) {
      TextPair iPair = (TextPair) o;
      isEqual = first.equals(iPair.first) && second.equals(iPair.second);
    }
    return isEqual;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.getFirst());
    if (0 == cmp) {
      cmp = second.compareTo(tp.getSecond());
    }
    return cmp;
  }

  public void setDeli(String deli) {
    this.deli = deli;
  }

  @Override
  public String toString() {
    return first + deli + second;
  }
  // DO NOT TOUCH THE CODE BELOW

  /** Compare two pairs based on their values */
  public static class Comparator extends WritableComparator {

    /** Reference to standard Hadoop Text comparator */
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public Comparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        if (cmp != 0) {
          return cmp;
        }
        return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                       b2, s2 + firstL2, l2 - firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }
  }

  static {
    WritableComparator.define(TextPair.class, new Comparator());
  }

  /** Compare just the first element of the Pair */
  public static class FirstComparator extends WritableComparator {

    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public FirstComparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }

    @SuppressWarnings("unchecked")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      if (a instanceof TextPair && b instanceof TextPair) {
        return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
      }
      return super.compare(a, b);
    }
  }
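
  // Usage note (hypothetical, not part of this commit): FirstComparator can be
  // registered on a job, e.g.
  //   job.setGroupingComparatorClass(TextPair.FirstComparator.class);
  // so that all pairs sharing the same first word are grouped into a single
  // reduce() call, the building block of the "order inversion" pattern.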
}
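
To make the Writable contract above concrete, here is a minimal round-trip sketch; TextPairDemo is a hypothetical helper class, not part of this commit:

package fr.eurecom.dsg.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TextPairDemo {
  public static void main(String[] args) throws IOException {
    TextPair out = new TextPair("hello", "world");

    // Serialize to an in-memory buffer, as Hadoop would to a stream:
    // write() emits each Text as a vint length followed by its bytes
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    out.write(new DataOutputStream(buffer));

    // Deserialize into a fresh, empty pair
    TextPair in = new TextPair();
    in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

    System.out.println(in);             // hello,world
    System.out.println(out.equals(in)); // true
  }
}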