TEZ-4397 Open Tez Input splits asynchronously
Contributed by Ramesh Kumar Thangarajan
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 61ba560..6266ec1 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -19,8 +19,16 @@
package org.apache.hadoop.mapred.split;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -129,14 +137,58 @@
int idx = 0;
long progress;
RecordReader<K, V> curReader;
-
+ final AtomicInteger initIndex;
+ final int numReaders;
+ final ExecutorService initReaderExecService;
+ final Queue<Future<RecordReader<K,V>>> initedReaders;
+
public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
Reporter reporter) throws IOException {
this.groupedSplit = split;
this.job = job;
this.reporter = reporter;
+ this.initIndex = new AtomicInteger(0);
+ int numThreads = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
+ TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT);
+ this.numReaders = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS,
+ TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT);
+ this.initReaderExecService = Executors.newFixedThreadPool(numThreads,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setPriority(Thread.MAX_PRIORITY)
+ .setNameFormat("TEZ-Split-Init-Thread-%d")
+ .build());
+ this.initedReaders = new LinkedList<>();
+ preInitReaders();
initNextRecordReader();
}
+
+ private void preInitReaders() {
+ if (initReaderExecService == null) {
+ LOG.info("Init record reader threadpool is not initialized");
+ return;
+ }
+ for (int i = 0; i < numReaders; i++) {
+ initedReaders.offer(this.initReaderExecService.submit(() -> {
+ try {
+ int index = initIndex.getAndIncrement();
+ if (index >= groupedSplit.wrappedSplits.size()) {
+ return null;
+ }
+ InputSplit s = groupedSplit.wrappedSplits.get(index);
+ RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter);
+ LOG.debug("Init Thread processed reader number {} initialization", index);
+ return reader;
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ cancelsFutures();
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+ }
@Override
public boolean next(K key, V value) throws IOException {
@@ -183,6 +235,8 @@
// if all chunks have been processed, nothing more to do.
if (idx == groupedSplit.wrappedSplits.size()) {
+ LOG.info("Shutting down the init record reader threadpool");
+ initReaderExecService.shutdownNow();
return false;
}
@@ -193,15 +247,25 @@
// get a record reader for the idx-th chunk
try {
- curReader = wrappedInputFormat.getRecordReader(
- groupedSplit.wrappedSplits.get(idx), job, reporter);
+ curReader = initedReaders.poll().get();
+ preInitReaders();
} catch (Exception e) {
- throw new RuntimeException (e);
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ cancelsFutures();
+ throw new RuntimeException(e);
}
idx++;
return true;
}
+ private void cancelsFutures() {
+ for (Future<RecordReader<K,V>> f : initedReaders) {
+ f.cancel(true);
+ }
+ }
+
@Override
public long getPos() throws IOException {
long subprogress = 0; // bytes processed in current split
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index a1d6b6c..3b2f17d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -102,6 +102,17 @@
public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;
+ /**
+ * Number of threads used to initialize the grouped splits, to asynchronously open the readers.
+ */
+ public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads";
+ public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4;
+
+ /**
+ * Number of record readers to asynchronously and proactively init.
+ */
+ public static final String TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS = "tez.grouping.split.init.num-recordreaders";
+ public static final int TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT = 10;
static class LocationHolder {
List<SplitContainer> splits;