blob: b087ecd7292e4b0fc48d8d4302ea8224a08b7ef0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
/**
* Default {@link JobMasterServiceProcess} which is responsible for creating and running a {@link
* JobMasterService}. The process is responsible for receiving the signals from the {@link
* JobMasterService} and to create the respective {@link JobManagerRunnerResult} from it.
*
* <p>The {@link JobMasterService} can be created asynchronously and the creation can also fail.
* That is why the process needs to observe the creation operation and complete the {@link
* #resultFuture} with an initialization failure.
*
* <p>The {@link #resultFuture} can be completed with the following values:
*
* <ul>
* <li>{@link JobManagerRunnerResult} to signal an initialization failure of the {@link
* JobMasterService} or the completion of a job
* <li>{@link JobNotFinishedException} to signal that the job has not been completed by the {@link
* JobMasterService}
* <li>{@link Exception} to signal an unexpected failure
* </ul>
*/
public class DefaultJobMasterServiceProcess
implements JobMasterServiceProcess, OnCompletionActions {
private static final Logger LOG = LoggerFactory.getLogger(DefaultJobMasterServiceProcess.class);
private final Object lock = new Object();
private final JobID jobId;
private final UUID leaderSessionId;
private final CompletableFuture<JobMasterService> jobMasterServiceFuture;
private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
private final CompletableFuture<JobManagerRunnerResult> resultFuture =
new CompletableFuture<>();
private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
new CompletableFuture<>();
private final CompletableFuture<String> leaderAddressFuture = new CompletableFuture<>();
@GuardedBy("lock")
private boolean isRunning = true;
public DefaultJobMasterServiceProcess(
JobID jobId,
UUID leaderSessionId,
JobMasterServiceFactory jobMasterServiceFactory,
Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
this.jobId = jobId;
this.leaderSessionId = leaderSessionId;
this.jobMasterServiceFuture =
jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);
jobMasterServiceFuture.whenComplete(
(jobMasterService, throwable) -> {
if (throwable != null) {
final JobInitializationException jobInitializationException =
new JobInitializationException(
jobId, "Could not start the JobMaster.", throwable);
LOG.debug(
"Initialization of the JobMasterService for job {} under leader id {} failed.",
jobId,
leaderSessionId,
jobInitializationException);
resultFuture.complete(
JobManagerRunnerResult.forInitializationFailure(
new ExecutionGraphInfo(
failedArchivedExecutionGraphFactory.apply(
jobInitializationException)),
jobInitializationException));
} else {
registerJobMasterServiceFutures(jobMasterService);
}
});
}
private void registerJobMasterServiceFutures(JobMasterService jobMasterService) {
LOG.debug(
"Successfully created the JobMasterService for job {} under leader id {}.",
jobId,
leaderSessionId);
jobMasterGatewayFuture.complete(jobMasterService.getGateway());
leaderAddressFuture.complete(jobMasterService.getAddress());
jobMasterService
.getTerminationFuture()
.whenComplete(
(unused, throwable) -> {
synchronized (lock) {
if (isRunning) {
LOG.warn(
"Unexpected termination of the JobMasterService for job {} under leader id {}.",
jobId,
leaderSessionId);
jobMasterFailed(
new FlinkException(
"Unexpected termination of the JobMasterService.",
throwable));
}
}
});
}
@Override
public CompletableFuture<Void> closeAsync() {
synchronized (lock) {
if (isRunning) {
isRunning = false;
LOG.debug(
"Terminating the JobMasterService process for job {} under leader id {}.",
jobId,
leaderSessionId);
resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
jobMasterGatewayFuture.completeExceptionally(
new FlinkException("Process has been closed."));
jobMasterServiceFuture.whenComplete(
(jobMasterService, throwable) -> {
if (throwable != null) {
// JobMasterService creation has failed. Nothing to stop then :-)
terminationFuture.complete(null);
} else {
FutureUtils.forward(
jobMasterService.closeAsync(), terminationFuture);
}
});
terminationFuture.whenComplete(
(unused, throwable) ->
LOG.debug(
"JobMasterService process for job {} under leader id {} has been terminated.",
jobId,
leaderSessionId));
}
}
return terminationFuture;
}
@Override
public boolean isInitializedAndRunning() {
synchronized (lock) {
return jobMasterServiceFuture.isDone()
&& !jobMasterServiceFuture.isCompletedExceptionally()
&& isRunning;
}
}
@Override
public CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture() {
return jobMasterGatewayFuture;
}
@Override
public CompletableFuture<JobManagerRunnerResult> getResultFuture() {
return resultFuture;
}
@Override
public CompletableFuture<String> getLeaderAddressFuture() {
return leaderAddressFuture;
}
@Override
public void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo) {
LOG.debug(
"Job {} under leader id {} reached a globally terminal state {}.",
jobId,
leaderSessionId,
executionGraphInfo.getArchivedExecutionGraph().getState());
resultFuture.complete(JobManagerRunnerResult.forSuccess(executionGraphInfo));
}
@Override
public void jobFinishedByOther() {
LOG.debug(
"Job {} under leader id {} has been finished by another process.",
jobId,
leaderSessionId);
resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
}
@Override
public void jobMasterFailed(Throwable cause) {
LOG.debug("Job {} under leader id {} failed.", jobId, leaderSessionId);
resultFuture.completeExceptionally(cause);
}
}