blob: dbb0e4acdf493d5a423c75f2eae68a600628a6e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.heartbeat.NoOpHeartbeatManager;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation.ResolutionMode;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* JobMaster implementation. The job master is responsible for the execution of a single {@link
* JobGraph}.
*
* <p>It offers the following methods as part of its rpc interface to interact with the JobMaster
* remotely:
*
* <ul>
* <li>{@link #updateTaskExecutionState} updates the task execution state for given task
* </ul>
*/
public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
implements JobMasterGateway, JobMasterService {
/** Default names for Flink's distributed components. */
public static final String JOB_MANAGER_NAME = "jobmanager";
// ------------------------------------------------------------------------
private final JobMasterConfiguration jobMasterConfiguration;
private final ResourceID resourceId;
private final JobGraph jobGraph;
private final Time rpcTimeout;
private final HighAvailabilityServices highAvailabilityServices;
private final BlobWriter blobWriter;
private final HeartbeatServices heartbeatServices;
private final ScheduledExecutorService scheduledExecutorService;
private final OnCompletionActions jobCompletionActions;
private final FatalErrorHandler fatalErrorHandler;
private final ClassLoader userCodeLoader;
private final SlotPoolService slotPoolService;
private final long initializationTimestamp;
private final boolean retrieveTaskManagerHostName;
// --------- ResourceManager --------
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// --------- TaskManagers --------
private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>>
registeredTaskManagers;
private final ShuffleMaster<?> shuffleMaster;
// --------- Scheduler --------
private final SchedulerNG schedulerNG;
private final JobManagerJobStatusListener jobStatusListener;
private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
// -------- Misc ---------
private final Map<String, Object> accumulators;
private final JobMasterPartitionTracker partitionTracker;
private final ExecutionDeploymentTracker executionDeploymentTracker;
private final ExecutionDeploymentReconciler executionDeploymentReconciler;
// -------- Mutable fields ---------
@Nullable private ResourceManagerAddress resourceManagerAddress;
@Nullable private ResourceManagerConnection resourceManagerConnection;
@Nullable private EstablishedResourceManagerConnection establishedResourceManagerConnection;
private HeartbeatManager<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport>
taskManagerHeartbeatManager;
private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
// ------------------------------------------------------------------------
public JobMaster(
RpcService rpcService,
JobMasterId jobMasterId,
JobMasterConfiguration jobMasterConfiguration,
ResourceID resourceId,
JobGraph jobGraph,
HighAvailabilityServices highAvailabilityService,
SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
JobManagerSharedServices jobManagerSharedServices,
HeartbeatServices heartbeatServices,
JobManagerJobMetricGroupFactory jobMetricGroupFactory,
OnCompletionActions jobCompletionActions,
FatalErrorHandler fatalErrorHandler,
ClassLoader userCodeLoader,
ShuffleMaster<?> shuffleMaster,
PartitionTrackerFactory partitionTrackerFactory,
ExecutionDeploymentTracker executionDeploymentTracker,
ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
long initializationTimestamp)
throws Exception {
super(rpcService, RpcServiceUtils.createRandomName(JOB_MANAGER_NAME), jobMasterId);
final ExecutionDeploymentReconciliationHandler executionStateReconciliationHandler =
new ExecutionDeploymentReconciliationHandler() {
@Override
public void onMissingDeploymentsOf(
Collection<ExecutionAttemptID> executionAttemptIds, ResourceID host) {
log.debug(
"Failing deployments {} due to no longer being deployed.",
executionAttemptIds);
for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
schedulerNG.updateTaskExecutionState(
new TaskExecutionState(
executionAttemptId,
ExecutionState.FAILED,
new FlinkException(
String.format(
"Execution %s is unexpectedly no longer running on task executor %s.",
executionAttemptId, host))));
}
}
@Override
public void onUnknownDeploymentsOf(
Collection<ExecutionAttemptID> executionAttemptIds, ResourceID host) {
log.debug(
"Canceling left-over deployments {} on task executor {}.",
executionAttemptIds,
host);
for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo =
registeredTaskManagers.get(host);
if (taskManagerInfo != null) {
taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout);
}
}
}
};
this.executionDeploymentTracker = executionDeploymentTracker;
this.executionDeploymentReconciler =
executionDeploymentReconcilerFactory.create(executionStateReconciliationHandler);
this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.blobWriter = jobManagerSharedServices.getBlobWriter();
this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
this.jobCompletionActions = checkNotNull(jobCompletionActions);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.userCodeLoader = checkNotNull(userCodeLoader);
this.initializationTimestamp = initializationTimestamp;
this.retrieveTaskManagerHostName =
jobMasterConfiguration
.getConfiguration()
.getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
final String jobName = jobGraph.getName();
final JobID jid = jobGraph.getJobID();
log.info("Initializing job {} ({}).", jobName, jid);
resourceManagerLeaderRetriever =
highAvailabilityServices.getResourceManagerLeaderRetriever();
this.slotPoolService =
checkNotNull(slotPoolServiceSchedulerFactory).createSlotPoolService(jid);
this.registeredTaskManagers = new HashMap<>(4);
this.partitionTracker =
checkNotNull(partitionTrackerFactory)
.create(
resourceID -> {
Tuple2<TaskManagerLocation, TaskExecutorGateway>
taskManagerInfo =
registeredTaskManagers.get(resourceID);
if (taskManagerInfo == null) {
return Optional.empty();
}
return Optional.of(taskManagerInfo.f1);
});
this.shuffleMaster = checkNotNull(shuffleMaster);
this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
this.jobStatusListener = new JobManagerJobStatusListener();
this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
executionDeploymentTracker,
jobManagerJobMetricGroup,
jobStatusListener);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
this.resourceManagerConnection = null;
this.establishedResourceManagerConnection = null;
this.accumulators = new HashMap<>();
}
private SchedulerNG createScheduler(
SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
ExecutionDeploymentTracker executionDeploymentTracker,
JobManagerJobMetricGroup jobManagerJobMetricGroup,
JobStatusListener jobStatusListener)
throws Exception {
final SchedulerNG scheduler =
slotPoolServiceSchedulerFactory.createScheduler(
log,
jobGraph,
scheduledExecutorService,
jobMasterConfiguration.getConfiguration(),
slotPoolService,
scheduledExecutorService,
userCodeLoader,
highAvailabilityServices.getCheckpointRecoveryFactory(),
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
jobMasterConfiguration.getSlotRequestTimeout(),
shuffleMaster,
partitionTracker,
executionDeploymentTracker,
initializationTimestamp,
getMainThreadExecutor(),
fatalErrorHandler,
jobStatusListener);
return scheduler;
}
private HeartbeatManager<Void, Void> createResourceManagerHeartbeatManager(
HeartbeatServices heartbeatServices) {
return heartbeatServices.createHeartbeatManager(
resourceId, new ResourceManagerHeartbeatListener(), getMainThreadExecutor(), log);
}
private HeartbeatManager<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport>
createTaskManagerHeartbeatManager(HeartbeatServices heartbeatServices) {
return heartbeatServices.createHeartbeatManagerSender(
resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log);
}
// ----------------------------------------------------------------------------------------------
// Lifecycle management
// ----------------------------------------------------------------------------------------------
@Override
protected void onStart() throws JobMasterException {
try {
startJobExecution();
} catch (Exception e) {
final JobMasterException jobMasterException =
new JobMasterException("Could not start the JobMaster.", e);
handleJobMasterError(jobMasterException);
throw jobMasterException;
}
}
/** Suspend the job and shutdown all other services including rpc. */
@Override
public CompletableFuture<Void> onStop() {
log.info("Stopping the JobMaster for job {}({}).", jobGraph.getName(), jobGraph.getJobID());
// make sure there is a graceful exit
return stopJobExecution(
new FlinkException(
String.format(
"Stopping JobMaster for job %s(%s).",
jobGraph.getName(), jobGraph.getJobID())))
.exceptionally(
exception -> {
throw new CompletionException(
new JobMasterException(
"Could not properly stop the JobMaster.", exception));
});
}
// ----------------------------------------------------------------------------------------------
// RPC methods
// ----------------------------------------------------------------------------------------------
@Override
public CompletableFuture<Acknowledge> cancel(Time timeout) {
schedulerNG.cancel();
return CompletableFuture.completedFuture(Acknowledge.get());
}
/**
* Updates the task execution state for a given task.
*
* @param taskExecutionState New task execution state for a given task
* @return Acknowledge the task execution state update
*/
@Override
public CompletableFuture<Acknowledge> updateTaskExecutionState(
final TaskExecutionState taskExecutionState) {
FlinkException taskExecutionException;
try {
checkNotNull(taskExecutionState, "taskExecutionState");
if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
taskExecutionException =
new ExecutionGraphException(
"The execution attempt "
+ taskExecutionState.getID()
+ " was not found.");
}
} catch (Exception e) {
taskExecutionException =
new JobMasterException(
"Could not update the state of task execution for JobMaster.", e);
handleJobMasterError(taskExecutionException);
}
return FutureUtils.completedExceptionally(taskExecutionException);
}
@Override
public CompletableFuture<SerializedInputSplit> requestNextInputSplit(
final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
try {
return CompletableFuture.completedFuture(
schedulerNG.requestNextInputSplit(vertexID, executionAttempt));
} catch (IOException e) {
log.warn("Error while requesting next input split", e);
return FutureUtils.completedExceptionally(e);
}
}
@Override
public CompletableFuture<ExecutionState> requestPartitionState(
final IntermediateDataSetID intermediateResultId,
final ResultPartitionID resultPartitionId) {
try {
return CompletableFuture.completedFuture(
schedulerNG.requestPartitionState(intermediateResultId, resultPartitionId));
} catch (PartitionProducerDisposedException e) {
log.info("Error while requesting partition state", e);
return FutureUtils.completedExceptionally(e);
}
}
@Override
public CompletableFuture<Acknowledge> notifyPartitionDataAvailable(
final ResultPartitionID partitionID, final Time timeout) {
schedulerNG.notifyPartitionDataAvailable(partitionID);
return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
public CompletableFuture<Acknowledge> disconnectTaskManager(
final ResourceID resourceID, final Exception cause) {
log.debug(
"Disconnect TaskExecutor {} because: {}",
resourceID.getStringWithMetadata(),
cause.getMessage());
taskManagerHeartbeatManager.unmonitorTarget(resourceID);
slotPoolService.releaseTaskManager(resourceID, cause);
partitionTracker.stopTrackingPartitionsFor(resourceID);
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection =
registeredTaskManagers.remove(resourceID);
if (taskManagerConnection != null) {
taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
// TODO: This method needs a leader session ID
@Override
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
schedulerNG.acknowledgeCheckpoint(
jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
@Override
public void reportCheckpointMetrics(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics) {
schedulerNG.reportCheckpointMetrics(
jobID, executionAttemptID, checkpointId, checkpointMetrics);
}
// TODO: This method needs a leader session ID
@Override
public void declineCheckpoint(DeclineCheckpoint decline) {
schedulerNG.declineCheckpoint(decline);
}
@Override
public CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(
final ExecutionAttemptID task,
final OperatorID operatorID,
final SerializedValue<OperatorEvent> serializedEvent) {
try {
final OperatorEvent evt = serializedEvent.deserializeValue(userCodeLoader);
schedulerNG.deliverOperatorEventToCoordinator(task, operatorID, evt);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (Exception e) {
return FutureUtils.completedExceptionally(e);
}
}
@Override
public CompletableFuture<KvStateLocation> requestKvStateLocation(
final JobID jobId, final String registrationName) {
try {
return CompletableFuture.completedFuture(
schedulerNG.requestKvStateLocation(jobId, registrationName));
} catch (UnknownKvStateLocation | FlinkJobNotFoundException e) {
log.info("Error while request key-value state location", e);
return FutureUtils.completedExceptionally(e);
}
}
@Override
public CompletableFuture<Acknowledge> notifyKvStateRegistered(
final JobID jobId,
final JobVertexID jobVertexId,
final KeyGroupRange keyGroupRange,
final String registrationName,
final KvStateID kvStateId,
final InetSocketAddress kvStateServerAddress) {
try {
schedulerNG.notifyKvStateRegistered(
jobId,
jobVertexId,
keyGroupRange,
registrationName,
kvStateId,
kvStateServerAddress);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (FlinkJobNotFoundException e) {
log.info("Error while receiving notification about key-value state registration", e);
return FutureUtils.completedExceptionally(e);
}
}
@Override
public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
JobID jobId,
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName) {
try {
schedulerNG.notifyKvStateUnregistered(
jobId, jobVertexId, keyGroupRange, registrationName);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (FlinkJobNotFoundException e) {
log.info("Error while receiving notification about key-value state de-registration", e);
return FutureUtils.completedExceptionally(e);
}
}
@Override
public CompletableFuture<Collection<SlotOffer>> offerSlots(
final ResourceID taskManagerId, final Collection<SlotOffer> slots, final Time timeout) {
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager =
registeredTaskManagers.get(taskManagerId);
if (taskManager == null) {
return FutureUtils.completedExceptionally(
new Exception("Unknown TaskManager " + taskManagerId));
}
final TaskManagerLocation taskManagerLocation = taskManager.f0;
final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
final RpcTaskManagerGateway rpcTaskManagerGateway =
new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
return CompletableFuture.completedFuture(
slotPoolService.offerSlots(taskManagerLocation, rpcTaskManagerGateway, slots));
}
@Override
public void failSlot(
final ResourceID taskManagerId,
final AllocationID allocationId,
final Exception cause) {
if (registeredTaskManagers.containsKey(taskManagerId)) {
internalFailAllocation(taskManagerId, allocationId, cause);
} else {
log.warn(
"Cannot fail slot "
+ allocationId
+ " because the TaskManager "
+ taskManagerId
+ " is unknown.");
}
}
private void internalFailAllocation(
@Nullable ResourceID resourceId, AllocationID allocationId, Exception cause) {
final Optional<ResourceID> resourceIdOptional =
slotPoolService.failAllocation(resourceId, allocationId, cause);
resourceIdOptional.ifPresent(
taskManagerId -> {
if (!partitionTracker.isTrackingPartitionsFor(taskManagerId)) {
releaseEmptyTaskManager(taskManagerId);
}
});
}
private void releaseEmptyTaskManager(ResourceID resourceId) {
disconnectTaskManager(
resourceId,
new FlinkException(
String.format(
"No more slots registered at JobMaster %s.",
resourceId.getStringWithMetadata())));
}
@Override
public CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation,
final JobID jobId,
final Time timeout) {
if (!jobGraph.getJobID().equals(jobId)) {
log.debug(
"Rejecting TaskManager registration attempt because of wrong job id {}.",
jobId);
return CompletableFuture.completedFuture(
new JMTMRegistrationRejection(
String.format(
"The JobManager is not responsible for job %s. Maybe the TaskManager used outdated connection information.",
jobId)));
}
final TaskManagerLocation taskManagerLocation;
try {
if (retrieveTaskManagerHostName) {
taskManagerLocation =
TaskManagerLocation.fromUnresolvedLocation(
unresolvedTaskManagerLocation, ResolutionMode.RETRIEVE_HOST_NAME);
} else {
taskManagerLocation =
TaskManagerLocation.fromUnresolvedLocation(
unresolvedTaskManagerLocation, ResolutionMode.USE_IP_ONLY);
}
} catch (Throwable throwable) {
final String errMsg =
String.format(
"Could not accept TaskManager registration. TaskManager address %s cannot be resolved. %s",
unresolvedTaskManagerLocation.getExternalAddress(),
throwable.getMessage());
log.error(errMsg);
return CompletableFuture.completedFuture(
new RegistrationResponse.Failure(new FlinkException(errMsg, throwable)));
}
final ResourceID taskManagerId = taskManagerLocation.getResourceID();
if (registeredTaskManagers.containsKey(taskManagerId)) {
final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
return CompletableFuture.completedFuture(response);
} else {
return getRpcService()
.connect(taskManagerRpcAddress, TaskExecutorGateway.class)
.handleAsync(
(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
if (throwable != null) {
return new RegistrationResponse.Failure(throwable);
}
slotPoolService.registerTaskManager(taskManagerId);
registeredTaskManagers.put(
taskManagerId,
Tuple2.of(taskManagerLocation, taskExecutorGateway));
// monitor the task manager as heartbeat target
taskManagerHeartbeatManager.monitorTarget(
taskManagerId,
new TaskExecutorHeartbeatTarget(taskExecutorGateway));
return new JMTMRegistrationSuccess(resourceId);
},
getMainThreadExecutor());
}
}
@Override
public void disconnectResourceManager(
final ResourceManagerId resourceManagerId, final Exception cause) {
if (isConnectingToResourceManager(resourceManagerId)) {
reconnectToResourceManager(cause);
}
}
private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
return resourceManagerAddress != null
&& resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
}
@Override
public CompletableFuture<Void> heartbeatFromTaskManager(
final ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload payload) {
return taskManagerHeartbeatManager.receiveHeartbeat(resourceID, payload);
}
@Override
public CompletableFuture<Void> heartbeatFromResourceManager(final ResourceID resourceID) {
return resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
@Override
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
return CompletableFuture.completedFuture(schedulerNG.requestJobDetails());
}
@Override
public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
return CompletableFuture.completedFuture(schedulerNG.requestJobStatus());
}
@Override
public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
return CompletableFuture.completedFuture(schedulerNG.requestJob());
}
@Override
public CompletableFuture<String> triggerSavepoint(
@Nullable final String targetDirectory, final boolean cancelJob, final Time timeout) {
return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
}
@Override
public CompletableFuture<String> stopWithSavepoint(
@Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
}
@Override
public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
internalFailAllocation(null, allocationID, cause);
}
@Override
public void notifyNotEnoughResourcesAvailable(
Collection<ResourceRequirement> acquiredResources) {
slotPoolService.notifyNotEnoughResourcesAvailable(acquiredResources);
}
@Override
public CompletableFuture<Object> updateGlobalAggregate(
String aggregateName, Object aggregand, byte[] serializedAggregateFunction) {
AggregateFunction aggregateFunction = null;
try {
aggregateFunction =
InstantiationUtil.deserializeObject(
serializedAggregateFunction, userCodeLoader);
} catch (Exception e) {
log.error("Error while attempting to deserialize user AggregateFunction.");
return FutureUtils.completedExceptionally(e);
}
Object accumulator = accumulators.get(aggregateName);
if (null == accumulator) {
accumulator = aggregateFunction.createAccumulator();
}
accumulator = aggregateFunction.add(aggregand, accumulator);
accumulators.put(aggregateName, accumulator);
return CompletableFuture.completedFuture(aggregateFunction.getResult(accumulator));
}
@Override
public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
OperatorID operatorId,
SerializedValue<CoordinationRequest> serializedRequest,
Time timeout) {
try {
CoordinationRequest request = serializedRequest.deserializeValue(userCodeLoader);
return schedulerNG.deliverCoordinationRequestToCoordinator(operatorId, request);
} catch (Exception e) {
return FutureUtils.completedExceptionally(e);
}
}
// ----------------------------------------------------------------------------------------------
// Internal methods
// ----------------------------------------------------------------------------------------------
// -- job starting and stopping
// -----------------------------------------------------------------
private void startJobExecution() throws Exception {
validateRunsInMainThread();
startJobMasterServices();
log.info(
"Starting execution of job {} ({}) under job master id {}.",
jobGraph.getName(),
jobGraph.getJobID(),
getFencingToken());
startScheduling();
}
private void startJobMasterServices() throws Exception {
try {
this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices);
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
} catch (Exception e) {
handleStartJobMasterServicesError(e);
}
}
private void handleStartJobMasterServicesError(Exception e) throws Exception {
try {
stopJobMasterServices();
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
private void stopJobMasterServices() throws Exception {
Exception resultingException = null;
try {
resourceManagerLeaderRetriever.stop();
} catch (Exception e) {
resultingException = e;
}
// TODO: Distinguish between job termination which should free all slots and a loss of
// leadership which should keep the slots
slotPoolService.close();
stopHeartbeatServices();
ExceptionUtils.tryRethrowException(resultingException);
}
private CompletableFuture<Void> stopJobExecution(final Exception cause) {
validateRunsInMainThread();
final CompletableFuture<Void> terminationFuture = stopScheduling();
return FutureUtils.runAfterwards(
terminationFuture,
() -> {
disconnectTaskManagerResourceManagerConnections(cause);
stopJobMasterServices();
});
}
private void disconnectTaskManagerResourceManagerConnections(Exception cause) {
// disconnect from all registered TaskExecutors
final Set<ResourceID> taskManagerResourceIds =
new HashSet<>(registeredTaskManagers.keySet());
for (ResourceID taskManagerResourceId : taskManagerResourceIds) {
disconnectTaskManager(taskManagerResourceId, cause);
}
// disconnect from resource manager:
closeResourceManagerConnection(cause);
}
private void stopHeartbeatServices() {
taskManagerHeartbeatManager.stop();
resourceManagerHeartbeatManager.stop();
}
private void startScheduling() {
schedulerNG.startScheduling();
}
private CompletableFuture<Void> stopScheduling() {
jobManagerJobMetricGroup.close();
jobStatusListener.stop();
return schedulerNG.closeAsync();
}
// ----------------------------------------------------------------------------------------------
private void handleJobMasterError(final Throwable cause) {
if (ExceptionUtils.isJvmFatalError(cause)) {
log.error("Fatal error occurred on JobManager.", cause);
// The fatal error handler implementation should make sure that this call is
// non-blocking
fatalErrorHandler.onFatalError(cause);
} else {
jobCompletionActions.jobMasterFailed(cause);
}
}
private void jobStatusChanged(final JobStatus newJobStatus) {
validateRunsInMainThread();
if (newJobStatus.isGloballyTerminalState()) {
runAsync(
() -> {
Collection<ResultPartitionID> allTracked =
partitionTracker.getAllTrackedPartitions().stream()
.map(d -> d.getShuffleDescriptor().getResultPartitionID())
.collect(Collectors.toList());
if (newJobStatus == JobStatus.FINISHED) {
partitionTracker.stopTrackingAndReleaseOrPromotePartitions(allTracked);
} else {
partitionTracker.stopTrackingAndReleasePartitions(allTracked);
}
});
final ExecutionGraphInfo executionGraphInfo = schedulerNG.requestJob();
scheduledExecutorService.execute(
() -> jobCompletionActions.jobReachedGloballyTerminalState(executionGraphInfo));
}
}
private void notifyOfNewResourceManagerLeader(
final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
resourceManagerAddress =
createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
reconnectToResourceManager(
new FlinkException(
String.format(
"ResourceManager leader changed to new address %s",
resourceManagerAddress)));
}
@Nullable
private ResourceManagerAddress createResourceManagerAddress(
@Nullable String newResourceManagerAddress,
@Nullable ResourceManagerId resourceManagerId) {
if (newResourceManagerAddress != null) {
// the contract is: address == null <=> id == null
checkNotNull(resourceManagerId);
return new ResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
} else {
return null;
}
}
private void reconnectToResourceManager(Exception cause) {
closeResourceManagerConnection(cause);
tryConnectToResourceManager();
}
private void tryConnectToResourceManager() {
if (resourceManagerAddress != null) {
connectToResourceManager();
}
}
private void connectToResourceManager() {
assert (resourceManagerAddress != null);
assert (resourceManagerConnection == null);
assert (establishedResourceManagerConnection == null);
log.info("Connecting to ResourceManager {}", resourceManagerAddress);
resourceManagerConnection =
new ResourceManagerConnection(
log,
jobGraph.getJobID(),
resourceId,
getAddress(),
getFencingToken(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
scheduledExecutorService);
resourceManagerConnection.start();
}
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
final ResourceManagerId resourceManagerId = success.getResourceManagerId();
// verify the response with current connection
if (resourceManagerConnection != null
&& Objects.equals(
resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
log.info(
"JobManager successfully registered at ResourceManager, leader id: {}.",
resourceManagerId);
final ResourceManagerGateway resourceManagerGateway =
resourceManagerConnection.getTargetGateway();
final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
establishedResourceManagerConnection =
new EstablishedResourceManagerConnection(
resourceManagerGateway, resourceManagerResourceId);
slotPoolService.connectToResourceManager(resourceManagerGateway);
resourceManagerHeartbeatManager.monitorTarget(
resourceManagerResourceId,
new ResourceManagerHeartbeatTarget(resourceManagerGateway));
} else {
log.debug(
"Ignoring resource manager connection to {} because it's duplicated or outdated.",
resourceManagerId);
}
}
private void closeResourceManagerConnection(Exception cause) {
if (establishedResourceManagerConnection != null) {
dissolveResourceManagerConnection(establishedResourceManagerConnection, cause);
establishedResourceManagerConnection = null;
}
if (resourceManagerConnection != null) {
// stop a potentially ongoing registration process
resourceManagerConnection.close();
resourceManagerConnection = null;
}
}
private void dissolveResourceManagerConnection(
EstablishedResourceManagerConnection establishedResourceManagerConnection,
Exception cause) {
final ResourceID resourceManagerResourceID =
establishedResourceManagerConnection.getResourceManagerResourceID();
if (log.isDebugEnabled()) {
log.debug(
"Close ResourceManager connection {}.",
resourceManagerResourceID.getStringWithMetadata(),
cause);
} else {
log.info(
"Close ResourceManager connection {}: {}",
resourceManagerResourceID.getStringWithMetadata(),
cause.getMessage());
}
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID);
ResourceManagerGateway resourceManagerGateway =
establishedResourceManagerConnection.getResourceManagerGateway();
resourceManagerGateway.disconnectJobManager(
jobGraph.getJobID(), schedulerNG.requestJobStatus(), cause);
slotPoolService.disconnectResourceManager();
}
// ----------------------------------------------------------------------------------------------
// Service methods
// ----------------------------------------------------------------------------------------------
@Override
public JobMasterGateway getGateway() {
return getSelfGateway(JobMasterGateway.class);
}
// ----------------------------------------------------------------------------------------------
// Utility classes
// ----------------------------------------------------------------------------------------------
private static final class TaskExecutorHeartbeatTarget
implements HeartbeatTarget<AllocatedSlotReport> {
private final TaskExecutorGateway taskExecutorGateway;
private TaskExecutorHeartbeatTarget(TaskExecutorGateway taskExecutorGateway) {
this.taskExecutorGateway = taskExecutorGateway;
}
@Override
public CompletableFuture<Void> receiveHeartbeat(
ResourceID resourceID, AllocatedSlotReport payload) {
// the task manager will not request heartbeat, so
// this method will never be called currently
return FutureUtils.unsupportedOperationFuture();
}
@Override
public CompletableFuture<Void> requestHeartbeat(
ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
return taskExecutorGateway.heartbeatFromJobManager(resourceID, allocatedSlotReport);
}
}
private static final class ResourceManagerHeartbeatTarget implements HeartbeatTarget<Void> {
private final ResourceManagerGateway resourceManagerGateway;
private ResourceManagerHeartbeatTarget(ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerGateway = resourceManagerGateway;
}
@Override
public CompletableFuture<Void> receiveHeartbeat(ResourceID resourceID, Void payload) {
return resourceManagerGateway.heartbeatFromJobManager(resourceID);
}
@Override
public CompletableFuture<Void> requestHeartbeat(ResourceID resourceID, Void payload) {
// request heartbeat will never be called on the job manager side
return FutureUtils.unsupportedOperationFuture();
}
}
private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
runAsync(
() ->
notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
@Override
public void handleError(final Exception exception) {
handleJobMasterError(
new Exception("Fatal error in the ResourceManager leader service", exception));
}
}
// ----------------------------------------------------------------------------------------------
private class ResourceManagerConnection
extends RegisteredRpcConnection<
ResourceManagerId,
ResourceManagerGateway,
JobMasterRegistrationSuccess,
RegistrationResponse.Rejection> {
private final JobID jobID;
private final ResourceID jobManagerResourceID;
private final String jobManagerRpcAddress;
private final JobMasterId jobMasterId;
ResourceManagerConnection(
final Logger log,
final JobID jobID,
final ResourceID jobManagerResourceID,
final String jobManagerRpcAddress,
final JobMasterId jobMasterId,
final String resourceManagerAddress,
final ResourceManagerId resourceManagerId,
final Executor executor) {
super(log, resourceManagerAddress, resourceManagerId, executor);
this.jobID = checkNotNull(jobID);
this.jobManagerResourceID = checkNotNull(jobManagerResourceID);
this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress);
this.jobMasterId = checkNotNull(jobMasterId);
}
@Override
protected RetryingRegistration<
ResourceManagerId,
ResourceManagerGateway,
JobMasterRegistrationSuccess,
RegistrationResponse.Rejection>
generateRegistration() {
return new RetryingRegistration<
ResourceManagerId,
ResourceManagerGateway,
JobMasterRegistrationSuccess,
RegistrationResponse.Rejection>(
log,
getRpcService(),
"ResourceManager",
ResourceManagerGateway.class,
getTargetAddress(),
getTargetLeaderId(),
jobMasterConfiguration.getRetryingRegistrationConfiguration()) {
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
ResourceManagerGateway gateway,
ResourceManagerId fencingToken,
long timeoutMillis) {
Time timeout = Time.milliseconds(timeoutMillis);
return gateway.registerJobManager(
jobMasterId,
jobManagerResourceID,
jobManagerRpcAddress,
jobID,
timeout);
}
};
}
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
runAsync(
() -> {
// filter out outdated connections
//noinspection ObjectEquality
if (this == resourceManagerConnection) {
establishResourceManagerConnection(success);
}
});
}
@Override
protected void onRegistrationRejection(RegistrationResponse.Rejection rejection) {
handleJobMasterError(
new IllegalStateException(
"The ResourceManager should never reject a JobMaster registration."));
}
@Override
protected void onRegistrationFailure(final Throwable failure) {
handleJobMasterError(failure);
}
}
// ----------------------------------------------------------------------------------------------
private class JobManagerJobStatusListener implements JobStatusListener {
private volatile boolean running = true;
@Override
public void jobStatusChanges(
final JobID jobId,
final JobStatus newJobStatus,
final long timestamp,
final Throwable error) {
if (running) {
// run in rpc thread to avoid concurrency
runAsync(() -> jobStatusChanged(newJobStatus));
}
}
private void stop() {
running = false;
}
}
private class TaskManagerHeartbeatListener
implements HeartbeatListener<
TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport> {
@Override
public void notifyHeartbeatTimeout(ResourceID resourceID) {
final String message =
String.format(
"Heartbeat of TaskManager with id %s timed out.",
resourceID.getStringWithMetadata());
log.info(message);
handleTaskManagerConnectionLoss(resourceID, new TimeoutException(message));
}
private void handleTaskManagerConnectionLoss(ResourceID resourceID, Exception cause) {
validateRunsInMainThread();
disconnectTaskManager(resourceID, cause);
}
@Override
public void notifyTargetUnreachable(ResourceID resourceID) {
final String message =
String.format(
"TaskManager with id %s is no longer reachable.",
resourceID.getStringWithMetadata());
log.info(message);
handleTaskManagerConnectionLoss(resourceID, new JobMasterException(message));
}
@Override
public void reportPayload(
ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload payload) {
validateRunsInMainThread();
executionDeploymentReconciler.reconcileExecutionDeployments(
resourceID,
payload.getExecutionDeploymentReport(),
executionDeploymentTracker.getExecutionsOn(resourceID));
for (AccumulatorSnapshot snapshot :
payload.getAccumulatorReport().getAccumulatorSnapshots()) {
schedulerNG.updateAccumulators(snapshot);
}
}
@Override
public AllocatedSlotReport retrievePayload(ResourceID resourceID) {
validateRunsInMainThread();
return slotPoolService.createAllocatedSlotReport(resourceID);
}
}
private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
final String message =
String.format(
"The heartbeat of ResourceManager with id %s timed out.",
resourceId.getStringWithMetadata());
log.info(message);
handleResourceManagerConnectionLoss(resourceId, new TimeoutException(message));
}
private void handleResourceManagerConnectionLoss(ResourceID resourceId, Exception cause) {
validateRunsInMainThread();
if (establishedResourceManagerConnection != null
&& establishedResourceManagerConnection
.getResourceManagerResourceID()
.equals(resourceId)) {
reconnectToResourceManager(cause);
}
}
@Override
public void notifyTargetUnreachable(ResourceID resourceID) {
final String message =
String.format(
"ResourceManager with id %s is no longer reachable.",
resourceID.getStringWithMetadata());
log.info(message);
handleResourceManagerConnectionLoss(resourceID, new JobMasterException(message));
}
@Override
public void reportPayload(ResourceID resourceID, Void payload) {
// nothing to do since the payload is of type Void
}
@Override
public Void retrievePayload(ResourceID resourceID) {
return null;
}
}
}