blob: be1f3ea9db38581d4cdf099adf9a4a9a347759a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.heartbeat.NoOpHeartbeatManager;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.DataSetMetaInfo;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationRejection;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* ResourceManager implementation. The resource manager is responsible for resource de-/allocation
* and bookkeeping.
*
* <p>It offers the following methods as part of its rpc interface to interact with it remotely:
*
* <ul>
* <li>{@link #registerJobManager(JobMasterId, ResourceID, String, JobID, Time)} registers a
* {@link JobMaster} at the resource manager
* </ul>
*/
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
extends FencedRpcEndpoint<ResourceManagerId> implements ResourceManagerGateway {
public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
/** Unique id of the resource manager. */
private final ResourceID resourceId;
/** All currently registered JobMasterGateways scoped by JobID. */
private final Map<JobID, JobManagerRegistration> jobManagerRegistrations;
/** All currently registered JobMasterGateways scoped by ResourceID. */
private final Map<ResourceID, JobManagerRegistration> jmResourceIdRegistrations;
/** Service to retrieve the job leader ids. */
private final JobLeaderIdService jobLeaderIdService;
/** All currently registered TaskExecutors with their framework-specific worker information. */
private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
/** Ongoing registration of TaskExecutors per resource ID. */
private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>>
taskExecutorGatewayFutures;
private final HeartbeatServices heartbeatServices;
/** Fatal error handler. */
private final FatalErrorHandler fatalErrorHandler;
/** The slot manager maintains the available slots. */
private final SlotManager slotManager;
private final ResourceManagerPartitionTracker clusterPartitionTracker;
private final ClusterInformation clusterInformation;
protected final ResourceManagerMetricGroup resourceManagerMetricGroup;
protected final Executor ioExecutor;
private final CompletableFuture<Void> startedFuture;
/** The heartbeat manager with task managers. */
private HeartbeatManager<TaskExecutorHeartbeatPayload, Void> taskManagerHeartbeatManager;
/** The heartbeat manager with job managers. */
private HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
/**
* Creates the resource manager endpoint and initializes its bookkeeping state.
*
* <p>The endpoint name is produced by {@code RpcServiceUtils.createRandomName} from {@link
* #RESOURCE_MANAGER_NAME}, and the fencing token is derived from {@code leaderSessionId}. Both
* heartbeat managers are initialized with the no-op instance here.
*
* @param rpcService RPC service handed to the superclass constructor
* @param leaderSessionId session id that is converted into the {@link ResourceManagerId} fencing
*     token
* @param resourceId unique id of this resource manager; must not be null
* @param heartbeatServices heartbeat services; must not be null
* @param slotManager slot manager; must not be null
* @param clusterPartitionTrackerFactory factory used to create the cluster partition tracker;
*     must not be null
* @param jobLeaderIdService service to retrieve the job leader ids; must not be null
* @param clusterInformation cluster information; must not be null
* @param fatalErrorHandler fatal error handler; must not be null
* @param resourceManagerMetricGroup metric group of the resource manager; must not be null
* @param rpcTimeout timeout passed to the TaskExecutor when releasing cluster partitions
* @param ioExecutor executor stored in {@link #ioExecutor}; note that it is not null-checked here
*/
public ResourceManager(
RpcService rpcService,
UUID leaderSessionId,
ResourceID resourceId,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup,
Time rpcTimeout,
Executor ioExecutor) {
super(
rpcService,
RpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME),
ResourceManagerId.fromUuid(leaderSessionId));
this.resourceId = checkNotNull(resourceId);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.slotManager = checkNotNull(slotManager);
this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
this.clusterInformation = checkNotNull(clusterInformation);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.resourceManagerMetricGroup = checkNotNull(resourceManagerMetricGroup);
this.jobManagerRegistrations = new HashMap<>(4);
this.jmResourceIdRegistrations = new HashMap<>(4);
// NOTE: taskExecutors must be assigned before the partition tracker lambda below, which
// refers to this final field.
this.taskExecutors = new HashMap<>(8);
this.taskExecutorGatewayFutures = new HashMap<>(8);
// placeholders; both fields are non-final and can be reassigned later
this.jobManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
// The callback handed to the tracker factory forwards a release request to the gateway of the
// given TaskExecutor. NOTE(review): the lookup result is dereferenced without a null check,
// so the callback assumes the TaskExecutor is still registered when it is invoked.
this.clusterPartitionTracker =
checkNotNull(clusterPartitionTrackerFactory)
.get(
(taskExecutorResourceId, dataSetIds) ->
taskExecutors
.get(taskExecutorResourceId)
.getTaskExecutorGateway()
.releaseClusterPartitions(dataSetIds, rpcTimeout)
.exceptionally(
throwable -> {
// log the failure, then re-throw so that the returned future still fails
log.debug(
"Request for release of cluster partitions belonging to data sets {} was not successful.",
dataSetIds,
throwable);
throw new CompletionException(
throwable);
}));
this.ioExecutor = ioExecutor;
this.startedFuture = new CompletableFuture<>();
}
// ------------------------------------------------------------------------
// RPC lifecycle methods
// ------------------------------------------------------------------------
/**
 * Starts the resource manager services and completes the started future on success.
 *
 * <p>Any {@link Throwable} raised during start-up is wrapped in a {@link
 * ResourceManagerException}, reported via {@code onFatalError} and then re-thrown.
 *
 * @throws Exception the wrapping {@link ResourceManagerException} if start-up fails
 */
@Override
public final void onStart() throws Exception {
    try {
        log.info("Starting the resource manager.");
        startResourceManagerServices();
        startedFuture.complete(null);
    } catch (Throwable startupError) {
        final String failureMessage =
                String.format("Could not start the ResourceManager %s", getAddress());
        final ResourceManagerException startupFailure =
                new ResourceManagerException(failureMessage, startupError);
        onFatalError(startupFailure);
        throw startupFailure;
    }
}
/**
 * Starts the job leader id service, metrics, heartbeat services and the slot manager, then
 * calls {@code initialize()}.
 *
 * <p>If any step throws an {@link Exception}, handling is delegated to {@link
 * #handleStartResourceManagerServicesException(Exception)}.
 */
private void startResourceManagerServices() throws Exception {
    try {
        final JobLeaderIdActionsImpl jobLeaderIdActions = new JobLeaderIdActionsImpl();
        jobLeaderIdService.start(jobLeaderIdActions);

        registerMetrics();
        startHeartbeatServices();

        slotManager.start(
                getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

        initialize();
    } catch (Exception startupException) {
        handleStartResourceManagerServicesException(startupException);
    }
}
/**
 * Stops the resource manager services after a failed start and re-throws the given exception.
 *
 * <p>An exception raised while stopping is attached to {@code e} as a suppressed exception.
 *
 * @param e the exception that made the start fail; always re-thrown
 */
private void handleStartResourceManagerServicesException(Exception e) throws Exception {
    try {
        stopResourceManagerServices();
    } catch (Exception cleanupFailure) {
        // keep the original failure as the primary one
        e.addSuppressed(cleanupFailure);
    }

    throw e;
}
/**
 * Returns the future that is completed at the end of a successful {@link #onStart()}, i.e. once
 * the resource manager is fully started and ready to serve.
 *
 * @return the started future
 */
public CompletableFuture<Void> getStartedFuture() {
    return this.startedFuture;
}
/**
 * Stops the resource manager services.
 *
 * @return a completed future on success, or a future failed with a {@link FlinkException}
 *     wrapping the shutdown error
 */
@Override
public final CompletableFuture<Void> onStop() {
    try {
        stopResourceManagerServices();
        return CompletableFuture.completedFuture(null);
    } catch (Exception shutdownFailure) {
        return FutureUtils.completedExceptionally(
                new FlinkException(
                        "Could not properly shut down the ResourceManager.", shutdownFailure));
    }
}
/**
* Shuts down the services of the resource manager and clears its internal state.
*
* <p>Failures of {@code terminate()}, the slot manager and the job leader id service are
* collected so that the remaining shutdown steps still run; the collected exception is handed to
* {@code ExceptionUtils.tryRethrowException} at the end.
*
* @throws Exception if one of the guarded shutdown steps failed
*/
private void stopResourceManagerServices() throws Exception {
Exception exception = null;
try {
terminate();
} catch (Exception e) {
exception =
new ResourceManagerException("Error while shutting down resource manager", e);
}
// NOTE(review): this call is not guarded; an exception here would skip the remaining steps
stopHeartbeatServices();
try {
slotManager.close();
} catch (Exception e) {
// combine with any earlier failure instead of overwriting it
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
try {
jobLeaderIdService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
resourceManagerMetricGroup.close();
clearStateInternal();
// presumably a no-op when no exception was collected
ExceptionUtils.tryRethrowException(exception);
}
// ------------------------------------------------------------------------
// RPC methods
// ------------------------------------------------------------------------
/**
* Registers a JobMaster at the resource manager.
*
* <p>The job is added to the job leader id service if it is not known yet. Then the leader id
* of the job and a gateway to the given address are obtained asynchronously. Once both are
* available, the leader id is compared with {@code jobMasterId} on the main thread executor: on
* a match the JobMaster is registered, otherwise a {@link RegistrationResponse.Failure} is
* returned. Exceptions of the asynchronous steps are also converted into a {@link
* RegistrationResponse.Failure}.
*
* @param jobMasterId fencing token of the registering JobMaster; must not be null
* @param jobManagerResourceId resource id of the registering JobMaster; must not be null
* @param jobManagerAddress address used to connect to the JobMaster; must not be null
* @param jobId id of the job the JobMaster is responsible for; must not be null
* @param timeout not referenced in this method
* @return future with the registration response; completed exceptionally if the job could not
*     be added to the job leader id service or the leader id future could not be obtained
*/
@Override
public CompletableFuture<RegistrationResponse> registerJobManager(
final JobMasterId jobMasterId,
final ResourceID jobManagerResourceId,
final String jobManagerAddress,
final JobID jobId,
final Time timeout) {
checkNotNull(jobMasterId);
checkNotNull(jobManagerResourceId);
checkNotNull(jobManagerAddress);
checkNotNull(jobId);
// make sure the job is tracked by the leader id service before asking for its leader
if (!jobLeaderIdService.containsJob(jobId)) {
try {
jobLeaderIdService.addJob(jobId);
} catch (Exception e) {
ResourceManagerException exception =
new ResourceManagerException(
"Could not add the job " + jobId + " to the job id leader service.",
e);
onFatalError(exception);
log.error("Could not add job {} to job leader id service.", jobId, e);
return FutureUtils.completedExceptionally(exception);
}
}
log.info(
"Registering job manager {}@{} for job {}.", jobMasterId, jobManagerAddress, jobId);
CompletableFuture<JobMasterId> jobMasterIdFuture;
try {
jobMasterIdFuture = jobLeaderIdService.getLeaderId(jobId);
} catch (Exception e) {
// we cannot check the job leader id so let's fail
// TODO: Maybe it's also ok to skip this check in case that we cannot check the leader
// id
ResourceManagerException exception =
new ResourceManagerException(
"Cannot obtain the "
+ "job leader id future to verify the correct job leader.",
e);
onFatalError(exception);
log.debug(
"Could not obtain the job leader id future to verify the correct job leader.");
return FutureUtils.completedExceptionally(exception);
}
CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class);
// combine gateway and leader id; the callback runs on the main thread executor because it
// touches the registration maps via registerJobMasterInternal
CompletableFuture<RegistrationResponse> registrationResponseFuture =
jobMasterGatewayFuture.thenCombineAsync(
jobMasterIdFuture,
(JobMasterGateway jobMasterGateway, JobMasterId leadingJobMasterId) -> {
if (Objects.equals(leadingJobMasterId, jobMasterId)) {
return registerJobMasterInternal(
jobMasterGateway,
jobId,
jobManagerAddress,
jobManagerResourceId);
} else {
final String declineMessage =
String.format(
"The leading JobMaster id %s did not match the received JobMaster id %s. "
+ "This indicates that a JobMaster leader change has happened.",
leadingJobMasterId, jobMasterId);
log.debug(declineMessage);
return new RegistrationResponse.Failure(
new FlinkException(declineMessage));
}
},
getMainThreadExecutor());
// handle exceptions which might have occurred in one of the futures inputs of combine
return registrationResponseFuture.handleAsync(
(RegistrationResponse registrationResponse, Throwable throwable) -> {
if (throwable != null) {
// include the stack trace only at debug level
if (log.isDebugEnabled()) {
log.debug(
"Registration of job manager {}@{} failed.",
jobMasterId,
jobManagerAddress,
throwable);
} else {
log.info(
"Registration of job manager {}@{} failed.",
jobMasterId,
jobManagerAddress);
}
return new RegistrationResponse.Failure(throwable);
} else {
return registrationResponse;
}
},
ioExecutor);
}
/**
 * Registers a TaskExecutor at the resource manager.
 *
 * <p>A connection to the TaskExecutor is opened and remembered as the ongoing registration for
 * its resource id. When the connection attempt finishes, the result is only processed if it is
 * still the most recent attempt for that resource id; otherwise the registration is declined.
 *
 * @param taskExecutorRegistration registration parameters of the TaskExecutor
 * @param timeout not referenced in this method
 * @return future with the registration response
 */
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
        final TaskExecutorRegistration taskExecutorRegistration, final Time timeout) {

    final CompletableFuture<TaskExecutorGateway> gatewayFuture =
            getRpcService()
                    .connect(
                            taskExecutorRegistration.getTaskExecutorAddress(),
                            TaskExecutorGateway.class);
    taskExecutorGatewayFutures.put(taskExecutorRegistration.getResourceId(), gatewayFuture);

    return gatewayFuture.handleAsync(
            (TaskExecutorGateway gateway, Throwable connectFailure) -> {
                final ResourceID resourceId = taskExecutorRegistration.getResourceId();

                // a newer registration attempt replaced this one in the meantime
                if (gatewayFuture != taskExecutorGatewayFutures.get(resourceId)) {
                    log.debug(
                            "Ignoring outdated TaskExecutorGateway connection for {}.",
                            resourceId.getStringWithMetadata());
                    return new RegistrationResponse.Failure(
                            new FlinkException("Decline outdated task executor registration."));
                }

                taskExecutorGatewayFutures.remove(resourceId);

                if (connectFailure != null) {
                    return new RegistrationResponse.Failure(connectFailure);
                }
                return registerTaskExecutorInternal(gateway, taskExecutorRegistration);
            },
            getMainThreadExecutor());
}
/**
 * Processes the slot report of a TaskExecutor and registers it at the slot manager.
 *
 * @param taskManagerResourceId resource id of the reporting TaskExecutor
 * @param taskManagerRegistrationId instance id the TaskExecutor received on registration
 * @param slotReport slot report of the TaskExecutor
 * @param timeout not referenced in this method
 * @return acknowledge future on success; a future failed with {@link
 *     UnknownTaskExecutorException} if no TaskExecutor is registered under the given resource
 *     id, or with {@link ResourceManagerException} if the registration id does not match
 */
@Override
public CompletableFuture<Acknowledge> sendSlotReport(
        ResourceID taskManagerResourceId,
        InstanceID taskManagerRegistrationId,
        SlotReport slotReport,
        Time timeout) {
    final WorkerRegistration<WorkerType> workerTypeWorkerRegistration =
            taskExecutors.get(taskManagerResourceId);

    // The TaskExecutor may be unknown or may have been removed already; fail the request
    // explicitly instead of running into a NullPointerException.
    if (workerTypeWorkerRegistration == null) {
        return FutureUtils.completedExceptionally(
                new UnknownTaskExecutorException(taskManagerResourceId));
    }

    if (!workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
        return FutureUtils.completedExceptionally(
                new ResourceManagerException(
                        String.format(
                                "Unknown TaskManager registration id %s.",
                                taskManagerRegistrationId)));
    }

    // notify the hook only if the slot manager reports true for the registration
    if (slotManager.registerTaskManager(
            workerTypeWorkerRegistration,
            slotReport,
            workerTypeWorkerRegistration.getTotalResourceProfile(),
            workerTypeWorkerRegistration.getDefaultSlotResourceProfile())) {
        onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
    }
    return CompletableFuture.completedFuture(Acknowledge.get());
}
/**
 * Hook that is invoked from {@code sendSlotReport} after the slot manager has registered the
 * worker's TaskExecutor. The default implementation does nothing.
 *
 * @param worker the worker whose TaskExecutor has been registered
 */
protected void onWorkerRegistered(WorkerType worker) {
    // intentionally empty
}
/**
 * Forwards a heartbeat of a TaskExecutor to the task manager heartbeat manager.
 *
 * @param resourceID resource id of the heartbeat sender
 * @param heartbeatPayload payload sent along with the heartbeat
 * @return the future returned by the heartbeat manager
 */
@Override
public CompletableFuture<Void> heartbeatFromTaskManager(
        final ResourceID resourceID, final TaskExecutorHeartbeatPayload heartbeatPayload) {
    final CompletableFuture<Void> heartbeatFuture =
            taskManagerHeartbeatManager.receiveHeartbeat(resourceID, heartbeatPayload);
    return heartbeatFuture;
}
/**
 * Forwards a heartbeat of a JobMaster to the job manager heartbeat manager. No payload is
 * passed along.
 *
 * @param resourceID resource id of the heartbeat sender
 * @return the future returned by the heartbeat manager
 */
@Override
public CompletableFuture<Void> heartbeatFromJobManager(final ResourceID resourceID) {
    final CompletableFuture<Void> heartbeatFuture =
            jobManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
    return heartbeatFuture;
}
/**
 * Disconnects the given TaskExecutor by delegating to {@code closeTaskManagerConnection}.
 *
 * @param resourceId resource id of the TaskExecutor to disconnect
 * @param cause reason for the disconnect
 */
@Override
public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
    this.closeTaskManagerConnection(resourceId, cause);
}
/**
 * Disconnects the JobMaster of the given job.
 *
 * <p>If the job reached a globally terminal state, the job is removed. Otherwise only the
 * connection is closed and the resource requirements of the job are retained.
 *
 * @param jobId id of the job whose JobMaster disconnects
 * @param jobStatus status of the job at the time of the disconnect
 * @param cause reason for the disconnect
 */
@Override
public void disconnectJobManager(
        final JobID jobId, JobStatus jobStatus, final Exception cause) {
    if (!jobStatus.isGloballyTerminalState()) {
        closeJobManagerConnection(jobId, ResourceRequirementHandling.RETAIN, cause);
        return;
    }

    removeJob(jobId, cause);
}
/**
 * Passes the resource requirements of a job on to the slot manager.
 *
 * @param jobMasterId fencing token of the declaring JobMaster
 * @param resourceRequirements resource requirements, including the id of the job
 * @param timeout not referenced in this method
 * @return acknowledge future on success; a future failed with {@link
 *     ResourceManagerException} if no JobMaster is registered for the job or if the registered
 *     JobMaster id differs from {@code jobMasterId}
 */
@Override
public CompletableFuture<Acknowledge> declareRequiredResources(
        JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) {
    final JobID jobId = resourceRequirements.getJobId();
    final JobManagerRegistration registration = jobManagerRegistrations.get(jobId);

    if (registration == null) {
        return FutureUtils.completedExceptionally(
                new ResourceManagerException(
                        "Could not find registered job manager for job " + jobId + '.'));
    }

    if (!Objects.equals(jobMasterId, registration.getJobMasterId())) {
        return FutureUtils.completedExceptionally(
                new ResourceManagerException(
                        "The job leader's id "
                                + registration.getJobMasterId()
                                + " does not match the received id "
                                + jobMasterId
                                + '.'));
    }

    slotManager.processResourceRequirements(resourceRequirements);
    return CompletableFuture.completedFuture(Acknowledge.get());
}
/**
 * Frees the given slot at the slot manager after a TaskExecutor reported it as available.
 *
 * <p>The message is discarded (and logged at debug level) if no TaskExecutor is registered
 * under the resource id of the slot, or if the registered instance id differs from {@code
 * instanceID}.
 *
 * @param instanceID instance id of the reporting TaskExecutor
 * @param slotId id of the slot that became available
 * @param allocationId allocation id that occupied the slot
 */
@Override
public void notifySlotAvailable(
        final InstanceID instanceID, final SlotID slotId, final AllocationID allocationId) {
    final ResourceID resourceId = slotId.getResourceID();
    final WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId);

    if (registration == null) {
        // the space before "message" was missing in the concatenated literal
        log.debug(
                "Could not find registration for resource id {}. Discarding the slot available"
                        + " message {}.",
                resourceId.getStringWithMetadata(),
                slotId);
        return;
    }

    if (Objects.equals(registration.getInstanceID(), instanceID)) {
        slotManager.freeSlot(slotId, allocationId);
    } else {
        log.debug(
                "Invalid registration id for slot available message. This indicates an"
                        + " outdated request.");
    }
}
/**
 * Cleans up the application and shuts down the cluster.
 *
 * <p>A {@link ResourceManagerException} raised by the internal deregistration is only logged;
 * the returned future is completed with an acknowledge in either case.
 *
 * @param finalStatus of the Flink application
 * @param diagnostics diagnostics message for the Flink application or {@code null}
 * @return acknowledge future
 */
