SAMZA-2601: avoid infinite loop when resource not allocated with host affinity disabled (#1441)

Problem: If YARN does not immediately allocate resources for a container and host affinity is disabled for the job, then the container allocator will spam the logs with messages about the request, even if the request is considered expired. This is because the allocator loops over the pending requests to check if they have been allocated, but there is no delay between loop iterations, so it might just keep checking the same pending request over and over as fast as possible. A metric is also incremented in this flow, so that metric is extremely high, which is probably not the intention.

Changes: Avoid infinite loop by breaking the loop if host affinity is disabled

Tests: new unit test

API changes: N/A

Usage instructions: N/A

Upgrade instructions: None
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5de9950..fa5f783 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -234,6 +234,13 @@
         if (isRequestExpired(request)) {
           updateExpiryMetrics(request);
           containerManager.handleExpiredRequest(processorId, preferredHost, request, this, resourceRequestState);
+          // SAMZA-2601: to prevent infinite looping and logs filling up the disk, when host affinity is disabled,
+          // we explicitly break the loop here and the whole process gets retried in run() after allocatorSleepIntervalMs
+          if (!hostAffinityEnabled) {
+            LOG.info("Waiting for resources to get allocated for request {},"
+                + " no retries will be issued since host affinity is disabled", request);
+            break;
+          }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -447,7 +454,7 @@
    * @param request the request to check
    * @return true if request has expired
    */
-  private boolean isRequestExpired(SamzaResourceRequest request) {
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
     long currTime = Instant.now().toEpochMilli();
     boolean requestExpired =  currTime - request.getRequestTimestamp().toEpochMilli() > getRequestTimeout(request).toMillis();
     if (requestExpired) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..a65f6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,17 +18,35 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.MapConfig;
+
 
 public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocator {
   public int requestedContainers = 0;
   private Semaphore semaphore = new Semaphore(0);
 
+  private Semaphore expiredRequestSemaphore = new Semaphore(0);
+  private AtomicInteger expiredRequestCallCount = new AtomicInteger(0);
+  private volatile boolean overrideIsRequestExpired = false;
+
+  // Create a MockContainerAllocator with certain config overrides
+  public static MockContainerAllocatorWithoutHostAffinity createContainerAllocatorWithConfigOverride(
+      ClusterResourceManager resourceManager, Config config, SamzaApplicationState state,
+      ContainerManager containerManager, Config overrideConfig) {
+    Map<String, String> mergedConfig = new HashMap<>();
+    mergedConfig.putAll(config);
+    mergedConfig.putAll(overrideConfig);
+    return new MockContainerAllocatorWithoutHostAffinity(resourceManager, new MapConfig(mergedConfig), state, containerManager);
+  }
+
   public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
                                 Config config, SamzaApplicationState state, ContainerManager containerManager) {
     super(resourceManager, config, state, false, containerManager);
@@ -54,6 +72,29 @@
     super.requestResources(processorToHostMapping);
   }
 
+  public void setOverrideIsRequestExpired() {
+    overrideIsRequestExpired = true;
+  }
+
+  public int getExpiredRequestCallCount() {
+    return expiredRequestCallCount.get();
+  }
+
+  @Override
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
+    if (!overrideIsRequestExpired) {
+      // if not set to override, then return the original result
+      return super.isRequestExpired(request);
+    }
+    expiredRequestSemaphore.release();
+    expiredRequestCallCount.incrementAndGet();
+    return true;
+  }
+
+  public boolean awaitIsRequestExpiredCall(long timeoutMs) throws InterruptedException {
+    return expiredRequestSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
   public ResourceRequestState getContainerRequestState() throws Exception {
     Field field = ContainerAllocator.class.getDeclaredField("resourceRequestState");
     field.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index b808296..ac5d6f3 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -165,6 +166,56 @@
   }
 
   /**
+   * See SAMZA-2601: we want to prevent an infinite loop in the case of expired request call with host affinity
+   * disabled. This test make sure we don't have that infinite loop.
+   */
+  @Test
+  public void testExpiredRequestInfiniteLoop() throws Exception {
+    Config override = new MapConfig(new HashMap<String, String>() {
+      {
+        // override to have a proper sleep interval for this test
+        put("cluster-manager.allocator.sleep.ms", "100");
+      }
+    });
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    containerAllocator =
+        MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager, config, state,
+            new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager),
+            override);
+    MockContainerAllocatorWithoutHostAffinity mockAllocator =
+        (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
+    mockAllocator.setOverrideIsRequestExpired();
+    allocatorThread = new Thread(containerAllocator);
+
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
+      {
+        put("0", null);
+        put("1", null);
+        put("2", null);
+        put("3", null);
+      }
+    };
+
+    allocatorThread.start();
+
+    mockAllocator.requestResources(containersToHostMapping);
+    // Wait for at least one expired request call is made, which should happen.
+    // If the test passes, this should return immediately (within 100 ms). Only when the test fails will it exhaust the
+    // timeout, which is worth the wait to find out the failure
+    assertTrue(mockAllocator.awaitIsRequestExpiredCall(TimeUnit.SECONDS.toMillis(10)));
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+    // a Clock which can be simulated and controlled.
+    Thread.sleep(500);
+    // Given that we wait for 500 ms above, and a sleep interval of 100 ms, we should roughly see 5 times the
+    // isRequestExpired is called. We give some extra buffer here (<100). Because if we do run into infinite loop,
+    // isRequestExpired would be called MILLIONS of times (4~5 million times after a dozen of runs on my machine).
+    assertTrue(
+        String.format("Too many call count: %d. Seems to be in infinite loop", mockAllocator.getExpiredRequestCallCount()),
+        mockAllocator.getExpiredRequestCallCount() < 100);
+  }
+
+  /**
    * Test requestContainers with containerToHostMapping with host.affinity disabled
    */
   @Test
@@ -294,6 +345,8 @@
     spyThread = new Thread(spyAllocator, "Container Allocator Thread");
     // Start the container allocator thread periodic assignment
     spyThread.start();
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+    // a Clock which can be simulated and controlled.
     Thread.sleep(1000);
     // Verify that all the request that were created were "ANY_HOST" requests
     ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);