001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.hadoop;
017    
018    import java.io.IOException;
019    
020    import org.apache.hadoop.mapreduce.Job;
021    import org.apache.hadoop.mapreduce.Reducer;
022    
023    /**
024     * A {@link Reducer} implementation for the selection of an evolutionary
025     * algorithm. It outputs the best evaluated individuals and discards the rest.
026     * 
027     * @author <a href="http://kumpe.de/christian/java">Christian Kumpe</a>
028     */
029    public final class SelectionReducer<K, V> extends Reducer<K, V, K, V> {
030            private static final String PARENTS = "de.kumpe.hadooptimizer.hadoop.SelectionReducer.parents";
031    
032            public static void setParents(final Job job, final long survivors) {
033                    job.getConfiguration().setLong(PARENTS, survivors);
034            }
035    
036            private long parents;
037    
038            @Override
039            protected void setup(final Context context) throws IOException,
040                            InterruptedException {
041                    parents = context.getConfiguration().getLong(PARENTS, 1);
042    
043                    context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
044                                    "SelectionReducer instances").increment(1);
045            }
046    
047            @Override
048            public void run(final Context context) throws IOException,
049                            InterruptedException {
050                    setup(context);
051    
052                    long counter = 0;
053                    loopOverKeys: while (context.nextKey()) {
054                            for (final V value : context.getValues()) {
055                                    if (counter >= parents) {
056                                            break loopOverKeys;
057                                    }
058    
059                                    context.getCounter(HadoOptimizerBase.COUNTER_GROUP,
060                                                    "SelectionReducer writes").increment(1);
061    
062                                    context.write(context.getCurrentKey(), value);
063                                    counter++;
064                            }
065                    }
066    
067                    cleanup(context);
068            }
069    }