Make PlacementPolicyTestRun more robust.
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
index 0da9dd2..a886491 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
@@ -17,16 +17,13 @@
*/
package org.apache.twill.yarn;
-import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.twill.api.Hosts;
import org.apache.twill.api.Racks;
-import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.PrinterLogHandler;
@@ -36,17 +33,20 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
-import java.util.Collection;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Tests for placement Policies.
*/
public class PlacementPolicyTestRun extends BaseYarnTest {
+ private static final Logger LOG = LoggerFactory.getLogger(PlacementPolicyTestRun.class);
+
private static final int RUNNABLE_MEMORY = 512;
private static final int RUNNABLE_CORES = 1;
@@ -103,12 +103,8 @@
// Ignore test if it is running against older Hadoop versions which does not support blacklists.
Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
- ServiceDiscovered serviceDiscovered;
- ResourceReport resourceReport;
- Set<Integer> nmPorts = Sets.newHashSet();
- Collection<TwillRunResources> distributedResource;
+ waitNodeManagerCount(0, 10, TimeUnit.SECONDS);
- Assert.assertEquals(0, getProvisionedNodeManagerCount());
TwillRunner runner = YarnTestUtils.getTwillRunner();
TwillController controller = runner.prepare(new PlacementPolicyApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
@@ -120,7 +116,7 @@
try {
// All runnables should get started.
- serviceDiscovered = controller.discoverService("PlacementPolicyTest");
+ ServiceDiscovered serviceDiscovered = controller.discoverService("PlacementPolicyTest");
Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 80));
// DISTRIBUTED runnables should be provisioned on different nodes.
@@ -164,13 +160,8 @@
// Ignore test if it is running against older Hadoop versions which does not support blacklists.
Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
- ServiceDiscovered serviceDiscovered;
- ResourceReport resourceReport;
- Set<Integer> nmPorts = Sets.newHashSet();
- Collection<TwillRunResources> aliceResources;
- Collection<TwillRunResources> bobResources;
+ waitNodeManagerCount(0, 10, TimeUnit.SECONDS);
- Assert.assertEquals(0, getProvisionedNodeManagerCount());
TwillRunner runner = YarnTestUtils.getTwillRunner();
TwillController controller = runner.prepare(new DistributedApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
@@ -182,7 +173,7 @@
try {
// All runnables should get started with DISTRIBUTED ones being on different nodes.
- serviceDiscovered = controller.discoverService("DistributedTest");
+ ServiceDiscovered serviceDiscovered = controller.discoverService("DistributedTest");
Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 3, 60));
Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
@@ -212,6 +203,23 @@
TimeUnit.SECONDS.sleep(2);
}
+ /**
+  * Waits until the number of provisioned node managers equals the expected value, polling once per second.
+  *
+  * @param expected the node manager count to wait for
+  * @param timeout maximum time to wait
+  * @param unit unit of {@code timeout}
+  * @throws TimeoutException if the count still differs from {@code expected} once the timeout has elapsed
+  */
+ private void waitNodeManagerCount(int expected, long timeout, TimeUnit unit) throws Exception {
+   // Measure against the monotonic clock; System.currentTimeMillis() can jump when the wall clock is adjusted.
+   long deadline = System.nanoTime() + unit.toNanos(timeout);
+   int count = getProvisionedNodeManagerCount();
+
+   // Compare via subtraction so the check stays correct even if the nanoTime() arithmetic overflows.
+   while (count != expected && System.nanoTime() - deadline < 0) {
+     LOG.info("Waiting for expected number of node managers. Expected: {}. Actual: {}", expected, count);
+     TimeUnit.SECONDS.sleep(1);
+     count = getProvisionedNodeManagerCount();
+   }
+   if (count != expected) {
+     throw new TimeoutException("Failed to get expected number of node managers. " +
+                                  "Expected: " + expected + ". Actual: " + count);
+   }
+ }
+
/**
* An application that runs three runnables, with a DISTRIBUTED placement policy for two of them.
*/
@@ -241,10 +249,6 @@
Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
ServiceDiscovered serviceDiscovered;
- ResourceReport resourceReport;
- Set<Integer> nmPorts = Sets.newHashSet();
- Collection<TwillRunResources> aliceResources;
- Collection<TwillRunResources> bobResources;
TwillRunner runner = YarnTestUtils.getTwillRunner();
TwillController controller = runner.prepare(new ChangeInstanceApplication())