TEZ-4165. Speed up TestShuffleScheduler#testNumParallelScheduledFetchers
Signed-off-by: Jonathan Eagles <jeagles@apache.org>
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 7a7b1ee..fabfa27 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -31,11 +31,11 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -90,7 +90,6 @@
new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
mergeManager,
System.currentTimeMillis(), null, false, 0, "srcName", true);
-
Future<Void> executorFuture = null;
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
@@ -111,10 +110,9 @@
scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
-
- // Sleep for a bit to allow the copies to be scheduled.
- Thread.sleep(2000l);
- assertEquals(10, scheduler.numFetchersCreated.get());
+ // wait for all the copies to be scheduled with timeout
+ scheduler.latch.await(2000, TimeUnit.MILLISECONDS);
+ assertEquals(0, scheduler.latch.getCount());
} finally {
scheduler.close();
@@ -1033,7 +1031,7 @@
private static class ShuffleSchedulerForTest extends ShuffleScheduler {
- private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
+ private CountDownLatch latch = new CountDownLatch(10);
private final boolean fetcherShouldWait;
private final ExceptionReporter reporter;
private final InputContext inputContext;
@@ -1067,7 +1065,7 @@
@Override
FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
- numFetchersCreated.incrementAndGet();
+ latch.countDown();
FetcherOrderedGrouped mockFetcher = mock(FetcherOrderedGrouped.class);
doAnswer(new Answer<Void>() {
@Override