blob: 6de67477749797672180492e76bc384318f5a1b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* TaskExecutor implementation. The task executor is responsible for the execution of multiple
* {@link Task}.
*/
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
public static final String TASK_MANAGER_NAME = "taskmanager";
/** The access to the leader election and retrieval services. */
private final HighAvailabilityServices haServices;
/** Bundle of task-manager-side services (slot table, shuffle environment, state stores, ...). */
private final TaskManagerServices taskExecutorServices;
/** The task manager configuration. */
private final TaskManagerConfiguration taskManagerConfiguration;
/** The fatal error handler to use in case of a fatal error. */
private final FatalErrorHandler fatalErrorHandler;
/** Cache for permanent and transient BLOBs (e.g. offloaded deployment data, uploaded log files). */
private final BlobCacheService blobCacheService;
/** The address to metric query service on this Task Manager. */
private final String metricQueryServiceAddress;
// --------- TaskManager services --------
/** The connection information of this task manager. */
private final TaskManagerLocation taskManagerLocation;
/** Root metric group of this task manager; per-task metric groups are registered under it. */
private final TaskManagerMetricGroup taskManagerMetricGroup;
/** The state manager for this task, providing state managers per slot. */
private final TaskExecutorLocalStateStoresManager localStateStoresManager;
/** The network component in the task manager. */
private final ShuffleEnvironment<?, ?> shuffleEnvironment;
/** The kvState registration service in the task manager. */
private final KvStateService kvStateService;
// --------- job manager connections -----------
// Established connections to job managers, keyed by the job manager's resource ID.
private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
// --------- task slot allocation table -----------
private final TaskSlotTable<Task> taskSlotTable;
// Lookup from JobID to the established JobManagerConnection (see submitTask/requestSlot).
private final JobManagerTable jobManagerTable;
// Monitors job manager leadership for the jobs this task manager holds slots for.
private final JobLeaderService jobLeaderService;
// Retrieves the current resource manager leader (started in startTaskExecutorServices()).
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// ------------------------------------------------------------------------
// Hardware characteristics of this machine, reported to the resource manager.
private final HardwareDescription hardwareDescription;
// Created in startTaskExecutorServices(); still null if startup failed before that point.
private FileCache fileCache;
/** The heartbeat manager for job manager in the task manager. */
private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;
/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
// Tracks result partitions produced by tasks on this executor (see setupResultPartitionBookkeeping).
private final TaskExecutorPartitionTracker partitionTracker;
// Samples tasks to compute back-pressure ratios (see requestTaskBackPressure).
private final BackPressureSampleService backPressureSampleService;
// --------- resource manager --------
// NOTE(review): the nullable connection state below appears to be mutated only from the
// main RPC thread — confirm before touching it from anywhere else.
@Nullable
private ResourceManagerAddress resourceManagerAddress;
@Nullable
private EstablishedResourceManagerConnection establishedResourceManagerConnection;
@Nullable
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
// Token used to invalidate stale registration-timeout checks (started in onStart()).
@Nullable
private UUID currentRegistrationTimeoutId;
// Per job: futures completing when a task terminated and its partition cleanup ran
// (populated in setupResultPartitionBookkeeping).
private Map<JobID, Collection<CompletableFuture<ExecutionState>>> taskResultPartitionCleanupFuturesPerJob = new HashMap<>(8);
/**
 * Creates a new TaskExecutor RPC endpoint.
 *
 * <p>Services are taken from the given {@link TaskManagerServices} bundle; heartbeat
 * managers towards the job manager(s) and the resource manager are created here, but no
 * connections are established until {@link #onStart()} runs.
 *
 * @param rpcService rpc service this endpoint runs on
 * @param taskManagerConfiguration configuration; must declare at least one slot
 * @param haServices access to leader election/retrieval services
 * @param taskExecutorServices bundle of task-manager-side services
 * @param heartbeatServices factory for the heartbeat managers
 * @param taskManagerMetricGroup root metric group for this task manager
 * @param metricQueryServiceAddress address of the metric query service on this TM
 * @param blobCacheService cache for permanent/transient BLOBs
 * @param fatalErrorHandler handler invoked on unrecoverable errors
 * @param partitionTracker tracker for result partitions produced on this TM
 * @param backPressureSampleService service sampling tasks for back pressure
 */
public TaskExecutor(
RpcService rpcService,
TaskManagerConfiguration taskManagerConfiguration,
HighAvailabilityServices haServices,
TaskManagerServices taskExecutorServices,
HeartbeatServices heartbeatServices,
TaskManagerMetricGroup taskManagerMetricGroup,
String metricQueryServiceAddress,
BlobCacheService blobCacheService,
FatalErrorHandler fatalErrorHandler,
TaskExecutorPartitionTracker partitionTracker,
BackPressureSampleService backPressureSampleService) {
super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
this.taskExecutorServices = checkNotNull(taskExecutorServices);
this.haServices = checkNotNull(haServices);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.partitionTracker = partitionTracker;
this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
this.blobCacheService = checkNotNull(blobCacheService);
this.metricQueryServiceAddress = checkNotNull(metricQueryServiceAddress);
this.backPressureSampleService = checkNotNull(backPressureSampleService);
// Unpack frequently used services from the bundle for direct access.
this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
this.jobManagerTable = taskExecutorServices.getJobManagerTable();
this.jobLeaderService = taskExecutorServices.getJobLeaderService();
this.taskManagerLocation = taskExecutorServices.getTaskManagerLocation();
this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment();
this.kvStateService = taskExecutorServices.getKvStateService();
this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
this.jobManagerConnections = new HashMap<>(4);
this.hardwareDescription = HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());
// No resource manager connection yet; established after leader retrieval in onStart().
this.resourceManagerAddress = null;
this.resourceManagerConnection = null;
this.currentRegistrationTimeoutId = null;
final ResourceID resourceId = taskExecutorServices.getTaskManagerLocation().getResourceID();
this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}
/** Builds the heartbeat manager monitoring the connection to the resource manager. */
private HeartbeatManager<Void, TaskExecutorHeartbeatPayload> createResourceManagerHeartbeatManager(HeartbeatServices heartbeatServices, ResourceID resourceId) {
    final ResourceManagerHeartbeatListener heartbeatListener = new ResourceManagerHeartbeatListener();
    return heartbeatServices.createHeartbeatManager(resourceId, heartbeatListener, getMainThreadExecutor(), log);
}
/** Builds the heartbeat manager monitoring the connections to job managers. */
private HeartbeatManager<AllocatedSlotReport, AccumulatorReport> createJobManagerHeartbeatManager(HeartbeatServices heartbeatServices, ResourceID resourceId) {
    final JobManagerHeartbeatListener heartbeatListener = new JobManagerHeartbeatListener();
    return heartbeatServices.createHeartbeatManager(resourceId, heartbeatListener, getMainThreadExecutor(), log);
}
@Override
public CompletableFuture<Boolean> canBeReleased() {
    // The executor may be released only once no result partitions occupy local resources.
    final boolean noLocalPartitionsLeft =
        shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty();
    return CompletableFuture.completedFuture(noLocalPartitionsLeft);
}
// ------------------------------------------------------------------------
// Life cycle
// ------------------------------------------------------------------------
@Override
public void onStart() throws Exception {
    // Bring up all internal services first; a failure here is fatal for this executor.
    try {
        startTaskExecutorServices();
    } catch (Exception e) {
        final String message = String.format("Could not start the TaskExecutor %s", getAddress());
        final TaskManagerException startFailure = new TaskManagerException(message, e);
        onFatalError(startFailure);
        throw startFailure;
    }
    // Arm the timeout that fires if registration at the resource manager takes too long.
    startRegistrationTimeout();
}
/**
 * Starts the internal services in order: resource manager leader retrieval, the task slot
 * table, the job leader service, and finally the file cache. On any failure, already
 * started services are stopped again and the original exception is rethrown.
 */
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
// distributed-cache files are served from the permanent BLOB store
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
} catch (Exception e) {
// cleans up partially started services and rethrows e
handleStartTaskExecutorServicesException(e);
}
}
/**
 * Best-effort cleanup after a failed service start: stops whatever was already started,
 * attaching any cleanup failure as a suppressed exception, then rethrows the original.
 */
