[FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restarting state.

The Canceling state of Adaptive Scheduler was expecting the ExecutionGraph to be in state RUNNING when entering the state.
However, the Restarting state is cancelling the ExecutionGraph already, thus the ExectionGraph can be in state CANCELING or CANCELED when entering the Canceling state.

Calling the ExecutionGraph.cancel() method in the Canceling state while being in ExecutionGraph.state = CANCELED || CANCELLED is not a problem.

The change is guarded by a new ITCase, as this issue affects the interplay between different AS states.

This closes #15882
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 9962c78..91d688f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -91,8 +91,6 @@
         this.operatorCoordinatorHandler = operatorCoordinatorHandler;
         this.kvStateHandler = new KvStateHandler(executionGraph);
         this.logger = logger;
-        Preconditions.checkState(
-                executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
 
         FutureUtils.assertNoException(
                 executionGraph
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
index 9280dbc..992d2c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
@@ -20,7 +20,9 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.execution.Environment;
@@ -33,6 +35,7 @@
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -43,6 +46,7 @@
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 
 import static org.junit.Assert.assertTrue;
 
@@ -105,6 +109,34 @@
     }
 
     @Test
+    public void testJobCancellationWhileRestartingSucceeds() throws Exception {
+        final long timeInRestartingState = 10000L;
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobVertex alwaysFailingOperator = new JobVertex("Always failing operator");
+        alwaysFailingOperator.setInvokableClass(AlwaysFailingInvokable.class);
+        alwaysFailingOperator.setParallelism(1);
+
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(alwaysFailingOperator);
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+        executionConfig.setRestartStrategy(
+                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+        jobGraph.setExecutionConfig(executionConfig);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        // wait until we are in RESTARTING state
+        CommonTestUtils.waitUntilCondition(
+                () -> miniCluster.getJobStatus(jobGraph.getJobID()).get() == JobStatus.RESTARTING,
+                Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
+                5);
+
+        // now cancel while in RESTARTING state
+        miniCluster.cancelJob(jobGraph.getJobID()).get();
+    }
+
+    @Test
     public void testGlobalFailoverIfTaskFails() throws Throwable {
         final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
         final JobGraph jobGraph = createOnceFailingJobGraph();
@@ -160,4 +192,16 @@
             hasFailed = false;
         }
     }
+
+    /** Always failing {@link AbstractInvokable}. */
+    public static final class AlwaysFailingInvokable extends AbstractInvokable {
+        public AlwaysFailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            throw new FlinkRuntimeException("Test failure.");
+        }
+    }
 }