APEXCORE-624 Decrement the requested-container count for container requests that were removed without being allocated and for containers that were released, so that the exit condition for the shutdown check is satisfied.
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 3b2c4de..087d6d5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -770,6 +770,7 @@
for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
+ LOG.debug("Request for container {} timed out. Re-requesting container", csr.container);
removedContainerRequests.add(entry.getValue().getRight());
ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false);
entry.getValue().setLeft(loopCounter);
@@ -779,7 +780,7 @@
}
}
- /* Remove nodes from blacklist after timeout */
+ /* Remove nodes from blacklist after timeout */
long currentTime = System.currentTimeMillis();
List<String> blacklistRemovals = new ArrayList<String>();
for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) {
@@ -797,7 +798,7 @@
}
numTotalContainers += containerRequests.size();
- numRequestedContainers += containerRequests.size();
+ numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
if (amResp.getAMCommand() != null) {
LOG.info(" statement executed:{}", amResp.getAMCommand());
@@ -836,7 +837,7 @@
LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
releasedContainers.add(allocatedContainer.getId());
numReleasedContainers++;
- numRequestedContainers++;
+ numRequestedContainers--;
continue;
}
if (csr != null) {
@@ -964,7 +965,8 @@
appDone = true;
}
- LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" + allocatedContainers.size());
+ LOG.debug("Current application state: loop={}, appDone={}, total={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
+ loopCounter, appDone, numTotalContainers, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
// monitor child containers
dnmgr.monitorHeartbeat();
@@ -1038,16 +1040,14 @@
private AllocateResponse sendContainerAskToRM(List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests, List<ContainerId> releasedContainers) throws YarnException, IOException
{
if (removedContainerRequests.size() > 0) {
- LOG.info(" Removing container request: " + removedContainerRequests);
+ LOG.debug("Removing container request: {}", removedContainerRequests);
for (ContainerRequest cr : removedContainerRequests) {
- LOG.info("Removed container: {}", cr.toString());
amRmClient.removeContainerRequest(cr);
}
}
if (containerRequests.size() > 0) {
- LOG.info("Asking RM for containers: " + containerRequests);
+ LOG.debug("Asking RM for containers: {}", containerRequests);
for (ContainerRequest cr : containerRequests) {
- LOG.info("Requested container: {}", cr.toString());
amRmClient.addContainerRequest(cr);
}
}