/*
 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package de.kumpe.hadooptimizer.examples;

import java.io.IOException;

import org.apache.commons.math.random.RandomGenerator;
import org.apache.commons.math.random.Well44497b;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import de.kumpe.hadooptimizer.RandomGeneratorFactory;
import de.kumpe.hadooptimizer.hadoop.IndividualInputFormat;
import de.kumpe.hadooptimizer.hadoop.JavaSerialization;

/**
 * Example that writes a small sequence file and then runs an identity
 * map/reduce job over it, printing the elapsed time of both phases.
 * <p>
 * The input file {@code testInput} receives one record per parent: the key is
 * a {@link DoubleWritable} holding the loop index and the value is the same
 * index as an {@link Integer}. The job reads that file through
 * {@link IndividualInputFormat}, copies every record unchanged through
 * {@link CopyMapper} and {@link CopyReducer}, and writes to
 * {@code testOutput}.
 *
 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
 */
@SuppressWarnings("deprecation")
public class HadoopTest extends OptimizerExample<Void> {

	/**
	 * {@link RandomGeneratorFactory} that hands out {@link Well44497b}
	 * generators.
	 */
	public static final class RandomGeneratorFactoryImpl implements
			RandomGeneratorFactory {
		private static final long serialVersionUID = 1L;

		/**
		 * Creates a new generator initialised with the given seed.
		 *
		 * @param seed
		 *            the seed passed to the {@link Well44497b} constructor
		 * @return a freshly created {@link Well44497b}
		 */
		@Override
		public RandomGenerator createRandomGenerator(final long seed) {
			// Pass the seed on; the no-argument constructor would ignore it
			// and seed the generator from the current time instead.
			return new Well44497b(seed);
		}
	}

	/**
	 * Mapper that emits every input record unchanged.
	 *
	 * @param <K>
	 *            key type, used for both input and output
	 * @param <V>
	 *            value type, used for both input and output
	 */
	public static class CopyMapper<K, V> extends Mapper<K, V, K, V> {

		/**
		 * Writes the given key/value pair to the context as is.
		 */
		@Override
		public void map(final K key, final V value, final Context context)
				throws IOException, InterruptedException {
			context.write(key, value);
		}
	}

	/**
	 * Reducer that emits every value it receives, paired with its key,
	 * unchanged.
	 *
	 * @param <K>
	 *            key type, used for both input and output
	 * @param <V>
	 *            value type, used for both input and output
	 */
	public static class CopyReducer<K, V> extends Reducer<K, V, K, V> {

		/**
		 * Writes one output record per value, each with the given key.
		 */
		@Override
		public void reduce(final K key, final Iterable<V> values,
				final Context context) throws IOException, InterruptedException {
			for (final V value : values) {
				context.write(key, value);
			}
		}
	}

	/**
	 * Writes the input file, runs the copy job and prints both timings.
	 * <p>
	 * Both printed durations are measured from the start of this method, so
	 * the final "Duration" figures include the time spent writing the input.
	 *
	 * @throws RuntimeException
	 *             if the job does not complete successfully
	 * @throws Exception
	 *             if writing the input file or submitting the job fails
	 */
	@Override
	public void execute() throws Exception {
		final long start = System.nanoTime();
		final Configuration conf = getConf();
		// Integer values are not Writables, so JavaSerialization is
		// registered alongside the Writable serialization.
		conf.setStrings("io.serializations",
				WritableSerialization.class.getName(),
				JavaSerialization.class.getName());

		final Path outputPath = new Path("testInput");

		final SequenceFile.Writer writer = new SequenceFile.Writer(
				FileSystem.get(conf), conf, outputPath, DoubleWritable.class,
				Integer.class);
		try {
			for (int i = 0; i < parents; i++) {
				writer.append(new DoubleWritable(i), i);
			}
		} finally {
			// Close even when append fails so the writer is not leaked.
			writer.close();
		}
		long duration = System.nanoTime() - start;
		System.out.println("Writing[ns]: " + duration);
		System.out.println("Writing[s]: " + duration / 1000000000d);

		final Job job = new Job(conf, "HadoopTest");
		job.setJarByClass(IndividualInputFormat.class);
		job.setMapperClass(CopyMapper.class);
		job.setReducerClass(CopyReducer.class);
		job.setNumReduceTasks(1);

		job.setInputFormatClass(IndividualInputFormat.class);
		IndividualInputFormat.setRandomGeneratorFactory(job,
				new RandomGeneratorFactoryImpl(), 0);
		// The file written above is the job's input.
		IndividualInputFormat.setInputPaths(job, outputPath);
		IndividualInputFormat.setOffspring(job, offspring);
		IndividualInputFormat.setSplits(job, offspring);

		job.setOutputKeyClass(DoubleWritable.class);
		job.setOutputValueClass(Integer.class);
		FileOutputFormat.setOutputPath(job, new Path("testOutput"));

		if (!job.waitForCompletion(true)) {
			throw new RuntimeException("Job failed.");
		}
		duration = System.nanoTime() - start;
		System.out.println("Duration[ns]: " + duration);
		System.out.println("Duration[s]: " + duration / 1000000000d);
	}

	/**
	 * Returns the version-control id string of this source file.
	 */
	@Override
	protected String getVersionInfo() {
		return "$Id: HadoopTest.java 3890 2011-04-20 14:58:14Z baumbart $";
	}
}