001    /*
002     * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package de.kumpe.hadooptimizer.hadoop;
017    
018    import java.io.IOException;
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import org.apache.hadoop.mapreduce.InputSplit;
023    import org.apache.hadoop.mapreduce.Job;
024    import org.apache.hadoop.mapreduce.JobContext;
025    import org.apache.hadoop.mapreduce.RecordReader;
026    import org.apache.hadoop.mapreduce.TaskAttemptContext;
027    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
028    import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
029    
030    public final class DuplicatingSequenceFileInputFormat<K, V> extends
031                    SequenceFileInputFormat<K, V> {
032            private final static class DuplicatingRecordReader<K, V> extends
033                            SequenceFileRecordReader<K, V> {
034                    private final int outputFactor;
035                    private int count;
036    
037                    public DuplicatingRecordReader(final int outputFactor) {
038                            this.outputFactor = outputFactor;
039                    }
040    
041                    @Override
042                    public void initialize(final InputSplit split,
043                                    final TaskAttemptContext context) throws IOException,
044                                    InterruptedException {
045                            super.initialize(split, context);
046                            count = outputFactor;
047                    }
048    
049                    @Override
050                    public boolean nextKeyValue() throws IOException, InterruptedException {
051                            if (count < outputFactor) {
052                                    count++;
053                                    return true;
054                            }
055                            if (super.nextKeyValue()) {
056                                    count = 1;
057                                    return true;
058                            }
059                            return false;
060                    }
061            }
062    
063            private static final String OUTPUT_FACTOR = "de.kumpe.hadooptimizer.hadoop.DuplicatingSequenceFileInputFormat.outputFactor";
064            private static final String SPLIT_REPLICATION_FACTOR = "de.kumpe.hadooptimizer.hadoop.DuplicatingSequenceFileInputFormat.splitReplicationFactor";
065    
066            static void setOutputFactor(final Job job, final int outputFactor) {
067                    job.getConfiguration().setInt(OUTPUT_FACTOR, outputFactor);
068            }
069    
070            static void setSplitReplicationFactor(final Job job,
071                            final int splitReplicationFactor) {
072                    job.getConfiguration().setInt(SPLIT_REPLICATION_FACTOR,
073                                    splitReplicationFactor);
074            }
075    
076            @Override
077            public RecordReader<K, V> createRecordReader(final InputSplit split,
078                            final TaskAttemptContext context) throws IOException {
079                    return new DuplicatingRecordReader<K, V>(context.getConfiguration()
080                                    .getInt(OUTPUT_FACTOR, 1));
081            };
082    
083            @Override
084            public List<InputSplit> getSplits(final JobContext job) throws IOException {
085                    final int splitReplicationFactor = job.getConfiguration().getInt(
086                                    SPLIT_REPLICATION_FACTOR, 1);
087    
088                    // get splits from superclass
089                    final List<InputSplit> splits = super.getSplits(job);
090                    final List<InputSplit> replicatedSplits = new ArrayList<InputSplit>(
091                                    splits.size() * splitReplicationFactor);
092    
093                    // replicate splits
094                    for (int i = 0; i < splitReplicationFactor; i++) {
095                            replicatedSplits.addAll(splits);
096                    }
097    
098                    return replicatedSplits;
099            }
100    }