SAMZA-2601: avoid infinite loop when resource not allocated with host affinity disabled
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5de9950..390b4dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -234,6 +234,12 @@
         if (isRequestExpired(request)) {
           updateExpiryMetrics(request);
           containerManager.handleExpiredRequest(processorId, preferredHost, request, this, resourceRequestState);
+          // SAMZA-2601: to prevent infinite looping and logs filling up the disk, when host affinity is disabled,
+          // we explicitly break the loop here and the whole process gets retried in run() after allocatorSleepIntervalMs
+          if (!hostAffinityEnabled) {
+            LOG.info("Host affinity is disabled on expired request.");
+            break;
+          }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -447,7 +453,7 @@
    * @param request the request to check
    * @return true if request has expired
    */
-  private boolean isRequestExpired(SamzaResourceRequest request) {
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
     long currTime = Instant.now().toEpochMilli();
     boolean requestExpired =  currTime - request.getRequestTimestamp().toEpochMilli() > getRequestTimeout(request).toMillis();
     if (requestExpired) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..a65f6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,17 +18,35 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.MapConfig;
+
 
 public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocator {
   public int requestedContainers = 0;
   private Semaphore semaphore = new Semaphore(0);
 
+  private Semaphore expiredRequestSemaphore = new Semaphore(0);
+  private AtomicInteger expiredRequestCallCount = new AtomicInteger(0);
+  private volatile boolean overrideIsRequestExpired = false;
+
+  // Create a MockContainerAllocator with certain config overrides
+  public static MockContainerAllocatorWithoutHostAffinity createContainerAllocatorWithConfigOverride(
+      ClusterResourceManager resourceManager, Config config, SamzaApplicationState state,
+      ContainerManager containerManager, Config overrideConfig) {
+    Map<String, String> mergedConfig = new HashMap<>();
+    mergedConfig.putAll(config);
+    mergedConfig.putAll(overrideConfig);
+    return new MockContainerAllocatorWithoutHostAffinity(resourceManager, new MapConfig(mergedConfig), state, containerManager);
+  }
+
   public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
                                 Config config, SamzaApplicationState state, ContainerManager containerManager) {
     super(resourceManager, config, state, false, containerManager);
@@ -54,6 +72,29 @@
     super.requestResources(processorToHostMapping);
   }
 
+  public void setOverrideIsRequestExpired() {
+    overrideIsRequestExpired = true;
+  }
+
+  public int getExpiredRequestCallCount() {
+    return expiredRequestCallCount.get();
+  }
+
+  @Override
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
+    if (!overrideIsRequestExpired) {
+      // if not set to override, then return the original result
+      return super.isRequestExpired(request);
+    }
+    expiredRequestSemaphore.release();
+    expiredRequestCallCount.incrementAndGet();
+    return true;
+  }
+
+  public boolean awaitIsRequestExpiredCall(long timeoutMs) throws InterruptedException {
+    return expiredRequestSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
   public ResourceRequestState getContainerRequestState() throws Exception {
     Field field = ContainerAllocator.class.getDeclaredField("resourceRequestState");
     field.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index b808296..ac5d6f3 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -165,6 +166,56 @@
   }
 
   /**
+   * See SAMZA-2601: we want to prevent an infinite loop in the case of expired request call with host affinity
+   * disabled. This test make sure we don't have that infinite loop.
+   */
+  @Test
+  public void testExpiredRequestInfiniteLoop() throws Exception {
+    Config override = new MapConfig(new HashMap<String, String>() {
+      {
+        // override to have a proper sleep interval for this test
+        put("cluster-manager.allocator.sleep.ms", "100");
+      }
+    });
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    containerAllocator =
+        MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager, config, state,
+            new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager),
+            override);
+    MockContainerAllocatorWithoutHostAffinity mockAllocator =
+        (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
+    mockAllocator.setOverrideIsRequestExpired();
+    allocatorThread = new Thread(containerAllocator);
+
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
+      {
+        put("0", null);
+        put("1", null);
+        put("2", null);
+        put("3", null);
+      }
+    };
+
+    allocatorThread.start();
+
+    mockAllocator.requestResources(containersToHostMapping);
+    // Wait for at least one expired request call is made, which should happen.
+    // If the test passes, this should return immediately (within 100 ms). Only when the test fails will it exhaust the
+    // timeout, which is worth the wait to find out the failure
+    assertTrue(mockAllocator.awaitIsRequestExpiredCall(TimeUnit.SECONDS.toMillis(10)));
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+    // a Clock which can be simulated and controlled.
+    Thread.sleep(500);
+    // Given that we wait for 500 ms above, and a sleep interval of 100 ms, we should roughly see 5 times the
+    // isRequestExpired is called. We give some extra buffer here (<100). Because if we do run into infinite loop,
+    // isRequestExpired would be called MILLIONS of times (4~5 million times after a dozen of runs on my machine).
+    assertTrue(
+        String.format("Too many call count: %d. Seems to be in infinite loop", mockAllocator.getExpiredRequestCallCount()),
+        mockAllocator.getExpiredRequestCallCount() < 100);
+  }
+
+  /**
    * Test requestContainers with containerToHostMapping with host.affinity disabled
    */
   @Test
@@ -294,6 +345,8 @@
     spyThread = new Thread(spyAllocator, "Container Allocator Thread");
     // Start the container allocator thread periodic assignment
     spyThread.start();
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+    // a Clock which can be simulated and controlled.
     Thread.sleep(1000);
     // Verify that all the request that were created were "ANY_HOST" requests
     ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);