TWILL-211, use ALLOCATE_ONE_INSTANCE_AT_A_TIME for retries to prevent poll starvation
This closes #29 from GitHub
Signed-off-by: Henry Saputra <hsaputra@apache.org>
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 4aa3800..985fad1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -38,6 +38,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -103,6 +104,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+
import javax.annotation.Nullable;
/**
@@ -464,7 +466,8 @@
//Check the placement policy
TwillSpecification.PlacementPolicy placementPolicy =
placementPolicyManager.getPlacementPolicy(currentAllocationSpecification.getRunnableName());
- if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+ if (placementPolicy != null
+ && placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
//Update blacklist with hosts which are running DISTRIBUTED runnables
for (String runnable : placementPolicyManager.getFellowRunnables(request.getKey().getRunnableName())) {
@@ -692,6 +695,11 @@
amClient.completeContainerRequest(provisionRequest.getRequestId());
}
+ /*
+ * The provisionRequest will either contain a single container (ALLOCATE_ONE_INSTANCE_AT_A_TIME), or all the
+ * containers to satisfy the expectedContainers count. In the latter case, the provision request is complete
+ * once all the containers have run, at which point we poll() to remove the provisioning request.
+ */
if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName) ||
provisioning.peek().getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
provisioning.poll();
@@ -844,7 +852,15 @@
addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
}
} else {
- AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+ AllocationSpecification allocationSpecification;
+ if (numberOfInstances > 1) {
+ allocationSpecification = new AllocationSpecification(capability);
+ } else {
+ // For a single instance, we always insert ALLOCATE_ONE_INSTANCE_AT_A_TIME. For multi-instance
+ // runnables, this case occurs during retries.
+ allocationSpecification = new AllocationSpecification(capability,
+ AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME, runnableName, 0);
+ }
addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
}
return new RunnableContainerRequest(order.getType(), requestsMap, isProvisioned);