blob: fd6c0d04611d7b3f264013fa6900acdfeeffa875 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
/**
* UniformSizeInputFormat extends the {@link InputFormat} class to produce
* input-splits for DistCp.
* It looks at the copy-listing and groups the contents into input-splits such
* that the total number of bytes to be copied for each input-split is
* approximately uniform.
*/
public class UniformSizeInputFormat extends InputFormat<Text, FileStatus> {
  private static final Log LOG
      = LogFactory.getLog(UniformSizeInputFormat.class);

  /**
   * Implementation of InputFormat::getSplits(). Returns a list of InputSplits,
   * such that the number of bytes to be copied for all the splits are
   * approximately equal.
   * @param context JobContext for the job.
   * @return The list of uniformly-distributed input-splits. The list is empty
   * when the configured number of maps is zero.
   * @throws IOException On failure to read the copy-listing.
   * @throws InterruptedException Declared in the signature; nothing in this
   * class throws it explicitly.
   */
  @Override
  public List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException {
    Configuration configuration = context.getConfiguration();
    int numSplits = DistCpUtils.getInt(configuration,
        JobContext.NUM_MAPS);

    // Nothing to schedule; this also guards the division by numSplits below.
    if (numSplits == 0) {
      return new ArrayList<InputSplit>();
    }

    return getSplits(configuration, numSplits,
        DistCpUtils.getLong(configuration,
            DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
  }

  /**
   * Walks the copy-listing sequence-file and cuts it into byte-ranges
   * (FileSplits over the listing file itself), closing a range whenever adding
   * the next listed file would push the range's total copy-size above the
   * per-split budget.
   * @param configuration The job configuration, used to locate the listing.
   * @param numSplits The requested number of splits; callers pass a non-zero
   * value.
   * @param totalSizeBytes The total number of bytes to be copied.
   * @return The splits, in listing order. A single file larger than the
   * per-split budget still gets a split of its own, so more than numSplits
   * splits may be produced.
   * @throws IOException On failure to read the listing file.
   */
  private List<InputSplit> getSplits(Configuration configuration, int numSplits,
                                     long totalSizeBytes) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);

    // Exact integer ceiling of totalSizeBytes / numSplits. Going through
    // double would lose precision once totalSizeBytes exceeds 2^53.
    long nBytesPerSplit = totalSizeBytes / numSplits;
    if (totalSizeBytes % numSplits > 0) {
      nBytesPerSplit++;
    }

    FileStatus srcFileStatus = new FileStatus();
    Text srcRelPath = new Text();
    long currentSplitSize = 0;  // Bytes-to-copy accumulated in the open split.
    long lastSplitStart = 0;    // Listing-file offset where the open split starts.
    long lastPosition = 0;      // Listing-file offset after the last record read.

    final Path listingFilePath = getListingFilePath(configuration);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Average bytes per map: " + nBytesPerSplit +
          ", Number of maps: " + numSplits + ", total size: " + totalSizeBytes);
    }

    SequenceFile.Reader reader = null;
    try {
      reader = getListingFileReader(configuration);
      while (reader.next(srcRelPath, srcFileStatus)) {
        // If adding the current file would cause the bytes per map to exceed
        // the limit, close the open split and start a new one with this file.
        // lastPosition != 0 means at least one record has already been read,
        // so an empty leading split is never emitted.
        if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit
            && lastPosition != 0) {
          splits.add(createSplit(listingFilePath, lastSplitStart, lastPosition,
              currentSplitSize));
          lastSplitStart = lastPosition;
          currentSplitSize = 0;
        }
        currentSplitSize += srcFileStatus.getLen();
        lastPosition = reader.getPosition();
      }
      // Flush whatever is left in the open split.
      if (lastPosition > lastSplitStart) {
        splits.add(createSplit(listingFilePath, lastSplitStart, lastPosition,
            currentSplitSize));
      }
    } finally {
      IOUtils.closeStream(reader);
    }

    return splits;
  }

  /**
   * Builds the FileSplit covering the listing-file byte-range
   * [start, end) and logs it at debug level.
   * @param listingFilePath Path of the copy-listing file.
   * @param start Offset in the listing file at which the split begins.
   * @param end Offset in the listing file at which the split ends.
   * @param bytesInSplit Bytes-to-copy covered by the split (for logging only).
   * @return The new split, created without host hints.
   */
  private static FileSplit createSplit(Path listingFilePath, long start,
                                       long end, long bytesInSplit) {
    FileSplit split = new FileSplit(listingFilePath, start, end - start, null);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating split : " + split + ", bytes in split: " + bytesInSplit);
    }
    return split;
  }

  /**
   * Reads the copy-listing location from the configuration.
   * @param configuration The job configuration.
   * @return The path stored under CONF_LABEL_LISTING_FILE_PATH.
   */
  private static Path getListingFilePath(Configuration configuration) {
    final String listingFilePathString =
        configuration.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");

    // This is a Java assertion, so it is only checked when assertions are
    // enabled (-ea).
    assert !listingFilePathString.equals("")
        : "Couldn't find listing file. Invalid input.";
    return new Path(listingFilePathString);
  }

  /**
   * Opens a sequence-file reader over the copy-listing.
   * @param configuration The job configuration.
   * @return A reader positioned at the start of the listing; the caller is
   * responsible for closing it.
   * @throws IllegalArgumentException If the listing file does not exist, or if
   * an IOException occurs while checking for or opening it.
   */
  private SequenceFile.Reader getListingFileReader(Configuration configuration) {

    final Path listingFilePath = getListingFilePath(configuration);
    try {
      final FileSystem fileSystem = listingFilePath.getFileSystem(configuration);
      if (!fileSystem.exists(listingFilePath)) {
        throw new IllegalArgumentException("Listing file doesn't exist at: "
            + listingFilePath);
      }

      return new SequenceFile.Reader(configuration,
          SequenceFile.Reader.file(listingFilePath));
    }
    catch (IOException exception) {
      LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
      throw new IllegalArgumentException("Couldn't find listing-file at: "
          + listingFilePath, exception);
    }
  }

  /**
   * Implementation of InputFormat::createRecordReader().
   * @param split The split for which the RecordReader is sought.
   * @param context The context of the current task-attempt.
   * @return A SequenceFileRecordReader instance, (since the copy-listing is a
   * simple sequence-file.)
   * @throws IOException Declared in the signature; not thrown by this method
   * body.
   * @throws InterruptedException Declared in the signature; not thrown by this
   * method body.
   */
  @Override
  public RecordReader<Text, FileStatus> createRecordReader(InputSplit split,
                                                     TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new SequenceFileRecordReader<Text, FileStatus>();
  }
}