001 /* 002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package de.kumpe.hadooptimizer.hadoop; 017 018 import java.io.IOException; 019 import java.util.ArrayList; 020 import java.util.Collection; 021 022 import org.apache.hadoop.io.DoubleWritable; 023 import org.apache.hadoop.mapreduce.TaskAttemptContext; 024 025 import de.kumpe.hadooptimizer.EsIndividual; 026 import de.kumpe.hadooptimizer.EsOptimizerConfiguration; 027 import de.kumpe.hadooptimizer.EvaluationResult; 028 import de.kumpe.hadooptimizer.Evaluator; 029 import de.kumpe.hadooptimizer.Halter; 030 import de.kumpe.hadooptimizer.impl.MemoryPopulationReader; 031 import de.kumpe.hadooptimizer.impl.MemoryPopulationWriter; 032 import de.kumpe.hadooptimizer.impl.NeedsRandomWrapperBase; 033 import de.kumpe.hadooptimizer.simple.SimpleEsOptimizer; 034 035 /** 036 * @see EsMultiPopulationsHadoOptimizer 037 * 038 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a> 039 */ 040 public final class EvolutionCycleMapper extends 041 MapperWithOptimizerConfiguration<EsIndividual> { 042 private static final class ProgressReportingEvaluatorWrapper extends 043 NeedsRandomWrapperBase<Evaluator<EsIndividual>> implements 044 Evaluator<EsIndividual> { 045 private static final long serialVersionUID = 1L; 046 047 private final TaskAttemptContext context; 048 049 public ProgressReportingEvaluatorWrapper( 050 final Evaluator<EsIndividual> delegate, 051 final TaskAttemptContext context) { 052 super(delegate); 053 054 this.context = context; 055 } 056 057 @Override 058 public double evaluate(final EsIndividual individual) { 059 final double result = delegate.evaluate(individual); 060 context.progress(); 061 return result; 062 } 063 } 064 065 private static final class SetStatusHalterWrapper extends 066 NeedsRandomWrapperBase<Halter<EsIndividual>> implements 067 Halter<EsIndividual> { 068 private static final long serialVersionUID = 1L; 069 070 private final TaskAttemptContext context; 071 private long counter; 072 073 public SetStatusHalterWrapper(final Halter<EsIndividual> delegate, 074 final TaskAttemptContext context) { 075 super(delegate); 076 077 this.context = context; 078 } 079 080 @Override 081 public boolean halt( 082 final Collection<EvaluationResult<EsIndividual>> evaluationResults) { 083 try { 084 context.setStatus("Finished cycle " + counter); 085 } catch (final IOException e) { 086 throw new RuntimeException(e); 087 } 088 counter++; 089 return delegate.halt(evaluationResults); 090 } 091 } 092 093 @Override 094 public void run(final Context context) throws IOException, 095 InterruptedException { 096 setup(context); 097 098 wrapEvaluatorForReportingProgress(context); 099 wrapHalterForSettingStatus(context); 100 101 readAndSetPopulation(context); 102 103 // create output 104 final MemoryPopulationWriter<EsIndividual> buf = new MemoryPopulationWriter<EsIndividual>(); 105 configuration.setPopulationWriter(buf); 106 107 // create and start optimizer 108 final SimpleEsOptimizer optimizer = new SimpleEsOptimizer( 109 (EsOptimizerConfiguration) configuration); 110 optimizer.optimize(); 111 112 // write results 113 final DoubleWritable key = new DoubleWritable(); 114 for (final EsIndividual individual : buf.getPopulation()) { 115 key.set(configuration.getEvaluator().evaluate(individual)); 116 117 context.getCounter(HadoOptimizerBase.COUNTER_GROUP, 118 "EvolutionCycleMapper writes").increment(1); 119 120 context.write(key, individual); 121 } 122 123 cleanup(context); 124 } 125 126 private void wrapEvaluatorForReportingProgress(final Context context) { 127 final Evaluator<EsIndividual> delegate = configuration.getEvaluator(); 128 configuration.setEvaluator(new ProgressReportingEvaluatorWrapper( 129 delegate, context)); 130 } 131 132 private void wrapHalterForSettingStatus(final Context context) { 133 final Halter<EsIndividual> delegate = configuration.getHalter(); 134 configuration.setHalter(new SetStatusHalterWrapper(delegate, context)); 135 } 136 137 private void readAndSetPopulation(final Context context) 138 throws IOException, InterruptedException { 139 final Collection<EsIndividual> population = new ArrayList<EsIndividual>(); 140 while (context.nextKeyValue()) { 141 context.getCounter(HadoOptimizerBase.COUNTER_GROUP, 142 "EvolutionCycleMapper reads").increment(1); 143 144 population.add(context.getCurrentValue()); 145 } 146 configuration 147 .setPopulationReader(new MemoryPopulationReader<EsIndividual>( 148 population)); 149 } 150 }