/*
 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package de.kumpe.hadooptimizer.hadoop;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.random.RandomGenerator;
import org.apache.commons.math.random.Well44497b;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import de.kumpe.hadooptimizer.EaOptimizerConfigurationBase;
import de.kumpe.hadooptimizer.EvaluationResult;
import de.kumpe.hadooptimizer.Optimizer;
import de.kumpe.hadooptimizer.OptimizerConfiguration;
import de.kumpe.hadooptimizer.OptimizerException;

/**
 * Base class for all Hadoop-based {@link Optimizer} implementations.
 * Subclasses need to override the {@link #doOptimize()} method and set up the
 * Hadoop {@link Job jobs} that do the actual optimization.
 * <p>
 * When setting up {@link Mapper}s or {@link Reducer}s you can use
 * {@link #readConfiguration(Mapper.Context)} to read in the
 * {@link OptimizerConfiguration}.
 *
 * @param <I>
 *            the individuals' type
 *
 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
 */
@SuppressWarnings("deprecation")
public abstract class HadoOptimizerBase<I> extends Optimizer<I> implements
        Configurable {
    private static final Log log = LogFactory.getLog(HadoOptimizerBase.class);

    static final String COUNTER_GROUP = "HadoOptimizer";
    private static final String CONFIGURATION_FILE = "de.kumpe.hadooptimizer.EaOptimizerConfiguration.file";

    /**
     * Reads the {@link OptimizerConfiguration} from the {@link Configuration
     * Hadoop job configuration} of the specified {@link Mapper.Context}.
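     * <p>
     * For example, a {@link Mapper} can load the configuration once in its
     * {@code setup} method. This is only a sketch; the mapper class and its
     * type parameters are illustrative:
     *
     * <pre>{@code
     * public class MyEvaluationMapper<I> extends
     *         Mapper<DoubleWritable, I, DoubleWritable, I> {
     *     private OptimizerConfiguration configuration;
     *
     *     @Override
     *     protected void setup(final Context context) throws IOException {
     *         configuration = HadoOptimizerBase.readConfiguration(context);
     *     }
     * }
     * }</pre>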
     * <p>
     * If the {@link InputSplit} of the specified {@link Mapper.Context}
     * implements {@link HasSeed}, a new {@link Well44497b#Well44497b(long)
     * Well44497b} instance created with the {@link HasSeed#getSeed()
     * retrieved seed} is
     * {@link OptimizerConfiguration#injectRandomGenerator(RandomGenerator)
     * injected} into the configuration. Otherwise a seed is derived from the
     * task attempt id and the current time.
     *
     * @param context
     *            the mapper's {@link Mapper.Context context}
     *
     * @return the read configuration
     */
    @SuppressWarnings("rawtypes")
    public static OptimizerConfiguration readConfiguration(
            final Mapper.Context context) throws IOException {
        final Configuration conf = context.getConfiguration();

        final Path configurationFile = new Path(
                conf.get(HadoOptimizerBase.CONFIGURATION_FILE));
        final ObjectInputStream in = new ObjectInputStream(FileSystem.get(conf)
                .open(configurationFile));

        final InputSplit inputSplit = context.getInputSplit();
        final long seed;
        if (inputSplit instanceof HasSeed) {
            seed = ((HasSeed) inputSplit).getSeed();
        } else {
            if (log.isWarnEnabled()) {
                log.warn("No seed delivered, deriving a seed from the taskAttemptID.");
            }
            seed = context.getTaskAttemptID().hashCode()
                    * System.currentTimeMillis();
        }

        try {
            final OptimizerConfiguration optimizerConfiguration = (EaOptimizerConfigurationBase) in
                    .readObject();
            optimizerConfiguration.injectRandomGenerator(optimizerConfiguration
                    .getRandomGeneratorFactory().createRandomGenerator(seed));
            return optimizerConfiguration;
        } catch (final ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            // close the stream even if deserialization fails
            in.close();
        }
    }

    private final AtomicBoolean running = new AtomicBoolean();
    final TreeSet<EvaluationResult<I>> evaluationResults = new TreeSet<EvaluationResult<I>>();
    final DoubleWritable key = new DoubleWritable();
    Class<?> valueClass;
    Configuration conf;
    FileSystem fileSystem;
    Path baseDir;

    public HadoOptimizerBase(final EaOptimizerConfigurationBase<I> configuration) {
        super(configuration);
    }

    @Override
    protected EaOptimizerConfigurationBase<I> getConfiguration() {
        return (EaOptimizerConfigurationBase<I>) super.getConfiguration();
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(final Configuration conf) {
        this.conf = conf;
    }

    /**
     * Creates the output directory, writes the
     * {@link EaOptimizerConfigurationBase} into the file system and starts
     * the actual optimization in {@link #doOptimize()}.
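     * <p>
     * All intermediate data is written below a timestamped working directory
     * under {@code /tmp}, which is deleted again when the optimization
     * finishes, unless the {@code clean.basedir} configuration option is set
     * to {@code false}.
     * <p>
     * A typical invocation looks like the following sketch (the concrete
     * subclass is illustrative):
     *
     * <pre>{@code
     * HadoOptimizerBase<double[]> optimizer = new MyHadoopOptimizer(configuration);
     * optimizer.setConf(new Configuration());
     * optimizer.optimize();
     * }</pre>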
     *
     * @see Optimizer#optimize()
     */
    @Override
    public final void optimize() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException(
                    "The optimizer is already running.");
        }
        if (log.isDebugEnabled()) {
            log.debug("Initializing optimization...");
        }

        final Collection<I> startPopulation = getConfiguration()
                .getPopulationReader().read();

        try {
            if (conf == null) {
                conf = new Configuration();
            }

            getConfiguration().injectRandomGenerator(getRandomGenerator());

            evaluationResults.clear();
            valueClass = startPopulation.iterator().next().getClass();
            fileSystem = FileSystem.get(conf);

            conf.setStrings("io.serializations",
                    WritableSerialization.class.getName(),
                    JavaSerialization.class.getName());

            baseDir = new Path(String.format(
                    "/tmp/hadooptimizer-%1$tY%1$tm%1$td-%1$tH%1$tM%1$tS",
                    new Date()));
            fileSystem.mkdirs(baseDir);

            final Path configurationFile = new Path(baseDir, "configuration");
            final ObjectOutputStream out = new ObjectOutputStream(
                    fileSystem.create(configurationFile));
            out.writeObject(getConfiguration());
            out.close();
            conf.set(HadoOptimizerBase.CONFIGURATION_FILE,
                    configurationFile.toString());

            initialEvaluation(startPopulation);

            if (log.isInfoEnabled()) {
                log.info("Starting optimization...");
            }

            doOptimize();

            if (log.isInfoEnabled()) {
                log.info("Optimization finished.");
            }

            // unpack individuals from evaluationResults
            final Collection<I> resultPopulation = new ArrayList<I>();
            for (final EvaluationResult<I> evaluationResult : evaluationResults) {
                resultPopulation.add(evaluationResult.getIndividual());
            }

            // delete the temporary working directory unless cleaning is
            // disabled via the clean.basedir configuration option
            if (conf.getBoolean("clean.basedir", true)) {
                fileSystem.delete(baseDir, true);
            }

            getConfiguration().getPopulationWriter().write(resultPopulation);
        } catch (final Exception e) {
            throw new OptimizerException(e);
        } finally {
            running.set(false);
        }
    }

    void initialEvaluation(final Collection<I> startPopulation)
            throws Exception {
        if (log.isInfoEnabled()) {
            log.info(String.format(
                    "Starting initial evaluation for %d individuals.",
                    startPopulation.size()));
        }

        final Path inputPath = new Path(baseDir, "startPopulation");
        final Path evaluationsPath = new Path(baseDir, "evaluations-0");

        writePopulation(startPopulation, inputPath);

        // mutation and evaluation
        final Job job = createJob(0);

        // mapper
        job.setMapperClass(EvaluateIndividualMapper.class);

        // input
        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(job, inputPath);

        // output
        FileOutputFormat.setOutputPath(job, evaluationsPath);

        executeJob(job);

        new EvaluationResultReader<I>(evaluationResults, getConfiguration()
                .getParents()).readIndivuals(conf, evaluationsPath);
    }

    void writePopulation(final Collection<I> population, final Path inputPath)
            throws IOException {
        final SequenceFile.Writer writer = SequenceFile.createWriter(
                fileSystem, conf, inputPath, DoubleWritable.class, population
                        .iterator().next().getClass());
        key.set(Double.NaN);
        for (final I individual : population) {
            writer.append(key, individual);
        }
        writer.close();
    }

    /**
     * Needs to be overridden to do the actual optimization.
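     * <p>
     * A typical implementation runs one Hadoop {@link Job} per optimization
     * cycle, reading the previous cycle's evaluations and writing the next
     * ones. The following is only a sketch; the mapper class and the number
     * of cycles are illustrative:
     *
     * <pre>{@code
     * protected void doOptimize() throws Exception {
     *     for (long cycle = 1; cycle <= cycles; cycle++) {
     *         final Job job = createJob(cycle);
     *         job.setMapperClass(MyMutationMapper.class);
     *         job.setInputFormatClass(SequenceFileInputFormat.class);
     *         SequenceFileInputFormat.setInputPaths(job, new Path(baseDir,
     *                 "evaluations-" + (cycle - 1)));
     *         FileOutputFormat.setOutputPath(job, new Path(baseDir,
     *                 "evaluations-" + cycle));
     *         executeJob(job);
     *     }
     * }
     * }</pre>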
     */
    protected abstract void doOptimize() throws Exception;

    Job createJob(final long cycle) throws IOException, URISyntaxException {
        final Job job = new Job(conf);
        job.setJobName(getClass().getSimpleName() + " cycle " + cycle);
        job.setJarByClass(SelectionReducer.class);

        // reducer
        job.setCombinerClass(SelectionReducer.class);
        job.setReducerClass(SelectionReducer.class);
        SelectionReducer.setParents(job, getConfiguration().getParents());
        job.setNumReduceTasks(1);

        // output
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(valueClass);

        return job;
    }

    void executeJob(final Job job) throws IOException, InterruptedException,
            ClassNotFoundException {
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException("Job failed.");
        }
    }

    int getMaxMapTasks() throws IOException {
        int maxMapTasks = conf.getInt("hadooptimizer.maxMapTasks", 0);
        if (maxMapTasks <= 0) {
            maxMapTasks = new JobClient(new JobConf(conf)).getClusterStatus()
                    .getMaxMapTasks();
        }
        return maxMapTasks;
    }
}