| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.flink.runtime.scheduler; |
| |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.JobStatus; |
| import org.apache.flink.api.common.restartstrategy.RestartStrategies; |
| import org.apache.flink.api.common.time.Time; |
| import org.apache.flink.configuration.CheckpointingOptions; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.core.io.InputSplit; |
| import org.apache.flink.queryablestate.KvStateID; |
| import org.apache.flink.runtime.JobException; |
| import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; |
| import org.apache.flink.runtime.blob.BlobWriter; |
| import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; |
| import org.apache.flink.runtime.checkpoint.CheckpointException; |
| import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; |
| import org.apache.flink.runtime.checkpoint.CheckpointMetrics; |
| import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; |
| import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; |
| import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; |
| import org.apache.flink.runtime.client.JobExecutionException; |
| import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; |
| import org.apache.flink.runtime.concurrent.FutureUtils; |
| import org.apache.flink.runtime.execution.ExecutionState; |
| import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; |
| import org.apache.flink.runtime.executiongraph.Execution; |
| import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; |
| import org.apache.flink.runtime.executiongraph.ExecutionGraph; |
| import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; |
| import org.apache.flink.runtime.executiongraph.ExecutionGraphException; |
| import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; |
| import org.apache.flink.runtime.executiongraph.ExecutionVertex; |
| import org.apache.flink.runtime.executiongraph.IntermediateResult; |
| import org.apache.flink.runtime.executiongraph.JobStatusListener; |
| import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; |
| import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; |
| import org.apache.flink.runtime.executiongraph.failover.NoOpFailoverStrategy; |
| import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology; |
| import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker; |
| import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; |
| import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; |
| import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; |
| import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; |
| import org.apache.flink.runtime.io.network.partition.ResultPartitionID; |
| import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; |
| import org.apache.flink.runtime.jobgraph.JobGraph; |
| import org.apache.flink.runtime.jobgraph.JobVertexID; |
| import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; |
| import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; |
| import org.apache.flink.runtime.jobmaster.SerializedInputSplit; |
| import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; |
| import org.apache.flink.runtime.messages.FlinkJobNotFoundException; |
| import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; |
| import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; |
| import org.apache.flink.runtime.messages.webmonitor.JobDetails; |
| import org.apache.flink.runtime.metrics.MetricNames; |
| import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; |
| import org.apache.flink.runtime.query.KvStateLocation; |
| import org.apache.flink.runtime.query.KvStateLocationRegistry; |
| import org.apache.flink.runtime.query.UnknownKvStateLocation; |
| import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; |
| import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; |
| import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; |
| import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; |
| import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; |
| import org.apache.flink.runtime.shuffle.ShuffleMaster; |
| import org.apache.flink.runtime.state.KeyGroupRange; |
| import org.apache.flink.runtime.taskmanager.TaskExecutionState; |
| import org.apache.flink.runtime.taskmanager.TaskManagerLocation; |
| import org.apache.flink.runtime.webmonitor.WebMonitorUtils; |
| import org.apache.flink.util.FlinkException; |
| import org.apache.flink.util.InstantiationUtil; |
| import org.apache.flink.util.IterableUtils; |
| import org.apache.flink.util.function.FunctionUtils; |
| |
| import org.slf4j.Logger; |
| |
| import javax.annotation.Nullable; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| import static org.apache.flink.util.Preconditions.checkState; |
| |
| /** |
| * Base class which can be used to implement {@link SchedulerNG}. |
| */ |
| public abstract class SchedulerBase implements SchedulerNG { |
| |
	private final Logger log;

	private final JobGraph jobGraph;

	private final ExecutionGraph executionGraph;

	private final SchedulingTopology<?, ?> schedulingTopology;

	private final FailoverTopology<?, ?> failoverTopology;

	private final InputsLocationsRetriever inputsLocationsRetriever;

	private final BackPressureStatsTracker backPressureStatsTracker;

	private final Executor ioExecutor;

	private final Configuration jobMasterConfiguration;

	private final SlotProvider slotProvider;

	private final ScheduledExecutorService futureExecutor;

	private final ClassLoader userCodeLoader;

	private final CheckpointRecoveryFactory checkpointRecoveryFactory;

	private final Time rpcTimeout;

	private final RestartStrategy restartStrategy;

	private final BlobWriter blobWriter;

	private final JobManagerJobMetricGroup jobManagerJobMetricGroup;

	private final Time slotRequestTimeout;

	// true when running in legacy scheduling mode; toggles restart-strategy logging in the
	// constructor and failover-strategy selection in createExecutionGraph()
	private final boolean legacyScheduling;

	// records modification versions of execution vertices (see incrementVersionsOfAllVertices())
	protected final ExecutionVertexVersioner executionVertexVersioner;

	// replaced via setMainThreadExecutor(...); the dummy fails fast with the message below if a
	// main-thread-only method is called before the scheduler is fully initialized
	private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
		"SchedulerBase is not initialized with proper main thread executor. " +
			"Call to SchedulerBase.setMainThreadExecutor(...) required.");
