blob: 6aeb01aabd9bf18e177ce4f1f7ab2fb7fdd2e2a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.security.FlinkSecurityManager;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TaskManagerExceptionUtils;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_SAMPLES;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* The Task represents one execution of a parallel subtask on a TaskManager. A Task wraps a Flink
* operator (which may be a user function) and runs it, providing all services necessary for example
* to consume input data, produce its results (intermediate result partitions) and communicate with
* the JobManager.
*
* <p>The Flink operators (implemented as subclasses of {@link TaskInvokable} have only data
* readers, writers, and certain event callbacks. The task connects those to the network stack and
* actor messages, and tracks the state of the execution and handles exceptions.
*
* <p>Tasks have no knowledge about how they relate to other tasks, or whether they are the first
* attempt to execute the task, or a repeated attempt. All of that is only known to the JobManager.
* All the task knows are its own runnable code, the task's configuration, and the IDs of the
* intermediate results to consume and produce (if any).
*
* <p>Each Task is run by one dedicated thread.
*/
public class Task
implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider {
/** The class logger. */
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
/** The thread group that contains all task threads. */
private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
/** For atomic state updates. */
private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
Task.class, ExecutionState.class, "executionState");
// ------------------------------------------------------------------------
// Constant fields that are part of the initial Task construction
// ------------------------------------------------------------------------
/** The job that the task belongs to. */
private final JobID jobId;
/** The vertex in the JobGraph whose code the task executes. */
private final JobVertexID vertexId;
/** The execution attempt of the parallel subtask. */
private final ExecutionAttemptID executionId;
/** ID which identifies the slot in which the task is supposed to run. */
private final AllocationID allocationId;
/** TaskInfo object for this task. */
private final TaskInfo taskInfo;
/** The name of the task, including subtask indexes. */
private final String taskNameWithSubtask;
/** The job-wide configuration object. */
private final Configuration jobConfiguration;
/** The task-specific configuration. */
private final Configuration taskConfiguration;
/** The jar files used by this task. */
private final Collection<PermanentBlobKey> requiredJarFiles;
/** The classpaths used by this task. */
private final Collection<URL> requiredClasspaths;
/** The name of the class that holds the invokable code. */
private final String nameOfInvokableClass;
/** Access to task manager configuration and host names. */
private final TaskManagerRuntimeInfo taskManagerConfig;
/** The memory manager to be used by this task. */
private final MemoryManager memoryManager;
/** The I/O manager to be used by this task. */
private final IOManager ioManager;
/** The BroadcastVariableManager to be used by this task. */
private final BroadcastVariableManager broadcastVariableManager;
private final TaskEventDispatcher taskEventDispatcher;
/** Information provider for external resources. */
private final ExternalResourceInfoProvider externalResourceInfoProvider;
/** The manager for state of operators running in this task/slot. */
private final TaskStateManager taskStateManager;
/**
* Serialized version of the job specific execution configuration (see {@link ExecutionConfig}).
*/
private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
private final ResultPartitionWriter[] consumableNotifyingPartitionWriters;
private final IndexedInputGate[] inputGates;
/** Connection to the task manager. */
private final TaskManagerActions taskManagerActions;
/** Input split provider for the task. */
private final InputSplitProvider inputSplitProvider;
/** Checkpoint notifier used to communicate with the CheckpointCoordinator. */
private final CheckpointResponder checkpointResponder;
/**
* The gateway for operators to send messages to the operator coordinators on the Job Manager.
*/
private final TaskOperatorEventGateway operatorCoordinatorEventGateway;
/** GlobalAggregateManager used to update aggregates on the JobMaster. */
private final GlobalAggregateManager aggregateManager;
/** The library cache, from which the task can request its class loader. */
private final LibraryCacheManager.ClassLoaderHandle classLoaderHandle;
/** The cache for user-defined files that the invokable requires. */
private final FileCache fileCache;
/** The service for kvState registration of this task. */
private final KvStateService kvStateService;
/** The registry of this task which enables live reporting of accumulators. */
private final AccumulatorRegistry accumulatorRegistry;
/** The thread that executes the task. */
private final Thread executingThread;
/** Parent group for all metrics of this task. */
private final TaskMetricGroup metrics;
/** Partition producer state checker to request partition states from. */
private final PartitionProducerStateChecker partitionProducerStateChecker;
/** Executor to run future callbacks. */
private final Executor executor;
/** Future that is completed once {@link #run()} exits. */
private final CompletableFuture<ExecutionState> terminationFuture = new CompletableFuture<>();
// ------------------------------------------------------------------------
// Fields that control the task execution. All these fields are volatile
// (which means that they introduce memory barriers), to establish
// proper happens-before semantics on parallel modification
// ------------------------------------------------------------------------
/** atomic flag that makes sure the invokable is canceled exactly once upon error. */
private final AtomicBoolean invokableHasBeenCanceled;
/**
* The invokable of this task, if initialized. All accesses must copy the reference and check
* for null, as this field is cleared as part of the disposal logic.
*/
@Nullable private volatile TaskInvokable invokable;
/** The current execution state of the task. */
private volatile ExecutionState executionState = ExecutionState.CREATED;
/** The observed exception, in case the task execution failed. */
private volatile Throwable failureCause;
/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
private long taskCancellationInterval;
/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
private long taskCancellationTimeout;
/**
* This class loader should be set as the context class loader for threads that may dynamically
* load user code.
*/
private UserCodeClassLoader userCodeClassLoader;
/** The only one throughput meter per subtask. */
private ThroughputCalculator throughputCalculator;
/**
* <b>IMPORTANT:</b> This constructor may not start any work that would need to be undone in the
* case of a failing task deployment.
*/
public Task(
JobInformation jobInformation,
TaskInformation taskInformation,
ExecutionAttemptID executionAttemptID,
AllocationID slotAllocationId,
int subtaskIndex,
int attemptNumber,
List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
MemoryManager memManager,
IOManager ioManager,
ShuffleEnvironment<?, ?> shuffleEnvironment,
KvStateService kvStateService,
BroadcastVariableManager bcVarManager,
TaskEventDispatcher taskEventDispatcher,
ExternalResourceInfoProvider externalResourceInfoProvider,
TaskStateManager taskStateManager,
TaskManagerActions taskManagerActions,
InputSplitProvider inputSplitProvider,
CheckpointResponder checkpointResponder,
TaskOperatorEventGateway operatorCoordinatorEventGateway,
GlobalAggregateManager aggregateManager,
LibraryCacheManager.ClassLoaderHandle classLoaderHandle,
FileCache fileCache,
TaskManagerRuntimeInfo taskManagerConfig,
@Nonnull TaskMetricGroup metricGroup,
ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
PartitionProducerStateChecker partitionProducerStateChecker,
Executor executor) {
Preconditions.checkNotNull(jobInformation);
Preconditions.checkNotNull(taskInformation);
Preconditions.checkArgument(0 <= subtaskIndex, "The subtask index must be positive.");
Preconditions.checkArgument(0 <= attemptNumber, "The attempt number must be positive.");
this.taskInfo =
new TaskInfo(
taskInformation.getTaskName(),
taskInformation.getMaxNumberOfSubtasks(),
subtaskIndex,
taskInformation.getNumberOfSubtasks(),
attemptNumber,
String.valueOf(slotAllocationId));
this.jobId = jobInformation.getJobId();
this.vertexId = taskInformation.getJobVertexId();
this.executionId = Preconditions.checkNotNull(executionAttemptID);
this.allocationId = Preconditions.checkNotNull(slotAllocationId);
this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
this.jobConfiguration = jobInformation.getJobConfiguration();
this.taskConfiguration = taskInformation.getTaskConfiguration();
this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
this.requiredClasspaths = jobInformation.getRequiredClasspathURLs();
this.nameOfInvokableClass = taskInformation.getInvokableClassName();
this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();
Configuration tmConfig = taskManagerConfig.getConfiguration();
this.taskCancellationInterval =
tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
this.taskCancellationTimeout =
tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
this.memoryManager = Preconditions.checkNotNull(memManager);
this.ioManager = Preconditions.checkNotNull(ioManager);
this.broadcastVariableManager = Preconditions.checkNotNull(bcVarManager);
this.taskEventDispatcher = Preconditions.checkNotNull(taskEventDispatcher);
this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
this.accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);
this.inputSplitProvider = Preconditions.checkNotNull(inputSplitProvider);
this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
this.operatorCoordinatorEventGateway =
Preconditions.checkNotNull(operatorCoordinatorEventGateway);
this.aggregateManager = Preconditions.checkNotNull(aggregateManager);
this.taskManagerActions = checkNotNull(taskManagerActions);
this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
this.classLoaderHandle = Preconditions.checkNotNull(classLoaderHandle);
this.fileCache = Preconditions.checkNotNull(fileCache);
this.kvStateService = Preconditions.checkNotNull(kvStateService);
this.taskManagerConfig = Preconditions.checkNotNull(taskManagerConfig);
this.metrics = metricGroup;
this.partitionProducerStateChecker =
Preconditions.checkNotNull(partitionProducerStateChecker);
this.executor = Preconditions.checkNotNull(executor);
// create the reader and writer structures
final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';
final ShuffleIOOwnerContext taskShuffleContext =
shuffleEnvironment.createShuffleIOOwnerContext(
taskNameWithSubtaskAndId, executionId, metrics.getIOMetricGroup());
// produced intermediate result partitions
final ResultPartitionWriter[] resultPartitionWriters =
shuffleEnvironment
.createResultPartitionWriters(
taskShuffleContext, resultPartitionDeploymentDescriptors)
.toArray(new ResultPartitionWriter[] {});
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator.decorate(
resultPartitionDeploymentDescriptors,
resultPartitionWriters,
this,
jobId,
resultPartitionConsumableNotifier);
// consumed intermediate result partitions
final IndexedInputGate[] gates =
shuffleEnvironment
.createInputGates(taskShuffleContext, this, inputGateDeploymentDescriptors)
.toArray(new IndexedInputGate[0]);
this.inputGates = new IndexedInputGate[gates.length];
this.throughputCalculator =
new ThroughputCalculator(
SystemClock.getInstance(), taskConfiguration.get(BUFFER_DEBLOAT_SAMPLES));
int counter = 0;
for (IndexedInputGate gate : gates) {
inputGates[counter++] =
new InputGateWithMetrics(
gate,
metrics.getIOMetricGroup().getNumBytesInCounter(),
throughputCalculator);
}
if (shuffleEnvironment instanceof NettyShuffleEnvironment) {
//noinspection deprecation
((NettyShuffleEnvironment) shuffleEnvironment)
.registerLegacyNetworkMetrics(
metrics.getIOMetricGroup(), resultPartitionWriters, gates);
}
invokableHasBeenCanceled = new AtomicBoolean(false);
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
}
// ------------------------------------------------------------------------
// Accessors
// ------------------------------------------------------------------------
@Override
public JobID getJobID() {
return jobId;
}
public JobVertexID getJobVertexId() {
return vertexId;
}
@Override
public ExecutionAttemptID getExecutionId() {
return executionId;
}
@Override
public AllocationID getAllocationId() {
return allocationId;
}
public TaskInfo getTaskInfo() {
return taskInfo;
}
public Configuration getJobConfiguration() {
return jobConfiguration;
}
public Configuration getTaskConfiguration() {
return this.taskConfiguration;
}
public AccumulatorRegistry getAccumulatorRegistry() {
return accumulatorRegistry;
}
public TaskMetricGroup getMetricGroup() {
return metrics;
}
public Thread getExecutingThread() {
return executingThread;
}
@Override
public CompletableFuture<ExecutionState> getTerminationFuture() {
return terminationFuture;
}
@VisibleForTesting
long getTaskCancellationInterval() {
return taskCancellationInterval;
}
@VisibleForTesting
long getTaskCancellationTimeout() {
return taskCancellationTimeout;
}
@Nullable
@VisibleForTesting
TaskInvokable getInvokable() {
return invokable;
}
public boolean isBackPressured() {
if (invokable == null
|| consumableNotifyingPartitionWriters.length == 0
|| (executionState != ExecutionState.INITIALIZING
&& executionState != ExecutionState.RUNNING)) {
return false;
}
for (int i = 0; i < consumableNotifyingPartitionWriters.length; ++i) {
if (!consumableNotifyingPartitionWriters[i].isAvailable()) {
return true;
}
}
return false;
}
// ------------------------------------------------------------------------
// Task Execution
// ------------------------------------------------------------------------
/**
* Returns the current execution state of the task.
*
* @return The current execution state of the task.
*/
public ExecutionState getExecutionState() {
return this.executionState;
}
/**
* Checks whether the task has failed, is canceled, or is being canceled at the moment.
*
* @return True is the task in state FAILED, CANCELING, or CANCELED, false otherwise.
*/
public boolean isCanceledOrFailed() {
return executionState == ExecutionState.CANCELING
|| executionState == ExecutionState.CANCELED
|| executionState == ExecutionState.FAILED;
}
/**
* If the task has failed, this method gets the exception that caused this task to fail.
* Otherwise this method returns null.
*
* @return The exception that caused the task to fail, or null, if the task has not failed.
*/
public Throwable getFailureCause() {
return failureCause;
}
/** Starts the task's thread. */
public void startTaskThread() {
executingThread.start();
}
/** The core work method that bootstraps the task and executes its code. */
@Override
public void run() {
try {
doRun();
} finally {
terminationFuture.complete(executionState);
}
}
private void doRun() {
// ----------------------------
// Initial State transition
// ----------------------------
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.CREATED) {
if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
// success, we can start our work
break;
}
} else if (current == ExecutionState.FAILED) {
// we were immediately failed. tell the TaskManager that we reached our final state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
} else if (current == ExecutionState.CANCELING) {
if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
// we were immediately canceled. tell the TaskManager that we reached our final
// state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
} else {
if (metrics != null) {
metrics.close();
}
throw new IllegalStateException(
"Invalid state for beginning of operation of task " + this + '.');
}
}
// all resource acquisitions and registrations from here on
// need to be undone in the end
Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
TaskInvokable invokable = null;
try {
// ----------------------------
// Task Bootstrap - We periodically
// check for canceling as a shortcut
// ----------------------------
// activate safety net for task thread
LOG.debug("Creating FileSystem stream leak safety net for task {}", this);
FileSystemSafetyNet.initializeSafetyNetForThread();
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
LOG.info("Loading JAR files for task {}.", this);
userCodeClassLoader = createUserCodeClassloader();
final ExecutionConfig executionConfig =
serializedExecutionConfig.deserializeValue(userCodeClassLoader.asClassLoader());
if (executionConfig.getTaskCancellationInterval() >= 0) {
// override task cancellation interval from Flink config if set in ExecutionConfig
taskCancellationInterval = executionConfig.getTaskCancellationInterval();
}
if (executionConfig.getTaskCancellationTimeout() >= 0) {
// override task cancellation timeout from Flink config if set in ExecutionConfig
taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
}
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// register the task with the network stack
// this operation may fail if the system does not have enough
// memory to run the necessary data exchanges
// the registration must also strictly be undone
// ----------------------------------------------------------------
LOG.debug("Registering task at network: {}.", this);
setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates);
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
}
// next, kick off the background copying of files for the distributed cache
try {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
Future<Path> cp =
fileCache.createTmpFile(
entry.getKey(), entry.getValue(), jobId, executionId);
distributedCacheEntries.put(entry.getKey(), cp);
}
} catch (Exception e) {
throw new Exception(
String.format(
"Exception while adding files to distributed cache of task %s (%s).",
taskNameWithSubtask, executionId),
e);
}
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// call the user code initialization methods
// ----------------------------------------------------------------
TaskKvStateRegistry kvStateRegistry =
kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId());
Environment env =
new RuntimeEnvironment(
jobId,
vertexId,
executionId,
executionConfig,
taskInfo,
jobConfiguration,
taskConfiguration,
userCodeClassLoader,
memoryManager,
ioManager,
broadcastVariableManager,
taskStateManager,
aggregateManager,
accumulatorRegistry,
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
consumableNotifyingPartitionWriters,
inputGates,
taskEventDispatcher,
checkpointResponder,
operatorCoordinatorEventGateway,
taskManagerConfig,
metrics,
this,
externalResourceInfoProvider,
throughputCalculator);
// Make sure the user code classloader is accessible thread-locally.
// We are setting the correct context class loader before instantiating the invokable
// so that it is available to the invokable during its entire lifetime.
executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
// When constructing invokable, separate threads can be constructed and thus should be
// monitored for system exit (in addition to invoking thread itself monitored below).
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
// now load and instantiate the task's invokable code
invokable =
loadAndInstantiateInvokable(
userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
} finally {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
restoreAndInvoke(invokable);
// make sure, we enter the catch block if the task leaves the invoke() method due
// to the fact that it has been canceled
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
// ----------------------------------------------------------------
// finalization of a successful execution
// ----------------------------------------------------------------
// finish the produced partitions. if this fails, we consider the execution failed.
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
if (partitionWriter != null) {
partitionWriter.finish();
}
}
// try to mark the task as finished
// if that fails, the task was canceled/failed in the meantime
if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
throw new CancelTaskException();
}
} catch (Throwable t) {
// unwrap wrapped exceptions to make stack traces more compact
if (t instanceof WrappingRuntimeException) {
t = ((WrappingRuntimeException) t).unwrap();
}
// ----------------------------------------------------------------
// the execution failed. either the invokable code properly failed, or
// an exception was thrown as a side effect of cancelling
// ----------------------------------------------------------------
TaskManagerExceptionUtils.tryEnrichTaskManagerError(t);
try {
// check if the exception is unrecoverable
if (ExceptionUtils.isJvmFatalError(t)
|| (t instanceof OutOfMemoryError
&& taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {
// terminate the JVM immediately
// don't attempt a clean shutdown, because we cannot expect the clean shutdown
// to complete
try {
LOG.error(
"Encountered fatal error {} - terminating the JVM",
t.getClass().getName(),
t);
} finally {
Runtime.getRuntime().halt(-1);
}
}
// transition into our final state. we should be either in DEPLOYING, INITIALIZING,
// RUNNING, CANCELING, or FAILED
// loop for multiple retries during concurrent state changes via calls to cancel()
// or to failExternally()
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.RUNNING
|| current == ExecutionState.INITIALIZING
|| current == ExecutionState.DEPLOYING) {
if (t instanceof CancelTaskException) {
if (transitionState(current, ExecutionState.CANCELED)) {
cancelInvokable(invokable);
break;
}
} else {
if (transitionState(current, ExecutionState.FAILED, t)) {
// proper failure of the task. record the exception as the root
// cause
failureCause = t;
cancelInvokable(invokable);
break;
}
}
} else if (current == ExecutionState.CANCELING) {
if (transitionState(current, ExecutionState.CANCELED)) {
break;
}
} else if (current == ExecutionState.FAILED) {
// in state failed already, no transition necessary any more
break;
}
// unexpected state, go to failed
else if (transitionState(current, ExecutionState.FAILED, t)) {
LOG.error(
"Unexpected state in task {} ({}) during an exception: {}.",
taskNameWithSubtask,
executionId,
current);
break;
}
// else fall through the loop and
}
} catch (Throwable tt) {
String message =
String.format(
"FATAL - exception in exception handler of task %s (%s).",
taskNameWithSubtask, executionId);
LOG.error(message, tt);
notifyFatalError(message, tt);
}
} finally {
try {
LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);
// clear the reference to the invokable. this helps guard against holding references
// to the invokable and its structures in cases where this Task object is still
// referenced
this.invokable = null;
// free the network resources
releaseResources();
// free memory resources
if (invokable != null) {
memoryManager.releaseAll(invokable);
}
// remove all of the tasks resources
fileCache.releaseJob(jobId, executionId);
// close and de-activate safety net for task thread
LOG.debug("Ensuring all FileSystem streams are closed for task {}", this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
notifyFinalState();
} catch (Throwable t) {
// an error in the resource cleanup is fatal
String message =
String.format(
"FATAL - exception in resource cleanup of task %s (%s).",
taskNameWithSubtask, executionId);
LOG.error(message, t);
notifyFatalError(message, t);
}
// un-register the metrics at the end so that the task may already be
// counted as finished when this happens
// errors here will only be logged
try {
metrics.close();
} catch (Throwable t) {
LOG.error(
"Error during metrics de-registration of task {} ({}).",
taskNameWithSubtask,
executionId,
t);
}
}
}
private void restoreAndInvoke(TaskInvokable finalInvokable) throws Exception {
try {
// switch to the INITIALIZING state, if that fails, we have been canceled/failed in the
// meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.INITIALIZING)) {
throw new CancelTaskException();
}
taskManagerActions.updateTaskExecutionState(
new TaskExecutionState(executionId, ExecutionState.INITIALIZING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
runWithSystemExitMonitoring(finalInvokable::restore);
if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
taskManagerActions.updateTaskExecutionState(
new TaskExecutionState(executionId, ExecutionState.RUNNING));
runWithSystemExitMonitoring(finalInvokable::invoke);
} catch (Throwable throwable) {
try {
runWithSystemExitMonitoring(() -> finalInvokable.cleanUp(throwable));
} catch (Throwable cleanUpThrowable) {
throwable.addSuppressed(cleanUpThrowable);
}
throw throwable;
}
runWithSystemExitMonitoring(() -> finalInvokable.cleanUp(null));
}
/**
* Monitor user codes from exiting JVM covering user function invocation. This can be done in a
* finer-grained way like enclosing user callback functions individually, but as exit triggered
* by framework is not performed and expected in this invoke function anyhow, we can monitor
* exiting JVM for entire scope.
*/
private void runWithSystemExitMonitoring(RunnableWithException action) throws Exception {
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
action.run();
} finally {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
}
@VisibleForTesting
public static void setupPartitionsAndGates(
ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {
for (ResultPartitionWriter partition : producedPartitions) {
partition.setup();
}
// InputGates must be initialized after the partitions, since during InputGate#setup
// we are requesting partitions
for (InputGate gate : inputGates) {
gate.setup();
}
}
/**
* Releases resources before task exits. We should also fail the partition to release if the
* task has failed, is canceled, or is being canceled at the moment.
*/
private void releaseResources() {
LOG.debug(
"Release task {} network resources (state: {}).",
taskNameWithSubtask,
getExecutionState());
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
taskEventDispatcher.unregisterPartition(partitionWriter.getPartitionId());
}
// close network resources
if (isCanceledOrFailed()) {
failAllResultPartitions();
}
closeAllResultPartitions();
closeAllInputGates();
try {
taskStateManager.close();
} catch (Exception e) {
LOG.error("Failed to close task state manager for task {}.", taskNameWithSubtask, e);
}
}
private void failAllResultPartitions() {
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
partitionWriter.fail(getFailureCause());
}
}
private void closeAllResultPartitions() {
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
try {
partitionWriter.close();
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
LOG.error(
"Failed to release result partition for task {}.", taskNameWithSubtask, t);
}
}
}
private void closeAllInputGates() {
TaskInvokable invokable = this.invokable;
if (invokable == null || !invokable.isUsingNonBlockingInput()) {
// Cleanup resources instead of invokable if it is null, or prevent it from being
// blocked on input, or interrupt if it is already blocked. Not needed for StreamTask
// (which does NOT use blocking input); for which this could cause race conditions
for (InputGate inputGate : inputGates) {
try {
inputGate.close();
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
LOG.error("Failed to release input gate for task {}.", taskNameWithSubtask, t);
}
}
}
}
private UserCodeClassLoader createUserCodeClassloader() throws Exception {
long startDownloadTime = System.currentTimeMillis();
// triggers the download of all missing jar files from the job manager
final UserCodeClassLoader userCodeClassLoader =
classLoaderHandle.getOrResolveClassLoader(requiredJarFiles, requiredClasspaths);
LOG.debug(
"Getting user code class loader for task {} at library cache manager took {} milliseconds",
executionId,
System.currentTimeMillis() - startDownloadTime);
return userCodeClassLoader;
}
private void notifyFinalState() {
checkState(executionState.isTerminal());
taskManagerActions.updateTaskExecutionState(
new TaskExecutionState(executionId, executionState, failureCause));
}
private void notifyFatalError(String message, Throwable cause) {
taskManagerActions.notifyFatalError(message, cause);
}
/**
* Try to transition the execution state from the current state to the new state.
*
* @param currentState of the execution
* @param newState of the execution
* @return true if the transition was successful, otherwise false
*/
private boolean transitionState(ExecutionState currentState, ExecutionState newState) {
return transitionState(currentState, newState, null);
}
/**
* Try to transition the execution state from the current state to the new state.
*
* @param currentState of the execution
* @param newState of the execution
* @param cause of the transition change or null
* @return true if the transition was successful, otherwise false
*/
private boolean transitionState(
ExecutionState currentState, ExecutionState newState, Throwable cause) {
if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
if (cause == null) {
LOG.info(
"{} ({}) switched from {} to {}.",
taskNameWithSubtask,
executionId,
currentState,
newState);
} else {
LOG.warn(
"{} ({}) switched from {} to {} with failure cause: {}",
taskNameWithSubtask,
executionId,
currentState,
newState,
ExceptionUtils.stringifyException(cause));
}
return true;
} else {
return false;
}
}
// ----------------------------------------------------------------------------------------------------------------
// Canceling / Failing the task from the outside
// ----------------------------------------------------------------------------------------------------------------
/**
* Cancels the task execution. If the task is already in a terminal state (such as FINISHED,
* CANCELED, FAILED), or if the task is already canceling this does nothing. Otherwise it sets
* the state to CANCELING, and, if the invokable code is running, starts an asynchronous thread
* that aborts that code.
*
* <p>This method never blocks.
*/
public void cancelExecution() {
LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId);
cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
}
/**
* Marks task execution failed for an external reason (a reason other than the task code itself
* throwing an exception). If the task is already in a terminal state (such as FINISHED,
* CANCELED, FAILED), or if the task is already canceling this does nothing. Otherwise it sets
* the state to FAILED, and, if the invokable code is running, starts an asynchronous thread
* that aborts that code.
*
* <p>This method never blocks.
*/
@Override
public void failExternally(Throwable cause) {
LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId);
cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
}
private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
try {
cancelOrFailAndCancelInvokableInternal(targetState, cause);
} catch (Throwable t) {
if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(t)) {
String message =
String.format(
"FATAL - exception in cancelling task %s (%s).",
taskNameWithSubtask, executionId);
notifyFatalError(message, t);
} else {
throw t;
}
}
}
@VisibleForTesting
void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwable cause) {
while (true) {
ExecutionState current = executionState;
// if the task is already canceled (or canceling) or finished or failed,
// then we need not do anything
if (current.isTerminal() || current == ExecutionState.CANCELING) {
LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
return;
}
if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
if (transitionState(current, targetState, cause)) {
// if we manage this state transition, then the invokable gets never called
// we need not call cancel on it
this.failureCause = cause;
return;
}
} else if (current == ExecutionState.INITIALIZING
|| current == ExecutionState.RUNNING) {
if (transitionState(current, targetState, cause)) {
// we are canceling / failing out of the running state
// we need to cancel the invokable
// copy reference to guard against concurrent null-ing out the reference
final TaskInvokable invokable = this.invokable;
if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
this.failureCause = cause;
LOG.info(
"Triggering cancellation of task code {} ({}).",
taskNameWithSubtask,
executionId);
// because the canceling may block on user code, we cancel from a separate
// thread
// we do not reuse the async call handler, because that one may be blocked,
// in which
// case the canceling could not continue
// The canceller calls cancel and interrupts the executing thread once
Runnable canceler =
new TaskCanceler(
LOG,
taskCancellationTimeout > 0
? taskCancellationTimeout
: TaskManagerOptions.TASK_CANCELLATION_TIMEOUT
.defaultValue(),
invokable,
executingThread,
taskNameWithSubtask);
Thread cancelThread =
new Thread(
executingThread.getThreadGroup(),
canceler,
String.format(
"Canceler for %s (%s).",
taskNameWithSubtask, executionId));
cancelThread.setDaemon(true);
cancelThread.setUncaughtExceptionHandler(
FatalExitExceptionHandler.INSTANCE);
cancelThread.start();
// the periodic interrupting thread - a different thread than the canceller,
// in case
// the application code does blocking stuff in its cancellation paths.
Runnable interrupter =
new TaskInterrupter(
LOG,
invokable,
executingThread,
taskNameWithSubtask,
taskCancellationInterval);
Thread interruptingThread =
new Thread(
executingThread.getThreadGroup(),
interrupter,
String.format(
"Canceler/Interrupts for %s (%s).",
taskNameWithSubtask, executionId));
interruptingThread.setDaemon(true);
interruptingThread.setUncaughtExceptionHandler(
FatalExitExceptionHandler.INSTANCE);
interruptingThread.start();
// if a cancellation timeout is set, the watchdog thread kills the process
// if graceful cancellation does not succeed
if (taskCancellationTimeout > 0) {
Runnable cancelWatchdog =
new TaskCancelerWatchDog(
taskInfo,
executingThread,
taskManagerActions,
taskCancellationTimeout);
Thread watchDogThread =
new Thread(
executingThread.getThreadGroup(),
cancelWatchdog,
String.format(
"Cancellation Watchdog for %s (%s).",
taskNameWithSubtask, executionId));
watchDogThread.setDaemon(true);
watchDogThread.setUncaughtExceptionHandler(
FatalExitExceptionHandler.INSTANCE);
watchDogThread.start();
}
}
return;
}
} else {
throw new IllegalStateException(
String.format(
"Unexpected state: %s of task %s (%s).",
current, taskNameWithSubtask, executionId));
}
}
}
// ------------------------------------------------------------------------
// Partition State Listeners
// ------------------------------------------------------------------------
@Override
public void requestPartitionProducerState(
final IntermediateDataSetID intermediateDataSetId,
final ResultPartitionID resultPartitionId,
Consumer<? super ResponseHandle> responseConsumer) {
final CompletableFuture<ExecutionState> futurePartitionState =
partitionProducerStateChecker.requestPartitionProducerState(
jobId, intermediateDataSetId, resultPartitionId);
FutureUtils.assertNoException(
futurePartitionState
.handle(PartitionProducerStateResponseHandle::new)
.thenAcceptAsync(responseConsumer, executor));
}
// ------------------------------------------------------------------------
// Notifications on the invokable
// ------------------------------------------------------------------------
/**
* Calls the invokable to trigger a checkpoint.
*
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
* @param checkpointOptions Options for performing this checkpoint.
*/
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {
final TaskInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(
checkpointID, checkpointTimestamp, System.currentTimeMillis());
if (executionState == ExecutionState.RUNNING) {
checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable");
try {
((CheckpointableTask) invokable)
.triggerCheckpointAsync(checkpointMetaData, checkpointOptions)
.handle(
(triggerResult, exception) -> {
if (exception != null || !triggerResult) {
declineCheckpoint(
checkpointID,
CheckpointFailureReason.TASK_FAILURE,
exception);
return false;
}
return true;
});
} catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting
// down, so we just ignore it.
LOG.debug(
"Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
checkpointID,
taskNameWithSubtask,
executionId);
declineCheckpoint(
checkpointID, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING);
} catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(
new Exception(
"Error while triggering checkpoint "
+ checkpointID
+ " for "
+ taskNameWithSubtask,
t));
} else {
LOG.debug(
"Encountered error while triggering checkpoint {} for "
+ "{} ({}) while being not in state running.",
checkpointID,
taskNameWithSubtask,
executionId,
t);
}
}
} else {
LOG.debug(
"Declining checkpoint request for non-running task {} ({}).",
taskNameWithSubtask,
executionId);
// send back a message that we did not do the checkpoint
declineCheckpoint(
checkpointID, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
}
}
private void declineCheckpoint(long checkpointID, CheckpointFailureReason failureReason) {
declineCheckpoint(checkpointID, failureReason, null);
}
private void declineCheckpoint(
long checkpointID,
CheckpointFailureReason failureReason,
@Nullable Throwable failureCause) {
checkpointResponder.declineCheckpoint(
jobId,
executionId,
checkpointID,
new CheckpointException(
"Task name with subtask : " + taskNameWithSubtask,
failureReason,
failureCause));
}
public void notifyCheckpointComplete(final long checkpointID) {
final TaskInvokable invokable = this.invokable;
if (executionState == ExecutionState.RUNNING) {
checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable");
try {
((CheckpointableTask) invokable).notifyCheckpointCompleteAsync(checkpointID);
} catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting
// down, so we just ignore it.
LOG.debug(
"Notify checkpoint complete {} for {} ({}) was rejected by the mailbox",
checkpointID,
taskNameWithSubtask,
executionId);
} catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
// fail task if checkpoint confirmation failed.
failExternally(new RuntimeException("Error while confirming checkpoint", t));
}
}
} else {
LOG.debug(
"Ignoring checkpoint commit notification for non-running task {}.",
taskNameWithSubtask);
}
}
public void notifyCheckpointAborted(
final long checkpointID, final long latestCompletedCheckpointId) {
final TaskInvokable invokable = this.invokable;
if (executionState == ExecutionState.RUNNING) {
checkState(invokable instanceof CheckpointableTask, "invokable is not checkpointable");
try {
((CheckpointableTask) invokable)
.notifyCheckpointAbortAsync(checkpointID, latestCompletedCheckpointId);
} catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting
// down, so we just ignore it.
LOG.debug(
"Notify checkpoint abort {} for {} ({}) was rejected by the mailbox",
checkpointID,
taskNameWithSubtask,
executionId);
} catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
// fail task if checkpoint aborted notification failed.
failExternally(new RuntimeException("Error while aborting checkpoint", t));
}
}
} else {
LOG.info(
"Ignoring checkpoint aborted notification for non-running task {}.",
taskNameWithSubtask);
}
}
/**
* Dispatches an operator event to the invokable task.
*
* <p>If the event delivery did not succeed, this method throws an exception. Callers can use
* that exception for error reporting, but need not react with failing this task (this method
* takes care of that).
*
* @throws FlinkException This method throws exceptions indicating the reason why delivery did
* not succeed.
*/
public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> evt)
throws FlinkException {
final TaskInvokable invokable = this.invokable;
final ExecutionState currentState = this.executionState;
if (invokable == null
|| (currentState != ExecutionState.RUNNING
&& currentState != ExecutionState.INITIALIZING)) {
throw new TaskNotRunningException("Task is not running, but in state " + currentState);
}
if (invokable instanceof CoordinatedTask) {
try {
((CoordinatedTask) invokable).dispatchOperatorEvent(operator, evt);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
if (getExecutionState() == ExecutionState.RUNNING
|| getExecutionState() == ExecutionState.INITIALIZING) {
FlinkException e = new FlinkException("Error while handling operator event", t);
failExternally(e);
throw e;
}
}
}
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
private void cancelInvokable(TaskInvokable invokable) {
// in case of an exception during execution, we still call "cancel()" on the task
if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
try {
invokable.cancel();
} catch (Throwable t) {
LOG.error("Error while canceling task {}.", taskNameWithSubtask, t);
}
}
}
@Override
public String toString() {
return String.format("%s (%s) [%s]", taskNameWithSubtask, executionId, executionState);
}
@VisibleForTesting
class PartitionProducerStateResponseHandle implements ResponseHandle {
private final Either<ExecutionState, Throwable> result;
PartitionProducerStateResponseHandle(
@Nullable ExecutionState producerState, @Nullable Throwable t) {
this.result = producerState != null ? Either.Left(producerState) : Either.Right(t);
}
@Override
public ExecutionState getConsumerExecutionState() {
return executionState;
}
@Override
public Either<ExecutionState, Throwable> getProducerExecutionState() {
return result;
}
@Override
public void cancelConsumption() {
cancelExecution();
}
@Override
public void failConsumption(Throwable cause) {
failExternally(cause);
}
}
/**
* Instantiates the given task invokable class, passing the given environment (and possibly the
* initial task state) to the task's constructor.
*
* <p>The method will first try to instantiate the task via a constructor accepting both the
* Environment and the TaskStateSnapshot. If no such constructor exists, and there is no initial
* state, the method will fall back to the stateless convenience constructor that accepts only
* the Environment.
*
* @param classLoader The classloader to load the class through.
* @param className The name of the class to load.
* @param environment The task environment.
* @return The instantiated invokable task object.
* @throws Throwable Forwards all exceptions that happen during initialization of the task. Also
* throws an exception if the task class misses the necessary constructor.
*/
private static TaskInvokable loadAndInstantiateInvokable(
ClassLoader classLoader, String className, Environment environment) throws Throwable {
final Class<? extends TaskInvokable> invokableClass;
try {
invokableClass =
Class.forName(className, true, classLoader).asSubclass(TaskInvokable.class);
} catch (Throwable t) {
throw new Exception("Could not load the task's invokable class.", t);
}
Constructor<? extends TaskInvokable> statelessCtor;
try {
statelessCtor = invokableClass.getConstructor(Environment.class);
} catch (NoSuchMethodException ee) {
throw new FlinkException("Task misses proper constructor", ee);
}
// instantiate the class
try {
//noinspection ConstantConditions --> cannot happen
return statelessCtor.newInstance(environment);
} catch (InvocationTargetException e) {
// directly forward exceptions from the eager initialization
throw e.getTargetException();
} catch (Exception e) {
throw new FlinkException("Could not instantiate the task's invokable class.", e);
}
}
// ------------------------------------------------------------------------
// Task cancellation
//
// The task cancellation uses in total three threads, as a safety net
// against various forms of user- and JVM bugs.
//
// - The first thread calls 'cancel()' on the invokable and closes
// the input and output connections, for fast thread termination
// - The second thread periodically interrupts the invokable in order
// to pull the thread out of blocking wait and I/O operations
// - The third thread (watchdog thread) waits until the cancellation
// timeout and then performs a hard cancel (kill process, or let
// the TaskManager know)
//
// Previously, thread two and three were in one thread, but we needed
// to separate this to make sure the watchdog thread does not call
// 'interrupt()'. This is a workaround for the following JVM bug
// https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8138622
// ------------------------------------------------------------------------
/**
* This runner calls cancel() on the invokable, closes input-/output resources, and initially
* interrupts the task thread.
*/
private class TaskCanceler implements Runnable {
private final Logger logger;
/** Time to wait after cancellation and interruption before releasing network resources. */
private final long taskCancellationTimeout;
private final TaskInvokable invokable;
private final Thread executer;
private final String taskName;
TaskCanceler(
Logger logger,
long taskCancellationTimeout,
TaskInvokable invokable,
Thread executer,
String taskName) {
this.logger = logger;
this.taskCancellationTimeout = taskCancellationTimeout;
this.invokable = invokable;
this.executer = executer;
this.taskName = taskName;
}
@Override
public void run() {
try {
// the user-defined cancel method may throw errors.
// we need do continue despite that
try {
Future<Void> cancellationFuture = invokable.cancel();
// Wait for any active actions to complete (e.g. timers, mailbox actions)
// Before that, interrupt to notify them about cancellation
invokable.maybeInterruptOnCancel(executer, null, null);
try {
cancellationFuture.get(taskCancellationTimeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException | InterruptedException e) {
logger.debug("Error while waiting the task to terminate {}.", taskName, e);
}
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
logger.error("Error while canceling the task {}.", taskName, t);
}
// Early release of input and output buffer pools. We do this
// in order to unblock async Threads, which produce/consume the
// intermediate streams outside of the main Task Thread (like
// the Kafka consumer).
// Notes: 1) This does not mean to release all network resources,
// the task thread itself will release them; 2) We can not close
// ResultPartitions here because of possible race conditions with
// Task thread so we just call the fail here.
failAllResultPartitions();
closeAllInputGates();
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
logger.error("Error in the task canceler for task {}.", taskName, t);
}
}
}
/** This thread sends the delayed, periodic interrupt calls to the executing thread. */
private static final class TaskInterrupter implements Runnable {
/** The logger to report on the fatal condition. */
private final Logger log;
/** The invokable task. */
private final TaskInvokable task;
/** The executing task thread that we wait for to terminate. */
private final Thread executerThread;
/** The name of the task, for logging purposes. */
private final String taskName;
/** The interval in which we interrupt. */
private final long interruptIntervalMillis;
TaskInterrupter(
Logger log,
TaskInvokable task,
Thread executerThread,
String taskName,
long interruptIntervalMillis) {
this.log = log;
this.task = task;
this.executerThread = executerThread;
this.taskName = taskName;
this.interruptIntervalMillis = interruptIntervalMillis;
}
@Override
public void run() {
try {
// we initially wait for one interval
// in most cases, the threads go away immediately (by the cancellation thread)
// and we need not actually do anything
executerThread.join(interruptIntervalMillis);
// log stack trace where the executing thread is stuck and
// interrupt the running thread periodically while it is still alive
while (executerThread.isAlive()) {
task.maybeInterruptOnCancel(executerThread, taskName, interruptIntervalMillis);
try {
executerThread.join(interruptIntervalMillis);
} catch (InterruptedException e) {
// we ignore this and fall through the loop
}
}
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
log.error("Error in the task canceler for task {}.", taskName, t);
}
}
}
/**
* Watchdog for the cancellation. If the task thread does not go away gracefully within a
* certain time, we trigger a hard cancel action (notify TaskManager of fatal error, which in
* turn kills the process).
*/
private static class TaskCancelerWatchDog implements Runnable {
/** The executing task thread that we wait for to terminate. */
private final Thread executerThread;
/** The TaskManager to notify if cancellation does not happen in time. */
private final TaskManagerActions taskManager;
/** The timeout for cancellation. */
private final long timeoutMillis;
private final TaskInfo taskInfo;
TaskCancelerWatchDog(
TaskInfo taskInfo,
Thread executerThread,
TaskManagerActions taskManager,
long timeoutMillis) {
checkArgument(timeoutMillis > 0);
this.taskInfo = taskInfo;
this.executerThread = executerThread;
this.taskManager = taskManager;
this.timeoutMillis = timeoutMillis;
}
@Override
public void run() {
try {
final long hardKillDeadline = System.nanoTime() + timeoutMillis * 1_000_000;
long millisLeft;
while (executerThread.isAlive()
&& (millisLeft = (hardKillDeadline - System.nanoTime()) / 1_000_000) > 0) {
try {
executerThread.join(millisLeft);
} catch (InterruptedException ignored) {
// we don't react to interrupted exceptions, simply fall through the loop
}
}
if (executerThread.isAlive()) {
logTaskThreadStackTrace(
executerThread,
taskInfo.getTaskNameWithSubtasks(),
timeoutMillis,
"notifying TM");
String msg =
"Task did not exit gracefully within "
+ (timeoutMillis / 1000)
+ " + seconds.";
taskManager.notifyFatalError(msg, new FlinkRuntimeException(msg));
}
} catch (Throwable t) {
throw new FlinkRuntimeException("Error in Task Cancellation Watch Dog", t);
}
}
}
public static void logTaskThreadStackTrace(
Thread thread, String taskName, long timeoutMs, String action) {
StackTraceElement[] stack = thread.getStackTrace();
StringBuilder stackTraceStr = new StringBuilder();
for (StackTraceElement e : stack) {
stackTraceStr.append(e).append('\n');
}
LOG.warn(
"Task '{}' did not react to cancelling signal - {}; it is stuck for {} seconds in method:\n {}",
taskName,
action,
timeoutMs / 1000,
stackTraceStr);
}
}