TEZ-2635. Limit number of attempts being downloaded in unordered fetch (rbalamohan)
(cherry picked from commit 3347c94f863867aab241db34aafa502dc6a1ff1b)
diff --git a/CHANGES.txt b/CHANGES.txt
index 20d31b2..b4ba939 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -215,6 +215,7 @@
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b7067e5..5d1da7c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -135,8 +135,10 @@
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
- private final String srcNameTrimmed;
-
+ private final String srcNameTrimmed;
+
+ private final int maxTaskOutputAtOnce;
+
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final TezCounter shuffledInputsCounter;
@@ -235,6 +237,14 @@
this.localDisks = Iterables.toArray(
localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
+ /**
+ * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would
+ * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
+ */
+ this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
+
Arrays.sort(this.localDisks);
LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
@@ -243,7 +253,7 @@
+ ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
+ "localDiskFetchEnabled=" + localDiskFetchEnabled + ", "
+ "sharedFetchEnabled=" + sharedFetchEnabled + ", "
- + httpConnectionParams.toString());
+ + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
}
public void run() throws IOException {
@@ -365,6 +375,7 @@
// remove from the obsolete list.
List<InputAttemptIdentifier> pendingInputsForHost = inputHost
.clearAndGetPendingInputs();
+ int includedMaps = 0;
for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
.iterator(); inputIter.hasNext();) {
InputAttemptIdentifier input = inputIter.next();
@@ -376,10 +387,20 @@
// Avoid adding attempts which have been marked as OBSOLETE
if (obsoletedInputs.contains(input)) {
inputIter.remove();
+ continue;
+ }
+
+ // Check if max threshold is met
+ if (includedMaps >= maxTaskOutputAtOnce) {
+ inputIter.remove();
+ inputHost.addKnownInput(input); //add to inputHost
+ } else {
+ includedMaps++;
}
}
- // TODO NEWTEZ Maybe limit the number of inputs being given to a single
- // fetcher, especially in the case where #hosts < #fetchers
+ if (inputHost.getNumPendingInputs() > 0) {
+ pendingHosts.add(inputHost); //add it to queue
+ }
fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
LOG.info("Created Fetcher for host: " + inputHost.getHost()
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 52e7334..74f0585 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -143,9 +143,13 @@
conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
- this.maxTaskOutputAtOnce = Math.max(1, conf.getInt(
+ /**
+ * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would
+ * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
+ */
+ this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT));
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
@@ -157,7 +161,8 @@
+ ", reportReadErrorImmediately=" + reportReadErrorImmediately
+ ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
+ ", abortFailureLimit=" + abortFailureLimit
- + ", maxMapRuntime=" + maxMapRuntime);
+ + ", maxMapRuntime=" + maxMapRuntime
+ + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
}
protected synchronized void updateEventReceivedTime() {