| |
	/**
	 * Creates the scheduler base: validates and stores all collaborators, resolves the restart
	 * strategy from the user's execution config, builds the {@link ExecutionGraph} and attempts
	 * to restore its state from the latest checkpoint or a configured savepoint.
	 *
	 * @throws Exception if the execution graph cannot be built or its state cannot be restored
	 */
	public SchedulerBase(
		final Logger log,
		final JobGraph jobGraph,
		final BackPressureStatsTracker backPressureStatsTracker,
		final Executor ioExecutor,
		final Configuration jobMasterConfiguration,
		final SlotProvider slotProvider,
		final ScheduledExecutorService futureExecutor,
		final ClassLoader userCodeLoader,
		final CheckpointRecoveryFactory checkpointRecoveryFactory,
		final Time rpcTimeout,
		final RestartStrategyFactory restartStrategyFactory,
		final BlobWriter blobWriter,
		final JobManagerJobMetricGroup jobManagerJobMetricGroup,
		final Time slotRequestTimeout,
		final ShuffleMaster<?> shuffleMaster,
		final JobMasterPartitionTracker partitionTracker,
		final ExecutionVertexVersioner executionVertexVersioner,
		final boolean legacyScheduling) throws Exception {

		this.log = checkNotNull(log);
		this.jobGraph = checkNotNull(jobGraph);
		this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
		this.ioExecutor = checkNotNull(ioExecutor);
		this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
		this.slotProvider = checkNotNull(slotProvider);
		this.futureExecutor = checkNotNull(futureExecutor);
		this.userCodeLoader = checkNotNull(userCodeLoader);
		this.checkpointRecoveryFactory = checkNotNull(checkpointRecoveryFactory);
		this.rpcTimeout = checkNotNull(rpcTimeout);

		// the restart strategy configured in the user's ExecutionConfig is deserialized with the
		// user code class loader and resolved against the factory / checkpointing settings
		final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
			jobGraph.getSerializedExecutionConfig()
				.deserializeValue(userCodeLoader)
				.getRestartStrategy();

		this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
			restartStrategyFactory,
			jobGraph.isCheckpointingEnabled());

		// NOTE(review): only logged in legacy mode — presumably the NG scheduler reports its
		// restart handling elsewhere; confirm before relying on this log line
		if (legacyScheduling) {
			log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
		}

		this.blobWriter = checkNotNull(blobWriter);
		this.jobManagerJobMetricGroup = checkNotNull(jobManagerJobMetricGroup);
		this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
		this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
		this.legacyScheduling = legacyScheduling;

		// the execution graph must exist before the topologies and the locations retriever,
		// which are all derived from it
		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
		this.schedulingTopology = executionGraph.getSchedulingTopology();
		this.failoverTopology = executionGraph.getFailoverTopology();

		this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
	}
| |
	/**
	 * Builds a fresh {@link ExecutionGraph} and, if checkpointing is enabled, restores the latest
	 * checkpointed state into it. A savepoint restore is attempted only when no valid checkpoint
	 * is found.
	 *
	 * @throws Exception if the graph cannot be built or state restoration fails
	 */
	private ExecutionGraph createAndRestoreExecutionGraph(
		JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
		ShuffleMaster<?> shuffleMaster,
		JobMasterPartitionTracker partitionTracker) throws Exception {

		ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

		// a null coordinator means checkpointing is disabled for this job — nothing to restore
		final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

		if (checkpointCoordinator != null) {
			// check whether we find a valid checkpoint
			if (!checkpointCoordinator.restoreLatestCheckpointedState(
				new HashSet<>(newExecutionGraph.getAllVertices().values()),
				false,
				false)) {

				// check whether we can restore from a savepoint
				tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
			}
		}

		return newExecutionGraph;
	}
