| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.runtime.tasks; |
| |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.functions.MapFunction; |
| import org.apache.flink.api.common.operators.MailboxExecutor; |
| import org.apache.flink.api.common.state.CheckpointListener; |
| import org.apache.flink.api.common.typeinfo.BasicTypeInfo; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.common.typeutils.base.StringSerializer; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.ReadableConfig; |
| import org.apache.flink.core.fs.FSDataInputStream; |
| import org.apache.flink.core.testutils.OneShotLatch; |
| import org.apache.flink.metrics.Gauge; |
| import org.apache.flink.metrics.Metric; |
| import org.apache.flink.runtime.checkpoint.CheckpointException; |
| import org.apache.flink.runtime.checkpoint.CheckpointMetaData; |
| import org.apache.flink.runtime.checkpoint.CheckpointMetrics; |
| import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder; |
| import org.apache.flink.runtime.checkpoint.CheckpointOptions; |
| import org.apache.flink.runtime.checkpoint.CheckpointType; |
| import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; |
| import org.apache.flink.runtime.checkpoint.SubtaskState; |
| import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; |
| import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; |
| import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; |
| import org.apache.flink.runtime.execution.CancelTaskException; |
| import org.apache.flink.runtime.execution.Environment; |
| import org.apache.flink.runtime.execution.ExecutionState; |
| import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; |
| import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; |
| import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; |
| import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter; |
| import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; |
| import org.apache.flink.runtime.io.network.partition.consumer.InputGate; |
| import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel; |
| import org.apache.flink.runtime.jobgraph.OperatorID; |
| import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; |
| import org.apache.flink.runtime.metrics.MetricNames; |
| import org.apache.flink.runtime.metrics.TimerGauge; |
| import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; |
| import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; |
| import org.apache.flink.runtime.operators.testutils.DummyEnvironment; |
| import org.apache.flink.runtime.operators.testutils.ExpectedTestException; |
| import org.apache.flink.runtime.operators.testutils.MockEnvironment; |
| import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; |
| import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder; |
| import org.apache.flink.runtime.shuffle.ShuffleEnvironment; |
| import org.apache.flink.runtime.state.AbstractKeyedStateBackend; |
| import org.apache.flink.runtime.state.AbstractStateBackend; |
| import org.apache.flink.runtime.state.CheckpointStreamFactory; |
| import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; |
| import org.apache.flink.runtime.state.DoneFuture; |
| import org.apache.flink.runtime.state.FunctionInitializationContext; |
| import org.apache.flink.runtime.state.FunctionSnapshotContext; |
| import org.apache.flink.runtime.state.KeyGroupRange; |
| import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; |
| import org.apache.flink.runtime.state.KeyedStateHandle; |
| import org.apache.flink.runtime.state.OperatorStateBackend; |
| import org.apache.flink.runtime.state.OperatorStateHandle; |
| import org.apache.flink.runtime.state.OperatorStreamStateHandle; |
| import org.apache.flink.runtime.state.SharedStateRegistry; |
| import org.apache.flink.runtime.state.SnapshotResult; |
| import org.apache.flink.runtime.state.StateBackendFactory; |
| import org.apache.flink.runtime.state.StateInitializationContext; |
| import org.apache.flink.runtime.state.StatePartitionStreamProvider; |
| import org.apache.flink.runtime.state.StreamStateHandle; |
| import org.apache.flink.runtime.state.TaskLocalStateStoreImpl; |
| import org.apache.flink.runtime.state.TaskStateManager; |
| import org.apache.flink.runtime.state.TaskStateManagerImpl; |
| import org.apache.flink.runtime.state.changelog.StateChangelogStorage; |
| import org.apache.flink.runtime.state.memory.MemoryStateBackend; |
| import org.apache.flink.runtime.taskmanager.CheckpointResponder; |
| import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; |
| import org.apache.flink.runtime.taskmanager.Task; |
| import org.apache.flink.runtime.taskmanager.TaskExecutionState; |
| import org.apache.flink.runtime.taskmanager.TaskManagerActions; |
| import org.apache.flink.runtime.taskmanager.TestTaskBuilder; |
| import org.apache.flink.runtime.throughput.ThroughputCalculator; |
| import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; |
| import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; |
| import org.apache.flink.streaming.api.TimeCharacteristic; |
| import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; |
| import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; |
| import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; |
| import org.apache.flink.streaming.api.functions.source.SourceFunction; |
| import org.apache.flink.streaming.api.graph.StreamConfig; |
| import org.apache.flink.streaming.api.operators.AbstractStreamOperator; |
| import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; |
| import org.apache.flink.streaming.api.operators.ChainingStrategy; |
| import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; |
| import org.apache.flink.streaming.api.operators.OneInputStreamOperator; |
| import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; |
| import org.apache.flink.streaming.api.operators.Output; |
| import org.apache.flink.streaming.api.operators.StreamMap; |
| import org.apache.flink.streaming.api.operators.StreamOperator; |
| import org.apache.flink.streaming.api.operators.StreamOperatorParameters; |
| import org.apache.flink.streaming.api.operators.StreamOperatorStateContext; |
| import org.apache.flink.streaming.api.operators.StreamSource; |
| import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; |
| import org.apache.flink.streaming.runtime.io.DataInputStatus; |
| import org.apache.flink.streaming.runtime.io.StreamInputProcessor; |
| import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; |
| import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; |
| import org.apache.flink.streaming.util.MockStreamConfig; |
| import org.apache.flink.streaming.util.MockStreamTaskBuilder; |
| import org.apache.flink.util.CloseableIterable; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.FatalExitExceptionHandler; |
| import org.apache.flink.util.FlinkRuntimeException; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.TestLogger; |
| import org.apache.flink.util.clock.SystemClock; |
| import org.apache.flink.util.concurrent.FutureUtils; |
| import org.apache.flink.util.concurrent.TestingUncaughtExceptionHandler; |
| import org.apache.flink.util.function.BiConsumerWithException; |
| import org.apache.flink.util.function.FunctionWithException; |
| import org.apache.flink.util.function.RunnableWithException; |
| import org.apache.flink.util.function.SupplierWithException; |
| |
| import org.hamcrest.CoreMatchers; |
| import org.hamcrest.Matchers; |
| import org.junit.Assert; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import javax.annotation.Nullable; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.nio.ByteBuffer; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.OptionalLong; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.RunnableFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Consumer; |
| |
| import static java.util.Arrays.asList; |
| import static java.util.Collections.singletonList; |
| import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; |
| import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND; |
| import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED; |
| import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD; |
| import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET; |
| import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE; |
| import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton; |
| import static org.apache.flink.runtime.io.network.api.writer.RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME; |
| import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; |
| import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY; |
| import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning; |
| import static org.apache.flink.util.Preconditions.checkState; |
| import static org.hamcrest.CoreMatchers.containsString; |
| import static org.hamcrest.CoreMatchers.not; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
| import static org.hamcrest.Matchers.instanceOf; |
| import static org.hamcrest.Matchers.is; |
| import static org.hamcrest.Matchers.sameInstance; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.ArgumentMatchers.nullable; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyLong; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.timeout; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| /** Tests for {@link StreamTask}. */ |
| public class StreamTaskTest extends TestLogger { |
| |
| private static OneShotLatch syncLatch; |
| |
| @Rule public ExpectedException thrown = ExpectedException.none(); |
| |
| /** |
| * Cancels a {@code StreamTaskWithBlockingTimer} after its timer has started and checks that the |
| * task terminates in state CANCELED and that the timer finished without an exception. |
| */ |
| @Test |
| public void testCancellationWaitsForActiveTimers() throws Exception { |
| StreamTaskWithBlockingTimer.reset(); |
| ResultPartitionDeploymentDescriptor descriptor = |
| new ResultPartitionDeploymentDescriptor( |
| PartitionDescriptorBuilder.newBuilder().build(), |
| NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), |
| 1, |
| false); |
| // release the shuffle environment when the test ends, as the other tests in this class do |
| try (NettyShuffleEnvironment shuffleEnvironment = |
| new NettyShuffleEnvironmentBuilder().build()) { |
| Task task = |
| new TestTaskBuilder(shuffleEnvironment) |
| .setInvokable(StreamTaskWithBlockingTimer.class) |
| .setResultPartitions(singletonList(descriptor)) |
| .build(); |
| task.startTaskThread(); |
| |
| StreamTaskWithBlockingTimer.timerStarted.join(); |
| task.cancelExecution(); |
| |
| task.getTerminationFuture().join(); |
| // explicitly check for exceptions as they are ignored after cancellation |
| StreamTaskWithBlockingTimer.timerFinished.join(); |
| // a JUnit assertion reports expected vs. actual state instead of a bare IllegalStateException |
| assertEquals(ExecutionState.CANCELED, task.getExecutionState()); |
| } |
| } |
| |
| /** Confirms a SAVEPOINT_SUSPEND savepoint and expects that the operator's input was not ended. */ |
| @Test |
| public void testSavepointSuspendCompleted() throws Exception { |
| final boolean expectEndInput = false; |
| testSyncSavepointWithEndInput( |
| (streamTask, checkpointId) -> streamTask.notifyCheckpointCompleteAsync(checkpointId), |
| CheckpointType.SAVEPOINT_SUSPEND, |
| expectEndInput); |
| } |
| |
| /** Confirms a SAVEPOINT_TERMINATE savepoint and expects that the operator's input was ended. */ |
| @Test |
| public void testSavepointTerminateCompleted() throws Exception { |
| final boolean expectEndInput = true; |
| testSyncSavepointWithEndInput( |
| (streamTask, checkpointId) -> streamTask.notifyCheckpointCompleteAsync(checkpointId), |
| CheckpointType.SAVEPOINT_TERMINATE, |
| expectEndInput); |
| } |
| |
| @Test |
| public void testSavepointSuspendedAborted() throws Exception { |
| testSyncSavepointWithEndInput( |
| (task, id) -> |
| task.abortCheckpointOnBarrier( |
| id, |
| new CheckpointException( |
| UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)), |
| CheckpointType.SAVEPOINT_SUSPEND, |
| false); |
| } |
| |
| @Test |
| public void testSavepointTerminateAborted() throws Exception { |
| thrown.expect(FlinkRuntimeException.class); |
| thrown.expectMessage("Stop-with-savepoint --drain failed."); |
| testSyncSavepointWithEndInput( |
| (task, id) -> |
| task.abortCheckpointOnBarrier( |
| id, |
| new CheckpointException( |
| UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)), |
| CheckpointType.SAVEPOINT_TERMINATE, |
| true); |
| } |
| |
| @Test |
| public void testSavepointSuspendAbortedAsync() throws Exception { |
| testSyncSavepointWithEndInput( |
| (streamTask, abortCheckpointId) -> |
| streamTask.notifyCheckpointAbortAsync(abortCheckpointId, 0), |
| CheckpointType.SAVEPOINT_SUSPEND, |
| false); |
| } |
| |
| @Test |
| public void testSavepointTerminateAbortedAsync() throws Exception { |
| thrown.expect(FlinkRuntimeException.class); |
| thrown.expectMessage("Stop-with-savepoint --drain failed."); |
| testSyncSavepointWithEndInput( |
| (streamTask, abortCheckpointId) -> |
| streamTask.notifyCheckpointAbortAsync(abortCheckpointId, 0), |
| CheckpointType.SAVEPOINT_TERMINATE, |
| true); |
| } |
| |
| /** |
| * Test for the interaction between a synchronous savepoint and {@code endInput}. Targets the |
| * following scenario: |
| * |
| * <ol> |
| * <li>Mailbox: trigger the synchronous savepoint via {@code triggerCheckpointOnBarrier} |
| * <li>Separate thread: once the savepoint has been triggered, call {@code |
| * harness.endInput(expectEndInput)} |
| * <li>Mailbox: once the input has been ended, apply {@code savepointResult} |
| * (confirm/abort/abortAsync) |
| * <li>assert that {@code TestBoundedOneInputStreamOperator.isInputEnded()} equals {@code |
| * expectEndInput} |
| * </ol> |
| * |
| * @param savepointResult action invoked with the stream task and the checkpoint id to complete |
| * or abort the savepoint |
| * @param checkpointType type of the savepoint that is triggered |
| * @param expectEndInput flag handed to {@code harness.endInput} and the expected value of the |
| * operator's input-ended state at the end of the test |
| * @throws Exception if the harness cannot be built or processing the mails fails |
| */ |
| private void testSyncSavepointWithEndInput( |
| BiConsumerWithException<StreamTask<?, ?>, Long, IOException> savepointResult, |
| CheckpointType checkpointType, |
| boolean expectEndInput) |
| throws Exception { |
| StreamTaskMailboxTestHarness<String> harness = |
| new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) |
| .addInput(STRING_TYPE_INFO) |
| .setupOutputForSingletonOperatorChain( |
| new TestBoundedOneInputStreamOperator()) |
| .build(); |
| |
| final long checkpointId = 1L; |
| CountDownLatch savepointTriggeredLatch = new CountDownLatch(1); |
| CountDownLatch inputEndedLatch = new CountDownLatch(1); |
| |
| MailboxExecutor executor = |
| harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY); |
| executor.execute( |
| () -> { |
| try { |
| harness.streamTask.triggerCheckpointOnBarrier( |
| new CheckpointMetaData(checkpointId, checkpointId), |
| new CheckpointOptions(checkpointType, getDefault()), |
| new CheckpointMetricsBuilder()); |
| } catch (IOException e) { |
| fail(e.getMessage()); |
| } |
| }, |
| "triggerCheckpointOnBarrier"); |
| new Thread( |
| () -> { |
| try { |
| savepointTriggeredLatch.await(); |
| harness.endInput(expectEndInput); |
| inputEndedLatch.countDown(); |
| } catch (InterruptedException e) { |
| fail(e.getMessage()); |
| } |
| }) |
| .start(); |
| // these mails should be executed from the one above (from triggerCheckpointOnBarrier) |
| executor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch"); |
| executor.execute( |
| () -> { |
| inputEndedLatch.await(); |
| savepointResult.accept(harness.streamTask, checkpointId); |
| }, |
| "savepointResult"); |
| harness.processAll(); |
| |
| Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); |
| } |
| |
| /** |
| * Runs a task whose operator fails both in {@code processElement} and in {@code close}. The |
| * task must complete with the processing failure as cause, and a subsequent {@code cleanUp} |
| * may only fail with the operator's close exception. |
| */ |
| @Test |
| public void testCleanUpExceptionSuppressing() throws Exception { |
| OneInputStreamTaskTestHarness<String, String> harness = |
| new OneInputStreamTaskTestHarness<>( |
| OneInputStreamTask::new, STRING_TYPE_INFO, STRING_TYPE_INFO); |
| harness.setupOutputForSingletonOperatorChain(); |
| |
| StreamConfig operatorConfig = harness.getStreamConfig(); |
| operatorConfig.setStreamOperator(new FailingTwiceOperator()); |
| operatorConfig.setOperatorID(new OperatorID()); |
| |
| harness.invoke(); |
| harness.waitForTaskRunning(); |
| harness.processElement(new StreamRecord<>("Doesn't matter", 0)); |
| |
| try { |
| harness.waitForTaskCompletion(); |
| throw new RuntimeException("Expected an exception but ran successfully"); |
| } catch (Exception completionFailure) { |
| // anything not caused by the operator's processing failure is unexpected |
| final boolean causedByProcessing = |
| completionFailure.getCause() instanceof ExpectedTestException; |
| if (!causedByProcessing) { |
| throw completionFailure; |
| } |
| } |
| |
| try { |
| harness.getTask().cleanUp(null); |
| } catch (Exception cleanUpFailure) { |
| // todo: checking for suppression if there are more exceptions during cleanup |
| final boolean fromClose = |
| cleanUpFailure instanceof FailingTwiceOperator.CloseException; |
| if (!fromClose) { |
| throw cleanUpFailure; |
| } |
| } |
| } |
| |
| /** |
| * Operator that fails twice: with an {@link ExpectedTestException} for every processed element |
| * and with a {@link CloseException} when it is closed. |
| */ |
| private static class FailingTwiceOperator extends AbstractStreamOperator<String> |
| implements OneInputStreamOperator<String, String> { |
| private static final long serialVersionUID = 1L; |
| |
| /** Failure raised by {@link #close()}. */ |
| static class CloseException extends Exception { |
| public CloseException() { |
| super("Close Exception. This exception should be suppressed"); |
| } |
| } |
| |
| /** Always fails with a {@link CloseException}. */ |
| @Override |
| public void close() throws Exception { |
| throw new CloseException(); |
| } |
| |
| /** Always fails with an {@link ExpectedTestException}; the record itself is ignored. */ |
| @Override |
| public void processElement(StreamRecord<String> record) throws Exception { |
| throw new ExpectedTestException(); |
| } |
| } |
| |
| /** |
| * This test checks the async exceptions handling wraps the message and cause as an |
| * AsynchronousException and propagates this to the environment. |
| */ |
| @Test |
| public void streamTaskAsyncExceptionHandler_handleException_forwardsMessageProperly() { |
| MockEnvironment mockEnvironment = MockEnvironment.builder().build(); |
| RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION"); |
| |
| final StreamTask.StreamTaskAsyncExceptionHandler asyncExceptionHandler = |
| new StreamTask.StreamTaskAsyncExceptionHandler(mockEnvironment); |
| |
| mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class); |
| final String expectedErrorMessage = "EXPECTED_ERROR MESSAGE"; |
| |
| asyncExceptionHandler.handleAsyncException(expectedErrorMessage, expectedException); |
| |
| // expect an AsynchronousException containing the supplied error details |
| Optional<? extends Throwable> actualExternalFailureCause = |
| mockEnvironment.getActualExternalFailureCause(); |
| final Throwable actualException = |
| actualExternalFailureCause.orElseThrow( |
| () -> new AssertionError("Expected exceptional completion")); |
| |
| assertThat(actualException, instanceOf(AsynchronousException.class)); |
| assertThat(actualException.getMessage(), is("EXPECTED_ERROR MESSAGE")); |
| assertThat(actualException.getCause(), is(expectedException)); |
| } |
| |
| /** |
| * Verifies that a cancel request issued before the operator has been instantiated still |
| * cancels the task properly. |
| */ |
| @Test |
| public void testEarlyCanceling() throws Exception { |
| final StreamConfig streamConfig = new StreamConfig(new Configuration()); |
| streamConfig.setOperatorID(new OperatorID(4711L, 42L)); |
| streamConfig.setStreamOperator(new SlowlyDeserializingOperator()); |
| streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); |
| |
| final TaskManagerActions actionsSpy = spy(new NoOpTaskManagerActions()); |
| try (NettyShuffleEnvironment shuffleEnvironment = |
| new NettyShuffleEnvironmentBuilder().build()) { |
| final Task task = |
| new TestTaskBuilder(shuffleEnvironment) |
| .setInvokable(SourceStreamTask.class) |
| .setTaskConfig(streamConfig.getConfiguration()) |
| .setTaskManagerActions(actionsSpy) |
| .build(); |
| |
| final TaskExecutionState runningState = |
| new TaskExecutionState(task.getExecutionId(), ExecutionState.RUNNING); |
| |
| task.startTaskThread(); |
| |
| // wait until the task has reported RUNNING |
| verify(actionsSpy, timeout(2000L)).updateTaskExecutionState(eq(runningState)); |
| |
| // send a cancel. because the operator takes a long time to deserialize, this should |
| // hit the task before the operator is deserialized |
| task.cancelExecution(); |
| |
| final Thread executingThread = task.getExecutingThread(); |
| executingThread.join(); |
| |
| assertFalse("Task did not cancel", executingThread.isAlive()); |
| assertEquals(ExecutionState.CANCELED, task.getExecutionState()); |
| } |
| } |
| |
| @Test |
| public void testStateBackendLoadingAndClosing() throws Exception { |
| Configuration taskManagerConfig = new Configuration(); |
| taskManagerConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); |
| |
| StreamConfig cfg = new StreamConfig(new Configuration()); |
| cfg.setStateKeySerializer(mock(TypeSerializer.class)); |
| cfg.setOperatorID(new OperatorID(4711L, 42L)); |
| TestStreamSource<Long, MockSourceFunction> streamSource = |
| new TestStreamSource<>(new MockSourceFunction()); |
| cfg.setStreamOperator(streamSource); |
| cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); |
| |
| try (ShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) { |
| Task task = |
| createTask( |
| StateBackendTestSource.class, |
| shuffleEnvironment, |
| cfg, |
| taskManagerConfig); |
| |
| StateBackendTestSource.fail = false; |
| task.startTaskThread(); |
| |
| // wait for clean termination |
| task.getExecutingThread().join(); |
| |
| // ensure that the state backends and stream iterables are closed ... |
| verify(TestStreamSource.operatorStateBackend).close(); |
| verify(TestStreamSource.keyedStateBackend).close(); |
| verify(TestStreamSource.rawOperatorStateInputs).close(); |
| verify(TestStreamSource.rawKeyedStateInputs).close(); |
| // ... and disposed |
| verify(TestStreamSource.operatorStateBackend).dispose(); |
| verify(TestStreamSource.keyedStateBackend).dispose(); |
| |
| assertEquals(ExecutionState.FINISHED, task.getExecutionState()); |
| } |
| } |
| |
| /** |
| * Runs a {@code StateBackendTestSource} that is configured to fail and verifies that the state |
| * backends and raw state inputs recorded by {@code TestStreamSource} are still closed, the |
| * backends are disposed and the task ends in state FAILED. |
| */ |
| @Test |
| public void testStateBackendClosingOnFailure() throws Exception { |
| Configuration tmConfig = new Configuration(); |
| tmConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); |
| |
| StreamConfig streamConfig = new StreamConfig(new Configuration()); |
| streamConfig.setStateKeySerializer(mock(TypeSerializer.class)); |
| streamConfig.setOperatorID(new OperatorID(4711L, 42L)); |
| TestStreamSource<Long, MockSourceFunction> sourceOperator = |
| new TestStreamSource<>(new MockSourceFunction()); |
| streamConfig.setStreamOperator(sourceOperator); |
| streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); |
| |
| try (NettyShuffleEnvironment shuffleEnvironment = |
| new NettyShuffleEnvironmentBuilder().build()) { |
| Task task = |
| createTask( |
| StateBackendTestSource.class, |
| shuffleEnvironment, |
| streamConfig, |
| tmConfig); |
| |
| StateBackendTestSource.fail = true; |
| task.startTaskThread(); |
| |
| // block until the task thread has terminated |
| final Thread executingThread = task.getExecutingThread(); |
| executingThread.join(); |
| |
| // everything handed out by the state initialization must have been closed ... |
| verify(TestStreamSource.operatorStateBackend).close(); |
| verify(TestStreamSource.keyedStateBackend).close(); |
| verify(TestStreamSource.rawOperatorStateInputs).close(); |
| verify(TestStreamSource.rawKeyedStateInputs).close(); |
| // ... and the backends disposed |
| verify(TestStreamSource.operatorStateBackend).dispose(); |
| verify(TestStreamSource.keyedStateBackend).dispose(); |
| |
| assertEquals(ExecutionState.FAILED, task.getExecutionState()); |
| } |
| } |
| |
| /** |
| * Cancels a {@link CancelFailingTask}, whose {@code cancelTask()} throws, and verifies that the |
| * task nevertheless terminates in state CANCELED. |
| */ |
| @Test |
| public void testCanceleablesCanceledOnCancelTaskError() throws Exception { |
| syncLatch = new OneShotLatch(); |
| |
| final StreamConfig streamConfig = new StreamConfig(new Configuration()); |
| final Configuration tmConfig = new Configuration(); |
| try (NettyShuffleEnvironment shuffleEnvironment = |
| new NettyShuffleEnvironmentBuilder().build()) { |
| |
| Task task = |
| createTask(CancelFailingTask.class, shuffleEnvironment, streamConfig, tmConfig); |
| |
| // start the task and wait until it runs: execution state RUNNING is not enough, we |
| // need to wait until the stream task has triggered the latch from processInput |
| task.startTaskThread(); |
| syncLatch.await(); |
| |
| // cancel the execution - this should lead to smooth shutdown |
| task.cancelExecution(); |
| final Thread executingThread = task.getExecutingThread(); |
| executingThread.join(); |
| |
| assertEquals(ExecutionState.CANCELED, task.getExecutionState()); |
| } |
| } |
| |
| /** |
| * A task that blocks forever in {@code processInput} and fails in {@link #cancelTask()}. It can |
| * only be shut down cleanly if the closeables registered with {@link |
| * StreamTask#getCancelables()} are closed properly. |
| * |
| * <p>{@code processInput} starts a {@code LockHolder} thread that keeps a lock, registers the |
| * holder as a cancelable, triggers the test's {@code syncLatch} and then blocks on the same |
| * lock. The block is only released once the holder has been closed. |
| */ |
| public static class CancelFailingTask |
| extends StreamTask<String, AbstractStreamOperator<String>> { |
| |
| public CancelFailingTask(Environment env) throws Exception { |
| super(env); |
| } |
| |
| @Override |
| protected void init() {} |
| |
| @Override |
| protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { |
| final OneShotLatch latch = new OneShotLatch(); |
| final Object lock = new Object(); |
| |
| LockHolder holder = new LockHolder(lock, latch); |
| holder.start(); |
| try { |
| // cancellation should try and close this holder |
| getCancelables().registerCloseable(holder); |
| |
| // wait till the lock holder has the lock |
| latch.await(); |
| |
| // we are at the point where cancelling can happen |
| syncLatch.trigger(); |
| |
| // try to acquire the lock - this is not possible as long as the lock holder |
| // thread lives |
| //noinspection SynchronizationOnLocalVariableOrMethodParameter |
| synchronized (lock) { |
| // nothing to do, acquiring the lock is the point |
| } |
| } finally { |
| holder.close(); |
| } |
| controller.suspendDefaultAction(); |
| mailboxProcessor.suspend(); |
| } |
| |
| @Override |
| protected void cleanUpInternal() {} |
| |
| @Override |
| protected void cancelTask() throws Exception { |
| throw new Exception("test exception"); |
| } |
| |
| /** |
| * A thread that holds a lock as long as it lives. It leaves its loop only after {@code |
| * canceled} has been set and its sleep was interrupted or has elapsed. {@link #close()} sets |
| * the flag and interrupts the thread, {@link #cancel()} only sets the flag. |
| */ |
| private static final class LockHolder extends Thread implements Closeable { |
| |
| private final OneShotLatch trigger; |
| private final Object lock; |
| private volatile boolean canceled; |
| |
| private LockHolder(Object lock, OneShotLatch trigger) { |
| this.lock = lock; |
| this.trigger = trigger; |
| } |
| |
| @Override |
| public void run() { |
| synchronized (lock) { |
| while (!canceled) { |
| // signal that we grabbed the lock |
| trigger.trigger(); |
| |
| // basically freeze this thread; an interrupt only re-checks the canceled flag |
| try { |
| //noinspection SleepWhileHoldingLock |
| Thread.sleep(1000000000); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| } |
| } |
| |
| public void cancel() { |
| canceled = true; |
| } |
| |
| @Override |
| public void close() { |
| canceled = true; |
| interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * A {@link CancelTaskException} can be thrown in a downstream task, for example if an upstream |
| * task was cancelled first and the two tasks were connected via {@link |
| * org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel}. {@link StreamTask} |
| * should handle this situation correctly: the task is expected to end in state CANCELED. |
| */ |
| @Test |
| public void testCancelTaskExceptionHandling() throws Exception { |
| final StreamConfig streamConfig = new StreamConfig(new Configuration()); |
| final Configuration tmConfig = new Configuration(); |
| |
| try (NettyShuffleEnvironment shuffleEnvironment = |
| new NettyShuffleEnvironmentBuilder().build()) { |
| Task task = |
| createTask(CancelThrowingTask.class, shuffleEnvironment, streamConfig, tmConfig); |
| |
| task.startTaskThread(); |
| final Thread executingThread = task.getExecutingThread(); |
| executingThread.join(); |
| |
| assertEquals(ExecutionState.CANCELED, task.getExecutionState()); |
| } |
| } |
| |
| /** |
| * A task that throws {@link CancelTaskException} from {@code processInput}. Its {@code init()} |
| * does nothing. Used by {@code testCancelTaskExceptionHandling}. |
| */ |
| public static class CancelThrowingTask |
| extends StreamTask<String, AbstractStreamOperator<String>> { |
| |
| public CancelThrowingTask(Environment env) throws Exception { |
| super(env); |
| } |
| |
| @Override |
| protected void init() {} |
| |
| @Override |
| protected void processInput(MailboxDefaultAction.Controller controller) { |
| throw new CancelTaskException(); |
| } |
| } |
| |
| /** |
| * Triggers a checkpoint on a chain of three operators of which one fails its snapshot, and |
| * verifies that the snapshot results of the other two operators are cancelled. |
| */ |
| @Test |
| public void testDecliningCheckpointStreamOperator() throws Exception { |
| DummyEnvironment environment = new DummyEnvironment(); |
| |
| // snapshot results returned by the two operators that do not fail |
| OperatorSnapshotFutures firstSnapshot = mock(OperatorSnapshotFutures.class); |
| OperatorSnapshotFutures secondSnapshot = mock(OperatorSnapshotFutures.class); |
| |
| final Exception snapshotFailure = new ExpectedTestException(); |
| |
| RunningTask<MockStreamTask> runningTask = |
| runTask( |
| () -> |
| createMockStreamTask( |
| environment, |
| operatorChain( |
| streamOperatorWithSnapshotException(snapshotFailure), |
| streamOperatorWithSnapshot(firstSnapshot), |
| streamOperatorWithSnapshot(secondSnapshot)))); |
| MockStreamTask mockTask = runningTask.streamTask; |
| |
| waitTaskIsRunning(mockTask, runningTask.invocationFuture); |
| |
| mockTask.triggerCheckpointAsync( |
| new CheckpointMetaData(42L, 1L), |
| CheckpointOptions.forCheckpointWithDefaultLocation()); |
| |
| try { |
| runningTask.waitForTaskCompletion(false); |
| } catch (Exception completionFailure) { |
| // only a failure that contains the expected snapshot exception is tolerated |
| final boolean expected = |
| ExceptionUtils.findThrowable(completionFailure, ExpectedTestException.class) |
| .isPresent(); |
| if (!expected) { |
| throw completionFailure; |
| } |
| } |
| |
| verify(firstSnapshot).cancel(); |
| verify(secondSnapshot).cancel(); |
| } |
| |
| /** |
| * Tests that uncaught exceptions in the async part of a checkpoint operation are forwarded to |
| * the uncaught exception handler. See <a |
| * href="https://issues.apache.org/jira/browse/FLINK-12889">FLINK-12889</a>. |
| */ |
| @Test |
| public void testUncaughtExceptionInAsynchronousCheckpointingOperation() throws Exception { |
| final RuntimeException expectedFailure = new RuntimeException("Test exception"); |
| FailingDummyEnvironment environment = new FailingDummyEnvironment(expectedFailure); |
| |
| // snapshot result whose first future completes exceptionally, all others are empty |
| OperatorSnapshotFutures snapshotFutures = |
| new OperatorSnapshotFutures( |
| ExceptionallyDoneFuture.of(expectedFailure), |
| DoneFuture.of(SnapshotResult.empty()), |
| DoneFuture.of(SnapshotResult.empty()), |
| DoneFuture.of(SnapshotResult.empty()), |
| DoneFuture.of(SnapshotResult.empty()), |
| DoneFuture.of(SnapshotResult.empty())); |
| |
| final TestingUncaughtExceptionHandler exceptionHandler = |
| new TestingUncaughtExceptionHandler(); |
| |
| RunningTask<MockStreamTask> runningTask = |
| runTask( |
| () -> |
| new MockStreamTask( |
| environment, |
| operatorChain(streamOperatorWithSnapshot(snapshotFutures)), |
| exceptionHandler)); |
| MockStreamTask mockTask = runningTask.streamTask; |
| |
| waitTaskIsRunning(mockTask, runningTask.invocationFuture); |
| |
| mockTask.triggerCheckpointAsync( |
| new CheckpointMetaData(42L, 1L), |
| CheckpointOptions.forCheckpointWithDefaultLocation()); |
| |
| // the handler must receive exactly the exception the environment fails with |
| final Throwable caught = exceptionHandler.waitForUncaughtException(); |
| assertThat(caught, is(expectedFailure)); |
| |
| mockTask.finishInput(); |
| runningTask.waitForTaskCompletion(false); |
| } |
| |
| /** |
| * Tests that in case of a failing AsyncCheckpointRunnable all operator snapshot results are |
| * cancelled and all non partitioned state handles are discarded. |
| */ |
| @Test |
| public void testFailingAsyncCheckpointRunnable() throws Exception { |
| |
| // mock the new state operator snapshots |
| OperatorSnapshotFutures operatorSnapshotResult1 = mock(OperatorSnapshotFutures.class); |
| OperatorSnapshotFutures operatorSnapshotResult2 = mock(OperatorSnapshotFutures.class); |
| OperatorSnapshotFutures operatorSnapshotResult3 = mock(OperatorSnapshotFutures.class); |
| |
| RunnableFuture<SnapshotResult<OperatorStateHandle>> failingFuture = |
| mock(RunnableFuture.class); |
| when(failingFuture.get()) |
| .thenThrow(new ExecutionException(new Exception("Test exception"))); |
| |
| when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture); |
| |
| try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build()) { |
| RunningTask<MockStreamTask> task = |
| runTask( |
| () -> |
| createMockStreamTask( |
| mockEnvironment, |
| operatorChain( |
| streamOperatorWithSnapshot( |
| operatorSnapshotResult1), |
| streamOperatorWithSnapshot( |
| operatorSnapshotResult2), |
| streamOperatorWithSnapshot( |
| operatorSnapshotResult3)))); |
| |
| MockStreamTask streamTask = task.streamTask; |
| |
| waitTaskIsRunning(streamTask, task.invocationFuture); |
| |
| mockEnvironment.setExpectedExternalFailureCause(Throwable.class); |
| streamTask |
| .triggerCheckpointAsync( |
| new CheckpointMetaData(42L, 1L), |
| CheckpointOptions.forCheckpointWithDefaultLocation()) |
| .get(); |
| |
| // wait for the completion of the async task |
| ExecutorService executor = streamTask.getAsyncOperationsThreadPool(); |
| executor.shutdown(); |
| if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) { |
| fail( |
| "Executor did not shut down within the given timeout. This indicates that the " |
| + "checkpointing did not resume."); |
| } |
| |
| assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent()); |
| |
| verify(operatorSnapshotResult1).cancel(); |
| verify(operatorSnapshotResult2).cancel(); |
| verify(operatorSnapshotResult3).cancel(); |
| |
| streamTask.finishInput(); |
| task.waitForTaskCompletion(false); |
| } |
| } |
| |
| /** |
| * FLINK-5667 |
| * |
| * <p>Tests that a concurrent cancel operation does not discard the state handles of an |
| * acknowledged checkpoint. The situation can only happen if the cancel call is executed after |
| * Environment.acknowledgeCheckpoint() and before the CloseableRegistry.unregisterClosable() |
| * call. |
| */ |
| @Test |
| public void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception { |
| |
| final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch(); |
| final OneShotLatch completeAcknowledge = new OneShotLatch(); |
| |
| CheckpointResponder checkpointResponder = mock(CheckpointResponder.class); |
| doAnswer( |
| new Answer() { |
| @Override |
| public Object answer(InvocationOnMock invocation) { |
| acknowledgeCheckpointLatch.trigger(); |
| |
| // block here so that we can issue the concurrent cancel call |
| while (true) { |
| try { |
| // wait until we successfully await (no pun intended) |
| completeAcknowledge.await(); |
| |
| // when await() returns normally, we break out of the loop |
| break; |
| } catch (InterruptedException e) { |
| // survive interruptions that arise from thread pool |
| // shutdown |
| // production code cannot actually throw |
| // InterruptedException from |
| // checkpoint acknowledgement |
| } |
| } |
| |
| return null; |
| } |
| }) |
| .when(checkpointResponder) |
| .acknowledgeCheckpoint( |
| any(JobID.class), |
| any(ExecutionAttemptID.class), |
| anyLong(), |
| any(CheckpointMetrics.class), |
| any(TaskStateSnapshot.class)); |
| |
| TaskStateManager taskStateManager = |
| new TaskStateManagerImpl( |
| new JobID(1L, 2L), |
| new ExecutionAttemptID(), |
| mock(TaskLocalStateStoreImpl.class), |
| mock(StateChangelogStorage.class), |
| null, |
| checkpointResponder); |
| |
| KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); |
| KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class); |
| OperatorStateHandle managedOperatorStateHandle = mock(OperatorStreamStateHandle.class); |
| OperatorStateHandle rawOperatorStateHandle = mock(OperatorStreamStateHandle.class); |
| |
| OperatorSnapshotFutures operatorSnapshotResult = |
| new OperatorSnapshotFutures( |
| DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)), |
| DoneFuture.of(SnapshotResult.of(rawKeyedStateHandle)), |
| DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)), |
| DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle)), |
| DoneFuture.of(SnapshotResult.empty()), |
| DoneFuture.of(SnapshotResult.empty())); |
| |
| try (MockEnvironment mockEnvironment = |
| new MockEnvironmentBuilder() |
| .setTaskName("mock-task") |
| .setTaskStateManager(taskStateManager) |
| .build()) { |
| |
| RunningTask<MockStreamTask> task = |
| runTask( |
| () -> |
| createMockStreamTask( |
| mockEnvironment, |
| operatorChain( |
| streamOperatorWithSnapshot( |
| operatorSnapshotResult)))); |
| |
| MockStreamTask streamTask = task.streamTask; |
| waitTaskIsRunning(streamTask, task.invocationFuture); |
| |
| final long checkpointId = 42L; |
| streamTask.triggerCheckpointAsync( |
| new CheckpointMetaData(checkpointId, 1L), |
| CheckpointOptions.forCheckpointWithDefaultLocation()); |
| |
| acknowledgeCheckpointLatch.await(); |
| |
| ArgumentCaptor<TaskStateSnapshot> subtaskStateCaptor = |
| ArgumentCaptor.forClass(TaskStateSnapshot.class); |
| |
| // check that the checkpoint has been completed |
| verify(checkpointResponder) |
| .acknowledgeCheckpoint( |
| any(JobID.class), |
| any(ExecutionAttemptID.class), |
| eq(checkpointId), |
| any(CheckpointMetrics.class), |
| subtaskStateCaptor.capture()); |
| |
| TaskStateSnapshot subtaskStates = subtaskStateCaptor.getValue(); |
| OperatorSubtaskState subtaskState = |
| subtaskStates.getSubtaskStateMappings().iterator().next().getValue(); |
| |
| // check that the subtask state contains the expected state handles |
| assertEquals(singleton(managedKeyedStateHandle), subtaskState.getManagedKeyedState()); |
| assertEquals(singleton(rawKeyedStateHandle), subtaskState.getRawKeyedState()); |
| assertEquals( |
| singleton(managedOperatorStateHandle), subtaskState.getManagedOperatorState()); |
| assertEquals(singleton(rawOperatorStateHandle), subtaskState.getRawOperatorState()); |
| |
| // check that the state handles have not been discarded |
| verify(managedKeyedStateHandle, never()).discardState(); |
| verify(rawKeyedStateHandle, never()).discardState(); |
| verify(managedOperatorStateHandle, never()).discardState(); |
| verify(rawOperatorStateHandle, never()).discardState(); |
| |
| streamTask.cancel(); |
| |
| completeAcknowledge.trigger(); |
| |
| // canceling the stream task after it has acknowledged the checkpoint should not discard |
| // the state handles |
| verify(managedKeyedStateHandle, never()).discardState(); |
| verify(rawKeyedStateHandle, never()).discardState(); |
| verify(managedOperatorStateHandle, never()).discardState(); |
| verify(rawOperatorStateHandle, never()).discardState(); |
| |
| task.waitForTaskCompletion(true); |
| } |
| } |
| |
| /** |
| * FLINK-5667 |
| * |
| * <p>Tests that a concurrent cancel operation discards the state handles of a not yet |
| * acknowledged checkpoint and prevents sending an acknowledge message to the |
| * CheckpointCoordinator. The situation can only happen if the cancel call is executed before |
| * Environment.acknowledgeCheckpoint(). |
| */ |
| @Test |
| public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception { |
| |
| final TestingKeyedStateHandle managedKeyedStateHandle = new TestingKeyedStateHandle(); |
| final TestingKeyedStateHandle rawKeyedStateHandle = new TestingKeyedStateHandle(); |
| final TestingOperatorStateHandle managedOperatorStateHandle = |
| new TestingOperatorStateHandle(); |
| final TestingOperatorStateHandle rawOperatorStateHandle = new TestingOperatorStateHandle(); |
| |
| final BlockingRunnableFuture<SnapshotResult<KeyedStateHandle>> rawKeyedStateHandleFuture = |
| new BlockingRunnableFuture<>(2, SnapshotResult.of(rawKeyedStateHandle)); |
| OperatorSnapshotFutures operatorSnapshotResult = |
| new OperatorSnapshotFutures( |
| DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)), |
| rawKeyedStateHandleFuture, |
| DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)), |
| DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle)), |
| DoneFuture.of(SnapshotResult.empty()), |
| DoneFuture.of(SnapshotResult.empty())); |
| |
| final OneInputStreamOperator<String, String> streamOperator = |
| streamOperatorWithSnapshot(operatorSnapshotResult); |
| |
| final AcknowledgeDummyEnvironment mockEnvironment = new AcknowledgeDummyEnvironment(); |
| |
| RunningTask<MockStreamTask> task = |
| runTask(() -> createMockStreamTask(mockEnvironment, operatorChain(streamOperator))); |
| |
| waitTaskIsRunning(task.streamTask, task.invocationFuture); |
| |
| final long checkpointId = 42L; |
| task.streamTask.triggerCheckpointAsync( |
| new CheckpointMetaData(checkpointId, 1L), |
| CheckpointOptions.forCheckpointWithDefaultLocation()); |
| |
| rawKeyedStateHandleFuture.awaitRun(); |
| |
| task.streamTask.cancel(); |
| |
| final FutureUtils.ConjunctFuture<Void> discardFuture = |
| FutureUtils.waitForAll( |
| asList( |
| managedKeyedStateHandle.getDiscardFuture(), |
| rawKeyedStateHandle.getDiscardFuture(), |
| managedOperatorStateHandle.getDiscardFuture(), |
| rawOperatorStateHandle.getDiscardFuture())); |
| |
| // make sure that all state handles have been discarded |
| discardFuture.get(); |
| |
| try { |
| mockEnvironment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS); |
| fail("The checkpoint should not get acknowledged."); |
| } catch (TimeoutException expected) { |
| // future should not be completed |
| } |
| |
| task.waitForTaskCompletion(true); |
| } |
| |
| /** |
| * FLINK-5985 |
| * |
| * <p>This test ensures that empty snapshots (no op/keyed stated whatsoever) will be reported as |
| * stateless tasks. This happens by translating an empty {@link SubtaskState} into reporting |
| * 'null' to #acknowledgeCheckpoint. |
| */ |
| @Test |
| public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception { |
| |
| // latch blocks until the async checkpoint thread acknowledges |
| final OneShotLatch checkpointCompletedLatch = new OneShotLatch(); |
| final List<SubtaskState> checkpointResult = new ArrayList<>(1); |
| |
| CheckpointResponder checkpointResponder = mock(CheckpointResponder.class); |
| doAnswer( |
| new Answer() { |
| @Override |
| public Object answer(InvocationOnMock invocation) throws Throwable { |
| SubtaskState subtaskState = invocation.getArgument(4); |
| checkpointResult.add(subtaskState); |
| checkpointCompletedLatch.trigger(); |
| return null; |
| } |
| }) |
| .when(checkpointResponder) |
| .acknowledgeCheckpoint( |
| any(JobID.class), |
| any(ExecutionAttemptID.class), |
| anyLong(), |
| any(CheckpointMetrics.class), |
| nullable(TaskStateSnapshot.class)); |
| |
| TaskStateManager taskStateManager = |
| new TaskStateManagerImpl( |
| new JobID(1L, 2L), |
| new ExecutionAttemptID(), |
| mock(TaskLocalStateStoreImpl.class), |
| mock(StateChangelogStorage.class), |
| null, |
| checkpointResponder); |
| |
| // mock the operator with empty snapshot result (all state handles are null) |
| OneInputStreamOperator<String, String> statelessOperator = |
| streamOperatorWithSnapshot(new OperatorSnapshotFutures()); |
| |
| try (MockEnvironment mockEnvironment = |
| new MockEnvironmentBuilder().setTaskStateManager(taskStateManager).build()) { |
| |
| RunningTask<MockStreamTask> task = |
| runTask( |
| () -> |
| createMockStreamTask( |
| mockEnvironment, operatorChain(statelessOperator))); |
| |
| waitTaskIsRunning(task.streamTask, task.invocationFuture); |
| |
| task.streamTask.triggerCheckpointAsync( |
| new CheckpointMetaData(42L, 1L), |
| CheckpointOptions.forCheckpointWithDefaultLocation()); |
| |
| checkpointCompletedLatch.await(30, TimeUnit.SECONDS); |
| |
| // ensure that 'null' was acknowledged as subtask state |
| Assert.assertNull(checkpointResult.get(0)); |
| |
| task.streamTask.cancel(); |
| task.waitForTaskCompletion(true); |
| } |
| } |
| |
| /** |
| * Tests that {@link StreamTask#notifyCheckpointCompleteAsync(long)} is not relayed to closed |
| * operators. |
| * |
| * <p>See FLINK-16383. |
| */ |
| @Test |
| public void testNotifyCheckpointOnClosedOperator() throws Throwable { |
| ClosingOperator<Integer> operator = new ClosingOperator<>(); |
| StreamTaskMailboxTestHarnessBuilder<Integer> builder = |
| new StreamTaskMailboxTestHarnessBuilder<>( |
| OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) |
| .addInput(BasicTypeInfo.INT_TYPE_INFO); |
| StreamTaskMailboxTestHarness<Integer> harness = |
| builder.setupOutputForSingletonOperatorChain(operator).build(); |
| // keeps the mailbox from suspending |
| harness.setAutoProcess(false); |
| harness.processElement(new StreamRecord<>(1)); |
| harness.streamTask.runMailboxStep(); |
| |
| harness.streamTask.notifyCheckpointCompleteAsync(1); |
| harness.streamTask.runMailboxStep(); |
| assertEquals(1, ClosingOperator.notified.get()); |
| assertFalse(ClosingOperator.closed.get()); |
| |
| // close operators directly, so that task is still fully running |
| harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor()); |
| harness.streamTask.operatorChain.closeAllOperators(); |
| harness.streamTask.notifyCheckpointCompleteAsync(2); |
| harness.streamTask.runMailboxStep(); |
| assertEquals(1, ClosingOperator.notified.get()); |
| assertTrue(ClosingOperator.closed.get()); |
| } |
| |
| @Test |
| public void testFailToConfirmCheckpointCompleted() throws Exception { |
| testFailToConfirmCheckpointMessage( |
| streamTask -> streamTask.notifyCheckpointCompleteAsync(1L)); |
| } |
| |
| @Test |
| public void testFailToConfirmCheckpointAborted() throws Exception { |
| testFailToConfirmCheckpointMessage( |
| streamTask -> streamTask.notifyCheckpointAbortAsync(1L, 0L)); |
| } |
| |
| private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) |
| throws Exception { |
| StreamMap<Integer, Integer> streamMap = |
| new StreamMap<>(new FailOnNotifyCheckpointMapper<>()); |
| StreamTaskMailboxTestHarnessBuilder<Integer> builder = |
| new StreamTaskMailboxTestHarnessBuilder<>( |
| OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) |
| .addInput(BasicTypeInfo.INT_TYPE_INFO); |
| StreamTaskMailboxTestHarness<Integer> harness = |
| builder.setupOutputForSingletonOperatorChain(streamMap).build(); |
| |
| try { |
| consumer.accept(harness.streamTask); |
| harness.streamTask.runMailboxLoop(); |
| fail(); |
| } catch (ExpectedTestException expected) { |
| // expected exceptionestProcessWithUnAvailableInput |
| } |
| } |
| |
| /** |
| * Tests exeptions is thrown by triggering checkpoint if operators are closed. This was |
| * initially implemented for FLINK-16383. However after FLINK-2491 operators lifecycle has |
| * changed and now we: (1) redefined close() to dispose(). After closing operators, there should |
| * be no opportunity to invoke anything on the task. close() mentioned in FLINK-16383 is now |
| * more like finish(). (2) We support triggering and performing checkpoints if operators are |
| * finished. |
| */ |
| @Test |
| public void testCheckpointFailueOnClosedOperator() throws Throwable { |
| ClosingOperator<Integer> operator = new ClosingOperator<>(); |
| StreamTaskMailboxTestHarnessBuilder<Integer> builder = |
| new StreamTaskMailboxTestHarnessBuilder<>( |
| OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) |
| .addInput(BasicTypeInfo.INT_TYPE_INFO); |
| try (StreamTaskMailboxTestHarness<Integer> harness = |
| builder.setupOutputForSingletonOperatorChain(operator).build()) { |
| // keeps the mailbox from suspending |
| harness.setAutoProcess(false); |
| harness.processElement(new StreamRecord<>(1)); |
| |
| harness.streamTask.operatorChain.finishOperators( |
| harness.streamTask.getActionExecutor()); |
| harness.streamTask.operatorChain.closeAllOperators(); |
| assertTrue(ClosingOperator.closed.get()); |
| |
| harness.streamTask.triggerCheckpointOnBarrier( |
| new CheckpointMetaData(1, 0), |
| CheckpointOptions.forCheckpointWithDefaultLocation(), |
| new CheckpointMetricsBuilder()); |
| } catch (Exception ex) { |
| ExceptionUtils.assertThrowableWithMessage( |
| ex, "OperatorChain and Task should never be closed at this point"); |
| } |
| } |
| |
| @Test |
| public void testExecuteMailboxActionsAfterLeavingInputProcessorMailboxLoop() throws Exception { |
| OneShotLatch latch = new OneShotLatch(); |
| try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build()) { |
| RunningTask<StreamTask<?, ?>> task = |
| runTask( |
| () -> |
| new StreamTask<Object, StreamOperator<Object>>( |
| mockEnvironment) { |
| @Override |
| protected void init() throws Exception {} |
| |
| @Override |
| protected void processInput( |
| MailboxDefaultAction.Controller controller) |
| throws Exception { |
| mailboxProcessor |
| .getMailboxExecutor(0) |
| .execute(latch::trigger, "trigger"); |
| controller.suspendDefaultAction(); |
| mailboxProcessor.suspend(); |
| } |
| }); |
| latch.await(); |
| task.waitForTaskCompletion(false); |
| } |
| } |
| |
| /** |
| * Tests that some StreamTask methods are called only in the main task's thread. Currently, the |
| * main task's thread is the thread that creates the task. |
| */ |
| @Test |
| public void testThreadInvariants() throws Throwable { |
| Configuration taskConfiguration = new Configuration(); |
| StreamConfig streamConfig = new StreamConfig(taskConfiguration); |
| streamConfig.setStreamOperator(new StreamMap<>(value -> value)); |
| streamConfig.setOperatorID(new OperatorID()); |
| try (MockEnvironment mockEnvironment = |
| new MockEnvironmentBuilder().setTaskConfiguration(taskConfiguration).build()) { |
| |
| ClassLoader taskClassLoader = new TestUserCodeClassLoader(); |
| |
| RunningTask<ThreadInspectingTask> runningTask = |
| runTask( |
| () -> { |
| Thread.currentThread().setContextClassLoader(taskClassLoader); |
| return new ThreadInspectingTask(mockEnvironment); |
| }); |
| runningTask.invocationFuture.get(); |
| |
| assertThat( |
| runningTask.streamTask.getTaskClassLoader(), is(sameInstance(taskClassLoader))); |
| } |
| } |
| |
| @Test |
| public void testRecordWriterClosedOnTransitDeployingStateError() throws Exception { |
| testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING); |
| } |
| |
| @Test |
| public void testRecordWriterClosedOnTransitInitializingStateError() throws Exception { |
| testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING); |
| } |
| |
| @Test |
| public void testRecordWriterClosedOnTransitRunningStateError() throws Exception { |
| testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING); |
| } |
| |
| private void testRecordWriterClosedOnTransitStateError(ExecutionState executionState) |
| throws Exception { |
| // Throw the exception when the state updating to the expected one. |
| NoOpTaskManagerActions taskManagerActions = |
| new NoOpTaskManagerActions() { |
| @Override |
| public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { |
| if (taskExecutionState.getExecutionState() == executionState) { |
| throw new ExpectedTestException(); |
| } |
| } |
| }; |
| |
| testRecordWriterClosedOnError( |
| env -> |
| taskBuilderWithConfiguredRecordWriter(env) |
| .setTaskManagerActions(taskManagerActions) |
| .build()); |
| } |
| |
| private void testRecordWriterClosedOnError( |
| FunctionWithException<NettyShuffleEnvironment, Task, Exception> taskProvider) |
| throws Exception { |
| try (NettyShuffleEnvironment shuffleEnvironment = |
| new NettyShuffleEnvironmentBuilder().build()) { |
| Task task = taskProvider.apply(shuffleEnvironment); |
| |
| task.startTaskThread(); |
| task.getExecutingThread().join(); |
| |
| assertEquals(ExecutionState.FAILED, task.getExecutionState()); |
| for (Thread thread : Thread.getAllStackTraces().keySet()) { |
| assertThat( |
| thread.getName(), |
| CoreMatchers.is(not(containsString(DEFAULT_OUTPUT_FLUSH_THREAD_NAME)))); |
| } |
| } |
| } |
| |
| private TestTaskBuilder taskBuilderWithConfiguredRecordWriter( |
| NettyShuffleEnvironment shuffleEnvironment) { |
| Configuration taskConfiguration = new Configuration(); |
| outputEdgeConfiguration(taskConfiguration); |
| |
| ResultPartitionDeploymentDescriptor descriptor = |
| new ResultPartitionDeploymentDescriptor( |
| PartitionDescriptorBuilder.newBuilder().build(), |
| NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), |
| 1, |
| false); |
| return new TestTaskBuilder(shuffleEnvironment) |
| .setInvokable(NoOpStreamTask.class) |
| .setTaskConfig(taskConfiguration) |
| .setResultPartitions(singletonList(descriptor)); |
| } |
| |
| /** |
| * Make sure that there is some output edge in the config so that some RecordWriter is created. |
| */ |
| private void outputEdgeConfiguration(Configuration taskConfiguration) { |
| StreamConfig streamConfig = new StreamConfig(taskConfiguration); |
| streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory()); |
| |
| StreamConfigChainer cfg = |
| new StreamConfigChainer(new OperatorID(42, 42), streamConfig, this, 1); |
| // The OutputFlusher thread is started only if the buffer timeout more than 0(default value |
| // is 0). |
| cfg.setBufferTimeout(1); |
| cfg.chain( |
| new OperatorID(44, 44), |
| new UnusedOperatorFactory(), |
| StringSerializer.INSTANCE, |
| StringSerializer.INSTANCE, |
| false); |
| cfg.finish(); |
| } |
| |
| @Test |
| public void testProcessWithAvailableOutput() throws Exception { |
| try (final MockEnvironment environment = setupEnvironment(true, true)) { |
| final int numberOfProcessCalls = 10; |
| final AvailabilityTestInputProcessor inputProcessor = |
| new AvailabilityTestInputProcessor(numberOfProcessCalls); |
| final StreamTask task = |
| new MockStreamTaskBuilder(environment) |
| .setStreamInputProcessor(inputProcessor) |
| .build(); |
| |
| task.invoke(); |
| assertEquals(numberOfProcessCalls, inputProcessor.currentNumProcessCalls); |
| } |
| } |
| |
| /** |
| * In this weird construct, we are: |
| * |
| * <ul> |
| * <li>1. We start a thread, which will... |
| * <li>2. ... sleep for X ms, and enqueue another mail, that will... |
| * <li>3. ... sleep for Y ms, and make the output available again |
| * </ul> |
| * |
| * <p>2nd step is to check that back pressure or idle counter is at least X. In the last 3rd |
| * step, we test whether this counter was paused for the duration of processing mails. |
| */ |
| private static class WaitingThread extends Thread { |
| private final MailboxExecutor executor; |
| private final RunnableWithException resumeTask; |
| private final long sleepTimeInsideMail; |
| private final long sleepTimeOutsideMail; |
| private final TimerGauge sleepOutsideMailTimer; |
| |
| @Nullable private Exception asyncException; |
| |
| public WaitingThread( |
| MailboxExecutor executor, |
| RunnableWithException resumeTask, |
| long sleepTimeInsideMail, |
| long sleepTimeOutsideMail, |
| TimerGauge sleepOutsideMailTimer) { |
| this.executor = executor; |
| this.resumeTask = resumeTask; |
| this.sleepTimeInsideMail = sleepTimeInsideMail; |
| this.sleepTimeOutsideMail = sleepTimeOutsideMail; |
| this.sleepOutsideMailTimer = sleepOutsideMailTimer; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| // Make sure that the Task thread actually starts measuring the backpressure before |
| // we start the measured sleep. The WaitingThread is started from within the mailbox |
| // so we should first wait until mailbox loop starts idling before we enter the |
| // measured sleep |
| while (!sleepOutsideMailTimer.isMeasuring()) { |
| Thread.sleep(1); |
| } |
| Thread.sleep(sleepTimeOutsideMail); |
| } catch (InterruptedException e) { |
| asyncException = e; |
| } |
| executor.submit( |
| () -> { |
| if (asyncException != null) { |
| throw asyncException; |
| } |
| Thread.sleep(sleepTimeInsideMail); |
| resumeTask.run(); |
| }, |
| "This task will complete the future to resume process input action."); |
| } |
| } |
| |
| @Test |
| public void testProcessWithUnAvailableOutput() throws Exception { |
| final long sleepTimeOutsideMail = 42; |
| final long sleepTimeInsideMail = 44; |
| |
| @Nullable WaitingThread waitingThread = null; |
| try (final MockEnvironment environment = setupEnvironment(true, false)) { |
| final int numberOfProcessCalls = 10; |
| final AvailabilityTestInputProcessor inputProcessor = |
| new AvailabilityTestInputProcessor(numberOfProcessCalls); |
| final StreamTask task = |
| new MockStreamTaskBuilder(environment) |
| .setStreamInputProcessor(inputProcessor) |
| .build(); |
| final MailboxExecutor executor = task.mailboxProcessor.getMainMailboxExecutor(); |
| TaskIOMetricGroup ioMetricGroup = |
| task.getEnvironment().getMetricGroup().getIOMetricGroup(); |
| |
| final RunnableWithException completeFutureTask = |
| () -> { |
| assertEquals(1, inputProcessor.currentNumProcessCalls); |
| assertFalse(task.mailboxProcessor.isDefaultActionAvailable()); |
| environment.getWriter(1).getAvailableFuture().complete(null); |
| }; |
| |
| waitingThread = |
| new WaitingThread( |
| executor, |
| completeFutureTask, |
| sleepTimeInsideMail, |
| sleepTimeOutsideMail, |
| ioMetricGroup.getBackPressuredTimePerSecond()); |
| // Make sure WaitingThread is started after Task starts processing. |
| executor.submit( |
| waitingThread::start, |
| "This task will submit another task to execute after processing input once."); |
| |
| long startTs = System.currentTimeMillis(); |
| |
| task.invoke(); |
| long totalDuration = System.currentTimeMillis() - startTs; |
| assertThat( |
| ioMetricGroup.getBackPressuredTimePerSecond().getCount(), |
| greaterThanOrEqualTo(sleepTimeOutsideMail)); |
| assertThat( |
| ioMetricGroup.getBackPressuredTimePerSecond().getCount(), |
| Matchers.lessThanOrEqualTo(totalDuration - sleepTimeInsideMail)); |
| assertThat(ioMetricGroup.getIdleTimeMsPerSecond().getCount(), is(0L)); |
| assertEquals(numberOfProcessCalls, inputProcessor.currentNumProcessCalls); |
| } finally { |
| if (waitingThread != null) { |
| waitingThread.join(); |
| } |
| } |
| } |
| |
| @Test |
| public void testProcessWithUnAvailableInput() throws Exception { |
| final long sleepTimeOutsideMail = 42; |
| final long sleepTimeInsideMail = 44; |
| final int incomingDataSize = 10_000; |
| |
| @Nullable WaitingThread waitingThread = null; |
| try (final MockEnvironment environment = setupEnvironment(true, true)) { |
| final UnAvailableTestInputProcessor inputProcessor = |
| new UnAvailableTestInputProcessor(); |
| final StreamTask task = |
| new MockStreamTaskBuilder(environment) |
| .setStreamInputProcessor(inputProcessor) |
| .build(); |
| TaskIOMetricGroup ioMetricGroup = |
| task.getEnvironment().getMetricGroup().getIOMetricGroup(); |
| ThroughputCalculator throughputCalculator = environment.getThroughputCalculator(); |
| |
| final MailboxExecutor executor = task.mailboxProcessor.getMainMailboxExecutor(); |
| final RunnableWithException completeFutureTask = |
| () -> { |
| inputProcessor |
| .availabilityProvider |
| .getUnavailableToResetAvailable() |
| .complete(null); |
| }; |
| |
| waitingThread = |
| new WaitingThread( |
| executor, |
| completeFutureTask, |
| sleepTimeInsideMail, |
| sleepTimeOutsideMail, |
| ioMetricGroup.getIdleTimeMsPerSecond()); |
| // Make sure WaitingThread is started after Task starts processing. |
| executor.submit( |
| waitingThread::start, |
| "Start WaitingThread after Task starts processing input."); |
| |
| SystemClock clock = SystemClock.getInstance(); |
| |
| long startTs = clock.absoluteTimeMillis(); |
| throughputCalculator.incomingDataSize(incomingDataSize); |
| task.invoke(); |
| long resultThroughput = throughputCalculator.calculateThroughput(); |
| long totalDuration = clock.absoluteTimeMillis() - startTs; |
| |
| assertThat( |
| resultThroughput, |
| greaterThanOrEqualTo( |
| incomingDataSize * 1000 / (totalDuration - sleepTimeOutsideMail))); |
| |
| assertThat( |
| ioMetricGroup.getIdleTimeMsPerSecond().getCount(), |
| greaterThanOrEqualTo(sleepTimeOutsideMail)); |
| assertThat( |
| ioMetricGroup.getIdleTimeMsPerSecond().getCount(), |
| Matchers.lessThanOrEqualTo(totalDuration - sleepTimeInsideMail)); |
| assertThat(ioMetricGroup.getBackPressuredTimePerSecond().getCount(), is(0L)); |
| } finally { |
| if (waitingThread != null) { |
| waitingThread.join(); |
| } |
| } |
| } |
| |
| @Test |
| public void testRestorePerformedOnlyOnce() throws Exception { |
| // given: the operator with empty snapshot result (all state handles are null) |
| OneInputStreamOperator<String, String> statelessOperator = |
| streamOperatorWithSnapshot(new OperatorSnapshotFutures()); |
| DummyEnvironment dummyEnvironment = new DummyEnvironment(); |
| |
| // when: Invoke the restore explicitly before launching the task. |
| RunningTask<MockStreamTask> task = |
| runTask( |
| () -> { |
| MockStreamTask mockStreamTask = |
| createMockStreamTask( |
| dummyEnvironment, operatorChain(statelessOperator)); |
| |
| mockStreamTask.restore(); |
| |
| return mockStreamTask; |
| }); |
| waitTaskIsRunning(task.streamTask, task.invocationFuture); |
| |
| task.streamTask.cancel(); |
| |
| // then: 'restore' was called only once. |
| assertThat(task.streamTask.restoreInvocationCount, is(1)); |
| } |
| |
| @Test |
| public void testRestorePerformedFromInvoke() throws Exception { |
| // given: the operator with empty snapshot result (all state handles are null) |
| OneInputStreamOperator<String, String> statelessOperator = |
| streamOperatorWithSnapshot(new OperatorSnapshotFutures()); |
| DummyEnvironment dummyEnvironment = new DummyEnvironment(); |
| |
| // when: Launch the task. |
| RunningTask<MockStreamTask> task = |
| runTask( |
| () -> |
| createMockStreamTask( |
| dummyEnvironment, operatorChain(statelessOperator))); |
| |
| waitTaskIsRunning(task.streamTask, task.invocationFuture); |
| |
| task.streamTask.cancel(); |
| |
| // then: 'restore' was called even without explicit 'restore' invocation. |
| assertThat(task.streamTask.restoreInvocationCount, is(1)); |
| } |
| |
| @Test |
| public void testQuiesceOfMailboxRightBeforeSubmittingActionViaTimerService() throws Exception { |
| // given: the stream task with configured handle async exception. |
| AtomicBoolean submitThroughputFail = new AtomicBoolean(); |
| MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build(); |
| |
| final UnAvailableTestInputProcessor inputProcessor = new UnAvailableTestInputProcessor(); |
| RunningTask<StreamTask<?, ?>> task = |
| runTask( |
| () -> |
| new MockStreamTaskBuilder(mockEnvironment) |
| .setHandleAsyncException( |
| (str, t) -> submitThroughputFail.set(true)) |
| .setStreamInputProcessor(inputProcessor) |
| .build()); |
| |
| waitTaskIsRunning(task.streamTask, task.invocationFuture); |
| |
| TimerService timerService = task.streamTask.systemTimerService; |
| MailboxExecutor mainMailboxExecutor = |
| task.streamTask.mailboxProcessor.getMainMailboxExecutor(); |
| |
| CountDownLatch stoppingMailboxLatch = new CountDownLatch(1); |
| timerService.registerTimer( |
| timerService.getCurrentProcessingTime(), |
| (time) -> { |
| stoppingMailboxLatch.await(); |
| // The time to the start 'afterInvoke' inside of mailbox. |
| // 'afterInvoke' won't finish until this execution won't finish so it is |
| // impossible to wait on latch or something else. |
| Thread.sleep(5); |
| mainMailboxExecutor.submit(() -> {}, "test"); |
| }); |
| |
| // when: Calling the quiesce for mailbox and finishing the timer service. |
| mainMailboxExecutor |
| .submit( |
| () -> { |
| stoppingMailboxLatch.countDown(); |
| task.streamTask.afterInvoke(); |
| }, |
| "test") |
| .get(); |
| |
| // then: the exception handle wasn't invoked because the such situation is expected. |
| assertFalse(submitThroughputFail.get()); |
| |
| // Correctly shutdown the stream task to avoid hanging. |
| inputProcessor.availabilityProvider.getUnavailableToResetAvailable().complete(null); |
| } |
| |
| @Test |
| public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Exception { |
| // given: Configured SourceStreamTask with source which fails on checkpoint. |
| Configuration taskManagerConfig = new Configuration(); |
| taskManagerConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName()); |
| |
| StreamConfig cfg = new StreamConfig(new Configuration()); |
| cfg.setStateKeySerializer(mock(TypeSerializer.class)); |
| cfg.setOperatorID(new OperatorID(4712L, 43L)); |
| |
| FailedSource failedSource = new FailedSource(); |
| cfg.setStreamOperator(new TestStreamSource<String, FailedSource>(failedSource)); |
| cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); |
| |
| try (NettyShuffleEnvironment shuffleEnvironment = |
| new NettyShuffleEnvironmentBuilder().build()) { |
| Task task = |
| createTask(SourceStreamTask.class, shuffleEnvironment, cfg, taskManagerConfig); |
| |
| // when: Task starts |
| task.startTaskThread(); |
| |
| // wait for the task starts doing the work. |
| failedSource.awaitRunning(); |
| |
| // and: Checkpoint is triggered which should lead to exception in Source. |
| task.triggerCheckpointBarrier( |
| 42L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()); |
| |
| // wait for clean termination. |
| task.getExecutingThread().join(); |
| |
| // then: The task doesn't hang but finished with FAILED state. |
| assertEquals(ExecutionState.FAILED, task.getExecutionState()); |
| } |
| } |
| |
| /** |
| * This test checks the fact that throughput calculation is started automatically(just to be |
| * sure that the scheduler is configured). |
| */ |
| @Test |
| public void testThroughputSchedulerStartsOnInvoke() throws Exception { |
| CompletableFuture<?> finishFuture = new CompletableFuture<>(); |
| try (StreamTaskMailboxTestHarness<String> harness = |
| new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) |
| .modifyStreamConfig( |
| config -> |
| config.getConfiguration() |
| .set(BUFFER_DEBLOAT_PERIOD, Duration.ofMillis(1))) |
| .addInput(STRING_TYPE_INFO) |
| .setupOutputForSingletonOperatorChain( |
| new TestBoundedOneInputStreamOperator()) |
| .setThroughputCalculator( |
| new ThroughputCalculator(SystemClock.getInstance(), 10) { |
| @Override |
| public long calculateThroughput() { |
| finishFuture.complete(null); |
| return super.calculateThroughput(); |
| } |
| }) |
| .build()) { |
| finishFuture.thenApply( |
| (value) -> { |
| harness.endInput(); |
| return value; |
| }); |
| harness.streamTask.invoke(); |
| } |
| } |
| |
| @Test |
| public void testSkipRepeatCheckpointComplete() throws Exception { |
| try (StreamTaskMailboxTestHarness<String> testHarness = |
| new StreamTaskMailboxTestHarnessBuilder<>( |
| OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) |
| .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3) |
| .modifyStreamConfig( |
| config -> { |
| config.setCheckpointingEnabled(true); |
| config.getConfiguration() |
| .set( |
| ExecutionCheckpointingOptions |
| .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, |
| true); |
| }) |
| .setupOutputForSingletonOperatorChain( |
| new CheckpointCompleteRecordOperator()) |
| .build()) { |
| testHarness.streamTask.notifyCheckpointCompleteAsync(3); |
| testHarness.streamTask.notifyCheckpointAbortAsync(5, 3); |
| |
| testHarness.streamTask.notifyCheckpointAbortAsync(10, 8); |
| testHarness.streamTask.notifyCheckpointCompleteAsync(8); |
| |
| testHarness.processAll(); |
| |
| CheckpointCompleteRecordOperator operator = |
| (CheckpointCompleteRecordOperator) |
| (AbstractStreamOperator<?>) testHarness.streamTask.getMainOperator(); |
| assertEquals(Arrays.asList(3L, 8L), operator.getNotifiedCheckpoint()); |
| } |
| } |
| |
| @Test |
| public void testIgnoreCompleteCheckpointBeforeStartup() throws Exception { |
| try (StreamTaskMailboxTestHarness<String> testHarness = |
| new StreamTaskMailboxTestHarnessBuilder<>( |
| OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) |
| .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3) |
| .setTaskStateSnapshot(3, new TaskStateSnapshot()) |
| .modifyStreamConfig( |
| config -> { |
| config.setCheckpointingEnabled(true); |
| config.getConfiguration() |
| .set( |
| ExecutionCheckpointingOptions |
| .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, |
| true); |
| }) |
| .setupOutputForSingletonOperatorChain( |
| new CheckpointCompleteRecordOperator()) |
| .build()) { |
| testHarness.streamTask.notifyCheckpointCompleteAsync(2); |
| testHarness.streamTask.notifyCheckpointAbortAsync(4, 3); |
| testHarness.streamTask.notifyCheckpointCompleteAsync(5); |
| testHarness.streamTask.notifyCheckpointAbortAsync(7, 6); |
| |
| testHarness.processAll(); |
| |
| CheckpointCompleteRecordOperator operator = |
| (CheckpointCompleteRecordOperator) |
| (AbstractStreamOperator<?>) testHarness.streamTask.getMainOperator(); |
| assertEquals(Arrays.asList(5L, 6L), operator.getNotifiedCheckpoint()); |
| } |
| } |
| |
| @Test |
| public void testBufferSizeRecalculationStartSuccessfully() throws Exception { |
| int expectedThroughput = 13333; |
| int inputChannels = 3; |
| |
| // debloat period doesn't matter, we will schedule debloating manually |
| Configuration config = |
| new Configuration() |
| .set(BUFFER_DEBLOAT_PERIOD, Duration.ofHours(10)) |
| .set(BUFFER_DEBLOAT_TARGET, Duration.ofSeconds(1)) |
| .set(BUFFER_DEBLOAT_ENABLED, true); |
| |
| Map<String, Metric> metrics = new ConcurrentHashMap<>(); |
| final TaskMetricGroup taskMetricGroup = |
| StreamTaskTestHarness.createTaskMetricGroup(metrics); |
| |
| try (StreamTaskMailboxTestHarness<String> harness = |
| new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) |
| .setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(config)) |
| .setTaskMetricGroup(taskMetricGroup) |
| .addInput(STRING_TYPE_INFO, inputChannels) |
| .setupOutputForSingletonOperatorChain( |
| new TestBoundedOneInputStreamOperator()) |
| .setThroughputCalculator( |
| new ThroughputCalculator(SystemClock.getInstance(), 10) { |
| @Override |
| public long calculateThroughput() { |
| return expectedThroughput; |
| } |
| }) |
| .build()) { |
| harness.processAll(); |
| harness.streamTask.debloat(); |
| |
| int expectedBufferSize = expectedThroughput / inputChannels; |
| for (InputGate inputGate : harness.streamTask.getEnvironment().getAllInputGates()) { |
| for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) { |
| assertThat( |
| ((TestInputChannel) inputGate.getChannel(i)).getCurrentBufferSize(), |
| is(expectedBufferSize)); |
| } |
| } |
| assertThat( |
| ((Gauge<Integer>) metrics.get(MetricNames.DEBLOATED_BUFFER_SIZE)).getValue(), |
| is(expectedBufferSize)); |
| assertThat( |
| ((Gauge<Long>) metrics.get(MetricNames.ESTIMATED_TIME_TO_CONSUME_BUFFERS)) |
| .getValue(), |
| is(999L)); |
| } |
| } |
| |
| private MockEnvironment setupEnvironment(boolean... outputAvailabilities) { |
| final Configuration configuration = new Configuration(); |
| new MockStreamConfig(configuration, outputAvailabilities.length); |
| |
| final List<ResultPartitionWriter> writers = new ArrayList<>(outputAvailabilities.length); |
| for (int i = 0; i < outputAvailabilities.length; i++) { |
| writers.add(new AvailabilityTestResultPartitionWriter(outputAvailabilities[i])); |
| } |
| |
| final MockEnvironment environment = |
| new MockEnvironmentBuilder().setTaskConfiguration(configuration).build(); |
| environment.addOutputs(writers); |
| return environment; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Test Utilities |
| // ------------------------------------------------------------------------ |
| |
| private static <T> OneInputStreamOperator<T, T> streamOperatorWithSnapshot( |
| OperatorSnapshotFutures operatorSnapshotResult) throws Exception { |
| @SuppressWarnings("unchecked") |
| OneInputStreamOperator<T, T> operator = mock(OneInputStreamOperator.class); |
| when(operator.getOperatorID()).thenReturn(new OperatorID()); |
| |
| when(operator.snapshotState( |
| anyLong(), |
| anyLong(), |
| any(CheckpointOptions.class), |
| any(CheckpointStreamFactory.class))) |
| .thenReturn(operatorSnapshotResult); |
| |
| return operator; |
| } |
| |
| private static <T> OneInputStreamOperator<T, T> streamOperatorWithSnapshotException( |
| Exception exception) throws Exception { |
| @SuppressWarnings("unchecked") |
| OneInputStreamOperator<T, T> operator = mock(OneInputStreamOperator.class); |
| when(operator.getOperatorID()).thenReturn(new OperatorID()); |
| |
| when(operator.snapshotState( |
| anyLong(), |
| anyLong(), |
| any(CheckpointOptions.class), |
| any(CheckpointStreamFactory.class))) |
| .thenThrow(exception); |
| |
| return operator; |
| } |
| |
| private static <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain( |
| OneInputStreamOperator<T, T>... streamOperators) throws Exception { |
| return OperatorChainTest.setupOperatorChain(streamOperators); |
| } |
| |
| private static class RunningTask<T extends StreamTask<?, ?>> { |
| final T streamTask; |
| final CompletableFuture<Void> invocationFuture; |
| |
| RunningTask(T streamTask, CompletableFuture<Void> invocationFuture) { |
| this.streamTask = streamTask; |
| this.invocationFuture = invocationFuture; |
| } |
| |
| void waitForTaskCompletion(boolean cancelled) throws Exception { |
| try { |
| invocationFuture.get(); |
| } catch (Exception e) { |
| if (cancelled) { |
| assertThat(e.getCause(), is(instanceOf(CancelTaskException.class))); |
| } else { |
| throw e; |
| } |
| } |
| assertThat(streamTask.isCanceled(), is(cancelled)); |
| } |
| } |
| |
| private static <T extends StreamTask<?, ?>> RunningTask<T> runTask( |
| SupplierWithException<T, Exception> taskFactory) throws Exception { |
| CompletableFuture<T> taskCreationFuture = new CompletableFuture<>(); |
| CompletableFuture<Void> invocationFuture = |
| CompletableFuture.runAsync( |
| () -> { |
| T task; |
| try { |
| task = taskFactory.get(); |
| taskCreationFuture.complete(task); |
| } catch (Exception e) { |
| taskCreationFuture.completeExceptionally(e); |
| return; |
| } |
| try { |
| task.invoke(); |
| } catch (RuntimeException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }, |
| Executors.newSingleThreadExecutor()); |
| |
| // Wait until task is created. |
| return new RunningTask<>(taskCreationFuture.get(), invocationFuture); |
| } |
| |
| /** |
| * Operator that does nothing. |
| * |
| * @param <T> |
| * @param <OP> |
| */ |
| public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> { |
| |
| public NoOpStreamTask(Environment environment) throws Exception { |
| super(environment); |
| } |
| |
| @Override |
| protected void init() throws Exception { |
| inputProcessor = new EmptyInputProcessor(); |
| } |
| |
| @Override |
| protected void cleanUpInternal() throws Exception {} |
| } |
| |
| /** |
| * A stream input processor implementation used to control the returned input status based on |
| * the total number of processing calls. |
| */ |
| private static class AvailabilityTestInputProcessor implements StreamInputProcessor { |
| private final int totalProcessCalls; |
| private int currentNumProcessCalls; |
| |
| AvailabilityTestInputProcessor(int totalProcessCalls) { |
| this.totalProcessCalls = totalProcessCalls; |
| } |
| |
| @Override |
| public DataInputStatus processInput() { |
| return ++currentNumProcessCalls < totalProcessCalls |
| ? DataInputStatus.MORE_AVAILABLE |
| : DataInputStatus.END_OF_INPUT; |
| } |
| |
| @Override |
| public CompletableFuture<Void> prepareSnapshot( |
| ChannelStateWriter channelStateWriter, final long checkpointId) |
| throws CheckpointException { |
| return FutureUtils.completedVoidFuture(); |
| } |
| |
| @Override |
| public void close() throws IOException {} |
| |
| @Override |
| public CompletableFuture<?> getAvailableFuture() { |
| return AVAILABLE; |
| } |
| } |
| |
| /** |
| * A stream input processor implementation with input unavailable for a specified amount of |
| * time, after which processor is closing. |
| */ |
| private static class UnAvailableTestInputProcessor implements StreamInputProcessor { |
| private final AvailabilityHelper availabilityProvider = new AvailabilityHelper(); |
| |
| @Override |
| public DataInputStatus processInput() { |
| return availabilityProvider.isAvailable() |
| ? DataInputStatus.END_OF_INPUT |
| : DataInputStatus.NOTHING_AVAILABLE; |
| } |
| |
| @Override |
| public CompletableFuture<Void> prepareSnapshot( |
| ChannelStateWriter channelStateWriter, final long checkpointId) |
| throws CheckpointException { |
| return FutureUtils.completedVoidFuture(); |
| } |
| |
| @Override |
| public void close() throws IOException {} |
| |
| @Override |
| public CompletableFuture<?> getAvailableFuture() { |
| return availabilityProvider.getAvailableFuture(); |
| } |
| } |
| |
| private static class BlockingFinishStreamOperator extends AbstractStreamOperator<Void> { |
| private static final long serialVersionUID = -9042150529568008847L; |
| |
| private static volatile OneShotLatch inFinish; |
| private static volatile OneShotLatch finishClose; |
| |
| @Override |
| public void finish() throws Exception { |
| checkLatches(); |
| inFinish.trigger(); |
| finishClose.await(); |
| super.close(); |
| } |
| |
| private void checkLatches() { |
| Preconditions.checkNotNull(inFinish); |
| Preconditions.checkNotNull(finishClose); |
| } |
| |
| private static void resetLatches() { |
| inFinish = new OneShotLatch(); |
| finishClose = new OneShotLatch(); |
| } |
| } |
| |
| public static Task createTask( |
| Class<? extends TaskInvokable> invokable, |
| ShuffleEnvironment shuffleEnvironment, |
| StreamConfig taskConfig, |
| Configuration taskManagerConfig) |
| throws Exception { |
| |
| return new TestTaskBuilder(shuffleEnvironment) |
| .setTaskManagerConfig(taskManagerConfig) |
| .setInvokable(invokable) |
| .setTaskConfig(taskConfig.getConfiguration()) |
| .build(); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Test operators |
| // ------------------------------------------------------------------------ |
| |
| private static class SlowlyDeserializingOperator |
| extends StreamSource<Long, SourceFunction<Long>> { |
| private static final long serialVersionUID = 1L; |
| |
| private volatile boolean canceled = false; |
| |
| public SlowlyDeserializingOperator() { |
| super(new MockSourceFunction()); |
| } |
| |
| @Override |
| public void run( |
| Object lockingObject, |
| Output<StreamRecord<Long>> collector, |
| OperatorChain<?, ?> operatorChain) |
| throws Exception { |
| while (!canceled) { |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| canceled = true; |
| } |
| |
| // slow deserialization |
| private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { |
| in.defaultReadObject(); |
| |
| long delay = 500; |
| long deadline = System.currentTimeMillis() + delay; |
| do { |
| try { |
| Thread.sleep(delay); |
| } catch (InterruptedException ignored) { |
| } |
| } while ((delay = deadline - System.currentTimeMillis()) > 0); |
| } |
| } |
| |
| private static class MockSourceFunction implements SourceFunction<Long> { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public void run(SourceContext<Long> ctx) {} |
| |
| @Override |
| public void cancel() {} |
| } |
| |
| /** |
| * Mocked state backend factory which returns mocks for the operator and keyed state backends. |
| */ |
| public static final class TestMemoryStateBackendFactory |
| implements StateBackendFactory<AbstractStateBackend> { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public AbstractStateBackend createFromConfig( |
| ReadableConfig config, ClassLoader classLoader) { |
| return new TestSpyWrapperStateBackend(createInnerBackend(config)); |
| } |
| |
| protected MemoryStateBackend createInnerBackend(ReadableConfig config) { |
| return new MemoryStateBackend(); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // ------------------------------------------------------------------------ |
| |
| private static class MockStreamTask extends StreamTask<String, AbstractStreamOperator<String>> { |
| |
| private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain; |
| private int restoreInvocationCount = 0; |
| |
| MockStreamTask( |
| Environment env, |
| OperatorChain<String, AbstractStreamOperator<String>> operatorChain, |
| Thread.UncaughtExceptionHandler uncaughtExceptionHandler) |
| throws Exception { |
| super(env, null, uncaughtExceptionHandler); |
| this.overrideOperatorChain = operatorChain; |
| } |
| |
| @Override |
| public void restoreInternal() throws Exception { |
| super.restoreInternal(); |
| restoreInvocationCount++; |
| } |
| |
| @Override |
| protected void init() { |
| // The StreamTask initializes operatorChain first on it's own in `invoke()` method. |
| // Later it calls the `init()` method before actual `run()`, so we are overriding the |
| // operatorChain |
| // here for test purposes. |
| super.operatorChain = this.overrideOperatorChain; |
| super.mainOperator = super.operatorChain.getMainOperator(); |
| super.inputProcessor = new EmptyInputProcessor(false); |
| } |
| |
| void finishInput() { |
| checkState( |
| inputProcessor != null, |
| "Tried to finishInput before MockStreamTask was started"); |
| ((EmptyInputProcessor) inputProcessor).finishInput(); |
| } |
| } |
| |
| private static class EmptyInputProcessor implements StreamInputProcessor { |
| private volatile boolean isFinished; |
| |
| public EmptyInputProcessor() { |
| this(true); |
| } |
| |
| public EmptyInputProcessor(boolean startFinished) { |
| isFinished = startFinished; |
| } |
| |
| @Override |
| public DataInputStatus processInput() throws Exception { |
| return isFinished ? DataInputStatus.END_OF_INPUT : DataInputStatus.NOTHING_AVAILABLE; |
| } |
| |
| @Override |
| public CompletableFuture<Void> prepareSnapshot( |
| ChannelStateWriter channelStateWriter, long checkpointId) |
| throws CheckpointException { |
| return FutureUtils.completedVoidFuture(); |
| } |
| |
| @Override |
| public void close() throws IOException {} |
| |
| @Override |
| public CompletableFuture<?> getAvailableFuture() { |
| return AVAILABLE; |
| } |
| |
| public void finishInput() { |
| isFinished = true; |
| } |
| } |
| |
| private static MockStreamTask createMockStreamTask( |
| Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) |
| throws Exception { |
| return new MockStreamTask(env, operatorChain, FatalExitExceptionHandler.INSTANCE); |
| } |
| |
| /** |
| * Source that instantiates the operator state backend and the keyed state backend. The created |
| * state backends can be retrieved from the static fields to check if the CloseableRegistry |
| * closed them correctly. |
| */ |
| public static class StateBackendTestSource |
| extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> { |
| |
| private static volatile boolean fail; |
| |
| public StateBackendTestSource(Environment env) throws Exception { |
| super(env); |
| } |
| |
| @Override |
| protected void init() throws Exception {} |
| |
| @Override |
| protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { |
| if (fail) { |
| throw new RuntimeException(); |
| } |
| controller.suspendDefaultAction(); |
| mailboxProcessor.suspend(); |
| } |
| |
| @Override |
| protected void cleanUpInternal() throws Exception {} |
| |
| @Override |
| public StreamTaskStateInitializer createStreamTaskStateInitializer() { |
| final StreamTaskStateInitializer streamTaskStateManager = |
| super.createStreamTaskStateInitializer(); |
| return (operatorID, |
| operatorClassName, |
| processingTimeService, |
| keyContext, |
| keySerializer, |
| closeableRegistry, |
| metricGroup, |
| fraction, |
| isUsingCustomRawKeyedState) -> { |
| final StreamOperatorStateContext controller = |
| streamTaskStateManager.streamOperatorStateContext( |
| operatorID, |
| operatorClassName, |
| processingTimeService, |
| keyContext, |
| keySerializer, |
| closeableRegistry, |
| metricGroup, |
| fraction, |
| isUsingCustomRawKeyedState); |
| |
| return new StreamOperatorStateContext() { |
| @Override |
| public boolean isRestored() { |
| return controller.isRestored(); |
| } |
| |
| @Override |
| public OptionalLong getRestoredCheckpointId() { |
| return controller.getRestoredCheckpointId(); |
| } |
| |
| @Override |
| public OperatorStateBackend operatorStateBackend() { |
| return controller.operatorStateBackend(); |
| } |
| |
| @Override |
| public CheckpointableKeyedStateBackend<?> keyedStateBackend() { |
| return controller.keyedStateBackend(); |
| } |
| |
| @Override |
| public InternalTimeServiceManager<?> internalTimerServiceManager() { |
| InternalTimeServiceManager<?> timeServiceManager = |
| controller.internalTimerServiceManager(); |
| return timeServiceManager != null ? spy(timeServiceManager) : null; |
| } |
| |
| @Override |
| public CloseableIterable<StatePartitionStreamProvider> |
| rawOperatorStateInputs() { |
| return replaceWithSpy(controller.rawOperatorStateInputs()); |
| } |
| |
| @Override |
| public CloseableIterable<KeyGroupStatePartitionStreamProvider> |
| rawKeyedStateInputs() { |
| return replaceWithSpy(controller.rawKeyedStateInputs()); |
| } |
| |
| public <T extends Closeable> T replaceWithSpy(T closeable) { |
| T spyCloseable = spy(closeable); |
| if (closeableRegistry.unregisterCloseable(closeable)) { |
| try { |
| closeableRegistry.registerCloseable(spyCloseable); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| return spyCloseable; |
| } |
| }; |
| }; |
| } |
| } |
| |
| private static class ThreadInspectingTask |
| extends StreamTask<String, AbstractStreamOperator<String>> { |
| |
| private final long taskThreadId; |
| private final ClassLoader taskClassLoader; |
| |
| /** Flag to wait until time trigger has been called. */ |
| private transient boolean hasTimerTriggered; |
| |
| ThreadInspectingTask(Environment env) throws Exception { |
| super(env); |
| Thread currentThread = Thread.currentThread(); |
| taskThreadId = currentThread.getId(); |
| taskClassLoader = currentThread.getContextClassLoader(); |
| } |
| |
| @Nullable |
| ClassLoader getTaskClassLoader() { |
| return taskClassLoader; |
| } |
| |
| @Override |
| protected void init() throws Exception { |
| checkTaskThreadInfo(); |
| |
| // Create a time trigger to validate that it would also be invoked in the task's thread. |
| getMainOperator() |
| .getProcessingTimeService() |
| .registerTimer( |
| 0, |
| new ProcessingTimeCallback() { |
| @Override |
| public void onProcessingTime(long timestamp) throws Exception { |
| checkTaskThreadInfo(); |
| hasTimerTriggered = true; |
| } |
| }); |
| } |
| |
| @Override |
| protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { |
| checkTaskThreadInfo(); |
| if (hasTimerTriggered) { |
| controller.suspendDefaultAction(); |
| mailboxProcessor.suspend(); |
| } |
| } |
| |
| @Override |
| protected void cleanUpInternal() throws Exception { |
| checkTaskThreadInfo(); |
| } |
| |
| private void checkTaskThreadInfo() { |
| Thread currentThread = Thread.currentThread(); |
| checkState( |
| taskThreadId == currentThread.getId(), |
| "Task's method was called in non task thread."); |
| checkState( |
| taskClassLoader == currentThread.getContextClassLoader(), |
| "Task's controller class loader has been changed during invocation."); |
| } |
| } |
| |
| /** |
| * A {@link ClassLoader} that delegates everything to {@link |
| * ClassLoader#getSystemClassLoader()}. |
| */ |
| private static class TestUserCodeClassLoader extends ClassLoader { |
| public TestUserCodeClassLoader() { |
| super(ClassLoader.getSystemClassLoader()); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // ------------------------------------------------------------------------ |
| |
| static class TestStreamSource<OUT, SRC extends SourceFunction<OUT>> |
| extends StreamSource<OUT, SRC> { |
| |
| static AbstractKeyedStateBackend<?> keyedStateBackend; |
| static OperatorStateBackend operatorStateBackend; |
| static CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs; |
| static CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs; |
| |
| public TestStreamSource(SRC sourceFunction) { |
| super(sourceFunction); |
| } |
| |
| @Override |
| public void initializeState(StateInitializationContext controller) throws Exception { |
| keyedStateBackend = (AbstractKeyedStateBackend<?>) getKeyedStateBackend(); |
| operatorStateBackend = getOperatorStateBackend(); |
| rawOperatorStateInputs = |
| (CloseableIterable<StatePartitionStreamProvider>) |
| controller.getRawOperatorStateInputs(); |
| rawKeyedStateInputs = |
| (CloseableIterable<KeyGroupStatePartitionStreamProvider>) |
| controller.getRawKeyedStateInputs(); |
| super.initializeState(controller); |
| } |
| } |
| |
| private static class TestingKeyedStateHandle implements KeyedStateHandle { |
| |
| private static final long serialVersionUID = -2473861305282291582L; |
| |
| private final transient CompletableFuture<Void> discardFuture = new CompletableFuture<>(); |
| |
| public CompletableFuture<Void> getDiscardFuture() { |
| return discardFuture; |
| } |
| |
| @Override |
| public KeyGroupRange getKeyGroupRange() { |
| return KeyGroupRange.EMPTY_KEY_GROUP_RANGE; |
| } |
| |
| @Override |
| public TestingKeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { |
| return this; |
| } |
| |
| @Override |
| public void registerSharedStates(SharedStateRegistry stateRegistry) {} |
| |
| @Override |
| public void discardState() { |
| discardFuture.complete(null); |
| } |
| |
| @Override |
| public long getStateSize() { |
| return 0L; |
| } |
| } |
| |
| private static class TestingOperatorStateHandle implements OperatorStateHandle { |
| |
| private static final long serialVersionUID = 923794934187614088L; |
| |
| private final transient CompletableFuture<Void> discardFuture = new CompletableFuture<>(); |
| |
| public CompletableFuture<Void> getDiscardFuture() { |
| return discardFuture; |
| } |
| |
| @Override |
| public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() { |
| return Collections.emptyMap(); |
| } |
| |
| @Override |
| public FSDataInputStream openInputStream() throws IOException { |
| throw new IOException("Cannot open input streams in testing implementation."); |
| } |
| |
| @Override |
| public Optional<byte[]> asBytesIfInMemory() { |
| return Optional.empty(); |
| } |
| |
| @Override |
| public StreamStateHandle getDelegateStateHandle() { |
| throw new UnsupportedOperationException("Not implemented."); |
| } |
| |
| @Override |
| public void discardState() throws Exception { |
| discardFuture.complete(null); |
| } |
| |
| @Override |
| public long getStateSize() { |
| return 0L; |
| } |
| } |
| |
| private static class AcknowledgeDummyEnvironment extends DummyEnvironment { |
| |
| private final CompletableFuture<Long> acknowledgeCheckpointFuture = |
| new CompletableFuture<>(); |
| |
| public CompletableFuture<Long> getAcknowledgeCheckpointFuture() { |
| return acknowledgeCheckpointFuture; |
| } |
| |
| @Override |
| public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) { |
| acknowledgeCheckpointFuture.complete(checkpointId); |
| } |
| |
| @Override |
| public void acknowledgeCheckpoint( |
| long checkpointId, |
| CheckpointMetrics checkpointMetrics, |
| TaskStateSnapshot subtaskState) { |
| acknowledgeCheckpointFuture.complete(checkpointId); |
| } |
| } |
| |
| private static final class BlockingRunnableFuture<V> implements RunnableFuture<V> { |
| |
| private final CompletableFuture<V> future = new CompletableFuture<>(); |
| |
| private final OneShotLatch signalRunLatch = new OneShotLatch(); |
| |
| private final CountDownLatch continueRunLatch; |
| |
| private final V value; |
| |
| private BlockingRunnableFuture(int parties, V value) { |
| this.continueRunLatch = new CountDownLatch(parties); |
| this.value = value; |
| } |
| |
| @Override |
| public void run() { |
| signalRunLatch.trigger(); |
| continueRunLatch.countDown(); |
| |
| try { |
| // poor man's barrier because it can happen that the async operations thread gets |
| // interrupted by the mail box thread. The CyclicBarrier would in this case fail |
| // all participants of the barrier, leaving the future uncompleted |
| continueRunLatch.await(); |
| } catch (InterruptedException e) { |
| ExceptionUtils.rethrow(e); |
| } |
| |
| future.complete(value); |
| } |
| |
| @Override |
| public boolean cancel(boolean mayInterruptIfRunning) { |
| return false; |
| } |
| |
| @Override |
| public boolean isCancelled() { |
| return future.isCancelled(); |
| } |
| |
| @Override |
| public boolean isDone() { |
| return future.isDone(); |
| } |
| |
| @Override |
| public V get() throws InterruptedException, ExecutionException { |
| return future.get(); |
| } |
| |
| @Override |
| public V get(long timeout, TimeUnit unit) |
| throws InterruptedException, ExecutionException, TimeoutException { |
| return future.get(timeout, unit); |
| } |
| |
| void awaitRun() throws InterruptedException { |
| signalRunLatch.await(); |
| } |
| } |
| |
| private static class FailingDummyEnvironment extends DummyEnvironment { |
| |
| final RuntimeException failingCause; |
| |
| private FailingDummyEnvironment(RuntimeException failingCause) { |
| this.failingCause = failingCause; |
| } |
| |
| @Override |
| public void declineCheckpoint(long checkpointId, CheckpointException cause) { |
| throw failingCause; |
| } |
| |
| @Override |
| public void failExternally(Throwable cause) { |
| throw failingCause; |
| } |
| } |
| |
| private static class UnusedOperatorFactory extends AbstractStreamOperatorFactory<String> { |
| @Override |
| public <T extends StreamOperator<String>> T createStreamOperator( |
| StreamOperatorParameters<String> parameters) { |
| throw new UnsupportedOperationException("This shouldn't be called"); |
| } |
| |
| @Override |
| public void setChainingStrategy(ChainingStrategy strategy) {} |
| |
| @Override |
| public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| /** |
| * Pass-nothing operator that records its lifecycle in static, class-wide state: {@link #closed} |
| * flips to {@code true} in {@link #close()}, and {@link #notified} counts calls to {@link |
| * #notifyCheckpointComplete(long)}. Both are reset whenever an instance is opened. |
| */ |
| private static class ClosingOperator<T> extends AbstractStreamOperator<T> |
|         implements OneInputStreamOperator<T, T> { |
| |
|     /** Set by {@link #close()}, cleared by {@link #open()}. */ |
|     static AtomicBoolean closed = new AtomicBoolean(); |
| |
|     /** Number of checkpoint-complete notifications since the last {@link #open()}. */ |
|     static AtomicInteger notified = new AtomicInteger(); |
| |
|     /** Opens the operator via the superclass, then resets the shared bookkeeping. */ |
|     @Override |
|     public void open() throws Exception { |
|         super.open(); |
|         notified.set(0); |
|         closed.set(false); |
|     } |
| |
|     /** Elements are dropped. */ |
|     @Override |
|     public void processElement(StreamRecord<T> element) throws Exception { |
|         // no-op |
|     } |
| |
|     /** Delegates to the superclass first, then bumps {@link #notified}. */ |
|     @Override |
|     public void notifyCheckpointComplete(long checkpointId) throws Exception { |
|         super.notifyCheckpointComplete(checkpointId); |
|         notified.incrementAndGet(); |
|     } |
| |
|     /** Marks the operator as closed before delegating to the superclass. */ |
|     @Override |
|     public void close() throws Exception { |
|         closed.set(true); |
|         super.close(); |
|     } |
| } |
| |
| /** |
| * Identity {@link MapFunction} that throws {@link ExpectedTestException} from both {@link |
| * CheckpointListener} callbacks, i.e. on checkpoint completion and on checkpoint abortion. |
| */ |
| private static class FailOnNotifyCheckpointMapper<T> |
|         implements MapFunction<T, T>, CheckpointListener { |
|     private static final long serialVersionUID = 1L; |
| |
|     /** Always fails with {@link ExpectedTestException}. */ |
|     @Override |
|     public void notifyCheckpointComplete(long checkpointId) { |
|         throw new ExpectedTestException(); |
|     } |
| |
|     /** Always fails with {@link ExpectedTestException}. */ |
|     @Override |
|     public void notifyCheckpointAborted(long checkpointId) { |
|         throw new ExpectedTestException(); |
|     } |
| |
|     /** Returns the input unchanged. */ |
|     @Override |
|     public T map(T value) throws Exception { |
|         return value; |
|     } |
| } |
| |
| /** |
| * Source that emits nothing, idles until cancelled, and fails every snapshot taken after {@link |
| * #run(SourceContext)} has started. |
| * |
| * <p>The "started" signal is a static latch that is replaced each time the constructor runs, |
| * so it is shared by all instances created afterwards. |
| */ |
| private static class FailedSource extends RichParallelSourceFunction<String> |
|         implements CheckpointedFunction { |
| |
|     /** Counted down once by {@link #run(SourceContext)}; re-created by every constructor call. */ |
|     private static CountDownLatch runningLatch; |
| |
|     /** Loop flag: raised in {@link #open(Configuration)}, lowered in {@link #cancel()}. */ |
|     private volatile boolean running; |
| |
|     public FailedSource() { |
|         runningLatch = new CountDownLatch(1); |
|     } |
| |
|     /** No state is restored or registered. */ |
|     @Override |
|     public void initializeState(FunctionInitializationContext context) throws Exception { |
|         // no-op |
|     } |
| |
|     @Override |
|     public void open(Configuration parameters) throws Exception { |
|         running = true; |
|     } |
| |
|     /** |
|      * Signals that the source is running and then sleeps until {@link #cancel()} clears the |
|      * flag. Interrupts only wake the loop up so that it can re-check the flag. |
|      */ |
|     @Override |
|     public void run(SourceContext<String> ctx) throws Exception { |
|         runningLatch.countDown(); |
|         while (running) { |
|             try { |
|                 Thread.sleep(Integer.MAX_VALUE); |
|             } catch (InterruptedException ignore) { |
|                 // ignore |
|             } |
|         } |
|     } |
| |
|     @Override |
|     public void cancel() { |
|         running = false; |
|     } |
| |
|     /** |
|      * Fails with a {@link RuntimeException} once the latch has been counted down, i.e. after |
|      * {@link #run(SourceContext)} has begun; otherwise does nothing. |
|      */ |
|     @Override |
|     public void snapshotState(FunctionSnapshotContext context) throws Exception { |
|         final boolean sourceStarted = runningLatch.getCount() == 0; |
|         if (sourceStarted) { |
|             throw new RuntimeException("source failed"); |
|         } |
|     } |
| |
|     /** Blocks until {@link #run(SourceContext)} has counted the latch down. */ |
|     public void awaitRunning() throws InterruptedException { |
|         runningLatch.await(); |
|     } |
| } |
| |
| /** |
| * Operator whose {@link #open()} always throws {@link ExpectedTestException}. Whether {@link |
| * #close()} was invoked is recorded in the static {@link #wasClosed} flag, which each newly |
| * constructed instance resets to {@code false}. |
| */ |
| static class OpenFailingOperator<T> extends AbstractStreamOperator<T> |
|         implements OneInputStreamOperator<T, T> { |
| |
|     /** {@code true} once {@link #close()} has run on any instance since the last construction. */ |
|     static boolean wasClosed; |
| |
|     public OpenFailingOperator() { |
|         wasClosed = false; |
|     } |
| |
|     /** Elements are dropped. */ |
|     @Override |
|     public void processElement(StreamRecord<T> element) throws Exception { |
|         // no-op |
|     } |
| |
|     /** Always fails with {@link ExpectedTestException}. */ |
|     @Override |
|     public void open() throws Exception { |
|         throw new ExpectedTestException(); |
|     } |
| |
|     /** Only records the call; the superclass implementation is not invoked. */ |
|     @Override |
|     public void close() throws Exception { |
|         wasClosed = true; |
|     } |
| } |
| |
| /** |
| * A {@link StreamTask} that register a single timer that waits for a cancellation and then |
| * emits some data. The assumption is that output remains available until the future returned |
| * from {@link TaskInvokable#cancel()} is completed. Public * access to allow reflection in |
| * {@link Task}. |
| */ |
| public static class StreamTaskWithBlockingTimer extends StreamTask { |
| static volatile CompletableFuture<Void> timerStarted; |
| static volatile CompletableFuture<Void> timerFinished; |
| static volatile CompletableFuture<Void> invokableCancelled; |
| |
| public static void reset() { |
| timerStarted = new CompletableFuture<>(); |
| timerFinished = new CompletableFuture<>(); |
| invokableCancelled = new CompletableFuture<>(); |
| } |
| |
| // public access to allow reflection in Task |
| public StreamTaskWithBlockingTimer(Environment env) throws Exception { |
| super(env); |
| super.inputProcessor = getInputProcessor(); |
| getProcessingTimeServiceFactory() |
| .createProcessingTimeService(mainMailboxExecutor) |
| .registerTimer(0, unused -> onProcessingTime()); |
| } |
| |
| @Override |
| protected void cancelTask() throws Exception { |
| super.cancelTask(); |
| invokableCancelled.complete(null); |
| } |
| |
| private void onProcessingTime() { |
| try { |
| timerStarted.complete(null); |
| waitForCancellation(); |
| emit(); |
| timerFinished.complete(null); |
| } catch (Throwable e) { // assertion is Error |
| timerFinished.completeExceptionally(e); |
| } |
| } |
| |
| private void waitForCancellation() { |
| invokableCancelled.join(); |
| // allow network resources to be closed mistakenly |
| for (int i = 0; i < 10; i++) { |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| // ignore: can be interrupted by TaskCanceller/Interrupter |
| } |
| } |
| } |
| |
| private void emit() throws IOException { |
| checkState(getEnvironment().getAllWriters().length > 0); |
| for (ResultPartitionWriter writer : getEnvironment().getAllWriters()) { |
| assertFalse(writer.isReleased()); |
| assertFalse(writer.isFinished()); |
| writer.emitRecord(ByteBuffer.allocate(10), 0); |
| } |
| } |
| |
| @Override |
| protected void init() {} |
| |
| private static StreamInputProcessor getInputProcessor() { |
| return new StreamInputProcessor() { |
| |
| @Override |
| public DataInputStatus processInput() { |
| return DataInputStatus.NOTHING_AVAILABLE; |
| } |
| |
| @Override |
| public CompletableFuture<Void> prepareSnapshot( |
| ChannelStateWriter channelStateWriter, long checkpointId) { |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| @Override |
| public CompletableFuture<?> getAvailableFuture() { |
| return new CompletableFuture<>(); |
| } |
| |
| @Override |
| public void close() {} |
| }; |
| } |
| } |
| |
| /** |
| * Operator that discards every element and remembers, in arrival order, the id of each |
| * checkpoint whose completion it is notified about. |
| */ |
| private static class CheckpointCompleteRecordOperator extends AbstractStreamOperator<Integer> |
|         implements OneInputStreamOperator<Integer, Integer> { |
| |
|     /** Checkpoint ids in the order their completion notifications arrived. */ |
|     private final List<Long> notifiedCheckpoint = new ArrayList<>(); |
| |
|     /** |
|      * Returns the live list of recorded checkpoint ids (not a copy). |
|      * |
|      * @return the ids passed to {@link #notifyCheckpointComplete(long)} so far |
|      */ |
|     public List<Long> getNotifiedCheckpoint() { |
|         return notifiedCheckpoint; |
|     } |
| |
|     /** Records the id; the superclass implementation is not invoked. */ |
|     @Override |
|     public void notifyCheckpointComplete(long checkpointId) throws Exception { |
|         final Long completedId = checkpointId; |
|         notifiedCheckpoint.add(completedId); |
|     } |
| |
|     /** Elements are dropped. */ |
|     @Override |
|     public void processElement(StreamRecord<Integer> element) throws Exception { |
|         // no-op |
|     } |
| } |
| } |