/*
 * Copyright 2011 Christian Kumpe http://kumpe.de/christian/java
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package de.kumpe.hadooptimizer.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;

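/**
 * A {@link SequenceFileInputFormat} that emits each input record multiple
 * times and can hand out each input split more than once. The number of
 * times a record is repeated is set via {@link #setOutputFactor(Job, int)},
 * the number of copies of each split via
 * {@link #setSplitReplicationFactor(Job, int)}; both default to 1, in which
 * case this format behaves like a plain {@link SequenceFileInputFormat}.
 *
 * @param <K> the key type of the sequence file records
 * @param <V> the value type of the sequence file records
 */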
public final class DuplicatingSequenceFileInputFormat<K, V> extends
        SequenceFileInputFormat<K, V> {
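    /**
     * A {@link SequenceFileRecordReader} that returns every key/value pair
     * read from the underlying sequence file {@code outputFactor} times in a
     * row before advancing to the next record.
     */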
    private static final class DuplicatingRecordReader<K, V> extends
            SequenceFileRecordReader<K, V> {
        private final int outputFactor;
        private int count;

        public DuplicatingRecordReader(final int outputFactor) {
            this.outputFactor = outputFactor;
        }

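        /**
         * Initializes the underlying reader and sets the repeat counter to
         * {@code outputFactor} so that the first call to
         * {@link #nextKeyValue()} advances to the first record.
         */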
        @Override
        public void initialize(final InputSplit split,
                final TaskAttemptContext context) throws IOException,
                InterruptedException {
            super.initialize(split, context);
            count = outputFactor;
        }

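        /**
         * Returns {@code true} as long as the current record has been handed
         * out fewer than {@code outputFactor} times; only then does it read
         * the next record from the underlying sequence file.
         */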
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (count < outputFactor) {
                // repeat the current key/value pair
                count++;
                return true;
            }
            if (super.nextKeyValue()) {
                // a new record was read; it has now been handed out once
                count = 1;
                return true;
            }
            return false;
        }
    }

    private static final String OUTPUT_FACTOR = "de.kumpe.hadooptimizer.hadoop.DuplicatingSequenceFileInputFormat.outputFactor";
    private static final String SPLIT_REPLICATION_FACTOR = "de.kumpe.hadooptimizer.hadoop.DuplicatingSequenceFileInputFormat.splitReplicationFactor";

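    /**
     * Sets how many times each input record is passed to the mapper. A value
     * of 1 (the default) emits every record exactly once.
     */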
    static void setOutputFactor(final Job job, final int outputFactor) {
        job.getConfiguration().setInt(OUTPUT_FACTOR, outputFactor);
    }

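    /**
     * Sets how many copies of every input split are created. A value of 1
     * (the default) processes each split exactly once.
     */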
    static void setSplitReplicationFactor(final Job job,
            final int splitReplicationFactor) {
        job.getConfiguration().setInt(SPLIT_REPLICATION_FACTOR,
                splitReplicationFactor);
    }

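    /**
     * Creates a {@link DuplicatingRecordReader} configured with the output
     * factor stored in the job configuration.
     */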
    @Override
    public RecordReader<K, V> createRecordReader(final InputSplit split,
            final TaskAttemptContext context) throws IOException {
        return new DuplicatingRecordReader<K, V>(context.getConfiguration()
                .getInt(OUTPUT_FACTOR, 1));
    }

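    /**
     * Returns the splits computed by {@link SequenceFileInputFormat},
     * repeated {@code splitReplicationFactor} times, so that every split is
     * processed by that many map tasks.
     */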
    @Override
    public List<InputSplit> getSplits(final JobContext job) throws IOException {
        final int splitReplicationFactor = job.getConfiguration().getInt(
                SPLIT_REPLICATION_FACTOR, 1);

        // get splits from superclass
        final List<InputSplit> splits = super.getSplits(job);
        final List<InputSplit> replicatedSplits = new ArrayList<InputSplit>(
                splits.size() * splitReplicationFactor);

        // replicate splits
        for (int i = 0; i < splitReplicationFactor; i++) {
            replicatedSplits.addAll(splits);
        }

        return replicatedSplits;
    }
}