Revert "TEZ-4397 Open Tez Input splits asynchronously"

This reverts commit f724c546069885e29e6446813805bb63bf0d5d9d.
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 6266ec1..61ba560 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -19,16 +19,8 @@
 package org.apache.hadoop.mapred.split;
 
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -137,58 +129,14 @@
     int idx = 0;
     long progress;
     RecordReader<K, V> curReader;
-    final AtomicInteger initIndex;
-    final int numReaders;
-    final ExecutorService initReaderExecService;
-    final Queue<Future<RecordReader<K,V>>> initedReaders;
-
+    
     public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
         Reporter reporter) throws IOException {
       this.groupedSplit = split;
       this.job = job;
       this.reporter = reporter;
-      this.initIndex = new AtomicInteger(0);
-      int numThreads = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
-          TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT);
-      this.numReaders = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS,
-          TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT);
-      this.initReaderExecService = Executors.newFixedThreadPool(numThreads,
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setPriority(Thread.MAX_PRIORITY)
-              .setNameFormat("TEZ-Split-Init-Thread-%d")
-              .build());
-      this.initedReaders = new LinkedList<>();
-      preInitReaders();
       initNextRecordReader();
     }
-
-    private void preInitReaders() {
-      if (initReaderExecService == null) {
-        LOG.info("Init record reader threadpool is not initialized");
-        return;
-      }
-      for (int i = 0; i < numReaders; i++) {
-        initedReaders.offer(this.initReaderExecService.submit(() -> {
-          try {
-            int index = initIndex.getAndIncrement();
-            if (index >= groupedSplit.wrappedSplits.size()) {
-              return null;
-            }
-            InputSplit s = groupedSplit.wrappedSplits.get(index);
-            RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter);
-            LOG.debug("Init Thread processed reader number {} initialization", index);
-            return reader;
-          } catch (Exception e) {
-            if (e instanceof InterruptedException) {
-              Thread.currentThread().interrupt();
-            }
-            cancelsFutures();
-            throw new RuntimeException(e);
-          }
-        }));
-      }
-    }
     
     @Override
     public boolean next(K key, V value) throws IOException {
@@ -235,8 +183,6 @@
 
       // if all chunks have been processed, nothing more to do.
       if (idx == groupedSplit.wrappedSplits.size()) {
-        LOG.info("Shutting down the init record reader threadpool");
-        initReaderExecService.shutdownNow();
         return false;
       }
 
@@ -247,25 +193,15 @@
 
       // get a record reader for the idx-th chunk
       try {
-        curReader = initedReaders.poll().get();
-        preInitReaders();
+        curReader = wrappedInputFormat.getRecordReader(
+            groupedSplit.wrappedSplits.get(idx), job, reporter);
       } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        cancelsFutures();
-        throw new RuntimeException(e);
+        throw new RuntimeException (e);
       }
       idx++;
       return true;
     }
 
-    private void cancelsFutures() {
-      for (Future<RecordReader<K,V>> f : initedReaders) {
-        f.cancel(true);
-      }
-    }
-
     @Override
     public long getPos() throws IOException {
       long subprogress = 0;    // bytes processed in current split
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index 3b2f17d..a1d6b6c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -102,17 +102,6 @@
   public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
   public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;
 
-  /**
-   * Number of threads used to initialize the grouped splits, to asynchronously open the readers.
-   */
-  public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads";
-  public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4;
-
-  /**
-   * Number of record readers to asynchronously and proactively init.
-   */
-  public static final String TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS = "tez.grouping.split.init.num-recordreaders";
-  public static final int TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT = 10;
 
   static class LocationHolder {
     List<SplitContainer> splits;