Commit c5b64585 authored by TRUONG Quang-Huy

WordCountIMC

parent 935cb767
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="" />
<mapping directory="" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ChangeListManager">
<list default="true" id="30ff8914-3a60-4b29-8ec6-4036cadfcba8" name="Default" comment="" />
<list default="true" id="30ff8914-3a60-4b29-8ec6-4036cadfcba8" name="Default" comment="">
<change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCountIMC.java" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/.idea/vcs.xml" afterPath="$PROJECT_DIR$/.idea/vcs.xml" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/.idea/workspace.xml" afterPath="$PROJECT_DIR$/.idea/workspace.xml" />
</list>
<ignored path="WordCount.iws" />
<ignored path=".idea/workspace.xml" />
<ignored path="$PROJECT_DIR$/out/" />
......@@ -26,36 +30,65 @@
</component>
<component name="FileEditorManager">
<leaf>
<file leaf-file-name="pom.xml" pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/pom.xml">
<file leaf-file-name="WordCount.java" pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCount.java">
<provider selected="true" editor-type-id="text-editor">
<state vertical-scroll-proportion="0.0" vertical-offset="0" max-vertical-offset="1485">
<caret line="51" column="40" selection-start-line="51" selection-start-column="40" selection-end-line="51" selection-end-column="40" />
<state vertical-scroll-proportion="0.0" vertical-offset="519" max-vertical-offset="1425">
<caret line="41" column="0" selection-start-line="40" selection-start-column="7" selection-end-line="41" selection-end-column="0" />
<folding>
<element signature="imports" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file leaf-file-name="WordCountCombiner.java" pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCountCombiner.java">
<provider selected="true" editor-type-id="text-editor">
<state vertical-scroll-proportion="0.0" vertical-offset="909" max-vertical-offset="1500">
<caret line="85" column="33" selection-start-line="85" selection-start-column="1" selection-end-line="85" selection-end-column="33" />
<folding />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="WordCount.java" pinned="false" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCount.java">
<file leaf-file-name="WordCountIMC.java" pinned="false" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCountIMC.java">
<provider selected="true" editor-type-id="text-editor">
<state vertical-scroll-proportion="0.47445256" vertical-offset="330" max-vertical-offset="1425">
<caret line="35" column="9" selection-start-line="35" selection-start-column="9" selection-end-line="35" selection-end-column="9" />
<state vertical-scroll-proportion="0.23357664" vertical-offset="1959" max-vertical-offset="2370">
<caret line="137" column="0" selection-start-line="137" selection-start-column="0" selection-end-line="137" selection-end-column="0" />
<folding>
<element signature="imports" expanded="true" />
<element signature="e#3393#3394#0" expanded="true" />
<element signature="e#3453#3454#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file leaf-file-name="Mapper.class" pinned="false" current-in-tab="false">
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/hadoop/hadoop-core/2.5.0-mr1-cdh5.3.2/hadoop-core-2.5.0-mr1-cdh5.3.2.jar!/org/apache/hadoop/mapreduce/Mapper.class">
<provider selected="true" editor-type-id="text-editor">
<state vertical-scroll-proportion="-7.8" vertical-offset="135" max-vertical-offset="615">
<caret line="26" column="0" selection-start-line="26" selection-start-column="0" selection-end-line="26" selection-end-column="0" />
<folding />
</state>
</provider>
</entry>
</file>
</leaf>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="IdeDocumentHistory">
<option name="CHANGED_PATHS">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
<option value="$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/App.java" />
<option value="$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCountCombiner.java" />
<option value="$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCount.java" />
<option value="$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCountIMC.java" />
</list>
</option>
</component>
......@@ -127,6 +160,66 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
<PATH>
<PATH_ELEMENT>
<option name="myItemId" value="WordCount" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="WordCount" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="main" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="java" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="mapreduce" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="WordCountIMC.java" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.ClassesTreeStructureProvider$PsiClassOwnerTreeNode" />
</PATH_ELEMENT>
</PATH>
<PATH>
<PATH_ELEMENT>
<option name="myItemId" value="WordCount" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="WordCount" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="main" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="java" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="mapreduce" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
<option name="myItemId" value="WordCountCombiner.java" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.ClassesTreeStructureProvider$PsiClassOwnerTreeNode" />
</PATH_ELEMENT>
</PATH>
<PATH>
<PATH_ELEMENT>
<option name="myItemId" value="WordCount" />
......@@ -164,8 +257,54 @@
<property name="MemberChooser.sorted" value="false" />
<property name="MemberChooser.showClasses" value="true" />
<property name="MemberChooser.copyJavadoc" value="false" />
<property name="recentsLimit" value="5" />
<property name="dynamic.classpath" value="false" />
</component>
<component name="RunManager">
<component name="RunManager" selected="Application.WordCountCombiner">
<configuration default="false" name="WordCount" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea">
<pattern>
<option name="PATTERN" value="fr.eurecom.dsg.mapreduce.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<option name="MAIN_CLASS_NAME" value="fr.eurecom.dsg.mapreduce.WordCount" />
<option name="VM_PARAMETERS" />
<option name="PROGRAM_PARAMETERS" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" />
<option name="ENABLE_SWING_INSPECTOR" value="false" />
<option name="ENV_VARIABLES" />
<option name="PASS_PARENT_ENVS" value="true" />
<module name="WordCount" />
<envs />
<RunnerSettings RunnerId="Run" />
<ConfigurationWrapper RunnerId="Run" />
<method />
</configuration>
<configuration default="false" name="WordCountCombiner" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea">
<pattern>
<option name="PATTERN" value="fr.eurecom.dsg.mapreduce.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<option name="MAIN_CLASS_NAME" value="fr.eurecom.dsg.mapreduce.WordCountCombiner" />
<option name="VM_PARAMETERS" />
<option name="PROGRAM_PARAMETERS" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" />
<option name="ENABLE_SWING_INSPECTOR" value="false" />
<option name="ENV_VARIABLES" />
<option name="PASS_PARENT_ENVS" value="true" />
<module name="WordCount" />
<envs />
<RunnerSettings RunnerId="Run" />
<ConfigurationWrapper RunnerId="Run" />
<method />
</configuration>
<configuration default="true" type="#org.jetbrains.idea.devkit.run.PluginConfigurationType" factoryName="Plugin">
<module name="" />
<option name="VM_PARAMETERS" value="-Xmx512m -Xms256m -XX:MaxPermSize=250m -ea" />
......@@ -319,7 +458,16 @@
<option name="FILTER_LOGCAT_AUTOMATICALLY" value="true" />
<method />
</configuration>
<list size="0" />
<list size="2">
<item index="0" class="java.lang.String" itemvalue="Application.WordCount" />
<item index="1" class="java.lang.String" itemvalue="Application.WordCountCombiner" />
</list>
<recent_temporary>
<list size="2">
<item index="0" class="java.lang.String" itemvalue="Application.WordCountCombiner" />
<item index="1" class="java.lang.String" itemvalue="Application.WordCount" />
</list>
</recent_temporary>
</component>
<component name="ShelveChangesManager" show_recycled="false" />
<component name="TaskManager">
......@@ -333,7 +481,7 @@
</component>
<component name="ToolWindowManager">
<frame x="0" y="0" width="1366" height="768" extended-state="6" />
<editor active="true" />
<editor active="false" />
<layout>
<window_info id="Palette&#9;" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="false" content_ui="tabs" />
<window_info id="UI Designer" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="false" content_ui="tabs" />
......@@ -341,8 +489,9 @@
<window_info id="Designer" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="false" content_ui="tabs" />
<window_info id="Terminal" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="false" content_ui="tabs" />
<window_info id="Palette" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="false" content_ui="tabs" />
<window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.32924962" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
<window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.32924962" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
<window_info id="Ant Build" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.25" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
<window_info id="Debug" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="3" side_tool="false" content_ui="tabs" />
<window_info id="Event Log" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="true" content_ui="tabs" />
<window_info id="Favorites" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="true" content_ui="tabs" />
<window_info id="Version Control" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="false" content_ui="tabs" />
......@@ -353,10 +502,9 @@
<window_info id="Commander" active="false" anchor="right" auto_hide="false" internal_type="SLIDING" type="SLIDING" visible="false" weight="0.4" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
<window_info id="Application Servers" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="-1" side_tool="false" content_ui="tabs" />
<window_info id="Project" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.25" sideWeight="0.5" order="0" side_tool="false" content_ui="combo" />
<window_info id="Run" active="true" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.32924962" sideWeight="0.5" order="2" side_tool="false" content_ui="tabs" />
<window_info id="Cvs" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.25" sideWeight="0.5" order="4" side_tool="false" content_ui="tabs" />
<window_info id="Message" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
<window_info id="Debug" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="3" side_tool="false" content_ui="tabs" />
<window_info id="Run" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="2" side_tool="false" content_ui="tabs" />
<window_info id="Inspection" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="5" side_tool="false" content_ui="tabs" />
<window_info id="Hierarchy" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.25" sideWeight="0.5" order="2" side_tool="false" content_ui="combo" />
</layout>
......@@ -381,6 +529,10 @@
<breakpoint-manager />
<watches-manager />
</component>
<component name="antWorkspaceConfiguration">
<option name="IS_AUTOSCROLL_TO_SOURCE" value="false" />
<option name="FILTER_TARGETS" value="false" />
</component>
<component name="editorHistoryManager">
<entry file="file://$PROJECT_DIR$/pom.xml">
<provider selected="true" editor-type-id="text-editor">
......@@ -390,12 +542,40 @@
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCountCombiner.java">
<provider selected="true" editor-type-id="text-editor">
<state vertical-scroll-proportion="0.0" vertical-offset="909" max-vertical-offset="1500">
<caret line="85" column="33" selection-start-line="85" selection-start-column="1" selection-end-line="85" selection-end-column="33" />
<folding />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/hadoop/hadoop-core/2.5.0-mr1-cdh5.3.2/hadoop-core-2.5.0-mr1-cdh5.3.2.jar!/org/apache/hadoop/mapreduce/Mapper.class">
<provider selected="true" editor-type-id="text-editor">
<state vertical-scroll-proportion="-7.8" vertical-offset="135" max-vertical-offset="615">
<caret line="26" column="0" selection-start-line="26" selection-start-column="0" selection-end-line="26" selection-end-column="0" />
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCount.java">
<provider selected="true" editor-type-id="text-editor">
<state vertical-scroll-proportion="0.47445256" vertical-offset="330" max-vertical-offset="1425">
<caret line="35" column="9" selection-start-line="35" selection-start-column="9" selection-end-line="35" selection-end-column="9" />
<state vertical-scroll-proportion="0.0" vertical-offset="519" max-vertical-offset="1425">
<caret line="41" column="0" selection-start-line="40" selection-start-column="7" selection-end-line="41" selection-end-column="0" />
<folding>
<element signature="imports" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/fr/eurecom/dsg/mapreduce/WordCountIMC.java">
<provider selected="true" editor-type-id="text-editor">
<state vertical-scroll-proportion="0.23357664" vertical-offset="1959" max-vertical-offset="2370">
<caret line="137" column="0" selection-start-line="137" selection-start-column="0" selection-end-line="137" selection-end-column="0" />
<folding>
<element signature="imports" expanded="true" />
<element signature="e#3393#3394#0" expanded="true" />
<element signature="e#3453#3454#0" expanded="true" />
</folding>
</state>
</provider>
......
package fr.eurecom.dsg.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Word Count MapReduce job that uses a combiner. Given a plain text in input,
 * this job counts how many occurrences of each word there are in that text and
 * writes the result to the configured output path.
 *
 * <p>Expected command-line arguments, in order:
 * {@code <num_reducers> <input_path> <output_path>}.
 */
