[FLINK-32134] Autoscaler min/max parallelism configs should respect current parallelism
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index 90d46a8..f6c6a88 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -120,8 +120,8 @@
                         currentParallelism,
                         (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                         scaleFactor,
-                        conf.getInteger(VERTEX_MIN_PARALLELISM),
-                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+                        Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
+                        Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
         if (newParallelism == currentParallelism
                 || blockScalingBasedOnPastActions(
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index 0548229..fb4655d 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -187,6 +187,16 @@
                         new JobVertexID(),
                         evaluated(10, 100, 500),
                         Collections.emptySortedMap()));
+
+        // Make sure we respect current parallelism in case it's lower
+        assertEquals(
+                4,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep,
+                        conf,
+                        new JobVertexID(),
+                        evaluated(4, 100, 500),
+                        Collections.emptySortedMap()));
     }
 
     @Test
@@ -201,6 +211,16 @@
                         new JobVertexID(),
                         evaluated(10, 500, 100),
                         Collections.emptySortedMap()));
+
+        // Make sure we respect current parallelism in case it's higher
+        assertEquals(
+                12,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep,
+                        conf,
+                        new JobVertexID(),
+                        evaluated(12, 500, 100),
+                        Collections.emptySortedMap()));
     }
 
     @Test