@Override
public CompletableFuture<Acknowledge> deregisterApplication(
        final ApplicationStatus finalStatus, @Nullable final String diagnostics) {
    log.info(
            "Shut down cluster because application is in {}, diagnostics {}.",
            finalStatus,
            diagnostics);

    try {
        internalDeregisterApplication(finalStatus, diagnostics);
    } catch (ResourceManagerException deregistrationFailure) {
        log.warn("Could not properly shutdown the application.", deregistrationFailure);
    }

    final Acknowledge acknowledge = Acknowledge.get();
    return CompletableFuture.completedFuture(acknowledge);
}
/**
 * Returns the number of currently registered TaskExecutors.
 *
 * @return completed future with the size of the TaskExecutor registry
 */
@Override
public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
    final int registeredTaskManagers = taskExecutors.size();
    return CompletableFuture.completedFuture(registeredTaskManagers);
}
/**
 * Collects a {@link TaskManagerInfo} for every registered TaskExecutor.
 *
 * @param timeout not referenced in this method
 * @return completed future with one info object per registered TaskExecutor
 */
@Override
public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
    final Collection<TaskManagerInfo> infos = new ArrayList<>(taskExecutors.size());

    taskExecutors.forEach(
            (resourceId, registration) -> {
                final InstanceID instanceId = registration.getInstanceID();
                infos.add(
                        new TaskManagerInfo(
                                resourceId,
                                registration.getTaskExecutorGateway().getAddress(),
                                registration.getDataPort(),
                                registration.getJmxPort(),
                                taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
                                slotManager.getNumberRegisteredSlotsOf(instanceId),
                                slotManager.getNumberFreeSlotsOf(instanceId),
                                slotManager.getRegisteredResourceOf(instanceId),
                                slotManager.getFreeResourceOf(instanceId),
                                registration.getHardwareDescription(),
                                registration.getMemoryConfiguration()));
            });

    return CompletableFuture.completedFuture(infos);
}
/**
 * Returns the detailed information, including allocated slots, of a single TaskExecutor.
 *
 * @param resourceId resource id of the requested TaskExecutor
 * @param timeout not referenced in this method
 * @return completed future with the details, or a future failed with {@link
 *     UnknownTaskExecutorException} if the TaskExecutor is not registered
 */
