| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.runtime.executiongraph; |
| |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.api.common.ArchivedExecutionConfig; |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.JobStatus; |
| import org.apache.flink.api.common.accumulators.Accumulator; |
| import org.apache.flink.api.common.accumulators.AccumulatorHelper; |
| import org.apache.flink.api.common.time.Time; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.metrics.Counter; |
| import org.apache.flink.metrics.SimpleCounter; |
| import org.apache.flink.runtime.JobException; |
| import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; |
| import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; |
| import org.apache.flink.runtime.blob.BlobWriter; |
| import org.apache.flink.runtime.blob.PermanentBlobKey; |
| import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; |
| import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; |
| import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; |
| import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; |
| import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; |
| import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; |
| import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; |
| import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; |
| import org.apache.flink.runtime.concurrent.FutureUtils; |
| import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; |
| import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; |
| import org.apache.flink.runtime.execution.ExecutionState; |
| import org.apache.flink.runtime.execution.SuppressRestartsException; |
| import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; |
| import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology; |
| import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker; |
| import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy; |
| import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback; |
| import org.apache.flink.runtime.executiongraph.restart.RestartCallback; |
| import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; |
| import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; |
| import org.apache.flink.runtime.io.network.partition.ResultPartitionID; |
| import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; |
| import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; |
| import org.apache.flink.runtime.jobgraph.JobVertex; |
| import org.apache.flink.runtime.jobgraph.JobVertexID; |
| import org.apache.flink.runtime.jobgraph.ScheduleMode; |
| import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; |
| import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; |
| import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; |
| import org.apache.flink.runtime.query.KvStateLocationRegistry; |
| import org.apache.flink.runtime.scheduler.InternalFailuresListener; |
| import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology; |
| import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; |
| import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; |
| import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; |
| import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; |
| import org.apache.flink.runtime.shuffle.ShuffleMaster; |
| import org.apache.flink.runtime.state.SharedStateRegistry; |
| import org.apache.flink.runtime.state.StateBackend; |
| import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; |
| import org.apache.flink.runtime.taskmanager.TaskExecutionState; |
| import org.apache.flink.types.Either; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.FlinkException; |
| import org.apache.flink.util.OptionalFailure; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.SerializedThrowable; |
| import org.apache.flink.util.SerializedValue; |
| import org.apache.flink.util.StringUtils; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Optional; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| import static org.apache.flink.util.Preconditions.checkState; |
| |
| /** |
| * The execution graph is the central data structure that coordinates the distributed |
| * execution of a data flow. It keeps representations of each parallel task, each |
| * intermediate stream, and the communication between them. |
| * |
| * <p>The execution graph consists of the following constructs: |
| * <ul> |
| * <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like |
| * "map" or "join") during execution. It holds the aggregated state of all parallel subtasks. |
| * The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes |
| * from the JobGraph's corresponding JobVertex.</li> |
| * <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are |
| * as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by |
| * the ExecutionJobVertex and the index of the parallel subtask</li> |
| * <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions |
| * for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed |
| * because it is no longer available when requested by later operations. An Execution is always |
| * identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager |
| * about deployment of tasks and updates in the task status always use the ExecutionAttemptID to |
| * address the message receiver.</li> |
| * </ul> |
| * |
| * <h2>Global and local failover</h2> |
| * |
| * <p>The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>. |
| * |
| * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts whole |
| * data flow graph from the last completed checkpoint. Global failover is considered the |
| * "fallback strategy" that is used when a local failover is unsuccessful, or when a issue is |
| * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug). |
| * |
| * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails. |
| * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically |
| * attempts to restart as little as possible, but as much as necessary. |
| * |
| * <p>Between local- and global failover, the global failover always takes precedence, because it |
| * is the core mechanism that the ExecutionGraph relies on to bring back consistency. The |
| * guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented |
| * with every global failover (and other global actions, like job cancellation, or terminal |
| * failure). Local failover is always scoped by the modification version that the execution graph |
| * had when the failover was triggered. If a new global modification version is reached during |
| * local failover (meaning there is a concurrent global failover), the failover strategy has to |
| * yield before the global failover. |
| */ |
| public class ExecutionGraph implements AccessExecutionGraph { |
| |
| /** The log object used for debugging. */ |
| static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class); |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** Job specific information like the job id, job name, job configuration, etc. */ |
| private final JobInformation jobInformation; |
| |
| /** Serialized job information or a blob key pointing to the offloaded job information. */ |
| private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey; |
| |
| /** The executor which is used to execute futures. */ |
| private final ScheduledExecutorService futureExecutor; |
| |
| /** The executor which is used to execute blocking io operations. */ |
| private final Executor ioExecutor; |
| |
| /** Executor that runs tasks in the job manager's main thread. */ |
| @Nonnull |
| private ComponentMainThreadExecutor jobMasterMainThreadExecutor; |
| |
| /** {@code true} if all source tasks are stoppable. */ |
| private boolean isStoppable = true; |
| |
| /** All job vertices that are part of this graph. */ |
| private final Map<JobVertexID, ExecutionJobVertex> tasks; |
| |
| /** All vertices, in the order in which they were created. **/ |
| private final List<ExecutionJobVertex> verticesInCreationOrder; |
| |
| /** All intermediate results that are part of this graph. */ |
| private final Map<IntermediateDataSetID, IntermediateResult> intermediateResults; |
| |
| /** The currently executed tasks, for callbacks. */ |
| private final Map<ExecutionAttemptID, Execution> currentExecutions; |
| |
| /** Listeners that receive messages when the entire job switches it status |
| * (such as from RUNNING to FINISHED). */ |
| private final List<JobStatusListener> jobStatusListeners; |
| |
| /** The implementation that decides how to recover the failures of tasks. */ |
| private final FailoverStrategy failoverStrategy; |
| |
| /** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when |
| * the execution graph transitioned into a certain state. The index into this array is the |
| * ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is |
| * at {@code stateTimestamps[RUNNING.ordinal()]}. */ |
| private final long[] stateTimestamps; |
| |
| /** The timeout for all messages that require a response/acknowledgement. */ |
| private final Time rpcTimeout; |
| |
| /** The timeout for slot allocations. */ |
| private final Time allocationTimeout; |
| |
| /** Strategy to use for restarts. */ |
| private final RestartStrategy restartStrategy; |
| |
| /** The slot provider strategy to use for allocating slots for tasks as they are needed. */ |
| private final SlotProviderStrategy slotProviderStrategy; |
| |
| /** The classloader for the user code. Needed for calls into user code classes. */ |
| private final ClassLoader userClassLoader; |
| |
| /** Registered KvState instances reported by the TaskManagers. */ |
| private final KvStateLocationRegistry kvStateLocationRegistry; |
| |
| /** Blob writer used to offload RPC messages. */ |
| private final BlobWriter blobWriter; |
| |
| private boolean legacyScheduling = true; |
| |
| /** The total number of vertices currently in the execution graph. */ |
| private int numVerticesTotal; |
| |
| private final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory; |
| |
| private PartitionReleaseStrategy partitionReleaseStrategy; |
| |
| private DefaultExecutionTopology executionTopology; |
| |
| @Nullable |
| private InternalFailuresListener internalTaskFailuresListener; |
| |
| /** Counts all restarts. Used by other Gauges/Meters and does not register to metric group. */ |
| private final Counter numberOfRestartsCounter = new SimpleCounter(); |
| |
| // ------ Configuration of the Execution ------- |
| |
| /** The mode of scheduling. Decides how to select the initial set of tasks to be deployed. |
| * May indicate to deploy all sources, or to deploy everything, or to deploy via backtracking |
| * from results than need to be materialized. */ |
| private final ScheduleMode scheduleMode; |
| |
| /** The maximum number of prior execution attempts kept in history. */ |
| private final int maxPriorAttemptsHistoryLength; |
| |
| // ------ Execution status and progress. These values are volatile, and accessed under the lock ------- |
| |
| private int verticesFinished; |
| |
| /** Current status of the job execution. */ |
| private volatile JobStatus state = JobStatus.CREATED; |
| |
| /** A future that completes once the job has reached a terminal state. */ |
| private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture<>(); |
| |
| /** On each global recovery, this version is incremented. The version breaks conflicts |
| * between concurrent restart attempts by local failover strategies. */ |
| private volatile long globalModVersion; |
| |
| /** The exception that caused the job to fail. This is set to the first root exception |
| * that was not recoverable and triggered job failure. */ |
| private volatile Throwable failureCause; |
| |
| /** The extended failure cause information for the job. This exists in addition to 'failureCause', |
| * to let 'failureCause' be a strong reference to the exception, while this info holds no |
| * strong reference to any user-defined classes.*/ |
| private volatile ErrorInfo failureInfo; |
| |
| private final JobMasterPartitionTracker partitionTracker; |
| |
| private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker; |
| |
| /** |
| * Future for an ongoing or completed scheduling action. |
| */ |
| @Nullable |
| private volatile CompletableFuture<Void> schedulingFuture; |
| |
| // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- |
| |
| /** The coordinator for checkpoints, if snapshot checkpoints are enabled. */ |
| @Nullable |
| private CheckpointCoordinator checkpointCoordinator; |
| |
| /** TODO, replace it with main thread executor. */ |
| @Nullable |
| private ScheduledExecutorService checkpointCoordinatorTimer; |
| |
| /** Checkpoint stats tracker separate from the coordinator in order to be |
| * available after archiving. */ |
| private CheckpointStatsTracker checkpointStatsTracker; |
| |
| // ------ Fields that are only relevant for archived execution graphs ------------ |
| @Nullable |
| private String stateBackendName; |
| |
| private String jsonPlan; |
| |
| /** Shuffle master to register partitions for task deployment. */ |
| private final ShuffleMaster<?> shuffleMaster; |
| |
| // -------------------------------------------------------------------------------------------- |
| // Constructors |
| // -------------------------------------------------------------------------------------------- |
| |
| public ExecutionGraph( |
| JobInformation jobInformation, |
| ScheduledExecutorService futureExecutor, |
| Executor ioExecutor, |
| Time rpcTimeout, |
| RestartStrategy restartStrategy, |
| int maxPriorAttemptsHistoryLength, |
| FailoverStrategy.Factory failoverStrategyFactory, |
| SlotProvider slotProvider, |
| ClassLoader userClassLoader, |
| BlobWriter blobWriter, |
| Time allocationTimeout, |
| PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory, |
| ShuffleMaster<?> shuffleMaster, |
| JobMasterPartitionTracker partitionTracker, |
| ScheduleMode scheduleMode) throws IOException { |
| |
| this.jobInformation = Preconditions.checkNotNull(jobInformation); |
| |
| this.blobWriter = Preconditions.checkNotNull(blobWriter); |
| |
| this.scheduleMode = checkNotNull(scheduleMode); |
| |
| this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter); |
| |
| this.futureExecutor = Preconditions.checkNotNull(futureExecutor); |
| this.ioExecutor = Preconditions.checkNotNull(ioExecutor); |
| |
| this.slotProviderStrategy = SlotProviderStrategy.from( |
| scheduleMode, |
| slotProvider, |
| allocationTimeout); |
| this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader"); |
| |
| this.tasks = new HashMap<>(16); |
| this.intermediateResults = new HashMap<>(16); |
| this.verticesInCreationOrder = new ArrayList<>(16); |
| this.currentExecutions = new HashMap<>(16); |
| |
| this.jobStatusListeners = new CopyOnWriteArrayList<>(); |
| |
| this.stateTimestamps = new long[JobStatus.values().length]; |
| this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis(); |
| |
| this.rpcTimeout = checkNotNull(rpcTimeout); |
| this.allocationTimeout = checkNotNull(allocationTimeout); |
| |
| this.partitionReleaseStrategyFactory = checkNotNull(partitionReleaseStrategyFactory); |
| |
| this.restartStrategy = restartStrategy; |
| this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices()); |
| |
| this.globalModVersion = 1L; |
| |
| // the failover strategy must be instantiated last, so that the execution graph |
| // is ready by the time the failover strategy sees it |
| this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy"); |
| |
| this.maxPriorAttemptsHistoryLength = maxPriorAttemptsHistoryLength; |
| |
| this.schedulingFuture = null; |
| this.jobMasterMainThreadExecutor = |
| new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor( |
| "ExecutionGraph is not initialized with proper main thread executor. " + |
| "Call to ExecutionGraph.start(...) required."); |
| |
| this.shuffleMaster = checkNotNull(shuffleMaster); |
| |
| this.partitionTracker = checkNotNull(partitionTracker); |
| |
| this.resultPartitionAvailabilityChecker = new ExecutionGraphResultPartitionAvailabilityChecker( |
| this::createResultPartitionId, |
| partitionTracker); |
| } |
| |
| public void start(@Nonnull ComponentMainThreadExecutor jobMasterMainThreadExecutor) { |
| this.jobMasterMainThreadExecutor = jobMasterMainThreadExecutor; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Configuration of Data-flow wide execution settings |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Gets the number of job vertices currently held by this execution graph. |
| * @return The current number of job vertices. |
| */ |
| public int getNumberOfExecutionJobVertices() { |
| return this.verticesInCreationOrder.size(); |
| } |
| |
| public SchedulingTopology<?, ?> getSchedulingTopology() { |
| return executionTopology; |
| } |
| |
| public FailoverTopology<?, ?> getFailoverTopology() { |
| return executionTopology; |
| } |
| |
| public ScheduleMode getScheduleMode() { |
| return scheduleMode; |
| } |
| |
| public Time getAllocationTimeout() { |
| return allocationTimeout; |
| } |
| |
| @Nonnull |
| public ComponentMainThreadExecutor getJobMasterMainThreadExecutor() { |
| return jobMasterMainThreadExecutor; |
| } |
| |
| @Override |
| public boolean isArchived() { |
| return false; |
| } |
| |
| @Override |
| public Optional<String> getStateBackendName() { |
| return Optional.ofNullable(stateBackendName); |
| } |
| |
| public void enableCheckpointing( |
| CheckpointCoordinatorConfiguration chkConfig, |
| List<ExecutionJobVertex> verticesToTrigger, |
| List<ExecutionJobVertex> verticesToWaitFor, |
| List<ExecutionJobVertex> verticesToCommitTo, |
| List<MasterTriggerRestoreHook<?>> masterHooks, |
| CheckpointIDCounter checkpointIDCounter, |
| CompletedCheckpointStore checkpointStore, |
| StateBackend checkpointStateBackend, |
| CheckpointStatsTracker statsTracker) { |
| |
| checkState(state == JobStatus.CREATED, "Job must be in CREATED state"); |
| checkState(checkpointCoordinator == null, "checkpointing already enabled"); |
| |
| ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger); |
| ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor); |
| ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); |
| |
| checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker"); |
| |
| CheckpointFailureManager failureManager = new CheckpointFailureManager( |
| chkConfig.getTolerableCheckpointFailureNumber(), |
| new CheckpointFailureManager.FailJobCallback() { |
| @Override |
| public void failJob(Throwable cause) { |
| getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause)); |
| } |
| |
| @Override |
| public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) { |
| getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause, failingTask)); |
| } |
| } |
| ); |
| |
| checkState(checkpointCoordinatorTimer == null); |
| |
| checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor( |
| new DispatcherThreadFactory( |
| Thread.currentThread().getThreadGroup(), "Checkpoint Timer")); |
| |
| // create the coordinator that triggers and commits checkpoints and holds the state |
| checkpointCoordinator = new CheckpointCoordinator( |
| jobInformation.getJobId(), |
| chkConfig, |
| tasksToTrigger, |
| tasksToWaitFor, |
| tasksToCommitTo, |
| checkpointIDCounter, |
| checkpointStore, |
| checkpointStateBackend, |
| ioExecutor, |
| new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer), |
| SharedStateRegistry.DEFAULT_FACTORY, |
| failureManager); |
| |
| // register the master hooks on the checkpoint coordinator |
| for (MasterTriggerRestoreHook<?> hook : masterHooks) { |
| if (!checkpointCoordinator.addMasterHook(hook)) { |
| LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier()); |
| } |
| } |
| |
| checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker); |
| |
| // interval of max long value indicates disable periodic checkpoint, |
| // the CheckpointActivatorDeactivator should be created only if the interval is not max value |
| if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) { |
| // the periodic checkpoint scheduler is activated and deactivated as a result of |
| // job status changes (running -> on, all other states -> off) |
| registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator()); |
| } |
| |
| this.stateBackendName = checkpointStateBackend.getClass().getSimpleName(); |
| } |
| |
| @Nullable |
| public CheckpointCoordinator getCheckpointCoordinator() { |
| return checkpointCoordinator; |
| } |
| |
| public KvStateLocationRegistry getKvStateLocationRegistry() { |
| return kvStateLocationRegistry; |
| } |
| |
| public RestartStrategy getRestartStrategy() { |
| return restartStrategy; |
| } |
| |
| @Override |
| public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() { |
| if (checkpointStatsTracker != null) { |
| return checkpointStatsTracker.getJobCheckpointingConfiguration(); |
| } else { |
| return null; |
| } |
| } |
| |
| @Override |
| public CheckpointStatsSnapshot getCheckpointStatsSnapshot() { |
| if (checkpointStatsTracker != null) { |
| return checkpointStatsTracker.createSnapshot(); |
| } else { |
| return null; |
| } |
| } |
| |
| private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) { |
| if (jobVertices.size() == 1) { |
| ExecutionJobVertex jv = jobVertices.get(0); |
| if (jv.getGraph() != this) { |
| throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph"); |
| } |
| return jv.getTaskVertices(); |
| } |
| else { |
| ArrayList<ExecutionVertex> all = new ArrayList<>(); |
| for (ExecutionJobVertex jv : jobVertices) { |
| if (jv.getGraph() != this) { |
| throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph"); |
| } |
| all.addAll(Arrays.asList(jv.getTaskVertices())); |
| } |
| return all.toArray(new ExecutionVertex[all.size()]); |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Properties and Status of the Execution Graph |
| // -------------------------------------------------------------------------------------------- |
| |
| public void setJsonPlan(String jsonPlan) { |
| this.jsonPlan = jsonPlan; |
| } |
| |
| @Override |
| public String getJsonPlan() { |
| return jsonPlan; |
| } |
| |
| public SlotProviderStrategy getSlotProviderStrategy() { |
| return slotProviderStrategy; |
| } |
| |
| public Either<SerializedValue<JobInformation>, PermanentBlobKey> getJobInformationOrBlobKey() { |
| return jobInformationOrBlobKey; |
| } |
| |
| @Override |
| public JobID getJobID() { |
| return jobInformation.getJobId(); |
| } |
| |
| @Override |
| public String getJobName() { |
| return jobInformation.getJobName(); |
| } |
| |
| @Override |
| public boolean isStoppable() { |
| return this.isStoppable; |
| } |
| |
| public Configuration getJobConfiguration() { |
| return jobInformation.getJobConfiguration(); |
| } |
| |
| public ClassLoader getUserClassLoader() { |
| return this.userClassLoader; |
| } |
| |
| @Override |
| public JobStatus getState() { |
| return state; |
| } |
| |
| public Throwable getFailureCause() { |
| return failureCause; |
| } |
| |
| public ErrorInfo getFailureInfo() { |
| return failureInfo; |
| } |
| |
| /** |
| * Gets the number of restarts, including full restarts and fine grained restarts. |
| * If a recovery is currently pending, this recovery is included in the count. |
| * |
| * @return The number of restarts so far |
| */ |
| public long getNumberOfRestarts() { |
| return numberOfRestartsCounter.getCount(); |
| } |
| |
| @Override |
| public ExecutionJobVertex getJobVertex(JobVertexID id) { |
| return this.tasks.get(id); |
| } |
| |
| @Override |
| public Map<JobVertexID, ExecutionJobVertex> getAllVertices() { |
| return Collections.unmodifiableMap(this.tasks); |
| } |
| |
| @Override |
| public Iterable<ExecutionJobVertex> getVerticesTopologically() { |
| // we return a specific iterator that does not fail with concurrent modifications |
| // the list is append only, so it is safe for that |
| final int numElements = this.verticesInCreationOrder.size(); |
| |
| return new Iterable<ExecutionJobVertex>() { |
| @Override |
| public Iterator<ExecutionJobVertex> iterator() { |
| return new Iterator<ExecutionJobVertex>() { |
| private int pos = 0; |
| |
| @Override |
| public boolean hasNext() { |
| return pos < numElements; |
| } |
| |
| @Override |
| public ExecutionJobVertex next() { |
| if (hasNext()) { |
| return verticesInCreationOrder.get(pos++); |
| } else { |
| throw new NoSuchElementException(); |
| } |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| }; |
| } |
| }; |
| } |
| |
| public int getTotalNumberOfVertices() { |
| return numVerticesTotal; |
| } |
| |
| public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() { |
| return Collections.unmodifiableMap(this.intermediateResults); |
| } |
| |
| @Override |
| public Iterable<ExecutionVertex> getAllExecutionVertices() { |
| return new Iterable<ExecutionVertex>() { |
| @Override |
| public Iterator<ExecutionVertex> iterator() { |
| return new AllVerticesIterator(getVerticesTopologically().iterator()); |
| } |
| }; |
| } |
| |
| @Override |
| public long getStatusTimestamp(JobStatus status) { |
| return this.stateTimestamps[status.ordinal()]; |
| } |
| |
| public final BlobWriter getBlobWriter() { |
| return blobWriter; |
| } |
| |
| /** |
| * Returns the ExecutionContext associated with this ExecutionGraph. |
| * |
| * @return ExecutionContext associated with this ExecutionGraph |
| */ |
| public Executor getFutureExecutor() { |
| return futureExecutor; |
| } |
| |
| /** |
| * Merges all accumulator results from the tasks previously executed in the Executions. |
| * @return The accumulator map |
| */ |
| public Map<String, OptionalFailure<Accumulator<?, ?>>> aggregateUserAccumulators() { |
| |
| Map<String, OptionalFailure<Accumulator<?, ?>>> userAccumulators = new HashMap<>(); |
| |
| for (ExecutionVertex vertex : getAllExecutionVertices()) { |
| Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators(); |
| if (next != null) { |
| AccumulatorHelper.mergeInto(userAccumulators, next); |
| } |
| } |
| |
| return userAccumulators; |
| } |
| |
| /** |
| * Gets a serialized accumulator map. |
| * @return The accumulator map with serialized accumulator values. |
| */ |
| @Override |
| public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized() { |
| return aggregateUserAccumulators() |
| .entrySet() |
| .stream() |
| .collect(Collectors.toMap( |
| Map.Entry::getKey, |
| entry -> serializeAccumulator(entry.getKey(), entry.getValue()))); |
| } |
| |
| private static SerializedValue<OptionalFailure<Object>> serializeAccumulator(String name, OptionalFailure<Accumulator<?, ?>> accumulator) { |
| try { |
| if (accumulator.isFailure()) { |
| return new SerializedValue<>(OptionalFailure.ofFailure(accumulator.getFailureCause())); |
| } |
| return new SerializedValue<>(OptionalFailure.of(accumulator.getUnchecked().getLocalValue())); |
| } catch (IOException ioe) { |
| LOG.error("Could not serialize accumulator " + name + '.', ioe); |
| try { |
| return new SerializedValue<>(OptionalFailure.ofFailure(ioe)); |
| } catch (IOException e) { |
| throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e); |
| } |
| } |
| } |
| |
| /** |
| * Returns the a stringified version of the user-defined accumulators. |
| * @return an Array containing the StringifiedAccumulatorResult objects |
| */ |
| @Override |
| public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() { |
| Map<String, OptionalFailure<Accumulator<?, ?>>> accumulatorMap = aggregateUserAccumulators(); |
| return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); |
| } |
| |
| public void enableNgScheduling(final InternalFailuresListener internalTaskFailuresListener) { |
| checkNotNull(internalTaskFailuresListener); |
| checkState(this.internalTaskFailuresListener == null, "enableNgScheduling can be only called once"); |
| this.internalTaskFailuresListener = internalTaskFailuresListener; |
| this.legacyScheduling = false; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Actions |
| // -------------------------------------------------------------------------------------------- |
| |
| public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { |
| |
| assertRunningInJobMasterMainThread(); |
| |
| LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " + |
| "vertices and {} intermediate results.", |
| topologiallySorted.size(), |
| tasks.size(), |
| intermediateResults.size()); |
| |
| final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size()); |
| final long createTimestamp = System.currentTimeMillis(); |
| |
| for (JobVertex jobVertex : topologiallySorted) { |
| |
| if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) { |
| this.isStoppable = false; |
| } |
| |
| // create the execution job vertex and attach it to the graph |
| ExecutionJobVertex ejv = new ExecutionJobVertex( |
| this, |
| jobVertex, |
| 1, |
| maxPriorAttemptsHistoryLength, |
| rpcTimeout, |
| globalModVersion, |
| createTimestamp); |
| |
| ejv.connectToPredecessors(this.intermediateResults); |
| |
| ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); |
| if (previousTask != null) { |
| throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", |
| jobVertex.getID(), ejv, previousTask)); |
| } |
| |
| for (IntermediateResult res : ejv.getProducedDataSets()) { |
| IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res); |
| if (previousDataSet != null) { |
| throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", |
| res.getId(), res, previousDataSet)); |
| } |
| } |
| |
| this.verticesInCreationOrder.add(ejv); |
| this.numVerticesTotal += ejv.getParallelism(); |
| newExecJobVertices.add(ejv); |
| } |
| |
| // the topology assigning should happen before notifying new vertices to failoverStrategy |
| executionTopology = new DefaultExecutionTopology(this); |
| |
| failoverStrategy.notifyNewVertices(newExecJobVertices); |
| |
| partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(getSchedulingTopology()); |
| } |
| |
| public boolean isLegacyScheduling() { |
| return legacyScheduling; |
| } |
| |
| public void transitionToRunning() { |
| if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { |
| throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED); |
| } |
| } |
| |
| public void scheduleForExecution() throws JobException { |
| |
| assertRunningInJobMasterMainThread(); |
| |
| if (isLegacyScheduling()) { |
| LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName()); |
| } |
| |
| final long currentGlobalModVersion = globalModVersion; |
| |
| if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { |
| |
| final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule( |
| scheduleMode, |
| getAllExecutionVertices(), |
| this); |
| |
| if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) { |
| schedulingFuture = newSchedulingFuture; |
| newSchedulingFuture.whenComplete( |
| (Void ignored, Throwable throwable) -> { |
| if (throwable != null) { |
| final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); |
| |
| if (!(strippedThrowable instanceof CancellationException)) { |
| // only fail if the scheduling future was not canceled |
| failGlobal(strippedThrowable); |
| } |
| } |
| }); |
| } else { |
| newSchedulingFuture.cancel(false); |
| } |
| } |
| else { |
| throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED); |
| } |
| } |
| |
| public void cancel() { |
| |
| assertRunningInJobMasterMainThread(); |
| |
| while (true) { |
| JobStatus current = state; |
| |
| if (current == JobStatus.RUNNING || current == JobStatus.CREATED || current == JobStatus.RESTARTING) { |
| if (transitionState(current, JobStatus.CANCELLING)) { |
| |
| // make sure no concurrent local actions interfere with the cancellation |
| final long globalVersionForRestart = incrementGlobalModVersion(); |
| |
| final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture; |
| |
| // cancel ongoing scheduling action |
| if (ongoingSchedulingFuture != null) { |
| ongoingSchedulingFuture.cancel(false); |
| } |
| |
| final ConjunctFuture<Void> allTerminal = cancelVerticesAsync(); |
| allTerminal.whenComplete( |
| (Void value, Throwable throwable) -> { |
| if (throwable != null) { |
| transitionState( |
| JobStatus.CANCELLING, |
| JobStatus.FAILED, |
| new FlinkException( |
| "Could not cancel job " + getJobName() + " because not all execution job vertices could be cancelled.", |
| throwable)); |
| } else { |
| // cancellations may currently be overridden by failures which trigger |
| // restarts, so we need to pass a proper restart global version here |
| allVerticesInTerminalState(globalVersionForRestart); |
| } |
| }); |
| |
| return; |
| } |
| } |
| // Executions are being canceled. Go into cancelling and wait for |
| // all vertices to be in their final state. |
| else if (current == JobStatus.FAILING) { |
| if (transitionState(current, JobStatus.CANCELLING)) { |
| return; |
| } |
| } |
| else { |
| // no need to treat other states |
| return; |
| } |
| } |
| } |
| |
| private ConjunctFuture<Void> cancelVerticesAsync() { |
| final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size()); |
| |
| // cancel all tasks (that still need cancelling) |
| for (ExecutionJobVertex ejv : verticesInCreationOrder) { |
| futures.add(ejv.cancelWithFuture()); |
| } |
| |
| // we build a future that is complete once all vertices have reached a terminal state |
| return FutureUtils.waitForAll(futures); |
| } |
| |
| /** |
| * Suspends the current ExecutionGraph. |
| * |
| * <p>The JobStatus will be directly set to {@link JobStatus#SUSPENDED} iff the current state is not a terminal |
| * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed. |
| * |
| * <p>The {@link JobStatus#SUSPENDED} state is a local terminal state which stops the execution of the job but does |
| * not remove the job from the HA job store so that it can be recovered by another JobManager. |
| * |
| * @param suspensionCause Cause of the suspension |
| */ |
| public void suspend(Throwable suspensionCause) { |
| |
| assertRunningInJobMasterMainThread(); |
| |
| if (state.isTerminalState()) { |
| // stay in a terminal state |
| return; |
| } else if (transitionState(state, JobStatus.SUSPENDED, suspensionCause)) { |
| initFailureCause(suspensionCause); |
| |
| // make sure no concurrent local actions interfere with the cancellation |
| incrementGlobalModVersion(); |
| |
| // cancel ongoing scheduling action |
| if (schedulingFuture != null) { |
| schedulingFuture.cancel(false); |
| } |
| final ArrayList<CompletableFuture<Void>> executionJobVertexTerminationFutures = new ArrayList<>(verticesInCreationOrder.size()); |
| |
| for (ExecutionJobVertex ejv: verticesInCreationOrder) { |
| executionJobVertexTerminationFutures.add(ejv.suspend()); |
| } |
| |
| final ConjunctFuture<Void> jobVerticesTerminationFuture = FutureUtils.waitForAll(executionJobVertexTerminationFutures); |
| |
| checkState(jobVerticesTerminationFuture.isDone(), "Suspend needs to happen atomically"); |
| |
| jobVerticesTerminationFuture.whenComplete( |
| (Void ignored, Throwable throwable) -> { |
| if (throwable != null) { |
| LOG.debug("Could not properly suspend the execution graph.", throwable); |
| } |
| |
| onTerminalState(state); |
| LOG.info("Job {} has been suspended.", getJobID()); |
| }); |
| } else { |
| throw new IllegalStateException(String.format("Could not suspend because transition from %s to %s failed.", state, JobStatus.SUSPENDED)); |
| } |
| } |
| |
| void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID failingAttempt) { |
| final Execution failedExecution = currentExecutions.get(failingAttempt); |
| if (failedExecution != null && failedExecution.getState() == ExecutionState.RUNNING) { |
| failGlobal(cause); |
| } else { |
| LOG.debug("The failing attempt {} belongs to an already not" + |
| " running task thus won't fail the job", failingAttempt); |
| } |
| } |
| |
| /** |
| * Fails the execution graph globally. This failure will not be recovered by a specific |
| * failover strategy, but results in a full restart of all tasks. |
| * |
| * <p>This global failure is meant to be triggered in cases where the consistency of the |
| * execution graph' state cannot be guaranteed any more (for example when catching unexpected |
| * exceptions that indicate a bug or an unexpected call race), and where a full restart is the |
| * safe way to get consistency back. |
| * |
| * @param t The exception that caused the failure. |
| */ |
| public void failGlobal(Throwable t) { |
| if (!isLegacyScheduling()) { |
| internalTaskFailuresListener.notifyGlobalFailure(t); |
| return; |
| } |
| |
| assertRunningInJobMasterMainThread(); |
| |
| while (true) { |
| JobStatus current = state; |
| // stay in these states |
| if (current == JobStatus.FAILING || |
| current == JobStatus.SUSPENDED || |
| current.isGloballyTerminalState()) { |
| return; |
| } else if (transitionState(current, JobStatus.FAILING, t)) { |
| initFailureCause(t); |
| |
| // make sure no concurrent local or global actions interfere with the failover |
| final long globalVersionForRestart = incrementGlobalModVersion(); |
| |
| final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture; |
| |
| // cancel ongoing scheduling action |
| if (ongoingSchedulingFuture != null) { |
| ongoingSchedulingFuture.cancel(false); |
| } |
| |
| // we build a future that is complete once all vertices have reached a terminal state |
| final ConjunctFuture<Void> allTerminal = cancelVerticesAsync(); |
| FutureUtils.assertNoException(allTerminal.handle( |
| (Void ignored, Throwable throwable) -> { |
| if (throwable != null) { |
| transitionState( |
| JobStatus.FAILING, |
| JobStatus.FAILED, |
| new FlinkException("Could not cancel all execution job vertices properly.", throwable)); |
| } else { |
| allVerticesInTerminalState(globalVersionForRestart); |
| } |
| return null; |
| })); |
| |
| return; |
| } |
| |
| // else: concurrent change to execution state, retry |
| } |
| } |
| |
| public void restart(long expectedGlobalVersion) { |
| |
| assertRunningInJobMasterMainThread(); |
| |
| try { |
| // check the global version to see whether this recovery attempt is still valid |
| if (globalModVersion != expectedGlobalVersion) { |
| LOG.info("Concurrent full restart subsumed this restart."); |
| return; |
| } |
| |
| final JobStatus current = state; |
| |
| if (current == JobStatus.CANCELED) { |
| LOG.info("Canceled job during restart. Aborting restart."); |
| return; |
| } else if (current == JobStatus.FAILED) { |
| LOG.info("Failed job during restart. Aborting restart."); |
| return; |
| } else if (current == JobStatus.SUSPENDED) { |
| LOG.info("Suspended job during restart. Aborting restart."); |
| return; |
| } else if (current != JobStatus.RESTARTING) { |
| throw new IllegalStateException("Can only restart job from state restarting."); |
| } |
| |
| this.currentExecutions.clear(); |
| |
| final Collection<CoLocationGroup> colGroups = new HashSet<>(); |
| final long resetTimestamp = System.currentTimeMillis(); |
| |
| for (ExecutionJobVertex jv : this.verticesInCreationOrder) { |
| |
| CoLocationGroup cgroup = jv.getCoLocationGroup(); |
| if (cgroup != null && !colGroups.contains(cgroup)){ |
| cgroup.resetConstraints(); |
| colGroups.add(cgroup); |
| } |
| |
| jv.resetForNewExecution(resetTimestamp, expectedGlobalVersion); |
| } |
| |
| for (int i = 0; i < stateTimestamps.length; i++) { |
| if (i != JobStatus.RESTARTING.ordinal()) { |
| // Only clear the non restarting state in order to preserve when the job was |
| // restarted. This is needed for the restarting time gauge |
| stateTimestamps[i] = 0; |
| } |
| } |
| |
| transitionState(JobStatus.RESTARTING, JobStatus.CREATED); |
| |
| // if we have checkpointed state, reload it into the executions |
| if (checkpointCoordinator != null) { |
| checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false); |
| } |
| |
| scheduleForExecution(); |
| } |
| // TODO remove the catch block if we align the schematics to not fail global within the restarter. |
| catch (Throwable t) { |
| LOG.warn("Failed to restart the job.", t); |
| failGlobal(t); |
| } |
| } |
| |
| /** |
| * Returns the serializable {@link ArchivedExecutionConfig}. |
| * |
| * @return ArchivedExecutionConfig which may be null in case of errors |
| */ |
| @Override |
| public ArchivedExecutionConfig getArchivedExecutionConfig() { |
| // create a summary of all relevant data accessed in the web interface's JobConfigHandler |
| try { |
| ExecutionConfig executionConfig = jobInformation.getSerializedExecutionConfig().deserializeValue(userClassLoader); |
| if (executionConfig != null) { |
| return executionConfig.archive(); |
| } |
| } catch (IOException | ClassNotFoundException e) { |
| LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", getJobID(), e); |
| } |
| return null; |
| } |
| |
| /** |
| * Returns the termination future of this {@link ExecutionGraph}. The termination future |
| * is completed with the terminal {@link JobStatus} once the ExecutionGraph reaches this |
| * terminal state and all {@link Execution} have been terminated. |
| * |
| * @return Termination future of this {@link ExecutionGraph}. |
| */ |
| public CompletableFuture<JobStatus> getTerminationFuture() { |
| return terminationFuture; |
| } |
| |
| @VisibleForTesting |
| public JobStatus waitUntilTerminal() throws InterruptedException { |
| try { |
| return terminationFuture.get(); |
| } |
| catch (ExecutionException e) { |
| // this should never happen |
| // it would be a bug, so we don't expect this to be handled and throw |
| // an unchecked exception here |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Gets the failover strategy used by the execution graph to recover from failures of tasks. |
| */ |
| public FailoverStrategy getFailoverStrategy() { |
| return this.failoverStrategy; |
| } |
| |
| /** |
| * Gets the current global modification version of the ExecutionGraph. |
| * The global modification version is incremented with each global action (cancel/fail/restart) |
| * and is used to disambiguate concurrent modifications between local and global |
| * failover actions. |
| */ |
| public long getGlobalModVersion() { |
| return globalModVersion; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // State Transitions |
| // ------------------------------------------------------------------------ |
| |
| public boolean transitionState(JobStatus current, JobStatus newState) { |
| return transitionState(current, newState, null); |
| } |
| |
| private void transitionState(JobStatus newState, Throwable error) { |
| transitionState(state, newState, error); |
| } |
| |
| private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) { |
| assertRunningInJobMasterMainThread(); |
| // consistency check |
| if (current.isTerminalState()) { |
| String message = "Job is trying to leave terminal state " + current; |
| LOG.error(message); |
| throw new IllegalStateException(message); |
| } |
| |
| // now do the actual state transition |
| if (state == current) { |
| state = newState; |
| LOG.info("Job {} ({}) switched from state {} to {}.", getJobName(), getJobID(), current, newState, error); |
| |
| stateTimestamps[newState.ordinal()] = System.currentTimeMillis(); |
| notifyJobStatusChange(newState, error); |
| return true; |
| } |
| else { |
| return false; |
| } |
| } |
| |
| private long incrementGlobalModVersion() { |
| incrementRestarts(); |
| return ++globalModVersion; |
| } |
| |
| public void incrementRestarts() { |
| numberOfRestartsCounter.inc(); |
| } |
| |
| public void initFailureCause(Throwable t) { |
| this.failureCause = t; |
| this.failureInfo = new ErrorInfo(t, System.currentTimeMillis()); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Job Status Progress |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Called whenever a vertex reaches state FINISHED (completed successfully). |
| * Once all vertices are in the FINISHED state, the program is successfully done. |
| */ |
| void vertexFinished() { |
| assertRunningInJobMasterMainThread(); |
| final int numFinished = ++verticesFinished; |
| if (numFinished == numVerticesTotal) { |
| // done :-) |
| |
| // check whether we are still in "RUNNING" and trigger the final cleanup |
| if (state == JobStatus.RUNNING) { |
| // we do the final cleanup in the I/O executor, because it may involve |
| // some heavier work |
| |
| try { |
| for (ExecutionJobVertex ejv : verticesInCreationOrder) { |
| ejv.getJobVertex().finalizeOnMaster(getUserClassLoader()); |
| } |
| } |
| catch (Throwable t) { |
| ExceptionUtils.rethrowIfFatalError(t); |
| failGlobal(new Exception("Failed to finalize execution on master", t)); |
| return; |
| } |
| |
| // if we do not make this state transition, then a concurrent |
| // cancellation or failure happened |
| if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) { |
| onTerminalState(JobStatus.FINISHED); |
| } |
| } |
| } |
| } |
| |
| void vertexUnFinished() { |
| assertRunningInJobMasterMainThread(); |
| verticesFinished--; |
| } |
| |
| /** |
| * This method is a callback during cancellation/failover and called when all tasks |
| * have reached a terminal state (cancelled/failed/finished). |
| */ |
| private void allVerticesInTerminalState(long expectedGlobalVersionForRestart) { |
| |
| assertRunningInJobMasterMainThread(); |
| |
| // we are done, transition to the final state |
| JobStatus current; |
| while (true) { |
| current = this.state; |
| |
| if (current == JobStatus.RUNNING) { |
| failGlobal(new Exception("ExecutionGraph went into allVerticesInTerminalState() from RUNNING")); |
| } |
| else if (current == JobStatus.CANCELLING) { |
| if (transitionState(current, JobStatus.CANCELED)) { |
| onTerminalState(JobStatus.CANCELED); |
| break; |
| } |
| } |
| else if (current == JobStatus.FAILING) { |
| if (tryRestartOrFail(expectedGlobalVersionForRestart)) { |
| break; |
| } |
| // concurrent job status change, let's check again |
| } |
| else if (current.isGloballyTerminalState()) { |
| LOG.warn("Job has entered globally terminal state without waiting for all " + |
| "job vertices to reach final state."); |
| break; |
| } |
| else { |
| failGlobal(new Exception("ExecutionGraph went into final state from state " + current)); |
| break; |
| } |
| } |
| // done transitioning the state |
| } |
| |
| /** |
| * Try to restart the job. If we cannot restart the job (e.g. no more restarts allowed), then |
| * try to fail the job. This operation is only permitted if the current state is FAILING or |
| * RESTARTING. |
| * |
| * @return true if the operation could be executed; false if a concurrent job status change occurred |
| */ |
| @Deprecated |
| private boolean tryRestartOrFail(long globalModVersionForRestart) { |
| if (!isLegacyScheduling()) { |
| return true; |
| } |
| |
| JobStatus currentState = state; |
| |
| if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) { |
| final Throwable failureCause = this.failureCause; |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID(), failureCause); |
| } else { |
| LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID()); |
| } |
| |
| final boolean isFailureCauseAllowingRestart = !(failureCause instanceof SuppressRestartsException); |
| final boolean isRestartStrategyAllowingRestart = restartStrategy.canRestart(); |
| boolean isRestartable = isFailureCauseAllowingRestart && isRestartStrategyAllowingRestart; |
| |
| if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) { |
| LOG.info("Restarting the job {} ({}).", getJobName(), getJobID()); |
| |
| RestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart); |
| FutureUtils.assertNoException( |
| restartStrategy |
| .restart(restarter, getJobMasterMainThreadExecutor()) |
| .exceptionally((throwable) -> { |
| failGlobal(throwable); |
| return null; |
| })); |
| return true; |
| } |
| else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) { |
| final String cause1 = isFailureCauseAllowingRestart ? null : |
| "a type of SuppressRestartsException was thrown"; |
| final String cause2 = isRestartStrategyAllowingRestart ? null : |
| "the restart strategy prevented it"; |
| |
| LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(), |
| StringUtils.concatenateWithAnd(cause1, cause2), failureCause); |
| onTerminalState(JobStatus.FAILED); |
| |
| return true; |
| } else { |
| // we must have changed the state concurrently, thus we cannot complete this operation |
| return false; |
| } |
| } else { |
| // this operation is only allowed in the state FAILING or RESTARTING |
| return false; |
| } |
| } |
| |
| public void failJob(Throwable cause) { |
| if (state == JobStatus.FAILING || state.isGloballyTerminalState()) { |
| return; |
| } |
| |
| transitionState(JobStatus.FAILING, cause); |
| initFailureCause(cause); |
| |
| FutureUtils.assertNoException( |
| cancelVerticesAsync().whenComplete((aVoid, throwable) -> { |
| transitionState(JobStatus.FAILED, cause); |
| onTerminalState(JobStatus.FAILED); |
| })); |
| } |
| |
| private void onTerminalState(JobStatus status) { |
| try { |
| CheckpointCoordinator coord = this.checkpointCoordinator; |
| this.checkpointCoordinator = null; |
| if (coord != null) { |
| coord.shutdown(status); |
| } |
| if (checkpointCoordinatorTimer != null) { |
| checkpointCoordinatorTimer.shutdownNow(); |
| checkpointCoordinatorTimer = null; |
| } |
| } |
| catch (Exception e) { |
| LOG.error("Error while cleaning up after execution", e); |
| } |
| finally { |
| terminationFuture.complete(status); |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Callbacks and Callback Utilities |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Updates the state of one of the ExecutionVertex's Execution attempts. |
| * If the new status if "FINISHED", this also updates the accumulators. |
| * |
| * @param state The state update. |
| * @return True, if the task update was properly applied, false, if the execution attempt was not found. |
| */ |
| public boolean updateState(TaskExecutionState state) { |
| assertRunningInJobMasterMainThread(); |
| final Execution attempt = currentExecutions.get(state.getID()); |
| |
| if (attempt != null) { |
| try { |
| final boolean stateUpdated = updateStateInternal(state, attempt); |
| maybeReleasePartitions(attempt); |
| return stateUpdated; |
| } |
| catch (Throwable t) { |
| ExceptionUtils.rethrowIfFatalErrorOrOOM(t); |
| |
| // failures during updates leave the ExecutionGraph inconsistent |
| failGlobal(t); |
| return false; |
| } |
| } |
| else { |
| return false; |
| } |
| } |
| |
| private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) { |
| Map<String, Accumulator<?, ?>> accumulators; |
| |
| switch (state.getExecutionState()) { |
| case RUNNING: |
| return attempt.switchToRunning(); |
| |
| case FINISHED: |
| // this deserialization is exception-free |
| accumulators = deserializeAccumulators(state); |
| attempt.markFinished(accumulators, state.getIOMetrics()); |
| return true; |
| |
| case CANCELED: |
| // this deserialization is exception-free |
| accumulators = deserializeAccumulators(state); |
| attempt.completeCancelling(accumulators, state.getIOMetrics(), false); |
| return true; |
| |
| case FAILED: |
| // this deserialization is exception-free |
| accumulators = deserializeAccumulators(state); |
| attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics(), !isLegacyScheduling()); |
| return true; |
| |
| default: |
| // we mark as failed and return false, which triggers the TaskManager |
| // to remove the task |
| attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); |
| return false; |
| } |
| } |
| |
| private void maybeReleasePartitions(final Execution attempt) { |
| final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID(); |
| |
| if (attempt.getState() == ExecutionState.FINISHED) { |
| final List<IntermediateResultPartitionID> releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex); |
| releasePartitions(releasablePartitions); |
| } else { |
| partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex); |
| } |
| } |
| |
| private void releasePartitions(final List<IntermediateResultPartitionID> releasablePartitions) { |
| if (releasablePartitions.size() > 0) { |
| final List<ResultPartitionID> partitionIds = releasablePartitions.stream() |
| .map(this::createResultPartitionId) |
| .collect(Collectors.toList()); |
| |
| partitionTracker.stopTrackingAndReleasePartitions(partitionIds); |
| } |
| } |
| |
| ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) { |
| final SchedulingResultPartition<?, ?> schedulingResultPartition = |
| getSchedulingTopology().getResultPartitionOrThrow(resultPartitionId); |
| final SchedulingExecutionVertex<?, ?> producer = schedulingResultPartition.getProducer(); |
| final ExecutionVertexID producerId = producer.getId(); |
| final JobVertexID jobVertexId = producerId.getJobVertexId(); |
| final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId); |
| checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId); |
| |
| final ExecutionVertex[] taskVertices = jobVertex.getTaskVertices(); |
| final int subtaskIndex = producerId.getSubtaskIndex(); |
| checkState(subtaskIndex < taskVertices.length, "Invalid subtask index %d for job vertex %s", subtaskIndex, jobVertexId); |
| |
| final ExecutionVertex taskVertex = taskVertices[subtaskIndex]; |
| final Execution execution = taskVertex.getCurrentExecutionAttempt(); |
| return new ResultPartitionID(resultPartitionId, execution.getAttemptId()); |
| } |
| |
| /** |
| * Deserializes accumulators from a task state update. |
| * |
| * <p>This method never throws an exception! |
| * |
| * @param state The task execution state from which to deserialize the accumulators. |
| * @return The deserialized accumulators, of null, if there are no accumulators or an error occurred. |
| */ |
| private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) { |
| AccumulatorSnapshot serializedAccumulators = state.getAccumulators(); |
| |
| if (serializedAccumulators != null) { |
| try { |
| return serializedAccumulators.deserializeUserAccumulators(userClassLoader); |
| } |
| catch (Throwable t) { |
| // we catch Throwable here to include all form of linking errors that may |
| // occur if user classes are missing in the classpath |
| LOG.error("Failed to deserialize final accumulator results.", t); |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Schedule or updates consumers of the given result partition. |
| * |
| * @param partitionId specifying the result partition whose consumer shall be scheduled or updated |
| * @throws ExecutionGraphException if the schedule or update consumers operation could not be executed |
| */ |
| public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) throws ExecutionGraphException { |
| |
| assertRunningInJobMasterMainThread(); |
| |
| final Execution execution = currentExecutions.get(partitionId.getProducerId()); |
| |
| if (execution == null) { |
| throw new ExecutionGraphException("Cannot find execution for execution Id " + |
| partitionId.getPartitionId() + '.'); |
| } |
| else if (execution.getVertex() == null){ |
| throw new ExecutionGraphException("Execution with execution Id " + |
| partitionId.getPartitionId() + " has no vertex assigned."); |
| } else { |
| execution.getVertex().scheduleOrUpdateConsumers(partitionId); |
| } |
| } |
| |
| public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() { |
| return Collections.unmodifiableMap(currentExecutions); |
| } |
| |
| void registerExecution(Execution exec) { |
| assertRunningInJobMasterMainThread(); |
| Execution previous = currentExecutions.putIfAbsent(exec.getAttemptId(), exec); |
| if (previous != null) { |
| failGlobal(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId())); |
| } |
| } |
| |
| void deregisterExecution(Execution exec) { |
| assertRunningInJobMasterMainThread(); |
| Execution contained = currentExecutions.remove(exec.getAttemptId()); |
| |
| if (contained != null && contained != exec) { |
| failGlobal(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained)); |
| } |
| } |
| |
| /** |
| * Updates the accumulators during the runtime of a job. Final accumulator results are transferred |
| * through the UpdateTaskExecutionState message. |
| * @param accumulatorSnapshot The serialized flink and user-defined accumulators |
| */ |
| public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { |
| Map<String, Accumulator<?, ?>> userAccumulators; |
| try { |
| userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader); |
| |
| ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID(); |
| Execution execution = currentExecutions.get(execID); |
| if (execution != null) { |
| execution.setAccumulators(userAccumulators); |
| } else { |
| LOG.debug("Received accumulator result for unknown execution {}.", execID); |
| } |
| } catch (Exception e) { |
| LOG.error("Cannot update accumulators for job {}.", getJobID(), e); |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Listeners & Observers |
| // -------------------------------------------------------------------------------------------- |
| |
| public void registerJobStatusListener(JobStatusListener listener) { |
| if (listener != null) { |
| jobStatusListeners.add(listener); |
| } |
| } |
| |
| private void notifyJobStatusChange(JobStatus newState, Throwable error) { |
| if (jobStatusListeners.size() > 0) { |
| final long timestamp = System.currentTimeMillis(); |
| final Throwable serializedError = error == null ? null : new SerializedThrowable(error); |
| |
| for (JobStatusListener listener : jobStatusListeners) { |
| try { |
| listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError); |
| } catch (Throwable t) { |
| LOG.warn("Error while notifying JobStatusListener", t); |
| } |
| } |
| } |
| } |
| |
| void notifyExecutionChange( |
| final Execution execution, |
| final ExecutionState newExecutionState, |
| final Throwable error) { |
| |
| if (!isLegacyScheduling()) { |
| return; |
| } |
| |
| // see what this means for us. currently, the first FAILED state means -> FAILED |
| if (newExecutionState == ExecutionState.FAILED) { |
| final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)"); |
| |
| // by filtering out late failure calls, we can save some work in |
| // avoiding redundant local failover |
| if (execution.getGlobalModVersion() == globalModVersion) { |
| try { |
| // fail all checkpoints which the failed task has not yet acknowledged |
| if (checkpointCoordinator != null) { |
| checkpointCoordinator.failUnacknowledgedPendingCheckpointsFor(execution.getAttemptId(), ex); |
| } |
| |
| failoverStrategy.onTaskFailure(execution, ex); |
| } |
| catch (Throwable t) { |
| // bug in the failover strategy - fall back to global failover |
| LOG.warn("Error in failover strategy - falling back to global restart", t); |
| failGlobal(ex); |
| } |
| } |
| } |
| } |
| |
| void assertRunningInJobMasterMainThread() { |
| if (!(jobMasterMainThreadExecutor instanceof ComponentMainThreadExecutor.DummyComponentMainThreadExecutor)) { |
| jobMasterMainThreadExecutor.assertRunningInMainThread(); |
| } |
| } |
| |
| void notifySchedulerNgAboutInternalTaskFailure(final ExecutionAttemptID attemptId, final Throwable t) { |
| if (internalTaskFailuresListener != null) { |
| internalTaskFailuresListener.notifyTaskFailure(attemptId, t); |
| } |
| } |
| |
| ShuffleMaster<?> getShuffleMaster() { |
| return shuffleMaster; |
| } |
| |
| public JobMasterPartitionTracker getPartitionTracker() { |
| return partitionTracker; |
| } |
| |
| public ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker() { |
| return resultPartitionAvailabilityChecker; |
| } |
| |
| PartitionReleaseStrategy getPartitionReleaseStrategy() { |
| return partitionReleaseStrategy; |
| } |
| } |