001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.hadoop;
017
018 import java.io.IOException;
019 import java.io.Serializable;
020 import java.util.ArrayList;
021 import java.util.Collection;
022 import java.util.Iterator;
023 import java.util.List;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.commons.math.random.RandomGenerator;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.Path;
030 import org.apache.hadoop.io.DoubleWritable;
031 import org.apache.hadoop.mapreduce.InputFormat;
032 import org.apache.hadoop.mapreduce.InputSplit;
033 import org.apache.hadoop.mapreduce.Job;
034 import org.apache.hadoop.mapreduce.JobContext;
035 import org.apache.hadoop.mapreduce.RecordReader;
036 import org.apache.hadoop.mapreduce.TaskAttemptContext;
037 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
038 import org.apache.hadoop.util.ReflectionUtils;
039
040 import de.kumpe.hadooptimizer.RandomGeneratorFactory;
041
042 /**
043 * A special {@link InputFormat} implementation to control the distribution of
044 * the individuals for mutation and evaluation.
045 *
046 * @param <I>
 *            the individual's type
048 *
049 * @see EaHadoOptimizer
050 * @see EsHadoOptimizer
051 *
052 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
053 */
054 public final class IndividualInputFormat<I extends Serializable> extends
055 InputFormat<DoubleWritable, I> {
056 private static final long serialVersionUID = 1L;
057 private static final Log log = LogFactory
058 .getLog(IndividualInputFormat.class);
059
060 private static final String[] EMPTY_STRING_ARRAY = new String[0];
061
062 public final static class IndividualInputSplit<I extends Serializable>
063 extends InputSplit implements Serializable, HasSeed {
064 private static final long serialVersionUID = 1L;
065
066 private final Collection<I> individuals;
067 private final long seed;
068
069 public IndividualInputSplit(final Collection<I> individuals,
070 final long seed) {
071 this.individuals = individuals;
072 this.seed = seed;
073 }
074
075 @Override
076 public long getSeed() {
077 return seed;
078 }
079
080 @Override
081 public long getLength() throws IOException, InterruptedException {
082 return individuals.size();
083 }
084
085 @Override
086 public String[] getLocations() throws IOException, InterruptedException {
087 return EMPTY_STRING_ARRAY;
088 }
089 }
090
091 private final static class IndividualRecordReader<I extends Serializable>
092 extends RecordReader<DoubleWritable, I> {
093 private Iterator<I> indivudualIterator;
094 private int size;
095 private I currentValue;
096 private int counter;
097
098 @Override
099 public void initialize(final InputSplit split,
100 final TaskAttemptContext context) throws IOException,
101 InterruptedException {
102 @SuppressWarnings("unchecked")
103 final IndividualInputSplit<I> indidualInputSplit = (IndividualInputSplit<I>) split;
104 indivudualIterator = indidualInputSplit.individuals.iterator();
105 size = indidualInputSplit.individuals.size();
106 currentValue = null;
107 counter = 0;
108 }
109
110 @Override
111 public boolean nextKeyValue() throws IOException, InterruptedException {
112 if (indivudualIterator.hasNext()) {
113 currentValue = indivudualIterator.next();
114 counter++;
115 return true;
116 }
117 return false;
118 }
119
120 @Override
121 public DoubleWritable getCurrentKey() throws IOException,
122 InterruptedException {
123 if (null != currentValue) {
124 return new DoubleWritable(Double.NaN);
125 }
126 return null;
127 }
128
129 @Override
130 public I getCurrentValue() throws IOException, InterruptedException {
131 return currentValue;
132 }
133
134 @Override
135 public float getProgress() throws IOException, InterruptedException {
136 return (float) counter / size;
137 }
138
139 @Override
140 public void close() throws IOException {
141 indivudualIterator = null;
142 currentValue = null;
143 }
144 }
145
146 private static final String OFFSPRING = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.offspring";
147 private static final String SPLITS = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.splits";
148 private static final String RANDOM_GENERATOR_FACTORY = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.randomGeneratorFactory";
149 private static final String SEED = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.seed";
150 private static final String RANDOM_CHOICE = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.randomChoice";
151
152 public static void setInputPaths(final Job job, final Path... inputPaths)
153 throws IOException {
154 FileInputFormat.setInputPaths(job, inputPaths);
155 }
156
157 public static void setOffspring(final Job job, final int duplicationFactor) {
158 job.getConfiguration().setInt(OFFSPRING, duplicationFactor);
159 }
160
161 public static void setSplits(final Job job, final int maxNrOfSplits) {
162 job.getConfiguration().setInt(SPLITS, maxNrOfSplits);
163 }
164
165 public static void setRandomGeneratorFactory(final Job job,
166 final RandomGeneratorFactory randomGeneratorFactory, final long seed) {
167 job.getConfiguration()
168 .setClass(RANDOM_GENERATOR_FACTORY,
169 randomGeneratorFactory.getClass(),
170 RandomGeneratorFactory.class);
171 job.getConfiguration().setLong(SEED, seed);
172 }
173
174 public static void setRandomChoice(final Job job, final boolean randomChoice) {
175 job.getConfiguration().setBoolean(RANDOM_CHOICE, randomChoice);
176 }
177
178 @Override
179 public List<InputSplit> getSplits(final JobContext context)
180 throws IOException, InterruptedException {
181 final long start = System.nanoTime();
182 final Configuration conf = context.getConfiguration();
183
184 final RandomGenerator randomGenerator = getRandomGenerator(conf);
185
186 final I[] individualsAsArray = readIndividuals(context);
187
188 final int offspring = conf.getInt(OFFSPRING, 1);
189 final int splits = conf.getInt(SPLITS, offspring);
190 final int recordsPerSplit = offspring / splits;
191 int remainingRecords = offspring % splits;
192
193 final boolean randomChoice = conf.getBoolean(RANDOM_CHOICE, false);
194
195 final int nrOfIndividuals = individualsAsArray.length;
196 int currentIndividual = -1;
197
198 final List<InputSplit> result = new ArrayList<InputSplit>();
199
200 List<I> currentSplit = new ArrayList<I>();
201
202 for (int i = 0; i < offspring; i++) {
203 if (currentSplit.size() == recordsPerSplit && 0 < remainingRecords) {
204 remainingRecords--;
205 } else if (currentSplit.size() >= recordsPerSplit) {
206 result.add(new IndividualInputSplit<I>(currentSplit,
207 randomGenerator.nextLong()));
208 currentSplit = new ArrayList<I>();
209 }
210
211 if (randomChoice) {
212 currentIndividual = randomGenerator.nextInt(nrOfIndividuals);
213 } else {
214 currentIndividual = (currentIndividual + 1) % nrOfIndividuals;
215 }
216
217 currentSplit.add(individualsAsArray[currentIndividual]);
218 }
219 result.add(new IndividualInputSplit<I>(currentSplit, randomGenerator
220 .nextLong()));
221
222 if (log.isTraceEnabled()) {
223 final long duration = System.nanoTime() - start;
224 log.trace(String.format("Time for creating splits %dns => %.1fs",
225 duration, duration / 1000000000d));
226 }
227 return result;
228 }
229
230 @SuppressWarnings("unchecked")
231 private I[] readIndividuals(final JobContext context) throws IOException {
232 final Path[] inputs = FileInputFormat.getInputPaths(context);
233 final List<I> individuals = new ArrayList<I>();
234 new IndividualReader<I>(individuals).readIndivuals(
235 context.getConfiguration(), inputs);
236
237 return (I[]) individuals.toArray(new Serializable[individuals.size()]);
238 }
239
240 private RandomGenerator getRandomGenerator(final Configuration conf) {
241 final Class<? extends RandomGeneratorFactory> randomGeneratorFactoryClass = conf
242 .getClass(RANDOM_GENERATOR_FACTORY, null,
243 RandomGeneratorFactory.class);
244 if (null == randomGeneratorFactoryClass) {
245 throw new NullPointerException(
246 "A RandomGeneratorFactory has to be configured for IndividualInputFormat");
247 }
248 final RandomGeneratorFactory randomGeneratorFactory = ReflectionUtils
249 .newInstance(randomGeneratorFactoryClass, conf);
250 final long seed = conf.getLong(SEED, 0);
251 final RandomGenerator randomGenerator = randomGeneratorFactory
252 .createRandomGenerator(seed);
253 return randomGenerator;
254 }
255
256 @Override
257 public RecordReader<DoubleWritable, I> createRecordReader(
258 final InputSplit split, final TaskAttemptContext context)
259 throws IOException, InterruptedException {
260 return new IndividualRecordReader<I>();
261 }
262 }