@Override
public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo(
        ResourceID resourceId, Time timeout) {
    final WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId);
    if (registration == null) {
        return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(resourceId));
    }

    final InstanceID instanceId = registration.getInstanceID();
    final TaskManagerInfo taskManagerInfo =
            new TaskManagerInfo(
                    resourceId,
                    registration.getTaskExecutorGateway().getAddress(),
                    registration.getDataPort(),
                    registration.getJmxPort(),
                    taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
                    slotManager.getNumberRegisteredSlotsOf(instanceId),
                    slotManager.getNumberFreeSlotsOf(instanceId),
                    slotManager.getRegisteredResourceOf(instanceId),
                    slotManager.getFreeResourceOf(instanceId),
                    registration.getHardwareDescription(),
                    registration.getMemoryConfiguration());

    return CompletableFuture.completedFuture(
            new TaskManagerInfoWithSlots(
                    taskManagerInfo, slotManager.getAllocatedSlotsOf(instanceId)));
}
/**
 * Builds an overview of the registered TaskExecutors, slots and resources.
 *
 * @param timeout not referenced in this method
 * @return completed future with the resource overview
 */
@Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
    final int registeredSlots = slotManager.getNumberRegisteredSlots();
    final int freeSlots = slotManager.getNumberFreeSlots();
    final ResourceProfile registeredResource = slotManager.getRegisteredResource();
    final ResourceProfile availableResource = slotManager.getFreeResource();

    final ResourceOverview overview =
            new ResourceOverview(
                    taskExecutors.size(),
                    registeredSlots,
                    freeSlots,
                    registeredResource,
                    availableResource);

    return CompletableFuture.completedFuture(overview);
}
/**
 * Requests the metric query service address from every registered TaskExecutor.
 *
 * <p>TaskExecutors that answer with an empty optional are left out of the result.
 *
 * @param timeout timeout passed to each TaskExecutor request
 * @return future with the (resource id, address) pairs of all TaskExecutors that reported an
 *     address
 */
