[FLINK-30776] Move autoscaler code into a separate Maven module and make it pluggable (#529)
The autoscaling code is tightly coupled with the main operator code. In this change, we move the code into a
dedicated Maven module and make it pluggable.
diff --git a/Dockerfile b/Dockerfile
index c4a2c77..98fdae7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -23,10 +23,11 @@
COPY . .
-RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS
+RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-operator-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS
RUN cd /app/tools/license; mkdir jars; cd jars; \
cp /app/flink-kubernetes-operator/target/flink-kubernetes-operator-*-shaded.jar . && \
+ cp /app/flink-kubernetes-operator-autoscaler/target/flink-kubernetes-operator-autoscaler-*.jar . && \
cp /app/flink-kubernetes-webhook/target/flink-kubernetes-webhook-*-shaded.jar . && \
cp /app/flink-kubernetes-standalone/target/flink-kubernetes-standalone-*-shaded.jar . && \
cp -r /app/flink-kubernetes-operator/target/plugins ./plugins && \
@@ -35,8 +36,10 @@
# stage
FROM eclipse-temurin:11-jre-jammy
ENV FLINK_HOME=/opt/flink
+ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
ENV OPERATOR_VERSION=1.4-SNAPSHOT
ENV OPERATOR_JAR=flink-kubernetes-operator-$OPERATOR_VERSION-shaded.jar
+ENV AUTOSCALER_JAR=flink-kubernetes-operator-autoscaler-$OPERATOR_VERSION.jar
ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar
ENV FLINK_KUBERNETES_SHADED_JAR=flink-kubernetes-standalone-$OPERATOR_VERSION-shaded.jar
@@ -45,6 +48,7 @@
useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink
COPY --from=build /app/flink-kubernetes-operator/target/$OPERATOR_JAR .
+COPY --from=build /app/flink-kubernetes-operator-autoscaler/target/$AUTOSCALER_JAR .
COPY --from=build /app/flink-kubernetes-webhook/target/$WEBHOOK_JAR .
COPY --from=build /app/flink-kubernetes-standalone/target/$FLINK_KUBERNETES_SHADED_JAR .
COPY --from=build /app/flink-kubernetes-operator/target/plugins $FLINK_HOME/plugins
@@ -54,6 +58,7 @@
RUN chown -R flink:flink $FLINK_HOME && \
chown flink:flink $OPERATOR_JAR && \
+ chown flink:flink $AUTOSCALER_JAR && \
chown flink:flink $WEBHOOK_JAR && \
chown flink:flink $FLINK_KUBERNETES_SHADED_JAR && \
chown flink:flink /docker-entrypoint.sh
diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh
index 36581cd..756b1d0 100755
--- a/docker-entrypoint.sh
+++ b/docker-entrypoint.sh
@@ -29,7 +29,7 @@
elif [ "$1" = "operator" ]; then
echo "Starting Operator"
- exec java -cp ./$FLINK_KUBERNETES_SHADED_JAR:./$OPERATOR_JAR $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator
+ exec java -cp ./$FLINK_KUBERNETES_SHADED_JAR:./$OPERATOR_JAR:./$AUTOSCALER_JAR $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator
elif [ "$1" = "webhook" ]; then
echo "Starting Webhook"
diff --git a/flink-kubernetes-docs/pom.xml b/flink-kubernetes-docs/pom.xml
index 9822d52..9dd387b 100644
--- a/flink-kubernetes-docs/pom.xml
+++ b/flink-kubernetes-docs/pom.xml
@@ -43,6 +43,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes-operator-autoscaler</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
index 0f302dc..2fb1e6a 100644
--- a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
@@ -77,7 +77,7 @@
"flink-kubernetes-operator",
"org.apache.flink.kubernetes.operator.metrics"),
new OptionsClassLocation(
- "flink-kubernetes-operator",
+ "flink-kubernetes-operator-autoscaler",
"org.apache.flink.kubernetes.operator.autoscaler.config")
};
static final String DEFAULT_PATH_PREFIX = "src/main/java";
diff --git a/flink-kubernetes-operator-autoscaler/pom.xml b/flink-kubernetes-operator-autoscaler/pom.xml
new file mode 100644
index 0000000..7263787
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes-operator-parent</artifactId>
+ <version>1.4-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-kubernetes-operator-autoscaler</artifactId>
+ <name>Flink Kubernetes Operator Autoscaler</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <auto-service.version>1.0.1</auto-service.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes-operator</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <version>${junit.jupiter.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes-operator</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes-operator-api</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-server-mock</artifactId>
+ <version>${fabric8.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
similarity index 98%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index bbfd9ab..bff2228 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -49,7 +49,7 @@
/** Class for encapsulating information stored for each resource when using the autoscaler. */
public class AutoScalerInfo {
- private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AutoScalerInfo.class);
private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
similarity index 92%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 4f74a52..ee4e75a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -23,7 +23,7 @@
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -40,9 +40,9 @@
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
/** Application and SessionJob autoscaler. */
-public class JobAutoScaler implements Cleanup {
+public class JobAutoScalerImpl implements JobAutoScaler {
- private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
private final KubernetesClient kubernetesClient;
private final ScalingMetricCollector metricsCollector;
@@ -53,7 +53,7 @@
lastEvaluatedMetrics = new ConcurrentHashMap<>();
private final Map<ResourceID, Set<JobVertexID>> registeredMetrics = new ConcurrentHashMap<>();
- public JobAutoScaler(
+ public JobAutoScalerImpl(
KubernetesClient kubernetesClient,
ScalingMetricCollector metricsCollector,
ScalingMetricEvaluator evaluator,
@@ -70,13 +70,13 @@
public void cleanup(AbstractFlinkResource<?, ?> cr) {
LOG.info("Cleaning up autoscaling meta data");
metricsCollector.cleanup(cr);
- scalingExecutor.cleanup(cr);
var resourceId = ResourceID.fromResource(cr);
lastEvaluatedMetrics.remove(resourceId);
registeredMetrics.remove(resourceId);
}
- public boolean scale(FlinkResourceContext<?> ctx) {
+ @Override
+ public boolean scale(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>> ctx) {
var conf = ctx.getObserveConfig();
var resource = ctx.getResource();
@@ -172,13 +172,4 @@
});
});
}
-
- public static JobAutoScaler create(
- KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
- return new JobAutoScaler(
- kubernetesClient,
- new RestApiMetricsCollector(),
- new ScalingMetricEvaluator(),
- new ScalingExecutor(kubernetesClient, eventRecorder));
- }
}
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
new file mode 100644
index 0000000..128381d
--- /dev/null
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import com.google.auto.service.AutoService;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/**
+ * Factory for loading JobAutoScalerImpl included in this module. This class will be dynamically
+ * instantiated by the main operator module.
+ */
+@AutoService(JobAutoScalerFactory.class)
+public class JobAutoscalerFactoryImpl implements JobAutoScalerFactory {
+ @Override
+ public JobAutoScaler create(KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
+ return new JobAutoScalerImpl(
+ kubernetesClient,
+ new RestApiMetricsCollector(),
+ new ScalingMetricEvaluator(),
+ new ScalingExecutor(kubernetesClient, eventRecorder));
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
similarity index 98%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index bda9dfe..a1f5b55 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -51,7 +51,7 @@
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** Class responsible for executing scaling decisions. */
-public class ScalingExecutor implements Cleanup {
+public class ScalingExecutor {
public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES =
ConfigOptions.key("pipeline.jobvertex-parallelism-overrides")
@@ -270,9 +270,4 @@
this.clock = Preconditions.checkNotNull(clock);
jobVertexScaler.setClock(clock);
}
-
- @Override
- public void cleanup(AbstractFlinkResource<?, ?> cr) {
- // No cleanup is currently necessary
- }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
similarity index 99%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 6b33d4b..0fd8c49 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -64,7 +64,7 @@
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
/** Metric collector using flink rest api. */
-public abstract class ScalingMetricCollector implements Cleanup {
+public abstract class ScalingMetricCollector {
private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricCollector.class);
private final Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, FlinkMetric>>>>
@@ -434,7 +434,6 @@
Configuration conf,
Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames);
- @Override
public void cleanup(AbstractFlinkResource<?, ?> cr) {
var resourceId = ResourceID.fromResource(cr);
histories.remove(resourceId);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
similarity index 100%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
rename to flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
similarity index 98%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index b0e48dd..26a1c61 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -66,7 +66,7 @@
private FlinkDeployment app;
private JobVertexID source1, sink;
- private JobAutoScaler autoscaler;
+ private JobAutoScalerImpl autoscaler;
@BeforeEach
public void setup() {
@@ -107,7 +107,8 @@
app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
autoscaler =
- new JobAutoScaler(kubernetesClient, metricsCollector, evaluator, scalingExecutor);
+ new JobAutoScalerImpl(
+ kubernetesClient, metricsCollector, evaluator, scalingExecutor);
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
similarity index 99%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index b42137d..e87167e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -197,7 +197,6 @@
assertNotNull(metricsCollector.getTopologies().get(resourceID));
metricsCollector.cleanup(app);
- scalingExecutor.cleanup(app);
assertNull(metricsCollector.getHistories().get(resourceID));
assertNull(metricsCollector.getAvailableVertexMetricNames().get(resourceID));
assertNull(metricsCollector.getTopologies().get(resourceID));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
similarity index 100%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
rename to flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index 7029e40..eb196ca 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -324,6 +324,18 @@
</executions>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.3.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<!-- Description: https://github.com/git-commit-id/git-commit-id-maven-plugin
Used to show the git ref when starting the Flink Kubernetes operator. -->
<groupId>io.github.git-commit-id</groupId>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 08bcb65..154a17d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -39,6 +39,7 @@
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.AutoscalerLoader;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
@@ -154,8 +155,10 @@
MetricManager.createFlinkDeploymentMetricManager(configManager, metricGroup);
var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
var eventRecorder = EventRecorder.create(client, listeners);
+ var autoscalerFactory = AutoscalerLoader.loadJobAutoscalerFactory();
var reconcilerFactory =
- new ReconcilerFactory(client, configManager, eventRecorder, statusRecorder);
+ new ReconcilerFactory(
+ client, configManager, eventRecorder, statusRecorder, autoscalerFactory);
var observerFactory = new FlinkDeploymentObserverFactory(configManager, eventRecorder);
var controller =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index afd2a9d..540884b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -33,7 +33,6 @@
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
-import org.apache.flink.kubernetes.operator.autoscaler.JobAutoScaler;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
@@ -86,11 +85,12 @@
public AbstractFlinkResourceReconciler(
KubernetesClient kubernetesClient,
EventRecorder eventRecorder,
- StatusRecorder<CR, STATUS> statusRecorder) {
+ StatusRecorder<CR, STATUS> statusRecorder,
+ JobAutoScalerFactory autoscalerFactory) {
this.kubernetesClient = kubernetesClient;
this.eventRecorder = eventRecorder;
this.statusRecorder = statusRecorder;
- this.resourceScaler = JobAutoScaler.create(kubernetesClient, eventRecorder);
+ this.resourceScaler = autoscalerFactory.create(kubernetesClient, eventRecorder);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index b9e2fa6..445fcb4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -55,8 +55,9 @@
public AbstractJobReconciler(
KubernetesClient kubernetesClient,
EventRecorder eventRecorder,
- StatusRecorder<CR, STATUS> statusRecorder) {
- super(kubernetesClient, eventRecorder, statusRecorder);
+ StatusRecorder<CR, STATUS> statusRecorder,
+ JobAutoScalerFactory autoscalerFactory) {
+ super(kubernetesClient, eventRecorder, statusRecorder, autoscalerFactory);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index c941873..6739667 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -66,8 +66,9 @@
public ApplicationReconciler(
KubernetesClient kubernetesClient,
EventRecorder eventRecorder,
- StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
- super(kubernetesClient, eventRecorder, statusRecorder);
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
+ JobAutoScalerFactory autoscalerFactory) {
+ super(kubernetesClient, eventRecorder, statusRecorder, autoscalerFactory);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
similarity index 66%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
index ae1b98b..58db4bc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
-/** Cleanup interface for autoscaling related metadata. */
-public interface Cleanup {
- /**
- * Method is called when a custom resource is deleted.
- *
- * @param cr custom resource
- */
+/** Per-job Autoscaler instance. */
+public interface JobAutoScaler {
+
+ /** Called as part of the reconciliation loop. Returns true if this call led to scaling. */
+ boolean scale(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>> ctx);
+
+ /** Called when the custom resource is deleted. */
void cleanup(AbstractFlinkResource<?, ?> cr);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScalerFactory.java
similarity index 67%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScalerFactory.java
index ae1b98b..927657a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScalerFactory.java
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-/** Cleanup interface for autoscaling related metadata. */
-public interface Cleanup {
- /**
- * Method is called when a custom resource is deleted.
- *
- * @param cr custom resource
- */
- void cleanup(AbstractFlinkResource<?, ?> cr);
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/** Factory to construct a new autoscaler instance. */
+public interface JobAutoScalerFactory {
+
+ JobAutoScaler create(KubernetesClient kubernetesClient, EventRecorder eventRecorder);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
new file mode 100644
index 0000000..dd54b07
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/** An autoscaler implementation which does nothing. */
+public class NoopJobAutoscalerFactory implements JobAutoScalerFactory, JobAutoScaler {
+
+ @Override
+ public JobAutoScaler create(KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
+ return this;
+ }
+
+ @Override
+ public boolean scale(FlinkResourceContext ctx) {
+ return false;
+ }
+
+ @Override
+ public void cleanup(AbstractFlinkResource<?, ?> cr) {}
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 7acefcd..655b3e2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -39,6 +39,7 @@
private final FlinkConfigManager configManager;
private final EventRecorder eventRecorder;
private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder;
+ private final JobAutoScalerFactory autoscalerFactory;
private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Reconciler<FlinkDeployment>>
reconcilerMap;
@@ -46,11 +47,13 @@
KubernetesClient kubernetesClient,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
- StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder) {
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder,
+ JobAutoScalerFactory autoscalerFactory) {
this.kubernetesClient = kubernetesClient;
this.configManager = configManager;
this.eventRecorder = eventRecorder;
this.deploymentStatusRecorder = deploymentStatusRecorder;
+ this.autoscalerFactory = autoscalerFactory;
this.reconcilerMap = new ConcurrentHashMap<>();
}
@@ -69,7 +72,10 @@
configManager);
case APPLICATION:
return new ApplicationReconciler(
- kubernetesClient, eventRecorder, deploymentStatusRecorder);
+ kubernetesClient,
+ eventRecorder,
+ deploymentStatusRecorder,
+ autoscalerFactory);
default:
throw new UnsupportedOperationException(
String.format("Unsupported running mode: %s", modes.f0));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index b1f93c2..eb486a2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -57,7 +57,7 @@
EventRecorder eventRecorder,
StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
FlinkConfigManager configManager) {
- super(kubernetesClient, eventRecorder, statusRecorder);
+ super(kubernetesClient, eventRecorder, statusRecorder, new NoopJobAutoscalerFactory());
this.configManager = configManager;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index a13da27..ccff139 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -26,6 +26,7 @@
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -46,7 +47,7 @@
KubernetesClient kubernetesClient,
EventRecorder eventRecorder,
StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
- super(kubernetesClient, eventRecorder, statusRecorder);
+ super(kubernetesClient, eventRecorder, statusRecorder, new NoopJobAutoscalerFactory());
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoader.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoader.java
new file mode 100644
index 0000000..37bac40
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ServiceLoader;
+
+/** Loads the active Autoscaler implementation from the classpath. */
+public class AutoscalerLoader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AutoscalerLoader.class);
+
+ public static JobAutoScalerFactory loadJobAutoscalerFactory() {
+ JobAutoScalerFactory factory = null;
+ boolean singleImplementation = true;
+
+ for (JobAutoScalerFactory discoveredFactory :
+ ServiceLoader.load(JobAutoScalerFactory.class)) {
+ LOG.info(
+ "Discovered JobAutoScaler factory: {}", discoveredFactory.getClass().getName());
+ singleImplementation = factory == null;
+ factory = discoveredFactory;
+ }
+
+ if (factory == null) {
+ LOG.info("No JobAutoscaler implementation found. Autoscaling is disabled.");
+ return new NoopJobAutoscalerFactory();
+ }
+
+ Preconditions.checkState(
+ singleImplementation,
+ "Found multiple implementation for JobAutoScalerFactory. Please ensure only one implementation is present.");
+
+ return factory;
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
index 93d7807..01b21c3 100644
--- a/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
+++ b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
@@ -30,6 +30,7 @@
<includes>
<include>org/apache/flink/kubernetes/operator/validation/TestValidator.class</include>
<include>org/apache/flink/kubernetes/operator/listener/TestingListener.class</include>
+ <include>org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscaler.class</include>
</includes>
</fileSet>
<fileSet>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscalerFactory.java
similarity index 72%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscalerFactory.java
index ae1b98b..486fa96 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscalerFactory.java
@@ -17,14 +17,7 @@
package org.apache.flink.kubernetes.operator.autoscaler;
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
-/** Cleanup interface for autoscaling related metadata. */
-public interface Cleanup {
- /**
- * Method is called when a custom resource is deleted.
- *
- * @param cr custom resource
- */
- void cleanup(AbstractFlinkResource<?, ?> cr);
-}
+/** Dummy autoscaler to test the plugin loading for the autoscaler. */
+public class TestingAutoscalerFactory extends NoopJobAutoscalerFactory {}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 97e0adf..565f503 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -27,6 +27,7 @@
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.deployment.FlinkDeploymentObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -82,7 +83,11 @@
new StatusRecorder<>(kubernetesClient, new MetricManager<>(), statusUpdateCounter);
reconcilerFactory =
new ReconcilerFactory(
- kubernetesClient, configManager, eventRecorder, statusRecorder);
+ kubernetesClient,
+ configManager,
+ eventRecorder,
+ statusRecorder,
+ new NoopJobAutoscalerFactory());
flinkDeploymentController =
new FlinkDeploymentController(
configManager,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 6c7269b..4d5bca3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -95,7 +95,11 @@
reconciler =
new TestReconcilerAdapter<>(
this,
- new ApplicationReconciler(kubernetesClient, eventRecorder, statusRecorder));
+ new ApplicationReconciler(
+ kubernetesClient,
+ eventRecorder,
+ statusRecorder,
+ new NoopJobAutoscalerFactory()));
}
@ParameterizedTest
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index ca8ece2..6d16a5e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -71,7 +71,11 @@
reconciler =
new TestReconcilerAdapter<>(
this,
- new ApplicationReconciler(kubernetesClient, eventRecorder, statusRecorder));
+ new ApplicationReconciler(
+ kubernetesClient,
+ eventRecorder,
+ statusRecorder,
+ new NoopJobAutoscalerFactory()));
}
@ParameterizedTest
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoaderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoaderTest.java
new file mode 100644
index 0000000..cabc25b
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoaderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.autoscaler.TestingAutoscalerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test loading the default autoscaling implementation from the classpath. */
+public class AutoscalerLoaderTest {
+
+ @TempDir public Path temporaryFolder;
+
+ @Test
+ void testLoadFallbackNoopImplementation() {
+ JobAutoScalerFactory factory = AutoscalerLoader.loadJobAutoscalerFactory();
+ Assertions.assertTrue(factory instanceof NoopJobAutoscalerFactory);
+ }
+
+ @Test
+ void testLoadCustomImplementation() throws Exception {
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ systemEnv.put(
+ ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+ TestUtils.getTestPluginsRootDir(temporaryFolder));
+ TestUtils.setEnv(systemEnv);
+
+ JobAutoScalerFactory factory = AutoscalerLoader.loadJobAutoscalerFactory();
+ Assertions.assertTrue(factory instanceof TestingAutoscalerFactory);
+ } finally {
+ TestUtils.setEnv(originalEnv);
+ }
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory
new file mode 100644
index 0000000..0704c52
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.operator.autoscaler.TestingAutoscalerFactory
diff --git a/pom.xml b/pom.xml
index 897e354..0ea86fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@
<modules>
<module>flink-kubernetes-standalone</module>
<module>flink-kubernetes-operator</module>
+ <module>flink-kubernetes-operator-autoscaler</module>
<module>flink-kubernetes-operator-api</module>
<module>flink-kubernetes-webhook</module>
<module>flink-kubernetes-docs</module>