/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import static org.apache.flink.kubernetes.operator.autoscaler.JobVertexScaler.INNEFFECTIVE_MESSAGE_FORMAT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Test for vertex parallelism scaler logic. */
@EnableKubernetesMockClient(crud = true)
public class JobVertexScalerTest {

    private JobVertexScaler vertexScaler;
private Configuration conf;
private KubernetesClient kubernetesClient;
private EventCollector eventCollector;
private FlinkDeployment flinkDep;

    @BeforeEach
public void setup() {
flinkDep = TestUtils.buildApplicationCluster();
kubernetesClient.resource(flinkDep).createOrReplace();
eventCollector = new EventCollector();
vertexScaler = new JobVertexScaler(new EventRecorder(kubernetesClient, eventCollector));
conf = new Configuration();
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
}
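
    // The expected values below are consistent with the basic sizing rule: target
    // parallelism ~= ceil(parallelism * targetDataRate / (trueProcessingRate * TARGET_UTILIZATION)),
    // with CATCH_UP_DURATION zero and a zero catch-up rate (see evaluated()), then
    // nudged upwards so that the max parallelism of 720 divides evenly where possible.
    // For example, evaluated(10, 50, 100) at utilization 0.8 gives ceil(6.25) = 7,
    // raised to 8 because 720 % 8 == 0 while 720 % 7 != 0. MAX_SCALE_DOWN_FACTOR and
    // MAX_SCALE_UP_FACTOR additionally clamp the relative change to
    // [1 - maxScaleDown, 1 + maxScaleUp] times the current parallelism.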
@Test
public void testParallelismScaling() {
var op = new JobVertexID();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
assertEquals(
5,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
8,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
assertEquals(
8,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
assertEquals(
8,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
assertEquals(
4,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
assertEquals(
5,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
assertEquals(
4,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
assertEquals(
15,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 200, 10), Collections.emptySortedMap()));
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
assertEquals(
16,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 200, 10), Collections.emptySortedMap()));
}
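
    // Judging by the expectations, JobVertexScaler.scale first computes
    // ceil(currentParallelism * scaleFactor) within [minParallelism, maxParallelism],
    // then searches upwards for a value that evenly divides the key group count
    // (second argument) so state spreads across subtasks without skew:
    // ceil(16 * 1.5) = 24 becomes 32 since 128 % 32 == 0. The search appears to be
    // capped (presumably at half the key group count): 200 * 2 = 400 is kept as-is
    // even though 720 % 400 != 0.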
@Test
public void testParallelismComputation() {
final int minParallelism = 1;
final int maxParallelism = Integer.MAX_VALUE;
assertEquals(1, JobVertexScaler.scale(1, 720, 0.0001, minParallelism, maxParallelism));
assertEquals(1, JobVertexScaler.scale(2, 720, 0.1, minParallelism, maxParallelism));
assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, minParallelism, maxParallelism));
assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, minParallelism, maxParallelism));
assertEquals(400, JobVertexScaler.scale(200, 720, 2, minParallelism, maxParallelism));
assertEquals(
720,
JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
}
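
    // Bounds apply before the even-divisor adjustment: in the fourth case below the
    // raw target ceil(16 * 1.5) = 24 is first raised to the minimum of 60 and then
    // bumped to 64, the next divisor of 128.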
@Test
public void testParallelismComputationWithLimit() {
assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, 1, 700));
assertEquals(8, JobVertexScaler.scale(8, 720, 0.8, 8, 700));
assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
assertEquals(64, JobVertexScaler.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
assertEquals(300, JobVertexScaler.scale(200, 720, 2, 1, 300));
assertEquals(600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 1, 600));
}

    @Test
    public void ensureMinParallelismDoesNotExceedMax() {
        assertThrows(
                IllegalArgumentException.class,
                () -> JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 500, 499));
    }
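
    // The configured minimum never forces a scale-up on its own: when the vertex
    // already runs below VERTEX_MIN_PARALLELISM, the effective lower bound drops to
    // the current parallelism (second assertion below).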
@Test
public void testMinParallelismLimitIsUsed() {
conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
assertEquals(
5,
vertexScaler.computeScaleTargetParallelism(
flinkDep,
conf,
new JobVertexID(),
evaluated(10, 100, 500),
Collections.emptySortedMap()));
// Make sure we respect current parallelism in case it's lower
assertEquals(
4,
vertexScaler.computeScaleTargetParallelism(
flinkDep,
conf,
new JobVertexID(),
evaluated(4, 100, 500),
Collections.emptySortedMap()));
}
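
    // Symmetrically, VERTEX_MAX_PARALLELISM never forces a scale-down: a vertex
    // already above the configured maximum keeps its current parallelism as the
    // effective upper bound (second assertion below).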
@Test
public void testMaxParallelismLimitIsUsed() {
conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(
flinkDep,
conf,
new JobVertexID(),
evaluated(10, 500, 100),
Collections.emptySortedMap()));
// Make sure we respect current parallelism in case it's higher
assertEquals(
12,
vertexScaler.computeScaleTargetParallelism(
flinkDep,
conf,
new JobVertexID(),
evaluated(12, 500, 100),
Collections.emptySortedMap()));
}
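
    // SCALE_UP_GRACE_PERIOD blocks scale-downs for a window after a recorded
    // scale-up, based on the scaling history and the scaler's injectable clock;
    // scale-ups themselves are never delayed by the grace period.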
@Test
public void testScaleDownAfterScaleUpDetection() {
var op = new JobVertexID();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ofMinutes(1));
var clock = Clock.systemDefaultZone();
vertexScaler.setClock(clock);
var evaluated = evaluated(5, 100, 50);
var history = new TreeMap<Instant, ScalingSummary>();
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
// Should not allow scale back down immediately
evaluated = evaluated(10, 50, 100);
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
        // Let the one-minute grace period elapse before retrying the scale-down
clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61));
vertexScaler.setClock(clock);
assertEquals(
5,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
history.put(clock.instant(), new ScalingSummary(10, 5, evaluated));
// Allow immediate scale up
evaluated = evaluated(5, 100, 50);
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
}
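
    // Ineffective-scaling detection compares the processing-rate gain actually
    // achieved since the last scale-up with the expected gain recorded in the
    // history. The cases below are consistent with a 10% threshold: achieving
    // 90 -> 100 of an expected 90 -> 180 (~11% of the hoped-for gain) still counts
    // as effective, while 90 -> 98 (~9%) blocks further scale-up and drops the
    // EXPECTED_PROCESSING_RATE metric from the evaluated map.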
@Test
public void testIneffectiveScalingDetection() {
var op = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
var evaluated = evaluated(5, 100, 50);
var history = new TreeMap<Instant, ScalingSummary>();
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
        // Allow further scale-up if the previous scaling was effective (80% of the
        // expected processing rate increase was achieved)
evaluated = evaluated(10, 180, 90);
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
        // Detect ineffective scaling: less than 5% of the expected increase was achieved
        // (90 -> 94 instead of 90 -> 180). Do not try to scale above 20.
evaluated = evaluated(20, 180, 94);
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        // Still considered ineffective (the achieved gain remains below the 10% threshold)
evaluated = evaluated(20, 180, 98);
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        // Allow scale-up if the current parallelism doesn't match the last recorded one
        // (e.g. the user rescaled manually)
evaluated = evaluated(10, 180, 90);
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
        // Over 10% of the expected gain achieved, considered effective
evaluated = evaluated(20, 180, 100);
assertEquals(
36,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
// Ineffective but detection is turned off
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
evaluated = evaluated(20, 180, 90);
assertEquals(
40,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
        // Scale-downs are allowed even when the previous scaling was ineffective
evaluated = evaluated(20, 45, 90);
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history));
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
}
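
    // Besides blocking the scale-up, an ineffective scaling decision emits a
    // Kubernetes event with reason IneffectiveScaling through the EventRecorder,
    // which the EventCollector captures for inspection here.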
@Test
public void testSendingIneffectiveScalingEvents() {
var jobVertexID = new JobVertexID();
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
var evaluated = evaluated(5, 100, 50);
var history = new TreeMap<Instant, ScalingSummary>();
assertEquals(
10,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, jobVertexID, evaluated, history));
assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
        // Effective scaling, no event triggered
evaluated = evaluated(10, 180, 90);
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, jobVertexID, evaluated, history));
assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
assertEquals(0, eventCollector.events.size());
        // Ineffective scaling, an event is triggered
evaluated = evaluated(20, 180, 95);
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, jobVertexID, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
assertEquals(1, eventCollector.events.size());
var event = eventCollector.events.poll();
assertEquals(String.format(INNEFFECTIVE_MESSAGE_FORMAT, jobVertexID), event.getMessage());
assertEquals(EventRecorder.Reason.IneffectiveScaling.name(), event.getReason());
}
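
    /**
     * Builds an evaluated metrics map for the given parallelism, target data rate and true
     * processing rate, using a fixed max parallelism of 720 and a zero catch-up data rate, then
     * derives the processing rate thresholds from the current test configuration.
     */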
private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
int parallelism, double targetDataRate, double trueProcessingRate) {
var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
metrics.put(
ScalingMetric.TARGET_DATA_RATE,
new EvaluatedScalingMetric(targetDataRate, targetDataRate));
metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(0.));
metrics.put(
ScalingMetric.TRUE_PROCESSING_RATE,
new EvaluatedScalingMetric(trueProcessingRate, trueProcessingRate));
ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf, false);
return metrics;
}
}