[FLINK-22547][tests] Harden OperatorCoordinatorHolderTest.
Ensure that the 'FutureCompletedAfterSendingEventsCoordinator' cannot exit before it has completed
the triggered checkpoint (completed the checkpoint future).
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index cfe31a4..d3b7994 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -348,8 +349,11 @@
final OperatorCoordinatorHolder holder =
createCoordinatorHolder(sender, coordinatorCtor, mainThreadExecutor);
- // give the coordinator some time to emit some events
- Thread.sleep(new Random().nextInt(10) + 20);
+ // give the coordinator some time to emit some events. This isn't strictly necessary,
+ // but it randomly alters the timings between the coordinator's thread (event sender) and
+ // the main thread (holder). This should produce a flaky test if we missed some corner
+ // cases.
+ Thread.sleep(new Random().nextInt(10));
executor.triggerAll();
// trigger the checkpoint - this should also shut the valve as soon as the future is
@@ -358,8 +362,9 @@
holder.checkpointCoordinator(0L, checkpointFuture);
executor.triggerAll();
- // give the coordinator some time to emit some events
- Thread.sleep(new Random().nextInt(10) + 10);
+ // give the coordinator some time to emit some events. Same as above, this adds some
+ // randomization
+ Thread.sleep(new Random().nextInt(10));
holder.close();
executor.triggerAll();
@@ -563,7 +568,7 @@
@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
throws Exception {
- // before returning from this methof, we wait on a condition.
+ // before returning from this method, we wait on a condition.
// that way, we simulate a "context switch" just at the time when the
// future would be returned and make the other thread complete the future and send an
// event before this method returns
@@ -601,7 +606,9 @@
private static final class FutureCompletedAfterSendingEventsCoordinator
extends CheckpointEventOrderTestBaseCoordinator {
- @Nullable private CompletableFuture<byte[]> checkpoint;
+ private final OneShotLatch checkpointCompleted = new OneShotLatch();
+
+ @Nullable private volatile CompletableFuture<byte[]> checkpoint;
private int num;
@@ -623,11 +630,21 @@
subtaskGateways[1].sendEvent(new TestOperatorEvent(num++));
subtaskGateways[2].sendEvent(new TestOperatorEvent(num++));
- if (checkpoint != null) {
- checkpoint.complete(intToBytes(num));
- checkpoint = null;
+ final CompletableFuture<byte[]> chkpnt = this.checkpoint;
+ if (chkpnt != null) {
+ chkpnt.complete(intToBytes(num));
+ checkpointCompleted.trigger();
+ this.checkpoint = null;
}
}
+
+ @Override
+ public void close() throws Exception {
+ // we need to ensure that we don't close this before we have actually completed the
+ // triggered checkpoint, to ensure the test conditions are robust.
+ checkpointCompleted.await();
+ super.close();
+ }
}
private abstract static class CheckpointEventOrderTestBaseCoordinator