/*
 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package de.kumpe.hadooptimizer.hadoop;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.random.RandomGenerator;
import org.apache.commons.math.random.Well44497b;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import de.kumpe.hadooptimizer.EaOptimizerConfigurationBase;
import de.kumpe.hadooptimizer.EvaluationResult;
import de.kumpe.hadooptimizer.Optimizer;
import de.kumpe.hadooptimizer.OptimizerConfiguration;
import de.kumpe.hadooptimizer.OptimizerException;

/**
 * Base class for all Hadoop-based {@link Optimizer} implementations.
 * Subclasses must override the {@link #doOptimize()} method and set up the
 * Hadoop {@link Job jobs} that perform the actual optimization.
 * <p>
 * When setting up {@link Mapper}s or {@link Reducer}s you can use
 * {@link #readConfiguration(Mapper.Context)} to read in the
 * {@link OptimizerConfiguration}.
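 * <p>
 * A minimal sketch of a subclass, assuming it lives in this package, runs a
 * single optimization cycle and uses a hypothetical
 * {@code MyEvaluationMapper} (real implementations typically loop over
 * cycles):
 * 
 * <pre>
 * {@code
 * public class MyOptimizer<I> extends HadoOptimizerBase<I> {
 *     public MyOptimizer(EaOptimizerConfigurationBase<I> configuration) {
 *         super(configuration);
 *     }
 * 
 *     protected void doOptimize() throws Exception {
 *         // one mutation/evaluation cycle, mirroring the initial evaluation
 *         Job job = createJob(1);
 *         job.setMapperClass(MyEvaluationMapper.class);
 *         job.setInputFormatClass(SequenceFileInputFormat.class);
 *         SequenceFileInputFormat.setInputPaths(job, new Path(baseDir, "evaluations-0"));
 *         FileOutputFormat.setOutputPath(job, new Path(baseDir, "evaluations-1"));
 *         executeJob(job);
 *     }
 * }
 * }
 * </pre>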
 * 
 * @param <I>
 *            the individuals' type
 * 
 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
 */
@SuppressWarnings("deprecation")
public abstract class HadoOptimizerBase<I> extends Optimizer<I> implements
		Configurable {
	private static final Log log = LogFactory.getLog(HadoOptimizerBase.class);

	static final String COUNTER_GROUP = "HadoOptimizer";
	private static final String CONFIGURATION_FILE = "de.kumpe.hadooptimizer.EaOptimizerConfiguration.file";

	/**
	 * Reads the {@link OptimizerConfiguration} out of {@link Configuration
	 * Hadoop's job configuration} from the specified {@link Mapper.Context}.
	 * <p>
	 * If the {@link InputSplit} of the specified {@link Mapper.Context}
	 * implements {@link HasSeed}, the {@link HasSeed#getSeed() retrieved seed}
	 * is passed to the configuration's random generator factory (typically
	 * creating a {@link Well44497b}) and the resulting generator is
	 * {@link OptimizerConfiguration#injectRandomGenerator(RandomGenerator)
	 * injected} into the configuration. Otherwise a seed is derived from the
	 * task attempt ID and the current time.
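	 * <p>
	 * A typical call site is a mapper's {@code setup} method; a minimal
	 * sketch (the field name {@code configuration} is illustrative):
	 * 
	 * <pre>
	 * {@code
	 * protected void setup(Context context) throws IOException {
	 *     // deserializes the configuration and injects a seeded RNG
	 *     configuration = HadoOptimizerBase.readConfiguration(context);
	 * }
	 * }
	 * </pre>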
	 * 
	 * @param context
	 *            the mapper's {@link Mapper.Context context}
	 * 
	 * @return the read configuration
	 * 
	 * @throws IOException
	 *             if the configuration file cannot be opened or read
	 */
	@SuppressWarnings("rawtypes")
	public static OptimizerConfiguration readConfiguration(
			final Mapper.Context context) throws IOException {
		final Configuration conf = context.getConfiguration();

		final Path configurationFile = new Path(
				conf.get(HadoOptimizerBase.CONFIGURATION_FILE));
		final ObjectInputStream in = new ObjectInputStream(FileSystem.get(conf)
				.open(configurationFile));

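		// input splits created by the optimizer can carry a per-task seed
		// (see HasSeed) for reproducible random number generation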
		final InputSplit inputSplit = context.getInputSplit();
		final long seed;
		if (inputSplit instanceof HasSeed) {
			seed = ((HasSeed) inputSplit).getSeed();
		} else {
			if (log.isWarnEnabled()) {
				log.warn("No seed delivered, instantiating with taskAttemptID.");
			}
			seed = context.getTaskAttemptID().hashCode()
					* System.currentTimeMillis();
		}

		try {
			final OptimizerConfiguration optimizerConfiguration = (EaOptimizerConfigurationBase) in
					.readObject();
			optimizerConfiguration.injectRandomGenerator(optimizerConfiguration
					.getRandomGeneratorFactory().createRandomGenerator(seed));
			return optimizerConfiguration;
		} catch (final ClassNotFoundException e) {
			throw new RuntimeException(e);
		} finally {
			in.close();
		}
	}

	/** Guards against starting the optimizer more than once. */
	private final AtomicBoolean running = new AtomicBoolean();
	/** Evaluated individuals, sorted by their natural ordering. */
	final TreeSet<EvaluationResult<I>> evaluationResults = new TreeSet<EvaluationResult<I>>();
	final DoubleWritable key = new DoubleWritable();
	Class<?> valueClass;
	Configuration conf;
	FileSystem fileSystem;
	/** Per-run working directory in the file system. */
	Path baseDir;

	public HadoOptimizerBase(final EaOptimizerConfigurationBase<I> configuration) {
		super(configuration);
	}

	@Override
	protected EaOptimizerConfigurationBase<I> getConfiguration() {
		return (EaOptimizerConfigurationBase<I>) super.getConfiguration();
	}

	@Override
	public Configuration getConf() {
		return conf;
	}

	@Override
	public void setConf(final Configuration conf) {
		this.conf = conf;
	}

	/**
	 * Creates the output directory, writes the
	 * {@link EaOptimizerConfigurationBase} into the file system and starts the
	 * actual optimization in {@link #doOptimize()}.
	 * 
	 * @see Optimizer#optimize()
	 */
	@Override
	public final void optimize() {
		if (!running.compareAndSet(false, true)) {
			throw new IllegalStateException(
					"The optimizer can only be started once.");
		}
		if (log.isDebugEnabled()) {
			log.debug("Initializing optimization...");
		}

		final Collection<I> startPopulation = getConfiguration()
				.getPopulationReader().read();

		try {
			if (conf == null) {
				conf = new Configuration();
			}

			getConfiguration().injectRandomGenerator(getRandomGenerator());

			evaluationResults.clear();
			valueClass = startPopulation.iterator().next().getClass();
			fileSystem = FileSystem.get(conf);

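			// individuals are written with plain Java serialization
			// alongside Hadoop's own writables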
			conf.setStrings("io.serializations",
					WritableSerialization.class.getName(),
					JavaSerialization.class.getName());

			baseDir = new Path(String.format(
					"/tmp/hadooptimizer-%1$tY%1$tm%1$td-%1$tH%1$tM%1$tS",
					new Date()));
			fileSystem.mkdirs(baseDir);

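			// persist the configuration so that mappers and reducers can
			// deserialize it via readConfiguration()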
			final Path configurationFile = new Path(baseDir, "configuration");
			final ObjectOutputStream out = new ObjectOutputStream(
					fileSystem.create(configurationFile));
			out.writeObject(getConfiguration());
			out.close();
			conf.set(HadoOptimizerBase.CONFIGURATION_FILE,
					configurationFile.toString());

200    
201                            initialEvaluation(startPopulation);
202    
203                            if (log.isInfoEnabled()) {
204                                    log.info("Starting optimization...");
205                            }
206    
207                            doOptimize();
208    
209                            if (log.isInfoEnabled()) {
210                                    log.info("Optimization finished.");
211                            }
212    
213                            // unpack individuals from evaluationResults
214                            final Collection<I> resultPopulation = new ArrayList<I>();
215                            for (final EvaluationResult<I> evaluationResult : evaluationResults) {
216                                    resultPopulation.add(evaluationResult.getIndividual());
217                            }
218    
			// remove the working directory unless the user asked to keep it
			if (conf.getBoolean("clean.basedir", true)) {
				fileSystem.delete(baseDir, true);
			}

			getConfiguration().getPopulationWriter().write(resultPopulation);
		} catch (final Exception e) {
			throw new OptimizerException(e);
		} finally {
			running.set(false);
		}
	}

	void initialEvaluation(final Collection<I> startPopulation)
			throws Exception {
		if (log.isInfoEnabled()) {
			log.info(String.format(
					"Starting initial evaluation for %d individuals.",
					startPopulation.size()));
		}

		final Path inputPath = new Path(baseDir, "startPopulation");
		final Path evaluationsPath = new Path(baseDir, "evaluations-0");

		writePopulation(startPopulation, inputPath);

		// evaluation only: the start population is scored unmodified
		final Job job = createJob(0);

		// mapper
		job.setMapperClass(EvaluateIndividualMapper.class);

		// input
		job.setInputFormatClass(SequenceFileInputFormat.class);
		SequenceFileInputFormat.setInputPaths(job, inputPath);

		// output
		FileOutputFormat.setOutputPath(job, evaluationsPath);

		executeJob(job);

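		// read the evaluated individuals back, bounded by the configured
		// number of parents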
		new EvaluationResultReader<I>(evaluationResults, getConfiguration()
				.getParents()).readIndivuals(conf, evaluationsPath);
	}

	void writePopulation(final Collection<I> population, final Path inputPath)
			throws IOException {
		final SequenceFile.Writer writer = SequenceFile.createWriter(
				fileSystem, conf, inputPath, DoubleWritable.class, population
						.iterator().next().getClass());
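		// no fitness is known yet, so all individuals share a NaN key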
		key.set(Double.NaN);
		for (final I individual : population) {
			writer.append(key, individual);
		}
		writer.close();
	}

	/**
	 * Needs to be overridden to do the actual optimization.
	 */
	protected abstract void doOptimize() throws Exception;

	Job createJob(final long cycle) throws IOException, URISyntaxException {
		final Job job = new Job(conf);
		job.setJobName(getClass().getSimpleName() + " cycle " + cycle);
		job.setJarByClass(SelectionReducer.class);

		// reducer
		job.setCombinerClass(SelectionReducer.class);
		job.setReducerClass(SelectionReducer.class);
		SelectionReducer.setParents(job, getConfiguration().getParents());
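		// a single reduce task sees all evaluated individuals and performs
		// the global selection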
		job.setNumReduceTasks(1);

		// output
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		job.setOutputKeyClass(DoubleWritable.class);
		job.setOutputValueClass(valueClass);

		return job;
	}

	void executeJob(final Job job) throws IOException, InterruptedException,
			ClassNotFoundException {
		if (job.waitForCompletion(true)) {
			return;
		}
		throw new RuntimeException("Job failed.");
	}

	int getMaxMapTasks() throws IOException {
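		// an explicit "hadooptimizer.maxMapTasks" setting wins; otherwise
		// fall back to the cluster's reported map task capacity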
		int maxMapTasks = conf.getInt("hadooptimizer.maxMapTasks", 0);
		if (maxMapTasks <= 0) {
			maxMapTasks = new JobClient(new JobConf(conf)).getClusterStatus()
					.getMaxMapTasks();
		}
		return maxMapTasks;
	}
}