/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.scheduler.adaptive;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.KvStateHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
 * Abstract state class which contains an {@link ExecutionGraph} and the handlers required to
 * execute common operations on it.
*/
abstract class StateWithExecutionGraph implements State {
private final Context context;
private final ExecutionGraph executionGraph;
private final ExecutionGraphHandler executionGraphHandler;
private final OperatorCoordinatorHandler operatorCoordinatorHandler;
private final KvStateHandler kvStateHandler;
private final Logger logger;
StateWithExecutionGraph(
Context context,
ExecutionGraph executionGraph,
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler,
Logger logger) {
this.context = context;
this.executionGraph = executionGraph;
this.executionGraphHandler = executionGraphHandler;
this.operatorCoordinatorHandler = operatorCoordinatorHandler;
this.kvStateHandler = new KvStateHandler(executionGraph);
this.logger = logger;
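        // forward the globally terminal state of the execution graph to this state, but only if
        // the scheduler is still in this state when the termination future completes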
FutureUtils.assertNoException(
executionGraph
.getTerminationFuture()
.thenAcceptAsync(
jobStatus -> {
if (jobStatus.isGloballyTerminalState()) {
context.runIfState(
this, () -> onGloballyTerminalState(jobStatus));
}
},
context.getMainThreadExecutor()));
}
@VisibleForTesting
ExecutionGraph getExecutionGraph() {
return executionGraph;
}
JobID getJobId() {
return executionGraph.getJobID();
}
protected OperatorCoordinatorHandler getOperatorCoordinatorHandler() {
return operatorCoordinatorHandler;
}
protected ExecutionGraphHandler getExecutionGraphHandler() {
return executionGraphHandler;
}
@Override
public void onLeave(Class<? extends State> newState) {
if (!StateWithExecutionGraph.class.isAssignableFrom(newState)) {
            // we are leaving the set of StateWithExecutionGraph states --> dispose the services
            // that only live as long as the ExecutionGraph (the operator coordinators)
operatorCoordinatorHandler.disposeAllOperatorCoordinators();
}
}
@Override
public ArchivedExecutionGraph getJob() {
return ArchivedExecutionGraph.createFrom(executionGraph, getJobStatus());
}
@Override
public void suspend(Throwable cause) {
executionGraph.suspend(cause);
Preconditions.checkState(executionGraph.getState().isTerminalState());
context.goToFinished(ArchivedExecutionGraph.createFrom(executionGraph));
}
@Override
public Logger getLogger() {
return logger;
}
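    /** Notifies the execution graph that the data of the given result partition is available. */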
void notifyPartitionDataAvailable(ResultPartitionID partitionID) {
executionGraph.notifyPartitionDataAvailable(partitionID);
}
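    /**
     * Requests the next input split for the given job vertex and execution attempt.
     *
     * @param vertexID job vertex for which the next input split is requested
     * @param executionAttempt execution attempt which requests the input split
     * @return the serialized input split
     * @throws IOException if the next input split could not be provided
     */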
SerializedInputSplit requestNextInputSplit(
JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
return executionGraphHandler.requestNextInputSplit(vertexID, executionAttempt);
}
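    /**
     * Requests the execution state of the producer of the given result partition.
     *
     * @throws PartitionProducerDisposedException if the partition producer has been disposed
     */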
ExecutionState requestPartitionState(
IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId)
throws PartitionProducerDisposedException {
return executionGraphHandler.requestPartitionState(intermediateResultId, resultPartitionId);
}
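    /** Acknowledges the given checkpoint on behalf of the given execution attempt. */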
void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot checkpointState) {
executionGraphHandler.acknowledgeCheckpoint(
jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
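    /** Declines the checkpoint described by the given decline message. */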
void declineCheckpoint(DeclineCheckpoint decline) {
executionGraphHandler.declineCheckpoint(decline);
}
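    /** Reports the checkpoint metrics for the given execution attempt and checkpoint. */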
void reportCheckpointMetrics(
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics) {
executionGraphHandler.reportCheckpointMetrics(
executionAttemptID, checkpointId, checkpointMetrics);
}
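    /** Updates the execution graph's accumulators with the given accumulator snapshot. */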
void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
executionGraph.updateAccumulators(accumulatorSnapshot);
}
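    /** Requests the {@link KvStateLocation} of the queryable state registered under the given name. */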
KvStateLocation requestKvStateLocation(JobID jobId, String registrationName)
throws FlinkJobNotFoundException, UnknownKvStateLocation {
return kvStateHandler.requestKvStateLocation(jobId, registrationName);
}
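    /** Notifies the KvState handler that a queryable state instance has been registered. */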
void notifyKvStateRegistered(
JobID jobId,
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId,
InetSocketAddress kvStateServerAddress)
throws FlinkJobNotFoundException {
kvStateHandler.notifyKvStateRegistered(
jobId,
jobVertexId,
keyGroupRange,
registrationName,
kvStateId,
kvStateServerAddress);
}
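    /** Notifies the KvState handler that a queryable state instance has been unregistered. */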
void notifyKvStateUnregistered(
JobID jobId,
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName)
throws FlinkJobNotFoundException {
kvStateHandler.notifyKvStateUnregistered(
jobId, jobVertexId, keyGroupRange, registrationName);
}
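    /**
     * Triggers a savepoint for the job. If {@code cancelJob} is {@code true}, the checkpoint
     * scheduler is stopped before the savepoint is triggered and the job is cancelled once the
     * savepoint has completed.
     *
     * @param targetDirectory directory in which to store the savepoint; may be {@code null} if a
     *     cluster-wide default savepoint directory is configured
     * @param cancelJob whether the job should be cancelled after the savepoint completes
     * @return future which is completed with the external pointer of the savepoint
     */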
CompletableFuture<String> triggerSavepoint(String targetDirectory, boolean cancelJob) {
final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
if (checkpointCoordinator == null) {
throw new IllegalStateException(
String.format("Job %s is not a streaming job.", executionGraph.getJobID()));
} else if (targetDirectory == null
&& !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
logger.info(
"Trying to cancel job {} with savepoint, but no savepoint directory configured.",
executionGraph.getJobID());
throw new IllegalStateException(
"No savepoint directory configured. You can either specify a directory "
+ "while cancelling via -s :targetDirectory or configure a cluster-wide "
+ "default via key '"
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
+ "'.");
}
logger.info(
"Triggering {}savepoint for job {}.",
cancelJob ? "cancel-with-" : "",
executionGraph.getJobID());
if (cancelJob) {
checkpointCoordinator.stopCheckpointScheduler();
}
return checkpointCoordinator
.triggerSavepoint(targetDirectory)
.thenApply(CompletedCheckpoint::getExternalPointer)
.handleAsync(
(path, throwable) -> {
if (throwable != null) {
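                                // the savepoint failed; if checkpointing was stopped for
                                // cancel-with-savepoint, resume periodic checkpointing because
                                // the job keeps running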
if (cancelJob && context.isState(this)) {
startCheckpointScheduler(checkpointCoordinator);
}
throw new CompletionException(throwable);
} else if (cancelJob && context.isState(this)) {
logger.info(
"Savepoint stored in {}. Now cancelling {}.",
path,
executionGraph.getJobID());
cancel();
}
return path;
},
context.getMainThreadExecutor());
}
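    /** Restarts the checkpoint scheduler if periodic checkpointing is configured for the job. */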
private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
try {
checkpointCoordinator.startCheckpointScheduler();
} catch (IllegalStateException ignored) {
                // ignore: the checkpoint coordinator was shut down concurrently
}
}
}
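    /** Delivers the given operator event to the coordinator of the operator with the given id. */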
void deliverOperatorEventToCoordinator(
ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent evt)
throws FlinkException {
operatorCoordinatorHandler.deliverOperatorEventToCoordinator(
taskExecutionId, operatorId, evt);
}
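    /**
     * Delivers the given coordination request to the coordinator of the operator with the given id
     * and returns a future containing the coordinator's response.
     */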
CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
OperatorID operatorId, CoordinationRequest request) throws FlinkException {
return operatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(
operatorId, request);
}
/**
* Updates the execution graph with the given task execution state transition.
*
     * @param taskExecutionStateTransition the task execution state transition with which to update
     *     the ExecutionGraph
* @return {@code true} if the update was successful; otherwise {@code false}
*/
abstract boolean updateTaskExecutionState(
TaskExecutionStateTransition taskExecutionStateTransition);
/**
* Callback which is called once the execution graph reaches a globally terminal state.
*
* @param globallyTerminalState globally terminal state which the execution graph reached
*/
abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
/** Context of the {@link StateWithExecutionGraph} state. */
interface Context {
/**
* Run the given action if the current state equals the expected state.
*
         * @param expectedState state which must match the current state for the action to run
* @param action action to run if the current state equals the expected state
*/
void runIfState(State expectedState, Runnable action);
/**
* Checks whether the current state is the expected state.
*
         * @param expectedState state to compare against the current state
* @return {@code true} if the current state equals the expected state; otherwise {@code
* false}
*/
boolean isState(State expectedState);
/**
* Gets the main thread executor.
*
* @return the main thread executor
*/
Executor getMainThreadExecutor();
/**
* Transitions into the {@link Finished} state.
*
         * @param archivedExecutionGraph the {@link ArchivedExecutionGraph} which is passed to the
         *     {@link Finished} state
*/
void goToFinished(ArchivedExecutionGraph archivedExecutionGraph);
}
}