private void handleStartTaskExecutorServicesException(Exception e) throws Exception {
    try {
        stopTaskExecutorServices();
    } catch (Exception cleanupFailure) {
        e.addSuppressed(cleanupFailure);
    }
    throw e;
}
/**
 * Called to shut down the TaskManager. The method closes all TaskManager services.
 *
 * <p>Shutdown order: disconnect from the resource manager and all job managers first
 * (collecting, not throwing, any disconnect failures), then asynchronously close the task
 * slot table, stop the remaining services, and finally surface any collected error.
 */
@Override
public CompletableFuture<Void> onStop() {
log.info("Stopping TaskExecutor {}.", getAddress());
Throwable jobManagerDisconnectThrowable = null;
FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
closeResourceManagerConnection(cause);
for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) {
try {
disassociateFromJobManager(jobManagerConnection, cause);
} catch (Throwable t) {
// keep going: remaining connections must still be torn down
jobManagerDisconnectThrowable = ExceptionUtils.firstOrSuppressed(t, jobManagerDisconnectThrowable);
}
}
final Throwable throwableBeforeTasksCompletion = jobManagerDisconnectThrowable;
// wait for all tasks/slots to terminate, then stop the services; merge errors at the end
return FutureUtils
.runAfterwards(
taskSlotTable.closeAsync(),
this::stopTaskExecutorServices)
.handle((ignored, throwable) -> {
handleOnStopException(throwableBeforeTasksCompletion, throwable);
return null;
});
}
/**
 * Merges the error collected before task completion with the one from the async shutdown
 * phase (the earlier error wins, the later one is suppressed) and fails the stop future
 * via {@link CompletionException} if anything went wrong; logs success otherwise.
 */
private void handleOnStopException(Throwable throwableBeforeTasksCompletion, Throwable throwableAfterTasksCompletion) {
    final Throwable combined =
        throwableBeforeTasksCompletion == null
            ? throwableAfterTasksCompletion
            : ExceptionUtils.firstOrSuppressed(throwableBeforeTasksCompletion, throwableAfterTasksCompletion);
    if (combined == null) {
        log.info("Stopped TaskExecutor {}.", getAddress());
    } else {
        throw new CompletionException(new FlinkException("Error while shutting the TaskExecutor down.", combined));
    }
}
/**
 * Stops all internal services, attempting every shutdown step even if earlier ones fail.
 * The first failure is rethrown at the end with later failures suppressed.
 *
 * @throws Exception the first exception encountered during shutdown, if any
 */
private void stopTaskExecutorServices() throws Exception {
    Exception exception = null;
    try {
        jobLeaderService.stop();
    } catch (Exception e) {
        exception = ExceptionUtils.firstOrSuppressed(e, exception);
    }
    try {
        resourceManagerLeaderRetriever.stop();
    } catch (Exception e) {
        exception = ExceptionUtils.firstOrSuppressed(e, exception);
    }
    try {
        taskExecutorServices.shutDown();
    } catch (Exception e) {
        exception = ExceptionUtils.firstOrSuppressed(e, exception);
    }
    try {
        // fileCache is only assigned in startTaskExecutorServices(); if startup failed
        // before that point this method runs with fileCache == null, so guard against
        // adding a spurious NullPointerException to the suppressed-exception chain.
        if (fileCache != null) {
            fileCache.shutdown();
        }
    } catch (Exception e) {
        exception = ExceptionUtils.firstOrSuppressed(e, exception);
    }
    // it will call close() recursively from the parent to children
    taskManagerMetricGroup.close();
    ExceptionUtils.tryRethrowException(exception);
}
// ======================================================================
// RPC methods
// ======================================================================
@Override
public CompletableFuture<TaskBackPressureResponse> requestTaskBackPressure(
        ExecutionAttemptID executionAttemptId,
        int requestId,
        @RpcTimeout Time timeout) {
    // Back pressure can only be sampled for tasks this task manager actually runs.
    final Task task = taskSlotTable.getTask(executionAttemptId);
    if (task == null) {
        return FutureUtils.completedExceptionally(
            new IllegalStateException(String.format("Cannot request back pressure of task %s. " +
                "Task is not known to the task manager.", executionAttemptId)));
    }
    // Sample asynchronously and wrap the resulting ratio into the RPC response.
    return backPressureSampleService
        .sampleTaskBackPressure(task)
        .thenApply(backPressureRatio ->
            new TaskBackPressureResponse(requestId, executionAttemptId, backPressureRatio));
}
// ----------------------------------------------------------------------
// Task lifecycle RPCs
// ----------------------------------------------------------------------
/**
 * Submits a task for execution on this task manager.
 *
 * <p>Validates that the submitting job manager is known and is the current leader, that a
 * slot was allocated for the task, then materializes offloaded/serialized deployment data,
 * assembles the per-task services (metrics, input splits, state manager, memory manager),
 * constructs the {@link Task}, registers it in the slot table and starts its thread.
 * All failures are reported through the returned future as {@link TaskSubmissionException}.
 *
 * @param tdd descriptor carrying everything needed to deploy the task
 * @param jobMasterId expected fencing id of the submitting job master
 * @param timeout rpc timeout (unused locally; part of the RPC contract)
 * @return future acknowledging the submission, or completed exceptionally on rejection
 */
@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,
Time timeout) {
try {
final JobID jobId = tdd.getJobId();
// reject unless we have an established connection to the job's job manager
final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
if (jobManagerConnection == null) {
final String message = "Could not submit task because there is no JobManager " +
"associated for the job " + jobId + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// fencing check: the submission must come from the current job manager leader
if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
final String message = "Rejecting the task submission because the job manager leader id " +
jobMasterId + " does not match the expected job manager leader id " +
jobManagerConnection.getJobMasterId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// the slot must have been allocated to this job/allocation and becomes active now
if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
final String message = "No task slot allocated for job ID " + jobId +
" and allocation ID " + tdd.getAllocationId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// re-integrate offloaded data:
try {
tdd.loadBigData(blobCacheService.getPermanentBlobService());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
}
// deserialize the pre-serialized information
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
}
// sanity check: descriptor and deserialized job information must agree on the job id
if (!jobId.equals(jobInformation.getJobId())) {
throw new TaskSubmissionException(
"Inconsistent job ID information inside TaskDeploymentDescriptor (" +
tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
}
// per-task metric group under this TM's root group
TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
jobInformation.getJobId(),
jobInformation.getJobName(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskInformation.getTaskName(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber());
// input splits are requested from the job manager via RPC
InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
jobManagerConnection.getJobManagerGateway(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getTimeout());
// per-connection services shared by all tasks of this job
TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
GlobalAggregateManager aggregateManager = jobManagerConnection.getGlobalAggregateManager();
LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
// local state store scoped to this job/allocation/vertex/subtask
final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
jobId,
tdd.getAllocationId(),
taskInformation.getJobVertexId(),
tdd.getSubtaskIndex());
// taskRestore is null when the task starts from scratch rather than from a checkpoint
final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
final TaskStateManager taskStateManager = new TaskStateManagerImpl(
jobId,
tdd.getExecutionAttemptId(),
localStateStore,
taskRestore,
checkpointResponder);
MemoryManager memoryManager;
try {
memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
} catch (SlotNotFoundException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
Task task = new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
tdd.getTargetSlotNumber(),
memoryManager,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
aggregateManager,
blobCacheService,
libraryCache,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
getRpcService().getExecutor());
taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured);
log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
boolean taskAdded;
try {
taskAdded = taskSlotTable.addTask(task);
} catch (SlotNotFoundException | SlotNotActiveException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
if (taskAdded) {
// only start the task thread once it is registered in the slot table
task.startTaskThread();
setupResultPartitionBookkeeping(
tdd.getJobId(),
tdd.getProducedPartitions(),
task.getTerminationFuture());
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager already contains a task for id " +
task.getExecutionId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
} catch (TaskSubmissionException e) {
// rejections are reported through the future, not thrown across the RPC boundary
return FutureUtils.completedExceptionally(e);
}
}
/**
 * Registers the task's releasable (blocking, locally-stored) result partitions with the
 * partition tracker and arranges for them to be un-tracked if the task terminates in any
 * state other than {@code FINISHED}. The resulting cleanup future is recorded per job so
 * job-level teardown can wait for it.
 *
 * @param jobId job the partitions belong to
 * @param producedResultPartitions all partitions the task produces
 * @param terminationFuture completes with the task's final execution state
 */
private void setupResultPartitionBookkeeping(
    JobID jobId,
    Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions,
    CompletableFuture<ExecutionState> terminationFuture) {
    // Track each releasable partition and collect its ID. Done with forEach rather than
    // Stream#peek: peek exists for debugging and its execution is not guaranteed for
    // every pipeline shape, so it must not carry required side effects.
    final Set<ResultPartitionID> partitionsRequiringRelease = new HashSet<>();
    filterPartitionsRequiringRelease(producedResultPartitions)
        .forEach(rpdd -> {
            partitionTracker.startTrackingPartition(jobId, TaskExecutorPartitionInfo.from(rpdd));
            partitionsRequiringRelease.add(rpdd.getShuffleDescriptor().getResultPartitionID());
        });
    // On non-FINISHED termination, stop tracking the partitions; executed on the main
    // RPC thread via getMainThreadExecutor().
    final CompletableFuture<ExecutionState> taskTerminationWithResourceCleanupFuture =
        terminationFuture.thenApplyAsync(
            executionState -> {
                if (executionState != ExecutionState.FINISHED) {
                    partitionTracker.stopTrackingPartitions(partitionsRequiringRelease);
                }
                return executionState;
            },
            getMainThreadExecutor());
    // Record the cleanup future under its job id (computeIfAbsent replaces the previous
    // manual compute-with-null-check).
    taskResultPartitionCleanupFuturesPerJob
        .computeIfAbsent(jobId, ignored -> new ArrayList<>(4))
        .add(taskTerminationWithResourceCleanupFuture);
}
/**
 * Selects the produced partitions that need an explicit release call: only blocking
 * partitions that actually store local resources on this TaskExecutor qualify.
 */
private Stream<ResultPartitionDeploymentDescriptor> filterPartitionsRequiringRelease(Collection<ResultPartitionDeploymentDescriptor> producedResultPartitions) {
    return producedResultPartitions.stream()
        // blocking => requires explicit release; local resources => something to release here
        .filter(descriptor -> descriptor.getPartitionType().isBlocking()
            && descriptor.getShuffleDescriptor().storesLocalResourcesOn().isPresent());
}
@Override
public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
    final Task task = taskSlotTable.getTask(executionAttemptID);
    // Guard clause: unknown execution attempt -> report failure through the future.
    if (task == null) {
        final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
        log.debug(message);
        return FutureUtils.completedExceptionally(new TaskException(message));
    }
    try {
        task.cancelExecution();
        return CompletableFuture.completedFuture(Acknowledge.get());
    } catch (Throwable t) {
        return FutureUtils.completedExceptionally(
            new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t));
    }
}
// ----------------------------------------------------------------------
// Partition lifecycle RPCs
// ----------------------------------------------------------------------
/**
 * Updates the locations of the task's input partitions (e.g. after a producer was
 * (re)deployed). Each update is applied asynchronously on the RPC service executor
 * because applying it might block; a failure to apply fails the task externally.
 * Updates for tasks that are no longer running are silently discarded.
 *
 * @param executionAttemptID task whose input gates should be updated
 * @param partitionInfos new partition location info per input gate
 * @param timeout rpc timeout (unused locally; part of the RPC contract)
 * @return always an acknowledged future; per-partition failures fail the task instead
 */
