[FLINK-34906][autoscaler] Only scale when all tasks are running
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
index e99424c..ba12cde 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
@@ -20,12 +20,15 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.standalone.JobListFetcher;
+import org.apache.flink.autoscaler.utils.JobStatusUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.util.function.FunctionWithException;
@@ -54,7 +57,11 @@
public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
try (var restClusterClient = restClientGetter.apply(new Configuration())) {
return restClusterClient
- .listJobs()
+ .sendRequest(
+ JobsOverviewHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance())
+ .thenApply(JobStatusUtils::toJobStatusMessage)
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
.stream()
.map(
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
index 658eb58..9192e52 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
@@ -21,10 +21,13 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
@@ -38,13 +41,12 @@
import javax.annotation.Nullable;
import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.List;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,12 +58,9 @@
/** Test whether the job list and confs are expected. */
@Test
void testFetchJobListAndConfigurationInfo() throws Exception {
- var job1 =
- new JobStatusMessage(
- new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
- var job2 =
- new JobStatusMessage(
- new JobID(), "", JobStatus.CANCELLING, Instant.now().toEpochMilli());
+ var job1 = new JobID();
+ var job2 = new JobID();
+ var jobs = Map.of(job1, JobStatus.RUNNING, job2, JobStatus.CANCELLING);
Configuration expectedConf1 = new Configuration();
expectedConf1.setString("option_key1", "option_value1");
@@ -70,18 +69,17 @@
expectedConf2.setString("option_key2", "option_value2");
expectedConf2.setString("option_key3", "option_value3");
- var jobs = Map.of(job1.getJobId(), job1, job2.getJobId(), job2);
- var configurations = Map.of(job1.getJobId(), expectedConf1, job2.getJobId(), expectedConf2);
+ var configurations = Map.of(job1, expectedConf1, job2, expectedConf2);
var closeCounter = new AtomicLong();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getRestClusterClient(
- Either.Left(List.of(job1, job2)),
+ Either.Left(jobs),
Either.Left(
Map.of(
- job1.getJobId(),
+ job1,
ConfigurationInfo.from(expectedConf1),
- job2.getJobId(),
+ job2,
ConfigurationInfo.from(expectedConf2))),
closeCounter),
Duration.ofSeconds(10));
@@ -94,11 +92,9 @@
assertThat(fetchedJobList).hasSize(2);
for (var jobContext : fetchedJobList) {
- JobStatusMessage expectedJobStatusMessage = jobs.get(jobContext.getJobID());
+ var expectedJobState = jobs.get(jobContext.getJobID());
Configuration expectedConf = configurations.get(jobContext.getJobID());
- assertThat(expectedJobStatusMessage).isNotNull();
- assertThat(jobContext.getJobStatus())
- .isEqualTo(expectedJobStatusMessage.getJobState());
+ assertThat(jobContext.getJobStatus()).isEqualTo(expectedJobState);
assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf);
}
}
@@ -130,16 +126,13 @@
*/
@Test
void testFetchConfigurationException() {
- var job1 =
- new JobStatusMessage(
- new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
var expectedException = new RuntimeException("Expected exception.");
var closeCounter = new AtomicLong();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getRestClusterClient(
- Either.Left(List.of(job1)),
+ Either.Left(Map.of(new JobID(), JobStatus.RUNNING)),
Either.Right(expectedException),
closeCounter),
Duration.ofSeconds(10));
@@ -171,14 +164,12 @@
*/
@Test
void testFetchConfigurationTimeout() {
- var job1 =
- new JobStatusMessage(
- new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
- getTimeoutableRestClusterClient(List.of(job1), null, closeFuture),
+ getTimeoutableRestClusterClient(
+ Map.of(new JobID(), JobStatus.RUNNING), null, closeFuture),
Duration.ofSeconds(2));
assertThat(closeFuture).isNotDone();
@@ -189,8 +180,8 @@
}
/**
- * @param jobListOrException When listJobs is called, return jobList if Either is left, return
- * failedFuture if Either is right.
+ * @param jobsOrException When the jobs overview is called, return jobList if Either is left,
+ * return failedFuture if Either is right.
* @param configurationsOrException When fetch job conf, return configuration if Either is left,
* return failedFuture if Either is right.
* @param closeCounter Increment the count each time the {@link RestClusterClient#close} is
@@ -198,7 +189,7 @@
*/
private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
getRestClusterClient(
- Either<Collection<JobStatusMessage>, Throwable> jobListOrException,
+ Either<Map<JobID, JobStatus>, Throwable> jobsOrException,
Either<Map<JobID, ConfigurationInfo>, Throwable> configurationsOrException,
AtomicLong closeCounter) {
return conf ->
@@ -208,14 +199,6 @@
(c, e) -> new StandaloneClientHAServices("localhost")) {
@Override
- public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
- if (jobListOrException.isLeft()) {
- return CompletableFuture.completedFuture(jobListOrException.left());
- }
- return CompletableFuture.failedFuture(jobListOrException.right());
- }
-
- @Override
public <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
@@ -231,6 +214,22 @@
return (CompletableFuture<P>)
CompletableFuture.completedFuture(
configurationsOrException.left().get(jobID));
+ } else if (h instanceof JobsOverviewHeaders) {
+ if (jobsOrException.isLeft()) {
+ return (CompletableFuture<P>)
+ CompletableFuture.completedFuture(
+ new MultipleJobsDetails(
+ jobsOrException.left().entrySet().stream()
+ .map(
+ entry ->
+ generateJobDetails(
+ entry
+ .getKey(),
+ entry
+ .getValue()))
+ .collect(Collectors.toList())));
+ }
+ return CompletableFuture.failedFuture(jobsOrException.right());
}
fail("Unknown request");
return null;
@@ -245,7 +244,7 @@
}
/**
- * @param jobList When listJobs is called, return jobList if it's not null, don't complete
+ * @param jobs When the jobs overview is called, return jobList if it's not null, don't complete
* future if it's null.
* @param configuration When fetch job conf, return configuration if it's not null, don't
* complete future if it's null.
@@ -253,7 +252,7 @@
*/
private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
getTimeoutableRestClusterClient(
- @Nullable Collection<JobStatusMessage> jobList,
+ @Nullable Map<JobID, JobStatus> jobs,
@Nullable ConfigurationInfo configuration,
CompletableFuture<Void> closeFuture) {
return conf ->
@@ -263,14 +262,6 @@
(c, e) -> new StandaloneClientHAServices("localhost")) {
@Override
- public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
- if (jobList == null) {
- return new CompletableFuture<>();
- }
- return CompletableFuture.completedFuture(jobList);
- }
-
- @Override
public <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
@@ -283,6 +274,21 @@
}
return (CompletableFuture<P>)
CompletableFuture.completedFuture(configuration);
+ } else if (h instanceof JobsOverviewHeaders) {
+ if (jobs == null) {
+ return new CompletableFuture<>();
+ }
+ return (CompletableFuture<P>)
+ CompletableFuture.completedFuture(
+ new MultipleJobsDetails(
+ jobs.entrySet().stream()
+ .map(
+ entry ->
+ generateJobDetails(
+ entry.getKey(),
+ entry
+ .getValue()))
+ .collect(Collectors.toList())));
}
fail("Unknown request");
return null;
@@ -295,4 +301,25 @@
}
};
}
+
+ private static JobDetails generateJobDetails(JobID jobID, JobStatus jobStatus) {
+ int[] countPerState = new int[ExecutionState.values().length];
+ if (jobStatus == JobStatus.RUNNING) {
+ countPerState[ExecutionState.RUNNING.ordinal()] = 5;
+ countPerState[ExecutionState.FINISHED.ordinal()] = 2;
+ } else if (jobStatus == JobStatus.CANCELLING) {
+ countPerState[ExecutionState.CANCELING.ordinal()] = 7;
+ }
+ int numTasks = Arrays.stream(countPerState).sum();
+ return new JobDetails(
+ jobID,
+ "test-job",
+ System.currentTimeMillis(),
+ -1,
+ 0,
+ jobStatus,
+ System.currentTimeMillis(),
+ countPerState,
+ numTasks);
+ }
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/JobStatusUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/JobStatusUtils.java
new file mode 100644
index 0000000..c4b9923
--- /dev/null
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/JobStatusUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Job status related utilities. */
+public class JobStatusUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobStatusUtils.class);
+
+ public static List<JobStatusMessage> toJobStatusMessage(
+ MultipleJobsDetails multipleJobsDetails) {
+ return multipleJobsDetails.getJobs().stream()
+ .map(
+ details ->
+ new JobStatusMessage(
+ details.getJobId(),
+ details.getJobName(),
+ getEffectiveStatus(details),
+ details.getStartTime()))
+ .collect(Collectors.toList());
+ }
+
+ @VisibleForTesting
+ static JobStatus getEffectiveStatus(JobDetails details) {
+ int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
+ int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
+ boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
+ JobStatus effectiveStatus = details.getStatus();
+ if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
+ LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, JobStatus.CREATED);
+ return JobStatus.CREATED;
+ }
+ return effectiveStatus;
+ }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/JobStatusUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/JobStatusUtilsTest.java
new file mode 100644
index 0000000..4cdb166
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/JobStatusUtilsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for {@link JobStatusUtils}. */
+class JobStatusUtilsTest {
+
+ @Test
+ void effectiveStatusTest() {
+ var allRunning = getJobDetails(JobStatus.RUNNING, Tuple2.of(ExecutionState.RUNNING, 4));
+ assertEquals(JobStatus.RUNNING, JobStatusUtils.getEffectiveStatus(allRunning));
+
+ var allRunningOrFinished =
+ getJobDetails(
+ JobStatus.RUNNING,
+ Tuple2.of(ExecutionState.RUNNING, 2),
+ Tuple2.of(ExecutionState.FINISHED, 2));
+ assertEquals(JobStatus.RUNNING, JobStatusUtils.getEffectiveStatus(allRunningOrFinished));
+
+ var allRunningOrScheduled =
+ getJobDetails(
+ JobStatus.RUNNING,
+ Tuple2.of(ExecutionState.RUNNING, 2),
+ Tuple2.of(ExecutionState.SCHEDULED, 2));
+ assertEquals(JobStatus.CREATED, JobStatusUtils.getEffectiveStatus(allRunningOrScheduled));
+
+ var allFinished = getJobDetails(JobStatus.FINISHED, Tuple2.of(ExecutionState.FINISHED, 4));
+ assertEquals(JobStatus.FINISHED, JobStatusUtils.getEffectiveStatus(allFinished));
+ }
+
+ @SafeVarargs
+ private JobDetails getJobDetails(
+ JobStatus status, Tuple2<ExecutionState, Integer>... tasksPerState) {
+ int[] countPerState = new int[ExecutionState.values().length];
+ for (var taskPerState : tasksPerState) {
+ countPerState[taskPerState.f0.ordinal()] = taskPerState.f1;
+ }
+ int numTasks = Arrays.stream(countPerState).sum();
+ return new JobDetails(
+ new JobID(),
+ "test-job",
+ System.currentTimeMillis(),
+ -1,
+ 0,
+ status,
+ System.currentTimeMillis(),
+ countPerState,
+ numTasks);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 42ce5eb..ae8979a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.utils.JobStatusUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
@@ -50,12 +51,9 @@
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
@@ -272,7 +270,7 @@
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
- .thenApply(AbstractFlinkService::toJobStatusMessage)
+ .thenApply(JobStatusUtils::toJobStatusMessage)
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
}
}
@@ -902,19 +900,6 @@
timeout);
}
- private static List<JobStatusMessage> toJobStatusMessage(
- MultipleJobsDetails multipleJobsDetails) {
- return multipleJobsDetails.getJobs().stream()
- .map(
- details ->
- new JobStatusMessage(
- details.getJobId(),
- details.getJobName(),
- getEffectiveStatus(details),
- details.getStartTime()))
- .collect(Collectors.toList());
- }
-
@VisibleForTesting
protected static Configuration removeOperatorConfigs(Configuration config) {
Configuration newConfig = new Configuration();
@@ -929,19 +914,6 @@
return newConfig;
}
- @VisibleForTesting
- protected static JobStatus getEffectiveStatus(JobDetails details) {
- int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
- int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
- boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
- JobStatus effectiveStatus = details.getStatus();
- if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
- effectiveStatus = JobStatus.CREATED;
- LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, effectiveStatus);
- }
- return effectiveStatus;
- }
-
private void validateHaMetadataExists(Configuration conf) {
if (!isHaMetadataAvailable(conf)) {
throw new RecoveryFailureException(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 8a928a2..8e9683d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -52,10 +52,8 @@
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -123,7 +121,6 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -962,63 +959,6 @@
}
@Test
- public void effectiveStatusTest() {
- JobDetails allRunning =
- getJobDetails(
- org.apache.flink.api.common.JobStatus.RUNNING,
- Tuple2.of(ExecutionState.RUNNING, 4));
- assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING,
- AbstractFlinkService.getEffectiveStatus(allRunning));
-
- JobDetails allRunningOrFinished =
- getJobDetails(
- org.apache.flink.api.common.JobStatus.RUNNING,
- Tuple2.of(ExecutionState.RUNNING, 2),
- Tuple2.of(ExecutionState.FINISHED, 2));
- assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING,
- AbstractFlinkService.getEffectiveStatus(allRunningOrFinished));
-
- JobDetails allRunningOrScheduled =
- getJobDetails(
- org.apache.flink.api.common.JobStatus.RUNNING,
- Tuple2.of(ExecutionState.RUNNING, 2),
- Tuple2.of(ExecutionState.SCHEDULED, 2));
- assertEquals(
- org.apache.flink.api.common.JobStatus.CREATED,
- AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled));
-
- JobDetails allFinished =
- getJobDetails(
- org.apache.flink.api.common.JobStatus.FINISHED,
- Tuple2.of(ExecutionState.FINISHED, 4));
- assertEquals(
- org.apache.flink.api.common.JobStatus.FINISHED,
- AbstractFlinkService.getEffectiveStatus(allFinished));
- }
-
- private JobDetails getJobDetails(
- org.apache.flink.api.common.JobStatus status,
- Tuple2<ExecutionState, Integer>... tasksPerState) {
- int[] countPerState = new int[ExecutionState.values().length];
- for (var taskPerState : tasksPerState) {
- countPerState[taskPerState.f0.ordinal()] = taskPerState.f1;
- }
- int numTasks = Arrays.stream(countPerState).sum();
- return new JobDetails(
- new JobID(),
- "test-job",
- System.currentTimeMillis(),
- -1,
- 0,
- status,
- System.currentTimeMillis(),
- countPerState,
- numTasks);
- }
-
- @Test
public void isJobManagerReadyTest() throws Exception {
AtomicReference<String> url = new AtomicReference<>();
var clusterClient =