001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.examples;
017    
018    import java.io.IOException;
019    
020    import org.apache.commons.math.random.RandomGenerator;
021    import org.apache.commons.math.random.Well44497b;
022    import org.apache.hadoop.conf.Configuration;
023    import org.apache.hadoop.fs.FileSystem;
024    import org.apache.hadoop.fs.Path;
025    import org.apache.hadoop.io.DoubleWritable;
026    import org.apache.hadoop.io.SequenceFile;
027    import org.apache.hadoop.io.serializer.WritableSerialization;
028    import org.apache.hadoop.mapreduce.Job;
029    import org.apache.hadoop.mapreduce.Mapper;
030    import org.apache.hadoop.mapreduce.Reducer;
031    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
032    
033    import de.kumpe.hadooptimizer.RandomGeneratorFactory;
034    import de.kumpe.hadooptimizer.hadoop.IndividualInputFormat;
035    import de.kumpe.hadooptimizer.hadoop.JavaSerialization;
036    
037    /**
038     * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
039     */
040    @SuppressWarnings("deprecation")
041    public class HadoopTest extends OptimizerExample<Void> {
042    
043            public static final class RandomGeneratorFactoryImpl implements
044                            RandomGeneratorFactory {
045                    private static final long serialVersionUID = 1L;
046    
047                    @Override
048                    public RandomGenerator createRandomGenerator(final long seed) {
049                            return new Well44497b();
050                    }
051            }
052    
053            public static class CopyMapper<K, V> extends Mapper<K, V, K, V> {
054    
055                    @Override
056                    public void map(final K key, final V value, final Context context)
057                                    throws IOException, InterruptedException {
058                            context.write(key, value);
059                    }
060            }
061    
062            public static class CopyReducer<K, V> extends Reducer<K, V, K, V> {
063    
064                    @Override
065                    public void reduce(final K key, final Iterable<V> values,
066                                    final Context context) throws IOException, InterruptedException {
067                            for (final V value : values) {
068                                    context.write(key, value);
069                            }
070                    }
071            }
072    
073            @Override
074            public void execute() throws Exception {
075                    final long start = System.nanoTime();
076                    final Configuration conf = getConf();
077                    conf.setStrings("io.serializations",
078                                    WritableSerialization.class.getName(),
079                                    JavaSerialization.class.getName());
080    
081                    final Path outputPath = new Path("testInput");
082    
083                    final SequenceFile.Writer writer = new SequenceFile.Writer(
084                                    FileSystem.get(conf), conf, outputPath, DoubleWritable.class,
085                                    Integer.class);
086                    for (int i = 0; i < parents; i++) {
087                            writer.append(new DoubleWritable(i), i);
088                    }
089                    writer.close();
090                    long duration = System.nanoTime() - start;
091                    System.out.println("Writing[ns]: " + duration);
092                    System.out.println("Writing[s]: " + duration / 1000000000d);
093    
094                    final Job job = new Job(conf, "HadoopTest");
095                    job.setJarByClass(IndividualInputFormat.class);
096                    job.setMapperClass(CopyMapper.class);
097                    job.setReducerClass(CopyReducer.class);
098                    job.setNumReduceTasks(1);
099    
100                    job.setInputFormatClass(IndividualInputFormat.class);
101                    IndividualInputFormat.setRandomGeneratorFactory(job,
102                                    new RandomGeneratorFactoryImpl(), 0);
103                    IndividualInputFormat.setInputPaths(job, outputPath);
104                    IndividualInputFormat.setOffspring(job, offspring);
105                    IndividualInputFormat.setSplits(job, offspring);
106    
107                    job.setOutputKeyClass(DoubleWritable.class);
108                    job.setOutputValueClass(Integer.class);
109                    FileOutputFormat.setOutputPath(job, new Path("testOutput"));
110    
111                    if (!job.waitForCompletion(true)) {
112                            throw new RuntimeException("Job failed.");
113                    }
114                    duration = System.nanoTime() - start;
115                    System.out.println("Duration[ns]: " + duration);
116                    System.out.println("Duration[s]: " + duration / 1000000000d);
117            }
118    
119            @Override
120            protected String getVersionInfo() {
121                    return "$Id: HadoopTest.java 3890 2011-04-20 14:58:14Z baumbart $";
122            }
123    }