SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize flakiness

**Problem**: The test uses sleep and expects the future to complete at the end of the sleep duration. It introduces flakiness and results in false negatives.
**Change**: Modified the tests to use latch instead of sleep
**Tests**: None
**API Changes**: None
**Upgrade Instructions**: None
**Usage Instructions**: None

Author: mynameborat <bharath.kumarasubramanian@gmail.com>

Reviewers: Dengpanyin <dyin@linkedin.com>

Closes #1379 from mynameborat/SAMZA-2545
diff --git a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
index 2de3170..44ea246 100644
--- a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
@@ -23,25 +23,21 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.function.Supplier;
 import org.apache.samza.table.ReadWriteTable;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static java.lang.Thread.*;
-import static org.mockito.Mockito.*;
+import static java.lang.Thread.sleep;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 
 public class TestBatchProcessor {
-  private static final int SLOW_OPERATION_TIME_MS = 500;
-  private static final Supplier<Void> SLOW_UPDATE_SUPPLIER = () -> {
-    try {
-      sleep(SLOW_OPERATION_TIME_MS);
-    } catch (InterruptedException e) {
-      // ignore
-    }
-    return null;
-  };
 
   public static class TestCreate {
     @Test
@@ -86,9 +82,18 @@
     @Test
     public void testBatchOperationTriggeredByBatchSize() {
       final int maxBatchSize = 3;
+      final CountDownLatch batchCompletionTriggerLatch = new CountDownLatch(1);
+      final Supplier<Void> tableUpdateSupplier = () -> {
+        try {
+          batchCompletionTriggerLatch.await();
+        } catch (InterruptedException e) {
+          // ignore
+        }
+        return null;
+      };
 
       final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
-      when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(SLOW_UPDATE_SUPPLIER));
+      when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(tableUpdateSupplier));
 
       final BatchProcessor<Integer, Integer> batchProcessor =
           createBatchProcessor(table, maxBatchSize, Integer.MAX_VALUE);
@@ -104,15 +109,12 @@
       }
       Assert.assertEquals(0, batchProcessor.size());
 
-      try {
-        sleep(SLOW_OPERATION_TIME_MS * 2);
-      } catch (InterruptedException e) {
-        // ignore
-      }
-
-      for (int i = 0; i < maxBatchSize; i++) {
-        Assert.assertTrue(futureList.get(i).isDone());
-      }
+      // Complete the async call to the underlying table
+      batchCompletionTriggerLatch.countDown();
+      // The latch should eventually trigger completion to the future returned by the batch processor
+      CompletableFuture
+          .allOf(futureList.toArray(new CompletableFuture[0]))
+          .join();
     }
 
     @Test