| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.runtime.tasks; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.operators.MailboxExecutor; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.core.memory.ManagedMemoryUseCase; |
| import org.apache.flink.metrics.Metric; |
| import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; |
| import org.apache.flink.runtime.clusterframework.types.ResourceID; |
| import org.apache.flink.runtime.event.AbstractEvent; |
| import org.apache.flink.runtime.execution.CancelTaskException; |
| import org.apache.flink.runtime.execution.Environment; |
| import org.apache.flink.runtime.io.network.api.EndOfData; |
| import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; |
| import org.apache.flink.runtime.io.network.api.StopMode; |
| import org.apache.flink.runtime.io.network.partition.ResultPartitionType; |
| import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; |
| import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; |
| import org.apache.flink.runtime.jobgraph.JobVertexID; |
| import org.apache.flink.runtime.jobgraph.OperatorID; |
| import org.apache.flink.runtime.memory.MemoryManager; |
| import org.apache.flink.runtime.metrics.NoOpMetricRegistry; |
| import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; |
| import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; |
| import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; |
| import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; |
| import org.apache.flink.runtime.state.LocalRecoveryConfig; |
| import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl; |
| import org.apache.flink.runtime.state.TestLocalRecoveryConfig; |
| import org.apache.flink.runtime.state.TestTaskStateManager; |
| import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; |
| import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; |
| import org.apache.flink.streaming.api.TimeCharacteristic; |
| import org.apache.flink.streaming.api.graph.NonChainedOutput; |
| import org.apache.flink.streaming.api.graph.StreamConfig; |
| import org.apache.flink.streaming.api.graph.StreamNode; |
| import org.apache.flink.streaming.api.operators.AbstractStreamOperator; |
| import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; |
| import org.apache.flink.streaming.api.operators.StreamOperator; |
| import org.apache.flink.streaming.api.operators.StreamOperatorFactory; |
| import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; |
| import org.apache.flink.streaming.runtime.streamrecord.StreamElement; |
| import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; |
| import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.function.FunctionWithException; |
| import org.apache.flink.util.function.SupplierWithException; |
| |
| import org.junit.Assert; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| import static org.apache.flink.util.Preconditions.checkState; |
| |
| /** |
| * Test harness for testing a {@link StreamTask}. |
| * |
| * <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements |
| * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements and |
| * events. You are free to modify the retrieved list. |
| * |
| * <p>After setting up everything the Task can be invoked using {@link #invoke()}. This will start a |
| * new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task thread |
| * to finish. |
| * |
| * @deprecated Please use {@link StreamTaskMailboxTestHarness} and {@link |
| * StreamTaskMailboxTestHarnessBuilder}. Do not add new code using this test harness. |
| */ |
| @Deprecated |
| public class StreamTaskTestHarness<OUT> { |
| |
| public static final int DEFAULT_MEMORY_MANAGER_SIZE = 1024 * 1024; |
| |
| public static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024; |
| |
| private final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> |
| taskFactory; |
| |
| public long memorySize; |
| public int bufferSize; |
| |
| protected StreamMockEnvironment mockEnv; |
| protected ExecutionConfig executionConfig; |
| public Configuration jobConfig; |
| public Configuration taskConfig; |
| protected StreamConfig streamConfig; |
| protected TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); |
| |
| protected TestTaskStateManager taskStateManager; |
| |
| private TypeSerializer<OUT> outputSerializer; |
| private TypeSerializer<StreamElement> outputStreamRecordSerializer; |
| |
| private LinkedBlockingQueue<Object> outputList; |
| |
| protected TaskThread taskThread; |
| |
| // These don't get initialized, the one-input/two-input specific test harnesses |
| // must initialize these if they want to simulate input. We have them here so that all the |
| // input related methods only need to be implemented once, in generic form |
| protected int numInputGates; |
| protected int numInputChannelsPerGate; |
| |
| private boolean setupCalled = false; |
| |
| @SuppressWarnings("rawtypes") |
| protected StreamTestSingleInputGate[] inputGates; |
| |
| public StreamTaskTestHarness( |
| FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory, |
| TypeInformation<OUT> outputType) { |
| this(taskFactory, outputType, TestLocalRecoveryConfig.disabled()); |
| } |
| |
| public StreamTaskTestHarness( |
| FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory, |
| TypeInformation<OUT> outputType, |
| File localRootDir) { |
| this( |
| taskFactory, |
| outputType, |
| new LocalRecoveryConfig( |
| new LocalRecoveryDirectoryProviderImpl( |
| localRootDir, new JobID(), new JobVertexID(), 0))); |
| } |
| |
| public StreamTaskTestHarness( |
| FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory, |
| TypeInformation<OUT> outputType, |
| LocalRecoveryConfig localRecoveryConfig) { |
| this.taskFactory = checkNotNull(taskFactory); |
| this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE; |
| this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE; |
| |
| this.jobConfig = new Configuration(); |
| this.taskConfig = new Configuration(); |
| this.executionConfig = new ExecutionConfig(); |
| |
| streamConfig = new StreamConfig(taskConfig); |
| streamConfig.setStateBackendUsesManagedMemory(true); |
| streamConfig.setManagedMemoryFractionOperatorOfUseCase( |
| ManagedMemoryUseCase.STATE_BACKEND, 1.0); |
| |
| outputSerializer = outputType.createSerializer(executionConfig); |
| outputStreamRecordSerializer = new StreamElementSerializer<>(outputSerializer); |
| |
| this.taskStateManager = new TestTaskStateManager(localRecoveryConfig); |
| } |
| |
| public StreamMockEnvironment getEnvironment() { |
| return mockEnv; |
| } |
| |
| public TimerService getTimerService() { |
| return taskThread.task.getTimerService(); |
| } |
| |
| public TaskManagerRuntimeInfo getTaskManagerRuntimeInfo() { |
| return taskManagerRuntimeInfo; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <OP extends StreamOperator<OUT>> OP getHeadOperator() { |
| return (OP) taskThread.task.getMainOperator(); |
| } |
| |
| /** This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses. */ |
| protected void initializeInputs() throws IOException, InterruptedException {} |
| |
| public TestTaskStateManager getTaskStateManager() { |
| return taskStateManager; |
| } |
| |
| public void setTaskStateSnapshot(long checkpointId, TaskStateSnapshot taskStateSnapshot) { |
| taskStateManager.setReportedCheckpointId(checkpointId); |
| taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId( |
| Collections.singletonMap(checkpointId, taskStateSnapshot)); |
| } |
| |
| private void initializeOutput() { |
| outputList = new LinkedBlockingQueue<>(); |
| mockEnv.addOutput(outputList, outputStreamRecordSerializer); |
| } |
| |
| /** |
| * Users of the test harness can call this utility method to setup the stream config if there |
| * will only be a single operator to be tested. The method will setup the outgoing network |
| * connection for the operator. |
| * |
| * <p>For more advanced test cases such as testing chains of multiple operators with the |
| * harness, please manually configure the stream config. |
| */ |
| public void setupOutputForSingletonOperatorChain() { |
| Preconditions.checkState(!setupCalled, "This harness was already setup."); |
| setupCalled = true; |
| streamConfig.setChainStart(); |
| streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); |
| streamConfig.setNumberOfOutputs(1); |
| streamConfig.setTypeSerializerOut(outputSerializer); |
| streamConfig.setVertexID(0); |
| streamConfig.setOperatorID(new OperatorID(4711L, 123L)); |
| |
| StreamOperator<OUT> dummyOperator = |
| new AbstractStreamOperator<OUT>() { |
| private static final long serialVersionUID = 1L; |
| }; |
| |
| List<NonChainedOutput> streamOutputs = new LinkedList<>(); |
| StreamNode sourceVertexDummy = |
| new StreamNode( |
| 0, "group", null, dummyOperator, "source dummy", SourceStreamTask.class); |
| |
| streamOutputs.add( |
| new NonChainedOutput( |
| true, |
| sourceVertexDummy.getId(), |
| 1, |
| 1, |
| 100, |
| false, |
| new IntermediateDataSetID(), |
| null, |
| new BroadcastPartitioner<>(), |
| ResultPartitionType.PIPELINED_BOUNDED)); |
| |
| streamConfig.setVertexNonChainedOutputs(streamOutputs); |
| streamConfig.setOperatorNonChainedOutputs(streamOutputs); |
| streamConfig.serializeAllConfigs(); |
| } |
| |
| public StreamMockEnvironment createEnvironment() { |
| StreamMockEnvironment streamMockEnvironment = |
| new StreamMockEnvironment( |
| jobConfig, |
| taskConfig, |
| executionConfig, |
| memorySize, |
| new MockInputSplitProvider(), |
| bufferSize, |
| taskStateManager); |
| if (taskManagerRuntimeInfo != null) { |
| streamMockEnvironment.setTaskManagerInfo(taskManagerRuntimeInfo); |
| } |
| |
| return streamMockEnvironment; |
| } |
| |
| /** |
| * Invoke the Task. This resets the output of any previous invocation. This will start a new |
| * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the Task |
| * thread to finish running. |
| */ |
| public Thread invoke() throws Exception { |
| streamConfig.serializeAllConfigs(); |
| return invoke(createEnvironment()); |
| } |
| |
| /** |
| * Invoke the Task. This resets the output of any previous invocation. This will start a new |
| * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the Task |
| * thread to finish running. |
| */ |
| public Thread invoke(StreamMockEnvironment mockEnv) throws Exception { |
| checkState(this.mockEnv == null); |
| checkState(this.taskThread == null); |
| this.mockEnv = checkNotNull(mockEnv); |
| |
| initializeInputs(); |
| initializeOutput(); |
| streamConfig.serializeAllConfigs(); |
| |
| taskThread = new TaskThread(() -> taskFactory.apply(mockEnv)); |
| taskThread.start(); |
| // Wait until the task is set |
| while (taskThread.task == null) { |
| if (taskThread.error != null) { |
| ExceptionUtils.rethrow(taskThread.error); |
| } |
| Thread.sleep(10L); |
| } |
| |
| return taskThread; |
| } |
| |
| /** Waits for the task completion. */ |
| public void waitForTaskCompletion() throws Exception { |
| waitForTaskCompletion(Long.MAX_VALUE); |
| } |
| |
| public void waitForTaskCompletion(long timeout) throws Exception { |
| waitForTaskCompletion(timeout, false); |
| } |
| |
| /** |
| * Waits for the task completion. If this does not happen within the timeout, then a |
| * TimeoutException is thrown. |
| * |
| * @param timeout Timeout for the task completion |
| */ |
| public void waitForTaskCompletion( |
| long timeout, boolean ignoreCancellationOrInterruptedException) throws Exception { |
| Preconditions.checkState(taskThread != null, "Task thread was not started."); |
| |
| taskThread.join(timeout); |
| if (taskThread.getError() != null) { |
| boolean errorIsCancellationOrInterrupted = |
| ExceptionUtils.findThrowable(taskThread.getError(), CancelTaskException.class) |
| .isPresent() |
| || ExceptionUtils.findThrowable( |
| taskThread.getError(), InterruptedException.class) |
| .isPresent(); |
| if (ignoreCancellationOrInterruptedException && errorIsCancellationOrInterrupted) { |
| return; |
| } |
| throw new Exception("error in task", taskThread.getError()); |
| } |
| } |
| |
| /** Waits for the task to be running. */ |
| public void waitForTaskRunning() throws Exception { |
| Preconditions.checkState(taskThread != null, "Task thread was not started."); |
| StreamTask<?, ?> streamTask = taskThread.task; |
| while (!streamTask.isRunning()) { |
| Thread.sleep(10); |
| if (!taskThread.isAlive()) { |
| if (taskThread.getError() != null) { |
| throw new Exception( |
| "Task Thread failed due to an error.", taskThread.getError()); |
| } else { |
| throw new Exception("Task Thread unexpectedly shut down."); |
| } |
| } |
| } |
| } |
| |
| public StreamTask<OUT, ?> getTask() { |
| return taskThread.task; |
| } |
| |
| public Thread getTaskThread() { |
| return taskThread; |
| } |
| |
| /** |
| * Get all the output from the task. This contains StreamRecords and Events interleaved. Use |
| * {@link |
| * org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}} |
| * to extract only the StreamRecords. |
| */ |
| public LinkedBlockingQueue<Object> getOutput() { |
| return outputList; |
| } |
| |
| public StreamConfig getStreamConfig() { |
| return streamConfig; |
| } |
| |
| public ExecutionConfig getExecutionConfig() { |
| return executionConfig; |
| } |
| |
| private void shutdownIOManager() throws Exception { |
| this.mockEnv.getIOManager().close(); |
| } |
| |
| private void shutdownMemoryManager() { |
| if (this.memorySize > 0) { |
| MemoryManager memMan = this.mockEnv.getMemoryManager(); |
| if (memMan != null) { |
| Assert.assertTrue( |
| "Memory Manager managed memory was not completely freed.", |
| memMan.verifyEmpty()); |
| memMan.shutdown(); |
| } |
| } |
| } |
| |
| /** Sends the element to input gate 0 on channel 0. */ |
| public void processElement(Object element) { |
| inputGates[0].sendElement(element, 0); |
| } |
| |
| /** Sends the element to the specified channel on the specified input gate. */ |
| public void processElement(Object element, int inputGate, int channel) { |
| inputGates[inputGate].sendElement(element, channel); |
| } |
| |
| /** Sends the event to input gate 0 on channel 0. */ |
| public void processEvent(AbstractEvent event) { |
| inputGates[0].sendEvent(event, 0); |
| } |
| |
| /** Sends the event to the specified channel on the specified input gate. */ |
| public void processEvent(AbstractEvent event, int inputGate, int channel) { |
| inputGates[inputGate].sendEvent(event, channel); |
| } |
| |
| /** This only returns after all input queues are empty. */ |
| public void waitForInputProcessing() throws Exception { |
| while (taskThread.isAlive()) { |
| boolean allEmpty = true; |
| for (int i = 0; i < numInputGates; i++) { |
| if (!inputGates[i].allQueuesEmpty()) { |
| allEmpty = false; |
| } |
| } |
| |
| if (allEmpty) { |
| break; |
| } |
| } |
| |
| // Wait for all currently available input has been processed. |
| final AtomicBoolean allInputProcessed = new AtomicBoolean(); |
| final MailboxProcessor mailboxProcessor = taskThread.task.mailboxProcessor; |
| final MailboxExecutor mailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); |
| while (taskThread.isAlive()) { |
| try { |
| final CountDownLatch latch = new CountDownLatch(1); |
| mailboxExecutor.execute( |
| () -> { |
| allInputProcessed.set(!mailboxProcessor.isDefaultActionAvailable()); |
| latch.countDown(); |
| }, |
| "query-whether-processInput-has-suspend-itself"); |
| // Mail could be dropped due to task exception, so we do timed-await here. |
| latch.await(1, TimeUnit.SECONDS); |
| } catch (RejectedExecutionException ex) { |
| // Loop until task thread exit for possible task exception. |
| } |
| |
| if (allInputProcessed.get()) { |
| break; |
| } |
| |
| try { |
| Thread.sleep(1); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| |
| Throwable error = taskThread.getError(); |
| if (error != null) { |
| throw new Exception("Exception in the task thread", error); |
| } |
| } |
| |
| /** |
| * Notifies all input channels on all input gates that no more input will arrive. This will |
| * usually make the Task exit from his internal loop. |
| */ |
| public void endInput() { |
| for (int i = 0; i < numInputGates; i++) { |
| inputGates[i].endInput(); |
| } |
| } |
| |
| /** |
| * Notifies the specified input channel on the specified input gate that no more data will |
| * arrive. |
| */ |
| public void endInput(int gateIndex, int channelIndex) { |
| endInput(gateIndex, channelIndex, true); |
| } |
| |
| public void endInput(int gateIndex, int channelIndex, boolean emitEndOfData) { |
| if (emitEndOfData) { |
| inputGates[gateIndex].sendEvent(new EndOfData(StopMode.DRAIN), channelIndex); |
| } |
| inputGates[gateIndex].sendEvent(EndOfPartitionEvent.INSTANCE, channelIndex); |
| } |
| |
| public StreamConfigChainer<StreamTaskTestHarness<OUT>> setupOperatorChain( |
| OperatorID headOperatorId, StreamOperator<?> headOperator) { |
| return setupOperatorChain(headOperatorId, SimpleOperatorFactory.of(headOperator)); |
| } |
| |
| public StreamConfigChainer<StreamTaskTestHarness<OUT>> setupOperatorChain( |
| OperatorID headOperatorId, StreamOperatorFactory<?> headOperatorFactory) { |
| Preconditions.checkState(!setupCalled, "This harness was already setup."); |
| setupCalled = true; |
| StreamConfig streamConfig = getStreamConfig(); |
| streamConfig.setStreamOperatorFactory(headOperatorFactory); |
| streamConfig.serializeAllConfigs(); |
| return new StreamConfigChainer(headOperatorId, streamConfig, this, 1); |
| } |
| |
| // ------------------------------------------------------------------------ |
| |
| class TaskThread extends Thread { |
| |
| private final SupplierWithException<? extends StreamTask<OUT, ?>, Exception> taskFactory; |
| private volatile StreamTask<OUT, ?> task; |
| |
| private volatile Throwable error; |
| |
| TaskThread(SupplierWithException<? extends StreamTask<OUT, ?>, Exception> taskFactory) { |
| super("Task Thread"); |
| this.taskFactory = taskFactory; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| task = taskFactory.get(); |
| task.invoke(); |
| shutdownIOManager(); |
| shutdownMemoryManager(); |
| } catch (Throwable throwable) { |
| this.error = throwable; |
| } finally { |
| try { |
| task.cleanUp(this.error); |
| } catch (Exception cleanUpException) { |
| if (this.error == null) { |
| this.error = cleanUpException; |
| } else { |
| this.error.addSuppressed(cleanUpException); |
| } |
| } |
| } |
| } |
| |
| public Throwable getError() { |
| return error; |
| } |
| } |
| |
| static TaskMetricGroup createTaskMetricGroup(Map<String, Metric> metrics) { |
| return TaskManagerMetricGroup.createTaskManagerMetricGroup( |
| new TestMetricRegistry(metrics), "localhost", ResourceID.generate()) |
| .addJob(new JobID(), "jobName") |
| .addTask(createExecutionAttemptId(), "test"); |
| } |
| |
| /** The metric registry for storing the registered metrics to verify in tests. */ |
| static class TestMetricRegistry extends NoOpMetricRegistry { |
| private final Map<String, Metric> metrics; |
| |
| TestMetricRegistry(Map<String, Metric> metrics) { |
| super(); |
| this.metrics = metrics; |
| } |
| |
| @Override |
| public void register(Metric metric, String metricName, AbstractMetricGroup group) { |
| metrics.put(metricName, metric); |
| } |
| } |
| } |