TEZ-2635. Limit number of attempts being downloaded in unordered fetch (rbalamohan)

(cherry picked from commit 3347c94f863867aab241db34aafa502dc6a1ff1b)
diff --git a/CHANGES.txt b/CHANGES.txt
index 20d31b2..b4ba939 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -215,6 +215,7 @@
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
   TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
   TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error
 
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b7067e5..5d1da7c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -135,8 +135,10 @@
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
   
-  private final String srcNameTrimmed; 
-  
+  private final String srcNameTrimmed;
+
+  private final int maxTaskOutputAtOnce;
+
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   private final TezCounter shuffledInputsCounter;
@@ -235,6 +237,14 @@
     this.localDisks = Iterables.toArray(
         localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
 
+    /**
+     * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would
+     * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
+     */
+    this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
+
     Arrays.sort(this.localDisks);
 
     LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
@@ -243,7 +253,7 @@
         + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
         + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", "
         + "sharedFetchEnabled=" + sharedFetchEnabled + ", "
-        + httpConnectionParams.toString());
+        + httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
   }
 
   public void run() throws IOException {
@@ -365,6 +375,7 @@
     // remove from the obsolete list.
     List<InputAttemptIdentifier> pendingInputsForHost = inputHost
         .clearAndGetPendingInputs();
+    int includedMaps = 0;
     for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
         .iterator(); inputIter.hasNext();) {
       InputAttemptIdentifier input = inputIter.next();
@@ -376,10 +387,20 @@
       // Avoid adding attempts which have been marked as OBSOLETE 
       if (obsoletedInputs.contains(input)) {
         inputIter.remove();
+        continue;
+      }
+
+      // Check if max threshold is met
+      if (includedMaps >= maxTaskOutputAtOnce) {
+        inputIter.remove();
+        inputHost.addKnownInput(input); //add to inputHost
+      } else {
+        includedMaps++;
       }
     }
-    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
-    // fetcher, especially in the case where #hosts < #fetchers
+    if (inputHost.getNumPendingInputs() > 0) {
+      pendingHosts.add(inputHost); //add it to queue
+    }
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
         inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
     LOG.info("Created Fetcher for host: " + inputHost.getHost()
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 52e7334..74f0585 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -143,9 +143,13 @@
         conf.getBoolean(
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, 
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
-    this.maxTaskOutputAtOnce = Math.max(1, conf.getInt(
+    /**
+     * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would
+     * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
+     */
+    this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT));
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
     
     this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
@@ -157,7 +161,8 @@
         + ", reportReadErrorImmediately=" + reportReadErrorImmediately
         + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
         + ", abortFailureLimit=" + abortFailureLimit
-        + ", maxMapRuntime=" + maxMapRuntime);
+        + ", maxMapRuntime=" + maxMapRuntime
+        + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce);
   }
 
   protected synchronized  void updateEventReceivedTime() {