@Override
public CompletableFuture<Collection<Tuple2<ResourceID, String>>>
        requestTaskManagerMetricQueryServiceAddresses(Time timeout) {

    final ArrayList<CompletableFuture<Optional<Tuple2<ResourceID, String>>>> addressFutures =
            new ArrayList<>(taskExecutors.size());

    taskExecutors.forEach(
            (tmResourceId, workerRegistration) -> {
                final TaskExecutorGateway gateway = workerRegistration.getTaskExecutorGateway();

                final CompletableFuture<Optional<Tuple2<ResourceID, String>>> addressFuture =
                        gateway.requestMetricQueryServiceAddress(timeout)
                                .thenApply(
                                        o ->
                                                o.toOptional()
                                                        .map(
                                                                address ->
                                                                        Tuple2.of(
                                                                                tmResourceId,
                                                                                address)));

                addressFutures.add(addressFuture);
            });

    return FutureUtils.combineAll(addressFutures)
            .thenApply(
                    collection ->
                            collection.stream()
                                    .filter(Optional::isPresent)
                                    .map(Optional::get)
                                    .collect(Collectors.toList()));
}
/**
 * Asks a TaskExecutor to upload a file identified by its type.
 *
 * @param taskManagerId resource id of the TaskExecutor
 * @param fileType type of the file to upload
 * @param timeout timeout passed to the TaskExecutor request
 * @return future returned by the TaskExecutor, or a future failed with {@link
 *     UnknownTaskExecutorException} if the TaskExecutor is not registered
 */
@Override
public CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByType(
        ResourceID taskManagerId, FileType fileType, Time timeout) {
    log.debug(
            "Request {} file upload from TaskExecutor {}.",
            fileType,
            taskManagerId.getStringWithMetadata());

    final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
    if (registration != null) {
        return registration.getTaskExecutorGateway().requestFileUploadByType(fileType, timeout);
    }

    log.debug(
            "Request upload of file {} from unregistered TaskExecutor {}.",
            fileType,
            taskManagerId.getStringWithMetadata());
    return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
}
/**
 * Asks a TaskExecutor to upload a file identified by its name.
 *
 * @param taskManagerId resource id of the TaskExecutor
 * @param fileName name of the file to upload
 * @param timeout timeout passed to the TaskExecutor request
 * @return future returned by the TaskExecutor, or a future failed with {@link
 *     UnknownTaskExecutorException} if the TaskExecutor is not registered
 */
@Override
public CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByName(
        ResourceID taskManagerId, String fileName, Time timeout) {
    log.debug(
            "Request upload of file {} from TaskExecutor {}.",
            fileName,
            taskManagerId.getStringWithMetadata());

    final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
    if (registration != null) {
        return registration.getTaskExecutorGateway().requestFileUploadByName(fileName, timeout);
    }

    log.debug(
            "Request upload of file {} from unregistered TaskExecutor {}.",
            fileName,
            taskManagerId.getStringWithMetadata());
    return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
}
/**
 * Requests the list of log files from a TaskExecutor.
 *
 * @param taskManagerId resource id of the TaskExecutor
 * @param timeout timeout passed to the TaskExecutor request
 * @return future returned by the TaskExecutor, or a future failed with {@link
 *     UnknownTaskExecutorException} if the TaskExecutor is not registered
 */
