[FLINK-33099][autoscaler] Introduce the Standalone Autoscaler and support flink cluster
diff --git a/flink-autoscaler-standalone/README.md b/flink-autoscaler-standalone/README.md
new file mode 100644
index 0000000..a4e2daa
--- /dev/null
+++ b/flink-autoscaler-standalone/README.md
@@ -0,0 +1,75 @@
+# Flink Autoscaler Standalone
+
+## What's the autoscaler standalone?
+
+`Flink Autoscaler Standalone` is an implementation of `Flink Autoscaler`, it runs as 
+a separate java process. It computes the reasonable parallelism of all job vertices 
+by monitoring the metrics, such as: processing rate, busy time, etc. Please see 
+[Autoscaler official doc](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/) 
+for an overview of how autoscaling works.
+
+`Flink Autoscaler Standalone` rescales flink job in-place by rest api of 
+[Externalized Declarative Resource Management](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#externalized-declarative-resource-management).
+`RescaleApiScalingRealizer` is the default implementation of `ScalingRealizer`, 
+it uses the Rescale API to apply parallelism changes.
+
+Kubernetes Operator is well integrated with Autoscaler, we strongly recommend using 
+Kubernetes Operator directly for the kubernetes flink jobs, and only flink jobs in 
+non-kubernetes environments use Autoscaler Standalone.
+
+## How To Use
+
+Currently, `Flink Autoscaler Standalone` only supports a single Flink cluster.
+It can be any type of Flink cluster, includes: 
+
+- Flink Standalone Cluster
+- MiniCluster
+- Flink yarn session cluster
+- Flink yarn application cluster
+- Flink kubernetes session cluster
+- Flink kubernetes application cluster
+- etc
+
+You can start a Flink Streaming job with the following ConfigOptions.
+
+```
+# Enable Adaptvie scheduler to play the in-place rescaling.
+jobmanager.scheduler : adaptive
+
+# Enable autoscale and scaling
+job.autoscaler.enabled : true
+job.autoscaler.scaling.enabled : true
+job.autoscaler.stabilization.interval : 1m
+job.autoscaler.metrics.window : 3m
+```
+
+Note: In-place rescaling is only supported since Flink 1.18. Flink jobs before version 
+1.18 cannot be scaled automatically, but you can view the ScalingReport in Log. 
+ScalingReport will show the recommended parallelism for each vertex.
+
+After the flink job starts, please start the StandaloneAutoscaler process by the 
+following command. 
+
+```
+java -cp flink-autoscaler-standalone-1.7-SNAPSHOT.jar \
+org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
+--flinkClusterHost localhost \
+--flinkClusterPort 8081
+```
+
+Updating the `flinkClusterHost` and `flinkClusterPort` based on your flink cluster. 
+In general, the host and port are the same as Flink WebUI.
+
+## Extensibility of autoscaler standalone
+
+Please click [here](../flink-autoscaler/README.md) to check out extensibility of generic autoscaler.
+
+`Autoscaler Standalone` isn't responsible for job management, so it doesn't have job information.
+`Autoscaler Standalone` defines the `JobListFetcher` interface in order to get the 
+`JobAutoScalerContext` of the job. It has a control loop that periodically calls 
+`JobListFetcher#fetch` to fetch the job list and scale these jobs.
+
+Currently `FlinkClusterJobListFetcher` is the only implementation of the `JobListFetcher` 
+interface, that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
+We will implement `YarnJobListFetcher` in the future, `Flink Autoscaler Standalone` will call 
+`YarnJobListFetcher#fetch` to fetch job list from yarn cluster periodically.
diff --git a/flink-autoscaler-standalone/pom.xml b/flink-autoscaler-standalone/pom.xml
new file mode 100644
index 0000000..5e572d1
--- /dev/null
+++ b/flink-autoscaler-standalone/pom.xml
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <version>1.7-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-autoscaler-standalone</artifactId>
+    <name>Flink Autoscaler Standalone</name>
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-autoscaler</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-rpc-akka-loader</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-shaded-zookeeper-3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-queryable-state-client-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-hadoop-fs</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-text</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.lz4</groupId>
+                    <artifactId>lz4-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>flink-streaming-java</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>flink-optimizer</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Logging -->
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <!-- Test -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-autoscaler</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                        <exclude>META-INF/**/module-info.class</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcher.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcher.java
new file mode 100644
index 0000000..4e12dd2
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import java.util.List;
+
+/** The JobListFetcher will fetch the jobContext of all jobs. */
+@Experimental
+public interface JobListFetcher<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    List<Context> fetch() throws Exception;
+}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
new file mode 100644
index 0000000..3336b91
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.JobAutoScalerImpl;
+import org.apache.flink.autoscaler.RestApiMetricsCollector;
+import org.apache.flink.autoscaler.ScalingExecutor;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.event.LoggingEventHandler;
+import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher;
+import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.util.TimeUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.FLINK_CLIENT_TIMEOUT;
+
+/** The entrypoint of the standalone autoscaler. */
+@Experimental
+public class StandaloneAutoscalerEntrypoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandaloneAutoscalerEntrypoint.class);
+
+    public static final String SCALING_INTERVAL = "scalingInterval";
+    private static final Duration DEFAULT_SCALING_INTERVAL = Duration.ofSeconds(10);
+
+    // This timeout option is used before the job config is got, such as: listJobs, get
+    // Configuration, etc.
+    public static final String REST_CLIENT_TIMEOUT = "restClientTimeout";
+
+    /** Arguments related to {@link FlinkClusterJobListFetcher}. */
+    public static final String FLINK_CLUSTER_HOST = "flinkClusterHost";
+
+    private static final String DEFAULT_FLINK_CLUSTER_HOST = "localhost";
+
+    public static final String FLINK_CLUSTER_PORT = "flinkClusterPort";
+    private static final int DEFAULT_FLINK_CLUSTER_PORT = 8081;
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void main(String[] args) {
+        var parameters = ParameterTool.fromArgs(args);
+        LOG.info("The standalone autoscaler is started, parameters: {}", parameters.toMap());
+
+        var scalingInterval = DEFAULT_SCALING_INTERVAL;
+        if (parameters.get(SCALING_INTERVAL) != null) {
+            scalingInterval = TimeUtils.parseDuration(parameters.get(SCALING_INTERVAL));
+        }
+
+        var restClientTimeout = FLINK_CLIENT_TIMEOUT.defaultValue();
+        if (parameters.get(REST_CLIENT_TIMEOUT) != null) {
+            restClientTimeout = TimeUtils.parseDuration(parameters.get(REST_CLIENT_TIMEOUT));
+        }
+
+        // Initialize JobListFetcher and JobAutoScaler.
+        var eventHandler = new LoggingEventHandler<KEY, Context>();
+        JobListFetcher<KEY, Context> jobListFetcher =
+                createJobListFetcher(parameters, restClientTimeout);
+        var autoScaler = createJobAutoscaler(eventHandler);
+
+        var autoscalerExecutor =
+                new StandaloneAutoscalerExecutor<>(
+                        scalingInterval, jobListFetcher, eventHandler, autoScaler);
+        autoscalerExecutor.start();
+    }
+
+    private static <KEY, Context extends JobAutoScalerContext<KEY>>
+            JobListFetcher<KEY, Context> createJobListFetcher(
+                    ParameterTool parameters, Duration restClientTimeout) {
+        var host = parameters.get(FLINK_CLUSTER_HOST, DEFAULT_FLINK_CLUSTER_HOST);
+        var port = parameters.getInt(FLINK_CLUSTER_PORT, DEFAULT_FLINK_CLUSTER_PORT);
+        var restServerAddress = String.format("http://%s:%s", host, port);
+
+        return (JobListFetcher<KEY, Context>)
+                new FlinkClusterJobListFetcher(
+                        conf ->
+                                new RestClusterClient<>(
+                                        conf,
+                                        "clusterId",
+                                        (c, e) ->
+                                                new StandaloneClientHAServices(restServerAddress)),
+                        restClientTimeout);
+    }
+
+    private static <KEY, Context extends JobAutoScalerContext<KEY>>
+            JobAutoScaler<KEY, Context> createJobAutoscaler(
+                    AutoScalerEventHandler<KEY, Context> eventHandler) {
+        AutoScalerStateStore<KEY, Context> stateStore = new InMemoryAutoScalerStateStore<>();
+        return new JobAutoScalerImpl<>(
+                new RestApiMetricsCollector<>(),
+                new ScalingMetricEvaluator(),
+                new ScalingExecutor<>(eventHandler, stateStore),
+                eventHandler,
+                new RescaleApiScalingRealizer<>(eventHandler),
+                stateStore);
+    }
+}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
new file mode 100644
index 0000000..aa39812
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** The executor of the standalone autoscaler. */
+public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandaloneAutoscalerExecutor.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+    private final Duration scalingInterval;
+    private final JobListFetcher<KEY, Context> jobListFetcher;
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+    private final JobAutoScaler<KEY, Context> autoScaler;
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    public StandaloneAutoscalerExecutor(
+            @Nonnull Duration scalingInterval,
+            @Nonnull JobListFetcher<KEY, Context> jobListFetcher,
+            @Nonnull AutoScalerEventHandler<KEY, Context> eventHandler,
+            @Nonnull JobAutoScaler<KEY, Context> autoScaler) {
+        this.scalingInterval = scalingInterval;
+        this.jobListFetcher = jobListFetcher;
+        this.eventHandler = eventHandler;
+        this.autoScaler = autoScaler;
+        this.scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ThreadFactoryBuilder()
+                                .setNameFormat("StandaloneAutoscalerControlLoop")
+                                .setDaemon(false)
+                                .build());
+    }
+
+    public void start() {
+        LOG.info("Schedule control loop.");
+        scheduledExecutorService.scheduleWithFixedDelay(
+                this::scaling, 0, scalingInterval.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        scheduledExecutorService.shutdownNow();
+    }
+
+    @VisibleForTesting
+    protected void scaling() {
+        LOG.info("Standalone autoscaler starts scaling.");
+        try {
+            var jobList = jobListFetcher.fetch();
+            for (var jobContext : jobList) {
+                try {
+                    autoScaler.scale(jobContext);
+                } catch (Throwable e) {
+                    LOG.error("Error while scaling job", e);
+                    eventHandler.handleEvent(
+                            jobContext,
+                            AutoScalerEventHandler.Type.Warning,
+                            AUTOSCALER_ERROR,
+                            e.getMessage(),
+                            null,
+                            null);
+                }
+            }
+        } catch (Throwable e) {
+            LOG.error("Error while fetch job list.", e);
+        }
+    }
+}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
new file mode 100644
index 0000000..caf4ee4
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.flinkcluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.standalone.JobListFetcher;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Fetch JobAutoScalerContext based on flink cluster. */
+public class FlinkClusterJobListFetcher
+        implements JobListFetcher<JobID, JobAutoScalerContext<JobID>> {
+
+    private final FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+            restClientGetter;
+    private final Duration restClientTimeout;
+
+    public FlinkClusterJobListFetcher(
+            FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+                    restClientGetter,
+            Duration restClientTimeout) {
+        this.restClientGetter = restClientGetter;
+        this.restClientTimeout = restClientTimeout;
+    }
+
+    @Override
+    public List<JobAutoScalerContext<JobID>> fetch() throws Exception {
+        try (var restClusterClient = restClientGetter.apply(new Configuration())) {
+            return restClusterClient.listJobs().get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
+                    .stream()
+                    .map(
+                            jobStatusMessage -> {
+                                try {
+                                    return generateJobContext(restClusterClient, jobStatusMessage);
+                                } catch (Throwable e) {
+                                    throw new RuntimeException(
+                                            "generateJobContext throw exception", e);
+                                }
+                            })
+                    .collect(Collectors.toList());
+        }
+    }
+
+    private JobAutoScalerContext<JobID> generateJobContext(
+            RestClusterClient<String> restClusterClient, JobStatusMessage jobStatusMessage)
+            throws Exception {
+        var jobId = jobStatusMessage.getJobId();
+        var conf = getConfiguration(restClusterClient, jobId);
+
+        return new JobAutoScalerContext<>(
+                jobId,
+                jobId,
+                jobStatusMessage.getJobState(),
+                conf,
+                new UnregisteredMetricsGroup(),
+                () -> restClientGetter.apply(conf));
+    }
+
+    private Configuration getConfiguration(RestClusterClient<String> restClusterClient, JobID jobId)
+            throws Exception {
+        var jobParameters = new JobMessageParameters();
+        jobParameters.jobPathParameter.resolve(jobId);
+
+        var configurationInfo =
+                restClusterClient
+                        .sendRequest(
+                                JobManagerJobConfigurationHeaders.getInstance(),
+                                jobParameters,
+                                EmptyRequestBody.getInstance())
+                        .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
+
+        var conf = new Configuration();
+        configurationInfo.forEach(entry -> conf.setString(entry.getKey(), entry.getValue()));
+        return conf;
+    }
+}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
new file mode 100644
index 0000000..d04e6d3
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.realizer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A ScalingRealizer which uses the Rescale API to apply parallelism changes.
+ *
+ * <p>Note: This is based on code copied from the operator, and they don't depend on each other, so
+ * some code is duplicated.
+ */
+public class RescaleApiScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements ScalingRealizer<KEY, Context> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RescaleApiScalingRealizer.class);
+
+    @VisibleForTesting static final String SCALING = "Scaling";
+
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+
+    public RescaleApiScalingRealizer(AutoScalerEventHandler<KEY, Context> eventHandler) {
+        this.eventHandler = eventHandler;
+    }
+
+    @Override
+    public void realize(Context context, Map<String, String> parallelismOverrides) {
+        Configuration conf = context.getConfiguration();
+        if (!conf.get(JobManagerOptions.SCHEDULER)
+                .equals(JobManagerOptions.SchedulerType.Adaptive)) {
+            LOG.warn("In-place rescaling is only available with the adaptive scheduler.");
+            return;
+        }
+
+        var jobID = context.getJobID();
+        if (JobStatus.RUNNING != context.getJobStatus()) {
+            LOG.warn("Job in terminal or reconciling state cannot be scaled in-place.");
+            return;
+        }
+
+        var flinkRestClientTimeout = conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT);
+
+        try (var client = context.getRestClusterClient()) {
+            var requirements =
+                    new HashMap<>(getVertexResources(client, jobID, flinkRestClientTimeout));
+            var parallelismUpdated = false;
+
+            for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
+                    requirements.entrySet()) {
+                var jobVertexId = entry.getKey().toString();
+                var parallelism = entry.getValue().getParallelism();
+                var overrideStr = parallelismOverrides.get(jobVertexId);
+
+                // No overrides for this vertex
+                if (overrideStr == null) {
+                    continue;
+                }
+
+                // We have an override for the vertex
+                var p = Integer.parseInt(overrideStr);
+                var newParallelism = new JobVertexResourceRequirements.Parallelism(1, p);
+                // If the requirements changed we mark this as scaling triggered
+                if (!parallelism.equals(newParallelism)) {
+                    entry.setValue(new JobVertexResourceRequirements(newParallelism));
+                    parallelismUpdated = true;
+                }
+            }
+            if (parallelismUpdated) {
+                updateVertexResources(client, jobID, flinkRestClientTimeout, requirements);
+                eventHandler.handleEvent(
+                        context,
+                        AutoScalerEventHandler.Type.Normal,
+                        SCALING,
+                        String.format(
+                                "In-place scaling triggered, the new requirements is %s.",
+                                requirements),
+                        null,
+                        null);
+            } else {
+                LOG.info("Vertex resources requirements already match target, nothing to do...");
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to apply parallelism overrides.", e);
+        }
+    }
+
+    private Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
+            RestClusterClient<String> client, JobID jobID, Duration restClientTimeout)
+            throws Exception {
+        var jobParameters = new JobMessageParameters();
+        jobParameters.jobPathParameter.resolve(jobID);
+
+        var currentRequirements =
+                client.sendRequest(
+                                new JobResourceRequirementsHeaders(),
+                                jobParameters,
+                                EmptyRequestBody.getInstance())
+                        .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
+
+        return currentRequirements.asJobResourceRequirements().get().getJobVertexParallelisms();
+    }
+
+    private void updateVertexResources(
+            RestClusterClient<String> client,
+            JobID jobID,
+            Duration restClientTimeout,
+            Map<JobVertexID, JobVertexResourceRequirements> newReqs)
+            throws Exception {
+        var jobParameters = new JobMessageParameters();
+        jobParameters.jobPathParameter.resolve(jobID);
+
+        var requestBody = new JobResourceRequirementsBody(new JobResourceRequirements(newReqs));
+
+        client.sendRequest(new JobResourcesRequirementsUpdateHeaders(), jobParameters, requestBody)
+                .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
+    }
+}
diff --git a/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE b/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..b8a5183
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,31 @@
+flink-autoscaler-standalone
+Copyright 2014-2023 The Apache Software Foundation
+
+This project includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.fasterxml.jackson.core:jackson-core:2.15.0
+- com.fasterxml.jackson.core:jackson-databind:2.15.0
+- com.fasterxml.jackson.core:jackson-annotations:2.15.0
+- org.objenesis:objenesis:2.1
+- commons-collections:commons-collections:3.2.2
+- org.apache.commons:commons-math3:3.6.1
+- com.twitter:chill-java:0.7.6
+- commons-io:commons-io:2.11.0
+- org.apache.commons:commons-lang3:3.12.0
+- commons-cli:commons-cli:1.5.0
+- org.javassist:javassist:3.24.0-GA
+- com.google.code.findbugs:jsr305:1.3.9
+- org.slf4j:slf4j-api:1.7.36
+- org.apache.logging.log4j:log4j-slf4j-impl:2.17.1
+- org.apache.logging.log4j:log4j-api:2.17.1
+- org.apache.logging.log4j:log4j-core:2.17.1
+- org.apache.logging.log4j:log4j-1.2-api:2.17.1
+
+This project bundles the following dependencies under the BSD License.
+See bundled license files for details.
+
+- com.esotericsoftware.kryo:kryo:2.24.0
+- com.esotericsoftware.minlog:minlog:1.2
diff --git a/flink-autoscaler-standalone/src/main/resources/log4j2.properties b/flink-autoscaler-standalone/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..4a6ab83
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/resources/log4j2.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] [%X{resource.namespace}.%X{resource.name}] %msg%n%throwable}
+
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
new file mode 100644
index 0000000..464d8ec
--- /dev/null
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Test for {@link StandaloneAutoscalerExecutor}. */
+class StandaloneAutoscalerExecutorTest {
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testScaling(boolean throwExceptionWhileScale) {
+        JobAutoScalerContext<JobID> jobContext1 = createJobAutoScalerContext();
+        JobAutoScalerContext<JobID> jobContext2 = createJobAutoScalerContext();
+        var jobList = List.of(jobContext1, jobContext2);
+        Set<JobID> exceptionKeys =
+                throwExceptionWhileScale
+                        ? Set.of(jobContext1.getJobKey(), jobContext2.getJobKey())
+                        : Set.of();
+
+        var actualScaleContexts = new ArrayList<JobAutoScalerContext<JobID>>();
+
+        var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+        var autoscalerExecutor =
+                new StandaloneAutoscalerExecutor<>(
+                        Duration.ofSeconds(2),
+                        () -> jobList,
+                        eventCollector,
+                        createJobAutoScaler(actualScaleContexts, exceptionKeys));
+
+        autoscalerExecutor.scaling();
+        assertThat(actualScaleContexts).isEqualTo(jobList);
+        assertThat(eventCollector.events)
+                .hasSameSizeAs(exceptionKeys)
+                .allMatch(
+                        event ->
+                                event.getReason()
+                                        .equals(StandaloneAutoscalerExecutor.AUTOSCALER_ERROR));
+    }
+
+    @Test
+    void testFetchException() {
+        var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+        var autoscalerExecutor =
+                new StandaloneAutoscalerExecutor<>(
+                        Duration.ofSeconds(2),
+                        () -> {
+                            throw new RuntimeException("Excepted exception.");
+                        },
+                        eventCollector,
+                        new JobAutoScaler<>() {
+                            @Override
+                            public void scale(JobAutoScalerContext<JobID> context) {
+                                fail("Should be called.");
+                            }
+
+                            @Override
+                            public void cleanup(JobID jobID) {
+                                fail("Should be called.");
+                            }
+                        });
+
+        // scaling shouldn't throw exception even if fetch fails
+        assertDoesNotThrow(autoscalerExecutor::scaling);
+    }
+
+    private static JobAutoScalerContext<JobID> createJobAutoScalerContext() {
+        var jobID = new JobID();
+        return new JobAutoScalerContext<>(
+                jobID, jobID, JobStatus.RUNNING, new Configuration(), null, null);
+    }
+
+    private static JobAutoScaler<JobID, JobAutoScalerContext<JobID>> createJobAutoScaler(
+            List<JobAutoScalerContext<JobID>> actualScaleContexts, Set<JobID> exceptionKeys) {
+        return new JobAutoScaler<>() {
+            @Override
+            public void scale(JobAutoScalerContext<JobID> context) {
+                actualScaleContexts.add(context);
+                if (exceptionKeys.contains(context.getJobKey())) {
+                    throw new RuntimeException("Excepted exception.");
+                }
+            }
+
+            @Override
+            public void cleanup(JobID jobKey) {
+                fail("Should be called.");
+            }
+        };
+    }
+}
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
new file mode 100644
index 0000000..658eb58
--- /dev/null
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.flinkcluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link FlinkClusterJobListFetcher}. */
+class FlinkClusterJobListFetcherTest {
+
+    /** Test whether the job list and confs are expected. */
+    @Test
+    void testFetchJobListAndConfigurationInfo() throws Exception {
+        var job1 =
+                new JobStatusMessage(
+                        new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
+        var job2 =
+                new JobStatusMessage(
+                        new JobID(), "", JobStatus.CANCELLING, Instant.now().toEpochMilli());
+
+        Configuration expectedConf1 = new Configuration();
+        expectedConf1.setString("option_key1", "option_value1");
+
+        Configuration expectedConf2 = new Configuration();
+        expectedConf2.setString("option_key2", "option_value2");
+        expectedConf2.setString("option_key3", "option_value3");
+
+        var jobs = Map.of(job1.getJobId(), job1, job2.getJobId(), job2);
+        var configurations = Map.of(job1.getJobId(), expectedConf1, job2.getJobId(), expectedConf2);
+        var closeCounter = new AtomicLong();
+        FlinkClusterJobListFetcher jobListFetcher =
+                new FlinkClusterJobListFetcher(
+                        getRestClusterClient(
+                                Either.Left(List.of(job1, job2)),
+                                Either.Left(
+                                        Map.of(
+                                                job1.getJobId(),
+                                                ConfigurationInfo.from(expectedConf1),
+                                                job2.getJobId(),
+                                                ConfigurationInfo.from(expectedConf2))),
+                                closeCounter),
+                        Duration.ofSeconds(10));
+
+        // Fetch multiple times and check whether the results are as expected each time
+        for (int i = 1; i <= 3; i++) {
+            var fetchedJobList = jobListFetcher.fetch();
+            // Check whether rest client is closed.
+            assertThat(closeCounter).hasValue(i);
+
+            assertThat(fetchedJobList).hasSize(2);
+            for (var jobContext : fetchedJobList) {
+                JobStatusMessage expectedJobStatusMessage = jobs.get(jobContext.getJobID());
+                Configuration expectedConf = configurations.get(jobContext.getJobID());
+                assertThat(expectedJobStatusMessage).isNotNull();
+                assertThat(jobContext.getJobStatus())
+                        .isEqualTo(expectedJobStatusMessage.getJobState());
+                assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf);
+            }
+        }
+    }
+
+    /**
+     * Test whether the exception is expected after rest client fetches job list throws exception,
+     * and restClient can be closed normally.
+     */
+    @Test
+    void testFetchJobListException() {
+        var expectedException = new RuntimeException("Expected exception.");
+        var closeCounter = new AtomicLong();
+
+        FlinkClusterJobListFetcher jobListFetcher =
+                new FlinkClusterJobListFetcher(
+                        getRestClusterClient(
+                                Either.Right(expectedException),
+                                Either.Left(Map.of()),
+                                closeCounter),
+                        Duration.ofSeconds(10));
+        assertThatThrownBy(jobListFetcher::fetch).getCause().isEqualTo(expectedException);
+        assertThat(closeCounter).hasValue(1);
+    }
+
+    /**
+     * Test whether the exception is expected after rest client fetches conf throws exception, and
+     * restClient can be closed normally.
+     */
+    @Test
+    void testFetchConfigurationException() {
+        var job1 =
+                new JobStatusMessage(
+                        new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
+        var expectedException = new RuntimeException("Expected exception.");
+        var closeCounter = new AtomicLong();
+
+        FlinkClusterJobListFetcher jobListFetcher =
+                new FlinkClusterJobListFetcher(
+                        getRestClusterClient(
+                                Either.Left(List.of(job1)),
+                                Either.Right(expectedException),
+                                closeCounter),
+                        Duration.ofSeconds(10));
+
+        assertThatThrownBy(jobListFetcher::fetch).getRootCause().isEqualTo(expectedException);
+        assertThat(closeCounter).hasValue(1);
+    }
+
+    /**
+     * Test whether the exception is expected after rest client fetches job list timeout, and
+     * restClient can be closed normally.
+     */
+    @Test
+    void testFetchJobListTimeout() {
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        FlinkClusterJobListFetcher jobListFetcher =
+                new FlinkClusterJobListFetcher(
+                        getTimeoutableRestClusterClient(null, null, closeFuture),
+                        Duration.ofSeconds(2));
+
+        assertThat(closeFuture).isNotDone();
+        assertThatThrownBy(jobListFetcher::fetch).isInstanceOf(TimeoutException.class);
+        assertThat(closeFuture).isDone();
+    }
+
+    /**
+     * Test whether the exception is expected after rest client fetches conf timeout, and restClient
+     * can be closed normally.
+     */
+    @Test
+    void testFetchConfigurationTimeout() {
+        var job1 =
+                new JobStatusMessage(
+                        new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
+        FlinkClusterJobListFetcher jobListFetcher =
+                new FlinkClusterJobListFetcher(
+                        getTimeoutableRestClusterClient(List.of(job1), null, closeFuture),
+                        Duration.ofSeconds(2));
+
+        assertThat(closeFuture).isNotDone();
+        assertThatThrownBy(jobListFetcher::fetch)
+                .getRootCause()
+                .isInstanceOf(TimeoutException.class);
+        assertThat(closeFuture).isDone();
+    }
+
+    /**
+     * @param jobListOrException When listJobs is called, return jobList if Either is left, return
+     *     failedFuture if Either is right.
+     * @param configurationsOrException When fetch job conf, return configuration if Either is left,
+     *     return failedFuture if Either is right.
+     * @param closeCounter Increment the count each time the {@link RestClusterClient#close} is
+     *     called
+     */
+    private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+            getRestClusterClient(
+                    Either<Collection<JobStatusMessage>, Throwable> jobListOrException,
+                    Either<Map<JobID, ConfigurationInfo>, Throwable> configurationsOrException,
+                    AtomicLong closeCounter) {
+        return conf ->
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) {
+
+                    @Override
+                    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+                        if (jobListOrException.isLeft()) {
+                            return CompletableFuture.completedFuture(jobListOrException.left());
+                        }
+                        return CompletableFuture.failedFuture(jobListOrException.right());
+                    }
+
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(M h, U p, R r) {
+                        if (h instanceof JobManagerJobConfigurationHeaders) {
+                            if (configurationsOrException.isRight()) {
+                                return CompletableFuture.failedFuture(
+                                        configurationsOrException.right());
+                            }
+                            var jobID = ((JobMessageParameters) p).jobPathParameter.getValue();
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            configurationsOrException.left().get(jobID));
+                        }
+                        fail("Unknown request");
+                        return null;
+                    }
+
+                    @Override
+                    public void close() {
+                        super.close();
+                        closeCounter.incrementAndGet();
+                    }
+                };
+    }
+
+    /**
+     * @param jobList When listJobs is called, return jobList if it's not null, don't complete
+     *     future if it's null.
+     * @param configuration When fetch job conf, return configuration if it's not null, don't
+     *     complete future if it's null.
+     * @param closeFuture Complete this closeFuture when {@link RestClusterClient#close} is called.
+     */
+    private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+            getTimeoutableRestClusterClient(
+                    @Nullable Collection<JobStatusMessage> jobList,
+                    @Nullable ConfigurationInfo configuration,
+                    CompletableFuture<Void> closeFuture) {
+        return conf ->
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) {
+
+                    @Override
+                    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+                        if (jobList == null) {
+                            return new CompletableFuture<>();
+                        }
+                        return CompletableFuture.completedFuture(jobList);
+                    }
+
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(M h, U p, R r) {
+                        if (h instanceof JobManagerJobConfigurationHeaders) {
+                            if (configuration == null) {
+                                return new CompletableFuture<>();
+                            }
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(configuration);
+                        }
+                        fail("Unknown request");
+                        return null;
+                    }
+
+                    @Override
+                    public void close() {
+                        super.close();
+                        closeFuture.complete(null);
+                    }
+                };
+    }
+}
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
new file mode 100644
index 0000000..9205bac
--- /dev/null
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.realizer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer.SCALING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link RescaleApiScalingRealizer}. */
+class RescaleApiScalingRealizerTest {
+
+    /**
+     * Test whether scalingRealizer behaves as expected when the resource is changed or isn't
+     * changed.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testUpdateResourceRequirements(boolean resourceIsChanged) {
+        var jobID = new JobID();
+        var jobVertex1 = new JobVertexID().toHexString();
+        var jobVertex2 = new JobVertexID().toHexString();
+        var currentResourceRequirements = Map.of(jobVertex1, "5", jobVertex2, "10");
+
+        var newResourceRequirements = currentResourceRequirements;
+        if (resourceIsChanged) {
+            newResourceRequirements = Map.of(jobVertex1, "7", jobVertex2, "12");
+        }
+
+        var closeFuture = new CompletableFuture<Void>();
+        var updatedRequirements = new CompletableFuture<Optional<JobResourceRequirements>>();
+
+        var jobContext =
+                createJobAutoScalerContext(
+                        jobID,
+                        getRestClusterClient(
+                                jobID,
+                                createResourceRequirements(currentResourceRequirements),
+                                updatedRequirements,
+                                closeFuture));
+        jobContext
+                .getConfiguration()
+                .set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+
+        var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+        RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer =
+                new RescaleApiScalingRealizer<>(eventCollector);
+
+        assertThat(updatedRequirements).isNotDone();
+        assertThat(closeFuture).isNotDone();
+        scalingRealizer.realize(jobContext, newResourceRequirements);
+
+        // The ResourceRequirements should be updated when the newResourceRequirements is changed.
+        if (resourceIsChanged) {
+            assertThat(updatedRequirements)
+                    .isCompletedWithValue(
+                            Optional.of(createResourceRequirements(newResourceRequirements)));
+            assertThat(eventCollector.events).hasSize(1);
+            var event = eventCollector.events.getFirst();
+            assertThat(event.getContext()).isEqualTo(jobContext);
+            assertThat(event.getReason()).isEqualTo(SCALING);
+        } else {
+            assertThat(updatedRequirements).isNotDone();
+            assertThat(eventCollector.events).isEmpty();
+        }
+        assertThat(closeFuture).isDone();
+    }
+
+    @Test
+    void testDisableAdaptiveScheduler() {
+        var jobID = new JobID();
+        var jobVertex1 = new JobVertexID().toHexString();
+        var jobVertex2 = new JobVertexID().toHexString();
+        var resourceRequirements = Map.of(jobVertex1, "5", jobVertex2, "10");
+
+        var jobContext =
+                createJobAutoScalerContext(
+                        jobID,
+                        () ->
+                                fail(
+                                        "The rest client shouldn't be created if the adaptive scheduler is disable."));
+
+        var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+        RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer =
+                new RescaleApiScalingRealizer<>(eventCollector);
+
+        scalingRealizer.realize(jobContext, resourceRequirements);
+        assertThat(eventCollector.events).isEmpty();
+    }
+
+    @Test
+    void testJobNotRunning() {
+        var jobID = new JobID();
+        var jobVertex1 = new JobVertexID().toHexString();
+        var jobVertex2 = new JobVertexID().toHexString();
+        var resourceRequirements = Map.of(jobVertex1, "5", jobVertex2, "10");
+
+        var conf = new Configuration();
+        conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+
+        var jobContext =
+                new JobAutoScalerContext<>(
+                        jobID,
+                        jobID,
+                        JobStatus.CANCELLING,
+                        conf,
+                        null,
+                        () ->
+                                fail(
+                                        "The rest client shouldn't be created if the job isn't running."));
+
+        var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
+        RescaleApiScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer =
+                new RescaleApiScalingRealizer<>(eventCollector);
+
+        scalingRealizer.realize(jobContext, resourceRequirements);
+        assertThat(eventCollector.events).isEmpty();
+    }
+
+    private static JobAutoScalerContext<JobID> createJobAutoScalerContext(
+            JobID jobID,
+            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier) {
+        return new JobAutoScalerContext<>(
+                jobID, jobID, JobStatus.RUNNING, new Configuration(), null, restClientSupplier);
+    }
+
+    private JobResourceRequirements createResourceRequirements(
+            Map<String, String> parallelismOverrides) {
+        Map<JobVertexID, JobVertexResourceRequirements> vertexResources = new HashMap<>();
+        parallelismOverrides.forEach(
+                (key, value) ->
+                        vertexResources.put(
+                                JobVertexID.fromHexString(key),
+                                new JobVertexResourceRequirements(
+                                        new JobVertexResourceRequirements.Parallelism(
+                                                1, Integer.parseInt(value)))));
+        return new JobResourceRequirements(vertexResources);
+    }
+
+    private static SupplierWithException<RestClusterClient<String>, Exception> getRestClusterClient(
+            JobID expectedJobID,
+            JobResourceRequirements currentRequirements,
+            CompletableFuture<Optional<JobResourceRequirements>> updatedRequirements,
+            CompletableFuture<Void> closeFuture) {
+
+        return () ->
+                new RestClusterClient<>(
+                        new Configuration(),
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) {
+
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(M h, U p, R r) {
+                        if (h instanceof JobResourceRequirementsHeaders) {
+                            if (expectedJobID.equals(
+                                    ((JobMessageParameters) p).jobPathParameter.getValue())) {
+                                return (CompletableFuture<P>)
+                                        CompletableFuture.completedFuture(
+                                                new JobResourceRequirementsBody(
+                                                        currentRequirements));
+                            }
+                        } else if (r instanceof JobResourceRequirementsBody) {
+                            if (expectedJobID.equals(
+                                    ((JobMessageParameters) p).jobPathParameter.getValue())) {
+                                updatedRequirements.complete(
+                                        ((JobResourceRequirementsBody) r)
+                                                .asJobResourceRequirements());
+                                return CompletableFuture.completedFuture(null);
+                            }
+                        }
+                        fail("Unknown request");
+                        return null;
+                    }
+
+                    @Override
+                    public void close() {
+                        super.close();
+                        closeFuture.complete(null);
+                    }
+                };
+    }
+}
diff --git a/pom.xml b/pom.xml
index bc3ba1d..ef3a8a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@
         <module>flink-kubernetes-webhook</module>
         <module>flink-kubernetes-docs</module>
         <module>flink-autoscaler</module>
+        <module>flink-autoscaler-standalone</module>
         <module>examples/flink-sql-runner-example</module>
         <module>examples/flink-beam-example</module>
         <module>examples/kubernetes-client-examples</module>