| |
	/**
	 * Builds the {@link ExecutionGraph} for this scheduler's job graph. In legacy scheduling mode
	 * the failover strategy is loaded from the job master configuration; otherwise a no-op
	 * strategy is used (the NG scheduler drives failover itself).
	 *
	 * @throws JobExecutionException or JobException if the graph cannot be constructed
	 */
	private ExecutionGraph createExecutionGraph(
		JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
		ShuffleMaster<?> shuffleMaster,
		final JobMasterPartitionTracker partitionTracker) throws JobExecutionException, JobException {

		final FailoverStrategy.Factory failoverStrategy = legacyScheduling ?
			FailoverStrategyLoader.loadFailoverStrategy(jobMasterConfiguration, log) :
			new NoOpFailoverStrategy.Factory();

		return ExecutionGraphBuilder.buildGraph(
			null,   // no prior execution graph to re-use — TODO confirm first parameter semantics
			jobGraph,
			jobMasterConfiguration,
			futureExecutor,
			ioExecutor,
			slotProvider,
			userCodeLoader,
			checkpointRecoveryFactory,
			rpcTimeout,
			restartStrategy,
			currentJobManagerJobMetricGroup,
			blobWriter,
			slotRequestTimeout,
			log,
			shuffleMaster,
			partitionTracker,
			failoverStrategy);
	}
| |
	/**
	 * Returns the scheduler's execution graph.
	 *
	 * @return the underlying {@link ExecutionGraph}
	 *
	 * @deprecated Direct access to the execution graph by scheduler implementations is discouraged
	 * because currently the execution graph has various features and responsibilities that a
	 * scheduler should not be concerned about. The following specialized abstractions to the
	 * execution graph and accessors should be preferred over direct access:
	 * <ul>
	 * <li>{@link #getSchedulingTopology()}
	 * <li>{@link #getFailoverTopology()}
	 * <li>{@link #getInputsLocationsRetriever()}
	 * <li>{@link #getExecutionVertex(ExecutionVertexID)}
	 * <li>{@link #getExecutionVertexId(ExecutionAttemptID)}
	 * <li>{@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
	 * </ul>
	 * Currently, only {@link LegacyScheduler} requires direct access to the execution graph.
	 */
	@Deprecated
	protected ExecutionGraph getExecutionGraph() {
		return executionGraph;
	}
| |
	/**
	 * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
	 *
	 * <p>This is a no-op when no savepoint restore is requested or when the graph has no
	 * checkpoint coordinator (i.e. checkpointing is disabled).
	 *
	 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
	 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
	 * @throws Exception if the {@link ExecutionGraph} could not be restored
	 */
	private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
		if (savepointRestoreSettings.restoreSavepoint()) {
			final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
			if (checkpointCoordinator != null) {
				checkpointCoordinator.restoreSavepoint(
					savepointRestoreSettings.getRestorePath(),
					savepointRestoreSettings.allowNonRestoredState(),
					executionGraphToRestore.getAllVertices(),
					userCodeLoader);
			}
		}
	}
| |
| protected void resetForNewExecutions(final Collection<ExecutionVertexID> vertices) { |
| vertices.forEach(executionVertexId -> getExecutionVertex(executionVertexId) |
| .resetForNewExecution()); |
| } |
| |
	/**
	 * Restores the latest checkpointed state into the executions of the given vertices. No-op if
	 * checkpointing is disabled (no checkpoint coordinator).
	 *
	 * @param vertices the execution vertices whose job vertices should be restored
	 * @throws Exception if restoring the checkpointed state fails
	 */
	protected void restoreState(final Set<ExecutionVertexID> vertices) throws Exception {
		// if there is checkpointed state, reload it into the executions
		if (executionGraph.getCheckpointCoordinator() != null) {
			// abort pending checkpoints to
			// i) enable new checkpoint triggering without waiting for last checkpoint expired.
			// ii) ensure the EXACTLY_ONCE semantics if needed.
			executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
				new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));

			executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
				getInvolvedExecutionJobVertices(vertices),
				false,
				true);
		}
	}
| |
| private Set<ExecutionJobVertex> getInvolvedExecutionJobVertices( |
| final Set<ExecutionVertexID> executionVertices) { |
| |
| final Set<ExecutionJobVertex> tasks = new HashSet<>(); |
| for (ExecutionVertexID executionVertexID : executionVertices) { |
| final ExecutionVertex executionVertex = getExecutionVertex(executionVertexID); |
| tasks.add(executionVertex.getJobVertex()); |
| } |
| return tasks; |
| } |
| |
| protected void transitionToScheduled(final List<ExecutionVertexID> verticesToDeploy) { |
| verticesToDeploy.forEach(executionVertexId -> getExecutionVertex(executionVertexId) |
| .getCurrentExecutionAttempt() |
| .transitionState(ExecutionState.SCHEDULED)); |
| } |
| |
| protected void setGlobalFailureCause(@Nullable final Throwable cause) { |
| if (cause != null) { |
| getExecutionGraph().initFailureCause(cause); |
| } |
| } |
| |
	/** Returns the main thread executor (the fail-fast dummy until {@link #setMainThreadExecutor} is called). */
	protected ComponentMainThreadExecutor getMainThreadExecutor() {
		return mainThreadExecutor;
	}

	/**
	 * Fails the whole job with the given cause. Vertex versions are bumped first so that any
	 * in-flight deployments based on older versions are recognized as outdated.
	 */
	protected void failJob(Throwable cause) {
		incrementVersionsOfAllVertices();
		executionGraph.failJob(cause);
	}
