001 /* 002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package de.kumpe.hadooptimizer.hadoop; 017 018 import java.io.IOException; 019 import java.io.Serializable; 020 import java.util.ArrayList; 021 import java.util.Collection; 022 import java.util.Iterator; 023 import java.util.List; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.commons.math.random.RandomGenerator; 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.fs.Path; 030 import org.apache.hadoop.io.DoubleWritable; 031 import org.apache.hadoop.mapreduce.InputFormat; 032 import org.apache.hadoop.mapreduce.InputSplit; 033 import org.apache.hadoop.mapreduce.Job; 034 import org.apache.hadoop.mapreduce.JobContext; 035 import org.apache.hadoop.mapreduce.RecordReader; 036 import org.apache.hadoop.mapreduce.TaskAttemptContext; 037 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 038 import org.apache.hadoop.util.ReflectionUtils; 039 040 import de.kumpe.hadooptimizer.RandomGeneratorFactory; 041 042 /** 043 * A special {@link InputFormat} implementation to control the distribution of 044 * the individuals for mutation and evaluation. 045 * 046 * @param <I> 047 * the individuals's type 048 * 049 * @see EaHadoOptimizer 050 * @see EsHadoOptimizer 051 * 052 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a> 053 */ 054 public final class IndividualInputFormat<I extends Serializable> extends 055 InputFormat<DoubleWritable, I> { 056 private static final long serialVersionUID = 1L; 057 private static final Log log = LogFactory 058 .getLog(IndividualInputFormat.class); 059 060 private static final String[] EMPTY_STRING_ARRAY = new String[0]; 061 062 public final static class IndividualInputSplit<I extends Serializable> 063 extends InputSplit implements Serializable, HasSeed { 064 private static final long serialVersionUID = 1L; 065 066 private final Collection<I> individuals; 067 private final long seed; 068 069 public IndividualInputSplit(final Collection<I> individuals, 070 final long seed) { 071 this.individuals = individuals; 072 this.seed = seed; 073 } 074 075 @Override 076 public long getSeed() { 077 return seed; 078 } 079 080 @Override 081 public long getLength() throws IOException, InterruptedException { 082 return individuals.size(); 083 } 084 085 @Override 086 public String[] getLocations() throws IOException, InterruptedException { 087 return EMPTY_STRING_ARRAY; 088 } 089 } 090 091 private final static class IndividualRecordReader<I extends Serializable> 092 extends RecordReader<DoubleWritable, I> { 093 private Iterator<I> indivudualIterator; 094 private int size; 095 private I currentValue; 096 private int counter; 097 098 @Override 099 public void initialize(final InputSplit split, 100 final TaskAttemptContext context) throws IOException, 101 InterruptedException { 102 @SuppressWarnings("unchecked") 103 final IndividualInputSplit<I> indidualInputSplit = (IndividualInputSplit<I>) split; 104 indivudualIterator = indidualInputSplit.individuals.iterator(); 105 size = indidualInputSplit.individuals.size(); 106 currentValue = null; 107 counter = 0; 108 } 109 110 @Override 111 public boolean nextKeyValue() throws IOException, InterruptedException { 112 if (indivudualIterator.hasNext()) { 113 currentValue = indivudualIterator.next(); 114 counter++; 115 return true; 116 } 117 return false; 118 } 119 120 @Override 121 public DoubleWritable getCurrentKey() throws IOException, 122 InterruptedException { 123 if (null != currentValue) { 124 return new DoubleWritable(Double.NaN); 125 } 126 return null; 127 } 128 129 @Override 130 public I getCurrentValue() throws IOException, InterruptedException { 131 return currentValue; 132 } 133 134 @Override 135 public float getProgress() throws IOException, InterruptedException { 136 return (float) counter / size; 137 } 138 139 @Override 140 public void close() throws IOException { 141 indivudualIterator = null; 142 currentValue = null; 143 } 144 } 145 146 private static final String OFFSPRING = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.offspring"; 147 private static final String SPLITS = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.splits"; 148 private static final String RANDOM_GENERATOR_FACTORY = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.randomGeneratorFactory"; 149 private static final String SEED = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.seed"; 150 private static final String RANDOM_CHOICE = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.randomChoice"; 151 152 public static void setInputPaths(final Job job, final Path... inputPaths) 153 throws IOException { 154 FileInputFormat.setInputPaths(job, inputPaths); 155 } 156 157 public static void setOffspring(final Job job, final int duplicationFactor) { 158 job.getConfiguration().setInt(OFFSPRING, duplicationFactor); 159 } 160 161 public static void setSplits(final Job job, final int maxNrOfSplits) { 162 job.getConfiguration().setInt(SPLITS, maxNrOfSplits); 163 } 164 165 public static void setRandomGeneratorFactory(final Job job, 166 final RandomGeneratorFactory randomGeneratorFactory, final long seed) { 167 job.getConfiguration() 168 .setClass(RANDOM_GENERATOR_FACTORY, 169 randomGeneratorFactory.getClass(), 170 RandomGeneratorFactory.class); 171 job.getConfiguration().setLong(SEED, seed); 172 } 173 174 public static void setRandomChoice(final Job job, final boolean randomChoice) { 175 job.getConfiguration().setBoolean(RANDOM_CHOICE, randomChoice); 176 } 177 178 @Override 179 public List<InputSplit> getSplits(final JobContext context) 180 throws IOException, InterruptedException { 181 final long start = System.nanoTime(); 182 final Configuration conf = context.getConfiguration(); 183 184 final RandomGenerator randomGenerator = getRandomGenerator(conf); 185 186 final I[] individualsAsArray = readIndividuals(context); 187 188 final int offspring = conf.getInt(OFFSPRING, 1); 189 final int splits = conf.getInt(SPLITS, offspring); 190 final int recordsPerSplit = offspring / splits; 191 int remainingRecords = offspring % splits; 192 193 final boolean randomChoice = conf.getBoolean(RANDOM_CHOICE, false); 194 195 final int nrOfIndividuals = individualsAsArray.length; 196 int currentIndividual = -1; 197 198 final List<InputSplit> result = new ArrayList<InputSplit>(); 199 200 List<I> currentSplit = new ArrayList<I>(); 201 202 for (int i = 0; i < offspring; i++) { 203 if (currentSplit.size() == recordsPerSplit && 0 < remainingRecords) { 204 remainingRecords--; 205 } else if (currentSplit.size() >= recordsPerSplit) { 206 result.add(new IndividualInputSplit<I>(currentSplit, 207 randomGenerator.nextLong())); 208 currentSplit = new ArrayList<I>(); 209 } 210 211 if (randomChoice) { 212 currentIndividual = randomGenerator.nextInt(nrOfIndividuals); 213 } else { 214 currentIndividual = (currentIndividual + 1) % nrOfIndividuals; 215 } 216 217 currentSplit.add(individualsAsArray[currentIndividual]); 218 } 219 result.add(new IndividualInputSplit<I>(currentSplit, randomGenerator 220 .nextLong())); 221 222 if (log.isTraceEnabled()) { 223 final long duration = System.nanoTime() - start; 224 log.trace(String.format("Time for creating splits %dns => %.1fs", 225 duration, duration / 1000000000d)); 226 } 227 return result; 228 } 229 230 @SuppressWarnings("unchecked") 231 private I[] readIndividuals(final JobContext context) throws IOException { 232 final Path[] inputs = FileInputFormat.getInputPaths(context); 233 final List<I> individuals = new ArrayList<I>(); 234 new IndividualReader<I>(individuals).readIndivuals( 235 context.getConfiguration(), inputs); 236 237 return (I[]) individuals.toArray(new Serializable[individuals.size()]); 238 } 239 240 private RandomGenerator getRandomGenerator(final Configuration conf) { 241 final Class<? extends RandomGeneratorFactory> randomGeneratorFactoryClass = conf 242 .getClass(RANDOM_GENERATOR_FACTORY, null, 243 RandomGeneratorFactory.class); 244 if (null == randomGeneratorFactoryClass) { 245 throw new NullPointerException( 246 "A RandomGeneratorFactory has to be configured for IndividualInputFormat"); 247 } 248 final RandomGeneratorFactory randomGeneratorFactory = ReflectionUtils 249 .newInstance(randomGeneratorFactoryClass, conf); 250 final long seed = conf.getLong(SEED, 0); 251 final RandomGenerator randomGenerator = randomGeneratorFactory 252 .createRandomGenerator(seed); 253 return randomGenerator; 254 } 255 256 @Override 257 public RecordReader<DoubleWritable, I> createRecordReader( 258 final InputSplit split, final TaskAttemptContext context) 259 throws IOException, InterruptedException { 260 return new IndividualRecordReader<I>(); 261 } 262 }