001 /* 002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package de.kumpe.hadooptimizer.hadoop; 017 018 import java.io.IOException; 019 020 import org.apache.hadoop.fs.Path; 021 import org.apache.hadoop.io.DoubleWritable; 022 import org.apache.hadoop.io.SequenceFile; 023 import org.apache.hadoop.mapreduce.Job; 024 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 025 026 import de.kumpe.hadooptimizer.EsIndividual; 027 import de.kumpe.hadooptimizer.EsOptimizerConfiguration; 028 import de.kumpe.hadooptimizer.EvaluationResult; 029 import de.kumpe.hadooptimizer.Evaluator; 030 import de.kumpe.hadooptimizer.Mutator; 031 import de.kumpe.hadooptimizer.Optimizer; 032 033 /** 034 * An {@link Optimizer} implementation for for evolution strategies which 035 * distributes the {@link Mutator mutation} and {@link Evaluator evaluation} in 036 * a Hadoop cluster. 037 * 038 * @see Optimizer 039 * @see HadoOptimizerBase 040 * 041 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a> 042 */ 043 public class EsHadoOptimizer extends HadoOptimizerBase<EsIndividual> { 044 private static final long serialVersionUID = 1L; 045 046 public EsHadoOptimizer(final EsOptimizerConfiguration configuration) { 047 super(configuration); 048 } 049 050 @Override 051 protected EsOptimizerConfiguration getConfiguration() { 052 return (EsOptimizerConfiguration) super.getConfiguration(); 053 } 054 055 @Override 056 protected void doOptimize() throws Exception { 057 Path outputPath = new Path(baseDir, "evaluations-0"); 058 059 for (long cycle = 1; !checkForHalt(); cycle++) { 060 if (!getConfiguration().isPreserveParents()) { 061 evaluationResults.clear(); 062 } 063 064 final Job job = createJob(cycle); 065 066 // basic settings 067 job.setInputFormatClass(IndividualInputFormat.class); 068 IndividualInputFormat.setInputPaths(job, outputPath); 069 IndividualInputFormat.setRandomGeneratorFactory(job, 070 getConfiguration().getRandomGeneratorFactory(), 071 getRandomGenerator().nextLong()); 072 073 configureJob(job); 074 075 // output 076 outputPath = new Path(baseDir, "cycle-" + cycle); 077 FileOutputFormat.setOutputPath(job, outputPath); 078 079 executeJob(job); 080 081 new EvaluationResultReader<EsIndividual>(evaluationResults, 082 getConfiguration().getParents()).readIndivuals(conf, 083 outputPath); 084 085 if (getConfiguration().isPreserveParents()) { 086 outputPath = writeEvaluationResultsToFile(cycle); 087 } 088 } 089 } 090 091 boolean checkForHalt() { 092 return getConfiguration().getHalter().halt(evaluationResults); 093 } 094 095 void configureJob(final Job job) throws IOException { 096 final int maxMapTasks = getMaxMapTasks(); 097 job.setMapperClass(MutateAndEvaluateChildMapper.class); 098 IndividualInputFormat.setSplits(job, maxMapTasks); 099 IndividualInputFormat.setOffspring(job, getConfiguration() 100 .getOffspring()); 101 IndividualInputFormat.setRandomChoice(job, true); 102 } 103 104 Path writeEvaluationResultsToFile(final long cycle) throws IOException { 105 final Path outputPath = new Path(baseDir, "result-" + cycle); 106 final SequenceFile.Writer writer = new SequenceFile.Writer(fileSystem, 107 conf, outputPath, DoubleWritable.class, valueClass); 108 for (final EvaluationResult<EsIndividual> evaluationResult : evaluationResults) { 109 key.set(evaluationResult.getEvaluation()); 110 writer.append(key, evaluationResult.getIndividual()); 111 } 112 writer.close(); 113 return outputPath; 114 } 115 }