| |
	/** Returns the failover topology derived from the execution graph. */
	protected final FailoverTopology<?, ?> getFailoverTopology() {
		return failoverTopology;
	}

	/** Returns the scheduling topology derived from the execution graph. */
	protected final SchedulingTopology<?, ?> getSchedulingTopology() {
		return schedulingTopology;
	}

	/** Returns the execution graph's result partition availability checker. */
	protected final ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker() {
		return getExecutionGraph().getResultPartitionAvailabilityChecker();
	}

	/** Returns the retriever for (input) locations of the execution vertices. */
	protected final InputsLocationsRetriever getInputsLocationsRetriever() {
		return inputsLocationsRetriever;
	}

	/**
	 * Switches the execution graph into NG-scheduling mode (internal failures are forwarded to
	 * this scheduler) and transitions it to RUNNING.
	 */
	protected final void prepareExecutionGraphForNgScheduling() {
		executionGraph.enableNgScheduling(new UpdateSchedulerNgOnInternalFailuresListener(this, jobGraph.getJobID()));
		executionGraph.transitionToRunning();
	}
| |
	/**
	 * Resolves an execution attempt to its execution vertex id; empty if the attempt is no
	 * longer registered (e.g. already terminal and unregistered).
	 */
	protected Optional<ExecutionVertexID> getExecutionVertexId(final ExecutionAttemptID executionAttemptId) {
		return Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptId))
			.map(this::getExecutionVertexId);
	}

	/** Like {@link #getExecutionVertexId(ExecutionAttemptID)} but fails if the attempt is unknown. */
	protected ExecutionVertexID getExecutionVertexIdOrThrow(final ExecutionAttemptID executionAttemptId) {
		return getExecutionVertexId(executionAttemptId)
			.orElseThrow(() -> new IllegalStateException("Cannot find execution " + executionAttemptId));
	}

	/** Returns the id of the execution vertex that the given execution belongs to. */
	private ExecutionVertexID getExecutionVertexId(final Execution execution) {
		return execution.getVertex().getID();
	}
| |
| protected ExecutionVertex getExecutionVertex(final ExecutionVertexID executionVertexId) { |
| return executionGraph.getAllVertices().get(executionVertexId.getJobVertexId()).getTaskVertices()[executionVertexId.getSubtaskIndex()]; |
| } |
| |
	/** Returns the job graph this scheduler was created for. */
	protected JobGraph getJobGraph() {
		return jobGraph;
	}

	/** Returns the number of restarts so far; exposed as the restart metrics in {@code registerJobMetrics()}. */
	protected abstract long getNumberOfRestarts();
| |
| private Map<ExecutionVertexID, ExecutionVertexVersion> incrementVersionsOfAllVertices() { |
| return executionVertexVersioner.recordVertexModifications( |
| IterableUtils.toStream(schedulingTopology.getVertices()) |
| .map(SchedulingExecutionVertex::getId) |
| .collect(Collectors.toSet())); |
| } |
| |
	/** Transitions the execution graph's job status from {@code current} to {@code newState}. */
	protected void transitionExecutionGraphState(final JobStatus current, final JobStatus newState) {
		executionGraph.transitionState(current, newState);
	}

	// ------------------------------------------------------------------------
	//  SchedulerNG
	// ------------------------------------------------------------------------

	/** Installs the real main thread executor and starts the execution graph on it. */
	@Override
	public void setMainThreadExecutor(final ComponentMainThreadExecutor mainThreadExecutor) {
		this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
		executionGraph.start(mainThreadExecutor);
	}

	/** Registers a listener for job status changes on the execution graph. */
	@Override
	public void registerJobStatusListener(final JobStatusListener jobStatusListener) {
		executionGraph.registerJobStatusListener(jobStatusListener);
	}
| |
	/** Entry point for scheduling: registers the restart metrics, then delegates to the subclass. */
	@Override
	public final void startScheduling() {
		mainThreadExecutor.assertRunningInMainThread();
		registerJobMetrics();
		startSchedulingInternal();
	}

	private void registerJobMetrics() {
		// both gauges expose the same counter; FULL_RESTARTS looks like a legacy alias of
		// NUM_RESTARTS — confirm before removing either registration
		jobManagerJobMetricGroup.gauge(MetricNames.NUM_RESTARTS, this::getNumberOfRestarts);
		jobManagerJobMetricGroup.gauge(MetricNames.FULL_RESTARTS, this::getNumberOfRestarts);
	}

	/** Scheduler-specific startup logic; invoked on the main thread after metric registration. */
	protected abstract void startSchedulingInternal();