@Override
public CompletableFuture<Acknowledge> updatePartitions(
final ExecutionAttemptID executionAttemptID,
Iterable<PartitionInfo> partitionInfos,
Time timeout) {
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
for (final PartitionInfo partitionInfo: partitionInfos) {
// Run asynchronously because it might be blocking
FutureUtils.assertNoException(
CompletableFuture.runAsync(
() -> {
try {
// returns false if the consumer-side partition is already gone
if (!shuffleEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo)) {
log.debug(
"Discard update for input gate partition {} of result {} in task {}. " +
"The partition is no longer available.",
partitionInfo.getShuffleDescriptor().getResultPartitionID(),
partitionInfo.getIntermediateDataSetID(),
executionAttemptID);
}
} catch (IOException | InterruptedException e) {
// cannot propagate across the async boundary: fail the task instead
log.error(
"Could not update input data location for task {}. Trying to fail task.",
task.getTaskInfo().getTaskName(),
e);
task.failExternally(e);
}
},
getRpcService().getExecutor()));
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
return CompletableFuture.completedFuture(Acknowledge.get());
}
}
/**
 * Releases the given job partitions (stopping tracking and freeing their resources) and
 * promotes the given partitions to cluster partitions that outlive the job. Afterwards the
 * job manager connection is closed if the job holds no more resources on this executor.
 * Fire-and-forget RPC: failures are routed to the fatal error handler (see TODOs below).
 *
 * @param jobId job the partitions belong to
 * @param partitionToRelease partitions to stop tracking and release
 * @param partitionsToPromote partitions to promote to cluster partitions
 */
@Override
public void releaseOrPromotePartitions(JobID jobId, Set<ResultPartitionID> partitionToRelease, Set<ResultPartitionID> partitionsToPromote) {
try {
partitionTracker.stopTrackingAndReleaseJobPartitions(partitionToRelease);
partitionTracker.promoteJobPartitions(partitionsToPromote);
closeJobManagerConnectionIfNoAllocatedResources(jobId);
} catch (Throwable t) {
// TODO: Do we still need this catch branch?
onFatalError(t);
}
// TODO: Maybe it's better to return an Acknowledge here to notify the JM about the success/failure with an Exception
}
// ----------------------------------------------------------------------
// Heartbeat RPC
// ----------------------------------------------------------------------
/**
 * Heartbeat from a job manager: forwards the received allocated-slot report to the
 * job manager heartbeat manager, which also produces the response payload.
 */
@Override
public void heartbeatFromJobManager(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
jobManagerHeartbeatManager.requestHeartbeat(resourceID, allocatedSlotReport);
}
/**
 * Heartbeat from the resource manager: carries no payload (hence {@code null}); the
 * heartbeat manager answers with this executor's payload.
 */
@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
// ----------------------------------------------------------------------
// Checkpointing RPCs
// ----------------------------------------------------------------------
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
    // Advancing the watermark to MAX is only valid for synchronous savepoints.
    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    final boolean isSynchronousSavepoint = checkpointType.isSynchronous() && checkpointType.isSavepoint();
    if (advanceToEndOfEventTime && !isSynchronousSavepoint) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }
    final Task task = taskSlotTable.getTask(executionAttemptID);
    if (task == null) {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
        log.debug(message);
        return FutureUtils.completedExceptionally(
            new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
    task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
    return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
public CompletableFuture<Acknowledge> confirmCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp) {
    log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
    final Task task = taskSlotTable.getTask(executionAttemptID);
    if (task == null) {
        final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
        log.debug(message);
        return FutureUtils.completedExceptionally(
            new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
    // Forward the completion notification to the task.
    task.notifyCheckpointComplete(checkpointId);
    return CompletableFuture.completedFuture(Acknowledge.get());
}
// ----------------------------------------------------------------------
// Slot allocation RPCs
// ----------------------------------------------------------------------
/**
 * Allocates the given slot for the given job on behalf of the resource manager.
 *
 * <p>Verifies the request comes from the currently connected resource manager, allocates
 * the slot (or rejects with {@link SlotOccupiedException} if it is held by a different
 * allocation), then either offers the slot to an already-connected job manager or starts
 * monitoring the job's leader. If monitoring cannot be started, the allocation is rolled
 * back (slot freed, local state released) before the failure is reported.
 *
 * @param slotId slot to allocate
 * @param jobId job the slot is allocated for
 * @param allocationId id of this allocation
 * @param resourceProfile resources the slot must provide
 * @param targetAddress address of the job's job manager
 * @param resourceManagerId fencing id of the requesting resource manager
 * @param timeout rpc timeout (unused locally; part of the RPC contract)
 * @return acknowledged future, or completed exceptionally with a TaskManagerException
 */
@Override
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final ResourceProfile resourceProfile,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the instance/registration Id
log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId, jobId, resourceManagerId);
try {
if (!isConnectedToResourceManager(resourceManagerId)) {
final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
log.debug(message);
throw new TaskManagerException(message);
}
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, resourceProfile, taskManagerConfiguration.getTimeout())) {
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
throw new SlotAllocationException("Could not allocate slot.");
}
} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
// occupied by another allocation: tell the RM who currently owns it
final String message = "The slot " + slotId + " has already been allocated for a different job.";
log.info(message);
final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
}
// note: re-allocation with the same job/allocation id falls through here as a no-op
if (jobManagerTable.contains(jobId)) {
// job manager already connected: offer the slot right away
offerSlotsToJobManager(jobId);
} else {
try {
// start monitoring the job's leader; slots are offered once leadership is known
jobLeaderService.addJob(jobId, targetAddress);
} catch (Exception e) {
// free the allocated slot
try {
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
onFatalError(slotNotFoundException);
}
// release local state under the allocation id.
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// sanity check
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
onFatalError(new Exception("Could not free slot " + slotId));
}
throw new SlotAllocationException("Could not add job to job leader service.", e);
}
}
} catch (TaskManagerException taskManagerException) {
return FutureUtils.completedExceptionally(taskManagerException);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
/**
 * RPC asking this TaskExecutor to free the slot identified by the given allocation id.
 *
 * <p>Delegates to {@code freeSlotInternal}; always acknowledges, even if the slot
 * was not found (the internal method only logs that case).
 *
 * @param allocationId allocation whose slot should be freed
 * @param cause reason for freeing the slot (used for logging / task failure)
 * @param timeout RPC timeout (part of the RPC signature; not used in the body)
 * @return completed acknowledge future
 */
@Override
public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
    freeSlotInternal(allocationId, cause);
    return CompletableFuture.completedFuture(Acknowledge.get());
}
/**
 * RPC requesting the upload of the TaskExecutor's log or stdout file to the transient
 * BLOB store, returning the key under which the file can be retrieved.
 *
 * @param fileType which file to upload (LOG or STDOUT)
 * @param timeout RPC timeout (part of the RPC signature; not used in the body)
 * @return future with the {@link TransientBlobKey}, or failed with a {@link FlinkException}
 *     when the file is not configured, missing, or could not be uploaded
 */
