001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.hadoop;
017
018 import java.io.IOException;
019
020 import org.apache.hadoop.fs.Path;
021 import org.apache.hadoop.io.DoubleWritable;
022 import org.apache.hadoop.io.SequenceFile;
023 import org.apache.hadoop.mapreduce.Job;
024 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
025
026 import de.kumpe.hadooptimizer.EsIndividual;
027 import de.kumpe.hadooptimizer.EsOptimizerConfiguration;
028 import de.kumpe.hadooptimizer.EvaluationResult;
029 import de.kumpe.hadooptimizer.Evaluator;
030 import de.kumpe.hadooptimizer.Mutator;
031 import de.kumpe.hadooptimizer.Optimizer;
032
033 /**
034 * An {@link Optimizer} implementation for for evolution strategies which
035 * distributes the {@link Mutator mutation} and {@link Evaluator evaluation} in
036 * a Hadoop cluster.
037 *
038 * @see Optimizer
039 * @see HadoOptimizerBase
040 *
041 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
042 */
043 public class EsHadoOptimizer extends HadoOptimizerBase<EsIndividual> {
044 private static final long serialVersionUID = 1L;
045
046 public EsHadoOptimizer(final EsOptimizerConfiguration configuration) {
047 super(configuration);
048 }
049
050 @Override
051 protected EsOptimizerConfiguration getConfiguration() {
052 return (EsOptimizerConfiguration) super.getConfiguration();
053 }
054
055 @Override
056 protected void doOptimize() throws Exception {
057 Path outputPath = new Path(baseDir, "evaluations-0");
058
059 for (long cycle = 1; !checkForHalt(); cycle++) {
060 if (!getConfiguration().isPreserveParents()) {
061 evaluationResults.clear();
062 }
063
064 final Job job = createJob(cycle);
065
066 // basic settings
067 job.setInputFormatClass(IndividualInputFormat.class);
068 IndividualInputFormat.setInputPaths(job, outputPath);
069 IndividualInputFormat.setRandomGeneratorFactory(job,
070 getConfiguration().getRandomGeneratorFactory(),
071 getRandomGenerator().nextLong());
072
073 configureJob(job);
074
075 // output
076 outputPath = new Path(baseDir, "cycle-" + cycle);
077 FileOutputFormat.setOutputPath(job, outputPath);
078
079 executeJob(job);
080
081 new EvaluationResultReader<EsIndividual>(evaluationResults,
082 getConfiguration().getParents()).readIndivuals(conf,
083 outputPath);
084
085 if (getConfiguration().isPreserveParents()) {
086 outputPath = writeEvaluationResultsToFile(cycle);
087 }
088 }
089 }
090
091 boolean checkForHalt() {
092 return getConfiguration().getHalter().halt(evaluationResults);
093 }
094
095 void configureJob(final Job job) throws IOException {
096 final int maxMapTasks = getMaxMapTasks();
097 job.setMapperClass(MutateAndEvaluateChildMapper.class);
098 IndividualInputFormat.setSplits(job, maxMapTasks);
099 IndividualInputFormat.setOffspring(job, getConfiguration()
100 .getOffspring());
101 IndividualInputFormat.setRandomChoice(job, true);
102 }
103
104 Path writeEvaluationResultsToFile(final long cycle) throws IOException {
105 final Path outputPath = new Path(baseDir, "result-" + cycle);
106 final SequenceFile.Writer writer = new SequenceFile.Writer(fileSystem,
107 conf, outputPath, DoubleWritable.class, valueClass);
108 for (final EvaluationResult<EsIndividual> evaluationResult : evaluationResults) {
109 key.set(evaluationResult.getEvaluation());
110 writer.append(key, evaluationResult.getIndividual());
111 }
112 writer.close();
113 return outputPath;
114 }
115 }