| |
	/**
	 * Suspends the job with the given cause. Vertex versions are bumped first so in-flight
	 * deployments are recognized as outdated.
	 */
	@Override
	public void suspend(Throwable cause) {
		mainThreadExecutor.assertRunningInMainThread();

		incrementVersionsOfAllVertices();
		executionGraph.suspend(cause);
	}

	/** Cancels the job; bumps vertex versions first, as in {@link #suspend(Throwable)}. */
	@Override
	public void cancel() {
		mainThreadExecutor.assertRunningInMainThread();

		incrementVersionsOfAllVertices();
		executionGraph.cancel();
	}

	/** Completes when the execution graph has reached a terminal state (result discarded). */
	@Override
	public CompletableFuture<Void> getTerminationFuture() {
		return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
	}
| |
	/**
	 * Applies a task state update reported by a task executor to the execution graph and, for
	 * effective terminal updates, notifies the scheduler subclass.
	 *
	 * @return true if the update was successfully applied to a registered execution
	 */
	@Override
	public final boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
		// resolve the vertex id BEFORE applying the update — presumably a terminal update can
		// unregister the execution, making the id unresolvable afterwards; verify against
		// ExecutionGraph.updateState
		final Optional<ExecutionVertexID> executionVertexId = getExecutionVertexId(taskExecutionState.getID());

		boolean updateSuccess = executionGraph.updateState(taskExecutionState);

		if (updateSuccess) {
			// a successful update implies the execution was registered, so the id must be present
			checkState(executionVertexId.isPresent());

			if (isNotifiable(executionVertexId.get(), taskExecutionState)) {
				updateTaskExecutionStateInternal(executionVertexId.get(), taskExecutionState);
			}
			return true;
		} else {
			return false;
		}
	}
| |
| private boolean isNotifiable( |
| final ExecutionVertexID executionVertexId, |
| final TaskExecutionState taskExecutionState) { |
| |
| final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); |
| |
| // only notifies FINISHED and FAILED states which are needed at the moment. |
| // can be refined in FLINK-14233 after the legacy scheduler is removed and |
| // the actions are factored out from ExecutionGraph. |
| switch (taskExecutionState.getExecutionState()) { |
| case FINISHED: |
| case FAILED: |
| // only notifies a state update if it's effective, namely it successfully |
| // turns the execution state to the expected value. |
| if (executionVertex.getExecutionState() == taskExecutionState.getExecutionState()) { |
| return true; |
| } |
| break; |
| default: |
| break; |
| } |
| |
| return false; |
| } |
| |
	/** Hook for subclasses to react to effective terminal task state updates; no-op by default. */
	protected void updateTaskExecutionStateInternal(final ExecutionVertexID executionVertexId, final TaskExecutionState taskExecutionState) {
	}
| |
	/**
	 * Hands the next input split to the requesting execution attempt.
	 *
	 * @param vertexID the job vertex whose split assigner is consulted
	 * @param executionAttempt the requesting execution attempt
	 * @return the serialized next input split (may wrap {@code null} when no split is left)
	 * @throws IOException if the split cannot be serialized (the vertex is failed in that case)
	 */
	@Override
	public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
		mainThreadExecutor.assertRunningInMainThread();

		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
		if (execution == null) {
			// This can happen when the JobManager has already unregistered the execution upon a
			// task failure, but the TaskManager has not yet become aware of that.
			if (log.isDebugEnabled()) {
				log.debug("Can not find Execution for attempt {}.", executionAttempt);
			}
			// throw so the TaskManager is made aware of the stale attempt
			throw new IllegalArgumentException("Can not find Execution for attempt " + executionAttempt);
		}

		final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
		if (vertex == null) {
			throw new IllegalArgumentException("Cannot find execution vertex for vertex ID " + vertexID);
		}

		if (vertex.getSplitAssigner() == null) {
			throw new IllegalStateException("No InputSplitAssigner for vertex ID " + vertexID);
		}

		final InputSplit nextInputSplit = execution.getNextInputSplit();

		if (log.isDebugEnabled()) {
			log.debug("Send next input split {}.", nextInputSplit);
		}

		try {
			final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
			return new SerializedInputSplit(serializedInputSplit);
		} catch (Exception ex) {
			// serialization failure is fatal for this vertex: fail it and propagate to the caller
			IOException reason = new IOException("Could not serialize the next input split of class " +
				nextInputSplit.getClass() + ".", ex);
			vertex.fail(reason);
			throw reason;
		}
	}