@Override
public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time timeout) {
    log.debug("Request file {} upload.", fileType);

    // resolve the configured path for the requested file type
    final String filePath;
    switch (fileType) {
        case LOG:
            filePath = taskManagerConfiguration.getTaskManagerLogPath();
            break;
        case STDOUT:
            filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
            break;
        default:
            filePath = null;
    }

    // no path configured for this file type
    if (filePath == null || filePath.isEmpty()) {
        log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID());
        return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor."));
    }

    final File file = new File(filePath);

    // path configured but the file does not exist on disk
    if (!file.exists()) {
        log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
        return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
    }

    // stream the file into the transient BLOB service; the stream is closed either way
    try (FileInputStream fileInputStream = new FileInputStream(file)) {
        final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
        final TransientBlobKey transientBlobKey = transientBlobService.putTransient(fileInputStream);
        return CompletableFuture.completedFuture(transientBlobKey);
    } catch (IOException e) {
        log.debug("Could not upload file {}.", fileType, e);
        return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
    }
}
/**
 * RPC returning the address of this TaskExecutor's metric query service, if one is running.
 *
 * @param timeout RPC timeout (part of the RPC signature; not used in the body)
 * @return future with the address wrapped in a {@link SerializableOptional}; empty when
 *     no metric query service address is set
 */
@Override
public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) {
    return CompletableFuture.completedFuture(SerializableOptional.ofNullable(metricQueryServiceAddress));
}
// ----------------------------------------------------------------------
// Disconnection RPCs
// ----------------------------------------------------------------------
/**
 * RPC from a JobManager asking this TaskExecutor to disconnect from it.
 *
 * <p>Closes the connection for the given job and immediately triggers a reconnect via the
 * job leader service, since the job itself may still have slots allocated here.
 *
 * @param jobId job whose JobManager connection should be closed
 * @param cause reason for the disconnect
 */
@Override
public void disconnectJobManager(JobID jobId, Exception cause) {
    closeJobManagerConnection(jobId, cause);
    jobLeaderService.reconnect(jobId);
}
/**
 * RPC from the ResourceManager asking this TaskExecutor to disconnect from it.
 *
 * <p>Only triggers a reconnect while the TaskExecutor is running; ignored during shutdown.
 *
 * @param cause reason for the disconnect
 */
@Override
public void disconnectResourceManager(Exception cause) {
    if (isRunning()) {
        reconnectToResourceManager(cause);
    }
}
// ======================================================================
// Internal methods
// ======================================================================
// ------------------------------------------------------------------------
// Internal resource manager connection methods
// ------------------------------------------------------------------------
/**
 * Records the new ResourceManager leader and reconnects to it.
 *
 * @param newLeaderAddress address of the new leader, or {@code null} if there is none
 * @param newResourceManagerId fencing token of the new leader
 */
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
    resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
/**
 * Packs leader address and fencing token into a {@link ResourceManagerAddress}.
 *
 * @param newLeaderAddress leader address, or {@code null} when no leader is known
 * @param newResourceManagerId fencing token; expected non-null whenever the address is non-null
 * @return the combined address, or {@code null} when there is no leader
 */
@Nullable
private ResourceManagerAddress createResourceManagerAddress(@Nullable String newLeaderAddress, @Nullable ResourceManagerId newResourceManagerId) {
    if (newLeaderAddress != null) {
        // a non-null address is always accompanied by a fencing token
        assert(newResourceManagerId != null);
        return new ResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    }
    return null;
}
/**
 * Drops any existing ResourceManager connection and attempts a new one.
 *
 * <p>Order matters: the old connection is closed first, then the registration timeout is
 * (re)armed so a failed reconnect eventually triggers a fatal error, then the connect attempt
 * is made (only if a leader address is currently known).
 *
 * @param cause reason for the reconnect, propagated to the old connection
 */
private void reconnectToResourceManager(Exception cause) {
    closeResourceManagerConnection(cause);
    startRegistrationTimeout();
    tryConnectToResourceManager();
}
/**
 * Connects to the ResourceManager if a leader address is currently known; no-op otherwise.
 */
private void tryConnectToResourceManager() {
    if (resourceManagerAddress != null) {
        connectToResourceManager();
    }
}
/**
 * Starts a new registration attempt towards the current ResourceManager leader.
 *
 * <p>Preconditions (asserted): a leader address is known and no connection — established or
 * in-flight — exists yet. Registration success/failure is reported asynchronously to a
 * {@code ResourceManagerRegistrationListener} on the main thread.
 */
private void connectToResourceManager() {
    assert(resourceManagerAddress != null);
    assert(establishedResourceManagerConnection == null);
    assert(resourceManagerConnection == null);
    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
    // registration payload describing this TaskExecutor to the ResourceManager
    final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
        getAddress(),
        getResourceID(),
        taskManagerLocation.dataPort(),
        hardwareDescription,
        taskManagerConfiguration.getDefaultSlotResourceProfile(),
        taskManagerConfiguration.getTotalResourceProfile()
    );
    resourceManagerConnection =
        new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener(),
            taskExecutorRegistration);
    resourceManagerConnection.start();
}
/**
 * Completes a successful ResourceManager registration: sends the initial slot report,
 * starts heartbeating, propagates the blob server address and cancels the registration
 * timeout.
 *
 * @param resourceManagerGateway gateway to the registered ResourceManager
 * @param resourceManagerResourceId resource id of the ResourceManager (heartbeat target)
 * @param taskExecutorRegistrationId registration id assigned to this TaskExecutor
 * @param clusterInformation cluster-wide info, e.g. the blob server address
 */
private void establishResourceManagerConnection(
    ResourceManagerGateway resourceManagerGateway,
    ResourceID resourceManagerResourceId,
    InstanceID taskExecutorRegistrationId,
    ClusterInformation clusterInformation) {
    final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
        getResourceID(),
        taskExecutorRegistrationId,
        taskSlotTable.createSlotReport(getResourceID()),
        taskManagerConfiguration.getTimeout());
    // a failed initial slot report invalidates the registration -> reconnect
    slotReportResponseFuture.whenCompleteAsync(
        (acknowledge, throwable) -> {
            if (throwable != null) {
                reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", throwable));
            }
        }, getMainThreadExecutor());
    // monitor the resource manager as heartbeat target
    resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<TaskExecutorHeartbeatPayload>() {
        @Override
        public void receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
            resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
        }
        @Override
        public void requestHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
            // the TaskManager won't send heartbeat requests to the ResourceManager
        }
    });
    // set the propagated blob server address
    final InetSocketAddress blobServerAddress = new InetSocketAddress(
        clusterInformation.getBlobServerHostname(),
        clusterInformation.getBlobServerPort());
    blobCacheService.setBlobServerAddress(blobServerAddress);
    establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
        resourceManagerGateway,
        resourceManagerResourceId,
        taskExecutorRegistrationId);
    // registered successfully -> the registration timeout no longer applies
    stopRegistrationTimeout();
}
/**
 * Tears down both the established ResourceManager connection (if any) and any in-flight
 * registration attempt.
 *
 * <p>For an established connection this stops heartbeating, notifies the ResourceManager,
 * clears the connection reference and releases all cluster partitions. The full cause is
 * only logged at debug level to keep info logs concise.
 *
 * @param cause reason for closing the connection
 */
