SAMZA-2601: avoid infinite loop when resource not allocated with host affinity disabled
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5de9950..390b4dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -234,6 +234,12 @@
if (isRequestExpired(request)) {
updateExpiryMetrics(request);
containerManager.handleExpiredRequest(processorId, preferredHost, request, this, resourceRequestState);
+ // SAMZA-2601: to prevent infinite looping and logs filling up the disk, when host affinity is disabled,
+ // we explicitly break the loop here and the whole process gets retried in run() after allocatorSleepIntervalMs
+ if (!hostAffinityEnabled) {
+ LOG.info("Host affinity is disabled on expired request.");
+ break;
+ }
} else {
LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
+ "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -447,7 +453,7 @@
* @param request the request to check
* @return true if request has expired
*/
- private boolean isRequestExpired(SamzaResourceRequest request) {
+ protected boolean isRequestExpired(SamzaResourceRequest request) {
long currTime = Instant.now().toEpochMilli();
boolean requestExpired = currTime - request.getRequestTimestamp().toEpochMilli() > getRequestTimeout(request).toMillis();
if (requestExpired) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
index 7448e57..a65f6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocatorWithoutHostAffinity.java
@@ -18,17 +18,35 @@
*/
package org.apache.samza.clustermanager;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.config.Config;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.MapConfig;
+
public class MockContainerAllocatorWithoutHostAffinity extends ContainerAllocator {
public int requestedContainers = 0;
private Semaphore semaphore = new Semaphore(0);
+ private Semaphore expiredRequestSemaphore = new Semaphore(0);
+ private AtomicInteger expiredRequestCallCount = new AtomicInteger(0);
+ private volatile boolean overrideIsRequestExpired = false;
+
+ // Create a MockContainerAllocator with certain config overrides
+ public static MockContainerAllocatorWithoutHostAffinity createContainerAllocatorWithConfigOverride(
+ ClusterResourceManager resourceManager, Config config, SamzaApplicationState state,
+ ContainerManager containerManager, Config overrideConfig) {
+ Map<String, String> mergedConfig = new HashMap<>();
+ mergedConfig.putAll(config);
+ mergedConfig.putAll(overrideConfig);
+ return new MockContainerAllocatorWithoutHostAffinity(resourceManager, new MapConfig(mergedConfig), state, containerManager);
+ }
+
public MockContainerAllocatorWithoutHostAffinity(ClusterResourceManager resourceManager,
Config config, SamzaApplicationState state, ContainerManager containerManager) {
super(resourceManager, config, state, false, containerManager);
@@ -54,6 +72,29 @@
super.requestResources(processorToHostMapping);
}
+ public void setOverrideIsRequestExpired() {
+ overrideIsRequestExpired = true;
+ }
+
+ public int getExpiredRequestCallCount() {
+ return expiredRequestCallCount.get();
+ }
+
+ @Override
+ protected boolean isRequestExpired(SamzaResourceRequest request) {
+ if (!overrideIsRequestExpired) {
+ // if not set to override, then return the original result
+ return super.isRequestExpired(request);
+ }
+ expiredRequestSemaphore.release();
+ expiredRequestCallCount.incrementAndGet();
+ return true;
+ }
+
+ public boolean awaitIsRequestExpiredCall(long timeoutMs) throws InterruptedException {
+ return expiredRequestSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
+ }
+
public ResourceRequestState getContainerRequestState() throws Exception {
Field field = ContainerAllocator.class.getDeclaredField("resourceRequestState");
field.setAccessible(true);
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index b808296..ac5d6f3 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -165,6 +166,56 @@
}
/**
+ * See SAMZA-2601: we want to prevent an infinite loop in the case of expired request call with host affinity
+ * disabled. This test make sure we don't have that infinite loop.
+ */
+ @Test
+ public void testExpiredRequestInfiniteLoop() throws Exception {
+ Config override = new MapConfig(new HashMap<String, String>() {
+ {
+ // override to have a proper sleep interval for this test
+ put("cluster-manager.allocator.sleep.ms", "100");
+ }
+ });
+ LocalityManager mockLocalityManager = mock(LocalityManager.class);
+ when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new HashMap<>()));
+ containerAllocator =
+ MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager, config, state,
+ new ContainerManager(containerPlacementMetadataStore, state, manager, false, false, mockLocalityManager),
+ override);
+ MockContainerAllocatorWithoutHostAffinity mockAllocator =
+ (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
+ mockAllocator.setOverrideIsRequestExpired();
+ allocatorThread = new Thread(containerAllocator);
+
+ Map<String, String> containersToHostMapping = new HashMap<String, String>() {
+ {
+ put("0", null);
+ put("1", null);
+ put("2", null);
+ put("3", null);
+ }
+ };
+
+ allocatorThread.start();
+
+ mockAllocator.requestResources(containersToHostMapping);
+ // Wait for at least one expired request call is made, which should happen.
+ // If the test passes, this should return immediately (within 100 ms). Only when the test fails will it exhaust the
+ // timeout, which is worth the wait to find out the failure
+ assertTrue(mockAllocator.awaitIsRequestExpiredCall(TimeUnit.SECONDS.toMillis(10)));
+ // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+ // a Clock which can be simulated and controlled.
+ Thread.sleep(500);
+ // Given that we wait for 500 ms above, and a sleep interval of 100 ms, we should roughly see 5 times the
+ // isRequestExpired is called. We give some extra buffer here (<100). Because if we do run into infinite loop,
+ // isRequestExpired would be called MILLIONS of times (4~5 million times after a dozen of runs on my machine).
+ assertTrue(
+ String.format("Too many call count: %d. Seems to be in infinite loop", mockAllocator.getExpiredRequestCallCount()),
+ mockAllocator.getExpiredRequestCallCount() < 100);
+ }
+
+ /**
* Test requestContainers with containerToHostMapping with host.affinity disabled
*/
@Test
@@ -294,6 +345,8 @@
spyThread = new Thread(spyAllocator, "Container Allocator Thread");
// Start the container allocator thread periodic assignment
spyThread.start();
+ // TODO: we can eliminate the thread sleep if the whole container allocator and test codes are refactored to use
+ // a Clock which can be simulated and controlled.
Thread.sleep(1000);
// Verify that all the request that were created were "ANY_HOST" requests
ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);