SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize flakiness
**Problem**: The test uses sleep and expects the future to complete at the end of the sleep duration. It introduces flakiness and results in false negatives.
**Change**: Modified the tests to use latch instead of sleep
**Tests**: None
**API Changes**: None
**Upgrade Instructions**: None
**Usage Instructions**: None
Author: mynameborat <bharath.kumarasubramanian@gmail.com>
Reviewers: Dengpanyin <dyin@linkedin.com>
Closes #1379 from mynameborat/SAMZA-2545
diff --git a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
index 2de3170..44ea246 100644
--- a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
@@ -23,25 +23,21 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import org.apache.samza.table.ReadWriteTable;
import org.junit.Assert;
import org.junit.Test;
-import static java.lang.Thread.*;
-import static org.mockito.Mockito.*;
+import static java.lang.Thread.sleep;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestBatchProcessor {
- private static final int SLOW_OPERATION_TIME_MS = 500;
- private static final Supplier<Void> SLOW_UPDATE_SUPPLIER = () -> {
- try {
- sleep(SLOW_OPERATION_TIME_MS);
- } catch (InterruptedException e) {
- // ignore
- }
- return null;
- };
public static class TestCreate {
@Test
@@ -86,9 +82,18 @@
@Test
public void testBatchOperationTriggeredByBatchSize() {
final int maxBatchSize = 3;
+ final CountDownLatch batchCompletionTriggerLatch = new CountDownLatch(1);
+ final Supplier<Void> tableUpdateSupplier = () -> {
+ try {
+ batchCompletionTriggerLatch.await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ return null;
+ };
final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
- when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(SLOW_UPDATE_SUPPLIER));
+ when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(tableUpdateSupplier));
final BatchProcessor<Integer, Integer> batchProcessor =
createBatchProcessor(table, maxBatchSize, Integer.MAX_VALUE);
@@ -104,15 +109,12 @@
}
Assert.assertEquals(0, batchProcessor.size());
- try {
- sleep(SLOW_OPERATION_TIME_MS * 2);
- } catch (InterruptedException e) {
- // ignore
- }
-
- for (int i = 0; i < maxBatchSize; i++) {
- Assert.assertTrue(futureList.get(i).isDone());
- }
+ // Complete the async call to the underlying table
+ batchCompletionTriggerLatch.countDown();
+ // The latch should eventually trigger completion to the future returned by the batch processor
+ CompletableFuture
+ .allOf(futureList.toArray(new CompletableFuture[0]))
+ .join();
}
@Test