private void closeResourceManagerConnection(Exception cause) {
    if (establishedResourceManagerConnection != null) {
        final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId();
        if (log.isDebugEnabled()) {
            log.debug("Close ResourceManager connection {}.",
                resourceManagerResourceId, cause);
        } else {
            log.info("Close ResourceManager connection {}.",
                resourceManagerResourceId);
        }
        resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceId);
        ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
        resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
        establishedResourceManagerConnection = null;
        partitionTracker.stopTrackingAndReleaseAllClusterPartitions();
    }
    if (resourceManagerConnection != null) {
        // an in-flight (not yet established) registration attempt is only logged, then closed
        if (!resourceManagerConnection.isConnected()) {
            if (log.isDebugEnabled()) {
                log.debug("Terminating registration attempts towards ResourceManager {}.",
                    resourceManagerConnection.getTargetAddress(), cause);
            } else {
                log.info("Terminating registration attempts towards ResourceManager {}.",
                    resourceManagerConnection.getTargetAddress());
            }
        }
        resourceManagerConnection.close();
        resourceManagerConnection = null;
    }
}
/**
 * Arms the registration timeout: if this TaskExecutor has not registered at the
 * ResourceManager within the configured maximum duration, a fatal error is raised.
 * A fresh UUID ticket invalidates any previously scheduled timeout. No-op when
 * no maximum registration duration is configured.
 */
private void startRegistrationTimeout() {
    final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
    if (maxRegistrationDuration == null) {
        return;
    }
    // new ticket supersedes any earlier scheduled timeout
    final UUID timeoutTicket = UUID.randomUUID();
    currentRegistrationTimeoutId = timeoutTicket;
    scheduleRunAsync(() -> registrationTimeout(timeoutTicket), maxRegistrationDuration);
}
/**
 * Disarms the registration timeout by clearing the current ticket; any already-scheduled
 * {@code registrationTimeout} call will then see a stale ticket and do nothing.
 */
private void stopRegistrationTimeout() {
    currentRegistrationTimeoutId = null;
}
/**
 * Fired when the registration timeout elapses. Raises a fatal error only when the ticket
 * still matches the current one (i.e. the timeout was not cancelled or superseded).
 *
 * @param registrationTimeoutId ticket identifying the scheduled timeout
 */
private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
    if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
        final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
        onFatalError(
            new RegistrationTimeoutException(
                String.format("Could not register at the ResourceManager within the specified maximum " +
                    "registration duration %s. This indicates a problem with this instance. Terminating now.",
                    maxRegistrationDuration)));
    }
}
// ------------------------------------------------------------------------
// Internal job manager connection methods
// ------------------------------------------------------------------------
/**
 * Offers all slots currently reserved for the given job to its connected JobManager.
 *
 * <p>No-op (with a debug log) when there is no JobManager connection or no allocated slots.
 * The JobManager's accept/reject response is handled asynchronously on the main thread by
 * {@code handleAcceptedSlotOffers}.
 *
 * @param jobId job whose reserved slots should be offered
 */
private void offerSlotsToJobManager(final JobID jobId) {
    final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
    if (jobManagerConnection == null) {
        log.debug("There is no job manager connection to the leader of job {}.", jobId);
    } else {
        if (taskSlotTable.hasAllocatedSlots(jobId)) {
            log.info("Offer reserved slots to the leader of job {}.", jobId);
            final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
            final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
            final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
            // collect one SlotOffer per reserved slot
            final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
            while (reservedSlotsIterator.hasNext()) {
                SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
                reservedSlots.add(offer);
            }
            CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
                getResourceID(),
                reservedSlots,
                taskManagerConfiguration.getTimeout());
            acceptedSlotsFuture.whenCompleteAsync(
                handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
                getMainThreadExecutor());
        } else {
            log.debug("There are no unassigned slots for the job {}.", jobId);
        }
    }
}
/**
 * Creates the callback handling the JobManager's response to a slot offer.
 *
 * <p>On timeout the offer is retried; on any other error all offered slots are freed and
 * returned to the ResourceManager. On success, accepted slots are marked active (failing the
 * slot at the JobManager when that is not possible) and all slots the JobManager did not
 * accept are freed. The response is discarded when the JobManager leadership changed in the
 * meantime.
 *
 * <p>Note: the callback mutates {@code offeredSlots}, removing each accepted slot so that
 * only the rejected ones remain for freeing.
 *
 * @param jobId job the slots were offered for
 * @param jobMasterGateway gateway of the JobManager the offer was sent to
 * @param jobMasterId fencing token of that JobManager, used to validate the response
 * @param offeredSlots the slots that were offered; reduced to the rejected ones
 * @return callback for {@code CompletableFuture.whenCompleteAsync}
 */
@Nonnull
private BiConsumer<Iterable<SlotOffer>, Throwable> handleAcceptedSlotOffers(JobID jobId, JobMasterGateway jobMasterGateway, JobMasterId jobMasterId, Collection<SlotOffer> offeredSlots) {
    return (Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
        if (throwable != null) {
            if (throwable instanceof TimeoutException) {
                log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                // We ran into a timeout. Try again.
                offerSlotsToJobManager(jobId);
            } else {
                log.warn("Slot offering to JobManager failed. Freeing the slots " +
                    "and returning them to the ResourceManager.", throwable);
                // We encountered an exception. Free the slots and return them to the RM.
                for (SlotOffer reservedSlot: offeredSlots) {
                    freeSlotInternal(reservedSlot.getAllocationId(), throwable);
                }
            }
        } else {
            // check if the response is still valid
            if (isJobManagerConnectionValid(jobId, jobMasterId)) {
                // mark accepted slots active
                for (SlotOffer acceptedSlot : acceptedSlots) {
                    try {
                        if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
                            // the slot is either free or releasing at the moment
                            // BUGFIX: message previously reported the job id as the slot
                            final String message = "Could not mark slot " + acceptedSlot.getAllocationId() + " for job " + jobId + " active.";
                            log.debug(message);
                            jobMasterGateway.failSlot(
                                getResourceID(),
                                acceptedSlot.getAllocationId(),
                                new FlinkException(message));
                        }
                    } catch (SlotNotFoundException e) {
                        final String message = "Could not mark slot " + acceptedSlot.getAllocationId() + " for job " + jobId + " active.";
                        // BUGFIX: the exception was previously swallowed without any log
                        log.debug(message, e);
                        jobMasterGateway.failSlot(
                            getResourceID(),
                            acceptedSlot.getAllocationId(),
                            new FlinkException(message));
                    }
                    // accepted -> not rejected; remove from the set of offered slots
                    offeredSlots.remove(acceptedSlot);
                }
                // whatever remains in offeredSlots was rejected by the JobManager
                final Exception e = new Exception("The slot was rejected by the JobManager.");
                for (SlotOffer rejectedSlot : offeredSlots) {
                    freeSlotInternal(rejectedSlot.getAllocationId(), e);
                }
            } else {
                // discard the response since there is a new leader for the job
                log.debug("Discard offer slot response since there is a new leader " +
                    "for the job {}.", jobId);
            }
        }
    };
}
/**
 * Establishes (or re-establishes) the connection to the JobManager that gained leadership
 * for the given job, then offers this TaskExecutor's reserved slots to it.
 *
 * <p>If we are already connected to the same leader (same fencing token) the message is
 * ignored; a connection to an older leader is closed first.
 *
 * @param jobId job whose leader connected
 * @param jobMasterGateway gateway to the new job leader
 * @param registrationSuccess registration response carrying the JobManager's resource id
 */
