SAMZA-2504: Improve Container Placement Flaky Test & Running Time

Improvement [Bug fix]:

- Fix a flaky test for Container Placements on Request status
- Improve the running time of Test suite from 40 secs to under 4 seconds

API changes: None

Upgrade Instructions: None

Usage Instructions: None

Author: Sanil15 <sanil.jain15@gmail.com>

Reviewers: mynameborat <bharathkk@apache.org>

Closes #1376 from Sanil15/SAMZA-2504

(cherry picked from commit bcee407801d5966015009e9c1d0337ce7e13fc96)
Signed-off-by: mynameborat <bharath.kumarasubramanian@gmail.com>
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
index 5161cfb..7eb6175 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager.container.placement;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.samza.clustermanager.ContainerProcessManager;
 import org.apache.samza.config.ApplicationConfig;
@@ -50,7 +51,10 @@
    * RunId of the app
    */
   private final String appRunId;
-
+  /**
+   * Sleep time for container placement handler thread
+   */
+  private final int containerPlacementHandlerSleepMs;
   public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore containerPlacementMetadataStore, ContainerProcessManager manager, ApplicationConfig config) {
     Preconditions.checkNotNull(containerPlacementMetadataStore, "containerPlacementMetadataStore cannot be null");
     Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be null");
@@ -58,6 +62,22 @@
     this.containerPlacementMetadataStore = containerPlacementMetadataStore;
     this.isRunning = true;
     this.appRunId = config.getRunId();
+    this.containerPlacementHandlerSleepMs = DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS;
+  }
+
+  @VisibleForTesting
+  /**
+   * Should only get used for testing, cannot make it package private because end to end integeration test
+   * need package private methods which live in org.apache.samza.clustermanager
+   */
+  public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore containerPlacementMetadataStore, ContainerProcessManager manager, ApplicationConfig config, int containerPlacementHandlerSleepMs) {
+    Preconditions.checkNotNull(containerPlacementMetadataStore, "containerPlacementMetadataStore cannot be null");
+    Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be null");
+    this.containerProcessManager = manager;
+    this.containerPlacementMetadataStore = containerPlacementMetadataStore;
+    this.isRunning = true;
+    this.appRunId = config.getRunId();
+    this.containerPlacementHandlerSleepMs = containerPlacementHandlerSleepMs;
   }
 
   @Override
@@ -75,7 +95,7 @@
             containerPlacementMetadataStore.deleteAllContainerPlacementMessages(message.getUuid());
           }
         }
-        Thread.sleep(DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS);
+        Thread.sleep(containerPlacementHandlerSleepMs);
       } catch (InterruptedException e) {
         LOG.warn("Got InterruptedException in ContainerPlacementRequestAllocator thread.", e);
         Thread.currentThread().interrupt();
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 87bf85e..49b013d 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -64,6 +64,10 @@
 
 /**
  * Set of Integration tests for container placement actions
+ *
+ * Please note that semaphores are used wherever possible, there are some Thread.sleep used for the main thread to check
+ * on state changes to atomic variables or synchroized metadata objects because of difficulty of plugging semaphores to
+ * those pieces of logic
  */
 @RunWith(MockitoJUnitRunner.class)
 public class TestContainerPlacementActions {
@@ -275,7 +279,7 @@
   public void testActionQueuingForConsecutivePlacementActions() throws Exception {
     // Spawn a Request Allocator Thread
     ContainerPlacementRequestAllocator requestAllocator =
-        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
+        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config), 100);
     Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");
 
     requestAllocatorThread.start();
@@ -345,7 +349,7 @@
               == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
         break;
       }
-      Thread.sleep(Duration.ofSeconds(5).toMillis());
+      Thread.sleep(100);
     }
 
     assertEquals(state.preferredHostRequests.get(), 4);
@@ -647,8 +651,9 @@
       fail("timed out waiting for the containers to start");
     }
 
-    // Wait for both the containers to be in running state
-    while (state.runningProcessors.size() != 2) {
+    // Wait for both the containers to be in running state & control action metadata to succeed
+    while (state.runningProcessors.size() != 2
+        && metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.SUCCEEDED) {
       Thread.sleep(100);
     }
 
@@ -660,8 +665,6 @@
     assertEquals(state.anyHostRequests.get(), 0);
     // Failed processors must be empty
     assertEquals(state.failedProcessors.size(), 0);
-    // Control Action should be success in this case
-    assertEquals(metadata.getActionStatus(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
   }
 
   @Test(timeout = 10000)
@@ -850,8 +853,9 @@
 
     // Spawn a Request Allocator Thread
     ContainerPlacementRequestAllocator requestAllocator =
-        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
+        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config), 100);
     Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");
+
     requestAllocatorThread.start();
 
     doAnswer(new Answer<Void>() {
@@ -923,7 +927,7 @@
               == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
         break;
       }
-      Thread.sleep(Duration.ofSeconds(5).toMillis());
+      Thread.sleep(100);
     }
 
     // App running state should remain the same
@@ -960,7 +964,7 @@
               == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
         break;
       }
-      Thread.sleep(Duration.ofSeconds(5).toMillis());
+      Thread.sleep(100);
     }
 
     assertEquals(4, state.runningProcessors.size());