001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.hadoop;
017    
018    import java.io.IOException;
019    import java.util.ArrayList;
020    import java.util.Collection;
021    
022    import org.apache.hadoop.io.DoubleWritable;
023    import org.apache.hadoop.mapreduce.TaskAttemptContext;
024    
025    import de.kumpe.hadooptimizer.EsIndividual;
026    import de.kumpe.hadooptimizer.EsOptimizerConfiguration;
027    import de.kumpe.hadooptimizer.EvaluationResult;
028    import de.kumpe.hadooptimizer.Evaluator;
029    import de.kumpe.hadooptimizer.Halter;
030    import de.kumpe.hadooptimizer.impl.MemoryPopulationReader;
031    import de.kumpe.hadooptimizer.impl.MemoryPopulationWriter;
032    import de.kumpe.hadooptimizer.impl.NeedsRandomWrapperBase;
033    import de.kumpe.hadooptimizer.simple.SimpleEsOptimizer;
034    
035    /**
036     * @see EsMultiPopulationsHadoOptimizer
037     * 
038     * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
039     */
040    public final class EvolutionCycleMapper extends
041                    MapperWithOptimizerConfiguration<EsIndividual> {
042            private static final class ProgressReportingEvaluatorWrapper extends
043                            NeedsRandomWrapperBase<Evaluator<EsIndividual>> implements
044                            Evaluator<EsIndividual> {
045                    private static final long serialVersionUID = 1L;
046    
047                    private final TaskAttemptContext context;
048    
049                    public ProgressReportingEvaluatorWrapper(
050                                    final Evaluator<EsIndividual> delegate,
051                                    final TaskAttemptContext context) {
052                            super(delegate);
053    
054                            this.context = context;
055                    }
056    
057                    @Override
058                    public double evaluate(final EsIndividual individual) {
059                            final double result = delegate.evaluate(individual);
060                            context.progress();
061                            return result;
062                    }
063            }
064    
065            private static final class SetStatusHalterWrapper extends
066                            NeedsRandomWrapperBase<Halter<EsIndividual>> implements
067                            Halter<EsIndividual> {
068                    private static final long serialVersionUID = 1L;
069    
070                    private final TaskAttemptContext context;
071                    private long counter;
072    
073                    public SetStatusHalterWrapper(final Halter<EsIndividual> delegate,
074                                    final TaskAttemptContext context) {
075                            super(delegate);
076    
077                            this.context = context;
078                    }
079    
080                    @Override
081                    public boolean halt(
082                                    final Collection<EvaluationResult<EsIndividual>> evaluationResults) {
083                            try {
084                                    context.setStatus("Finished cycle " + counter);
085                            } catch (final IOException e) {
086                                    throw new RuntimeException(e);
087                            }
088                            counter++;
089                            return delegate.halt(evaluationResults);
090                    }
091            }
092    
093            @Override
094            public void run(final Context context) throws IOException,
095                            InterruptedException {
096                    setup(context);
097    
098                    wrapEvaluatorForReportingProgress(context);
099                    wrapHalterForSettingStatus(context);
100    
101                    readAndSetPopulation(context);
102    
103                    // create output
104                    final MemoryPopulationWriter<EsIndividual> buf = new MemoryPopulationWriter<EsIndividual>();
105                    configuration.setPopulationWriter(buf);
106    
107                    // create and start optimizer
108                    final SimpleEsOptimizer optimizer = new SimpleEsOptimizer(
109                                    (EsOptimizerConfiguration) configuration);
110                    optimizer.optimize();
111    
112                    // write results
113                    final DoubleWritable key = new DoubleWritable();
114                    for (final EsIndividual individual : buf.getPopulation()) {
115                            key.set(configuration.getEvaluator().evaluate(individual));
116    
117                            context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
118                                            "EvolutionCycleMapper writes").increment(1);
119    
120                            context.write(key, individual);
121                    }
122    
123                    cleanup(context);
124            }
125    
126            private void wrapEvaluatorForReportingProgress(final Context context) {
127                    final Evaluator<EsIndividual> delegate = configuration.getEvaluator();
128                    configuration.setEvaluator(new ProgressReportingEvaluatorWrapper(
129                                    delegate, context));
130            }
131    
132            private void wrapHalterForSettingStatus(final Context context) {
133                    final Halter<EsIndividual> delegate = configuration.getHalter();
134                    configuration.setHalter(new SetStatusHalterWrapper(delegate, context));
135            }
136    
137            private void readAndSetPopulation(final Context context)
138                            throws IOException, InterruptedException {
139                    final Collection<EsIndividual> population = new ArrayList<EsIndividual>();
140                    while (context.nextKeyValue()) {
141                            context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
142                                            "EvolutionCycleMapper reads").increment(1);
143    
144                            population.add(context.getCurrentValue());
145                    }
146                    configuration
147                                    .setPopulationReader(new MemoryPopulationReader<EsIndividual>(
148                                                    population));
149            }
150    }