| |
	/**
	 * Returns the execution state of the producer of the given result partition.
	 *
	 * <p>If the producing execution is no longer registered, the producer is looked up through
	 * the intermediate result; a mismatching attempt id there means the producer was disposed.
	 *
	 * @throws PartitionProducerDisposedException if the producing execution has been disposed
	 * @throws IllegalArgumentException if the intermediate data set is unknown
	 */
	@Override
	public ExecutionState requestPartitionState(
			final IntermediateDataSetID intermediateResultId,
			final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {

		mainThreadExecutor.assertRunningInMainThread();

		final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
		if (execution != null) {
			return execution.getState();
		}
		else {
			final IntermediateResult intermediateResult =
				executionGraph.getAllIntermediateResults().get(intermediateResultId);

			if (intermediateResult != null) {
				// Try to find the producing execution
				Execution producerExecution = intermediateResult
					.getPartitionById(resultPartitionId.getPartitionId())
					.getProducer()
					.getCurrentExecutionAttempt();

				if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
					return producerExecution.getState();
				} else {
					throw new PartitionProducerDisposedException(resultPartitionId);
				}
			} else {
				throw new IllegalArgumentException("Intermediate data set with ID "
					+ intermediateResultId + " not found.");
			}
		}
	}
| |
	/**
	 * Schedules or updates the consumers of the given result partition, then notifies the
	 * scheduler subclass via {@link #scheduleOrUpdateConsumersInternal}.
	 */
	@Override
	public final void scheduleOrUpdateConsumers(final ResultPartitionID partitionId) {
		mainThreadExecutor.assertRunningInMainThread();

		try {
			executionGraph.scheduleOrUpdateConsumers(partitionId);
		} catch (ExecutionGraphException e) {
			// wrap as unchecked — this interface method declares no checked exceptions
			throw new RuntimeException(e);
		}

		final ExecutionVertexID producerVertexId = getExecutionVertexIdOrThrow(partitionId.getProducerId());
		scheduleOrUpdateConsumersInternal(producerVertexId, partitionId);
	}
| |
	/** Hook for subclasses to react to a scheduled/updated result partition; no-op by default. */
	protected void scheduleOrUpdateConsumersInternal(ExecutionVertexID producerVertexId, ResultPartitionID resultPartitionId) {
	}

	/** Returns an archived snapshot of the current execution graph. */
	@Override
	public ArchivedExecutionGraph requestJob() {
		mainThreadExecutor.assertRunningInMainThread();
		return ArchivedExecutionGraph.createFrom(executionGraph);
	}

	/** Returns the current job status. No main-thread assertion — readable from any thread. */
	@Override
	public JobStatus requestJobStatus() {
		return executionGraph.getState();
	}

	/** Returns the job details as displayed by the web monitor. */
	@Override
	public JobDetails requestJobDetails() {
		mainThreadExecutor.assertRunningInMainThread();
		return WebMonitorUtils.createDetailsForJob(executionGraph);
	}
| |
	/**
	 * Looks up the location of queryable key-value state registered under the given name.
	 *
	 * @throws UnknownKvStateLocation if no state is registered under the name
	 * @throws FlinkJobNotFoundException if the job id does not match this scheduler's job
	 */
	@Override
	public KvStateLocation requestKvStateLocation(final JobID jobId, final String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException {
		mainThreadExecutor.assertRunningInMainThread();

		// sanity check for the correct JobID
		if (jobGraph.getJobID().equals(jobId)) {
			if (log.isDebugEnabled()) {
				log.debug("Lookup key-value state for job {} with registration " +
					"name {}.", jobGraph.getJobID(), registrationName);
			}

			final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
			final KvStateLocation location = registry.getKvStateLocation(registrationName);
			if (location != null) {
				return location;
			} else {
				throw new UnknownKvStateLocation(registrationName);
			}
		} else {
			if (log.isDebugEnabled()) {
				log.debug("Request of key-value state location for unknown job {} received.", jobId);
			}
			throw new FlinkJobNotFoundException(jobId);
		}
	}
| |
	/**
	 * Registers queryable key-value state in the execution graph's KV state location registry.
	 *
	 * @throws FlinkJobNotFoundException if the job id does not match this scheduler's job
	 */
	@Override
	public void notifyKvStateRegistered(final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName, final KvStateID kvStateId, final InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException {
		mainThreadExecutor.assertRunningInMainThread();

		if (jobGraph.getJobID().equals(jobId)) {
			if (log.isDebugEnabled()) {
				log.debug("Key value state registered for job {} under name {}.",
					jobGraph.getJobID(), registrationName);
			}

			try {
				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
			} catch (Exception e) {
				// wrap as unchecked — this interface method only declares FlinkJobNotFoundException
				throw new RuntimeException(e);
			}
		} else {
			throw new FlinkJobNotFoundException(jobId);
		}
	}

	/**
	 * Unregisters previously registered queryable key-value state.
	 *
	 * @throws FlinkJobNotFoundException if the job id does not match this scheduler's job
	 */
	@Override
	public void notifyKvStateUnregistered(final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName) throws FlinkJobNotFoundException {
		mainThreadExecutor.assertRunningInMainThread();

		if (jobGraph.getJobID().equals(jobId)) {
			if (log.isDebugEnabled()) {
				log.debug("Key value state unregistered for job {} under name {}.",
					jobGraph.getJobID(), registrationName);
			}

			try {
				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
					jobVertexId, keyGroupRange, registrationName);
			} catch (Exception e) {
				// wrap as unchecked — this interface method only declares FlinkJobNotFoundException
				throw new RuntimeException(e);
			}
		} else {
			throw new FlinkJobNotFoundException(jobId);
		}
	}
| |
	/** Forwards an accumulator snapshot reported by a task executor to the execution graph. */
	@Override
	public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) {
		mainThreadExecutor.assertRunningInMainThread();

		executionGraph.updateAccumulators(accumulatorSnapshot);
	}
