Commit fcb3be1d authored by TRUONG Quang-Huy

Work in progress: Pair and Stripes co-occurrence exercises, TextPair, WordCountCombiner, and WordCountIMC

parent 5cc0ab03
@@ -7,7 +7,7 @@
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>WordCount</name>
<name>WordCountIMC</name>
<url>http://maven.apache.org</url>
<properties>
......
package fr.eurecom.dsg.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Pair extends Configured implements Tool {
public static class PairMapper
extends Mapper<LongWritable, Text, TextPair, IntWritable> {
private final TextPair pair = new TextPair();
private final IntWritable ONE = new IntWritable(1);
@Override
protected void map(LongWritable key, Text text, Context context)
throws IOException, InterruptedException {
// Working assumption for this exercise: two words co-occur when they appear on the same line.
// Emit one ((left, right), 1) record for every ordered pair of distinct words on the line.
String[] tokens = text.toString().split("\\s+");
for (String left : tokens) {
for (String right : tokens) {
if (left.isEmpty() || right.isEmpty() || left.equals(right)) continue;
pair.set(new Text(left), new Text(right));
context.write(pair, ONE);
}
}
}
}
public static class PairReducer
extends Reducer<TextPair, IntWritable, TextPair, IntWritable> {
private IntWritable writableSum = new IntWritable();
@Override
protected void reduce(TextPair tp,
Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// Sum all partial counts received for this pair and emit the total.
int sum = 0;
for (IntWritable value : values)
sum += value.get();
writableSum.set(sum);
context.write(tp, writableSum);
}
}
private int numReducers;
private Path inputPath;
private Path outputDir;
public Pair(String[] args) {
if (args.length != 3) {
System.out.println("Usage: Pair <num_reducers> <input_path> <output_path>");
System.exit(0);
}
this.numReducers = Integer.parseInt(args[0]);
this.inputPath = new Path(args[1]);
this.outputDir = new Path(args[2]);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = new Job(conf, "Pair");
// Set the job input format
job.setInputFormatClass(TextInputFormat.class);
// Set the map class and the map output key and value classes
job.setMapperClass(PairMapper.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(IntWritable.class);
// Set the reduce class and the reduce output key and value classes
job.setReducerClass(PairReducer.class);
job.setOutputKeyClass(TextPair.class);
job.setOutputValueClass(IntWritable.class);
// Set the job output format
job.setOutputFormatClass(TextOutputFormat.class);
// Add the input file as job input (from HDFS)
FileInputFormat.addInputPath(job, this.inputPath);
// Set the output path for the job results (to HDFS)
FileOutputFormat.setOutputPath(job, this.outputDir);
// Set the number of reducers
job.setNumReduceTasks(this.numReducers);
// Set the jar class
job.setJarByClass(Pair.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Pair(args), args);
System.exit(res);
}
}
package fr.eurecom.dsg.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Stripes extends Configured implements Tool {
private int numReducers;
private Path inputPath;
private Path outputDir;
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = null; // TODO: define a new Job (instead of null) using conf and setting a name
// TODO: set job input format
// TODO: set map class and the map output key and value classes
// TODO: set reduce class and the reduce output key and value classes
// TODO: set job output format
// TODO: add the input file as job input (from HDFS) to the variable inputFile
// TODO: set the output path for the job results (to HDFS) to the variable outputPath
// TODO: set the number of reducers using variable numberReducers
// TODO: set the jar class
return job.waitForCompletion(true) ? 0 : 1;
}
public Stripes (String[] args) {
if (args.length != 3) {
System.out.println("Usage: Stripes <num_reducers> <input_path> <output_path>");
System.exit(0);
}
this.numReducers = Integer.parseInt(args[0]);
this.inputPath = new Path(args[1]);
this.outputDir = new Path(args[2]);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Stripes(args), args);
System.exit(res);
}
}
class StripesMapper
extends Mapper<Object, // TODO: change Object to input key type
Object, // TODO: change Object to input value type
Object, // TODO: change Object to output key type
Object> { // TODO: change Object to output value type
@Override
public void map(Object key, // TODO: change Object to input key type
Object value, // TODO: change Object to input value type
Context context)
throws java.io.IOException, InterruptedException {
// TODO: implement map method
}
}
class StripesReducer
extends Reducer<Object, // TODO: change Object to input key type
Object, // TODO: change Object to input value type
Object, // TODO: change Object to output key type
Object> { // TODO: change Object to output value type
@Override
public void reduce(Object key, // TODO: change Object to input key type
Iterable<Object> values, // TODO: change Object to input value type
Context context) throws IOException, InterruptedException {
// TODO: implement the reduce method
}
}
\ No newline at end of file
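The Stripes skeleton above leaves every TODO open. A minimal, non-authoritative sketch of one way to complete it (not part of this commit; the class names below are illustrative assumptions) uses Hadoop's built-in MapWritable:

package fr.eurecom.dsg.mapreduce;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch only: for each word, emit a stripe (neighbour -> count)
// of the other words that co-occur with it on the same input line.
class StripesSketchMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split("\\s+");
        for (String left : tokens) {
            if (left.isEmpty()) continue;
            MapWritable stripe = new MapWritable();
            for (String right : tokens) {
                if (right.isEmpty() || right.equals(left)) continue;
                Text neighbour = new Text(right);
                IntWritable count = (IntWritable) stripe.get(neighbour);
                if (count == null) stripe.put(neighbour, new IntWritable(1));
                else count.set(count.get() + 1);
            }
            if (!stripe.isEmpty()) {
                word.set(left);
                context.write(word, stripe);
            }
        }
    }
}

// Illustrative sketch only: element-wise sum of all stripes received for the same word.
class StripesSketchReducer extends Reducer<Text, MapWritable, Text, MapWritable> {
    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context context)
            throws IOException, InterruptedException {
        MapWritable total = new MapWritable();
        for (MapWritable stripe : values) {
            for (Map.Entry<Writable, Writable> e : stripe.entrySet()) {
                IntWritable sum = (IntWritable) total.get(e.getKey());
                int partial = ((IntWritable) e.getValue()).get();
                // Copy key/value out of the incoming stripe before storing them.
                if (sum == null) total.put(new Text(e.getKey().toString()), new IntWritable(partial));
                else sum.set(sum.get() + partial);
            }
        }
        context.write(key, total);
    }
}

With classes of this shape, run() would mirror Pair.run(), registering the mapper and reducer and declaring Text as the map output key class and MapWritable as the map output value class.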
package fr.eurecom.dsg.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
/**
* TextPair is a Pair of Text that is Writable (Hadoop serialization API)
* and Comparable to itself.
*
*/
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return this.first;
}
public Text getSecond() {
return this.second;
}
public TextPair() {
// An empty constructor is required by the Hadoop serialization machinery.
this.first = new Text();
this.second = new Text();
}
public TextPair(String first, String second) {
this.set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
this.set(first, second);
}
@Override
public void write(DataOutput out) throws IOException {
// Write the serialized form of this pair so that it can be deserialized later (e.g. when stored on HDFS).
this.first.write(out);
this.second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
// Read and deserialize a pair previously written by write().
this.first.readFields(in);
this.second.readFields(in);
}
@Override
public int hashCode() {
// Combine both fields; the prime multiplier keeps (a,b) and (b,a) distinct.
return this.first.hashCode() * 163 + this.second.hashCode();
}
@Override
public boolean equals(Object o) {
// Two pairs are equal only if both components are equal.
if (o instanceof TextPair) {
TextPair other = (TextPair) o;
return this.first.equals(other.first) && this.second.equals(other.second);
}
return false;
}
@Override
public int compareTo(TextPair tp) {
// Compare by the first field, then by the second.
int cmp = this.first.compareTo(tp.first);
if (cmp != 0 ) return cmp;
return this.second.compareTo(tp.second);
}
@Override
public String toString() {
// Human-readable form used by the text output format.
return "("+this.first.toString()+";"+ this.second.toString()+")";
}
// DO NOT TOUCH THE CODE BELOW
/** Compare two pairs based on their values */
public static class Comparator extends WritableComparator {
/** Reference to standard Hadoop Text comparator */
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public Comparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
static {
WritableComparator.define(TextPair.class, new Comparator());
}
/** Compare just the first element of the Pair */
public static class FirstComparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public FirstComparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a instanceof TextPair && b instanceof TextPair) {
return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
}
return super.compare(a, b);
}
}
}
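The raw-byte comparators above only take effect when a job is told to use them. A minimal sketch of that wiring, assuming a job whose map output key is TextPair (the helper class name below is hypothetical and not part of this commit):

package fr.eurecom.dsg.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical helper, for illustration only.
class TextPairJobWiring {
    static void configure(Job job) {
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Sort map output by the full (first, second) pair using the raw-byte comparator.
        job.setSortComparatorClass(TextPair.Comparator.class);
        // Group reduce input on the first Text only, so every (first, *) pair
        // reaches the same reduce() call (useful for order-inversion style jobs).
        job.setGroupingComparatorClass(TextPair.FirstComparator.class);
    }
}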
package fr.eurecom.dsg.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Word Count example of MapReduce job. Given a plain text in input, this job
* counts how many occurrences of each word there are in that text and writes
* the result on HDFS.
*
*/
public class WordCountCombiner extends Configured implements Tool {
private int numReducers;
private Path inputPath;
private Path outputDir;
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
Job job = new Job(conf, "Word Count Combiner");
// Set the job input format
job.setInputFormatClass(TextInputFormat.class);
// Set the map class and the map output key and value classes
job.setMapperClass(WCMapperCombiner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// Set the combiner class: the reducer can be reused as a combiner because
// summing counts is associative and commutative
job.setCombinerClass(WCReducerCombiner.class);
// Set the reduce class and the reduce output key and value classes
job.setReducerClass(WCReducerCombiner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// Set the job output format
job.setOutputFormatClass(TextOutputFormat.class);
// Add the job input (from HDFS) and output path (to HDFS), set the number of
// reducers (optional, 1 by default) and the jar class
FileInputFormat.addInputPath(job, this.inputPath);
FileOutputFormat.setOutputPath(job, this.outputDir);
job.setNumReduceTasks(this.numReducers);
job.setJarByClass(WordCountCombiner.class);
return job.waitForCompletion(true) ? 0 : 1; // this will execute the job
}
public WordCountCombiner (String[] args) {
if (args.length != 3) {
System.out.println("Usage: WordCountCombiner <num_reducers> <input_path> <output_path>");
System.exit(0);
}
this.numReducers = Integer.parseInt(args[0]);
this.inputPath = new Path(args[1]);
this.outputDir = new Path(args[2]);
}
public static void main(String args[]) throws Exception {
int res = ToolRunner.run(new Configuration(), new WordCountCombiner(args), args);
System.exit(res);
}
}
class WCMapperCombiner extends Mapper<LongWritable, Text, Text, LongWritable> {
private Text word = new Text();
private final static LongWritable ONE = new LongWritable(1);
@Override
protected void map(LongWritable offset, Text text, Context context)
throws IOException, InterruptedException {
StringTokenizer iter = new StringTokenizer(text.toString());
while (iter.hasMoreTokens()) {
this.word.set(iter.nextToken());
context.write(this.word , ONE);
}
}
}
class WCReducerCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text word, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
long accumulator = 0;
for (LongWritable value : values) {
accumulator += value.get();
}
context.write(word, new LongWritable(accumulator));
}
}
@@ -11,11 +11,11 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -34,10 +34,10 @@ public class WordCountIMC extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//Job job = null; // DONE: define new job instead of null using conf e setting
/* //Job job = null; // DONE: define new job instead of null using conf e setting
// a name
Configuration conf = this.getConf();
Job job = new Job(conf,"WordCountIMC");
Job job = new Job(conf,"Word Count");
// DONE: set job input format
job.setInputFormatClass(TextInputFormat.class);
@@ -52,7 +52,7 @@ public class WordCountIMC extends Configured implements Tool {
job.setOutputValueClass(IntWritable.class);
// DONE: set job output format
job.setOutputFormatClass(OutputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// DONE: add the input file as job input (from HDFS)
FileInputFormat.addInputPath(job,this.inputPath);
@@ -62,8 +62,34 @@ public class WordCountIMC extends Configured implements Tool {
job.setNumReduceTasks(this.numReducers);
// DONE: set the jar class
job.setJarByClass(WordCountIMC.class);
job.waitForCompletion(true);
return 0; // this will execute the jo*/
Configuration conf = this.getConf();
Job job = new Job(conf,"Word Count");
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WCIMCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(WCIMCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, this.inputPath);
FileOutputFormat.setOutputPath(job, this.outputDir);
job.setNumReduceTasks(this.numReducers);
job.setJarByClass(WordCountIMC.class);
return job.waitForCompletion(true) ? 0 : 1; // this will execute the job
}
public WordCountIMC (String[] args) {
@@ -82,11 +108,7 @@ public class WordCountIMC extends Configured implements Tool {
}
}
class WCIMCMapper extends Mapper<LongWritable,
// type
Text,
Text,
IntWritable> {
class WCIMCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable ONE = new IntWritable(1);
private Map<String,IntWritable> H;
@@ -94,13 +116,12 @@ class WCIMCMapper extends Mapper<LongWritable,
@Override
protected void setup(Context context){
this.H = new HashMap<String, IntWritable>() ;
}
@Override
protected void map(LongWritable key,
Text text,
Context context) throws IOException, InterruptedException {
protected void map(LongWritable key, Text text, Context context) {
// * TODO: implement the map method (use context.write to emit results). Use
// the in-memory combiner technique
@@ -129,7 +150,6 @@ class WCIMCMapper extends Mapper<LongWritable,
}
class WCIMCReducer extends Reducer<Text,
// type
IntWritable,
Text,
IntWritable> {
@@ -149,5 +169,5 @@ class WCIMCReducer extends Reducer<Text,
writableSum.set(sum);
context.write(key,writableSum);
}
}
\ No newline at end of file
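The map and cleanup bodies of WCIMCMapper are collapsed in the hunks above. For reference, a minimal, non-authoritative sketch of the in-memory combiner technique they implement, reusing the H field visible in the declarations (the sketch class name is hypothetical and the committed code may differ in detail):

package fr.eurecom.dsg.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative sketch of the in-memory combiner technique, not the committed code.
class WCIMCMapperSketch extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Map<String, IntWritable> H;

    @Override
    protected void setup(Context context) {
        // One associative array per map task, used to pre-aggregate counts locally.
        this.H = new HashMap<String, IntWritable>();
    }

    @Override
    protected void map(LongWritable key, Text text, Context context) {
        // Accumulate counts in memory instead of emitting one (word, 1) pair per token.
        StringTokenizer iter = new StringTokenizer(text.toString());
        while (iter.hasMoreTokens()) {
            String token = iter.nextToken();
            IntWritable count = H.get(token);
            if (count == null) H.put(token, new IntWritable(1));
            else count.set(count.get() + 1);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the locally aggregated counts once, at the end of the map task.
        Text word = new Text();
        for (Map.Entry<String, IntWritable> entry : H.entrySet()) {
            word.set(entry.getKey());
            context.write(word, entry.getValue());
        }
    }
}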