/*
 * Copied from http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html
 */
package de.kumpe.hadooptimizer.examples.tutorial;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Adapted MapReduce example from Hadoop's MapReduce tutorial: <a
 * href="http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html"
 * >http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html</a>.
 */
public class WordCount {
    /** Tokenizes each input line and emits a (word, 1) pair per token. */
    public static class Map extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        // Reused across map() calls to avoid allocating a new Text per token.
        private final Text word = new Text();

        @Override
        protected void map(final LongWritable key, final Text value,
                final Context context) throws IOException, InterruptedException {
            final String line = value.toString();
            final StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    /** Sums all counts emitted for a word. */
    public static class Reduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(final Text key,
                final Iterable<IntWritable> values, final Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (final IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(final String[] args) throws Exception {
        // Let GenericOptionsParser consume the standard generic options
        // (-D, -fs, -libjars, ...) and keep only the remaining arguments;
        // reading args[0]/args[1] directly would break as soon as a generic
        // option were passed on the command line.
        final GenericOptionsParser parser = new GenericOptionsParser(args);
        final Configuration conf = parser.getConfiguration();
        final String[] remainingArgs = parser.getRemainingArgs();
        final Job job = new Job(conf);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        // Summing is associative and commutative, so the reducer doubles as
        // a combiner to pre-aggregate counts on the map side.
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));

        if (job.waitForCompletion(true)) {
            return;
        }
        throw new RuntimeException("Job failed.");
    }
}
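/*
 * Example invocation (a sketch, not part of the original tutorial; the jar
 * name and HDFS paths below are assumptions):
 *
 *   hadoop jar hadooptimizer-examples.jar \
 *       de.kumpe.hadooptimizer.examples.tutorial.WordCount \
 *       /user/hadoop/wordcount/input /user/hadoop/wordcount/output
 *
 * Because the arguments pass through GenericOptionsParser, generic options
 * such as -D mapred.reduce.tasks=2 may precede the input and output paths.
 */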