[FLINK-34906][autoscaler] Only scale when all tasks are running
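
The standalone autoscaler previously used RestClusterClient#listJobs, which only exposes the coarse
job status. The job list fetcher now queries the jobs overview endpoint and reuses the status
adjustment that AbstractFlinkService already applied: a nominally RUNNING job is reported as CREATED
while some of its tasks are not yet RUNNING or FINISHED, so scaling only happens once all tasks are
running. The shared logic is extracted into the new JobStatusUtils.

A minimal sketch of the resulting behaviour, using only the classes touched by this patch
(EffectiveStatusExample is a hypothetical illustration, not part of the change):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.JobStatus;
    import org.apache.flink.autoscaler.utils.JobStatusUtils;
    import org.apache.flink.runtime.execution.ExecutionState;
    import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;

    import java.util.List;

    // Hypothetical illustration of the effective-status adjustment introduced by this patch.
    class EffectiveStatusExample {
        public static void main(String[] args) {
            // 2 tasks RUNNING, 2 still SCHEDULED: the job is not fully deployed yet.
            int[] tasksPerState = new int[ExecutionState.values().length];
            tasksPerState[ExecutionState.RUNNING.ordinal()] = 2;
            tasksPerState[ExecutionState.SCHEDULED.ordinal()] = 2;
            long now = System.currentTimeMillis();
            JobDetails details =
                    new JobDetails(
                            new JobID(), "example-job", now, -1, 0, JobStatus.RUNNING, now, tasksPerState, 4);

            // The REST overview still reports RUNNING, but the effective status becomes CREATED,
            // so the autoscaler skips this job until every task is RUNNING or FINISHED.
            var statuses = JobStatusUtils.toJobStatusMessage(new MultipleJobsDetails(List.of(details)));
            System.out.println(statuses.get(0).getJobState()); // CREATED
        }
    }
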
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
index e99424c..ba12cde 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
@@ -20,12 +20,15 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.standalone.JobListFetcher;
+import org.apache.flink.autoscaler.utils.JobStatusUtils;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
 import org.apache.flink.util.function.FunctionWithException;
 
@@ -54,7 +57,11 @@
     public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
         try (var restClusterClient = restClientGetter.apply(new Configuration())) {
             return restClusterClient
-                    .listJobs()
+                    .sendRequest(
+                            JobsOverviewHeaders.getInstance(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance())
+                    .thenApply(JobStatusUtils::toJobStatusMessage)
                     .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
                     .stream()
                     .map(
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
index 658eb58..9192e52 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
@@ -21,10 +21,13 @@
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
@@ -38,13 +41,12 @@
 import javax.annotation.Nullable;
 
 import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.List;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,12 +58,9 @@
     /** Test whether the job list and confs are expected. */
     @Test
     void testFetchJobListAndConfigurationInfo() throws Exception {
-        var job1 =
-                new JobStatusMessage(
-                        new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
-        var job2 =
-                new JobStatusMessage(
-                        new JobID(), "", JobStatus.CANCELLING, Instant.now().toEpochMilli());
+        var job1 = new JobID();
+        var job2 = new JobID();
+        var jobs = Map.of(job1, JobStatus.RUNNING, job2, JobStatus.CANCELLING);
 
         Configuration expectedConf1 = new Configuration();
         expectedConf1.setString("option_key1", "option_value1");
@@ -70,18 +69,17 @@
         expectedConf2.setString("option_key2", "option_value2");
         expectedConf2.setString("option_key3", "option_value3");
 
-        var jobs = Map.of(job1.getJobId(), job1, job2.getJobId(), job2);
-        var configurations = Map.of(job1.getJobId(), expectedConf1, job2.getJobId(), expectedConf2);
+        var configurations = Map.of(job1, expectedConf1, job2, expectedConf2);
         var closeCounter = new AtomicLong();
         FlinkClusterJobListFetcher jobListFetcher =
                 new FlinkClusterJobListFetcher(
                         getRestClusterClient(
-                                Either.Left(List.of(job1, job2)),
+                                Either.Left(jobs),
                                 Either.Left(
                                         Map.of(
-                                                job1.getJobId(),
+                                                job1,
                                                 ConfigurationInfo.from(expectedConf1),
-                                                job2.getJobId(),
+                                                job2,
                                                 ConfigurationInfo.from(expectedConf2))),
                                 closeCounter),
                         Duration.ofSeconds(10));
@@ -94,11 +92,9 @@
 
             assertThat(fetchedJobList).hasSize(2);
             for (var jobContext : fetchedJobList) {
-                JobStatusMessage expectedJobStatusMessage = jobs.get(jobContext.getJobID());
+                var expectedJobState = jobs.get(jobContext.getJobID());
                 Configuration expectedConf = configurations.get(jobContext.getJobID());
-                assertThat(expectedJobStatusMessage).isNotNull();
-                assertThat(jobContext.getJobStatus())
-                        .isEqualTo(expectedJobStatusMessage.getJobState());
+                assertThat(jobContext.getJobStatus()).isEqualTo(expectedJobState);
                 assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf);
             }
         }
@@ -130,16 +126,13 @@
      */
     @Test
     void testFetchConfigurationException() {
-        var job1 =
-                new JobStatusMessage(
-                        new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
         var expectedException = new RuntimeException("Expected exception.");
         var closeCounter = new AtomicLong();
 
         FlinkClusterJobListFetcher jobListFetcher =
                 new FlinkClusterJobListFetcher(
                         getRestClusterClient(
-                                Either.Left(List.of(job1)),
+                                Either.Left(Map.of(new JobID(), JobStatus.RUNNING)),
                                 Either.Right(expectedException),
                                 closeCounter),
                         Duration.ofSeconds(10));
@@ -171,14 +164,12 @@
      */
     @Test
     void testFetchConfigurationTimeout() {
-        var job1 =
-                new JobStatusMessage(
-                        new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         FlinkClusterJobListFetcher jobListFetcher =
                 new FlinkClusterJobListFetcher(
-                        getTimeoutableRestClusterClient(List.of(job1), null, closeFuture),
+                        getTimeoutableRestClusterClient(
+                                Map.of(new JobID(), JobStatus.RUNNING), null, closeFuture),
                         Duration.ofSeconds(2));
 
         assertThat(closeFuture).isNotDone();
@@ -189,8 +180,8 @@
     }
 
     /**
-     * @param jobListOrException When listJobs is called, return jobList if Either is left, return
-     *     failedFuture if Either is right.
+     * @param jobsOrException When the jobs overview is requested, return the jobs if Either is
+     *     left, return failedFuture if Either is right.
      * @param configurationsOrException When fetch job conf, return configuration if Either is left,
      *     return failedFuture if Either is right.
      * @param closeCounter Increment the count each time the {@link RestClusterClient#close} is
@@ -198,7 +189,7 @@
      */
     private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
             getRestClusterClient(
-                    Either<Collection<JobStatusMessage>, Throwable> jobListOrException,
+                    Either<Map<JobID, JobStatus>, Throwable> jobsOrException,
                     Either<Map<JobID, ConfigurationInfo>, Throwable> configurationsOrException,
                     AtomicLong closeCounter) {
         return conf ->
@@ -208,14 +199,6 @@
                         (c, e) -> new StandaloneClientHAServices("localhost")) {
 
                     @Override
-                    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
-                        if (jobListOrException.isLeft()) {
-                            return CompletableFuture.completedFuture(jobListOrException.left());
-                        }
-                        return CompletableFuture.failedFuture(jobListOrException.right());
-                    }
-
-                    @Override
                     public <
                                     M extends MessageHeaders<R, P, U>,
                                     U extends MessageParameters,
@@ -231,6 +214,22 @@
                             return (CompletableFuture<P>)
                                     CompletableFuture.completedFuture(
                                             configurationsOrException.left().get(jobID));
+                        } else if (h instanceof JobsOverviewHeaders) {
+                            if (jobsOrException.isLeft()) {
+                                return (CompletableFuture<P>)
+                                        CompletableFuture.completedFuture(
+                                                new MultipleJobsDetails(
+                                                        jobsOrException.left().entrySet().stream()
+                                                                .map(
+                                                                        entry ->
+                                                                                generateJobDetails(
+                                                                                        entry
+                                                                                                .getKey(),
+                                                                                        entry
+                                                                                                .getValue()))
+                                                                .collect(Collectors.toList())));
+                            }
+                            return CompletableFuture.failedFuture(jobsOrException.right());
                         }
                         fail("Unknown request");
                         return null;
@@ -245,7 +244,7 @@
     }
 
     /**
-     * @param jobList When listJobs is called, return jobList if it's not null, don't complete
+     * @param jobs When the jobs overview is requested, return the jobs if it's not null, don't complete
      *     future if it's null.
      * @param configuration When fetch job conf, return configuration if it's not null, don't
      *     complete future if it's null.
@@ -253,7 +252,7 @@
      */
     private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
             getTimeoutableRestClusterClient(
-                    @Nullable Collection<JobStatusMessage> jobList,
+                    @Nullable Map<JobID, JobStatus> jobs,
                     @Nullable ConfigurationInfo configuration,
                     CompletableFuture<Void> closeFuture) {
         return conf ->
@@ -263,14 +262,6 @@
                         (c, e) -> new StandaloneClientHAServices("localhost")) {
 
                     @Override
-                    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
-                        if (jobList == null) {
-                            return new CompletableFuture<>();
-                        }
-                        return CompletableFuture.completedFuture(jobList);
-                    }
-
-                    @Override
                     public <
                                     M extends MessageHeaders<R, P, U>,
                                     U extends MessageParameters,
@@ -283,6 +274,21 @@
                             }
                             return (CompletableFuture<P>)
                                     CompletableFuture.completedFuture(configuration);
+                        } else if (h instanceof JobsOverviewHeaders) {
+                            if (jobs == null) {
+                                return new CompletableFuture<>();
+                            }
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            new MultipleJobsDetails(
+                                                    jobs.entrySet().stream()
+                                                            .map(
+                                                                    entry ->
+                                                                            generateJobDetails(
+                                                                                    entry.getKey(),
+                                                                                    entry
+                                                                                            .getValue()))
+                                                            .collect(Collectors.toList())));
                         }
                         fail("Unknown request");
                         return null;
@@ -295,4 +301,25 @@
                     }
                 };
     }
+
+    private static JobDetails generateJobDetails(JobID jobID, JobStatus jobStatus) {
+        int[] countPerState = new int[ExecutionState.values().length];
+        if (jobStatus == JobStatus.RUNNING) {
+            countPerState[ExecutionState.RUNNING.ordinal()] = 5;
+            countPerState[ExecutionState.FINISHED.ordinal()] = 2;
+        } else if (jobStatus == JobStatus.CANCELLING) {
+            countPerState[ExecutionState.CANCELING.ordinal()] = 7;
+        }
+        int numTasks = Arrays.stream(countPerState).sum();
+        return new JobDetails(
+                jobID,
+                "test-job",
+                System.currentTimeMillis(),
+                -1,
+                0,
+                jobStatus,
+                System.currentTimeMillis(),
+                countPerState,
+                numTasks);
+    }
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/JobStatusUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/JobStatusUtils.java
new file mode 100644
index 0000000..c4b9923
--- /dev/null
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/JobStatusUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Job status related utilities. */
+public class JobStatusUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobStatusUtils.class);
+
+    public static List<JobStatusMessage> toJobStatusMessage(
+            MultipleJobsDetails multipleJobsDetails) {
+        return multipleJobsDetails.getJobs().stream()
+                .map(
+                        details ->
+                                new JobStatusMessage(
+                                        details.getJobId(),
+                                        details.getJobName(),
+                                        getEffectiveStatus(details),
+                                        details.getStartTime()))
+                .collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    static JobStatus getEffectiveStatus(JobDetails details) {
+        int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
+        int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
+        boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
+        JobStatus effectiveStatus = details.getStatus();
+        if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
+            LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, JobStatus.CREATED);
+            return JobStatus.CREATED;
+        }
+        return effectiveStatus;
+    }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/JobStatusUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/JobStatusUtilsTest.java
new file mode 100644
index 0000000..4cdb166
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/JobStatusUtilsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for {@link JobStatusUtils}. */
+class JobStatusUtilsTest {
+
+    @Test
+    void effectiveStatusTest() {
+        var allRunning = getJobDetails(JobStatus.RUNNING, Tuple2.of(ExecutionState.RUNNING, 4));
+        assertEquals(JobStatus.RUNNING, JobStatusUtils.getEffectiveStatus(allRunning));
+
+        var allRunningOrFinished =
+                getJobDetails(
+                        JobStatus.RUNNING,
+                        Tuple2.of(ExecutionState.RUNNING, 2),
+                        Tuple2.of(ExecutionState.FINISHED, 2));
+        assertEquals(JobStatus.RUNNING, JobStatusUtils.getEffectiveStatus(allRunningOrFinished));
+
+        var allRunningOrScheduled =
+                getJobDetails(
+                        JobStatus.RUNNING,
+                        Tuple2.of(ExecutionState.RUNNING, 2),
+                        Tuple2.of(ExecutionState.SCHEDULED, 2));
+        assertEquals(JobStatus.CREATED, JobStatusUtils.getEffectiveStatus(allRunningOrScheduled));
+
+        var allFinished = getJobDetails(JobStatus.FINISHED, Tuple2.of(ExecutionState.FINISHED, 4));
+        assertEquals(JobStatus.FINISHED, JobStatusUtils.getEffectiveStatus(allFinished));
+    }
+
+    @SafeVarargs
+    private JobDetails getJobDetails(
+            JobStatus status, Tuple2<ExecutionState, Integer>... tasksPerState) {
+        int[] countPerState = new int[ExecutionState.values().length];
+        for (var taskPerState : tasksPerState) {
+            countPerState[taskPerState.f0.ordinal()] = taskPerState.f1;
+        }
+        int numTasks = Arrays.stream(countPerState).sum();
+        return new JobDetails(
+                new JobID(),
+                "test-job",
+                System.currentTimeMillis(),
+                -1,
+                0,
+                status,
+                System.currentTimeMillis(),
+                countPerState,
+                numTasks);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 42ce5eb..ae8979a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.utils.JobStatusUtils;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -50,12 +51,9 @@
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
@@ -272,7 +270,7 @@
                             JobsOverviewHeaders.getInstance(),
                             EmptyMessageParameters.getInstance(),
                             EmptyRequestBody.getInstance())
-                    .thenApply(AbstractFlinkService::toJobStatusMessage)
+                    .thenApply(JobStatusUtils::toJobStatusMessage)
                     .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
         }
     }
@@ -902,19 +900,6 @@
                 timeout);
     }
 
-    private static List<JobStatusMessage> toJobStatusMessage(
-            MultipleJobsDetails multipleJobsDetails) {
-        return multipleJobsDetails.getJobs().stream()
-                .map(
-                        details ->
-                                new JobStatusMessage(
-                                        details.getJobId(),
-                                        details.getJobName(),
-                                        getEffectiveStatus(details),
-                                        details.getStartTime()))
-                .collect(Collectors.toList());
-    }
-
     @VisibleForTesting
     protected static Configuration removeOperatorConfigs(Configuration config) {
         Configuration newConfig = new Configuration();
@@ -929,19 +914,6 @@
         return newConfig;
     }
 
-    @VisibleForTesting
-    protected static JobStatus getEffectiveStatus(JobDetails details) {
-        int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
-        int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
-        boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
-        JobStatus effectiveStatus = details.getStatus();
-        if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
-            effectiveStatus = JobStatus.CREATED;
-            LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, effectiveStatus);
-        }
-        return effectiveStatus;
-    }
-
     private void validateHaMetadataExists(Configuration conf) {
         if (!isHaMetadataAvailable(conf)) {
             throw new RecoveryFailureException(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 8a928a2..8e9683d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -52,10 +52,8 @@
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -123,7 +121,6 @@
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -962,63 +959,6 @@
     }
 
     @Test
-    public void effectiveStatusTest() {
-        JobDetails allRunning =
-                getJobDetails(
-                        org.apache.flink.api.common.JobStatus.RUNNING,
-                        Tuple2.of(ExecutionState.RUNNING, 4));
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING,
-                AbstractFlinkService.getEffectiveStatus(allRunning));
-
-        JobDetails allRunningOrFinished =
-                getJobDetails(
-                        org.apache.flink.api.common.JobStatus.RUNNING,
-                        Tuple2.of(ExecutionState.RUNNING, 2),
-                        Tuple2.of(ExecutionState.FINISHED, 2));
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING,
-                AbstractFlinkService.getEffectiveStatus(allRunningOrFinished));
-
-        JobDetails allRunningOrScheduled =
-                getJobDetails(
-                        org.apache.flink.api.common.JobStatus.RUNNING,
-                        Tuple2.of(ExecutionState.RUNNING, 2),
-                        Tuple2.of(ExecutionState.SCHEDULED, 2));
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.CREATED,
-                AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled));
-
-        JobDetails allFinished =
-                getJobDetails(
-                        org.apache.flink.api.common.JobStatus.FINISHED,
-                        Tuple2.of(ExecutionState.FINISHED, 4));
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.FINISHED,
-                AbstractFlinkService.getEffectiveStatus(allFinished));
-    }
-
-    private JobDetails getJobDetails(
-            org.apache.flink.api.common.JobStatus status,
-            Tuple2<ExecutionState, Integer>... tasksPerState) {
-        int[] countPerState = new int[ExecutionState.values().length];
-        for (var taskPerState : tasksPerState) {
-            countPerState[taskPerState.f0.ordinal()] = taskPerState.f1;
-        }
-        int numTasks = Arrays.stream(countPerState).sum();
-        return new JobDetails(
-                new JobID(),
-                "test-job",
-                System.currentTimeMillis(),
-                -1,
-                0,
-                status,
-                System.currentTimeMillis(),
-                countPerState,
-                numTasks);
-    }
-
-    @Test
     public void isJobManagerReadyTest() throws Exception {
         AtomicReference<String> url = new AtomicReference<>();
         var clusterClient =