001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.hadoop;
017    
018    import java.io.IOException;
019    import java.io.Serializable;
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Iterator;
023    import java.util.List;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.commons.math.random.RandomGenerator;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.Path;
030    import org.apache.hadoop.io.DoubleWritable;
031    import org.apache.hadoop.mapreduce.InputFormat;
032    import org.apache.hadoop.mapreduce.InputSplit;
033    import org.apache.hadoop.mapreduce.Job;
034    import org.apache.hadoop.mapreduce.JobContext;
035    import org.apache.hadoop.mapreduce.RecordReader;
036    import org.apache.hadoop.mapreduce.TaskAttemptContext;
037    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
038    import org.apache.hadoop.util.ReflectionUtils;
039    
040    import de.kumpe.hadooptimizer.RandomGeneratorFactory;
041    
042    /**
043     * A special {@link InputFormat} implementation to control the distribution of
044     * the individuals for mutation and evaluation.
045     * 
046     * @param <I>
047     *            the individuals's type
048     * 
049     * @see EaHadoOptimizer
050     * @see EsHadoOptimizer
051     * 
052     * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
053     */
054    public final class IndividualInputFormat<I extends Serializable> extends
055                    InputFormat<DoubleWritable, I> {
056            private static final long serialVersionUID = 1L;
057            private static final Log log = LogFactory
058                            .getLog(IndividualInputFormat.class);
059    
060            private static final String[] EMPTY_STRING_ARRAY = new String[0];
061    
062            public final static class IndividualInputSplit<I extends Serializable>
063                            extends InputSplit implements Serializable, HasSeed {
064                    private static final long serialVersionUID = 1L;
065    
066                    private final Collection<I> individuals;
067                    private final long seed;
068    
069                    public IndividualInputSplit(final Collection<I> individuals,
070                                    final long seed) {
071                            this.individuals = individuals;
072                            this.seed = seed;
073                    }
074    
075                    @Override
076                    public long getSeed() {
077                            return seed;
078                    }
079    
080                    @Override
081                    public long getLength() throws IOException, InterruptedException {
082                            return individuals.size();
083                    }
084    
085                    @Override
086                    public String[] getLocations() throws IOException, InterruptedException {
087                            return EMPTY_STRING_ARRAY;
088                    }
089            }
090    
091            private final static class IndividualRecordReader<I extends Serializable>
092                            extends RecordReader<DoubleWritable, I> {
093                    private Iterator<I> indivudualIterator;
094                    private int size;
095                    private I currentValue;
096                    private int counter;
097    
098                    @Override
099                    public void initialize(final InputSplit split,
100                                    final TaskAttemptContext context) throws IOException,
101                                    InterruptedException {
102                            @SuppressWarnings("unchecked")
103                            final IndividualInputSplit<I> indidualInputSplit = (IndividualInputSplit<I>) split;
104                            indivudualIterator = indidualInputSplit.individuals.iterator();
105                            size = indidualInputSplit.individuals.size();
106                            currentValue = null;
107                            counter = 0;
108                    }
109    
110                    @Override
111                    public boolean nextKeyValue() throws IOException, InterruptedException {
112                            if (indivudualIterator.hasNext()) {
113                                    currentValue = indivudualIterator.next();
114                                    counter++;
115                                    return true;
116                            }
117                            return false;
118                    }
119    
120                    @Override
121                    public DoubleWritable getCurrentKey() throws IOException,
122                                    InterruptedException {
123                            if (null != currentValue) {
124                                    return new DoubleWritable(Double.NaN);
125                            }
126                            return null;
127                    }
128    
129                    @Override
130                    public I getCurrentValue() throws IOException, InterruptedException {
131                            return currentValue;
132                    }
133    
134                    @Override
135                    public float getProgress() throws IOException, InterruptedException {
136                            return (float) counter / size;
137                    }
138    
139                    @Override
140                    public void close() throws IOException {
141                            indivudualIterator = null;
142                            currentValue = null;
143                    }
144            }
145    
146            private static final String OFFSPRING = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.offspring";
147            private static final String SPLITS = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.splits";
148            private static final String RANDOM_GENERATOR_FACTORY = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.randomGeneratorFactory";
149            private static final String SEED = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.seed";
150            private static final String RANDOM_CHOICE = "de.kumpe.hadooptimizer.hadoop.IndividualInputFormat.randomChoice";
151    
152            public static void setInputPaths(final Job job, final Path... inputPaths)
153                            throws IOException {
154                    FileInputFormat.setInputPaths(job, inputPaths);
155            }
156    
157            public static void setOffspring(final Job job, final int duplicationFactor) {
158                    job.getConfiguration().setInt(OFFSPRING, duplicationFactor);
159            }
160    
161            public static void setSplits(final Job job, final int maxNrOfSplits) {
162                    job.getConfiguration().setInt(SPLITS, maxNrOfSplits);
163            }
164    
165            public static void setRandomGeneratorFactory(final Job job,
166                            final RandomGeneratorFactory randomGeneratorFactory, final long seed) {
167                    job.getConfiguration()
168                                    .setClass(RANDOM_GENERATOR_FACTORY,
169                                                    randomGeneratorFactory.getClass(),
170                                                    RandomGeneratorFactory.class);
171                    job.getConfiguration().setLong(SEED, seed);
172            }
173    
174            public static void setRandomChoice(final Job job, final boolean randomChoice) {
175                    job.getConfiguration().setBoolean(RANDOM_CHOICE, randomChoice);
176            }
177    
178            @Override
179            public List<InputSplit> getSplits(final JobContext context)
180                            throws IOException, InterruptedException {
181                    final long start = System.nanoTime();
182                    final Configuration conf = context.getConfiguration();
183    
184                    final RandomGenerator randomGenerator = getRandomGenerator(conf);
185    
186                    final I[] individualsAsArray = readIndividuals(context);
187    
188                    final int offspring = conf.getInt(OFFSPRING, 1);
189                    final int splits = conf.getInt(SPLITS, offspring);
190                    final int recordsPerSplit = offspring / splits;
191                    int remainingRecords = offspring % splits;
192    
193                    final boolean randomChoice = conf.getBoolean(RANDOM_CHOICE, false);
194    
195                    final int nrOfIndividuals = individualsAsArray.length;
196                    int currentIndividual = -1;
197    
198                    final List<InputSplit> result = new ArrayList<InputSplit>();
199    
200                    List<I> currentSplit = new ArrayList<I>();
201    
202                    for (int i = 0; i < offspring; i++) {
203                            if (currentSplit.size() == recordsPerSplit && 0 < remainingRecords) {
204                                    remainingRecords--;
205                            } else if (currentSplit.size() >= recordsPerSplit) {
206                                    result.add(new IndividualInputSplit<I>(currentSplit,
207                                                    randomGenerator.nextLong()));
208                                    currentSplit = new ArrayList<I>();
209                            }
210    
211                            if (randomChoice) {
212                                    currentIndividual = randomGenerator.nextInt(nrOfIndividuals);
213                            } else {
214                                    currentIndividual = (currentIndividual + 1) % nrOfIndividuals;
215                            }
216    
217                            currentSplit.add(individualsAsArray[currentIndividual]);
218                    }
219                    result.add(new IndividualInputSplit<I>(currentSplit, randomGenerator
220                                    .nextLong()));
221    
222                    if (log.isTraceEnabled()) {
223                            final long duration = System.nanoTime() - start;
224                            log.trace(String.format("Time for creating splits %dns => %.1fs",
225                                            duration, duration / 1000000000d));
226                    }
227                    return result;
228            }
229    
230            @SuppressWarnings("unchecked")
231            private I[] readIndividuals(final JobContext context) throws IOException {
232                    final Path[] inputs = FileInputFormat.getInputPaths(context);
233                    final List<I> individuals = new ArrayList<I>();
234                    new IndividualReader<I>(individuals).readIndivuals(
235                                    context.getConfiguration(), inputs);
236    
237                    return (I[]) individuals.toArray(new Serializable[individuals.size()]);
238            }
239    
240            private RandomGenerator getRandomGenerator(final Configuration conf) {
241                    final Class<? extends RandomGeneratorFactory> randomGeneratorFactoryClass = conf
242                                    .getClass(RANDOM_GENERATOR_FACTORY, null,
243                                                    RandomGeneratorFactory.class);
244                    if (null == randomGeneratorFactoryClass) {
245                            throw new NullPointerException(
246                                            "A RandomGeneratorFactory has to be configured for IndividualInputFormat");
247                    }
248                    final RandomGeneratorFactory randomGeneratorFactory = ReflectionUtils
249                                    .newInstance(randomGeneratorFactoryClass, conf);
250                    final long seed = conf.getLong(SEED, 0);
251                    final RandomGenerator randomGenerator = randomGeneratorFactory
252                                    .createRandomGenerator(seed);
253                    return randomGenerator;
254            }
255    
256            @Override
257            public RecordReader<DoubleWritable, I> createRecordReader(
258                            final InputSplit split, final TaskAttemptContext context)
259                            throws IOException, InterruptedException {
260                    return new IndividualRecordReader<I>();
261            }
262    }