001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.hadoop;
017
018 import java.io.IOException;
019 import java.util.ArrayList;
020 import java.util.Collection;
021
022 import org.apache.hadoop.io.DoubleWritable;
023 import org.apache.hadoop.mapreduce.TaskAttemptContext;
024
025 import de.kumpe.hadooptimizer.EsIndividual;
026 import de.kumpe.hadooptimizer.EsOptimizerConfiguration;
027 import de.kumpe.hadooptimizer.EvaluationResult;
028 import de.kumpe.hadooptimizer.Evaluator;
029 import de.kumpe.hadooptimizer.Halter;
030 import de.kumpe.hadooptimizer.impl.MemoryPopulationReader;
031 import de.kumpe.hadooptimizer.impl.MemoryPopulationWriter;
032 import de.kumpe.hadooptimizer.impl.NeedsRandomWrapperBase;
033 import de.kumpe.hadooptimizer.simple.SimpleEsOptimizer;
034
035 /**
036 * @see EsMultiPopulationsHadoOptimizer
037 *
038 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
039 */
040 public final class EvolutionCycleMapper extends
041 MapperWithOptimizerConfiguration<EsIndividual> {
042 private static final class ProgressReportingEvaluatorWrapper extends
043 NeedsRandomWrapperBase<Evaluator<EsIndividual>> implements
044 Evaluator<EsIndividual> {
045 private static final long serialVersionUID = 1L;
046
047 private final TaskAttemptContext context;
048
049 public ProgressReportingEvaluatorWrapper(
050 final Evaluator<EsIndividual> delegate,
051 final TaskAttemptContext context) {
052 super(delegate);
053
054 this.context = context;
055 }
056
057 @Override
058 public double evaluate(final EsIndividual individual) {
059 final double result = delegate.evaluate(individual);
060 context.progress();
061 return result;
062 }
063 }
064
065 private static final class SetStatusHalterWrapper extends
066 NeedsRandomWrapperBase<Halter<EsIndividual>> implements
067 Halter<EsIndividual> {
068 private static final long serialVersionUID = 1L;
069
070 private final TaskAttemptContext context;
071 private long counter;
072
073 public SetStatusHalterWrapper(final Halter<EsIndividual> delegate,
074 final TaskAttemptContext context) {
075 super(delegate);
076
077 this.context = context;
078 }
079
080 @Override
081 public boolean halt(
082 final Collection<EvaluationResult<EsIndividual>> evaluationResults) {
083 try {
084 context.setStatus("Finished cycle " + counter);
085 } catch (final IOException e) {
086 throw new RuntimeException(e);
087 }
088 counter++;
089 return delegate.halt(evaluationResults);
090 }
091 }
092
093 @Override
094 public void run(final Context context) throws IOException,
095 InterruptedException {
096 setup(context);
097
098 wrapEvaluatorForReportingProgress(context);
099 wrapHalterForSettingStatus(context);
100
101 readAndSetPopulation(context);
102
103 // create output
104 final MemoryPopulationWriter<EsIndividual> buf = new MemoryPopulationWriter<EsIndividual>();
105 configuration.setPopulationWriter(buf);
106
107 // create and start optimizer
108 final SimpleEsOptimizer optimizer = new SimpleEsOptimizer(
109 (EsOptimizerConfiguration) configuration);
110 optimizer.optimize();
111
112 // write results
113 final DoubleWritable key = new DoubleWritable();
114 for (final EsIndividual individual : buf.getPopulation()) {
115 key.set(configuration.getEvaluator().evaluate(individual));
116
117 context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
118 "EvolutionCycleMapper writes").increment(1);
119
120 context.write(key, individual);
121 }
122
123 cleanup(context);
124 }
125
126 private void wrapEvaluatorForReportingProgress(final Context context) {
127 final Evaluator<EsIndividual> delegate = configuration.getEvaluator();
128 configuration.setEvaluator(new ProgressReportingEvaluatorWrapper(
129 delegate, context));
130 }
131
132 private void wrapHalterForSettingStatus(final Context context) {
133 final Halter<EsIndividual> delegate = configuration.getHalter();
134 configuration.setHalter(new SetStatusHalterWrapper(delegate, context));
135 }
136
137 private void readAndSetPopulation(final Context context)
138 throws IOException, InterruptedException {
139 final Collection<EsIndividual> population = new ArrayList<EsIndividual>();
140 while (context.nextKeyValue()) {
141 context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
142 "EvolutionCycleMapper reads").increment(1);
143
144 population.add(context.getCurrentValue());
145 }
146 configuration
147 .setPopulationReader(new MemoryPopulationReader<EsIndividual>(
148 population));
149 }
150 }