TWILL-211, use ALLOCATE_ONE_INSTANCE_AT_A_TIME for retries to prevent poll starvation

This closes #29 from GitHub

Signed-off-by: Henry Saputra <hsaputra@apache.org>
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 4aa3800..985fad1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -38,6 +38,7 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -103,6 +104,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+
 import javax.annotation.Nullable;
 
 /**
@@ -464,7 +466,8 @@
       //Check the placement policy
       TwillSpecification.PlacementPolicy placementPolicy =
         placementPolicyManager.getPlacementPolicy(currentAllocationSpecification.getRunnableName());
-      if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+      if (placementPolicy != null
+        && placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
 
         //Update blacklist with hosts which are running DISTRIBUTED runnables
         for (String runnable : placementPolicyManager.getFellowRunnables(request.getKey().getRunnableName())) {
@@ -692,6 +695,11 @@
         amClient.completeContainerRequest(provisionRequest.getRequestId());
       }
 
+      /*
+       * The provisionRequest will either contain a single container (ALLOCATE_ONE_INSTANCE_AT_A_TIME), or all the
+       * containers to satisfy the expectedContainers count. In the latter case, the provision request is complete once
+       * all the containers have run, at which point we poll() to remove the provisioning request.
+       */
       if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName) ||
         provisioning.peek().getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
         provisioning.poll();
@@ -844,7 +852,15 @@
         addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
       }
     } else {
-      AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+      AllocationSpecification allocationSpecification;
+      if (numberOfInstances > 1) {
+        allocationSpecification = new AllocationSpecification(capability);
+      } else {
+        // For a single instance, we always insert ALLOCATE_ONE_INSTANCE_AT_A_TIME. For multi-instance
+        // runnables, this case occurs during retries.
+        allocationSpecification = new AllocationSpecification(capability,
+          AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME, runnableName, 0);
+      }
       addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
     }
     return new RunnableContainerRequest(order.getType(), requestsMap, isProvisioned);