| |
	/**
	 * Returns back-pressure statistics for the given job vertex, if available.
	 *
	 * @throws FlinkException if the job vertex id is unknown
	 */
	@Override
	public Optional<OperatorBackPressureStats> requestOperatorBackPressureStats(final JobVertexID jobVertexId) throws FlinkException {
		final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
		if (jobVertex == null) {
			throw new FlinkException("JobVertexID not found " +
				jobVertexId);
		}

		return backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
	}
| |
	/**
	 * Triggers a savepoint, optionally cancelling the job once the savepoint has completed.
	 *
	 * <p>When {@code cancelJob} is set, periodic checkpointing is stopped before triggering the
	 * savepoint and re-started if the savepoint fails, so a failed cancel-with-savepoint leaves
	 * the job checkpointing as before.
	 *
	 * @param targetDirectory directory to store the savepoint in; may be null if a cluster-wide
	 *        default savepoint location is configured
	 * @param cancelJob whether to cancel the job after a successful savepoint
	 * @return future with the external pointer (path) of the completed savepoint
	 * @throws IllegalStateException if the job has no checkpoint coordinator or no target
	 *         directory can be determined
	 */
	@Override
	public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
		mainThreadExecutor.assertRunningInMainThread();

		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
		if (checkpointCoordinator == null) {
			throw new IllegalStateException(
				String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
		} else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
			log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

			throw new IllegalStateException(
				"No savepoint directory configured. You can either specify a directory " +
					"while cancelling via -s :targetDirectory or configure a cluster-wide " +
					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
		}

		// stop periodic checkpoints so they don't race with the cancel-with-savepoint
		if (cancelJob) {
			checkpointCoordinator.stopCheckpointScheduler();
		}

		return checkpointCoordinator
			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
			.thenApply(CompletedCheckpoint::getExternalPointer)
			.handleAsync((path, throwable) -> {
				if (throwable != null) {
					if (cancelJob) {
						// the savepoint failed — resume periodic checkpointing instead of cancelling
						startCheckpointScheduler(checkpointCoordinator);
					}
					throw new CompletionException(throwable);
				} else if (cancelJob) {
					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
					cancel();
				}
				return path;
			}, mainThreadExecutor);
	}
| |
	/** (Re-)starts periodic checkpointing if it is configured; tolerates a concurrent shutdown. */
	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
		mainThreadExecutor.assertRunningInMainThread();

		if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
			try {
				checkpointCoordinator.startCheckpointScheduler();
			} catch (IllegalStateException ignored) {
				// Concurrent shut down of the coordinator
			}
		}
	}
| |
	/**
	 * Forwards a checkpoint acknowledgement from a task to the checkpoint coordinator.
	 *
	 * <p>Processing is off-loaded to the I/O executor so the main thread is not blocked.
	 * A missing coordinator is an error only while the job is RUNNING; otherwise the message is
	 * a harmless leftover and only logged at debug level.
	 */
	@Override
	public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
		mainThreadExecutor.assertRunningInMainThread();

		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
		final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
			jobID,
			executionAttemptID,
			checkpointId,
			checkpointMetrics,
			checkpointState);

		final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

		if (checkpointCoordinator != null) {
			ioExecutor.execute(() -> {
				try {
					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
				// NOTE(review): catches Throwable here while declineCheckpoint catches only
				// Exception — confirm whether the asymmetry is intentional
				} catch (Throwable t) {
					log.warn("Error while processing checkpoint acknowledgement message", t);
				}
			});
		} else {
			String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
			if (executionGraph.getState() == JobStatus.RUNNING) {
				log.error(errorMessage, jobGraph.getJobID());
			} else {
				log.debug(errorMessage, jobGraph.getJobID());
			}
		}
	}