@Override
public CompletableFuture<Collection<LogInfo>> requestTaskManagerLogList(
        ResourceID taskManagerId, Time timeout) {
    final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
    if (registration != null) {
        return registration.getTaskExecutorGateway().requestLogList(timeout);
    }

    log.debug(
            "Requested log list from unregistered TaskExecutor {}.",
            taskManagerId.getStringWithMetadata());
    return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
}
/**
 * Delegates the release of the cluster partitions of a data set to the cluster partition
 * tracker.
 *
 * @param dataSetId id of the data set whose partitions should be released
 * @return the future returned by the cluster partition tracker
 */
@Override
public CompletableFuture<Void> releaseClusterPartitions(IntermediateDataSetID dataSetId) {
    final CompletableFuture<Void> releaseFuture =
            clusterPartitionTracker.releaseClusterPartitions(dataSetId);
    return releaseFuture;
}
/**
 * Returns the data sets reported by the cluster partition tracker.
 *
 * @return completed future with the tracker's data set listing
 */
@Override
public CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>> listDataSets() {
    final Map<IntermediateDataSetID, DataSetMetaInfo> dataSets =
            clusterPartitionTracker.listDataSets();
    return CompletableFuture.completedFuture(dataSets);
}
/**
 * Requests a thread dump from a TaskExecutor.
 *
 * @param taskManagerId resource id of the TaskExecutor
 * @param timeout timeout passed to the TaskExecutor request
 * @return future returned by the TaskExecutor, or a future failed with {@link
 *     UnknownTaskExecutorException} if the TaskExecutor is not registered
 */
@Override
public CompletableFuture<ThreadDumpInfo> requestThreadDump(
        ResourceID taskManagerId, Time timeout) {
    final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
    if (registration != null) {
        return registration.getTaskExecutorGateway().requestThreadDump(timeout);
    }

    log.debug(
            "Requested thread dump from unregistered TaskExecutor {}.",
            taskManagerId.getStringWithMetadata());
    return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
}
/**
 * Returns the gateway of a registered TaskExecutor as a thread info gateway.
 *
 * @param taskManagerId resource id of the TaskExecutor
 * @param timeout not referenced in this method
 * @return completed future with the gateway, or a future failed with {@link
 *     UnknownTaskExecutorException} if the TaskExecutor is not registered
 */
@Override
public CompletableFuture<TaskExecutorThreadInfoGateway> requestTaskExecutorThreadInfoGateway(
        ResourceID taskManagerId, Time timeout) {
    final WorkerRegistration<WorkerType> registration = taskExecutors.get(taskManagerId);
    if (registration != null) {
        return CompletableFuture.completedFuture(registration.getTaskExecutorGateway());
    }

    return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
}
// ------------------------------------------------------------------------
// Internal methods
// ------------------------------------------------------------------------
/**
 * Registers a new JobMaster.
 *
 * <p>If a JobMaster with a different fencing token is already registered for the job, its
 * connection is closed (retaining the resource requirements) before the new registration is
 * stored. A repeated registration with the same fencing token keeps the existing entry. In all
 * cases the JobMaster is monitored by the heartbeat manager afterwards.
 *
 * @param jobMasterGateway to communicate with the registering JobMaster
 * @param jobId of the job for which the JobMaster is responsible
 * @param jobManagerAddress address of the JobMaster
 * @param jobManagerResourceId ResourceID of the JobMaster
 * @return RegistrationResponse
 */
private RegistrationResponse registerJobMasterInternal(
        final JobMasterGateway jobMasterGateway,
        JobID jobId,
        String jobManagerAddress,
        ResourceID jobManagerResourceId) {

    boolean storeNewRegistration = true;

    if (jobManagerRegistrations.containsKey(jobId)) {
        final JobManagerRegistration previousRegistration = jobManagerRegistrations.get(jobId);
        final boolean sameJobMaster =
                Objects.equals(
                        previousRegistration.getJobMasterId(),
                        jobMasterGateway.getFencingToken());

        if (sameJobMaster) {
            log.debug(
                    "Job manager {}@{} was already registered.",
                    jobMasterGateway.getFencingToken(),
                    jobManagerAddress);
            storeNewRegistration = false;
        } else {
            // tell the old job manager that it is no longer the job leader
            closeJobManagerConnection(
                    previousRegistration.getJobID(),
                    ResourceRequirementHandling.RETAIN,
                    new Exception("New job leader for job " + jobId + " found."));
        }
    }

    if (storeNewRegistration) {
        final JobManagerRegistration newRegistration =
                new JobManagerRegistration(jobId, jobManagerResourceId, jobMasterGateway);
        jobManagerRegistrations.put(jobId, newRegistration);
        jmResourceIdRegistrations.put(jobManagerResourceId, newRegistration);
    }

    log.info(
            "Registered job manager {}@{} for job {}.",
            jobMasterGateway.getFencingToken(),
            jobManagerAddress,
            jobId);

    jobManagerHeartbeatManager.monitorTarget(
            jobManagerResourceId, new JobMasterHeartbeatTarget(jobMasterGateway));

    return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);
}
/**
 * Registers a new TaskExecutor.
 *
 * <p>An existing registration under the same resource id is removed and unregistered from the
 * slot manager first. The registration is rejected if {@code workerStarted} returns {@code
 * null} for the resource id.
 *
 * @param taskExecutorGateway gateway to communicate with the registering TaskExecutor
 * @param taskExecutorRegistration task executor registration parameters
 * @return RegistrationResponse
 */