public class WordCountCombiner extends Configured implements Tool {

  private int numReducers;
  private Path inputPath;
  private Path outputDir;

  /**
   * Configures and runs the job, blocking until it completes.
   *
   * @param args unused here; the arguments are parsed by the constructor
   * @return 0 if the job completed successfully, 1 otherwise
   * @throws Exception if the job cannot be configured or submitted
   */
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = new Job(conf, "Word Count");

    // Input is read as plain text lines.
    job.setInputFormatClass(TextInputFormat.class);

    // The declared map output types must match what WCMapperCombiner emits
    // (Text, LongWritable); declaring IntWritable here makes Hadoop reject the
    // first emitted record with a type-mismatch error.
    job.setMapperClass(WCMapperCombiner.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    // The reducer is a plain sum whose input and output types are identical,
    // so the same class can pre-aggregate map output as the combiner.
    job.setCombinerClass(WCReducerCombiner.class);

    // Final output types, matching what WCReducerCombiner emits.
    job.setReducerClass(WCReducerCombiner.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    // Results are written as text.
    job.setOutputFormatClass(TextOutputFormat.class);

    // Input file(s) and output directory for the job.
    FileInputFormat.addInputPath(job, this.inputPath);
    FileOutputFormat.setOutputPath(job, this.outputDir);

    // Number of reduce tasks, taken from the first command-line argument.
    job.setNumReduceTasks(this.numReducers);

    // Lets Hadoop locate the jar that contains this job's classes.
    job.setJarByClass(WordCountCombiner.class);

    return job.waitForCompletion(true) ? 0 : 1; // this will execute the job
  }

  /**
   * Parses the command-line arguments. Prints a usage message and terminates
   * the JVM with a non-zero status when the argument count is wrong.
   *
   * @param args {@code <num_reducers> <input_path> <output_path>}
   * @throws NumberFormatException if {@code <num_reducers>} is not an integer
   */
  public WordCountCombiner (String[] args) {
    if (args.length != 3) {
      System.err.println("Usage: WordCountCombiner <num_reducers> <input_path> <output_path>");
      // A usage error is a failure; report it to the calling shell as such.
      System.exit(1);
    }
    this.numReducers = Integer.parseInt(args[0]);
    this.inputPath = new Path(args[1]);
    this.outputDir = new Path(args[2]);
  }

  /** Entry point: runs the job through ToolRunner and exits with its status. */
  public static void main(String args[]) throws Exception {
    int res = ToolRunner.run(new Configuration(), new WordCountCombiner(args), args);
    System.exit(res);
  }
}
/**
 * Mapper for the word count job: splits each input value into
 * whitespace-delimited tokens and emits one (token, 1) pair per token.
 */
class WCMapperCombiner extends Mapper<LongWritable, Text, Text, LongWritable> {

