TEZ-3391. Optimize single split MR split reader
Signed-off-by: Jonathan Eagles <jeagles@apache.org>
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 0d703e0..db156d2 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -48,24 +48,19 @@
public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION;
public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER;
-
- // Forked from the MR variant so that the metaInfo file as well as the split
- // file can be read from local fs - relying on these files being localized.
- public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
+ private static FSDataInputStream getFSDataIS(Configuration conf,
FileSystem fs) throws IOException {
-
long maxMetaInfoSize = conf.getLong(
MRJobConfig.SPLIT_METAINFO_MAXSIZE,
MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
-
+ FSDataInputStream in = null;
// TODO NEWTEZ Figure out how this can be improved. i.e. access from context instead of setting in conf ?
String basePath = conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
LOG.info("Attempting to find splits in dir: " + basePath);
-
+
Path metaSplitFile = new Path(
basePath,
MRJobConfig.JOB_SPLIT_METAINFO);
- String jobSplitFile = MRJobConfig.JOB_SPLIT;
File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile();
if (LOG.isDebugEnabled()) {
@@ -74,34 +69,96 @@
+ FileSystem.getDefaultUri(conf));
}
- FileStatus fStatus = fs.getFileStatus(metaSplitFile);
- if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
- throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
- + ". Aborting job ");
+ FileStatus fStatus = null;
+ try {
+ fStatus = fs.getFileStatus(metaSplitFile);
+ if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
+ throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
+ + ". Aborting job ");
+ }
+ in = fs.open(metaSplitFile);
+ byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
+ in.readFully(header);
+ if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
+ throw new IOException("Invalid header on split file");
+ }
+ int vers = WritableUtils.readVInt(in);
+ if (vers != JobSplit.META_SPLIT_VERSION) {
+ throw new IOException("Unsupported split version " + vers);
+ }
+ } catch (IOException e) {
+ if (in != null) {
+ in.close();
+ }
+ throw e;
}
- FSDataInputStream in = fs.open(metaSplitFile);
- byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
- in.readFully(header);
- if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
- throw new IOException("Invalid header on split file");
+ return in;
+ }
+
+ // Forked from the MR variant so that the metaInfo file as well as the split
+ // file can be read from local fs - relying on these files being localized.
+ public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
+ FileSystem fs) throws IOException {
+ FSDataInputStream in = null;
+ try {
+ in = getFSDataIS(conf, fs);
+ final String jobSplitFile = MRJobConfig.JOB_SPLIT;
+ final String basePath = conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
+ int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
+ JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = new JobSplit.TaskSplitMetaInfo[numSplits];
+ for (int i = 0; i < numSplits; i++) {
+ JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
+ splitMetaInfo.readFields(in);
+ JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
+ new Path(basePath, jobSplitFile)
+ .toUri().toString(), splitMetaInfo.getStartOffset());
+ allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
+ splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
+ }
+ return allSplitMetaInfo;
+ } finally {
+ if (in != null) {
+ in.close();
+ }
}
- int vers = WritableUtils.readVInt(in);
- if (vers != JobSplit.META_SPLIT_VERSION) {
- in.close();
- throw new IOException("Unsupported split version " + vers);
- }
- int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
- JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = new JobSplit.TaskSplitMetaInfo[numSplits];
- for (int i = 0; i < numSplits; i++) {
+ }
+
+ /**
+ * Get the split meta info for the task with a specific index. This method
+ * reduces the overhead of creating meta objects below the index of the task.
+ *
+ * @param conf job configuration.
+ * @param fs FileSystem.
+ * @param index the index of the task.
+ * @return split meta info object of the task.
+ * @throws IOException
+ */
+ public static TaskSplitMetaInfo getSplitMetaInfo(Configuration conf,
+ FileSystem fs, int index) throws IOException {
+ FSDataInputStream in = null;
+ try {
+ in = getFSDataIS(conf, fs);
+ final String jobSplitFile = MRJobConfig.JOB_SPLIT;
+ final String basePath =
+ conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
+ final int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
+ if (numSplits <= index) {
+ throw new IOException("Index is larger than the number of splits");
+ }
JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
- splitMetaInfo.readFields(in);
+ int iter = 0;
+ while (iter++ <= index) {
+ splitMetaInfo.readFields(in);
+ }
JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
new Path(basePath, jobSplitFile)
.toUri().toString(), splitMetaInfo.getStartOffset());
- allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
+ return new JobSplit.TaskSplitMetaInfo(splitIndex,
splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
+ } finally {
+ if (in != null) {
+ in.close();
+ }
}
- in.close();
- return allSplitMetaInfo;
}
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 248a92a..317f6eb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -490,8 +490,8 @@
getContext());
}
} else {
- TaskSplitMetaInfo[] allMetaInfo = MRInputUtils.readSplits(jobConf);
- TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext().getTaskIndex()];
+ TaskSplitMetaInfo thisTaskMetaInfo = MRInputUtils.getSplits(jobConf,
+ getContext().getTaskIndex());
TaskSplitIndex splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
thisTaskMetaInfo.getStartOffset());
long splitLength = -1;
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
index bc96e38..a2b87e0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
@@ -47,11 +47,10 @@
private static final Logger LOG = LoggerFactory.getLogger(MRInputUtils.class);
- public static TaskSplitMetaInfo[] readSplits(Configuration conf) throws IOException {
- TaskSplitMetaInfo[] allTaskSplitMetaInfo;
- allTaskSplitMetaInfo = SplitMetaInfoReaderTez
- .readSplitMetaInfo(conf, FileSystem.getLocal(conf));
- return allTaskSplitMetaInfo;
+ public static TaskSplitMetaInfo getSplits(Configuration conf, int index) throws IOException {
+ TaskSplitMetaInfo taskSplitMInfo = SplitMetaInfoReaderTez
+ .getSplitMetaInfo(conf, FileSystem.getLocal(conf), index);
+ return taskSplitMInfo;
}
public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(