001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.hadoop;
017
018 import java.io.IOException;
019 import java.io.ObjectInputStream;
020 import java.io.ObjectOutputStream;
021 import java.net.URISyntaxException;
022 import java.util.ArrayList;
023 import java.util.Collection;
024 import java.util.Date;
025 import java.util.TreeSet;
026 import java.util.concurrent.atomic.AtomicBoolean;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.commons.math.random.RandomGenerator;
031 import org.apache.commons.math.random.Well44497b;
032 import org.apache.hadoop.conf.Configurable;
033 import org.apache.hadoop.conf.Configuration;
034 import org.apache.hadoop.fs.FileSystem;
035 import org.apache.hadoop.fs.Path;
036 import org.apache.hadoop.io.DoubleWritable;
037 import org.apache.hadoop.io.SequenceFile;
038 import org.apache.hadoop.io.serializer.WritableSerialization;
039 import org.apache.hadoop.mapred.JobClient;
040 import org.apache.hadoop.mapred.JobConf;
041 import org.apache.hadoop.mapreduce.InputSplit;
042 import org.apache.hadoop.mapreduce.Job;
043 import org.apache.hadoop.mapreduce.Mapper;
044 import org.apache.hadoop.mapreduce.Reducer;
045 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
046 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
047 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
048
049 import de.kumpe.hadooptimizer.EaOptimizerConfigurationBase;
050 import de.kumpe.hadooptimizer.EvaluationResult;
051 import de.kumpe.hadooptimizer.Optimizer;
052 import de.kumpe.hadooptimizer.OptimizerConfiguration;
053 import de.kumpe.hadooptimizer.OptimizerException;
054
055 /**
056 * Base-class for all Hadoop-based {@link Optimizer} implementations. Subclasses
057 * need to override the {@link #doOptimize()} method and setup and create the
058 * Hadoop-{@link Job jobs} to do the actual optimization.
059 * <p>
060 * When setting up {@link Mapper}s or {@link Reducer}s you can use
061 * {@link #readConfiguration(Mapper.Context)} to read in the
062 * {@link OptimizerConfiguration}.
063 *
064 * @param <I>
065 * the individuals' type
066 *
067 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
068 */
069 @SuppressWarnings("deprecation")
070 public abstract class HadoOptimizerBase<I> extends Optimizer<I> implements
071 Configurable {
072 private static final Log log = LogFactory.getLog(HadoOptimizerBase.class);
073
074 static final String COUNTER_GROUP = "HadoOptimizer";
075 private static final String CONFIGURATION_FILE = "de.kumpe.hadooptimizer.EaOptimizerConfiguration.file";
076
077 /**
078 * Reads the {@link OptimizerConfiguration} out of the {@link Configuration
079 * hadoop's job configuration} from the specified {@link Mapper.Context}.
080 * <p>
081 * If the {@link InputSplit} of the specified {@link Mapper.Context} is
082 * implementing {@link HasSeed} then a new
083 * {@link Well44497b#Well44497b(long)} -instance with the
084 * {@link HasSeed#getSeed() retrieved seed} is
085 * {@link OptimizerConfiguration#injectRandomGenerator(RandomGenerator) injected}
086 * into the configuration. Otherwise taskAttemptID is used to create a seed.
087 *
088 * @param context
089 * the mapper's {@link Mapper.Context context}
090 *
091 * @return the read configuration
092 */
093 @SuppressWarnings("rawtypes")
094 public static OptimizerConfiguration readConfiguration(
095 final Mapper.Context context) throws IOException {
096 final Configuration conf = context.getConfiguration();
097
098 final Path configurationFile = new Path(
099 conf.get(HadoOptimizerBase.CONFIGURATION_FILE));
100 final ObjectInputStream in = new ObjectInputStream(FileSystem.get(conf)
101 .open(configurationFile));
102
103 final InputSplit inputSplit = context.getInputSplit();
104 final long seed;
105 if (inputSplit instanceof HasSeed) {
106 seed = ((HasSeed) inputSplit).getSeed();
107 } else {
108 if (log.isWarnEnabled()) {
109 log.warn("No seed delivered, instantiating with taskAttemptID.");
110 }
111 seed = context.getTaskAttemptID().hashCode()
112 * System.currentTimeMillis();
113 }
114
115 try {
116 final OptimizerConfiguration optimizerConfiguration = (EaOptimizerConfigurationBase) in
117 .readObject();
118 optimizerConfiguration.injectRandomGenerator(optimizerConfiguration
119 .getRandomGeneratorFactory().createRandomGenerator(seed));
120 return optimizerConfiguration;
121 } catch (final ClassNotFoundException e) {
122 throw new RuntimeException(e);
123 }
124 }
125
126 private final AtomicBoolean running = new AtomicBoolean();
127 final TreeSet<EvaluationResult<I>> evaluationResults = new TreeSet<EvaluationResult<I>>();
128 final DoubleWritable key = new DoubleWritable();
129 Class<?> valueClass;
130 Configuration conf;
131 FileSystem fileSystem;
132 Path baseDir;
133
134 public HadoOptimizerBase(final EaOptimizerConfigurationBase<I> configuration) {
135 super(configuration);
136 }
137
138 @Override
139 protected EaOptimizerConfigurationBase<I> getConfiguration() {
140 return (EaOptimizerConfigurationBase<I>) super.getConfiguration();
141 }
142
143 @Override
144 public Configuration getConf() {
145 return conf;
146 }
147
148 @Override
149 public void setConf(final Configuration conf) {
150 this.conf = conf;
151 }
152
153 /**
154 * Creates the output-directory, writes the
155 * {@link EaOptimizerConfigurationBase} into the file-system and starts the
156 * actual optimization in {@link #doOptimize()}.
157 *
158 * @see Optimizer#optimize()
159 */
160 @Override
161 public final void optimize() {
162 if (!running.compareAndSet(false, true)) {
163 throw new IllegalStateException(
164 "The optimizer can only be started once.");
165 }
166 if (log.isDebugEnabled()) {
167 log.debug("Initializing optimization...");
168 }
169
170 final Collection<I> startPopulation = getConfiguration()
171 .getPopulationReader().read();
172
173 try {
174 if (conf == null) {
175 conf = new Configuration();
176 }
177
178 getConfiguration().injectRandomGenerator(getRandomGenerator());
179
180 evaluationResults.clear();
181 valueClass = startPopulation.iterator().next().getClass();
182 fileSystem = FileSystem.get(conf);
183
184 conf.setStrings("io.serializations",
185 WritableSerialization.class.getName(),
186 JavaSerialization.class.getName());
187
188 baseDir = new Path(String.format(
189 "/tmp/hadooptimizer-%1$tY%1$tm%1$td-%1$tH%1$tM%1$tS",
190 new Date()));
191 fileSystem.mkdirs(baseDir);
192
193 final Path configurationFile = new Path(baseDir, "configuration");
194 final ObjectOutputStream out = new ObjectOutputStream(
195 fileSystem.create(configurationFile));
196 out.writeObject(getConfiguration());
197 out.close();
198 conf.set(HadoOptimizerBase.CONFIGURATION_FILE,
199 configurationFile.toString());
200
201 initialEvaluation(startPopulation);
202
203 if (log.isInfoEnabled()) {
204 log.info("Starting optimization...");
205 }
206
207 doOptimize();
208
209 if (log.isInfoEnabled()) {
210 log.info("Optimization finished.");
211 }
212
213 // unpack individuals from evaluationResults
214 final Collection<I> resultPopulation = new ArrayList<I>();
215 for (final EvaluationResult<I> evaluationResult : evaluationResults) {
216 resultPopulation.add(evaluationResult.getIndividual());
217 }
218
219 fileSystem.delete(baseDir, conf.getBoolean("clean.basedir", true));
220
221 getConfiguration().getPopulationWriter().write(resultPopulation);
222 } catch (final Exception e) {
223 throw new OptimizerException(e);
224 } finally {
225 running.set(false);
226 }
227 }
228
229 void initialEvaluation(final Collection<I> startPopulation)
230 throws Exception {
231 if (log.isInfoEnabled()) {
232 log.info(String.format(
233 "Starting initial evaluation for %d individuals.",
234 startPopulation.size()));
235 }
236
237 final Path inputPath = new Path(baseDir, "startPopulation");
238 final Path evaluationsPath = new Path(baseDir, "evaluations-0");
239
240 writePopulation(startPopulation, inputPath);
241
242 // mutation and evaluation
243 final Job job = createJob(0);
244
245 // mapper
246 job.setMapperClass(EvaluateIndividualMapper.class);
247
248 // input
249 job.setInputFormatClass(SequenceFileInputFormat.class);
250 SequenceFileInputFormat.setInputPaths(job, inputPath);
251
252 // output
253 FileOutputFormat.setOutputPath(job, evaluationsPath);
254
255 executeJob(job);
256
257 new EvaluationResultReader<I>(evaluationResults, getConfiguration()
258 .getParents()).readIndivuals(conf, evaluationsPath);
259 }
260
261 void writePopulation(final Collection<I> population, final Path inputPath)
262 throws IOException {
263 final SequenceFile.Writer writer = SequenceFile.createWriter(
264 fileSystem, conf, inputPath, DoubleWritable.class, population
265 .iterator().next().getClass());
266 key.set(Double.NaN);
267 for (final I individual : population) {
268 writer.append(key, individual);
269 }
270 writer.close();
271 }
272
273 /**
274 * Needs to be overridden to do the actual optimization.
275 */
276 protected abstract void doOptimize() throws Exception;
277
278 Job createJob(final long cycle) throws IOException, URISyntaxException {
279 final Job job = new Job(conf);
280 job.setJobName(getClass().getSimpleName() + " cycle " + cycle);
281 job.setJarByClass(SelectionReducer.class);
282
283 // reducer
284 job.setCombinerClass(SelectionReducer.class);
285 job.setReducerClass(SelectionReducer.class);
286 SelectionReducer.setParents(job, getConfiguration().getParents());
287 job.setNumReduceTasks(1);
288
289 // output
290 job.setOutputFormatClass(SequenceFileOutputFormat.class);
291 job.setOutputKeyClass(DoubleWritable.class);
292 job.setOutputValueClass(valueClass);
293
294 return job;
295 }
296
297 void executeJob(final Job job) throws IOException, InterruptedException,
298 ClassNotFoundException {
299 if (job.waitForCompletion(true)) {
300 return;
301 }
302 throw new RuntimeException("Job failed.");
303 }
304
305 int getMaxMapTasks() throws IOException {
306 int maxMapTasks = conf.getInt("hadooptimizer.maxMapTasks", 0);
307 if (maxMapTasks <= 0) {
308 maxMapTasks = new JobClient(new JobConf(conf)).getClusterStatus()
309 .getMaxMapTasks();
310 }
311 return maxMapTasks;
312 }
313 }