001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.examples;
017
018 import java.io.IOException;
019
020 import org.apache.commons.math.random.RandomGenerator;
021 import org.apache.commons.math.random.Well44497b;
022 import org.apache.hadoop.conf.Configuration;
023 import org.apache.hadoop.fs.FileSystem;
024 import org.apache.hadoop.fs.Path;
025 import org.apache.hadoop.io.DoubleWritable;
026 import org.apache.hadoop.io.SequenceFile;
027 import org.apache.hadoop.io.serializer.WritableSerialization;
028 import org.apache.hadoop.mapreduce.Job;
029 import org.apache.hadoop.mapreduce.Mapper;
030 import org.apache.hadoop.mapreduce.Reducer;
031 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
032
033 import de.kumpe.hadooptimizer.RandomGeneratorFactory;
034 import de.kumpe.hadooptimizer.hadoop.IndividualInputFormat;
035 import de.kumpe.hadooptimizer.hadoop.JavaSerialization;
036
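/**
 * Example that writes a small sequence file, pipes it through an identity
 * map/reduce job and prints how long the writing and the whole run took.
 *
 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
 */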
040 @SuppressWarnings("deprecation")
041 public class HadoopTest extends OptimizerExample<Void> {
042
043 public static final class RandomGeneratorFactoryImpl implements
044 RandomGeneratorFactory {
045 private static final long serialVersionUID = 1L;
046
047 @Override
048 public RandomGenerator createRandomGenerator(final long seed) {
049 return new Well44497b();
050 }
051 }
052
053 public static class CopyMapper<K, V> extends Mapper<K, V, K, V> {
054
055 @Override
056 public void map(final K key, final V value, final Context context)
057 throws IOException, InterruptedException {
058 context.write(key, value);
059 }
060 }
061
062 public static class CopyReducer<K, V> extends Reducer<K, V, K, V> {
063
064 @Override
065 public void reduce(final K key, final Iterable<V> values,
066 final Context context) throws IOException, InterruptedException {
067 for (final V value : values) {
068 context.write(key, value);
069 }
070 }
071 }
072
073 @Override
074 public void execute() throws Exception {
075 final long start = System.nanoTime();
076 final Configuration conf = getConf();
077 conf.setStrings("io.serializations",
078 WritableSerialization.class.getName(),
079 JavaSerialization.class.getName());
080
081 final Path outputPath = new Path("testInput");
082
083 final SequenceFile.Writer writer = new SequenceFile.Writer(
084 FileSystem.get(conf), conf, outputPath, DoubleWritable.class,
085 Integer.class);
086 for (int i = 0; i < parents; i++) {
087 writer.append(new DoubleWritable(i), i);
088 }
089 writer.close();
090 long duration = System.nanoTime() - start;
091 System.out.println("Writing[ns]: " + duration);
092 System.out.println("Writing[s]: " + duration / 1000000000d);
093
094 final Job job = new Job(conf, "HadoopTest");
095 job.setJarByClass(IndividualInputFormat.class);
096 job.setMapperClass(CopyMapper.class);
097 job.setReducerClass(CopyReducer.class);
098 job.setNumReduceTasks(1);
099
100 job.setInputFormatClass(IndividualInputFormat.class);
101 IndividualInputFormat.setRandomGeneratorFactory(job,
102 new RandomGeneratorFactoryImpl(), 0);
103 IndividualInputFormat.setInputPaths(job, outputPath);
104 IndividualInputFormat.setOffspring(job, offspring);
105 IndividualInputFormat.setSplits(job, offspring);
106
107 job.setOutputKeyClass(DoubleWritable.class);
108 job.setOutputValueClass(Integer.class);
109 FileOutputFormat.setOutputPath(job, new Path("testOutput"));
110
111 if (!job.waitForCompletion(true)) {
112 throw new RuntimeException("Job failed.");
113 }
114 duration = System.nanoTime() - start;
115 System.out.println("Duration[ns]: " + duration);
116 System.out.println("Duration[s]: " + duration / 1000000000d);
117 }
118
119 @Override
120 protected String getVersionInfo() {
121 return "$Id: HadoopTest.java 3890 2011-04-20 14:58:14Z baumbart $";
122 }
123 }