Commit 18081da2 authored by feng

Stripes 10

parent f1acfd35
@@ -2,98 +2,204 @@ package fr.eurecom.dsg.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
/*
* Very simple (and scholastic) implementation of a Writable associative map for String to Int
* Very simple (and scholastic) implementation of a Writable associative array for String to Int
*
**/
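// (Note) This class is the map-output value type configured in Stripes.run() via
// job.setMapOutputValueClass(StringToIntMapWritable.class), so it must round-trip cleanly through write()/readFields().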
public class StringToIntMapWritable implements Writable {
// xTODO: add an internal field that is the real associative array
public Map<String, Long> instance;
public Text stringTrans = new Text();
public LongWritable intTrans = new LongWritable();
public StringToIntMapWritable(){
this.instance = new Hashtable<>();
// TODO: add an internal field that is the real associative array
private HashMap<String, Integer> hashMap;
private Text word;
private IntWritable count;
public StringToIntMapWritable() {
hashMap = new HashMap<String, Integer>();
// Initialize the reusable Text/IntWritable buffers up front so readFields() and write() never hit a null field.
word = new Text();
count = new IntWritable();
}
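// Wire format shared by write() and readFields() below: an int holding the number of entries,
// followed by one (Text key, IntWritable value) pair per entry.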
@Override
public void readFields(DataInput in) throws IOException {
// TODO: implement deserialization
hashMap.clear();
int size = in.readInt();
for (int i = 0; i < size; i++) {
word.readFields(in);
count.readFields(in);
hashMap.put(word.toString(), count.get());
}
// Warning: for efficiency reasons, Hadoop attempts to re-use old instances of
// StringToIntMapWritable when reading new records. Remember to initialize your variables
// inside this function, in order to get rid of old data.
}
@Override
public void write(DataOutput out) throws IOException {
// TODO: implement serialization
out.writeInt(hashMap.size());
Iterator<String> iterator = hashMap.keySet().iterator();
while (iterator.hasNext()) {
// Advance the iterator once per entry so each key stays paired with its own count.
String entryKey = iterator.next();
word.set(entryKey);
count.set(hashMap.get(entryKey));
word.write(out);
count.write(out);
}
}
public void clear(){ this.instance.clear(); }
public void clear() {
hashMap.clear();
}
public void add(Text input){
String word = input.toString();
if(this.instance.containsKey(word)){
long value = this.instance.get(word) + (long)1;
this.instance.put(word, value);
} else {
this.instance.put(word, (long)1);
}
public void add(String word) {
if (hashMap.containsKey(word)) {
hashMap.put(word, hashMap.get(word) + 1);
} else {
hashMap.put(word, 1);
}
}
public void add(String word, Long i){
if(this.instance.containsKey(word)){
long value = this.instance.get(word) + i;
this.instance.put(word, value);
} else {
this.instance.put(word, i);
}
public void addStripe(StringToIntMapWritable stripe) {
Iterator<String> iterator = stripe.hashMap.keySet().iterator();
while (iterator.hasNext()) {
String newWord = iterator.next();
// Fold the incoming count for this word into the local total (not just an increment by one).
Integer current = hashMap.get(newWord);
hashMap.put(newWord, (current == null ? 0 : current) + stripe.hashMap.get(newWord));
}
}
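// Illustrative merge: if this map holds {a=2, b=1} and the incoming stripe holds {a=1, c=3},
// addStripe leaves this map as {a=3, b=1, c=3}.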
public Set<Map.Entry<String, Long>> entrySet(){
return this.instance.entrySet();
public String toString() {
String result = " {";
Iterator<String> iterator = hashMap.keySet().iterator();
while (iterator.hasNext()) {
// Append every entry, advancing the iterator only once per pass.
String entryKey = iterator.next();
result += entryKey + " : " + hashMap.get(entryKey) + " ";
}
return result + " }";
}
}
public void merge(StringToIntMapWritable input){
for(Map.Entry<String, Long> entry : input.entrySet()){
this.add(entry.getKey(), entry.getValue());
}
}
@Override
public void readFields(DataInput in) throws IOException {
// xTODO: implement deserialization
// Warning: for efficiency reasons, Hadoop attempts to re-use old instances of
// StringToIntMapWritable when reading new records. Remember to initialize your variables
// inside this function, in order to get rid of old data.
this.clear();
int numberOfEntries = in.readInt();
for(int i = 0; i < numberOfEntries; i ++){
this.stringTrans.readFields(in);
this.intTrans.readFields(in);
this.instance.put(this.stringTrans.toString(), this.intTrans.get());
}
}
@Override
public void write(DataOutput out) throws IOException {
// xTODO: implement serialization
out.writeInt(this.instance.size());
for(Map.Entry<String, Long> entry : this.instance.entrySet()){
this.stringTrans.set(entry.getKey());
this.intTrans.set(entry.getValue());
this.stringTrans.write(out);
this.intTrans.write(out);
}
/*
* Very simple (and scholastic) implementation of a Writable associative array for String to Int
*
**/
/*
public class StringToIntMapWritable implements Writable {
// TODO: add an internal field that is the real associative array
private HashMap<Text, IntWritable> hashMap;
private Text word;
private IntWritable count;
public StringToIntMapWritable() {
hashMap = new HashMap<Text, IntWritable>();
}
public StringToIntMapWritable(Text text, IntWritable intWritable) {
this();
this.word = text;
this.count = intWritable;
hashMap.put(this.word, this.count);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO: implement deserialization
// Warning: for efficiency reasons, Hadoop attempts to re-use old instances of
// StringToIntMapWritable when reading new records. Remember to initialize your variables
// inside this function, in order to get rid of old data.
hashMap.clear();
int size = in.readInt();
for (int i = 0; i < size; i++) {
word.readFields(in); // read the next word (Text reads its length, then its bytes)
count.readFields(in);
hashMap.put(word, count);
}
}
@Override
public void write(DataOutput out) throws IOException {
// TODO: implement serialization
out.writeInt(hashMap.size());
Set<Text> keys = hashMap.keySet();
Iterator iterator = keys.iterator();
while (iterator.hasNext()) {
// write the key; Text.write emits only the word's length followed by its bytes
word = (Text) iterator.next();
word.write(out);
// write value
count = hashMap.get(word);
count.write(out);
}
}
public void add(String second, int count) {
hashMap.put(new Text(second), new IntWritable(count));
}
@Override
public String toString(){
String value = ":{";
for (Map.Entry<String, Long> entry : this.entrySet()) {
value = value + entry.getKey() + ": " + entry.getValue().toString() + "\t";
}
value = value + "}";
return value;
public int getValueWithStringKey(String key) {
if (containsKey(key)) {
return hashMap.get(new Text(key)).get();
} else {
return 0;
}
}
public void addStripe(StringToIntMapWritable stripe) {
Iterator iterator = stripe.getHashMap().keySet().iterator();
while (iterator.hasNext()) {
Text word = (Text)iterator.next();
if (this.hashMap.containsKey(word)) {
this.hashMap.put(word, new IntWritable(this.hashMap.get(word).get() + stripe.hashMap.get(word).get()));
} else {
this.hashMap.put(word, stripe.hashMap.get(word));
}
}
}
public boolean containsKey (String key) {
return hashMap.containsKey(new Text(key));
}
public HashMap<Text, IntWritable> getHashMap() {
return hashMap;
}
public void setHashMap(HashMap<Text, IntWritable> hashMap) {
this.hashMap = hashMap;
}
public String toString() {
String result = " : {";
Iterator iterator = hashMap.keySet().iterator();
while (iterator.hasNext()) {
word = (Text) iterator.next();
count = hashMap.get(word);
result += word.toString() + " : " + count.toString();
}
return result + " }";
}
}
*/
@@ -17,7 +17,6 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Stripes extends Configured implements Tool {
private int numReducers;
@@ -28,22 +27,30 @@ public class Stripes extends Configured implements Tool {
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = new Job(conf, "Word Co-Occurrence Stripes Pattern"); // xTODO: define new job instead of null using conf and setting a name
job.setInputFormatClass(TextInputFormat.class);// xTODO: set job input format
job.setMapperClass(StripesMapper.class);// xTODO: set map class and the map output key and value classes
Job job = null; // TODO: define new job instead of null using conf and setting a name
job = new Job(conf, "Stripes");
// TODO: set job input format
job.setInputFormatClass(TextInputFormat.class);
// TODO: set map class and the map output key and value classes
job.setMapperClass(StripesMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(StringToIntMapWritable.class);
job.setReducerClass(StripesReducer.class);// xTODO: set reduce class and the reduce output key and value classes
// TODO: set combiner
job.setCombinerClass(StripesReducer.class);
// TODO: set reduce class and the reduce output key and value classes
job.setReducerClass(StripesReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(StringToIntMapWritable.class);
job.setCombinerClass(StripesReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);// xTODO: set job output format
FileInputFormat.addInputPath(job, this.inputPath);// xTODO: add the input file as job input (from HDFS) to the variable inputFile
FileOutputFormat.setOutputPath(job, this.outputDir);// xTODO: set the output path for the job results (to HDFS) to the variable outputPath
job.setNumReduceTasks(this.numReducers);// xTODO: set the number of reducers using variable numberReducers
job.setJarByClass(Stripes.class);// xTODO: set the jar class
// TODO: set job output format
job.setOutputFormatClass(TextOutputFormat.class);
// TODO: add the input file as job input (from HDFS) to the variable inputFile
FileInputFormat.addInputPath(job, this.inputPath);
// TODO: set the output path for the job results (to HDFS) to the variable outputPath
FileOutputFormat.setOutputPath(job, this.outputDir);
// TODO: set the number of reducers using variable numberReducers
job.setNumReduceTasks(this.numReducers);
// TODO: set the jar class
job.setJarByClass(Stripes.class);
return job.waitForCompletion(true) ? 0 : 1;
}
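// Illustrative launch (argument order assumed to match the numReducers/inputPath/outputDir fields;
// the jar name and HDFS paths below are placeholders):
//   hadoop jar Stripes.jar fr.eurecom.dsg.mapreduce.Stripes 2 /user/feng/input.txt /user/feng/stripes-out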
@@ -61,60 +68,113 @@ public class Stripes extends Configured implements Tool {
int res = ToolRunner.run(new Configuration(), new Stripes(args), args);
System.exit(res);
}
}
class StripesMapper
extends Mapper<LongWritable, // TODO: change Object to input key type
Text, // TODO: change Object to input value type
Text, // TODO: change Object to output key type
StringToIntMapWritable> { // TODO: change Object to output value type
public static class StripesMapper
extends Mapper<LongWritable, // xTODO: change Object to input key type
Text, // xTODO: change Object to input value type
Text, // xTODO: change Object to output key type
StringToIntMapWritable> { // xTODO: change Object to output value type
private Text first = new Text();
private Text second = new Text();
private StringToIntMapWritable count = new StringToIntMapWritable();
@Override
public void map(LongWritable offset, // xTODO: change Object to input key type
Text line, // xTODO: change Object to input value type
Context context)
throws java.io.IOException, InterruptedException {
// xTODO: implement map method
count.clear();
String[] words = line.toString().split("\\s+");
for (String word1 : words) {
count.clear();
this.first.set(word1);
for (String word2 : words) {
if (!(word2.equals(word1))) {
this.second.set(word2);
this.count.add(second);
}
StringToIntMapWritable stripe = new StringToIntMapWritable();
@Override
public void map(LongWritable key, // TODO: change Object to input key type
Text value, // TODO: change Object to input value type
Context context)
throws IOException, InterruptedException {
// TODO: implement map method
String line = value.toString();
String[] words = line.split("\\s+");
for (String first : words) {
stripe.clear();
for (String second : words) {
if (!first.equals(second)) {
stripe.add(second);
}
context.write(this.first, this.count);
}
context.write(new Text(first), stripe);
}
}
}
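// Worked example: for an input line "a b a", the map() above emits
//   (a, {b=1}), (b, {a=2}), (a, {b=1});
// equal words are skipped, so a word never co-occurs with itself within a stripe.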
public static class StripesReducer
extends Reducer<Text, // xTODO: change Object to input key type
StringToIntMapWritable, // xTODO: change Object to input value type
Text, // xTODO: change Object to output key type
StringToIntMapWritable> { // xTODO: change Object to output value type
class StripesReducer
extends Reducer<Text, // TODO: change Object to input key type
StringToIntMapWritable, // TODO: change Object to input value type
Text, // TODO: change Object to output key type
StringToIntMapWritable> { // TODO: change Object to output value type
@Override
public void reduce(Text key, // TODO: change Object to input key type
Iterable<StringToIntMapWritable> values, // TODO: change Object to input value type
Context context) throws IOException, InterruptedException {
private StringToIntMapWritable sumStringToIntMap = new StringToIntMapWritable();
// TODO: implement the reduce method
StringToIntMapWritable stripeFinal = new StringToIntMapWritable();
@Override
public void reduce(Text key, // xTODO: change Object to input key type
Iterable<StringToIntMapWritable> values, // xTODO: change Object to input value type
Context context) throws IOException, InterruptedException {
for (StringToIntMapWritable value : values) {
// each value is a partial stripe (an associative array backed by a HashMap); merge it into the running total
stripeFinal.addStripe(value);
}
context.write(key, stripeFinal);
}
}
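// Merging stripes is an element-wise sum, which is associative and commutative, so the same class
// is safe to register as the combiner in Stripes.run(): map-side pre-aggregation yields the same
// final counts as reducing everything at once.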
sumStringToIntMap.clear();
// xTODO: implement the reduce method
for(StringToIntMapWritable i : values){
sumStringToIntMap.merge(i);
/*
class StripesMapper
extends Mapper<LongWritable, // TODO: change Object to input key type
Text, // TODO: change Object to input value type
Text, // TODO: change Object to output key type
StringToIntMapWritable> { // TODO: change Object to output value type
@Override
public void map(LongWritable key, // TODO: change Object to input key type
Text value, // TODO: change Object to input value type
Context context)
throws IOException, InterruptedException {
// TODO: implement map method
String line = value.toString();
String[] words = line.split("\\s+");
StringToIntMapWritable stripe = new StringToIntMapWritable();
for (String first : words) {
stripe.getHashMap().clear();
for (String second : words) {
if (!first.equals(second)) {
if (stripe.containsKey(second)) {
int count = stripe.getValueWithStringKey(second);
count++;
stripe.add(second, count);
} else {
stripe.add(second, (int)1);
}
}
}
context.write(key, sumStringToIntMap);
context.write(new Text(first), stripe);
}
}
}
class StripesReducer
extends Reducer<Text, // TODO: change Object to input key type
StringToIntMapWritable, // TODO: change Object to input value type
Text, // TODO: change Object to output key type
StringToIntMapWritable> { // TODO: change Object to output value type
@Override
public void reduce(Text key, // TODO: change Object to input key type
Iterable<StringToIntMapWritable> values, // TODO: change Object to input value type
Context context) throws IOException, InterruptedException {
// TODO: implement the reduce method
// the value to send in the end
StringToIntMapWritable stripe = new StringToIntMapWritable();
HashMap<Text, IntWritable> hashMap = stripe.getHashMap();
for (StringToIntMapWritable value : values) {
// each value is a partial stripe (an associative array backed by a HashMap); merge it into the final stripe
stripe.addStripe(value);
}
context.write(key, stripe);
}
}
\ No newline at end of file
}
*/
\ No newline at end of file