private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) {
    if (jobManagerTable.contains(jobId)) {
        JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId);
        if (Objects.equals(oldJobManagerConnection.getJobMasterId(), jobMasterGateway.getFencingToken())) {
            // we already are connected to the given job manager
            log.debug("Ignore JobManager gained leadership message for {} because we are already connected to it.", jobMasterGateway.getFencingToken());
            return;
        } else {
            closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.'));
        }
    }
    log.info("Establish JobManager connection for job {}.", jobId);
    ResourceID jobManagerResourceID = registrationSuccess.getResourceID();
    JobManagerConnection newJobManagerConnection = associateWithJobManager(
        jobId,
        jobManagerResourceID,
        jobMasterGateway);
    jobManagerConnections.put(jobManagerResourceID, newJobManagerConnection);
    jobManagerTable.put(jobId, newJobManagerConnection);
    // monitor the job manager as heartbeat target
    jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>() {
        @Override
        public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
            jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
        }
        @Override
        public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
            // request heartbeat will never be called on the task manager side
        }
    });
    offerSlotsToJobManager(jobId);
}
/**
 * Closes the connection to the JobManager responsible for the given job.
 *
 * <p>Three steps, in order: (1) fail all tasks of the job running on this TaskExecutor,
 * (2) demote the job's active slots back to allocated (freeing those that cannot be
 * demoted), (3) remove and disassociate the JobManager connection (stop heartbeating,
 * notify the JobManager, shut down the library cache).
 *
 * @param jobId job whose JobManager connection is closed
 * @param cause reason for closing, attached to task failures and the disconnect
 */
private void closeJobManagerConnection(JobID jobId, Exception cause) {
    if (log.isDebugEnabled()) {
        log.debug("Close JobManager connection for job {}.", jobId, cause);
    } else {
        log.info("Close JobManager connection for job {}.", jobId);
    }
    // 1. fail tasks running under this JobID
    Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
    final FlinkException failureCause = new FlinkException("JobManager responsible for " + jobId +
        " lost the leadership.", cause);
    while (tasks.hasNext()) {
        tasks.next().failExternally(failureCause);
    }
    // 2. Move the active slots to state allocated (possible to time out again)
    Iterator<AllocationID> activeSlots = taskSlotTable.getActiveSlots(jobId);
    final FlinkException freeingCause = new FlinkException("Slot could not be marked inactive.");
    while (activeSlots.hasNext()) {
        AllocationID activeSlot = activeSlots.next();
        try {
            if (!taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getTimeout())) {
                freeSlotInternal(activeSlot, freeingCause);
            }
        } catch (SlotNotFoundException e) {
            // BUGFIX: previously logged the job id instead of the slot's allocation id
            log.debug("Could not mark the slot {} inactive.", activeSlot, e);
        }
    }
    // 3. Disassociate from the JobManager
    JobManagerConnection jobManagerConnection = jobManagerTable.remove(jobId);
    if (jobManagerConnection != null) {
        try {
            jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceID());
            jobManagerConnections.remove(jobManagerConnection.getResourceID());
            disassociateFromJobManager(jobManagerConnection, cause);
        } catch (IOException e) {
            log.warn("Could not properly disassociate from JobManager {}.",
                jobManagerConnection.getJobManagerGateway().getAddress(), e);
        }
    }
}
/**
 * Builds the per-job service bundle ({@link JobManagerConnection}) around the given
 * JobManager gateway: task manager actions, checkpoint responder, aggregate manager,
 * library cache, partition notifier and state checker. Also registers this job's
 * queryable state listener/oracle.
 *
 * @param jobID job to associate with
 * @param resourceID resource id of the JobManager
 * @param jobMasterGateway gateway to the JobManager
 * @return the assembled connection object (not yet stored anywhere by this method)
 */
private JobManagerConnection associateWithJobManager(
    JobID jobID,
    ResourceID resourceID,
    JobMasterGateway jobMasterGateway) {
    checkNotNull(jobID);
    checkNotNull(resourceID);
    checkNotNull(jobMasterGateway);
    TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
    CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
    GlobalAggregateManager aggregateManager = new RpcGlobalAggregateManager(jobMasterGateway);
    final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
        blobCacheService.getPermanentBlobService(),
        taskManagerConfiguration.getClassLoaderResolveOrder(),
        taskManagerConfiguration.getAlwaysParentFirstLoaderPatterns());
    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
        jobMasterGateway,
        getRpcService().getExecutor(),
        taskManagerConfiguration.getTimeout());
    PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
    registerQueryableState(jobID, jobMasterGateway);
    return new JobManagerConnection(
        jobID,
        resourceID,
        jobMasterGateway,
        taskManagerActions,
        checkpointResponder,
        aggregateManager,
        libraryCacheManager,
        resultPartitionConsumableNotifier,
        partitionStateChecker);
}
/**
 * Releases the per-job services created by {@code associateWithJobManager}: schedules
 * result partition cleanup, unregisters queryable state, notifies the JobManager of the
 * disconnect and shuts down the job's library cache.
 *
 * @param jobManagerConnection connection to tear down
 * @param cause reason forwarded to the JobManager's disconnect
 * @throws IOException if shutting down the library cache manager fails
 */
private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
    checkNotNull(jobManagerConnection);
    final JobID jobId = jobManagerConnection.getJobID();
    // cleanup remaining partitions once all tasks for this job have completed
    scheduleResultPartitionCleanup(jobId);
    final KvStateRegistry kvStateRegistry = kvStateService.getKvStateRegistry();
    if (kvStateRegistry != null) {
        kvStateRegistry.unregisterListener(jobId);
    }
    final KvStateClientProxy kvStateClientProxy = kvStateService.getKvStateClientProxy();
    if (kvStateClientProxy != null) {
        kvStateClientProxy.updateKvStateLocationOracle(jobManagerConnection.getJobID(), null);
    }
    JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
    jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
    jobManagerConnection.getLibraryCacheManager().shutdown();
}
/**
 * Schedules the release of the job's result partitions for after all of the job's tasks
 * have terminated. No-op when no termination futures are tracked for the job.
 *
 * @param jobId job whose partitions should be released once its tasks finish
 */
private void scheduleResultPartitionCleanup(JobID jobId) {
    final Collection<CompletableFuture<ExecutionState>> pendingTaskTerminations =
        taskResultPartitionCleanupFuturesPerJob.remove(jobId);
    if (pendingTaskTerminations == null) {
        return;
    }
    // release only after every task of the job has reached a terminal state
    FutureUtils.waitForAll(pendingTaskTerminations).thenRunAsync(
        () -> partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId),
        getMainThreadExecutor());
}
/**
 * Wires the job into the queryable state services (when enabled): registers a KvState
 * listener reporting registrations to the JobManager and points the client proxy's
 * location oracle at the JobManager.
 *
 * @param jobId job to register queryable state for
 * @param jobMasterGateway gateway of the job's JobManager
 */
private void registerQueryableState(JobID jobId, JobMasterGateway jobMasterGateway) {
    final KvStateServer kvStateServer = kvStateService.getKvStateServer();
    final KvStateRegistry kvStateRegistry = kvStateService.getKvStateRegistry();
    if (kvStateServer != null && kvStateRegistry != null) {
        kvStateRegistry.registerListener(
            jobId,
            new RpcKvStateRegistryListener(
                jobMasterGateway,
                kvStateServer.getServerAddress()));
    }
    final KvStateClientProxy kvStateProxy = kvStateService.getKvStateClientProxy();
    if (kvStateProxy != null) {
        kvStateProxy.updateKvStateLocationOracle(jobId, jobMasterGateway);
    }
}
// ------------------------------------------------------------------------
// Internal task methods
// ------------------------------------------------------------------------
/**
 * Externally fails the task with the given execution attempt id, if it is still known to
 * the slot table. Failures while failing the task are logged, never propagated.
 *
 * @param executionAttemptID id of the task execution to fail
 * @param cause failure cause handed to the task
 */
private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
    final Task task = taskSlotTable.getTask(executionAttemptID);
    if (task == null) {
        log.debug("Cannot find task to fail for execution {}.", executionAttemptID);
        return;
    }
    try {
        task.failExternally(cause);
    } catch (Throwable t) {
        log.error("Could not fail task {}.", executionAttemptID, t);
    }
}
/**
 * Forwards a task execution state update to the JobManager. If the JobManager rejects the
 * update (RPC fails), the reporting task is failed locally with that error.
 *
 * @param jobMasterGateway gateway of the JobManager to notify
 * @param taskExecutionState the state update to forward
 */
