[FLINK-30776] Move autoscaler code into a separate Maven module and make it pluggable (#529)

The autoscaling code is tightly coupled with the main operator code. In this change, we move the code into a
dedicated Maven module and make it pluggable.
diff --git a/Dockerfile b/Dockerfile
index c4a2c77..98fdae7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -23,10 +23,11 @@
 
 COPY . .
 
-RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS
+RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-operator-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS
 
 RUN cd /app/tools/license; mkdir jars; cd jars; \
     cp /app/flink-kubernetes-operator/target/flink-kubernetes-operator-*-shaded.jar . && \
+    cp /app/flink-kubernetes-operator-autoscaler/target/flink-kubernetes-operator-autoscaler-*.jar . && \
     cp /app/flink-kubernetes-webhook/target/flink-kubernetes-webhook-*-shaded.jar . && \
     cp /app/flink-kubernetes-standalone/target/flink-kubernetes-standalone-*-shaded.jar . && \
     cp -r /app/flink-kubernetes-operator/target/plugins ./plugins && \
@@ -35,8 +36,10 @@
 # stage
 FROM eclipse-temurin:11-jre-jammy
 ENV FLINK_HOME=/opt/flink
+ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
 ENV OPERATOR_VERSION=1.4-SNAPSHOT
 ENV OPERATOR_JAR=flink-kubernetes-operator-$OPERATOR_VERSION-shaded.jar
+ENV AUTOSCALER_JAR=flink-kubernetes-operator-autoscaler-$OPERATOR_VERSION.jar
 ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar
 ENV FLINK_KUBERNETES_SHADED_JAR=flink-kubernetes-standalone-$OPERATOR_VERSION-shaded.jar
 
@@ -45,6 +48,7 @@
     useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink
 
 COPY --from=build /app/flink-kubernetes-operator/target/$OPERATOR_JAR .
+COPY --from=build /app/flink-kubernetes-operator-autoscaler/target/$AUTOSCALER_JAR .
 COPY --from=build /app/flink-kubernetes-webhook/target/$WEBHOOK_JAR .
 COPY --from=build /app/flink-kubernetes-standalone/target/$FLINK_KUBERNETES_SHADED_JAR .
 COPY --from=build /app/flink-kubernetes-operator/target/plugins $FLINK_HOME/plugins
@@ -54,6 +58,7 @@
 
 RUN chown -R flink:flink $FLINK_HOME && \
     chown flink:flink $OPERATOR_JAR && \
+    chown flink:flink $AUTOSCALER_JAR && \
     chown flink:flink $WEBHOOK_JAR && \
     chown flink:flink $FLINK_KUBERNETES_SHADED_JAR && \
     chown flink:flink /docker-entrypoint.sh
diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh
index 36581cd..756b1d0 100755
--- a/docker-entrypoint.sh
+++ b/docker-entrypoint.sh
@@ -29,7 +29,7 @@
 elif [ "$1" = "operator" ]; then
     echo "Starting Operator"
 
-    exec java -cp ./$FLINK_KUBERNETES_SHADED_JAR:./$OPERATOR_JAR $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator
+    exec java -cp ./$FLINK_KUBERNETES_SHADED_JAR:./$OPERATOR_JAR:./$AUTOSCALER_JAR $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator
 elif [ "$1" = "webhook" ]; then
     echo "Starting Webhook"
 
diff --git a/flink-kubernetes-docs/pom.xml b/flink-kubernetes-docs/pom.xml
index 9822d52..9dd387b 100644
--- a/flink-kubernetes-docs/pom.xml
+++ b/flink-kubernetes-docs/pom.xml
@@ -43,6 +43,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes-operator-autoscaler</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Test -->
         <dependency>
             <groupId>org.junit.jupiter</groupId>
diff --git a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
index 0f302dc..2fb1e6a 100644
--- a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
@@ -77,7 +77,7 @@
                         "flink-kubernetes-operator",
                         "org.apache.flink.kubernetes.operator.metrics"),
                 new OptionsClassLocation(
-                        "flink-kubernetes-operator",
+                        "flink-kubernetes-operator-autoscaler",
                         "org.apache.flink.kubernetes.operator.autoscaler.config")
             };
     static final String DEFAULT_PATH_PREFIX = "src/main/java";
