TEZ-3391. Optimize single split MR split reader

Signed-off-by: Jonathan Eagles <jeagles@apache.org>
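
For illustration only, not part of the patch: a minimal sketch of a task-side caller
using the new SplitMetaInfoReaderTez.getSplitMetaInfo API added below. The wrapper
class and method name here are hypothetical; the point is that a task reads only its
own split metadata record instead of materializing the full array.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
    import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;

    // Hypothetical caller, for illustration only.
    public class SingleSplitReadSketch {
      // Reads only the metadata record for taskIndex from the localized
      // split meta info file, rather than building objects for every split.
      static TaskSplitMetaInfo readForTask(Configuration conf, int taskIndex)
          throws IOException {
        return SplitMetaInfoReaderTez.getSplitMetaInfo(
            conf, FileSystem.getLocal(conf), taskIndex);
      }
    }

In the patch below, MRInput obtains the same information through
MRInputUtils.getSplits(jobConf, getContext().getTaskIndex()).
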
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 0d703e0..db156d2 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -48,24 +48,19 @@
   public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION;
   public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER;
 
-
-  // Forked from the MR variant so that the metaInfo file as well as the split
-  // file can be read from local fs - relying on these files being localized.
-  public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
+  private static FSDataInputStream getFSDataIS(Configuration conf,
       FileSystem fs) throws IOException {
-
     long maxMetaInfoSize = conf.getLong(
         MRJobConfig.SPLIT_METAINFO_MAXSIZE,
         MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
-
+    FSDataInputStream in = null;
     // TODO NEWTEZ Figure out how this can be improved. i.e. access from context instead of setting in conf ?
     String basePath = conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
     LOG.info("Attempting to find splits in dir: " + basePath);
-    
+
     Path metaSplitFile = new Path(
         basePath,
         MRJobConfig.JOB_SPLIT_METAINFO);
-    String jobSplitFile = MRJobConfig.JOB_SPLIT;
 
     File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile();
     if (LOG.isDebugEnabled()) {
@@ -74,34 +69,96 @@
           + FileSystem.getDefaultUri(conf));
     }
 
-    FileStatus fStatus = fs.getFileStatus(metaSplitFile);
-    if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
-      throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
-          + ". Aborting job ");
+    FileStatus fStatus = null;
+    try {
+      fStatus = fs.getFileStatus(metaSplitFile);
+      if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
+        throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
+            + ". Aborting job ");
+      }
+      in = fs.open(metaSplitFile);
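+      // Validate the meta file header and version before handing the stream to callers.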
+      byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
+      in.readFully(header);
+      if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
+        throw new IOException("Invalid header on split file");
+      }
+      int vers = WritableUtils.readVInt(in);
+      if (vers != JobSplit.META_SPLIT_VERSION) {
+        throw new IOException("Unsupported split version " + vers);
+      }
+    } catch (IOException e) {
+      if (in != null) {
+        in.close();
+      }
+      throw e;
     }
-    FSDataInputStream in = fs.open(metaSplitFile);
-    byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
-    in.readFully(header);
-    if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
-      throw new IOException("Invalid header on split file");
+    return in;
+  }
+
+  // Forked from the MR variant so that the metaInfo file as well as the split
+  // file can be read from local fs - relying on these files being localized.
+  public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
+      FileSystem fs) throws IOException {
+    FSDataInputStream in = null;
+    try {
+      in = getFSDataIS(conf, fs);
+      final String jobSplitFile = MRJobConfig.JOB_SPLIT;
+      final String basePath = conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
+      int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
+      JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = new JobSplit.TaskSplitMetaInfo[numSplits];
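+      // Each entry references the localized job.split file and that split's start offset within it.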
+      for (int i = 0; i < numSplits; i++) {
+        JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
+        splitMetaInfo.readFields(in);
+        JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
+            new Path(basePath, jobSplitFile)
+                .toUri().toString(), splitMetaInfo.getStartOffset());
+        allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
+            splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
+      }
+      return allSplitMetaInfo;
+    } finally {
+      if (in != null) {
+        in.close();
+      }
     }
-    int vers = WritableUtils.readVInt(in);
-    if (vers != JobSplit.META_SPLIT_VERSION) {
-      in.close();
-      throw new IOException("Unsupported split version " + vers);
-    }
-    int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
-    JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = new JobSplit.TaskSplitMetaInfo[numSplits];
-    for (int i = 0; i < numSplits; i++) {
+  }
+
+  /**
+   * Get the split meta info for the task with the given index. Unlike
+   * {@link #readSplitMetaInfo}, this avoids materializing meta info objects
+   * for every split; only the records up to and including the requested
+   * index are read from the split meta info file.
+   *
+   * @param conf job configuration.
+   * @param fs FileSystem.
+   * @param index the index of the task.
+   * @return split meta info object of the task.
+   * @throws IOException if the split meta info cannot be read or the index
+   *           is out of range.
+   */
+  public static TaskSplitMetaInfo getSplitMetaInfo(Configuration conf,
+      FileSystem fs, int index) throws IOException {
+    FSDataInputStream in = null;
+    try {
+      in = getFSDataIS(conf, fs);
+      final String jobSplitFile = MRJobConfig.JOB_SPLIT;
+      final String basePath =
+          conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
+      final int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
+      if (numSplits <= index) {
+        throw new IOException("Index is larger than the number of splits");
+      }
       JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
-      splitMetaInfo.readFields(in);
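+      // Read records 0..index into the same object; after the loop it holds this task's metadata.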
+      int iter = 0;
+      while (iter++ <= index) {
+        splitMetaInfo.readFields(in);
+      }
       JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
           new Path(basePath, jobSplitFile)
               .toUri().toString(), splitMetaInfo.getStartOffset());
-      allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
+      return new JobSplit.TaskSplitMetaInfo(splitIndex,
           splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
+    } finally {
+      if (in != null) {
+        in.close();
+      }
     }
-    in.close();
-    return allSplitMetaInfo;
   }
 }
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 248a92a..317f6eb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -490,8 +490,8 @@
               getContext());
         }
       } else {
-        TaskSplitMetaInfo[] allMetaInfo = MRInputUtils.readSplits(jobConf);
-        TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext().getTaskIndex()];
+        TaskSplitMetaInfo thisTaskMetaInfo = MRInputUtils.getSplits(jobConf,
+            getContext().getTaskIndex());
         TaskSplitIndex splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
             thisTaskMetaInfo.getStartOffset());
         long splitLength = -1;
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
index bc96e38..a2b87e0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
@@ -47,11 +47,10 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(MRInputUtils.class);
 
-  public static TaskSplitMetaInfo[] readSplits(Configuration conf) throws IOException {
-    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
-    allTaskSplitMetaInfo = SplitMetaInfoReaderTez
-        .readSplitMetaInfo(conf, FileSystem.getLocal(conf));
-    return allTaskSplitMetaInfo;
+  public static TaskSplitMetaInfo getSplits(Configuration conf, int index) throws IOException {
+    return SplitMetaInfoReaderTez
+        .getSplitMetaInfo(conf, FileSystem.getLocal(conf), index);
   }
 
   public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(