private void updateTaskExecutionState(
    final JobMasterGateway jobMasterGateway,
    final TaskExecutionState taskExecutionState) {
    final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
    CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
    futureAcknowledge.whenCompleteAsync(
        (ack, throwable) -> {
            if (throwable != null) {
                failTask(executionAttemptID, throwable);
            }
        },
        getMainThreadExecutor());
}
/**
 * Removes the task from the slot table and reports its final execution state (including
 * accumulator and IO-metric snapshots) to the JobManager.
 *
 * <p>If the task is removed while not yet in a terminal state it is force-failed first so
 * that a terminal state is reported. Logs an error if the task is unknown.
 *
 * @param jobMasterGateway gateway of the JobManager to notify
 * @param executionAttemptID id of the task execution to unregister
 */
private void unregisterTaskAndNotifyFinalState(
    final JobMasterGateway jobMasterGateway,
    final ExecutionAttemptID executionAttemptID) {
    Task task = taskSlotTable.removeTask(executionAttemptID);
    if (task != null) {
        if (!task.getExecutionState().isTerminal()) {
            try {
                // BUGFIX: message previously read "is being remove"
                task.failExternally(new IllegalStateException("Task is being removed from TaskManager."));
            } catch (Exception e) {
                log.error("Could not properly fail task.", e);
            }
        }
        log.info("Un-registering task and sending final execution state {} to JobManager for task {} {}.",
            task.getExecutionState(), task.getTaskInfo().getTaskName(), task.getExecutionId());
        AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot();
        updateTaskExecutionState(
            jobMasterGateway,
            new TaskExecutionState(
                task.getJobID(),
                task.getExecutionId(),
                task.getExecutionState(),
                task.getFailureCause(),
                accumulatorSnapshot,
                task.getMetricGroup().getIOMetricGroup().createSnapshot()));
    } else {
        log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
    }
}
/**
 * Frees the slot belonging to the given allocation id, notifies the ResourceManager that
 * the slot is available again (when connected), closes the owning job's JobManager
 * connection when this was its last resource here, and always releases any local state
 * stored under the allocation id.
 *
 * @param allocationId allocation whose slot is freed
 * @param cause reason for freeing; its message is logged
 */
private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
    checkNotNull(allocationId);
    log.debug("Free slot with allocation id {} because: {}", allocationId, cause.getMessage());
    try {
        // look up the owning job before freeing, while the association still exists
        final JobID jobId = taskSlotTable.getOwningJob(allocationId);
        final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);
        // -1 means the slot could not be freed immediately (e.g. still in releasing state)
        if (slotIndex != -1) {
            if (isConnectedToResourceManager()) {
                // the slot was freed. Tell the RM about it
                ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
                resourceManagerGateway.notifySlotAvailable(
                    establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                    new SlotID(getResourceID(), slotIndex),
                    allocationId);
            }
            if (jobId != null) {
                closeJobManagerConnectionIfNoAllocatedResources(jobId);
            }
        }
    } catch (SlotNotFoundException e) {
        log.debug("Could not free slot for allocation id {}.", allocationId, e);
    }
    localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
}
/**
 * Closes the JobManager connection for the given job when this TaskExecutor holds no more
 * resources for it — i.e. no allocated slots and no tracked partitions. Also removes the
 * job from the job leader service (best effort).
 *
 * @param jobId job to check for remaining resources
 */
private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobId) {
    // check whether we still have allocated slots for the same job
    if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty() && !partitionTracker.isTrackingPartitionsFor(jobId)) {
        // we can remove the job from the job leader service
        try {
            jobLeaderService.removeJob(jobId);
        } catch (Exception e) {
            log.info("Could not remove job {} from JobLeaderService.", jobId, e);
        }
        closeJobManagerConnection(
            jobId,
            new FlinkException("TaskExecutor " + getAddress() +
                " has no more allocated slots for job " + jobId + '.'));
    }
}
/**
 * Handles an allocation timeout: frees the slot when the ticket still matches the slot
 * table's current timeout, otherwise ignores the stale event.
 *
 * @param allocationId allocation whose timeout fired
 * @param ticket ticket identifying the scheduled timeout
 */
private void timeoutSlot(AllocationID allocationId, UUID ticket) {
    checkNotNull(allocationId);
    checkNotNull(ticket);
    if (!taskSlotTable.isValidTimeout(allocationId, ticket)) {
        // a newer timeout was scheduled or the slot state changed; ignore this one
        log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket);
        return;
    }
    freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " has timed out."));
}
/**
 * Syncs the TaskExecutor's view on its allocated slots with the JobMaster's view.
 * Slots which are no longer reported by the JobMaster are being freed.
 * Slots which the JobMaster thinks it still owns but which are no longer allocated to it
 * will be failed via {@link JobMasterGateway#failSlot}.
 *
 * <p>Ignored (with a debug log) when there is no connection to the job's leader.
 *
 * @param allocatedSlotReport represents the JobMaster's view on the current slot allocation state
 */
private void syncSlotsWithSnapshotFromJobMaster(AllocatedSlotReport allocatedSlotReport) {
    JobManagerConnection jobManagerConnection = jobManagerTable.get(allocatedSlotReport.getJobId());
    if (jobManagerConnection != null) {
        final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
        failNoLongerAllocatedSlots(allocatedSlotReport, jobMasterGateway);
        freeNoLongerUsedSlots(allocatedSlotReport);
    } else {
        log.debug("Ignoring allocated slot report from job {} because there is no active leader.",
            allocatedSlotReport.getJobId());
    }
}
/**
 * Fails, at the JobMaster, every slot the JobMaster believes it owns but which is not
 * (any longer) allocated to that job in this TaskExecutor's slot table.
 *
 * @param allocatedSlotReport the JobMaster's view of its allocated slots
 * @param jobMasterGateway gateway used to fail stale slots
 */
private void failNoLongerAllocatedSlots(AllocatedSlotReport allocatedSlotReport, JobMasterGateway jobMasterGateway) {
    final JobID reportedJobId = allocatedSlotReport.getJobId();
    for (AllocatedSlotInfo allocatedSlotInfo : allocatedSlotReport.getAllocatedSlotInfos()) {
        final AllocationID allocationId = allocatedSlotInfo.getAllocationId();
        // slot still allocated to this job here -> JobMaster's view is correct, nothing to do
        if (taskSlotTable.isAllocated(allocatedSlotInfo.getSlotIndex(), reportedJobId, allocationId)) {
            continue;
        }
        jobMasterGateway.failSlot(
            getResourceID(),
            allocationId,
            new FlinkException(
                String.format(
                    "Slot %s on TaskExecutor %s is not allocated by job %s.",
                    allocatedSlotInfo.getSlotIndex(),
                    getResourceID(),
                    reportedJobId)));
    }
}
/**
 * Frees every slot that is active for the job on this TaskExecutor but absent from the
 * JobMaster's allocation report — i.e. slots the JobMaster no longer uses.
 *
 * @param allocatedSlotReport the JobMaster's view of its allocated slots
 */
private void freeNoLongerUsedSlots(AllocatedSlotReport allocatedSlotReport) {
    final Iterator<AllocationID> slotsTaskManagerSide = taskSlotTable.getActiveSlots(allocatedSlotReport.getJobId());
    final Set<AllocationID> locallyActiveSlots = Sets.newHashSet(slotsTaskManagerSide);
    final Set<AllocationID> slotsReportedByMaster = allocatedSlotReport.getAllocatedSlotInfos().stream()
        .map(AllocatedSlotInfo::getAllocationId)
        .collect(Collectors.toSet());
    // active here but not reported by the master -> no longer used, free them
    for (AllocationID staleAllocation : Sets.difference(locallyActiveSlots, slotsReportedByMaster)) {
        freeSlotInternal(
            staleAllocation,
            new FlinkException(
                String.format("%s is no longer allocated by job %s.", staleAllocation, allocatedSlotReport.getJobId())));
    }
}
// ------------------------------------------------------------------------
// Internal utility methods
// ------------------------------------------------------------------------
/** Returns true when a ResourceManager connection has been fully established. */
private boolean isConnectedToResourceManager() {
    return establishedResourceManagerConnection != null;
}
/**
 * Returns true when an established connection exists AND the current leader's fencing
 * token matches the given one (i.e. the request comes from the ResourceManager we are
 * actually connected to).
 */
