TEZ-4165. Speed up TestShuffleScheduler#testNumParallelScheduledFetchers

Signed-off-by: Jonathan Eagles <jeagles@apache.org>
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 7a7b1ee..fabfa27 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -31,11 +31,11 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -90,7 +90,6 @@
         new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
             mergeManager,
             System.currentTimeMillis(), null, false, 0, "srcName", true);
-
     Future<Void> executorFuture = null;
     ExecutorService executor = Executors.newFixedThreadPool(1);
     try {
@@ -111,10 +110,9 @@
         scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }
-
-      // Sleep for a bit to allow the copies to be scheduled.
-      Thread.sleep(2000l);
-      assertEquals(10, scheduler.numFetchersCreated.get());
+      // wait for all the copies to be scheduled with timeout
+      scheduler.latch.await(2000, TimeUnit.MILLISECONDS);
+      assertEquals(0, scheduler.latch.getCount());
 
     } finally {
       scheduler.close();
@@ -1033,7 +1031,7 @@
 
   private static class ShuffleSchedulerForTest extends ShuffleScheduler {
 
-    private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
+    private CountDownLatch latch = new CountDownLatch(10);
     private final boolean fetcherShouldWait;
     private final ExceptionReporter reporter;
     private final InputContext inputContext;
@@ -1067,7 +1065,7 @@
 
     @Override
     FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
-      numFetchersCreated.incrementAndGet();
+      latch.countDown();
       FetcherOrderedGrouped mockFetcher = mock(FetcherOrderedGrouped.class);
       doAnswer(new Answer<Void>() {
         @Override