001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.hadoop;
017    
018    import java.io.IOException;
019    
020    import org.apache.commons.math.random.RandomGenerator;
021    import org.apache.commons.math.random.Well44497b;
022    import org.apache.hadoop.conf.Configuration;
023    import org.apache.hadoop.fs.FileSystem;
024    import org.apache.hadoop.fs.Path;
025    import org.apache.hadoop.io.DoubleWritable;
026    import org.apache.hadoop.io.SequenceFile;
027    import org.apache.hadoop.io.serializer.WritableSerialization;
028    import org.apache.hadoop.mapreduce.Job;
029    import org.apache.hadoop.mapreduce.Mapper;
030    import org.apache.hadoop.mapreduce.Reducer;
031    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
032    import org.apache.hadoop.util.GenericOptionsParser;
033    
034    /**
035     * Simple experiment to ensure that the keys passed to the {@link Reducer} are
036     * always sorted and not only grouped.
037     * <p>
038     * <b>This class is not part of HadoOptimizer and will be removed.</b>
039     * 
040     * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
041     */
042    @Deprecated
043    public class HadoopTest {
044    
045            public static class GenerateRandomValuesMapper extends
046                            Mapper<DoubleWritable, Integer, DoubleWritable, Integer> {
047    
048                    private final RandomGenerator randomGenerator = new Well44497b();
049                    private final Integer one = new Integer(1);
050    
051                    @Override
052                    protected void setup(final Context context) throws IOException,
053                                    InterruptedException {
054                            context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
055                                            "GenerateRandomValuesMapper instances").increment(1);
056                    }
057    
058                    @Override
059                    public void map(final DoubleWritable key, final Integer value,
060                                    final Context context) throws IOException, InterruptedException {
061                            context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
062                                            "GenerateRandomValuesMapper reads").increment(1);
063                            final int numberOfValues = randomGenerator.nextInt(10000);
064                            for (int i = 0; i < numberOfValues; i++) {
065                                    key.set(randomGenerator.nextDouble());
066                                    context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
067                                                    "GenerateRandomValuesMapper writes").increment(1);
068                                    context.write(key, one);
069                            }
070                    }
071            }
072    
073            public static class IntSumReducer extends
074                            Reducer<DoubleWritable, Integer, DoubleWritable, Integer> {
075                    private double lastKey = Double.NaN;
076    
077                    @Override
078                    protected void setup(final Context context) throws IOException,
079                                    InterruptedException {
080                            context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
081                                            "IntSumReducer instances").increment(1);
082                    }
083    
084                    @Override
085                    public void reduce(final DoubleWritable key,
086                                    final Iterable<Integer> values, final Context context)
087                                    throws IOException, InterruptedException {
088                            if (Double.NaN != lastKey && key.get() <= lastKey) {
089                                    throw new AssertionError();
090                            }
091                            lastKey = key.get();
092                            int sum = 0;
093                            for (final Integer val : values) {
094                                    sum += val;
095                            }
096                            context.write(key, sum);
097                    }
098            }
099    
100            public static void main(final String[] args) throws Exception {
101                    final Configuration conf = new Configuration();
102                    new GenericOptionsParser(conf, args);
103                    conf.setStrings("io.serializations",
104                                    WritableSerialization.class.getName(),
105                                    JavaSerialization.class.getName());
106    
107                    final Path outputPath = new Path("testInput");
108                    final SequenceFile.Writer writer = new SequenceFile.Writer(
109                                    FileSystem.get(conf), conf, outputPath, DoubleWritable.class,
110                                    Integer.class);
111                    writer.append(new DoubleWritable(), 1);
112                    writer.close();
113    
114                    final Job job = new Job(conf, "HadoopTest");
115                    job.setJarByClass(HadoopTest.class);
116                    job.setMapperClass(GenerateRandomValuesMapper.class);
117                    // job.setCombinerClass(IntSumReducer.class);
118                    job.setReducerClass(IntSumReducer.class);
119                    job.setNumReduceTasks(1);
120                    job.setOutputKeyClass(DoubleWritable.class);
121                    job.setOutputValueClass(Integer.class);
122    
123                    job.setInputFormatClass(IndividualInputFormat.class);
124                    IndividualInputFormat.setInputPaths(job, outputPath);
125                    IndividualInputFormat.setOffspring(job, 100);
126                    IndividualInputFormat.setSplits(job, 100);
127                    FileOutputFormat.setOutputPath(job, new Path("testOutput"));
128                    System.exit(job.waitForCompletion(true) ? 0 : 1);
129            }
130    }