/*
 * Copied from http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html
 */
package de.kumpe.hadooptimizer.examples.tutorial;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Adapted MapReduce example from Hadoop's MapReduce tutorial: <a
 * href="http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html"
 * >http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html</a>.
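 * <p>
 * Typical invocation (the jar name here is only illustrative):
 *
 * <pre>
 * hadoop jar wordcount-examples.jar de.kumpe.hadooptimizer.examples.tutorial.WordCount &lt;input-dir&gt; &lt;output-dir&gt;
 * </pre>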
 */
public class WordCount {
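	/**
	 * Tokenizes each input line and emits a {@code (word, 1)} pair for every
	 * token.
	 */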
	public static class Map extends
			Mapper<LongWritable, Text, Text, IntWritable> {
		// Both Writables are reused across map() calls to avoid allocating
		// new objects for every record.
		private final static IntWritable one = new IntWritable(1);
		private final Text word = new Text();

		@Override
		protected void map(final LongWritable key, final Text value,
				final Context context) throws IOException, InterruptedException {
			final String line = value.toString();
			final StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				context.write(word, one);
			}
		}
	}

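	/**
	 * Sums the counts emitted for each word. The same class also serves as
	 * the combiner, which is safe because integer addition is associative and
	 * commutative.
	 */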
	public static class Reduce extends
			Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(final Text key,
				final Iterable<IntWritable> values, final Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (final IntWritable value : values) {
				sum += value.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}

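	/**
	 * Configures and submits the job. Expects two remaining arguments: the
	 * input path and the output path (which must not exist yet).
	 */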
	public static void main(final String[] args) throws Exception {
		final GenericOptionsParser parser = new GenericOptionsParser(args);
		final Configuration conf = parser.getConfiguration();
		// Use only the arguments left over after the generic Hadoop options
		// (e.g. -D key=value) have been consumed by the parser.
		final String[] remainingArgs = parser.getRemainingArgs();

		final Job job = new Job(conf);
		// Ship the jar containing this class to the cluster; without this the
		// task JVMs cannot load the Map and Reduce classes.
		job.setJarByClass(WordCount.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(Map.class);
		job.setCombinerClass(Reduce.class);
		job.setReducerClass(Reduce.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(remainingArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));

		if (job.waitForCompletion(true)) {
			return;
		}
		throw new RuntimeException("Job failed.");
	}
}