001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.hadoop;
017
import java.io.IOException;

import org.apache.commons.math.random.RandomGenerator;
import org.apache.commons.math.random.Well44497b;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
033
034 /**
035 * Simple experiment to ensure that the keys passed to the {@link Reducer} are
036 * always sorted and not only grouped.
037 * <p>
038 * <b>This class is not part of HadoOptimizer and will be removed.</b>
039 *
040 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
041 */
042 @Deprecated
043 public class HadoopTest {
044
045 public static class GenerateRandomValuesMapper extends
046 Mapper<DoubleWritable, Integer, DoubleWritable, Integer> {
047
048 private final RandomGenerator randomGenerator = new Well44497b();
049 private final Integer one = new Integer(1);
050
051 @Override
052 protected void setup(final Context context) throws IOException,
053 InterruptedException {
054 context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
055 "GenerateRandomValuesMapper instances").increment(1);
056 }
057
058 @Override
059 public void map(final DoubleWritable key, final Integer value,
060 final Context context) throws IOException, InterruptedException {
061 context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
062 "GenerateRandomValuesMapper reads").increment(1);
063 final int numberOfValues = randomGenerator.nextInt(10000);
064 for (int i = 0; i < numberOfValues; i++) {
065 key.set(randomGenerator.nextDouble());
066 context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
067 "GenerateRandomValuesMapper writes").increment(1);
068 context.write(key, one);
069 }
070 }
071 }
072
073 public static class IntSumReducer extends
074 Reducer<DoubleWritable, Integer, DoubleWritable, Integer> {
075 private double lastKey = Double.NaN;
076
077 @Override
078 protected void setup(final Context context) throws IOException,
079 InterruptedException {
080 context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
081 "IntSumReducer instances").increment(1);
082 }
083
084 @Override
085 public void reduce(final DoubleWritable key,
086 final Iterable<Integer> values, final Context context)
087 throws IOException, InterruptedException {
088 if (Double.NaN != lastKey && key.get() <= lastKey) {
089 throw new AssertionError();
090 }
091 lastKey = key.get();
092 int sum = 0;
093 for (final Integer val : values) {
094 sum += val;
095 }
096 context.write(key, sum);
097 }
098 }
099
100 public static void main(final String[] args) throws Exception {
101 final Configuration conf = new Configuration();
102 new GenericOptionsParser(conf, args);
103 conf.setStrings("io.serializations",
104 WritableSerialization.class.getName(),
105 JavaSerialization.class.getName());
106
107 final Path outputPath = new Path("testInput");
108 final SequenceFile.Writer writer = new SequenceFile.Writer(
109 FileSystem.get(conf), conf, outputPath, DoubleWritable.class,
110 Integer.class);
111 writer.append(new DoubleWritable(), 1);
112 writer.close();
113
114 final Job job = new Job(conf, "HadoopTest");
115 job.setJarByClass(HadoopTest.class);
116 job.setMapperClass(GenerateRandomValuesMapper.class);
117 // job.setCombinerClass(IntSumReducer.class);
118 job.setReducerClass(IntSumReducer.class);
119 job.setNumReduceTasks(1);
120 job.setOutputKeyClass(DoubleWritable.class);
121 job.setOutputValueClass(Integer.class);
122
123 job.setInputFormatClass(IndividualInputFormat.class);
124 IndividualInputFormat.setInputPaths(job, outputPath);
125 IndividualInputFormat.setOffspring(job, 100);
126 IndividualInputFormat.setSplits(job, 100);
127 FileOutputFormat.setOutputPath(job, new Path("testOutput"));
128 System.exit(job.waitForCompletion(true) ? 0 : 1);
129 }
130 }