001 /*
002 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package de.kumpe.hadooptimizer.hadoop;
017
018 import java.io.IOException;
019
020 import org.apache.hadoop.mapreduce.Job;
021 import org.apache.hadoop.mapreduce.Reducer;
022
023 /**
024 * A {@link Reducer} implementation for the selection of an evolutionary
025 * algorithm. It outputs the best evaluated individuals and discards the rest.
026 *
027 * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
028 */
029 public final class SelectionReducer<K, V> extends Reducer<K, V, K, V> {
030 private static final String PARENTS = "de.kumpe.hadooptimizer.hadoop.SelectionReducer.parents";
031
032 public static void setParents(final Job job, final long survivors) {
033 job.getConfiguration().setLong(PARENTS, survivors);
034 }
035
036 private long parents;
037
038 @Override
039 protected void setup(final Context context) throws IOException,
040 InterruptedException {
041 parents = context.getConfiguration().getLong(PARENTS, 1);
042
043 context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
044 "SelectionReducer instances").increment(1);
045 }
046
047 @Override
048 public void run(final Context context) throws IOException,
049 InterruptedException {
050 setup(context);
051
052 long counter = 0;
053 loopOverKeys: while (context.nextKey()) {
054 for (final V value : context.getValues()) {
055 if (counter >= parents) {
056 break loopOverKeys;
057 }
058
059 context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
060 "SelectionReducer writes").increment(1);
061
062 context.write(context.getCurrentKey(), value);
063 counter++;
064 }
065 }
066
067 cleanup(context);
068 }
069 }