SAMZA-2511 : Adding logic to handle container stop fail (#1347)

* Adding logic to handle container stop fail

* Fixing test

* Adding cancel resource request

* Adding status for stop-failed

* Adding comments

Co-authored-by: Ray Manpreet  Singh Matharu <rmatharu@rmatharu-mn1.linkedin.biz>
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 8ea3c30..43b3d36 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -180,6 +180,13 @@
      */
     void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t);
 
+    /**
+     * Callback invoked when there is a failure in stopping a processor on the provided {@link SamzaResource}.
+     * @param resource the resource on which the processor was running
+     * @param t the error in stopping the processor
+     */
+    void onStreamProcessorStopFailure(SamzaResource resource, Throwable t);
+
     /***
      * This callback is invoked when there is an error in the ClusterResourceManager. This is
      * guaranteed to be invoked when there is an uncaught exception in any other
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index b9427b3..70a050c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -134,6 +134,12 @@
       } else if (actionStatus == ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS) {
         LOG.info("Waiting for running container to shutdown due to existing ContainerPlacement action {}", actionMetaData);
         return false;
+      } else if (actionStatus == ContainerPlacementMetadata.ContainerStatus.STOP_FAILED) {
+        LOG.info("Shutdown on running container failed for action {}", actionMetaData);
+        markContainerPlacementActionFailed(actionMetaData,
+            String.format("failed to stop container on current host %s", actionMetaData.getSourceHost()));
+        resourceRequestState.cancelResourceRequest(request);
+        return true;
       } else if (actionStatus == ContainerPlacementMetadata.ContainerStatus.STOPPED) {
         // If the job has standby containers enabled, always check standby constraints before issuing a start on container
         // Note: Always check constraints against allocated resource, since preferred host can be ANY_HOST as well
@@ -234,6 +240,29 @@
   }
 
   /**
+   * Handle the container stop failure for active containers and standby (if enabled).
+   * @param processorId logical id of the container eg 1,2,3
+   * @param containerId last known id of the container deployed
+   * @param containerHost host of the resource on which the container stop failed
+   * @param containerAllocator allocator for requesting resources
+   * TODO: SAMZA-2512 Add integ test for handleContainerStopFail
+   */
+  void handleContainerStopFail(String processorId, String containerId, String containerHost,
+      ContainerAllocator containerAllocator) {
+    if (processorId != null && hasActiveContainerPlacementAction(processorId)) {
+      // Only record STOP_FAILED on the placement metadata here. The code path that later reads this status
+      // marks the placement action as failed and cancels its resource request.
+      ContainerPlacementMetadata metaData = getPlacementActionMetadata(processorId).get();
+      metaData.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.STOP_FAILED);
+    } else if (processorId != null && standbyContainerManager.isPresent()) {
+      standbyContainerManager.get().handleContainerStopFail(processorId, containerId, containerAllocator);
+    } else {
+      LOG.warn("Did not find a running Processor ID for Container ID: {} on host: {}. "
+          + "Ignoring invalid/redundant notification.", containerId, containerHost);
+    }
+  }
+
+  /**
    * Handles the state update on successful launch of a container, if this launch is due to a container placement action updates the
    * related metadata to report success
    *
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index d3962ab..c54918d 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
@@ -299,17 +300,10 @@
    */
   public void onResourceCompleted(SamzaResourceStatus resourceStatus) {
     String containerId = resourceStatus.getContainerId();
-    String processorId = null;
-    String hostName = null;
-    for (Map.Entry<String, SamzaResource> entry: state.runningProcessors.entrySet()) {
-      if (entry.getValue().getContainerId().equals(resourceStatus.getContainerId())) {
-        LOG.info("Container ID: {} matched running Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost());
+    Pair<String, String> runningProcessorIdHostname = getRunningProcessor(containerId);
+    String processorId = runningProcessorIdHostname.getKey();
+    String hostName = runningProcessorIdHostname.getValue();
 
-        processorId = entry.getKey();
-        hostName = entry.getValue().getHost();
-        break;
-      }
-    }
     if (processorId == null) {
       LOG.info("No running Processor ID found for Container ID: {} with Status: {}. Ignoring redundant notification.", containerId, resourceStatus.toString());
       state.redundantNotifications.incrementAndGet();
@@ -431,6 +425,18 @@
     containerManager.handleContainerLaunchFail(processorId, containerId, containerHost, containerAllocator);
   }
 
+  @Override
+  public void onStreamProcessorStopFailure(SamzaResource resource, Throwable t) {
+    String containerId = resource.getContainerId();
+    String containerHost = resource.getHost();
+    String processorId = getRunningProcessor(containerId).getKey();
+    LOG.warn("Stop failed for running Processor ID: {} on Container ID: {} on host: {}",
+        processorId, containerId, containerHost, t);
+    // processorId is null when no running processor matches containerId; the container-manager handles that case.
+    // Notify container-manager of the failed container-stop request
+    containerManager.handleContainerStopFail(processorId, containerId, containerHost, containerAllocator);
+  }
+
   /**
    * An error in the callback terminates the JobCoordinator
    * @param e the underlying exception/error
@@ -623,6 +629,20 @@
     return null;
   }
 
+  private Pair<String, String> getRunningProcessor(String containerId) {
+    for (Map.Entry<String, SamzaResource> entry: state.runningProcessors.entrySet()) {
+      if (entry.getValue().getContainerId().equals(containerId)) {
+        LOG.info("Container ID: {} matched running Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost());
+
+        String processorId = entry.getKey();
+        String hostName = entry.getValue().getHost();
+        return new ImmutablePair<>(processorId, hostName);
+      }
+    }
+
+    return new ImmutablePair<>(null, null);
+  }
+
   /**
    * Request {@link ContainerManager#handleContainerStop} to determine next step of actions for the stopped container
    */
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index a9d298d..10d5f09 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -127,6 +127,31 @@
   }
 
   /**
+   *  Handle a failed stop request for a container:
+   *  Case 1. If it is a standby container, continue the failover (if any) using the given containerAllocator.
+   *  Case 2. If it is an active container, this is an invalid state, so throw an exception to alarm/restart.
+   * @param containerID the ID (e.g., 0, 1, 2) of the container whose stop has failed
+   * @param resourceID id of the resource used for the failed container
+   */
+  public void handleContainerStopFail(String containerID, String resourceID,
+      ContainerAllocator containerAllocator) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      log.info("Handling stop fail for standby-container {}, continuing the failover (if present)", containerID);
+
+      // if this standbyContainerResource was stopped for a failover, we will find a metadata entry
+      Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = this.checkIfUsedForFailover(resourceID);
+
+      // if we find a metadata entry, we continue with the failover (select another standby or any-host appropriately)
+      failoverMetadata.ifPresent(
+          metadata -> initiateStandbyAwareAllocation(metadata.activeContainerID, metadata.activeContainerResourceID,
+              containerAllocator));
+    } else {
+      // If this class receives a callback for stop-fail on an active container, throw an exception
+      throw new SamzaException("Invalid State. Received stop container fail for container Id: " + containerID);
+    }
+  }
+
+  /**
    *  If a standby container has stopped, then there are two possible cases
    *    Case 1. during a failover, the standby container was stopped for an active's start, then we
    *       1. request a resource on the standby's host to place the activeContainer, and
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
index 15c9e1c..0f415d6 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
@@ -40,7 +40,7 @@
   /**
    * State to track container failover
    */
-  public enum ContainerStatus { RUNNING, STOP_IN_PROGRESS, STOPPED }
+  public enum ContainerStatus { RUNNING, STOP_IN_PROGRESS, STOP_FAILED, STOPPED }
   // Container Placement request message
   private final ContainerPlacementRequestMessage requestMessage;
   // Host where the container is actively running
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
index 4e6e2c9..f3ab1d2 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
@@ -48,6 +48,11 @@
   }
 
   @Override
+  public void onStreamProcessorStopFailure(SamzaResource resource, Throwable t) {
+    // no op
+  }
+
+  @Override
   public void onError(Throwable e) {
     error = e;
   }
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index e05b31e..5459a9d 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -570,6 +570,9 @@
     if (processorId != null) {
       log.info("Got stop error notification for Container ID: {} for Processor ID: {}", containerId, processorId, t);
       YarnContainer container = state.runningProcessors.get(processorId);
+      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+          container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
+      clusterManagerCallback.onStreamProcessorStopFailure(resource, t);
     } else {
       log.warn("Did not find the running Processor ID for the stop error notification for Container ID: {}. " +
           "Ignoring notification", containerId);