Commit 348bca33 authored by dangq

Stripes

parent 19b01b2c
This diff is collapsed.
......@@ -4,8 +4,15 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.ArrayList;
import org.apache.hadoop.io.Writable;
......@@ -16,58 +23,55 @@ import org.apache.hadoop.io.Writable;
// Writable wrapper around a word -> count associative array (a "stripe").
// NOTE(review): this span is a collapsed diff view. Both the old backing
// field ("counts") and its replacement ("hm") appear together, along with
// two constructors from the old revision; only one representation should
// survive in the merged file.
public class StringToIntMapWritable implements Writable {
// TODO: add an internal field that is the real associative array
//private HashMap<String, Integer> hm = new HashMap<String, Integer>();
// Old revision's backing map (removed by this commit).
public final Map<String, Integer> counts;
// New revision's backing map.
private HashMap<String, Integer> hm = new HashMap<String,Integer>();
// Old revision: wrap a caller-supplied map.
public StringToIntMapWritable(Map<String, Integer> counts) {
this.counts = counts;
}
// Old revision: start from an empty map.
public StringToIntMapWritable(){
this(new HashMap<String, Integer>());
}
@Override
public int hashCode() {
    // Derived solely from the wrapped map, mirroring equals().
    if (counts == null) {
        return 31;
    }
    return 31 + counts.hashCode();
}
@Override
public boolean equals(Object obj) {
    // Two instances are equal iff their wrapped maps are equal.
    if (this == obj) {
        return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
        return false;
    }
    StringToIntMapWritable that = (StringToIntMapWritable) obj;
    if (counts == null) {
        return that.counts == null;
    }
    return counts.equals(that.counts);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO: implement deserialization
// NOTE(review): collapsed diff — two deserialization schemes are interleaved
// here and the braces do not balance in this view. The old scheme parses a
// "word-count word-count ..." text line; the new scheme reads a
// length-prefixed binary record. Only one should survive the merge.
// Old scheme: reset and parse one text line of "word-count" tokens.
counts.clear();
String inLine=in.readLine();
if (inLine!=null){
StringTokenizer tokenizer = new StringTokenizer(inLine," ");
while (tokenizer.hasMoreElements()){
String occurence = tokenizer.nextToken();
// Split on the LAST '-' so hyphenated words keep their prefix.
String occurenceWord = occurence.substring(0,occurence.lastIndexOf("-"));
int occurenceCount = Integer.parseInt(occurence.substring(occurence.lastIndexOf("-")+1));
counts.put(occurenceWord,occurenceCount);
}
// New scheme: entry count, then (key length, key bytes, int value) triples.
// NOTE(review): the matching write() emits the size with out.write(...)
// (one byte) rather than writeInt — see note on write(); confirm the pair.
int len =in.readInt();
hm.clear();
for (int i=0;i<len;i++){
int l=in.readInt();
byte[] ba =new byte[l];
in.readFully(ba);
// NOTE(review): decodes with the platform default charset — assumes
// ASCII keys; verify against how writeBytes() encodes them.
String key=new String(ba);
Integer value=in.readInt();
hm.put(key,value);
}
}
// Required no-arg constructor: Hadoop instantiates Writables reflectively.
public StringToIntMapWritable(){
}
// Convenience constructor building a single-entry stripe {key -> value}.
public StringToIntMapWritable(String key, Integer value){
hm.put(key,value);
}
// Splits this stripe into a list of single-entry stripes, one per word,
// in the backing map's iteration order.
public List<StringToIntMapWritable> getElements(){
    List<StringToIntMapWritable> singletons = new ArrayList<StringToIntMapWritable>();
    for (Map.Entry<String, Integer> entry : hm.entrySet()) {
        singletons.add(new StringToIntMapWritable(entry.getKey(), entry.getValue()));
    }
    return singletons;
}
// Words present in this stripe. Returns the map's live key view, not a copy.
public Set<String> getKeySet() {
    Set<String> words = hm.keySet();
    return words;
}
// Count stored for the given word, or null when the word is absent.
public Integer getValue(String key) {
    Integer count = hm.get(key);
    return count;
}
@Override
public String toString() {
    // Renders the first entry as "word count" (stripes emitted by
    // getElements() hold exactly one entry, so "first" is the whole stripe).
    // BUG FIX: guard the empty case — iterator().next() on an empty map
    // throws NoSuchElementException.
    if (hm.isEmpty()) {
        return "";
    }
    Map.Entry<String, Integer> first = hm.entrySet().iterator().next();
    return first.getKey() + " " + first.getValue();
}
// Warning: for efficiency reasons, Hadoop attempts to re-use old instances of
// StringToIntMapWritable when reading new records. Remember to initialize your variables
......@@ -75,17 +79,48 @@ public class StringToIntMapWritable implements Writable {
@Override
public void write(DataOutput out) throws IOException {
// TODO: implement serialization
// NOTE(review): collapsed diff — the old text serializer and the new binary
// serializer are interleaved and the braces do not balance in this view.
// Old scheme: "word-count " tokens as raw bytes.
for (String s : counts.keySet()) {
out.write((s + "-" + counts.get(s) + " ").getBytes());
// New scheme: entry count, then (key length, key bytes, int value) triples.
// BUG(review): out.write(int) emits only the LOW-ORDER BYTE, but
// readFields() reads the size back with readInt(); this should be
// out.writeInt(hm.size()) for the pair to round-trip.
out.write(hm.size());
Iterator<Entry<String,Integer>> it = hm.entrySet().iterator();
while (it.hasNext()){
Map.Entry<String,Integer> pairs = (Map.Entry<String,Integer>)it.next();
String k=(String) pairs.getKey();
Integer v =(Integer) pairs.getValue();
// NOTE(review): k.length() counts chars and writeBytes truncates each
// char to one byte — safe only for ASCII keys; confirm the input domain.
out.writeInt(k.length());
out.writeBytes(k);
out.writeInt(v);
}
}
@Override
// NOTE(review): collapsed diff — the lines below are the removed revision's
// toString() left unterminated; its "return s.toString();" surfaces further
// down inside increment(String,int).
public String toString(){
StringBuffer s = new StringBuffer();
for (String key: new TreeSet<String>(counts.keySet())){
s.append((key+"-"+counts.get(key)+" "));
// Empties the stripe so the instance can be reused between records.
public void clear() {
hm.clear();
}
// Records one more occurrence of word t in this stripe.
public void increment(String t) {
    if (hm.containsKey(t)) {
        hm.put(t, hm.get(t) + 1);
    } else {
        hm.put(t, 1);
    }
}
// Adds `value` occurrences of word t to this stripe.
public void increment(String t, int value) {
int count = value;
if (hm.containsKey(t)) {
count = hm.get(t) + count;
}
// NOTE(review): the "return s.toString();" below is diff residue from the
// removed toString() — it does not belong in this void method.
return s.toString();
hm.put(t, count);
}
// Element-wise accumulation: folds every (word, count) of h into this stripe.
public void sum(StringToIntMapWritable h) {
    for (Map.Entry<String, Integer> entry : h.hm.entrySet()) {
        increment(entry.getKey(), entry.getValue());
    }
}
}
}
package fr.eurecom.dsg.mapreduce;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
......@@ -14,101 +15,120 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// Driver for the Stripes co-occurrence job (Hadoop Tool).
public class Stripes extends Configured implements Tool {
// Parsed from the command line in the constructor.
private int numReducers;
private Path inputPath;
private Path outputDir;
@Override
public int run(String[] args) throws Exception {
    // Configures and launches the job; returns 0 on success, 1 on failure.
    Configuration conf = this.getConf();
    Job job = new Job(conf,"STRIPES");

    job.setInputFormatClass(TextInputFormat.class);

    job.setMapperClass(StripesMapper.class);
    job.setMapOutputKeyClass(Text.class);
    // NOTE(review): if the mapper emits StringToIntMapWritable stripes (as in
    // the new revision), this value class must be changed to match it.
    job.setMapOutputValueClass(IntWritable.class);

    job.setReducerClass(StripesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, this.inputPath);
    FileOutputFormat.setOutputPath(job, this.outputDir);
    job.setNumReduceTasks(this.numReducers);

    // BUG FIX: locate the job jar from this driver class, not WordCount.
    job.setJarByClass(Stripes.class);

    return job.waitForCompletion(true) ? 0 : 1;
}
// NOTE(review): collapsed diff — the old constructor below is truncated
// (its field assignments appear after the new class body) and the new
// revision's field declarations are spliced into it.
public Stripes (String[] args) {
if (args.length != 3) {
System.out.println("Usage: Stripes <num_reducers> <input_path> <output_path>");
System.exit(0);
// New revision's fields, set from the command line in the constructor.
private int numReducers;
private Path inputPath;
private Path outputDir;
@Override
public int run(String[] args) throws Exception {
    // Builds and submits the Stripes job; 0 on success, 1 on failure.
    Configuration conf = this.getConf();
    Job stripesJob = new Job(conf, "Stripes");

    // Job packaging and I/O formats.
    stripesJob.setJarByClass(Stripes.class);
    stripesJob.setInputFormatClass(TextInputFormat.class);
    stripesJob.setOutputFormatClass(TextOutputFormat.class);

    // Map stage: word -> stripe of co-occurring words.
    stripesJob.setMapperClass(StripesMapper.class);
    stripesJob.setMapOutputKeyClass(Text.class);
    stripesJob.setMapOutputValueClass(StringToIntMapWritable.class);

    // Reduce stage: merge partial stripes per word.
    stripesJob.setReducerClass(StripesReducer.class);
    stripesJob.setOutputKeyClass(Text.class);
    stripesJob.setOutputValueClass(StringToIntMapWritable.class);

    // HDFS paths and parallelism from the command line.
    FileInputFormat.addInputPath(stripesJob, inputPath);
    FileOutputFormat.setOutputPath(stripesJob, outputDir);
    stripesJob.setNumReduceTasks(numReducers);

    return stripesJob.waitForCompletion(true) ? 0 : 1;
}
// Parses the command line: <num_reducers> <input_path> <output_path>.
// Prints usage and exits when the argument count is wrong.
public Stripes(String[] args) {
    if (args.length != 3) {
        System.out.println("Usage: Stripes <num_reducers> <input_path> <output_path>");
        System.exit(0);
    }
    this.numReducers = Integer.parseInt(args[0]);
    this.inputPath = new Path(args[1]);
    this.outputDir = new Path(args[2]);
}
// Entry point: delegates to ToolRunner so generic Hadoop options are parsed.
public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new Stripes(args), args);
    System.exit(exitCode);
}
// NOTE(review): collapsed diff residue — the tail of the OLD constructor and
// the OLD main(), duplicating members emitted above by the new revision.
this.numReducers = Integer.parseInt(args[0]);
this.inputPath = new Path(args[1]);
this.outputDir = new Path(args[2]);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Stripes(args), args);
System.exit(res);
}
}
// Old (Pairs-style) mapper: emits ((first, second), 1) for word co-occurrences
// within a line. NOTE(review): this is the revision being removed by the diff;
// the class brace is closed by the replacement class that follows.
class StripesMapper
        extends Mapper<LongWritable, Text, TextPair, IntWritable> {

    /**
     * Tokenizes the input line and emits an IntWritable(1) per co-occurring
     * word pair.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        // BUG FIX: read the record's text, not this mapper's own toString().
        String line = value.toString();
        line = line.replaceAll("[^a-zA-Z0-9_]+", " ");
        line = line.replaceAll("^\\s+", "");
        String[] words = line.split("\\s+");
        for (int i = 0; i < words.length - 1; i++) {
            String first = words[i];
            for (int j = 0; j < words.length - 1; j++) {
                if (i != j) {
                    // BUG FIX: pair with words[j]; the original used the
                    // constant words[i + 1], emitting the same pair repeatedly.
                    String second = words[j];
                    context.write(new TextPair(first, second), new IntWritable(1));
                }
            }
        }
        // NOTE(review): both loops stop at words.length - 1, so the last token
        // never participates — confirm whether the bound should be < words.length.
    }
// Stripes mapper: for each adjacent token pair (w, w'), emits the stripe
// { w' -> 1 } keyed by w.
class StripesMapper extends
        Mapper<LongWritable, Text, Text, StringToIntMapWritable> {

    /**
     * Normalizes the line to word tokens, then writes one single-entry
     * stripe per adjacent pair. The stripe instance is reused across
     * iterations (cleared each time) since context.write serializes it.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String text = value.toString();
        text = text.replaceAll("[^a-zA-Z0-9_]+", " ");
        text = text.replaceAll("^\\s+", "");
        String[] words = text.split("\\s+");
        StringToIntMapWritable stripe = new StringToIntMapWritable();
        for (int pos = 0; pos + 1 < words.length; pos++) {
            stripe.clear();
            stripe.increment(words[pos + 1]);
            context.write(new Text(words[pos]), stripe);
        }
    }
}
// NOTE(review): collapsed diff — this is the removed revision's reducer stub
// (empty TODO body); its class brace is never closed in this view because the
// replacement class follows immediately.
class StripesReducer
extends Reducer<TextPair, // TODO: change Object to input key type
IntWritable, // TODO: change Object to input value type
TextPair, // TODO: change Object to output key type
IntWritable> { // TODO: change Object to output value type
@Override
public void reduce(TextPair key, // TODO: change Object to input key type
Iterable<IntWritable> values, // TODO: change Object to input value type
Context context) throws IOException, InterruptedException {
// TODO: implement the reduce method
}
// Stripes reducer: merges all partial stripes for a word, then re-emits the
// total as one single-entry stripe per co-occurring word.
class StripesReducer extends Reducer<Text, // input key type
        StringToIntMapWritable, // input value type
        Text, // output key type
        StringToIntMapWritable> { // output value type

    /**
     * Accumulates every partial stripe into one merged stripe, then writes
     * (key, {word -> total}) once per co-occurring word.
     */
    @Override
    public void reduce(Text key, // input key type
            Iterable<StringToIntMapWritable> values, // input value type
            Context context) throws IOException, InterruptedException {
        StringToIntMapWritable merged = new StringToIntMapWritable();
        for (StringToIntMapWritable partial : values) {
            merged.sum(partial);
        }
        for (StringToIntMapWritable singleton : merged.getElements()) {
            context.write(key, singleton);
        }
    }
}
\ No newline at end of file
Manifest-Version: 1.0
Build-Jdk: 1.7.0_75
Built-By: hrua
Built-By: heocon
Created-By: IntelliJ IDEA
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment