001 /*
002 * Copied from http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html
003 */
004 package de.kumpe.hadooptimizer.examples.tutorial;
005
006 import java.io.IOException;
007 import java.util.StringTokenizer;
008
009 import org.apache.hadoop.conf.Configuration;
010 import org.apache.hadoop.fs.Path;
011 import org.apache.hadoop.io.IntWritable;
012 import org.apache.hadoop.io.LongWritable;
013 import org.apache.hadoop.io.Text;
014 import org.apache.hadoop.mapreduce.Job;
015 import org.apache.hadoop.mapreduce.Mapper;
016 import org.apache.hadoop.mapreduce.Reducer;
017 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
018 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
019 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
020 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
021 import org.apache.hadoop.util.GenericOptionsParser;
022
023 /**
024 * Adapted MapReduce example from hadoop's MapReduce tutorial: <a
025 * href="http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html"
026 * >http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html</a>.
027 */
028 public class WordCount {
029 public static class Map extends
030 Mapper<LongWritable, Text, Text, IntWritable> {
031 private final static IntWritable one = new IntWritable(1);
032 private Text word = new Text();
033
034 @Override
035 protected void map(final LongWritable key, final Text value,
036 final Context context) throws IOException, InterruptedException {
037 final String line = value.toString();
038 final StringTokenizer tokenizer = new StringTokenizer(line);
039 while (tokenizer.hasMoreTokens()) {
040 word.set(tokenizer.nextToken());
041 context.write(word, one);
042 }
043 }
044 }
045
046 public static class Reduce extends
047 Reducer<Text, IntWritable, Text, IntWritable> {
048
049 @Override
050 protected void reduce(final Text key,
051 final Iterable<IntWritable> values, final Context context)
052 throws IOException, InterruptedException {
053 int sum = 0;
054 for (final IntWritable value : values) {
055 sum += value.get();
056 }
057 context.write(key, new IntWritable(sum));
058 }
059 }
060
061 public static void main(final String[] args) throws Exception {
062 final Configuration conf = new GenericOptionsParser(args)
063 .getConfiguration();
064 final Job job = new Job(conf);
065
066 job.setOutputKeyClass(Text.class);
067 job.setOutputValueClass(IntWritable.class);
068
069 job.setMapperClass(Map.class);
070 job.setCombinerClass(Reduce.class);
071 job.setReducerClass(Reduce.class);
072
073 job.setInputFormatClass(TextInputFormat.class);
074 job.setOutputFormatClass(TextOutputFormat.class);
075
076 FileInputFormat.setInputPaths(job, new Path(args[0]));
077 FileOutputFormat.setOutputPath(job, new Path(args[1]));
078
079 if (job.waitForCompletion(true)) {
080 return;
081 }
082 throw new RuntimeException("Job failed.");
083 }
084 }