| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.runtime.operators.coordination; |
| |
| import org.apache.flink.core.testutils.OneShotLatch; |
| import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; |
| import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; |
| import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; |
| import org.apache.flink.runtime.jobgraph.OperatorID; |
| import org.apache.flink.runtime.messages.Acknowledge; |
| import org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.TestLogger; |
| |
| import org.junit.After; |
| import org.junit.Test; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayDeque; |
| import java.util.Queue; |
| import java.util.Random; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| |
| import static org.hamcrest.Matchers.contains; |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| |
| /** |
| * A test that ensures the before/after conditions around event sending and checkpoint are met. |
| * concurrency |
| */ |
| @SuppressWarnings("serial") |
| public class OperatorCoordinatorHolderTest extends TestLogger { |
| |
| private final Consumer<Throwable> globalFailureHandler = (t) -> globalFailure = t; |
| private Throwable globalFailure; |
| |
| @After |
| public void checkNoGlobalFailure() throws Exception { |
| if (globalFailure != null) { |
| ExceptionUtils.rethrowException(globalFailure); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| |
| @Test |
| public void checkpointFutureInitiallyNotDone() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>(); |
| holder.checkpointCoordinator(1L, checkpointFuture); |
| |
| assertFalse(checkpointFuture.isDone()); |
| } |
| |
| @Test |
| public void completedCheckpointFuture() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| final byte[] testData = new byte[] {11, 22, 33, 44}; |
| |
| final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>(); |
| holder.checkpointCoordinator(9L, checkpointFuture); |
| getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData); |
| |
| assertTrue(checkpointFuture.isDone()); |
| assertArrayEquals(testData, checkpointFuture.get()); |
| } |
| |
| @Test |
| public void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| holder.checkpointCoordinator(1L, new CompletableFuture<>()); |
| getCoordinator(holder).getSubtaskGateway(1).sendEvent(new TestOperatorEvent(1)); |
| |
| assertThat(tasks.getSentEventsForSubtask(1), contains(new TestOperatorEvent(1))); |
| } |
| |
| @Test |
| public void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| triggerAndCompleteCheckpoint(holder, 10L); |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1337)); |
| |
| assertEquals(0, tasks.getNumberOfSentEvents()); |
| } |
| |
| @Test |
| public void abortedCheckpointReleasesBlockedEvents() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| triggerAndCompleteCheckpoint(holder, 123L); |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1337)); |
| holder.abortCurrentTriggering(); |
| |
| assertThat(tasks.getSentEventsForSubtask(0), contains(new TestOperatorEvent(1337))); |
| } |
| |
| @Test |
| public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| triggerAndCompleteCheckpoint(holder, 1111L); |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1337)); |
| holder.afterSourceBarrierInjection(1111L); |
| |
| assertThat(tasks.getSentEventsForSubtask(0), contains(new TestOperatorEvent(1337))); |
| } |
| |
| @Test |
| public void restoreOpensValveEvents() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| triggerAndCompleteCheckpoint(holder, 1000L); |
| holder.resetToCheckpoint(1L, new byte[0]); |
| getCoordinator(holder).getSubtaskGateway(1).sendEvent(new TestOperatorEvent(999)); |
| |
| assertThat(tasks.getSentEventsForSubtask(1), contains(new TestOperatorEvent(999))); |
| } |
| |
| @Test |
| public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| final CompletableFuture<byte[]> holderFuture = new CompletableFuture<>(); |
| holder.checkpointCoordinator(1000L, holderFuture); |
| |
| final CompletableFuture<byte[]> future1 = |
| getCoordinator(holder).getLastTriggeredCheckpoint(); |
| holder.abortCurrentTriggering(); |
| |
| triggerAndCompleteCheckpoint(holder, 1010L); |
| holder.afterSourceBarrierInjection(1010L); |
| |
| future1.complete(new byte[0]); |
| |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(123)); |
| |
| assertThat(tasks.events, contains(new EventWithSubtask(new TestOperatorEvent(123), 0))); |
| } |
| |
| @Test |
| public void triggeringFailsIfOtherTriggeringInProgress() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| holder.checkpointCoordinator(11L, new CompletableFuture<>()); |
| |
| final CompletableFuture<byte[]> future = new CompletableFuture<>(); |
| holder.checkpointCoordinator(12L, future); |
| |
| assertTrue(future.isCompletedExceptionally()); |
| assertNotNull(globalFailure); |
| globalFailure = null; |
| } |
| |
| @Test |
| public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); |
| |
| triggerAndCompleteCheckpoint(holder, 22L); |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1)); |
| holder.afterSourceBarrierInjection(22L); |
| |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(2)); |
| |
| triggerAndCompleteCheckpoint(holder, 23L); |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(3)); |
| holder.afterSourceBarrierInjection(23L); |
| |
| assertThat( |
| tasks.getSentEventsForSubtask(0), |
| contains( |
| new TestOperatorEvent(0), |
| new TestOperatorEvent(1), |
| new TestOperatorEvent(2), |
| new TestOperatorEvent(3))); |
| } |
| |
| @Test |
| public void takeCheckpointAfterAbortedCheckpoint() throws Exception { |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); |
| |
| triggerAndCompleteCheckpoint(holder, 22L); |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1)); |
| holder.abortCurrentTriggering(); |
| |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(2)); |
| |
| triggerAndCompleteCheckpoint(holder, 23L); |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(3)); |
| holder.afterSourceBarrierInjection(23L); |
| |
| assertThat( |
| tasks.getSentEventsForSubtask(0), |
| contains( |
| new TestOperatorEvent(0), |
| new TestOperatorEvent(1), |
| new TestOperatorEvent(2), |
| new TestOperatorEvent(3))); |
| } |
| |
| @Test |
| public void testFailingJobMultipleTimesNotCauseCascadingJobFailure() throws Exception { |
| Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorProvider = |
| context -> |
| new TestingOperatorCoordinator(context) { |
| @Override |
| public void handleEventFromOperator(int subtask, OperatorEvent event) { |
| context.failJob(new RuntimeException("Artificial Exception")); |
| } |
| }; |
| final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, coordinatorProvider); |
| |
| holder.handleEventFromOperator(0, new TestOperatorEvent()); |
| assertNotNull(globalFailure); |
| final Throwable firstGlobalFailure = globalFailure; |
| |
| holder.handleEventFromOperator(1, new TestOperatorEvent()); |
| assertEquals( |
| "The global failure should be the same instance because the context" |
| + "should only take the first request from the coordinator to fail the job.", |
| firstGlobalFailure, |
| globalFailure); |
| |
| holder.resetToCheckpoint(0L, new byte[0]); |
| holder.handleEventFromOperator(1, new TestOperatorEvent()); |
| assertNotEquals( |
| "The new failures should be propagated after the coordinator " + "is reset.", |
| firstGlobalFailure, |
| globalFailure); |
| // Reset global failure to null to make the after method check happy. |
| globalFailure = null; |
| } |
| |
| @Test |
| public void checkpointCompletionWaitsForEventFutures() throws Exception { |
| final CompletableFuture<Acknowledge> ackFuture = new CompletableFuture<>(); |
| final EventReceivingTasks tasks = |
| EventReceivingTasks.createForRunningTasksWithRpcResult(ackFuture); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); |
| |
| final CompletableFuture<?> checkpointFuture = triggerAndCompleteCheckpoint(holder, 22L); |
| assertFalse(checkpointFuture.isDone()); |
| |
| ackFuture.complete(Acknowledge.get()); |
| assertTrue(checkpointFuture.isDone()); |
| } |
| |
| /** |
| * This test verifies that the order of Checkpoint Completion and Event Sending observed from |
| * the outside matches that from within the OperatorCoordinator. |
| * |
| * <p>Extreme case 1: The coordinator immediately completes the checkpoint future and sends an |
| * event directly after that. |
| */ |
| @Test |
| public void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() |
| throws Exception { |
| checkpointEventValueAtomicity(FutureCompletedInstantlyTestCoordinator::new); |
| } |
| |
| /** |
| * This test verifies that the order of Checkpoint Completion and Event Sending observed from |
| * the outside matches that from within the OperatorCoordinator. |
| * |
| * <p>Extreme case 2: After the checkpoint triggering, the coordinator flushes a bunch of events |
| * before completing the checkpoint future. |
| */ |
| @Test |
| public void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception { |
| checkpointEventValueAtomicity(FutureCompletedAfterSendingEventsCoordinator::new); |
| } |
| |
| private void checkpointEventValueAtomicity( |
| final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) |
| throws Exception { |
| |
| final ManuallyTriggeredScheduledExecutorService executor = |
| new ManuallyTriggeredScheduledExecutorService(); |
| final ComponentMainThreadExecutor mainThreadExecutor = |
| new ComponentMainThreadExecutorServiceAdapter( |
| (ScheduledExecutorService) executor, Thread.currentThread()); |
| |
| final EventReceivingTasks sender = EventReceivingTasks.createForRunningTasks(); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(sender, coordinatorCtor, mainThreadExecutor); |
| |
| // give the coordinator some time to emit some events. This isn't strictly necessary, |
| // but it randomly alters the timings between the coordinator's thread (event sender) and |
| // the main thread (holder). This should produce a flaky test if we missed some corner |
| // cases. |
| Thread.sleep(new Random().nextInt(10)); |
| executor.triggerAll(); |
| |
| // trigger the checkpoint - this should also shut the valve as soon as the future is |
| // completed |
| final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>(); |
| holder.checkpointCoordinator(0L, checkpointFuture); |
| executor.triggerAll(); |
| |
| // give the coordinator some time to emit some events. Same as above, this adds some |
| // randomization |
| Thread.sleep(new Random().nextInt(10)); |
| holder.close(); |
| executor.triggerAll(); |
| |
| assertTrue(checkpointFuture.isDone()); |
| final int checkpointedNumber = bytesToInt(checkpointFuture.get()); |
| |
| assertEquals(checkpointedNumber, sender.getNumberOfSentEvents()); |
| for (int i = 0; i < checkpointedNumber; i++) { |
| assertEquals( |
| i, ((TestOperatorEvent) sender.getAllSentEvents().get(i).event).getValue()); |
| } |
| } |
| |
| @Test |
| public void testCheckpointFailsIfSendingEventFailedAfterTrigger() throws Exception { |
| CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); |
| final EventReceivingTasks tasks = |
| EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); |
| |
| // Send one event without finishing it. |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); |
| |
| // Trigger one checkpoint. |
| CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>(); |
| holder.checkpointCoordinator(1, checkpointResult); |
| getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); |
| |
| // Fail the event sending. |
| eventSendingResult.completeExceptionally(new RuntimeException("Artificial")); |
| |
| assertTrue(checkpointResult.isCompletedExceptionally()); |
| } |
| |
| @Test |
| public void testCheckpointFailsIfSendingEventFailedBeforeTrigger() throws Exception { |
| final ReorderableManualExecutorService executor = new ReorderableManualExecutorService(); |
| final ComponentMainThreadExecutor mainThreadExecutor = |
| new ComponentMainThreadExecutorServiceAdapter( |
| (ScheduledExecutorService) executor, Thread.currentThread()); |
| |
| CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); |
| final EventReceivingTasks tasks = |
| EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); |
| |
| final OperatorCoordinatorHolder holder = |
| createCoordinatorHolder(tasks, TestingOperatorCoordinator::new, mainThreadExecutor); |
| |
| // Send one event without finishing it. |
| getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); |
| executor.triggerAll(); |
| |
| // Finish the event sending. This will insert one runnable that handles |
| // failed events to the executor. And we delay this runnable to |
| // simulates checkpoints triggered before the failure get processed. |
| executor.setDelayNewRunnables(true); |
| eventSendingResult.completeExceptionally(new RuntimeException("Artificial")); |
| executor.setDelayNewRunnables(false); |
| |
| // Trigger one checkpoint, the checkpoint should not be confirmed |
| // before the failure get triggered. |
| CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>(); |
| holder.checkpointCoordinator(1, checkpointResult); |
| executor.triggerAll(); |
| getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); |
| executor.triggerAll(); |
| assertFalse(checkpointResult.isDone()); |
| |
| // Then the failure finally get processed by fail the corresponding tasks. |
| executor.executeAllDelayedRunnables(); |
| executor.triggerAll(); |
| |
| // The checkpoint would be finally confirmed. |
| assertTrue(checkpointResult.isCompletedExceptionally()); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // test actions |
| // ------------------------------------------------------------------------ |
| |
| private CompletableFuture<byte[]> triggerAndCompleteCheckpoint( |
| OperatorCoordinatorHolder holder, long checkpointId) throws Exception { |
| |
| final CompletableFuture<byte[]> future = new CompletableFuture<>(); |
| holder.checkpointCoordinator(checkpointId, future); |
| getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); |
| return future; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // miscellaneous helpers |
| // ------------------------------------------------------------------------ |
| |
| static byte[] intToBytes(int value) { |
| return ByteBuffer.allocate(4).putInt(value).array(); |
| } |
| |
| static int bytesToInt(byte[] bytes) { |
| return ByteBuffer.wrap(bytes).getInt(); |
| } |
| |
| private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder holder) { |
| return (TestingOperatorCoordinator) holder.coordinator(); |
| } |
| |
| private OperatorCoordinatorHolder createCoordinatorHolder( |
| final SubtaskAccess.SubtaskAccessFactory eventTarget, |
| final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) |
| throws Exception { |
| |
| return createCoordinatorHolder( |
| eventTarget, |
| coordinatorCtor, |
| ComponentMainThreadExecutorServiceAdapter.forMainThread()); |
| } |
| |
| private OperatorCoordinatorHolder createCoordinatorHolder( |
| final SubtaskAccess.SubtaskAccessFactory eventTarget, |
| final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor, |
| final ComponentMainThreadExecutor mainThreadExecutor) |
| throws Exception { |
| |
| final OperatorID opId = new OperatorID(); |
| final OperatorCoordinator.Provider provider = |
| new OperatorCoordinator.Provider() { |
| @Override |
| public OperatorID getOperatorId() { |
| return opId; |
| } |
| |
| @Override |
| public OperatorCoordinator create(OperatorCoordinator.Context context) { |
| return coordinatorCtor.apply(context); |
| } |
| }; |
| |
| final OperatorCoordinatorHolder holder = |
| OperatorCoordinatorHolder.create( |
| opId, |
| provider, |
| "test-coordinator-name", |
| getClass().getClassLoader(), |
| 3, |
| 1775, |
| eventTarget); |
| |
| holder.lazyInitialize(globalFailureHandler, mainThreadExecutor); |
| holder.start(); |
| |
| return holder; |
| } |
| |
| private static class ReorderableManualExecutorService |
| extends ManuallyTriggeredScheduledExecutorService { |
| |
| private boolean delayNewRunnables; |
| |
| private final Queue<Runnable> delayedRunnables = new ArrayDeque<>(); |
| |
| public void setDelayNewRunnables(boolean delayNewRunnables) { |
| this.delayNewRunnables = delayNewRunnables; |
| } |
| |
| @Override |
| public void execute(@Nonnull Runnable command) { |
| if (delayNewRunnables) { |
| delayedRunnables.add(command); |
| } else { |
| super.execute(command); |
| } |
| } |
| |
| public void executeAllDelayedRunnables() { |
| while (!delayedRunnables.isEmpty()) { |
| super.execute(delayedRunnables.poll()); |
| } |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // test implementations |
| // ------------------------------------------------------------------------ |
| |
| private static final class FutureCompletedInstantlyTestCoordinator |
| extends CheckpointEventOrderTestBaseCoordinator { |
| |
| private final ReentrantLock lock = new ReentrantLock(true); |
| private final Condition condition = lock.newCondition(); |
| |
| @Nullable |
| @GuardedBy("lock") |
| private CompletableFuture<byte[]> checkpoint; |
| |
| private int num; |
| |
| FutureCompletedInstantlyTestCoordinator(Context context) { |
| super(context); |
| } |
| |
| @Override |
| public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) |
| throws Exception { |
| // before returning from this method, we wait on a condition. |
| // that way, we simulate a "context switch" just at the time when the |
| // future would be returned and make the other thread complete the future and send an |
| // event before this method returns |
| lock.lock(); |
| try { |
| checkpoint = result; |
| condition.await(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| protected void step() throws Exception { |
| lock.lock(); |
| try { |
| // if there is a checkpoint to complete, we complete it and immediately |
| // try to send another event, without releasing the lock. that way we |
| // force the situation as if the checkpoint get completed and an event gets |
| // sent while the triggering thread is stalled |
| if (checkpoint != null) { |
| checkpoint.complete(intToBytes(num)); |
| checkpoint = null; |
| } |
| subtaskGateways[0].sendEvent(new TestOperatorEvent(num++)); |
| condition.signalAll(); |
| } finally { |
| lock.unlock(); |
| } |
| |
| Thread.sleep(2); |
| } |
| } |
| |
| private static final class FutureCompletedAfterSendingEventsCoordinator |
| extends CheckpointEventOrderTestBaseCoordinator { |
| |
| private final OneShotLatch checkpointCompleted = new OneShotLatch(); |
| |
| @Nullable private volatile CompletableFuture<byte[]> checkpoint; |
| |
| private int num; |
| |
| FutureCompletedAfterSendingEventsCoordinator(Context context) { |
| super(context); |
| } |
| |
| @Override |
| public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) |
| throws Exception { |
| checkpoint = result; |
| } |
| |
| @Override |
| protected void step() throws Exception { |
| Thread.sleep(2); |
| |
| subtaskGateways[0].sendEvent(new TestOperatorEvent(num++)); |
| subtaskGateways[1].sendEvent(new TestOperatorEvent(num++)); |
| subtaskGateways[2].sendEvent(new TestOperatorEvent(num++)); |
| |
| final CompletableFuture<byte[]> chkpnt = this.checkpoint; |
| if (chkpnt != null) { |
| chkpnt.complete(intToBytes(num)); |
| checkpointCompleted.trigger(); |
| this.checkpoint = null; |
| } |
| } |
| |
| @Override |
| public void close() throws Exception { |
| // we need to ensure that we don't close this before we have actually completed the |
| // triggered checkpoint, to ensure the test conditions are robust. |
| checkpointCompleted.await(); |
| super.close(); |
| } |
| } |
| |
| private abstract static class CheckpointEventOrderTestBaseCoordinator |
| implements OperatorCoordinator, Runnable { |
| |
| private final Thread coordinatorThread; |
| |
| protected final Context context; |
| protected final SubtaskGateway[] subtaskGateways; |
| |
| private volatile boolean closed; |
| |
| CheckpointEventOrderTestBaseCoordinator(Context context) { |
| this.context = context; |
| this.subtaskGateways = new SubtaskGateway[context.currentParallelism()]; |
| this.coordinatorThread = new Thread(this); |
| } |
| |
| @Override |
| public void start() throws Exception {} |
| |
| @Override |
| public void close() throws Exception { |
| closed = true; |
| coordinatorThread.interrupt(); |
| coordinatorThread.join(); |
| } |
| |
| @Override |
| public void handleEventFromOperator(int subtask, OperatorEvent event) {} |
| |
| @Override |
| public void subtaskFailed(int subtask, @Nullable Throwable reason) {} |
| |
| @Override |
| public void subtaskReset(int subtask, long checkpointId) {} |
| |
| @Override |
| public void subtaskReady(int subtask, SubtaskGateway gateway) { |
| subtaskGateways[subtask] = gateway; |
| |
| for (SubtaskGateway subtaskGateway : subtaskGateways) { |
| if (subtaskGateway == null) { |
| return; |
| } |
| } |
| |
| // start only once all tasks are ready |
| coordinatorThread.start(); |
| } |
| |
| @Override |
| public abstract void checkpointCoordinator( |
| long checkpointId, CompletableFuture<byte[]> result) throws Exception; |
| |
| @Override |
| public void notifyCheckpointComplete(long checkpointId) {} |
| |
| @Override |
| public void resetToCheckpoint(long checkpointId, byte[] checkpointData) throws Exception {} |
| |
| @Override |
| public void run() { |
| try { |
| while (!closed) { |
| step(); |
| } |
| } catch (Throwable t) { |
| if (closed) { |
| return; |
| } |
| |
| // this should never happen, but just in case, print and crash the test |
| //noinspection CallToPrintStackTrace |
| t.printStackTrace(); |
| System.exit(-1); |
| } |
| } |
| |
| protected abstract void step() throws Exception; |
| } |
| } |