001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.hadoop;
017    
018    import java.util.Collection;
019    import java.util.Collections;
020    
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.mapreduce.Job;
023    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
024    
025    import de.kumpe.hadooptimizer.EaOptimizerConfiguration;
026    import de.kumpe.hadooptimizer.Evaluator;
027    import de.kumpe.hadooptimizer.Mutator;
028    import de.kumpe.hadooptimizer.Optimizer;
029    
030    /**
031     * An {@link Optimizer} implementation for general evolutionary algorithms which
032     * distributes the {@link Mutator mutation} and {@link Evaluator evaluation} in
033     * a Hadoop cluster.
034     * 
035     * @param <I>
036     *            the individuals' type
037     * 
038     * @see Optimizer
039     * @see HadoOptimizerBase
040     * 
041     * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
042     */
043    public final class EaHadoOptimizer<I> extends HadoOptimizerBase<I> {
044            private static final long serialVersionUID = 1L;
045    
046            public EaHadoOptimizer(final EaOptimizerConfiguration<I> configuration) {
047                    super(configuration);
048            }
049    
050            @Override
051            protected EaOptimizerConfiguration<I> getConfiguration() {
052                    return (EaOptimizerConfiguration<I>) super.getConfiguration();
053            }
054    
055            @Override
056            protected void doOptimize() throws Exception {
057                    for (long cycle = 1; !getConfiguration().getHalter().halt(
058                                    evaluationResults); cycle++) {
059                            final Path parentsPath = new Path(baseDir, "parents" + cycle);
060                            final Path evaluationsPath = new Path(baseDir, "evaluations"
061                                            + cycle);
062    
063                            final Collection<I> children = getConfiguration().getRecombiner()
064                                            .recombine(
065                                                            Collections
066                                                                            .unmodifiableCollection(evaluationResults));
067    
068                            if (!getConfiguration().isPreserveParents()) {
069                                    evaluationResults.clear();
070                            }
071    
072                            writePopulation(children, parentsPath);
073    
074                            // mutation and evaluation
075                            final Job job = createJob(cycle);
076    
077                            // mapper
078                            job.setMapperClass(MutateAndEvaluateChildMapper.class);
079    
080                            // input
081                            // basic settings
082                            job.setInputFormatClass(IndividualInputFormat.class);
083                            IndividualInputFormat.setInputPaths(job, parentsPath);
084                            IndividualInputFormat.setRandomGeneratorFactory(job,
085                                            getConfiguration().getRandomGeneratorFactory(),
086                                            getRandomGenerator().nextLong());
087                            final int maxMapTasks = getMaxMapTasks();
088                            IndividualInputFormat.setSplits(job, maxMapTasks);
089                            IndividualInputFormat.setOffspring(job, children.size());
090                            IndividualInputFormat.setRandomChoice(job, false);
091    
092                            // output
093                            FileOutputFormat.setOutputPath(job, evaluationsPath);
094    
095                            executeJob(job);
096    
097                            new EvaluationResultReader<I>(evaluationResults, getConfiguration()
098                                            .getParents()).readIndivuals(conf, evaluationsPath);
099                    }
100            }
101    }