private RegistrationResponse registerTaskExecutorInternal(
        TaskExecutorGateway taskExecutorGateway,
        TaskExecutorRegistration taskExecutorRegistration) {
    final ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();

    final WorkerRegistration<WorkerType> previousRegistration =
            taskExecutors.remove(taskExecutorResourceId);
    if (previousRegistration != null) {
        // TODO :: suggest old taskExecutor to stop itself
        log.debug(
                "Replacing old registration of TaskExecutor {}.",
                taskExecutorResourceId.getStringWithMetadata());

        // the slot manager must forget the previous instance of this TaskExecutor
        slotManager.unregisterTaskManager(
                previousRegistration.getInstanceID(),
                new ResourceManagerException(
                        String.format(
                                "TaskExecutor %s re-connected to the ResourceManager.",
                                taskExecutorResourceId.getStringWithMetadata())));
    }

    final WorkerType newWorker = workerStarted(taskExecutorResourceId);
    final String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();

    if (newWorker == null) {
        log.warn(
                "Discard registration from TaskExecutor {} at ({}) because the framework did "
                        + "not recognize it",
                taskExecutorResourceId.getStringWithMetadata(),
                taskExecutorAddress);
        return new TaskExecutorRegistrationRejection(
                "The ResourceManager does not recognize this TaskExecutor.");
    }

    final WorkerRegistration<WorkerType> newRegistration =
            new WorkerRegistration<>(
                    taskExecutorGateway,
                    newWorker,
                    taskExecutorRegistration.getDataPort(),
                    taskExecutorRegistration.getJmxPort(),
                    taskExecutorRegistration.getHardwareDescription(),
                    taskExecutorRegistration.getMemoryConfiguration(),
                    taskExecutorRegistration.getTotalResourceProfile(),
                    taskExecutorRegistration.getDefaultSlotResourceProfile());

    log.info(
            "Registering TaskManager with ResourceID {} ({}) at ResourceManager",
            taskExecutorResourceId.getStringWithMetadata(),
            taskExecutorAddress);
    taskExecutors.put(taskExecutorResourceId, newRegistration);

    taskManagerHeartbeatManager.monitorTarget(
            taskExecutorResourceId, new TaskExecutorHeartbeatTarget(taskExecutorGateway));

    return new TaskExecutorRegistrationSuccess(
            newRegistration.getInstanceID(), resourceId, clusterInformation);
}
/**
* Registers the metrics of the resource manager: a gauge that reports the current number of
* registered TaskExecutors.
*/
protected void registerMetrics() {
resourceManagerMetricGroup.gauge(
MetricNames.NUM_REGISTERED_TASK_MANAGERS, () -> (long) taskExecutors.size());
}
/**
 * Clears the JobMaster and TaskExecutor registries and the job leader id service.
 *
 * <p>A failure while clearing the job leader id service is reported as a fatal error.
 */
private void clearStateInternal() {
    jobManagerRegistrations.clear();
    jmResourceIdRegistrations.clear();
    taskExecutors.clear();

    try {
        jobLeaderIdService.clear();
    } catch (Exception clearFailure) {
        final ResourceManagerException fatal =
                new ResourceManagerException(
                        "Could not properly clear the job leader id service.", clearFailure);
        onFatalError(fatal);
    }
}
/**
 * Closes the connection to the JobMaster that is registered for the given job. This method
 * should be called by the framework once it detects that a currently registered job manager has
 * failed.
 *
 * <p>The registration is removed, heartbeat monitoring is stopped and the JobMaster is told
 * about the disconnect. Nothing happens (apart from a debug log) if no JobMaster is registered
 * for the job.
 *
 * @param jobId identifying the job whose leader shall be disconnected.
 * @param resourceRequirementHandling indicating how existing resource requirements for the
 *     corresponding job should be handled
 * @param cause the exception that caused the JobManager to fail.
 */
protected void closeJobManagerConnection(
        JobID jobId, ResourceRequirementHandling resourceRequirementHandling, Exception cause) {
    final JobManagerRegistration registration = jobManagerRegistrations.remove(jobId);

    if (registration == null) {
        log.debug("There was no registered job manager for job {}.", jobId);
        return;
    }

    final ResourceID jobManagerResourceId = registration.getJobManagerResourceID();
    final JobMasterGateway gateway = registration.getJobManagerGateway();
    final JobMasterId jobMasterId = registration.getJobMasterId();

    log.info(
            "Disconnect job manager {}@{} for job {} from the resource manager.",
            jobMasterId,
            gateway.getAddress(),
            jobId);

    jobManagerHeartbeatManager.unmonitorTarget(jobManagerResourceId);
    jmResourceIdRegistrations.remove(jobManagerResourceId);

    if (resourceRequirementHandling == ResourceRequirementHandling.CLEAR) {
        slotManager.clearResourceRequirements(jobId);
    }

    // tell the job manager about the disconnect
    gateway.disconnectResourceManager(getFencingToken(), cause);
}
/**
 * Closes the connection to the given task executor. This method should be called by the
 * framework once it detects that a currently registered task executor has failed.
 *
 * <p>Heartbeat monitoring of the target is stopped in any case. The remaining clean-up only
 * happens if the task executor was actually registered.
 *
 * @param resourceID Id of the TaskManager that has failed.
 * @param cause The exception which caused the TaskManager to fail.
 */
protected void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
    taskManagerHeartbeatManager.unmonitorTarget(resourceID);

    final WorkerRegistration<WorkerType> removedRegistration = taskExecutors.remove(resourceID);

    if (removedRegistration == null) {
        log.debug(
                "No open TaskExecutor connection {}. Ignoring close TaskExecutor connection. Closing reason was: {}",
                resourceID.getStringWithMetadata(),
                cause.getMessage());
        return;
    }

    log.info(
            "Closing TaskExecutor connection {} because: {}",
            resourceID.getStringWithMetadata(),
            cause.getMessage());

    // TODO :: suggest failed task executor to stop itself
    slotManager.unregisterTaskManager(removedRegistration.getInstanceID(), cause);
    clusterPartitionTracker.processTaskExecutorShutdown(resourceID);
    removedRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
}
/**
 * Removes the job from the job leader id service and, if a job manager is registered for it,
 * closes that connection and clears the job's resource requirements.
 *
 * @param jobId the job to remove
 * @param cause the reason passed on when closing the job manager connection
 */
protected void removeJob(JobID jobId, Exception cause) {
    try {
        jobLeaderIdService.removeJob(jobId);
    } catch (Exception removalFailure) {
        // best effort: a failed removal is only logged, the connection is still closed below
        log.warn(
                "Could not properly remove the job {} from the job leader id service.",
                jobId,
                removalFailure);
    }

    final boolean hasRegisteredJobManager = jobManagerRegistrations.containsKey(jobId);
    if (hasRegisteredJobManager) {
        closeJobManagerConnection(jobId, ResourceRequirementHandling.CLEAR, cause);
    }
}
/**
 * Reacts to a job leader losing its leadership. The job manager connection is closed (keeping
 * the job's resource requirements) only if the registered job master is still the one that
 * lost leadership.
 *
 * @param jobId the job whose leader lost leadership
 * @param oldJobMasterId the id of the job master that lost leadership
 */
protected void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId) {
    if (!jobManagerRegistrations.containsKey(jobId)) {
        log.debug(
                "Discard job leader lost leadership for outdated leader {} for job {}.",
                oldJobMasterId,
                jobId);
        return;
    }

    final JobMasterId registeredLeaderId = jobManagerRegistrations.get(jobId).getJobMasterId();

    if (!Objects.equals(registeredLeaderId, oldJobMasterId)) {
        // a different job master is registered by now, so the notification is stale
        log.debug(
                "Discarding job leader lost leadership, because a new job leader was found for job {}. ",
                jobId);
        return;
    }

    closeJobManagerConnection(
            jobId,
            ResourceRequirementHandling.RETAIN,
            new Exception("Job leader lost leadership."));
}
/**
 * Releases the worker that belongs to the given instance id.
 *
 * <p>If a registered worker is found it is stopped, and on success its task manager
 * connection is closed. If no worker is found, the instance is still unregistered from the
 * slot manager to clean up potential left over state.
 *
 * @param instanceId instance id of the task executor to release
 * @param cause the reason for the release
 */
