[FLINK-22547][tests] Harden OperatorCoordinatorHolderTest.

Ensure that the 'FutureCompletedAfterSendingEventsCoordinator' cannot exit before it has completed
the triggered checkpoint (completed the checkpoint future).
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index cfe31a4..d3b7994 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -348,8 +349,11 @@
         final OperatorCoordinatorHolder holder =
                 createCoordinatorHolder(sender, coordinatorCtor, mainThreadExecutor);
 
-        // give the coordinator some time to emit some events
-        Thread.sleep(new Random().nextInt(10) + 20);
+        // give the coordinator some time to emit some events. This isn't strictly necessary,
+        // but it randomly alters the timings between the coordinator's thread (event sender) and
+        // the main thread (holder). This should produce a flaky test if we missed some corner
+        // cases.
+        Thread.sleep(new Random().nextInt(10));
         executor.triggerAll();
 
         // trigger the checkpoint - this should also shut the valve as soon as the future is
@@ -358,8 +362,9 @@
         holder.checkpointCoordinator(0L, checkpointFuture);
         executor.triggerAll();
 
-        // give the coordinator some time to emit some events
-        Thread.sleep(new Random().nextInt(10) + 10);
+        // give the coordinator some time to emit some events. Same as above, this adds some
+        // randomization
+        Thread.sleep(new Random().nextInt(10));
         holder.close();
         executor.triggerAll();
 
@@ -563,7 +568,7 @@
         @Override
         public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
                 throws Exception {
-            // before returning from this methof, we wait on a condition.
+            // before returning from this method, we wait on a condition.
             // that way, we simulate a "context switch" just at the time when the
             // future would be returned and make the other thread complete the future and send an
             // event before this method returns
@@ -601,7 +606,9 @@
     private static final class FutureCompletedAfterSendingEventsCoordinator
             extends CheckpointEventOrderTestBaseCoordinator {
 
-        @Nullable private CompletableFuture<byte[]> checkpoint;
+        private final OneShotLatch checkpointCompleted = new OneShotLatch();
+
+        @Nullable private volatile CompletableFuture<byte[]> checkpoint;
 
         private int num;
 
@@ -623,11 +630,21 @@
             subtaskGateways[1].sendEvent(new TestOperatorEvent(num++));
             subtaskGateways[2].sendEvent(new TestOperatorEvent(num++));
 
-            if (checkpoint != null) {
-                checkpoint.complete(intToBytes(num));
-                checkpoint = null;
+            final CompletableFuture<byte[]> chkpnt = this.checkpoint;
+            if (chkpnt != null) {
+                chkpnt.complete(intToBytes(num));
+                checkpointCompleted.trigger();
+                this.checkpoint = null;
             }
         }
+
+        @Override
+        public void close() throws Exception {
+            // we need to ensure that we don't close this before we have actually completed the
+            // triggered checkpoint, to ensure the test conditions are robust.
+            checkpointCompleted.await();
+            super.close();
+        }
     }
 
     private abstract static class CheckpointEventOrderTestBaseCoordinator