[FLINK-15918][runtime] Fix that 'uptime' metric is not reset after restart

The 'uptime' metric is the time difference between 'now' and the timestamp when
the job transitioned to state RUNNING. The new scheduler until now never
transitioned out of status RUNNING when restarting tasks which had the effect
that the 'uptime' metric was not reset after a restart. This introduces new
state transitions to the job. We transition the job status to RESTARTING if at
least one ExecutionVertex is waiting to be restarted, and we transition from
RESTARTING immediately to RUNNING again after the restart.

This closes #11032.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5803f8d..4447b1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -898,7 +898,7 @@
 		while (true) {
 			JobStatus current = state;
 
-			if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
+			if (current == JobStatus.RUNNING || current == JobStatus.CREATED || current == JobStatus.RESTARTING) {
 				if (transitionState(current, JobStatus.CANCELLING)) {
 
 					// make sure no concurrent local actions interfere with the cancellation
@@ -938,16 +938,6 @@
 					return;
 				}
 			}
-			// All vertices have been cancelled and it's safe to directly go
-			// into the canceled state.
-			else if (current == JobStatus.RESTARTING) {
-				if (transitionState(current, JobStatus.CANCELED)) {
-					onTerminalState(JobStatus.CANCELED);
-
-					LOG.info("Canceled during restart.");
-					return;
-				}
-			}
 			else {
 				// no need to treat other states
 				return;
@@ -1220,7 +1210,7 @@
 	//  State Transitions
 	// ------------------------------------------------------------------------
 
-	private boolean transitionState(JobStatus current, JobStatus newState) {
+	public boolean transitionState(JobStatus current, JobStatus newState) {
 		return transitionState(current, newState, null);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 9779dac..2fe8340 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -90,6 +91,8 @@
 
 	private final ExecutionVertexOperations executionVertexOperations;
 
+	private final Set<ExecutionVertexID> verticesWaitingForRestart;
+
 	public DefaultScheduler(
 		final Logger log,
 		final JobGraph jobGraph,
@@ -151,6 +154,8 @@
 			restartBackoffTimeStrategy);
 		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
 		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(getInputsLocationsRetriever());
+
+		this.verticesWaitingForRestart = new HashSet<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -211,6 +216,8 @@
 		final Set<ExecutionVertexVersion> executionVertexVersions =
 			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
 
+		addVerticesToRestartPending(verticesToRestart);
+
 		final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
 
 		delayExecutor.schedule(
@@ -220,10 +227,24 @@
 			TimeUnit.MILLISECONDS);
 	}
 
+	private void addVerticesToRestartPending(final Set<ExecutionVertexID> verticesToRestart) {
+		verticesWaitingForRestart.addAll(verticesToRestart);
+		transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
+	}
+
+	private void removeVerticesFromRestartPending(final Set<ExecutionVertexID> verticesToRestart) {
+		verticesWaitingForRestart.removeAll(verticesToRestart);
+		if (verticesWaitingForRestart.isEmpty()) {
+			transitionExecutionGraphState(JobStatus.RESTARTING, JobStatus.RUNNING);
+		}
+	}
+
 	private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions) {
 		return () -> {
 			final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
 
+			removeVerticesFromRestartPending(verticesToRestart);
+
 			resetForNewExecutions(verticesToRestart);
 
 			try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index bd62c1e..609877e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -417,6 +417,10 @@
 				.collect(Collectors.toSet()));
 	}
 
+	protected void transitionExecutionGraphState(final JobStatus current, final JobStatus newState) {
+		executionGraph.transitionState(current, newState);
+	}
+
 	// ------------------------------------------------------------------------
 	// SchedulerNG
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
index de2ec72..04d1f48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -143,6 +144,18 @@
 		triggerNonPeriodicScheduledTasks();
 	}
 
+	/**
+	 * Triggers a single non-periodically scheduled task.
+	 *
+	 * @throws NoSuchElementException If there is no such task.
+	 */
+	public void triggerNonPeriodicScheduledTask() {
+		final ScheduledTask<?> poll = nonPeriodicScheduledTasks.remove();
+		if (poll != null) {
+			poll.execute();
+		}
+	}
+
 	public void triggerNonPeriodicScheduledTasks() {
 		final Iterator<ScheduledTask<?>> iterator = nonPeriodicScheduledTasks.iterator();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 40126cf..b1ef7c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -600,6 +600,53 @@
 	}
 
 	@Test
+	public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
+		final JobGraph jobGraph = singleJobVertexJobGraph(2);
+		final JobID jobId = jobGraph.getJobID();
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+		final Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
+		final ExecutionAttemptID attemptId1 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
+		final ExecutionAttemptID attemptId2 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId1, ExecutionState.FAILED, new RuntimeException("expected")));
+		final JobStatus jobStatusAfterFirstFailure = scheduler.requestJobStatus();
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId2, ExecutionState.FAILED, new RuntimeException("expected")));
+
+		taskRestartExecutor.triggerNonPeriodicScheduledTask();
+		final JobStatus jobStatusWithPendingRestarts = scheduler.requestJobStatus();
+		taskRestartExecutor.triggerNonPeriodicScheduledTask();
+		final JobStatus jobStatusAfterRestarts = scheduler.requestJobStatus();
+
+		assertThat(jobStatusAfterFirstFailure, equalTo(JobStatus.RESTARTING));
+		assertThat(jobStatusWithPendingRestarts, equalTo(JobStatus.RESTARTING));
+		assertThat(jobStatusAfterRestarts, equalTo(JobStatus.RUNNING));
+	}
+
+	@Test
+	public void cancelWhileRestartingShouldWaitForRunningTasks() {
+		final JobGraph jobGraph = singleJobVertexJobGraph(2);
+		final JobID jobid = jobGraph.getJobID();
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+		final SchedulingTopology<?, ?> topology = scheduler.getSchedulingTopology();
+
+		final Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
+		final ExecutionAttemptID attemptId1 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
+		final ExecutionAttemptID attemptId2 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
+		final ExecutionVertexID executionVertex2 = scheduler.getExecutionVertexIdOrThrow(attemptId2);
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId1, ExecutionState.FAILED, new RuntimeException("expected")));
+		scheduler.cancel();
+		final ExecutionState vertex2StateAfterCancel = topology.getVertexOrThrow(executionVertex2).getState();
+		final JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus();
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId2, ExecutionState.CANCELED, new RuntimeException("expected")));
+
+		assertThat(vertex2StateAfterCancel, is(equalTo(ExecutionState.CANCELING)));
+		assertThat(statusAfterCancelWhileRestarting, is(equalTo(JobStatus.CANCELLING)));
+		assertThat(scheduler.requestJobStatus(), is(equalTo(JobStatus.CANCELED)));
+	}
+
+	@Test
 	public void failureInfoIsSetAfterTaskFailure() {
 		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
 		final JobID jobId = jobGraph.getJobID();