  /** Constant count emitted for every token. */
  private final static LongWritable UNIT_COUNT = new LongWritable(1);

  /** Reused output key, overwritten for each token before it is written. */
  private Text currentToken = new Text();

  /**
   * Emits (token, 1) for every token found in {@code text}.
   *
   * @param offset input key (not used by this mapper)
   * @param text input value to tokenize
   * @param context sink for the emitted pairs
   */
  @Override
  protected void map(LongWritable offset, Text text, Context context)
      throws IOException, InterruptedException {
    for (StringTokenizer tokens = new StringTokenizer(text.toString());
        tokens.hasMoreTokens(); ) {
      String token = tokens.nextToken();
      this.currentToken.set(token);
      context.write(this.currentToken, UNIT_COUNT);
    }
  }
}
/**
 * Reducer for the word count job: adds up all counts received for a word and
 * emits a single (word, total) pair.
 */
class WCReducerCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

  /**
   * Sums {@code values} and writes the total under {@code word}.
   *
   * @param word the key whose counts are being aggregated
   * @param values the counts received for {@code word}
   * @param context sink for the emitted pair
   */
  @Override
  protected void reduce(Text word, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long total = 0L;
    for (LongWritable count : values) {
      total = total + count.get();
    }
    LongWritable result = new LongWritable(total);
    context.write(word, result);
  }
}
package fr.eurecom.dsg.mapreduce;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Word Count MapReduce job using the in-memory combiner technique. Given a
 * plain text in input, this job counts how many occurrences of each word there
 * are in that text and writes the result to the configured output path.
 *
 * <p>Expected command-line arguments, in order:
 * {@code <num_reducers> <input_path> <output_path>}.
 */
