001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.hadoop;
017    
018    import java.io.IOException;
019    
020    import org.apache.hadoop.fs.Path;
021    import org.apache.hadoop.io.DoubleWritable;
022    import org.apache.hadoop.io.SequenceFile;
023    import org.apache.hadoop.mapreduce.Job;
024    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
025    
026    import de.kumpe.hadooptimizer.EsIndividual;
027    import de.kumpe.hadooptimizer.EsOptimizerConfiguration;
028    import de.kumpe.hadooptimizer.EvaluationResult;
029    import de.kumpe.hadooptimizer.Evaluator;
030    import de.kumpe.hadooptimizer.Mutator;
031    import de.kumpe.hadooptimizer.Optimizer;
032    
033    /**
034     * An {@link Optimizer} implementation for for evolution strategies which
035     * distributes the {@link Mutator mutation} and {@link Evaluator evaluation} in
036     * a Hadoop cluster.
037     * 
038     * @see Optimizer
039     * @see HadoOptimizerBase
040     * 
041     * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
042     */
043    public class EsHadoOptimizer extends HadoOptimizerBase<EsIndividual> {
044            private static final long serialVersionUID = 1L;
045    
046            public EsHadoOptimizer(final EsOptimizerConfiguration configuration) {
047                    super(configuration);
048            }
049    
050            @Override
051            protected EsOptimizerConfiguration getConfiguration() {
052                    return (EsOptimizerConfiguration) super.getConfiguration();
053            }
054    
055            @Override
056            protected void doOptimize() throws Exception {
057                    Path outputPath = new Path(baseDir, "evaluations-0");
058    
059                    for (long cycle = 1; !checkForHalt(); cycle++) {
060                            if (!getConfiguration().isPreserveParents()) {
061                                    evaluationResults.clear();
062                            }
063    
064                            final Job job = createJob(cycle);
065    
066                            // basic settings
067                            job.setInputFormatClass(IndividualInputFormat.class);
068                            IndividualInputFormat.setInputPaths(job, outputPath);
069                            IndividualInputFormat.setRandomGeneratorFactory(job,
070                                            getConfiguration().getRandomGeneratorFactory(),
071                                            getRandomGenerator().nextLong());
072    
073                            configureJob(job);
074    
075                            // output
076                            outputPath = new Path(baseDir, "cycle-" + cycle);
077                            FileOutputFormat.setOutputPath(job, outputPath);
078    
079                            executeJob(job);
080    
081                            new EvaluationResultReader<EsIndividual>(evaluationResults,
082                                            getConfiguration().getParents()).readIndivuals(conf,
083                                            outputPath);
084    
085                            if (getConfiguration().isPreserveParents()) {
086                                    outputPath = writeEvaluationResultsToFile(cycle);
087                            }
088                    }
089            }
090    
091            boolean checkForHalt() {
092                    return getConfiguration().getHalter().halt(evaluationResults);
093            }
094    
095            void configureJob(final Job job) throws IOException {
096                    final int maxMapTasks = getMaxMapTasks();
097                    job.setMapperClass(MutateAndEvaluateChildMapper.class);
098                    IndividualInputFormat.setSplits(job, maxMapTasks);
099                    IndividualInputFormat.setOffspring(job, getConfiguration()
100                                    .getOffspring());
101                    IndividualInputFormat.setRandomChoice(job, true);
102            }
103    
104            Path writeEvaluationResultsToFile(final long cycle) throws IOException {
105                    final Path outputPath = new Path(baseDir, "result-" + cycle);
106                    final SequenceFile.Writer writer = new SequenceFile.Writer(fileSystem,
107                                    conf, outputPath, DoubleWritable.class, valueClass);
108                    for (final EvaluationResult<EsIndividual> evaluationResult : evaluationResults) {
109                            key.set(evaluationResult.getEvaluation());
110                            writer.append(key, evaluationResult.getIndividual());
111                    }
112                    writer.close();
113                    return outputPath;
114            }
115    }