/*
 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package de.kumpe.hadooptimizer.hadoop;

import java.io.IOException;

import org.apache.commons.math.random.RandomGenerator;
import org.apache.commons.math.random.Well44497b;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Simple experiment to verify that the keys passed to the {@link Reducer} are
 * always sorted and not merely grouped.
 * <p>
 * <b>This class is not part of HadoOptimizer and will be removed.</b>
 *
 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
 */
@Deprecated
public class HadoopTest {

    /**
     * Emits a random number (up to 10000) of random double keys per input
     * record, each paired with the value 1.
     */
    public static class GenerateRandomValuesMapper extends
            Mapper<DoubleWritable, Integer, DoubleWritable, Integer> {

        private final RandomGenerator randomGenerator = new Well44497b();
        private final Integer one = Integer.valueOf(1);

        @Override
        protected void setup(final Context context) throws IOException,
                InterruptedException {
            context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
                    "GenerateRandomValuesMapper instances").increment(1);
        }

        @Override
        public void map(final DoubleWritable key, final Integer value,
                final Context context) throws IOException,
                InterruptedException {
            context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
                    "GenerateRandomValuesMapper reads").increment(1);
            final int numberOfValues = randomGenerator.nextInt(10000);
            for (int i = 0; i < numberOfValues; i++) {
                key.set(randomGenerator.nextDouble());
                context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
                        "GenerateRandomValuesMapper writes").increment(1);
                context.write(key, one);
            }
        }
    }

    /**
     * Sums the values per key and fails with an {@link AssertionError} if the
     * keys do not arrive in strictly increasing order.
     */
    public static class IntSumReducer extends
            Reducer<DoubleWritable, Integer, DoubleWritable, Integer> {
        private double lastKey = Double.NaN;

        @Override
        protected void setup(final Context context) throws IOException,
                InterruptedException {
            context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
                    "IntSumReducer instances").increment(1);
        }

        @Override
        public void reduce(final DoubleWritable key,
                final Iterable<Integer> values, final Context context)
                throws IOException, InterruptedException {
            // Double.NaN != lastKey is always true (NaN compares unequal to
            // everything, including itself), so the original guard was a
            // no-op; isNaN() states the intent: skip the ordering check on
            // the first invocation.
            if (!Double.isNaN(lastKey) && key.get() <= lastKey) {
                throw new AssertionError("keys not sorted: " + key.get()
                        + " after " + lastKey);
            }
            lastKey = key.get();
            int sum = 0;
            for (final Integer val : values) {
                sum += val;
            }
            context.write(key, sum);
        }
    }

    public static void main(final String[] args) throws Exception {
        final Configuration conf = new Configuration();
        new GenericOptionsParser(conf, args);
        // The Writable keys use WritableSerialization; the plain Integer
        // values need JavaSerialization in addition.
        conf.setStrings("io.serializations",
                WritableSerialization.class.getName(),
                JavaSerialization.class.getName());

        // Write a single seed record; the mapper fans it out into a random
        // number of random keys.
        final Path inputPath = new Path("testInput");
        final SequenceFile.Writer writer = new SequenceFile.Writer(
                FileSystem.get(conf), conf, inputPath, DoubleWritable.class,
                Integer.class);
        writer.append(new DoubleWritable(), 1);
        writer.close();

        final Job job = new Job(conf, "HadoopTest");
        job.setJarByClass(HadoopTest.class);
        job.setMapperClass(GenerateRandomValuesMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        // A single reducer sees every key, so it can check the global order.
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(Integer.class);

        job.setInputFormatClass(IndividualInputFormat.class);
        IndividualInputFormat.setInputPaths(job, inputPath);
        IndividualInputFormat.setOffspring(job, 100);
        IndividualInputFormat.setSplits(job, 100);
        FileOutputFormat.setOutputPath(job, new Path("testOutput"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
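
// Usage sketch: the class runs as an ordinary Hadoop job via the standard
// "hadoop jar" launcher; the jar name below is hypothetical, and
// GenericOptionsParser consumes any generic options from the command line
// first:
//
//   hadoop jar hadooptimizer.jar de.kumpe.hadooptimizer.hadoop.HadoopTest
//
// A clean exit (status 0) means the single reducer saw all keys in strictly
// increasing order; an AssertionError in the reduce task logs means it did
// not.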