public class WordCountIMC extends Configured implements Tool {

  private int numReducers;
  private Path inputPath;
  private Path outputDir;

  /**
   * Configures and runs the job, blocking until it completes.
   *
   * @param args unused here; the arguments are parsed by the constructor
   * @return 0 if the job completed successfully, 1 otherwise
   * @throws Exception if the job cannot be configured or submitted
   */
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = new Job(conf, "WordCountIMC");

    // Input is read as plain text lines.
    job.setInputFormatClass(TextInputFormat.class);

    // Map output types, matching what WCIMCMapper emits.
    job.setMapperClass(WCIMCMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Final output types, matching what WCIMCReducer emits.
    job.setReducerClass(WCIMCReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // A concrete output format is required: the abstract OutputFormat base
    // class cannot be instantiated by the framework.
    job.setOutputFormatClass(TextOutputFormat.class);

    // Input file(s) and output directory for the job.
    FileInputFormat.addInputPath(job, this.inputPath);
    FileOutputFormat.setOutputPath(job, this.outputDir);

    // Number of reduce tasks, taken from the first command-line argument.
    job.setNumReduceTasks(this.numReducers);

    // Lets Hadoop locate the jar that contains this job's classes.
    job.setJarByClass(WordCountIMC.class);

    return job.waitForCompletion(true) ? 0 : 1; // this will execute the job
  }

  /**
   * Parses the command-line arguments. Prints a usage message and terminates
   * the JVM with a non-zero status when the argument count is wrong.
   *
   * @param args {@code <num_reducers> <input_path> <output_path>}
   * @throws NumberFormatException if {@code <num_reducers>} is not an integer
   */
  public WordCountIMC (String[] args) {
    if (args.length != 3) {
      System.err.println("Usage: WordCountIMC <num_reducers> <input_path> <output_path>");
      // A usage error is a failure; report it to the calling shell as such.
      System.exit(1);
    }
    this.numReducers = Integer.parseInt(args[0]);
    this.inputPath = new Path(args[1]);
    this.outputDir = new Path(args[2]);
  }

  /** Entry point: runs the job through ToolRunner and exits with its status. */
  public static void main(String args[]) throws Exception {
    int res = ToolRunner.run(new Configuration(), new WordCountIMC(args), args);
    System.exit(res);
  }
}
/**
 * Mapper for the in-memory-combiner word count. Instead of emitting one pair
 * per token, it accumulates per-word counts in a map held by this mapper
 * instance and emits each (word, count) pair from {@link #cleanup}.
 *
 * <p>NOTE(review): the map grows with the number of distinct tokens seen by
 * this mapper instance and is never flushed before cleanup; confirm this fits
 * the task heap for the expected inputs.
 */
class WCIMCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  /** Partial counts keyed by token; created in setup, emitted in cleanup. */
  private Map<String, IntWritable> H;

