blob: db156d2ce25510392051e1726be4b28d688e32ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.split;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.tez.common.MRFrameworkConfigs;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
/**
 * A utility that reads the split meta info and creates split meta info objects.
 *
 * Forked from the MR variant so that the meta info file as well as the split
 * file can be read from the local filesystem - relying on these files being
 * localized.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SplitMetaInfoReaderTez {

  public static final Logger LOG = LoggerFactory.getLogger(SplitMetaInfoReaderTez.class);

  public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION;

  public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER;

  /**
   * Opens the localized split meta info file and validates its size, magic
   * header and version. On success the returned stream is positioned at the
   * number-of-splits vint; the CALLER is responsible for closing it.
   *
   * @param conf job configuration; supplies the max meta info size and the
   *             task-local resource directory to look in.
   * @param fs filesystem used to stat and open the meta info file.
   * @return an open stream positioned just past the version field.
   * @throws IOException if the file exceeds the configured max size, carries
   *                     an invalid header, or has an unsupported version.
   */
  private static FSDataInputStream getFSDataIS(Configuration conf,
      FileSystem fs) throws IOException {
    long maxMetaInfoSize = conf.getLong(
        MRJobConfig.SPLIT_METAINFO_MAXSIZE,
        MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
    FSDataInputStream in = null;
    // TODO NEWTEZ Figure out how this can be improved. i.e. access from context instead of setting in conf ?
    String basePath = conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
    LOG.info("Attempting to find splits in dir: {}", basePath);

    Path metaSplitFile = new Path(basePath, MRJobConfig.JOB_SPLIT_METAINFO);

    File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Setting up JobSplitIndex with JobSplitFile at: {}, defaultFS from conf: {}",
          file.getAbsolutePath(), FileSystem.getDefaultUri(conf));
    }

    try {
      FileStatus fStatus = fs.getFileStatus(metaSplitFile);
      // A non-positive max size disables the size guard.
      if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
        throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
            + ". Aborting job ");
      }
      in = fs.open(metaSplitFile);
      byte[] header = new byte[META_SPLIT_FILE_HEADER.length];
      in.readFully(header);
      if (!Arrays.equals(META_SPLIT_FILE_HEADER, header)) {
        throw new IOException("Invalid header on split file");
      }
      int vers = WritableUtils.readVInt(in);
      if (vers != META_SPLIT_VERSION) {
        throw new IOException("Unsupported split version " + vers);
      }
    } catch (IOException e) {
      // Validation can fail after a successful open - don't leak the stream.
      if (in != null) {
        in.close();
      }
      throw e;
    }
    return in;
  }

  /**
   * Reads the meta info for every split from the localized split meta info
   * file.
   *
   * @param conf job configuration.
   * @param fs FileSystem.
   * @return one {@link TaskSplitMetaInfo} per split, in file order.
   * @throws IOException if the meta info file is missing, invalid, or
   *                     truncated.
   */
  public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
      FileSystem fs) throws IOException {
    try (FSDataInputStream in = getFSDataIS(conf, fs)) {
      final String jobSplitFile = MRJobConfig.JOB_SPLIT;
      final String basePath = conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
      // The split file location is the same for every split - compute it once.
      final String splitFileLocation =
          new Path(basePath, jobSplitFile).toUri().toString();
      int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
      JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
          new JobSplit.TaskSplitMetaInfo[numSplits];
      for (int i = 0; i < numSplits; i++) {
        JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
        splitMetaInfo.readFields(in);
        JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
            splitFileLocation, splitMetaInfo.getStartOffset());
        allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
            splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
      }
      return allSplitMetaInfo;
    }
  }

  /**
   * Get the split meta info for the task with a specific index. This method
   * reduces the overhead of creating meta objects below the index of the task.
   *
   * @param conf job configuration.
   * @param fs FileSystem.
   * @param index the index of the task.
   * @return split meta info object of the task.
   * @throws IOException if the index is out of range or the meta info file is
   *                     missing, invalid, or truncated.
   */
  public static TaskSplitMetaInfo getSplitMetaInfo(Configuration conf,
      FileSystem fs, int index) throws IOException {
    try (FSDataInputStream in = getFSDataIS(conf, fs)) {
      final String jobSplitFile = MRJobConfig.JOB_SPLIT;
      final String basePath =
          conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
      final int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
      // A negative index would otherwise skip the read loop entirely and
      // silently return a default-initialized (garbage) SplitMetaInfo.
      if (index < 0) {
        throw new IOException("Invalid split index: " + index);
      }
      if (numSplits <= index) {
        throw new IOException("Index is larger than the number of splits");
      }
      // Sequentially consume records up to and including the requested index;
      // the last read leaves the desired split's meta info in splitMetaInfo.
      JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
      int iter = 0;
      while (iter++ <= index) {
        splitMetaInfo.readFields(in);
      }
      JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
          new Path(basePath, jobSplitFile)
              .toUri().toString(), splitMetaInfo.getStartOffset());
      return new JobSplit.TaskSplitMetaInfo(splitIndex,
          splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
    }
  }
}