protected void releaseResource(InstanceID instanceId, Exception cause) {
    WorkerType matchingWorker = null;

    // TODO: Improve performance by having an index on the instanceId
    for (WorkerRegistration<WorkerType> candidate : taskExecutors.values()) {
        if (candidate.getInstanceID().equals(instanceId)) {
            matchingWorker = candidate.getWorker();
            break;
        }
    }

    if (matchingWorker == null) {
        // unregister in order to clean up potential left over state
        slotManager.unregisterTaskManager(instanceId, cause);
        return;
    }

    if (!stopWorker(matchingWorker)) {
        log.debug(
                "Worker {} could not be stopped.",
                matchingWorker.getResourceID().getStringWithMetadata());
        return;
    }

    closeTaskManagerConnection(matchingWorker.getResourceID(), cause);
}
/**
* Tells {@code closeJobManagerConnection} what to do with the resource requirements of the job
* whose job manager is being disconnected.
*/
private enum ResourceRequirementHandling {
// leave the job's resource requirements untouched
RETAIN,
// clear the job's resource requirements in the slot manager
CLEAR
}
// ------------------------------------------------------------------------
// Error Handling
// ------------------------------------------------------------------------
/**
* Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
*
* <p>The error is logged first and then passed to the fatal error handler. A failure while
* logging is ignored so that the handler is always invoked.
*
* @param t The exception describing the fatal error
*/
protected void onFatalError(Throwable t) {
try {
log.error("Fatal error occurred in ResourceManager.", t);
} catch (Throwable ignored) {
// deliberately swallowed: a broken logger must not keep the error from reaching the handler
}
// The fatal error handler implementation should make sure that this call is non-blocking
fatalErrorHandler.onFatalError(t);
}
/**
 * Creates the two heartbeat manager senders of this ResourceManager: one for task managers
 * and one for job managers. Both are created with this ResourceManager's resource id, the
 * main thread executor and this class' logger.
 */
private void startHeartbeatServices() {
    final TaskManagerHeartbeatListener taskManagerListener = new TaskManagerHeartbeatListener();
    taskManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId, taskManagerListener, getMainThreadExecutor(), log);

    final JobManagerHeartbeatListener jobManagerListener = new JobManagerHeartbeatListener();
    jobManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId, jobManagerListener, getMainThreadExecutor(), log);
}
/** Stops the task manager heartbeat manager first and the job manager heartbeat manager second. */
private void stopHeartbeatServices() {
taskManagerHeartbeatManager.stop();
jobManagerHeartbeatManager.stop();
}
// ------------------------------------------------------------------------
// Framework specific behavior
// ------------------------------------------------------------------------
/**
* Initializes the framework specific components.
*
* <p>Abstract hook: each concrete ResourceManager subclass supplies its own implementation.
*
* @throws ResourceManagerException which occurs during initialization and causes the resource
* manager to fail.
*/
protected abstract void initialize() throws ResourceManagerException;
/**
* Terminates the framework specific components.
*
* <p>Abstract hook: each concrete ResourceManager subclass supplies its own implementation.
*
* @throws Exception if an error occurs during termination.
*/
protected abstract void terminate() throws Exception;
/**
* The framework specific code to deregister the application. This should report the
* application's final status and shut down the resource manager cleanly.
*
* <p>This method also needs to make sure that all pending containers that are not registered
* yet are returned.
*
* @param finalStatus The application status to report.
* @param optionalDiagnostics A diagnostics message or {@code null}.
* @throws ResourceManagerException if the application could not be shut down.
*/
protected abstract void internalDeregisterApplication(
ApplicationStatus finalStatus, @Nullable String optionalDiagnostics)
throws ResourceManagerException;
/**
* Allocates a resource using the worker resource specification.
*
* <p>{@code ResourceActionsImpl#allocateResource} delegates to this method and returns its
* result unchanged.
*
* @param workerResourceSpec workerResourceSpec specifies the size of the to be allocated
* resource
* @return whether the resource can be allocated
*/
@VisibleForTesting
public abstract boolean startNewWorker(WorkerResourceSpec workerResourceSpec);
/**
* Callback when a worker was started.
*
* @param resourceID The worker resource id
* @return the framework specific worker object for the given resource id. NOTE(review):
* presumably {@code null} if the framework does not know the worker -- confirm against the
* implementations and the task executor registration path.
*/
protected abstract WorkerType workerStarted(ResourceID resourceID);
/**
* Stops the given worker.
*
* <p>{@code releaseResource} only closes the task manager connection of the worker if this
* method returns {@code true}.
*
* @param worker The worker.
* @return True if the worker was stopped, otherwise false
*/
public abstract boolean stopWorker(WorkerType worker);
/**
* Configures whether the {@link SlotManager} should fail unfulfillable slot requests. The flag
* is passed on to the slot manager unchanged.
*
* @param failUnfulfillableRequest whether to fail unfulfillable requests
*/
protected void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
slotManager.setFailUnfulfillableRequest(failUnfulfillableRequest);
}
// ------------------------------------------------------------------------
// Static utility classes
// ------------------------------------------------------------------------
/**
 * Heartbeat target wrapping a {@link JobMasterGateway}. Heartbeat requests are forwarded to
 * the gateway; receiving heartbeats is not supported.
 */
private static final class JobMasterHeartbeatTarget implements HeartbeatTarget<Void> {

    /** Gateway of the job master that heartbeat requests are sent to. */
    private final JobMasterGateway gateway;

    private JobMasterHeartbeatTarget(JobMasterGateway targetGateway) {
        this.gateway = targetGateway;
    }

    @Override
    public CompletableFuture<Void> receiveHeartbeat(ResourceID resourceID, Void payload) {
        // the ResourceManager will always send heartbeat requests to the JobManager
        return FutureUtils.unsupportedOperationFuture();
    }

    @Override
    public CompletableFuture<Void> requestHeartbeat(ResourceID resourceID, Void payload) {
        // the payload is Void, only the sender's resource id is passed along
        return gateway.heartbeatFromResourceManager(resourceID);
    }
}
/**
 * Heartbeat target wrapping a {@link TaskExecutorGateway}. Heartbeat requests are forwarded
 * to the gateway; receiving heartbeats is not supported.
 */
private static final class TaskExecutorHeartbeatTarget implements HeartbeatTarget<Void> {

    /** Gateway of the task executor that heartbeat requests are sent to. */
    private final TaskExecutorGateway gateway;

    private TaskExecutorHeartbeatTarget(TaskExecutorGateway targetGateway) {
        this.gateway = targetGateway;
    }

    @Override
    public CompletableFuture<Void> receiveHeartbeat(ResourceID resourceID, Void payload) {
        // the ResourceManager will always send heartbeat requests to the
        // TaskManager
        return FutureUtils.unsupportedOperationFuture();
    }

    @Override
    public CompletableFuture<Void> requestHeartbeat(ResourceID resourceID, Void payload) {
        // the payload is Void, only the sender's resource id is passed along
        return gateway.heartbeatFromResourceManager(resourceID);
    }
}
/**
 * {@link ResourceActions} implementation backed by the enclosing ResourceManager. Every
 * callback first validates that it runs in the main thread.
 */
private class ResourceActionsImpl implements ResourceActions {

    @Override
    public void releaseResource(InstanceID instanceId, Exception cause) {
        validateRunsInMainThread();

        // qualified call: this class declares a method with the same name
        ResourceManager.this.releaseResource(instanceId, cause);
    }

