[FLINK-33099][autoscaler] Introduce the Standalone Autoscaler and support flink cluster
diff --git a/flink-autoscaler-standalone/README.md b/flink-autoscaler-standalone/README.md
new file mode 100644
index 0000000..a4e2daa
--- /dev/null
+++ b/flink-autoscaler-standalone/README.md
@@ -0,0 +1,75 @@
+# Flink Autoscaler Standalone
+
+## What's the autoscaler standalone?
+
+`Flink Autoscaler Standalone` is an implementation of `Flink Autoscaler` that runs as
+a separate Java process. It computes a suitable parallelism for every job vertex
+by monitoring metrics such as the processing rate and busy time. Please see the
+[Autoscaler official doc](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/)
+for an overview of how autoscaling works.
+
+`Flink Autoscaler Standalone` rescales Flink jobs in place through the REST API of
+[Externalized Declarative Resource Management](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#externalized-declarative-resource-management).
+`RescaleApiScalingRealizer` is the default implementation of `ScalingRealizer`;
+it uses the Rescale API to apply parallelism changes.
+
+The Kubernetes Operator is well integrated with the Autoscaler. We strongly recommend using
+the Kubernetes Operator directly for Flink jobs on Kubernetes, and using Autoscaler
+Standalone only for Flink jobs in non-Kubernetes environments.
+
+## How To Use
+
+Currently, `Flink Autoscaler Standalone` only supports a single Flink cluster.
+It can be any type of Flink cluster, including:
+
+- Flink Standalone Cluster
+- MiniCluster
+- Flink YARN session cluster
+- Flink YARN application cluster
+- Flink Kubernetes session cluster
+- Flink Kubernetes application cluster
+- etc
+
+Start a Flink streaming job with the following configuration options.
+
+```
+# Enable the adaptive scheduler, which is required for in-place rescaling.
+jobmanager.scheduler : adaptive
+
+# Enable the autoscaler and allow it to apply scaling decisions
+job.autoscaler.enabled : true
+job.autoscaler.scaling.enabled : true
+job.autoscaler.stabilization.interval : 1m
+job.autoscaler.metrics.window : 3m
+```
+
+Note: In-place rescaling is only supported in Flink 1.18 and later. Jobs running on earlier
+versions cannot be scaled automatically, but you can still view the ScalingReport in the log.
+The ScalingReport shows the recommended parallelism for each vertex.
+
+After the Flink job starts, start the StandaloneAutoscaler process with the
+following command.
+
+```
+java -cp flink-autoscaler-standalone-1.7-SNAPSHOT.jar \
+org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
+--flinkClusterHost localhost \
+--flinkClusterPort 8081
+```
+
+Update `flinkClusterHost` and `flinkClusterPort` to match your Flink cluster.
+In general, the host and port are the same as those of the Flink WebUI.
+
+## Extensibility of autoscaler standalone
+
+See the [generic autoscaler README](../flink-autoscaler/README.md) for the extensibility of the generic autoscaler.
+
+`Autoscaler Standalone` isn't responsible for job management, so it has no job information of its own.
+Instead, it defines the `JobListFetcher` interface to obtain the
+`JobAutoScalerContext` of each job. A control loop periodically calls
+`JobListFetcher#fetch` to fetch the job list and scale these jobs.
+
+Currently, `FlinkClusterJobListFetcher` is the only implementation of the `JobListFetcher`
+interface, which is why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
+We plan to implement `YarnJobListFetcher` in the future, so that `Flink Autoscaler Standalone` can call
+`YarnJobListFetcher#fetch` to periodically fetch the job list from a YARN cluster.
diff --git a/flink-autoscaler-standalone/pom.xml b/flink-autoscaler-standalone/pom.xml
new file mode 100644
index 0000000..5e572d1
--- /dev/null
+++ b/flink-autoscaler-standalone/pom.xml
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes-operator-parent</artifactId>
+ <version>1.7-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-autoscaler-standalone</artifactId>
+ <name>Flink Autoscaler Standalone</name>
+ <packaging>jar</packaging>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-autoscaler</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-rpc-akka-loader</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-zookeeper-3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-queryable-state-client-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-fs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>flink-streaming-java</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>flink-optimizer</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Logging -->
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <!-- Test -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-autoscaler</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>META-INF/**/module-info.class</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcher.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcher.java
new file mode 100644
index 0000000..4e12dd2
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import java.util.List;
+
+/** The JobListFetcher will fetch the jobContext of all jobs. */
+@Experimental
+public interface JobListFetcher<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+ List<Context> fetch() throws Exception;
+}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
new file mode 100644
index 0000000..3336b91
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.JobAutoScalerImpl;
+import org.apache.flink.autoscaler.RestApiMetricsCollector;
+import org.apache.flink.autoscaler.ScalingExecutor;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.event.LoggingEventHandler;
+import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher;
+import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.util.TimeUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.FLINK_CLIENT_TIMEOUT;
+
+/** The entrypoint of the standalone autoscaler. */
+@Experimental
+public class StandaloneAutoscalerEntrypoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StandaloneAutoscalerEntrypoint.class);
+
+ public static final String SCALING_INTERVAL = "scalingInterval";
+ private static final Duration DEFAULT_SCALING_INTERVAL = Duration.ofSeconds(10);
+
+ // This timeout applies to REST calls made before the job configuration is known, such as
+ // listing jobs and fetching the job configuration.
+ public static final String REST_CLIENT_TIMEOUT = "restClientTimeout";
+
+ /** Arguments related to {@link FlinkClusterJobListFetcher}. */
+ public static final String FLINK_CLUSTER_HOST = "flinkClusterHost";
+
+ private static final String DEFAULT_FLINK_CLUSTER_HOST = "localhost";
+
+ public static final String FLINK_CLUSTER_PORT = "flinkClusterPort";
+ private static final int DEFAULT_FLINK_CLUSTER_PORT = 8081;
+
+ public static <KEY, Context extends JobAutoScalerContext<KEY>> void main(String[] args) {
+ var parameters = ParameterTool.fromArgs(args);
+ LOG.info("The standalone autoscaler is started, parameters: {}", parameters.toMap());
+
+ var scalingInterval = DEFAULT_SCALING_INTERVAL;
+ if (parameters.get(SCALING_INTERVAL) != null) {
+ scalingInterval = TimeUtils.parseDuration(parameters.get(SCALING_INTERVAL));
+ }
+
+ var restClientTimeout = FLINK_CLIENT_TIMEOUT.defaultValue();
+ if (parameters.get(REST_CLIENT_TIMEOUT) != null) {
+ restClientTimeout = TimeUtils.parseDuration(parameters.get(REST_CLIENT_TIMEOUT));
+ }
+
+ // Initialize JobListFetcher and JobAutoScaler.
+ var eventHandler = new LoggingEventHandler<KEY, Context>();
+ JobListFetcher<KEY, Context> jobListFetcher =
+ createJobListFetcher(parameters, restClientTimeout);
+ var autoScaler = createJobAutoscaler(eventHandler);
+
+ var autoscalerExecutor =
+ new StandaloneAutoscalerExecutor<>(
+ scalingInterval, jobListFetcher, eventHandler, autoScaler);
+ autoscalerExecutor.start();
+ }
+
+ private static <KEY, Context extends JobAutoScalerContext<KEY>>
+ JobListFetcher<KEY, Context> createJobListFetcher(
+ ParameterTool parameters, Duration restClientTimeout) {
+ var host = parameters.get(FLINK_CLUSTER_HOST, DEFAULT_FLINK_CLUSTER_HOST);
+ var port = parameters.getInt(FLINK_CLUSTER_PORT, DEFAULT_FLINK_CLUSTER_PORT);
+ var restServerAddress = String.format("http://%s:%s", host, port);
+
+ return (JobListFetcher<KEY, Context>)
+ new FlinkClusterJobListFetcher(
+ conf ->
+ new RestClusterClient<>(
+ conf,
+ "clusterId",
+ (c, e) ->
+ new StandaloneClientHAServices(restServerAddress)),
+ restClientTimeout);
+ }
+
+ private static <KEY, Context extends JobAutoScalerContext<KEY>>
+ JobAutoScaler<KEY, Context> createJobAutoscaler(
+ AutoScalerEventHandler<KEY, Context> eventHandler) {
+ AutoScalerStateStore<KEY, Context> stateStore = new InMemoryAutoScalerStateStore<>();
+ return new JobAutoScalerImpl<>(
+ new RestApiMetricsCollector<>(),
+ new ScalingMetricEvaluator(),
+ new ScalingExecutor<>(eventHandler, stateStore),
+ eventHandler,
+ new RescaleApiScalingRealizer<>(eventHandler),
+ stateStore);
+ }
+}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
new file mode 100644
index 0000000..aa39812
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** The executor of the standalone autoscaler. */
+public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerContext<KEY>>
+ implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StandaloneAutoscalerExecutor.class);
+
+ @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+ private final Duration scalingInterval;
+ private final JobListFetcher<KEY, Context> jobListFetcher;
+ private final AutoScalerEventHandler<KEY, Context> eventHandler;
+ private final JobAutoScaler<KEY, Context> autoScaler;
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ public StandaloneAutoscalerExecutor(
+ @Nonnull Duration scalingInterval,
+ @Nonnull JobListFetcher<KEY, Context> jobListFetcher,
+ @Nonnull AutoScalerEventHandler<KEY, Context> eventHandler,
+ @Nonnull JobAutoScaler<KEY, Context> autoScaler) {
+ this.scalingInterval = scalingInterval;
+ this.jobListFetcher = jobListFetcher;
+ this.eventHandler = eventHandler;
+ this.autoScaler = autoScaler;
+ this.scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("StandaloneAutoscalerControlLoop")
+ .setDaemon(false)
+ .build());
+ }
+
+ public void start() {
+ LOG.info("Schedule control loop.");
+ scheduledExecutorService.scheduleWithFixedDelay(
+ this::scaling, 0, scalingInterval.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() {
+ scheduledExecutorService.shutdownNow();
+ }
+
+ @VisibleForTesting
+ protected void scaling() {
+ LOG.info("Standalone autoscaler starts scaling.");
+ try {
+ var jobList = jobListFetcher.fetch();
+ for (var jobContext : jobList) {
+ try {
+ autoScaler.scale(jobContext);
+ } catch (Throwable e) {
+ LOG.error("Error while scaling job", e);
+ eventHandler.handleEvent(
+ jobContext,
+ AutoScalerEventHandler.Type.Warning,
+ AUTOSCALER_ERROR,
+ e.getMessage(),
+ null,
+ null);
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Error while fetching the job list.", e);
+ }
+ }
+}
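Note on `StandaloneAutoscalerExecutor#scaling`: the nested try/catch matters because a `ScheduledExecutorService` suppresses all later runs of a periodic task once one run throws. Catching `Throwable` per job also keeps one failing job from blocking the others. The sketch below illustrates only that isolation pattern with plain JDK types. `ControlLoopSketch` and `scaleAll` are hypothetical names and are not part of this patch, and the real method reports errors through `AutoScalerEventHandler` rather than returning them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the per-job error isolation in the control loop. */
class ControlLoopSketch {

    /**
     * Applies the scaler to every job. A failure for one job is recorded and
     * does not prevent the remaining jobs from being processed.
     */
    static <J> List<String> scaleAll(List<J> jobs, Consumer<J> scaler) {
        List<String> errors = new ArrayList<>();
        for (J job : jobs) {
            try {
                scaler.accept(job);
            } catch (Throwable t) {
                // The real executor logs the error and emits a Warning event here.
                errors.add(job + ": " + t.getMessage());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> scaled = new ArrayList<>();
        List<String> errors =
                scaleAll(
                        List.of("job-a", "job-b", "job-c"),
                        job -> {
                            if (job.equals("job-b")) {
                                throw new IllegalStateException("metrics not ready");
                            }
                            scaled.add(job);
                        });
        System.out.println("scaled=" + scaled + ", errors=" + errors);
    }
}
```

In this sketch the failure of `job-b` is collected while `job-a` and `job-c` are still processed, which mirrors how the control loop continues after a single job fails.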
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
new file mode 100644
index 0000000..caf4ee4
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.flinkcluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.standalone.JobListFetcher;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Fetches the {@link JobAutoScalerContext} of every job in a Flink cluster. */
+public class FlinkClusterJobListFetcher
+ implements JobListFetcher<JobID, JobAutoScalerContext<JobID>> {
+
+ private final FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+ restClientGetter;
+ private final Duration restClientTimeout;
+
+ public FlinkClusterJobListFetcher(
+ FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+ restClientGetter,
+ Duration restClientTimeout) {
+ this.restClientGetter = restClientGetter;
+ this.restClientTimeout = restClientTimeout;
+ }
+
+ @Override
+ public List<JobAutoScalerContext<JobID>> fetch() throws Exception {
+ try (var restClusterClient = restClientGetter.apply(new Configuration())) {
+ return restClusterClient.listJobs().get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
+ .stream()
+ .map(
+ jobStatusMessage -> {
+ try {
+ return generateJobContext(restClusterClient, jobStatusMessage);
+ } catch (Throwable e) {
+ throw new RuntimeException(
+ "generateJobContext threw an exception", e);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+ }
+
+ private JobAutoScalerContext<JobID> generateJobContext(
+ RestClusterClient<String> restClusterClient, JobStatusMessage jobStatusMessage)
+ throws Exception {
+ var jobId = jobStatusMessage.getJobId();
+ var conf = getConfiguration(restClusterClient, jobId);
+
+ return new JobAutoScalerContext<>(
+ jobId,
+ jobId,
+ jobStatusMessage.getJobState(),
+ conf,
+ new UnregisteredMetricsGroup(),
+ () -> restClientGetter.apply(conf));
+ }
+
+ private Configuration getConfiguration(RestClusterClient<String> restClusterClient, JobID jobId)
+ throws Exception {
+ var jobParameters = new JobMessageParameters();
+ jobParameters.jobPathParameter.resolve(jobId);
+
+ var configurationInfo =
+ restClusterClient
+ .sendRequest(
+ JobManagerJobConfigurationHeaders.getInstance(),
+ jobParameters,
+ EmptyRequestBody.getInstance())
+ .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
+
+ var conf = new Configuration();
+ configurationInfo.forEach(entry -> conf.setString(entry.getKey(), entry.getValue()));
+ return conf;
+ }
+}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
new file mode 100644
index 0000000..d04e6d3
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.realizer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A ScalingRealizer which uses the Rescale API to apply parallelism changes.
+ *
+ * <p>Note: This is based on code copied from the operator, and they don't depend on each other, so
+ * some code is duplicated.
+ */
+public class RescaleApiScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>>
+ implements ScalingRealizer<KEY, Context> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RescaleApiScalingRealizer.class);
+
+ @VisibleForTesting static final String SCALING = "Scaling";
+
+ private final AutoScalerEventHandler<KEY, Context> eventHandler;
+
+ public RescaleApiScalingRealizer(AutoScalerEventHandler<KEY, Context> eventHandler) {
+ this.eventHandler = eventHandler;
+ }
+
+ @Override
+ public void realize(Context context, Map<String, String> parallelismOverrides) {
+ Configuration conf = context.getConfiguration();
+ if (!conf.get(JobManagerOptions.SCHEDULER)
+ .equals(JobManagerOptions.SchedulerType.Adaptive)) {
+ LOG.warn("In-place rescaling is only available with the adaptive scheduler.");
+ return;
+ }
+
+ var jobID = context.getJobID();
+ if (JobStatus.RUNNING != context.getJobStatus()) {
+ LOG.warn("Job in terminal or reconciling state cannot be scaled in-place.");
+ return;
+ }
+
+ var flinkRestClientTimeout = conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT);
+
+ try (var client = context.getRestClusterClient()) {
+ var requirements =
+ new HashMap<>(getVertexResources(client, jobID, flinkRestClientTimeout));
+ var parallelismUpdated = false;
+
+ for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
+ requirements.entrySet()) {
+ var jobVertexId = entry.getKey().toString();
+ var parallelism = entry.getValue().getParallelism();
+ var overrideStr = parallelismOverrides.get(jobVertexId);
+
+ // No overrides for this vertex
+ if (overrideStr == null) {
+ continue;
+ }
+
+ // We have an override for the vertex
+ var p = Integer.parseInt(overrideStr);
+ var newParallelism = new JobVertexResourceRequirements.Parallelism(1, p);
+ // If the requirements changed we mark this as scaling triggered
+ if (!parallelism.equals(newParallelism)) {
+ entry.setValue(new JobVertexResourceRequirements(newParallelism));
+ parallelismUpdated = true;
+ }
+ }
+ if (parallelismUpdated) {
+ updateVertexResources(client, jobID, flinkRestClientTimeout, requirements);
+ eventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Normal,
+ SCALING,
+ String.format(
+ "In-place scaling triggered, the new requirements are %s.",
+ requirements),
+ null,
+ null);
+ } else {
+ LOG.info("Vertex resources requirements already match target, nothing to do...");
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to apply parallelism overrides.", e);
+ }
+ }
+
+ private Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
+ RestClusterClient<String> client, JobID jobID, Duration restClientTimeout)
+ throws Exception {
+ var jobParameters = new JobMessageParameters();
+ jobParameters.jobPathParameter.resolve(jobID);
+
+ var currentRequirements =
+ client.sendRequest(
+ new JobResourceRequirementsHeaders(),
+ jobParameters,
+ EmptyRequestBody.getInstance())
+ .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
+
+ return currentRequirements.asJobResourceRequirements().get().getJobVertexParallelisms();
+ }
+
+ private void updateVertexResources(
+ RestClusterClient<String> client,
+ JobID jobID,
+ Duration restClientTimeout,
+ Map<JobVertexID, JobVertexResourceRequirements> newReqs)
+ throws Exception {
+ var jobParameters = new JobMessageParameters();
+ jobParameters.jobPathParameter.resolve(jobID);
+
+ var requestBody = new JobResourceRequirementsBody(new JobResourceRequirements(newReqs));
+
+ client.sendRequest(new JobResourcesRequirementsUpdateHeaders(), jobParameters, requestBody)
+ .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
+ }
+}
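Note on `RescaleApiScalingRealizer#realize`: the method copies the current vertex requirements, applies an override only to vertices the job actually has, and sends an update only if something changed. The sketch below reproduces that merge step with plain maps, under simplifying assumptions: vertex IDs are strings and each requirement is reduced to its upper parallelism bound. The real code stores a `Parallelism(1, p)` range per `JobVertexID`. `OverrideMergeSketch`, `applyOverrides` and `needsUpdate` are hypothetical names that do not appear in this patch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of the override merge in realize(). */
class OverrideMergeSketch {

    /**
     * Returns a copy of the current parallelisms with the overrides applied.
     * Vertices without an override keep their value; overrides for vertices
     * that are not part of the job are ignored.
     */
    static Map<String, Integer> applyOverrides(
            Map<String, Integer> current, Map<String, String> overrides) {
        Map<String, Integer> updated = new HashMap<>(current);
        for (Map.Entry<String, Integer> entry : updated.entrySet()) {
            String overrideStr = overrides.get(entry.getKey());
            if (overrideStr == null) {
                continue; // No override for this vertex.
            }
            entry.setValue(Integer.parseInt(overrideStr));
        }
        return updated;
    }

    /** An update request is only needed when the merge changed something. */
    static boolean needsUpdate(Map<String, Integer> current, Map<String, Integer> updated) {
        return !current.equals(updated);
    }

    public static void main(String[] args) {
        Map<String, Integer> current = Map.of("vertex-1", 2, "vertex-2", 4);
        Map<String, Integer> updated = applyOverrides(current, Map.of("vertex-1", "8"));
        System.out.println("update needed: " + needsUpdate(current, updated));
    }
}
```

Skipping the request when nothing changed mirrors the "already match target, nothing to do" branch, so repeated control-loop iterations do not resend identical requirements.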
diff --git a/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE b/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..b8a5183
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,31 @@
+flink-autoscaler-standalone
+Copyright 2014-2023 The Apache Software Foundation
+
+This project includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.fasterxml.jackson.core:jackson-core:2.15.0
+- com.fasterxml.jackson.core:jackson-databind:2.15.0
+- com.fasterxml.jackson.core:jackson-annotations:2.15.0
+- org.objenesis:objenesis:2.1
+- commons-collections:commons-collections:3.2.2
+- org.apache.commons:commons-math3:3.6.1
+- com.twitter:chill-java:0.7.6
+- commons-io:commons-io:2.11.0
+- org.apache.commons:commons-lang3:3.12.0
+- commons-cli:commons-cli:1.5.0
+- org.javassist:javassist:3.24.0-GA
+- com.google.code.findbugs:jsr305:1.3.9
+- org.slf4j:slf4j-api:1.7.36
+- org.apache.logging.log4j:log4j-slf4j-impl:2.17.1
+- org.apache.logging.log4j:log4j-api:2.17.1
+- org.apache.logging.log4j:log4j-core:2.17.1
+- org.apache.logging.log4j:log4j-1.2-api:2.17.1
+
+This project bundles the following dependencies under the BSD License.
+See bundled license files for details.
+
+- com.esotericsoftware.kryo:kryo:2.24.0
+- com.esotericsoftware.minlog:minlog:1.2
diff --git a/flink-autoscaler-standalone/src/main/resources/log4j2.properties b/flink-autoscaler-standalone/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..4a6ab83
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/resources/log4j2.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Log all INFO-level messages to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] [%X{resource.namespace}.%X{resource.name}] %msg%n%throwable}
+
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
new file mode 100644
index 0000000..464d8ec
--- /dev/null
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Test for {@link StandaloneAutoscalerExecutor}. */
+class StandaloneAutoscalerExecutorTest {
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testScaling(boolean throwExceptionWhileScale) {
+ JobAutoScalerContext<JobID> jobContext1 = createJobAutoScalerContext();
+ JobAutoScalerContext<JobID> jobContext2 = createJobAutoScalerContext();
+ var jobList = List.of(jobContext1, jobContext2);
+ Set<JobID> exceptionKeys =
+ throwExceptionWhileScale
+ ? Set.of(jobContext1.getJobKey(), jobContext2.getJobKey())
+ : Set.of();
+
+ var actualScaleContexts = new ArrayList<JobAutoScalerContext<JobID>>();
+
+ var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+ var autoscalerExecutor =
+ new StandaloneAutoscalerExecutor<>(
+ Duration.ofSeconds(2),
+ () -> jobList,
+ eventCollector,
+ createJobAutoScaler(actualScaleContexts, exceptionKeys));
+
+ autoscalerExecutor.scaling();
+ assertThat(actualScaleContexts).isEqualTo(jobList);
+ assertThat(eventCollector.events)
+ .hasSameSizeAs(exceptionKeys)
+ .allMatch(
+ event ->
+ event.getReason()
+ .equals(StandaloneAutoscalerExecutor.AUTOSCALER_ERROR));
+ }
+
+ @Test
+ void testFetchException() {
+ var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+ var autoscalerExecutor =
+ new StandaloneAutoscalerExecutor<>(
+ Duration.ofSeconds(2),
+ () -> {
+ throw new RuntimeException("Expected exception.");
+ },
+ eventCollector,
+ new JobAutoScaler<>() {
+ @Override
+ public void scale(JobAutoScalerContext<JobID> context) {
+ fail("Should not be called.");
+ }
+
+ @Override
+ public void cleanup(JobID jobID) {
+ fail("Should not be called.");
+ }
+ });
+
+ // scaling shouldn't throw exception even if fetch fails
+ assertDoesNotThrow(autoscalerExecutor::scaling);
+ }
+
+ private static JobAutoScalerContext<JobID> createJobAutoScalerContext() {
+ var jobID = new JobID();
+ return new JobAutoScalerContext<>(
+ jobID, jobID, JobStatus.RUNNING, new Configuration(), null, null);
+ }
+
+ private static JobAutoScaler<JobID, JobAutoScalerContext<JobID>> createJobAutoScaler(
+ List<JobAutoScalerContext<JobID>> actualScaleContexts, Set<JobID> exceptionKeys) {
+ return new JobAutoScaler<>() {
+ @Override
+ public void scale(JobAutoScalerContext<JobID> context) {
+ actualScaleContexts.add(context);
+ if (exceptionKeys.contains(context.getJobKey())) {
+ throw new RuntimeException("Expected exception.");
+ }
+ }
+
+ @Override
+ public void cleanup(JobID jobKey) {
+ fail("Should not be called.");
+ }
+ };
+ }
+}
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
new file mode 100644
index 0000000..658eb58
--- /dev/null
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.flinkcluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link FlinkClusterJobListFetcher}. */
+class FlinkClusterJobListFetcherTest {
+
+ /** Test that the fetched job list and configurations are as expected. */
+ @Test
+ void testFetchJobListAndConfigurationInfo() throws Exception {
+ var job1 =
+ new JobStatusMessage(
+ new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
+ var job2 =
+ new JobStatusMessage(
+ new JobID(), "", JobStatus.CANCELLING, Instant.now().toEpochMilli());
+
+ Configuration expectedConf1 = new Configuration();
+ expectedConf1.setString("option_key1", "option_value1");
+
+ Configuration expectedConf2 = new Configuration();
+ expectedConf2.setString("option_key2", "option_value2");
+ expectedConf2.setString("option_key3", "option_value3");
+
+ var jobs = Map.of(job1.getJobId(), job1, job2.getJobId(), job2);
+ var configurations = Map.of(job1.getJobId(), expectedConf1, job2.getJobId(), expectedConf2);
+ var closeCounter = new AtomicLong();
+ FlinkClusterJobListFetcher jobListFetcher =
+ new FlinkClusterJobListFetcher(
+ getRestClusterClient(
+ Either.Left(List.of(job1, job2)),
+ Either.Left(
+ Map.of(
+ job1.getJobId(),
+ ConfigurationInfo.from(expectedConf1),
+ job2.getJobId(),
+ ConfigurationInfo.from(expectedConf2))),
+ closeCounter),
+ Duration.ofSeconds(10));
+
+ // Fetch multiple times and check whether the results are as expected each time
+ for (int i = 1; i <= 3; i++) {
+ var fetchedJobList = jobListFetcher.fetch();
+ // Check whether rest client is closed.
+ assertThat(closeCounter).hasValue(i);
+
+ assertThat(fetchedJobList).hasSize(2);
+ for (var jobContext : fetchedJobList) {
+ JobStatusMessage expectedJobStatusMessage = jobs.get(jobContext.getJobID());
+ Configuration expectedConf = configurations.get(jobContext.getJobID());
+ assertThat(expectedJobStatusMessage).isNotNull();
+ assertThat(jobContext.getJobStatus())
+ .isEqualTo(expectedJobStatusMessage.getJobState());
+ assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf);
+ }
+ }
+ }
+
+ /**
+ * Test that the expected exception is propagated when fetching the job list fails, and that
+ * the rest client is still closed.
+ */
+ @Test
+ void testFetchJobListException() {
+ var expectedException = new RuntimeException("Expected exception.");
+ var closeCounter = new AtomicLong();
+
+ FlinkClusterJobListFetcher jobListFetcher =
+ new FlinkClusterJobListFetcher(
+ getRestClusterClient(
+ Either.Right(expectedException),
+ Either.Left(Map.of()),
+ closeCounter),
+ Duration.ofSeconds(10));
+ assertThatThrownBy(jobListFetcher::fetch).getCause().isEqualTo(expectedException);
+ assertThat(closeCounter).hasValue(1);
+ }
+
+ /**
+ * Test that the expected exception is propagated when fetching the job conf fails, and that
+ * the rest client is still closed.
+ */
+ @Test
+ void testFetchConfigurationException() {
+ var job1 =
+ new JobStatusMessage(
+ new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
+ var expectedException = new RuntimeException("Expected exception.");
+ var closeCounter = new AtomicLong();
+
+ FlinkClusterJobListFetcher jobListFetcher =
+ new FlinkClusterJobListFetcher(
+ getRestClusterClient(
+ Either.Left(List.of(job1)),
+ Either.Right(expectedException),
+ closeCounter),
+ Duration.ofSeconds(10));
+
+ assertThatThrownBy(jobListFetcher::fetch).getRootCause().isEqualTo(expectedException);
+ assertThat(closeCounter).hasValue(1);
+ }
+
+ /**
+ * Test that a TimeoutException is thrown when fetching the job list times out, and that the
+ * rest client is still closed.
+ */
+ @Test
+ void testFetchJobListTimeout() {
+ CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ FlinkClusterJobListFetcher jobListFetcher =
+ new FlinkClusterJobListFetcher(
+ getTimeoutableRestClusterClient(null, null, closeFuture),
+ Duration.ofSeconds(2));
+
+ assertThat(closeFuture).isNotDone();
+ assertThatThrownBy(jobListFetcher::fetch).isInstanceOf(TimeoutException.class);
+ assertThat(closeFuture).isDone();
+ }
+
+ /**
+ * Test that a TimeoutException is the root cause when fetching the job conf times out, and
+ * that the rest client is still closed.
+ */
+ @Test
+ void testFetchConfigurationTimeout() {
+ var job1 =
+ new JobStatusMessage(
+ new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
+ CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
+ FlinkClusterJobListFetcher jobListFetcher =
+ new FlinkClusterJobListFetcher(
+ getTimeoutableRestClusterClient(List.of(job1), null, closeFuture),
+ Duration.ofSeconds(2));
+
+ assertThat(closeFuture).isNotDone();
+ assertThatThrownBy(jobListFetcher::fetch)
+ .getRootCause()
+ .isInstanceOf(TimeoutException.class);
+ assertThat(closeFuture).isDone();
+ }
+
+ /**
+ * @param jobListOrException When listJobs is called, return jobList if Either is left, return
+ * failedFuture if Either is right.
+ * @param configurationsOrException When a job conf is fetched, return configuration if Either
+ * is left, return failedFuture if Either is right.
+ * @param closeCounter Increment the count each time the {@link RestClusterClient#close} is
+ * called
+ */
+ private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+ getRestClusterClient(
+ Either<Collection<JobStatusMessage>, Throwable> jobListOrException,
+ Either<Map<JobID, ConfigurationInfo>, Throwable> configurationsOrException,
+ AtomicLong closeCounter) {
+ return conf ->
+ new RestClusterClient<>(
+ conf,
+ "test-cluster",
+ (c, e) -> new StandaloneClientHAServices("localhost")) {
+
+ @Override
+ public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+ if (jobListOrException.isLeft()) {
+ return CompletableFuture.completedFuture(jobListOrException.left());
+ }
+ return CompletableFuture.failedFuture(jobListOrException.right());
+ }
+
+ @Override
+ public <
+ M extends MessageHeaders<R, P, U>,
+ U extends MessageParameters,
+ R extends RequestBody,
+ P extends ResponseBody>
+ CompletableFuture<P> sendRequest(M h, U p, R r) {
+ if (h instanceof JobManagerJobConfigurationHeaders) {
+ if (configurationsOrException.isRight()) {
+ return CompletableFuture.failedFuture(
+ configurationsOrException.right());
+ }
+ var jobID = ((JobMessageParameters) p).jobPathParameter.getValue();
+ return (CompletableFuture<P>)
+ CompletableFuture.completedFuture(
+ configurationsOrException.left().get(jobID));
+ }
+ fail("Unknown request");
+ return null;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ closeCounter.incrementAndGet();
+ }
+ };
+ }
+
+ /**
+ * @param jobList When listJobs is called, return jobList if it's not null, don't complete
+ * future if it's null.
+ * @param configuration When a job conf is fetched, return configuration if it's not null,
+ * don't complete future if it's null.
+ * @param closeFuture Complete this closeFuture when {@link RestClusterClient#close} is called.
+ */
+ private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+ getTimeoutableRestClusterClient(
+ @Nullable Collection<JobStatusMessage> jobList,
+ @Nullable ConfigurationInfo configuration,
+ CompletableFuture<Void> closeFuture) {
+ return conf ->
+ new RestClusterClient<>(
+ conf,
+ "test-cluster",
+ (c, e) -> new StandaloneClientHAServices("localhost")) {
+
+ @Override
+ public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+ if (jobList == null) {
+ return new CompletableFuture<>();
+ }
+ return CompletableFuture.completedFuture(jobList);
+ }
+
+ @Override
+ public <
+ M extends MessageHeaders<R, P, U>,
+ U extends MessageParameters,
+ R extends RequestBody,
+ P extends ResponseBody>
+ CompletableFuture<P> sendRequest(M h, U p, R r) {
+ if (h instanceof JobManagerJobConfigurationHeaders) {
+ if (configuration == null) {
+ return new CompletableFuture<>();
+ }
+ return (CompletableFuture<P>)
+ CompletableFuture.completedFuture(configuration);
+ }
+ fail("Unknown request");
+ return null;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ closeFuture.complete(null);
+ }
+ };
+ }
+}
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
new file mode 100644
index 0000000..9205bac
--- /dev/null
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.realizer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer.SCALING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link RescaleApiScalingRealizer}. */
+class RescaleApiScalingRealizerTest {
+
+ /**
+ * Test whether scalingRealizer behaves as expected when the resource is changed or isn't
+ * changed.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testUpdateResourceRequirements(boolean resourceIsChanged) {
+ var jobID = new JobID();
+ var jobVertex1 = new JobVertexID().toHexString();
+ var jobVertex2 = new JobVertexID().toHexString();
+ var currentResourceRequirements = Map.of(jobVertex1, "5", jobVertex2, "10");
+
+ var newResourceRequirements = currentResourceRequirements;
+ if (resourceIsChanged) {
+ newResourceRequirements = Map.of(jobVertex1, "7", jobVertex2, "12");
+ }
+
+ var closeFuture = new CompletableFuture<Void>();
+ var updatedRequirements = new CompletableFuture<Optional<JobResourceRequirements>>();
+
+ var jobContext =
+ createJobAutoScalerContext(
+ jobID,
+ getRestClusterClient(
+ jobID,
+ createResourceRequirements(currentResourceRequirements),
+ updatedRequirements,
+ closeFuture));
+ jobContext
+ .getConfiguration()
+ .set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+
+ var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+ RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer =
+ new RescaleApiScalingRealizer<>(eventCollector);
+
+ assertThat(updatedRequirements).isNotDone();
+ assertThat(closeFuture).isNotDone();
+ scalingRealizer.realize(jobContext, newResourceRequirements);
+
+ // The resource requirements should be updated only when newResourceRequirements differs.
+ if (resourceIsChanged) {
+ assertThat(updatedRequirements)
+ .isCompletedWithValue(
+ Optional.of(createResourceRequirements(newResourceRequirements)));
+ assertThat(eventCollector.events).hasSize(1);
+ var event = eventCollector.events.getFirst();
+ assertThat(event.getContext()).isEqualTo(jobContext);
+ assertThat(event.getReason()).isEqualTo(SCALING);
+ } else {
+ assertThat(updatedRequirements).isNotDone();
+ assertThat(eventCollector.events).isEmpty();
+ }
+ assertThat(closeFuture).isDone();
+ }
+
+ @Test
+ void testDisableAdaptiveScheduler() {
+ var jobID = new JobID();
+ var jobVertex1 = new JobVertexID().toHexString();
+ var jobVertex2 = new JobVertexID().toHexString();
+ var resourceRequirements = Map.of(jobVertex1, "5", jobVertex2, "10");
+
+ var jobContext =
+ createJobAutoScalerContext(
+ jobID,
+ () ->
+ fail(
+ "The rest client shouldn't be created if the adaptive scheduler is disabled."));
+
+ var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+ RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer =
+ new RescaleApiScalingRealizer<>(eventCollector);
+
+ scalingRealizer.realize(jobContext, resourceRequirements);
+ assertThat(eventCollector.events).isEmpty();
+ }
+
+ @Test
+ void testJobNotRunning() {
+ var jobID = new JobID();
+ var jobVertex1 = new JobVertexID().toHexString();
+ var jobVertex2 = new JobVertexID().toHexString();
+ var resourceRequirements = Map.of(jobVertex1, "5", jobVertex2, "10");
+
+ var conf = new Configuration();
+ conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+
+ var jobContext =
+ new JobAutoScalerContext<>(
+ jobID,
+ jobID,
+ JobStatus.CANCELLING,
+ conf,
+ null,
+ () ->
+ fail(
+ "The rest client shouldn't be created if the job isn't running."));
+
+ var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+ RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer =
+ new RescaleApiScalingRealizer<>(eventCollector);
+
+ scalingRealizer.realize(jobContext, resourceRequirements);
+ assertThat(eventCollector.events).isEmpty();
+ }
+
+ private static JobAutoScalerContext<JobID> createJobAutoScalerContext(
+ JobID jobID,
+ SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier) {
+ return new JobAutoScalerContext<>(
+ jobID, jobID, JobStatus.RUNNING, new Configuration(), null, restClientSupplier);
+ }
+
+ private JobResourceRequirements createResourceRequirements(
+ Map<String, String> parallelismOverrides) {
+ Map<JobVertexID, JobVertexResourceRequirements> vertexResources = new HashMap<>();
+ parallelismOverrides.forEach(
+ (key, value) ->
+ vertexResources.put(
+ JobVertexID.fromHexString(key),
+ new JobVertexResourceRequirements(
+ new JobVertexResourceRequirements.Parallelism(
+ 1, Integer.parseInt(value)))));
+ return new JobResourceRequirements(vertexResources);
+ }
+
+ private static SupplierWithException<RestClusterClient<String>, Exception> getRestClusterClient(
+ JobID expectedJobID,
+ JobResourceRequirements currentRequirements,
+ CompletableFuture<Optional<JobResourceRequirements>> updatedRequirements,
+ CompletableFuture<Void> closeFuture) {
+
+ return () ->
+ new RestClusterClient<>(
+ new Configuration(),
+ "test-cluster",
+ (c, e) -> new StandaloneClientHAServices("localhost")) {
+
+ @Override
+ public <
+ M extends MessageHeaders<R, P, U>,
+ U extends MessageParameters,
+ R extends RequestBody,
+ P extends ResponseBody>
+ CompletableFuture<P> sendRequest(M h, U p, R r) {
+ if (h instanceof JobResourceRequirementsHeaders) {
+ if (expectedJobID.equals(
+ ((JobMessageParameters) p).jobPathParameter.getValue())) {
+ return (CompletableFuture<P>)
+ CompletableFuture.completedFuture(
+ new JobResourceRequirementsBody(
+ currentRequirements));
+ }
+ } else if (r instanceof JobResourceRequirementsBody) {
+ if (expectedJobID.equals(
+ ((JobMessageParameters) p).jobPathParameter.getValue())) {
+ updatedRequirements.complete(
+ ((JobResourceRequirementsBody) r)
+ .asJobResourceRequirements());
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ fail("Unknown request");
+ return null;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ closeFuture.complete(null);
+ }
+ };
+ }
+}
diff --git a/pom.xml b/pom.xml
index bc3ba1d..ef3a8a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@
<module>flink-kubernetes-webhook</module>
<module>flink-kubernetes-docs</module>
<module>flink-autoscaler</module>
+ <module>flink-autoscaler-standalone</module>
<module>examples/flink-sql-runner-example</module>
<module>examples/flink-beam-example</module>
<module>examples/kubernetes-client-examples</module>