private boolean isConnectedToResourceManager(ResourceManagerId resourceManagerId) {
    return establishedResourceManagerConnection != null && resourceManagerAddress != null && resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
}
/**
 * Returns true when we hold a JobManager connection for the job whose fencing token
 * matches the given one — used to discard responses from superseded leaders.
 *
 * @param jobId job to check
 * @param jobMasterId fencing token the response was produced under
 */
private boolean isJobManagerConnectionValid(JobID jobId, JobMasterId jobMasterId) {
    final JobManagerConnection connection = jobManagerTable.get(jobId);
    if (connection == null) {
        return false;
    }
    return Objects.equals(connection.getJobMasterId(), jobMasterId);
}
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
/** Returns the resource id identifying this TaskExecutor, taken from its location. */
public ResourceID getResourceID() {
    return taskManagerLocation.getResourceID();
}
// ------------------------------------------------------------------------
// Error Handling
// ------------------------------------------------------------------------
/**
 * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
 *
 * <p>Logging is wrapped in a catch-all so that a failing logger can never prevent the
 * fatal error handler from being invoked.
 *
 * @param t The exception describing the fatal error
 */
void onFatalError(final Throwable t) {
    try {
        log.error("Fatal error occurred in TaskExecutor {}.", getAddress(), t);
    } catch (Throwable ignored) {}
    // The fatal error handler implementation should make sure that this call is non-blocking
    fatalErrorHandler.onFatalError(t);
}
// ------------------------------------------------------------------------
// Access to fields for testing
// ------------------------------------------------------------------------
/** Test-only access to the (possibly in-flight) ResourceManager connection. */
@VisibleForTesting
TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
    return resourceManagerConnection;
}
/** Test-only access to the heartbeat manager monitoring the ResourceManager. */
@VisibleForTesting
HeartbeatManager<Void, TaskExecutorHeartbeatPayload> getResourceManagerHeartbeatManager() {
    return resourceManagerHeartbeatManager;
}
// ------------------------------------------------------------------------
// Utility classes
// ------------------------------------------------------------------------
/**
 * The listener for leader changes of the resource manager.
 *
 * <p>Leader changes are forwarded to the main thread; retrieval errors are fatal.
 */
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        // hop onto the main thread before touching TaskExecutor state
        runAsync(
            () -> notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }
    @Override
    public void handleError(Exception exception) {
        onFatalError(exception);
    }
}
/**
 * Listener for job leader changes reported by the job leader service. Gained leadership
 * establishes a JobManager connection, lost leadership closes it; both on the main thread.
 * Listener errors are fatal.
 */
private final class JobLeaderListenerImpl implements JobLeaderListener {
    @Override
    public void jobManagerGainedLeadership(
        final JobID jobId,
        final JobMasterGateway jobManagerGateway,
        final JMTMRegistrationSuccess registrationMessage) {
        runAsync(
            () ->
                establishJobManagerConnection(
                    jobId,
                    jobManagerGateway,
                    registrationMessage));
    }
    @Override
    public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
        log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);
        runAsync(() ->
            closeJobManagerConnection(
                jobId,
                new Exception("Job leader for job id " + jobId + " lost leadership.")));
    }
    @Override
    public void handleError(Throwable throwable) {
        onFatalError(throwable);
    }
}
/**
 * Listener invoked when the retrying registration towards the ResourceManager completes.
 * On success the connection is established on the main thread (ignoring responses that
 * belong to a superseded connection attempt); registration failure is fatal.
 */
private final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> {
    @Override
    public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
        final ResourceID resourceManagerId = success.getResourceManagerId();
        final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
        final ClusterInformation clusterInformation = success.getClusterInformation();
        final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
        runAsync(
            () -> {
                // filter out outdated connections
                //noinspection ObjectEquality
                if (resourceManagerConnection == connection) {
                    establishResourceManagerConnection(
                        resourceManagerGateway,
                        resourceManagerId,
                        taskExecutorRegistrationId,
                        clusterInformation);
                }
            });
    }
    @Override
    public void onRegistrationFailure(Throwable failure) {
        onFatalError(failure);
    }
}
/**
 * Callbacks handed to tasks so they can report back to this TaskExecutor: fatal errors,
 * task failures and execution state updates. State updates for terminal states also
 * unregister the task; all task-triggered actions hop onto the main thread first.
 */
private final class TaskManagerActionsImpl implements TaskManagerActions {
    // gateway of the JobManager responsible for the tasks using these actions
    private final JobMasterGateway jobMasterGateway;
    private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
        this.jobMasterGateway = checkNotNull(jobMasterGateway);
    }
    @Override
    public void notifyFatalError(String message, Throwable cause) {
        try {
            log.error(message, cause);
        } catch (Throwable ignored) {}
        // The fatal error handler implementation should make sure that this call is non-blocking
        fatalErrorHandler.onFatalError(cause);
    }
    @Override
    public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
        runAsync(() -> TaskExecutor.this.failTask(executionAttemptID, cause));
    }
    @Override
    public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
        if (taskExecutionState.getExecutionState().isTerminal()) {
            // terminal state: remove the task and report its final state in one step
            runAsync(() -> unregisterTaskAndNotifyFinalState(jobMasterGateway, taskExecutionState.getID()));
        } else {
            TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, taskExecutionState);
        }
    }
}
/**
 * Callbacks handed to the slot table so it can ask this TaskExecutor to free or time out
 * slots; both actions are dispatched to the main thread.
 */
private class SlotActionsImpl implements SlotActions {
    @Override
    public void freeSlot(final AllocationID allocationId) {
        runAsync(() ->
            freeSlotInternal(
                allocationId,
                new FlinkException("TaskSlotTable requested freeing the TaskSlot " + allocationId + '.')));
    }
    @Override
    public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
        runAsync(() -> TaskExecutor.this.timeoutSlot(allocationId, ticket));
    }
}
/**
 * Heartbeat listener towards JobManagers: a timeout closes the connection and triggers a
 * reconnect, incoming payloads carry the JobMaster's slot allocation snapshot, and outgoing
 * payloads report the accumulator snapshots of the job's tasks. All callbacks run on the
 * main thread.
 */
private class JobManagerHeartbeatListener implements HeartbeatListener<AllocatedSlotReport, AccumulatorReport> {
    @Override
    public void notifyHeartbeatTimeout(final ResourceID resourceID) {
        validateRunsInMainThread();
        log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
        // single lookup instead of the previous containsKey + get double lookup
        final JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
        if (jobManagerConnection != null) {
            closeJobManagerConnection(
                jobManagerConnection.getJobID(),
                new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
            jobLeaderService.reconnect(jobManagerConnection.getJobID());
        }
    }
    @Override
    public void reportPayload(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
        validateRunsInMainThread();
        syncSlotsWithSnapshotFromJobMaster(allocatedSlotReport);
    }
    @Override
    public AccumulatorReport retrievePayload(ResourceID resourceID) {
        validateRunsInMainThread();
        JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
        if (jobManagerConnection != null) {
            JobID jobId = jobManagerConnection.getJobID();
            // collect one accumulator snapshot per task of the job
            List<AccumulatorSnapshot> accumulatorSnapshots = new ArrayList<>(16);
            Iterator<Task> allTasks = taskSlotTable.getTasks(jobId);
            while (allTasks.hasNext()) {
                Task task = allTasks.next();
                accumulatorSnapshots.add(task.getAccumulatorRegistry().getSnapshot());
            }
            return new AccumulatorReport(accumulatorSnapshots);
        } else {
            // unknown JobManager: report an empty payload
            return new AccumulatorReport(Collections.emptyList());
        }
    }
}
/**
 * Heartbeat listener towards the ResourceManager: a timeout for the currently established
 * connection triggers a reconnect (stale timeouts are ignored); outgoing payloads carry the
 * slot report and cluster partition report. Incoming payloads are {@code Void}.
 */
private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {
    @Override
    public void notifyHeartbeatTimeout(final ResourceID resourceId) {
        validateRunsInMainThread();
        // first check whether the timeout is still valid
        if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
            log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
            reconnectToResourceManager(new TaskManagerException(
                String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
        } else {
            log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
        }
    }
    @Override
    public void reportPayload(ResourceID resourceID, Void payload) {
        // nothing to do since the payload is of type Void
    }
    @Override
    public TaskExecutorHeartbeatPayload retrievePayload(ResourceID resourceID) {
        validateRunsInMainThread();
        return new TaskExecutorHeartbeatPayload(taskSlotTable.createSlotReport(getResourceID()), partitionTracker.createClusterPartitionReport());
    }
}
}