diff --git a/KEYS b/KEYS
index 9087ca7..ee681c1 100644
--- a/KEYS
+++ b/KEYS
@@ -928,3 +928,40 @@
 VcSdqZkA/iKbC7Fv6xS7fTmhnLmAJ40h
 =Ja6V
 -----END PGP PUBLIC KEY BLOCK-----
+pub   2048R/D2103453 2020-11-19 [expires: 2023-11-19]
+uid                  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+sig 3        D2103453 2020-11-19  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+sub   2048R/4C01BF41 2020-11-19 [expires: 2023-11-19]
+sig          D2103453 2020-11-19  Boris Shkolnik (Linkedin Samza) <boryas@apache.org>
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2.0.22 (GNU/Linux)
+
+mQENBF+2rJsBCACpNvGmT730xGXb0/lELUhzBu10BhdXfkr6FupYU6IxNMSZc/W4
+LKjB0ptF/OfodjdOhA2Gbv0kZt4mRBgmwe+vIJkFAXOZccq+G708ZPBsjifUDESN
+oG9/juCJSSrYGcclWiPg0Fe3bBWm5KPWSkYG1OnHTWnqT7a5n6comgZmNori5tmA
+aPN/RnEywEY7vnqyvcJnfPKysvvaoTUbfoUaiGIVGwprYnQ3huvq+o81etWAOVvZ
+qry+6m6fA5l35bmr29xlbiK7lYCShie7u03U9eRiaLGevFSziViq/cp7OlcAXVN4
+hGtuXh+NVMgaxsy7HIs4H+x9oc9IO+WpdNS5ABEBAAG0M0JvcmlzIFNoa29sbmlr
+IChMaW5rZWRpbiBTYW16YSkgPGJvcnlhc0BhcGFjaGUub3JnPokBPwQTAQIAKQUC
+X7asmwIbAwUJBaOagAcLCQgHAwIBBhUIAgkKCwQWAgMBAh4BAheAAAoJED6OsXvS
+EDRTz4IH/iWHlHFlvasnLCgVU+29NMrx+jFRXMG5pDzd/bNocrlD+E42ocpOU1vo
+bGwvey0I4Hi8DCGKR2p14IAs2tzaKYZR0kel1QesciI+ngyE/zWEI3xKv8OXiS8A
+XdQC1Eq8Q+BJhun3A8zuQU095r9/sygTFBWlOtQ1Wn2qJ5oC6005qDDbyDAbEsxp
+4m+VaM7faHGjpNOSekHCJW2wue55I+IEF/eH8BsRbw6DGlIFhspsl17wJQAcnmRu
+wakPKZGtEyjgwVYdmUpne7GW87n18JdT/6ywfTqw17/ly0zBJhl2M0JTq0d0+Z9P
+NFwt1pWDDJOtmMClMrj3PcAk5DVVPFG5AQ0EX7asmwEIALMe7kvm6j5bgrj2fZVg
+663bqvTSdn90WQ+lQgpURYXMz4VAFNKlaqDGOX2L94vdI4pIEvPfedQ0C9wZQFz3
+NujaRL4gsxx9CHolKzTs68DE7pyiikzpZFuSIKfv0bukSmzqgvz83VKH33jAZiqd
+HFzRh6ad0iMhDpF4g//E6SLnqJdrf335JAB1P/v9LH579XWfbGWegIkXOwG8QTPB
+LSslAKah8JFBILQfeL4MmbJnwb/BQqLIWVyK5hjzon+mxKEWHQNLaxM4R517JCFQ
+DNzBYyoqO517XXEbCdUQ/ZXCRH2+gCFEWj9kA2b6Frqf4B1PD8BUWBjBYDv1RNNd
+kfsAEQEAAYkBJQQYAQIADwUCX7asmwIbDAUJBaOagAAKCRA+jrF70hA0U/trB/9G
+gqyT6JSlmBY+8ep9NW+bDaoWQZY0f4kAR2JyehUM8kLrLqB7D3moah7XrR6nOoWt
+e56yp81+3hGLTcHK2WKapTgYssPrm/nv7NySMUw04UvRMrn+wV5U+7MUtdBxO23R
+hSRuqb40wPBzo97Lb7LOEqLF0OHNMiFSV1qLuhyDGg87zEkfJM+o52Y/CT9nh60Z
+5OXa1fCQhpPC29d7OsBJklE3EtwokCyeeqxJFpcDBNTqM+5GLWbRZp6YQfnNdoPB
+XgSlVbifAXWaMZr8kFpNF6H+zm6tZ3rclihqgWhUqrAAmT11qtMacbK8dRVw3im5
+oZPFXmvDQy/353sy1WRs
+=MLmR
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/build.gradle b/build.gradle
index e30776a..2931169 100644
--- a/build.gradle
+++ b/build.gradle
@@ -82,6 +82,7 @@
     'docs/_site/**',
     'docs/sitemap.xml',
     'docs/learn/documentation/*/api/javadocs/**',