| |
| @Override |
| public void declineCheckpoint(final DeclineCheckpoint decline) { |
| mainThreadExecutor.assertRunningInMainThread(); |
| |
| final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); |
| final String taskManagerLocationInfo = retrieveTaskManagerLocation(decline.getTaskExecutionId()); |
| |
| if (checkpointCoordinator != null) { |
| ioExecutor.execute(() -> { |
| try { |
| checkpointCoordinator.receiveDeclineMessage(decline, taskManagerLocationInfo); |
| } catch (Exception e) { |
| log.error("Error in CheckpointCoordinator while processing {}", decline, e); |
| } |
| }); |
| } else { |
| String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator"; |
| if (executionGraph.getState() == JobStatus.RUNNING) { |
| log.error(errorMessage, jobGraph.getJobID()); |
| } else { |
| log.debug(errorMessage, jobGraph.getJobID()); |
| } |
| } |
| } |
| |
	/**
	 * Stops the job gracefully by taking a synchronous savepoint and then waiting
	 * for the job to reach FINISHED.
	 *
	 * <p>Must be called from the main thread. If either the savepoint or the job
	 * termination fails, the periodic checkpoint scheduler is restarted so the job
	 * can continue running normally.
	 *
	 * @param targetDirectory where to store the savepoint; may be {@code null} if a
	 *        cluster-wide default savepoint directory is configured
	 * @param advanceToEndOfEventTime whether to emit a MAX_WATERMARK before taking
	 *        the savepoint (flushes event-time timers)
	 * @return a future completing with the external savepoint path once the job has
	 *         finished, or completing exceptionally on failure
	 */
	@Override
	public CompletableFuture<String> stopWithSavepoint(final String targetDirectory, final boolean advanceToEndOfEventTime) {
		mainThreadExecutor.assertRunningInMainThread();

		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();

		// Only checkpointed (streaming) jobs have a coordinator; others cannot take savepoints.
		if (checkpointCoordinator == null) {
			return FutureUtils.completedExceptionally(new IllegalStateException(
				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
		}

		// Without an explicit target and no configured default there is nowhere to write the savepoint.
		if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
			log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

			return FutureUtils.completedExceptionally(new IllegalStateException(
				"No savepoint directory configured. You can either specify a directory " +
					"while cancelling via -s :targetDirectory or configure a cluster-wide " +
					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
		}

		// we stop the checkpoint coordinator so that we are guaranteed
		// to have only the data of the synchronous savepoint committed.
		// in case of failure, and if the job restarts, the coordinator
		// will be restarted by the CheckpointCoordinatorDeActivator.
		checkpointCoordinator.stopCheckpointScheduler();

		// Trigger the savepoint and extract its external path once complete.
		final long now = System.currentTimeMillis();
		final CompletableFuture<String> savepointFuture = checkpointCoordinator
			.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
			.thenApply(CompletedCheckpoint::getExternalPointer);

		// The job must end in FINISHED; any other terminal state (or a failure)
		// is converted into an exceptional completion.
		final CompletableFuture<JobStatus> terminationFuture = executionGraph
			.getTerminationFuture()
			.handle((jobstatus, throwable) -> {

				if (throwable != null) {
					log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
					throw new CompletionException(throwable);
				} else if (jobstatus != JobStatus.FINISHED) {
					log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), jobstatus);
					throw new CompletionException(new FlinkException("Reached state " + jobstatus + " instead of FINISHED."));
				}
				return jobstatus;
			});

		// Complete with the savepoint path only after BOTH the savepoint succeeded
		// and the job terminated in FINISHED; on any failure, re-arm periodic
		// checkpointing on the main thread before propagating the error.
		return savepointFuture.thenCompose((path) ->
			terminationFuture.thenApply((jobStatus -> path)))
			.handleAsync((path, throwable) -> {
				if (throwable != null) {
					// restart the checkpoint coordinator if stopWithSavepoint failed.
					startCheckpointScheduler(checkpointCoordinator);
					throw new CompletionException(throwable);
				}

				return path;
			}, mainThreadExecutor);
	}
| |
| private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) { |
| final Optional<Execution> currentExecution = Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptID)); |
| |
| return currentExecution |
| .map(Execution::getAssignedResourceLocation) |
| .map(TaskManagerLocation::toString) |
| .orElse("Unknown location"); |
| } |
| |
| } |