    @Override
    public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
        validateRunsInMainThread();

        return startNewWorker(workerResourceSpec);
    }

    @Override
    public void notifyAllocationFailure(
            JobID jobId, AllocationID allocationId, Exception cause) {
        validateRunsInMainThread();

        final JobManagerRegistration registration = jobManagerRegistrations.get(jobId);
        if (registration == null) {
            // no job manager registered for this job, nobody to notify
            return;
        }

        final JobMasterGateway gateway = registration.getJobManagerGateway();
        gateway.notifyAllocationFailure(allocationId, cause);
    }

    @Override
    public void notifyNotEnoughResourcesAvailable(
            JobID jobId, Collection<ResourceRequirement> acquiredResources) {
        validateRunsInMainThread();

        final JobManagerRegistration registration = jobManagerRegistrations.get(jobId);
        if (registration == null) {
            // no job manager registered for this job, nobody to notify
            return;
        }

        final JobMasterGateway gateway = registration.getJobManagerGateway();
        gateway.notifyNotEnoughResourcesAvailable(acquiredResources);
    }
}
/**
 * {@link JobLeaderIdActions} implementation backed by the enclosing ResourceManager.
 *
 * <p>Leadership-loss and timeout notifications are not handled inline but handed to {@code
 * runAsync}; errors are escalated directly as fatal errors.
 */
private class JobLeaderIdActionsImpl implements JobLeaderIdActions {

    /**
     * Schedules the handling of a lost job leadership.
     *
     * @param jobId the job whose leader lost leadership
     * @param oldJobMasterId the id of the job master that lost leadership
     */
    @Override
    public void jobLeaderLostLeadership(final JobID jobId, final JobMasterId oldJobMasterId) {
        // qualified call: this class declares a method with the same name
        runAsync(() -> ResourceManager.this.jobLeaderLostLeadership(jobId, oldJobMasterId));
    }

    /**
     * Schedules the removal of a timed out job. The job is only removed if the job leader id
     * service still considers the timeout valid at the time the scheduled action runs.
     *
     * @param jobId the job that timed out
     * @param timeoutId the id of the timeout that fired
     */
    @Override
    public void notifyJobTimeout(final JobID jobId, final UUID timeoutId) {
        runAsync(
                () -> {
                    if (jobLeaderIdService.isValidTimeout(jobId, timeoutId)) {
                        removeJob(
                                jobId,
                                new Exception(
                                        "Job " + jobId + " was removed because of timeout"));
                    }
                });
    }

    /**
     * Escalates an error as a fatal error of the ResourceManager.
     *
     * @param error the error to escalate
     */
    @Override
    public void handleError(Throwable error) {
        onFatalError(error);
    }
}
/**
 * Heartbeat listener for task managers. A timed out or unreachable task manager gets its
 * connection closed; incoming payloads are forwarded to the slot manager and the cluster
 * partition tracker.
 */
private class TaskManagerHeartbeatListener
        implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {

    @Override
    public void notifyHeartbeatTimeout(final ResourceID resourceID) {
        final String timeoutMessage =
                String.format(
                        "The heartbeat of TaskManager with id %s timed out.",
                        resourceID.getStringWithMetadata());

        log.info(timeoutMessage);
        handleTaskManagerConnectionLoss(resourceID, new TimeoutException(timeoutMessage));
    }

    /** Closes the task manager connection; must be called from the main thread. */
    private void handleTaskManagerConnectionLoss(ResourceID resourceID, Exception cause) {
        validateRunsInMainThread();
        closeTaskManagerConnection(resourceID, cause);
    }

    @Override
    public void notifyTargetUnreachable(ResourceID resourceID) {
        final String unreachableMessage =
                String.format(
                        "TaskManager with id %s is no longer reachable.",
                        resourceID.getStringWithMetadata());

        log.info(unreachableMessage);
        handleTaskManagerConnectionLoss(
                resourceID, new ResourceManagerException(unreachableMessage));
    }

    @Override
    public void reportPayload(
            final ResourceID resourceID, final TaskExecutorHeartbeatPayload payload) {
        validateRunsInMainThread();

        final WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceID);
        if (registration == null) {
            log.debug(
                    "Received slot report from TaskManager {} which is no longer registered.",
                    resourceID.getStringWithMetadata());
            return;
        }

        // the slot manager is keyed by instance id, the partition tracker by resource id
        slotManager.reportSlotStatus(registration.getInstanceID(), payload.getSlotReport());
        clusterPartitionTracker.processTaskExecutorClusterPartitionReport(
                resourceID, payload.getClusterPartitionReport());
    }

    @Override
    public Void retrievePayload(ResourceID resourceID) {
        // there is no payload to send to task managers
        return null;
    }
}
/**
 * Heartbeat listener for job managers. A timed out or unreachable job manager gets its
 * connection closed while the resource requirements of its job are retained.
 */
private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> {

    @Override
    public void notifyHeartbeatTimeout(final ResourceID resourceID) {
        final String timeoutMessage =
                String.format(
                        "The heartbeat of JobManager with id %s timed out.",
                        resourceID.getStringWithMetadata());

        log.info(timeoutMessage);
        handleJobManagerConnectionLoss(resourceID, new TimeoutException(timeoutMessage));
    }

    /**
     * Closes the connection of the job manager registered under the given resource id, if
     * there is one; must be called from the main thread.
     */
    private void handleJobManagerConnectionLoss(ResourceID resourceID, Exception cause) {
        validateRunsInMainThread();

        final JobManagerRegistration registration = jmResourceIdRegistrations.get(resourceID);
        if (registration == null) {
            // nothing registered under this resource id, nothing to close
            return;
        }

        closeJobManagerConnection(
                registration.getJobID(), ResourceRequirementHandling.RETAIN, cause);
    }

    @Override
    public void notifyTargetUnreachable(ResourceID resourceID) {
        final String unreachableMessage =
                String.format(
                        "JobManager with id %s is no longer reachable.",
                        resourceID.getStringWithMetadata());

        log.info(unreachableMessage);
        handleJobManagerConnectionLoss(
                resourceID, new ResourceManagerException(unreachableMessage));
    }

    @Override
    public void reportPayload(ResourceID resourceID, Void payload) {
        // nothing to do since there is no payload
    }

    @Override
    public Void retrievePayload(ResourceID resourceID) {
        // there is no payload to send to job managers
        return null;
    }
}
// ------------------------------------------------------------------------
// Resource Management
// ------------------------------------------------------------------------
/**
 * Returns the total number of required task managers, i.e. the sum of the counts of all
 * entries returned by {@link #getRequiredResources()}.
 *
 * @return the summed up count, {@code 0} if nothing is required
 */
protected int getNumberRequiredTaskManagers() {
    int totalRequired = 0;
    for (Integer requiredForSpec : getRequiredResources().values()) {
        totalRequired += requiredForSpec;
    }
    return totalRequired;
}
/**
* Returns the required resources as reported by the slot manager.
*
* @return the slot manager's mapping from worker resource spec to the required count for that
* spec
*/
protected Map<WorkerResourceSpec, Integer> getRequiredResources() {
return slotManager.getRequiredResources();
}
}