+    'docs/learn/documentation/*/rest/javadocs/**',
     'docs/Gemfile.lock',
     'gradle/wrapper/**',
     'gradlew',
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5de9950..fa5f783 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -234,6 +234,13 @@
         if (isRequestExpired(request)) {
           updateExpiryMetrics(request);
           containerManager.handleExpiredRequest(processorId, preferredHost, request, this, resourceRequestState);
+          // SAMZA-2601: when host affinity is disabled, break out of the loop here to prevent infinite looping and
+          // logs filling up the disk; the whole process is retried in run() after allocatorSleepIntervalMs.
+          if (!hostAffinityEnabled) {
+            LOG.info("Waiting for resources to get allocated for request {},"
+                + " no retries will be issued since host affinity is disabled", request);
+            break;
+          }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -447,7 +454,7 @@
    * @param request the request to check
    * @return true if request has expired
    */
-  private boolean isRequestExpired(SamzaResourceRequest request) {
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
     long currTime = Instant.now().toEpochMilli();
     boolean requestExpired =  currTime - request.getRequestTimestamp().toEpochMilli() > getRequestTimeout(request).toMillis();
     if (requestExpired) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..a65f6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,17 +18,35 @@
  */
 package org.apache.samza.clustermanager;
 
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.config.Config;
 
 import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.MapConfig;
+
 
 public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocator {
   public int requestedContainers = 0;
   private Semaphore semaphore = new Semaphore(0);
 
+  private Semaphore expiredRequestSemaphore = new Semaphore(0);
+  private AtomicInteger expiredRequestCallCount = new AtomicInteger(0);
+  private volatile boolean overrideIsRequestExpired = false;
+
+  // Creates a MockContainerAllocatorWithoutHostAffinity from config, with overrideConfig entries taking precedence
+  public static MockContainerAllocatorWithoutHostAffinity createContainerAllocatorWithConfigOverride(
+      ClusterResourceManager resourceManager, Config config, SamzaApplicationState state,
+      ContainerManager containerManager, Config overrideConfig) {
+    Map<String, String> mergedConfig = new HashMap<>();
+    mergedConfig.putAll(config);
+    mergedConfig.putAll(overrideConfig);
+    return new MockContainerAllocatorWithoutHostAffinity(resourceManager, new MapConfig(mergedConfig), state, containerManager);
+  }
+
   public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
                                 Config config, SamzaApplicationState state, ContainerManager containerManager) {
     super(resourceManager, config, state, false, containerManager);
@@ -54,6 +72,29 @@
     super.requestResources(processorToHostMapping);
   }
 
+  public void setOverrideIsRequestExpired() {
+    overrideIsRequestExpired = true;
+  }
+
+  public int getExpiredRequestCallCount() {
+    return expiredRequestCallCount.get();
+  }
+
+  @Override
+  protected boolean isRequestExpired(SamzaResourceRequest request) {
+    if (!overrideIsRequestExpired) {
+      // override not enabled: delegate to the real expiry check in ContainerAllocator
+      return super.isRequestExpired(request);
+    }
+    expiredRequestSemaphore.release();
+    expiredRequestCallCount.incrementAndGet();
+    return true;
+  }
+
+  public boolean awaitIsRequestExpiredCall(long timeoutMs) throws InterruptedException {
+    return expiredRequestSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
   public ResourceRequestState getContainerRequestState() throws Exception {
     Field field = ContainerAllocator.class.getDeclaredField("resourceRequestState");
     field.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index b808296..ac5d6f3 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -165,6 +166,56 @@
   }
 
   /**
+   * See SAMZA-2601: we want to prevent an infinite loop when a request expires while host affinity is
+   * disabled. This test makes sure we don't have that infinite loop.
+   */
+  @Test
+  public void testExpiredRequestInfiniteLoop() throws Exception {
+    Config override = new MapConfig(new HashMap<String, String>() {
+      {
+        // override to have a proper sleep interval for this test
+        put("cluster-manager.allocator.sleep.ms", "100");
+      }
+    });
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+    containerAllocator =
+        MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager, config, state,
+            new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager),
+            override);
+    MockContainerAllocatorWithoutHostAffinity mockAllocator =
+        (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
+    mockAllocator.setOverrideIsRequestExpired();
+    allocatorThread = new Thread(containerAllocator);
+
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
+      {
+        put("0", null);
+        put("1", null);
+        put("2", null);
+        put("3", null);
+      }
+    };
+
+    allocatorThread.start();
+
+    mockAllocator.requestResources(containersToHostMapping);
+    // Wait until at least one isRequestExpired call has been made, which should happen.
+    // If the test passes, this should return immediately (within 100 ms). Only when the test fails will it exhaust the
+    // timeout, which is worth the wait to find out about the failure.
+    assertTrue(mockAllocator.awaitIsRequestExpiredCall(TimeUnit.SECONDS.toMillis(10)));
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test code are refactored to use
+    // a Clock which can be simulated and controlled.
+    Thread.sleep(500);
+    // Given the 500 ms wait above and a sleep interval of 100 ms, isRequestExpired should be called roughly 5
+    // times. We allow some extra buffer here (<100), because if we do run into an infinite loop,
+    // isRequestExpired would be called MILLIONS of times (4~5 million times after a dozen runs on my machine).
+    assertTrue(
+        String.format("Too many call count: %d. Seems to be in infinite loop", mockAllocator.getExpiredRequestCallCount()),
+        mockAllocator.getExpiredRequestCallCount() < 100);
+  }
+
+  /**
    * Test requestContainers with containerToHostMapping with host.affinity disabled
    */
   @Test
@@ -294,6 +345,8 @@
     spyThread = new Thread(spyAllocator, "Container Allocator Thread");
     // Start the container allocator thread periodic assignment
     spyThread.start();
+    // TODO: we can eliminate the thread sleep if the whole container allocator and test code are refactored to use
+    // a Clock which can be simulated and controlled.
     Thread.sleep(1000);
     // Verify that all the request that were created were "ANY_HOST" requests
     ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