diff --git a/flink-kubernetes-operator-autoscaler/pom.xml b/flink-kubernetes-operator-autoscaler/pom.xml
new file mode 100644
index 0000000..7263787
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <version>1.4-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-kubernetes-operator-autoscaler</artifactId>
+    <name>Flink Kubernetes Operator Autoscaler</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <auto-service.version>1.0.1</auto-service.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes-operator</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <version>${auto-service.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <version>${junit.jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes-operator</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes-operator-api</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-server-mock</artifactId>
+            <version>${fabric8.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
similarity index 98%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index bbfd9ab..bff2228 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -49,7 +49,7 @@
 /** Class for encapsulating information stored for each resource when using the autoscaler. */
 public class AutoScalerInfo {
 
-    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AutoScalerInfo.class);
 
     private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
similarity index 92%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 4f74a52..ee4e75a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -23,7 +23,7 @@
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -40,9 +40,9 @@
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
 
 /** Application and SessionJob autoscaler. */
-public class JobAutoScaler implements Cleanup {
+public class JobAutoScalerImpl implements JobAutoScaler {
 
-    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
 
     private final KubernetesClient kubernetesClient;
     private final ScalingMetricCollector metricsCollector;
@@ -53,7 +53,7 @@
             lastEvaluatedMetrics = new ConcurrentHashMap<>();
     private final Map<ResourceID, Set<JobVertexID>> registeredMetrics = new ConcurrentHashMap<>();
 
-    public JobAutoScaler(
+    public JobAutoScalerImpl(
             KubernetesClient kubernetesClient,
             ScalingMetricCollector metricsCollector,
             ScalingMetricEvaluator evaluator,
@@ -70,13 +70,13 @@
     public void cleanup(AbstractFlinkResource<?, ?> cr) {
         LOG.info("Cleaning up autoscaling meta data");
         metricsCollector.cleanup(cr);
-        scalingExecutor.cleanup(cr);
         var resourceId = ResourceID.fromResource(cr);
         lastEvaluatedMetrics.remove(resourceId);
         registeredMetrics.remove(resourceId);
     }
 
-    public boolean scale(FlinkResourceContext<?> ctx) {
+    @Override
+    public boolean scale(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>> ctx) {
 
         var conf = ctx.getObserveConfig();
         var resource = ctx.getResource();
@@ -172,13 +172,4 @@
                                     });
                         });
     }
-
-    public static JobAutoScaler create(
-            KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
-        return new JobAutoScaler(
-                kubernetesClient,
-                new RestApiMetricsCollector(),
-                new ScalingMetricEvaluator(),
-                new ScalingExecutor(kubernetesClient, eventRecorder));
-    }
 }
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
new file mode 100644
index 0000000..128381d
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import com.google.auto.service.AutoService;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/**
+ * Factory for loading JobAutoScalerImpl included in this module. This class will be dynamically
+ * instantiated by the main operator module.
+ */
+@AutoService(JobAutoScalerFactory.class)
+public class JobAutoscalerFactoryImpl implements JobAutoScalerFactory {
+    @Override
+    public JobAutoScaler create(KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
+        return new JobAutoScalerImpl(
+                kubernetesClient,
+                new RestApiMetricsCollector(),
+                new ScalingMetricEvaluator(),
+                new ScalingExecutor(kubernetesClient, eventRecorder));
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
similarity index 98%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index bda9dfe..a1f5b55 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -51,7 +51,7 @@
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Class responsible for executing scaling decisions. */
-public class ScalingExecutor implements Cleanup {
+public class ScalingExecutor {
 
     public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES =
             ConfigOptions.key("pipeline.jobvertex-parallelism-overrides")
@@ -270,9 +270,4 @@
         this.clock = Preconditions.checkNotNull(clock);
         jobVertexScaler.setClock(clock);
     }
-
-    @Override
-    public void cleanup(AbstractFlinkResource<?, ?> cr) {
-        // No cleanup is currently necessary
-    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
similarity index 99%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 6b33d4b..0fd8c49 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -64,7 +64,7 @@
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
 
 /** Metric collector using flink rest api. */
-public abstract class ScalingMetricCollector implements Cleanup {
+public abstract class ScalingMetricCollector {
     private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricCollector.class);
 
     private final Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, FlinkMetric>>>>
@@ -434,7 +434,6 @@
                     Configuration conf,
                     Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames);
 
-    @Override
     public void cleanup(AbstractFlinkResource<?, ?> cr) {
         var resourceId = ResourceID.fromResource(cr);
         histories.remove(resourceId);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
similarity index 98%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index b0e48dd..26a1c61 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -66,7 +66,7 @@
     private FlinkDeployment app;
     private JobVertexID source1, sink;
 
-    private JobAutoScaler autoscaler;
+    private JobAutoScalerImpl autoscaler;
 
     @BeforeEach
     public void setup() {
@@ -107,7 +107,8 @@
         app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
 
         autoscaler =
-                new JobAutoScaler(kubernetesClient, metricsCollector, evaluator, scalingExecutor);
+                new JobAutoScalerImpl(
+                        kubernetesClient, metricsCollector, evaluator, scalingExecutor);
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
similarity index 99%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index b42137d..e87167e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -197,7 +197,6 @@
         assertNotNull(metricsCollector.getTopologies().get(resourceID));
 
         metricsCollector.cleanup(app);
-        scalingExecutor.cleanup(app);
         assertNull(metricsCollector.getHistories().get(resourceID));
         assertNull(metricsCollector.getAvailableVertexMetricNames().get(resourceID));
         assertNull(metricsCollector.getTopologies().get(resourceID));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index 7029e40..eb196ca 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -324,6 +324,18 @@
                 </executions>
             </plugin>
             <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.3.0</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <!-- Description: https://github.com/git-commit-id/git-commit-id-maven-plugin
                     Used to show the git ref when starting the Flink Kubernetes operator. -->
                 <groupId>io.github.git-commit-id</groupId>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 08bcb65..154a17d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -39,6 +39,7 @@
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.AutoscalerLoader;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
@@ -154,8 +155,10 @@
                 MetricManager.createFlinkDeploymentMetricManager(configManager, metricGroup);
         var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
         var eventRecorder = EventRecorder.create(client, listeners);
+        var autoscalerFactory = AutoscalerLoader.loadJobAutoscalerFactory();
         var reconcilerFactory =
-                new ReconcilerFactory(client, configManager, eventRecorder, statusRecorder);
+                new ReconcilerFactory(
+                        client, configManager, eventRecorder, statusRecorder, autoscalerFactory);
         var observerFactory = new FlinkDeploymentObserverFactory(configManager, eventRecorder);
 
         var controller =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index afd2a9d..540884b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -33,7 +33,6 @@
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
-import org.apache.flink.kubernetes.operator.autoscaler.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
@@ -86,11 +85,12 @@
     public AbstractFlinkResourceReconciler(
             KubernetesClient kubernetesClient,
             EventRecorder eventRecorder,
-            StatusRecorder<CR, STATUS> statusRecorder) {
+            StatusRecorder<CR, STATUS> statusRecorder,
+            JobAutoScalerFactory autoscalerFactory) {
         this.kubernetesClient = kubernetesClient;
         this.eventRecorder = eventRecorder;
         this.statusRecorder = statusRecorder;
-        this.resourceScaler = JobAutoScaler.create(kubernetesClient, eventRecorder);
+        this.resourceScaler = autoscalerFactory.create(kubernetesClient, eventRecorder);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index b9e2fa6..445fcb4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -55,8 +55,9 @@
     public AbstractJobReconciler(
             KubernetesClient kubernetesClient,
             EventRecorder eventRecorder,
-            StatusRecorder<CR, STATUS> statusRecorder) {
-        super(kubernetesClient, eventRecorder, statusRecorder);
+            StatusRecorder<CR, STATUS> statusRecorder,
+            JobAutoScalerFactory autoscalerFactory) {
+        super(kubernetesClient, eventRecorder, statusRecorder, autoscalerFactory);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index c941873..6739667 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -66,8 +66,9 @@
     public ApplicationReconciler(
             KubernetesClient kubernetesClient,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
-        super(kubernetesClient, eventRecorder, statusRecorder);
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
+            JobAutoScalerFactory autoscalerFactory) {
+        super(kubernetesClient, eventRecorder, statusRecorder, autoscalerFactory);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
similarity index 66%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
index ae1b98b..58db4bc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 
-/** Cleanup interface for autoscaling related metadata. */
-public interface Cleanup {
-    /**
-     * Method is called when a custom resource is deleted.
-     *
-     * @param cr custom resource
-     */
+/** Per-job Autoscaler instance. */
+public interface JobAutoScaler {
+
+    /** Called as part of the reconciliation loop. Returns true if this call led to scaling. */
+    boolean scale(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>> ctx);
+
+    /** Called when the custom resource is deleted. */
     void cleanup(AbstractFlinkResource<?, ?> cr);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScalerFactory.java
similarity index 67%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScalerFactory.java
index ae1b98b..927657a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScalerFactory.java
@@ -15,16 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
-/** Cleanup interface for autoscaling related metadata. */
-public interface Cleanup {
-    /**
-     * Method is called when a custom resource is deleted.
-     *
-     * @param cr custom resource
-     */
-    void cleanup(AbstractFlinkResource<?, ?> cr);
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/** Factory to construct a new autoscaler instance. */
+public interface JobAutoScalerFactory {
+
+    JobAutoScaler create(KubernetesClient kubernetesClient, EventRecorder eventRecorder);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
new file mode 100644
index 0000000..dd54b07
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/** An autoscaler implementation which does nothing. */
+public class NoopJobAutoscalerFactory implements JobAutoScalerFactory, JobAutoScaler {
+
+    @Override
+    public JobAutoScaler create(KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
+        return this;
+    }
+
+    @Override
+    public boolean scale(FlinkResourceContext ctx) {
+        return false;
+    }
+
+    @Override
+    public void cleanup(AbstractFlinkResource<?, ?> cr) {}
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 7acefcd..655b3e2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -39,6 +39,7 @@
     private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
     private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder;
+    private final JobAutoScalerFactory autoscalerFactory;
     private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Reconciler<FlinkDeployment>>
             reconcilerMap;
 
@@ -46,11 +47,13 @@
             KubernetesClient kubernetesClient,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder) {
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder,
+            JobAutoScalerFactory autoscalerFactory) {
         this.kubernetesClient = kubernetesClient;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.deploymentStatusRecorder = deploymentStatusRecorder;
+        this.autoscalerFactory = autoscalerFactory;
         this.reconcilerMap = new ConcurrentHashMap<>();
     }
 
@@ -69,7 +72,10 @@
                                     configManager);
                         case APPLICATION:
                             return new ApplicationReconciler(
-                                    kubernetesClient, eventRecorder, deploymentStatusRecorder);
+                                    kubernetesClient,
+                                    eventRecorder,
+                                    deploymentStatusRecorder,
+                                    autoscalerFactory);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: %s", modes.f0));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index b1f93c2..eb486a2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -57,7 +57,7 @@
             EventRecorder eventRecorder,
             StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
             FlinkConfigManager configManager) {
-        super(kubernetesClient, eventRecorder, statusRecorder);
+        super(kubernetesClient, eventRecorder, statusRecorder, new NoopJobAutoscalerFactory());
         this.configManager = configManager;
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index a13da27..ccff139 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -26,6 +26,7 @@
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
@@ -46,7 +47,7 @@
             KubernetesClient kubernetesClient,
             EventRecorder eventRecorder,
             StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
-        super(kubernetesClient, eventRecorder, statusRecorder);
+        super(kubernetesClient, eventRecorder, statusRecorder, new NoopJobAutoscalerFactory());
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoader.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoader.java
new file mode 100644
index 0000000..37bac40
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ServiceLoader;
+
+/** Loads the active Autoscaler implementation from the classpath. */
+public class AutoscalerLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AutoscalerLoader.class);
+
+    public static JobAutoScalerFactory loadJobAutoscalerFactory() {
+        JobAutoScalerFactory factory = null;
+        boolean singleImplementation = true;
+
+        for (JobAutoScalerFactory discoveredFactory :
+                ServiceLoader.load(JobAutoScalerFactory.class)) {
+            LOG.info(
+                    "Discovered JobAutoScaler factory: {}", discoveredFactory.getClass().getName());
+            singleImplementation = factory == null;
+            factory = discoveredFactory;
+        }
+
+        if (factory == null) {
+            LOG.info("No JobAutoscaler implementation found. Autoscaling is disabled.");
+            return new NoopJobAutoscalerFactory();
+        }
+
+        Preconditions.checkState(
+                singleImplementation,
+                "Found multiple implementation for JobAutoScalerFactory. Please ensure only one implementation is present.");
+
+        return factory;
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
index 93d7807..01b21c3 100644
--- a/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
+++ b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
@@ -30,6 +30,7 @@
             <includes>
                 <include>org/apache/flink/kubernetes/operator/validation/TestValidator.class</include>
                 <include>org/apache/flink/kubernetes/operator/listener/TestingListener.class</include>
+                <include>org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscaler.class</include>
             </includes>
         </fileSet>
         <fileSet>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscalerFactory.java
similarity index 72%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscalerFactory.java
index ae1b98b..486fa96 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscalerFactory.java
@@ -17,14 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
 
-/** Cleanup interface for autoscaling related metadata. */
-public interface Cleanup {
-    /**
-     * Method is called when a custom resource is deleted.
-     *
-     * @param cr custom resource
-     */
-    void cleanup(AbstractFlinkResource<?, ?> cr);
-}
+/** Dummy autoscaler to test the plugin loading for the autoscaler. */
+public class TestingAutoscalerFactory extends NoopJobAutoscalerFactory {}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 97e0adf..565f503 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -27,6 +27,7 @@
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.observer.deployment.FlinkDeploymentObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -82,7 +83,11 @@
                 new StatusRecorder<>(kubernetesClient, new MetricManager<>(), statusUpdateCounter);
         reconcilerFactory =
                 new ReconcilerFactory(
-                        kubernetesClient, configManager, eventRecorder, statusRecorder);
+                        kubernetesClient,
+                        configManager,
+                        eventRecorder,
+                        statusRecorder,
+                        new NoopJobAutoscalerFactory());
         flinkDeploymentController =
                 new FlinkDeploymentController(
                         configManager,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 6c7269b..4d5bca3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -95,7 +95,11 @@
         reconciler =
                 new TestReconcilerAdapter<>(
                         this,
-                        new ApplicationReconciler(kubernetesClient, eventRecorder, statusRecorder));
+                        new ApplicationReconciler(
+                                kubernetesClient,
+                                eventRecorder,
+                                statusRecorder,
+                                new NoopJobAutoscalerFactory()));
     }
 
     @ParameterizedTest
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index ca8ece2..6d16a5e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -71,7 +71,11 @@
         reconciler =
                 new TestReconcilerAdapter<>(
                         this,
-                        new ApplicationReconciler(kubernetesClient, eventRecorder, statusRecorder));
+                        new ApplicationReconciler(
+                                kubernetesClient,
+                                eventRecorder,
+                                statusRecorder,
+                                new NoopJobAutoscalerFactory()));
     }
 
     @ParameterizedTest
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoaderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoaderTest.java
new file mode 100644
index 0000000..cabc25b
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoaderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.autoscaler.TestingAutoscalerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test loading the default autoscaling implementation from the classpath. */
+public class AutoscalerLoaderTest {
+
+    @TempDir public Path temporaryFolder;
+
+    @Test
+    void testLoadFallbackNoopImplementation() {
+        JobAutoScalerFactory factory = AutoscalerLoader.loadJobAutoscalerFactory();
+        Assertions.assertTrue(factory instanceof NoopJobAutoscalerFactory);
+    }
+
+    @Test
+    void testLoadCustomImplementation() throws Exception {
+        Map<String, String> originalEnv = System.getenv();
+        try {
+            Map<String, String> systemEnv = new HashMap<>(originalEnv);
+            systemEnv.put(
+                    ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+                    TestUtils.getTestPluginsRootDir(temporaryFolder));
+            TestUtils.setEnv(systemEnv);
+
+            JobAutoScalerFactory factory = AutoscalerLoader.loadJobAutoscalerFactory();
+            Assertions.assertTrue(factory instanceof TestingAutoscalerFactory);
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory
new file mode 100644
index 0000000..0704c52
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.operator.autoscaler.TestingAutoscalerFactory
diff --git a/pom.xml b/pom.xml
index 897e354..0ea86fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@
     <modules>
         <module>flink-kubernetes-standalone</module>
         <module>flink-kubernetes-operator</module>
+        <module>flink-kubernetes-operator-autoscaler</module>
         <module>flink-kubernetes-operator-api</module>
         <module>flink-kubernetes-webhook</module>
         <module>flink-kubernetes-docs</module>