  /** Reused output key, overwritten for each entry before it is written. */
  private Text textValue = new Text();

  /** Creates the empty count table. */
  @Override
  protected void setup(Context context) {
    this.H = new HashMap<String, IntWritable>();
  }

  /**
   * Tokenizes {@code text} on whitespace and adds one to the stored count of
   * every token. Nothing is written to {@code context} here.
   *
   * @param key input key (not used by this mapper)
   * @param text input value to tokenize
   * @param context unused in this method
   */
  @Override
  protected void map(LongWritable key, Text text, Context context)
      throws IOException, InterruptedException {
    StringTokenizer iter = new StringTokenizer(text.toString());
    while (iter.hasMoreTokens()) {
      String word = iter.nextToken();
      // Single lookup; each word owns its counter so it can be bumped in
      // place instead of allocating a new IntWritable per occurrence.
      IntWritable count = H.get(word);
      if (count == null) {
        H.put(word, new IntWritable(1));
      } else {
        count.set(count.get() + 1);
      }
    }
  }

  /**
   * Emits one (word, count) pair for every entry accumulated in the table.
   *
   * @param context sink for the emitted pairs
   */
  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    for (Map.Entry<String, IntWritable> pair : this.H.entrySet()) {
      textValue.set(pair.getKey());
      context.write(textValue, pair.getValue());
    }
  }
}
class WCIMCReducer extends Reducer<Text,
// type
IntWritable,
Text,
IntWritable> {
private IntWritable writableSum = new IntWritable();
@Override
protected void reduce(Text key,
Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// TODO: implement the reduce method (use context.write to emit results)
int sum = 0;
for (IntWritable value : values)
sum += value.get();
writableSum.set(sum);
context.write(key,writableSum);
}
}
}
\ No newline at end of file
Manifest-Version: 1.0
Build-Jdk: 1.7.0_75
Built-By: hrua
Created-By: IntelliJ IDEA
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment