[FLINK-32134] Autoscaler min/max parallelism configs should respect current parallelism
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index 90d46a8..f6c6a88 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -120,8 +120,8 @@
currentParallelism,
(int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
scaleFactor,
- conf.getInteger(VERTEX_MIN_PARALLELISM),
- conf.getInteger(VERTEX_MAX_PARALLELISM));
+ Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
+ Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
if (newParallelism == currentParallelism
|| blockScalingBasedOnPastActions(
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index 0548229..fb4655d 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -187,6 +187,16 @@
new JobVertexID(),
evaluated(10, 100, 500),
Collections.emptySortedMap()));
+
+ // Make sure we respect current parallelism in case it's lower
+ assertEquals(
+ 4,
+ vertexScaler.computeScaleTargetParallelism(
+ flinkDep,
+ conf,
+ new JobVertexID(),
+ evaluated(4, 100, 500),
+ Collections.emptySortedMap()));
}
@Test
@@ -201,6 +211,16 @@
new JobVertexID(),
evaluated(10, 500, 100),
Collections.emptySortedMap()));
+
+ // Make sure we respect current parallelism in case it's higher
+ assertEquals(
+ 12,
+ vertexScaler.computeScaleTargetParallelism(
+ flinkDep,
+ conf,
+ new JobVertexID(),
+ evaluated(12, 500, 100),
+ Collections.emptySortedMap()));
}
@Test