/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.GC_PRESSURE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MAX_USAGE_RATIO;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MEMORY_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_TASK_SLOTS_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

/** Job scaling evaluator for the autoscaler. */
public class ScalingMetricEvaluator {

private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);
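
/**
 * Evaluate the collected metric history into per-vertex and global scaling metrics that drive
 * the scaling decision.
 */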
public EvaluatedMetrics evaluate(
Configuration conf, CollectedMetricHistory collectedMetrics, Duration restartTime) {
LOG.debug("Restart time used in metrics evaluation: {}", restartTime);
var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>();
var metricsHistory = collectedMetrics.getMetricHistory();
var topology = collectedMetrics.getJobTopology();
boolean processingBacklog = isProcessingBacklog(topology, metricsHistory, conf);
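// Evaluate vertices in topological order so that the already evaluated upstream
// target rates are available when deriving downstream target data rates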
for (var vertex : topology.getVerticesInTopologicalOrder()) {
scalingOutput.put(
vertex,
evaluateMetrics(
conf,
scalingOutput,
metricsHistory,
topology,
vertex,
processingBacklog,
restartTime));
}
var globalMetrics = evaluateGlobalMetrics(metricsHistory);
return new EvaluatedMetrics(scalingOutput, globalMetrics);
}

@VisibleForTesting
protected static boolean isProcessingBacklog(
JobTopology topology,
SortedMap<Instant, CollectedMetrics> metricsHistory,
Configuration conf) {
var lastMetrics = metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics();
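// The job is considered to be processing backlog if any source would need longer
// than the configured lag threshold to work through its current lag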
return topology.getVerticesInTopologicalOrder().stream()
.filter(topology::isSource)
.anyMatch(
vertex -> {
double lag = lastMetrics.get(vertex).getOrDefault(LAG, 0.0);
double inputRateAvg =
getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory);
if (Double.isNaN(inputRateAvg)) {
return false;
}
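// Estimated time (in seconds) to work through the lag at the current input rate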
double lagSeconds = lag / inputRateAvg;
if (lagSeconds
> conf.get(BACKLOG_PROCESSING_LAG_THRESHOLD).toSeconds()) {
LOG.info("Currently processing backlog at source {}", vertex);
return true;
} else {
return false;
}
});
}

@Nonnull
private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
Configuration conf,
HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
SortedMap<Instant, CollectedMetrics> metricsHistory,
JobTopology topology,
JobVertexID vertex,
boolean processingBacklog,
Duration restartTime) {
var latestVertexMetrics =
metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics().get(vertex);
var vertexInfo = topology.get(vertex);
double inputRateAvg = getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory);
var evaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
computeTargetDataRate(
topology,
vertex,
conf,
inputRateAvg,
scalingOutput,
metricsHistory,
latestVertexMetrics,
evaluatedMetrics);
double busyTimeAvg =
computeBusyTimeAvg(conf, metricsHistory, vertex, vertexInfo.getParallelism());
evaluatedMetrics.put(
TRUE_PROCESSING_RATE,
EvaluatedScalingMetric.avg(
computeTrueProcessingRate(
busyTimeAvg, inputRateAvg, metricsHistory, vertex, conf)));
evaluatedMetrics.put(LOAD, EvaluatedScalingMetric.avg(busyTimeAvg / 1000.));
Optional.ofNullable(latestVertexMetrics.get(LAG))
.ifPresent(l -> evaluatedMetrics.put(LAG, EvaluatedScalingMetric.of(l)));
evaluatedMetrics.put(PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getParallelism()));
evaluatedMetrics.put(
MAX_PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getMaxParallelism()));
computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime);
return evaluatedMetrics;
}

/**
* Compute the average busy time for the given vertex over the current metric window. Depending
* on the {@link MetricAggregator} chosen we use two different mechanisms:
*
* <ol>
* <li>For the AVG aggregator we compute from the accumulated busy time to get the most precise
* metric
* <li>For the MAX/MIN aggregators we have to average over the point-in-time MAX/MIN values
* collected over the metric window. These are stored in the LOAD metric.
* </ol>
*
* @param conf Job configuration
* @param metricsHistory Metric history of the current window
* @param vertex Vertex to evaluate
* @param parallelism Current parallelism of the vertex
* @return Average busy time (ms busy per second) in the current metric window
*/
@VisibleForTesting
protected static double computeBusyTimeAvg(
Configuration conf,
SortedMap<Instant, CollectedMetrics> metricsHistory,
JobVertexID vertex,
int parallelism) {
if (conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR) == MetricAggregator.AVG) {
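// The accumulated busy time is summed across subtasks, so we divide its
// per-second rate by the parallelism to get the average busy time per subtask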
return getRate(ScalingMetric.ACCUMULATED_BUSY_TIME, vertex, metricsHistory)
/ parallelism;
} else {
return getAverage(LOAD, vertex, metricsHistory) * 1000;
}
}

/**
* Compute the true processing rate for the given vertex over the current metric window. The
* computation takes into account both the observed (during catch-up) and the busy time based
* processing rate and selects the right metric depending on the config.
*
* @param busyTimeAvg Average busy time (ms busy per second) of the vertex
* @param inputRateAvg Average input rate of the vertex
* @param metricsHistory Metric history of the current window
* @param vertex Vertex to evaluate
* @param conf Job configuration
* @return Average true processing rate over the metric window.
*/
protected static double computeTrueProcessingRate(
double busyTimeAvg,
double inputRateAvg,
SortedMap<Instant, CollectedMetrics> metricsHistory,
JobVertexID vertex,
Configuration conf) {
var busyTimeTpr = computeTprFromBusyTime(busyTimeAvg, inputRateAvg);
var observedTprAvg =
getAverage(
OBSERVED_TPR,
vertex,
metricsHistory,
conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS));
var tprMetric = selectTprMetric(vertex, conf, busyTimeTpr, observedTprAvg);
return tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTpr;
}
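
/**
 * Extrapolate the true processing rate from the busy time. For example, a vertex processing
 * 100 records/s while being busy 500 ms/s has a true processing rate of 100 / 0.5 = 200
 * records/s.
 */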
private static double computeTprFromBusyTime(double busyMsPerSecond, double rate) {
if (rate == 0) {
// Nothing is coming in, we assume infinite processing power
// until we can sample the true processing rate (i.e. data flows).
return Double.POSITIVE_INFINITY;
}
return rate / (busyMsPerSecond / 1000);
}

private static ScalingMetric selectTprMetric(
JobVertexID jobVertexID,
Configuration conf,
double busyTimeTprAvg,
double observedTprAvg) {
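// No observed TPR samples in the window, keep using the busy time based metric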
if (Double.isNaN(observedTprAvg)) {
return TRUE_PROCESSING_RATE;
}
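// The busy time based TPR is unusable (zero input rate or missing busy time
// metric), the observed rate is the only available estimate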
if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
return OBSERVED_TPR;
}
double switchThreshold =
conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD);
// If we could measure the observed TPR, decide whether to switch to it instead of
// the busy time based TPR, depending on the relative difference between the two
if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {
LOG.debug(
"Using observed TPR {} for {} as the busy time based estimate ({}) seems too large",
observedTprAvg,
jobVertexID,
busyTimeTprAvg);
return OBSERVED_TPR;
} else {
LOG.debug("Using busy time based tpr {} for {}.", busyTimeTprAvg, jobVertexID);
return TRUE_PROCESSING_RATE;
}
}

@VisibleForTesting
protected static void computeProcessingRateThresholds(
Map<ScalingMetric, EvaluatedScalingMetric> metrics,
Configuration conf,
boolean processingBacklog,
Duration restartTime) {
double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
double targetUtilization = conf.get(TARGET_UTILIZATION);
double upperUtilization;
double lowerUtilization;
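// Derive the utilization bounds, e.g. target 0.6 with boundary 0.1 keeps the
// acceptable utilization between 0.5 and 0.7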
if (processingBacklog) {
// When we are processing backlog we allow max utilization and we do not trigger
// scale down on under-utilization to avoid creating more lag.
upperUtilization = 1.0;
lowerUtilization = 0.0;
} else {
upperUtilization = targetUtilization + utilizationBoundary;
lowerUtilization = targetUtilization - utilizationBoundary;
}
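// Convert the utilization bounds into absolute processing rate thresholds,
// accounting for the catch-up capacity and the expected restart time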
double scaleUpThreshold =
AutoScalerUtils.getTargetProcessingCapacity(
metrics, conf, upperUtilization, false, restartTime);
double scaleDownThreshold =
AutoScalerUtils.getTargetProcessingCapacity(
metrics, conf, lowerUtilization, true, restartTime);
metrics.put(SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleUpThreshold));
metrics.put(SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleDownThreshold));
}

private void computeTargetDataRate(
JobTopology topology,
JobVertexID vertex,
Configuration conf,
double inputRate,
HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated,
SortedMap<Instant, CollectedMetrics> metricsHistory,
Map<ScalingMetric, Double> latestVertexMetrics,
Map<ScalingMetric, EvaluatedScalingMetric> out) {
if (topology.isSource(vertex)) {
double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
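// For sources the target rate is the ingestion rate: the observed input rate
// plus the rate at which the source lag grows or shrinks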
double lagRate = getRate(LAG, vertex, metricsHistory);
double ingestionDataRate = Math.max(0, inputRate + lagRate);
if (Double.isNaN(ingestionDataRate)) {
throw new RuntimeException(
"Cannot evaluate metrics without ingestion rate information");
}
out.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(ingestionDataRate));
double lag = latestVertexMetrics.getOrDefault(LAG, 0.);
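// Extra rate needed to work off the current lag within the configured catch-up duration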
double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag / catchUpTargetSec;
if (catchUpInputRate > 0) {
LOG.debug(
"Extra backlog processing input rate for {} is {}",
vertex,
catchUpInputRate);
}
out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpInputRate));
} else {
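// For non-source vertices the target rate is derived from the upstream vertices,
// scaling each input's target rate by the per-edge output ratio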
var inputs = topology.get(vertex).getInputs().keySet();
double sumAvgTargetRate = 0;
double sumCatchUpDataRate = 0;
for (var inputVertex : inputs) {
var inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex);
var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
var outputRatio =
computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
LOG.debug(
"Computed output ratio for edge ({} -> {}) : {}",
inputVertex,
vertex,
outputRatio);
sumAvgTargetRate += inputTargetRate.getAverage() * outputRatio;
sumCatchUpDataRate +=
inputEvaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent() * outputRatio;
}
out.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(sumAvgTargetRate));
out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(sumCatchUpDataRate));
}
}

@VisibleForTesting
protected static Map<ScalingMetric, EvaluatedScalingMetric> evaluateGlobalMetrics(
SortedMap<Instant, CollectedMetrics> metricHistory) {
var latest = metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
populateMetric(HEAP_MAX_USAGE_RATIO, metricHistory, out);
populateMetric(HEAP_MEMORY_USED, metricHistory, out);
populateMetric(MANAGED_MEMORY_USED, metricHistory, out);
populateMetric(METASPACE_MEMORY_USED, metricHistory, out);
out.put(
NUM_TASK_SLOTS_USED,
EvaluatedScalingMetric.of(latest.getOrDefault(NUM_TASK_SLOTS_USED, Double.NaN)));
return out;
}
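
/**
 * Populate the evaluated metric with both the latest observed value and the average over the
 * metric window.
 */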
private static void populateMetric(
ScalingMetric scalingMetric,
SortedMap<Instant, CollectedMetrics> metricHistory,
Map<ScalingMetric, EvaluatedScalingMetric> out) {
var latestMetrics = metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
var latestObservation = latestMetrics.getOrDefault(scalingMetric, Double.NaN);
double value = getAverageGlobalMetric(scalingMetric, metricHistory);
out.put(scalingMetric, new EvaluatedScalingMetric(latestObservation, value));
}

private static double getAverageGlobalMetric(
ScalingMetric metric, SortedMap<Instant, CollectedMetrics> metricsHistory) {
return getAverage(metric, null, metricsHistory);
}

public static double getAverage(
ScalingMetric metric,
@Nullable JobVertexID jobVertexId,
SortedMap<Instant, CollectedMetrics> metricsHistory) {
return getAverage(metric, jobVertexId, metricsHistory, 1);
}

/**
* Compute the per-second rate of change for the given accumulated metric over the metric
* window.
*
* @param metric Accumulated metric to compute the rate for
* @param jobVertexId Vertex to compute the rate for
* @param metricsHistory Metric history of the current window
* @return Per-second rate, or Double.NaN if we don't have at least 2 observations.
*/
public static double getRate(
ScalingMetric metric,
@Nullable JobVertexID jobVertexId,
SortedMap<Instant, CollectedMetrics> metricsHistory) {
Instant firstTs = null;
double first = Double.NaN;
Instant lastTs = null;
double last = Double.NaN;
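// Find the first and last valid observations of the accumulated metric in the
// window, the rate is their difference divided by the elapsed time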
for (var entry : metricsHistory.entrySet()) {
double value =
entry.getValue()
.getVertexMetrics()
.get(jobVertexId)
.getOrDefault(metric, Double.NaN);
if (!Double.isNaN(value)) {
if (Double.isNaN(first)) {
first = value;
firstTs = entry.getKey();
} else {
last = value;
lastTs = entry.getKey();
}
}
}
if (Double.isNaN(last)) {
return Double.NaN;
}
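// Convert the per-millisecond delta into a per-second rate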
return 1000 * (last - first) / Duration.between(firstTs, lastTs).toMillis();
}

public static double getAverage(
ScalingMetric metric,
@Nullable JobVertexID jobVertexId,
SortedMap<Instant, CollectedMetrics> metricsHistory,
int minElements) {
double sum = 0;
int n = 0;
boolean anyInfinite = false;
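// Infinite observations (e.g. TPR while nothing is flowing in) are excluded from
// the average but tracked so we can return infinity when no finite samples exist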
for (var collectedMetrics : metricsHistory.values()) {
var metrics =
jobVertexId != null
? collectedMetrics.getVertexMetrics().get(jobVertexId)
: collectedMetrics.getGlobalMetrics();
double num = metrics.getOrDefault(metric, Double.NaN);
if (Double.isNaN(num)) {
continue;
}
if (Double.isInfinite(num)) {
anyInfinite = true;
continue;
}
sum += num;
n++;
}
if (n == 0) {
return anyInfinite ? Double.POSITIVE_INFINITY : Double.NaN;
}
return n < minElements ? Double.NaN : sum / n;
}

/**
* Compute the in/out ratio between the (from, to) vertices. The ratio estimates the number of
* output records produced to the downstream vertex for every input record received by the
* upstream vertex. For example, an output ratio of 2.0 means that we produce approximately 2
* outputs to the "to" vertex for every 1 input received in the "from" vertex.
*
* @param from Upstream vertex
* @param to Downstream vertex
* @param topology Current job topology
* @param metricsHistory Metric history of the current window
* @return Output ratio
*/
@VisibleForTesting
protected static double computeEdgeOutputRatio(
JobVertexID from,
JobVertexID to,
JobTopology topology,
SortedMap<Instant, CollectedMetrics> metricsHistory) {
double inputRate = getRate(ScalingMetric.NUM_RECORDS_IN, from, metricsHistory);
double outputRatio = 0;
// If the input rate is zero, we also need to flatten the output rate.
// Otherwise, the OUTPUT_RATIO would be outrageously large, leading to
// a rapid scale up.
if (inputRate > 0) {
double outputRate = computeEdgeDataRate(topology, metricsHistory, from, to);
if (outputRate > 0) {
outputRatio = outputRate / inputRate;
}
}
return outputRatio;
}

/**
* Compute how many records flow between two job vertices in the pipeline. Since Flink does not
* expose any output / data rate metric on the edge level, we have to derive this from the
* vertex level input/output metrics.
*
* @param topology Current job topology
* @param metricsHistory Metric history of the current window
* @param from Upstream vertex
* @param to Downstream vertex
* @return Records-per-second data rate between the two vertices
*/
@VisibleForTesting
protected static double computeEdgeDataRate(
JobTopology topology,
SortedMap<Instant, CollectedMetrics> metricsHistory,
JobVertexID from,
JobVertexID to) {
var toVertexInputs = topology.get(to).getInputs().keySet();
// Case 1: The downstream vertex has a single input (from), so we can directly use its
// num records in, which is the most reliable metric
if (toVertexInputs.size() == 1) {
LOG.debug(
"Computing edge ({}, {}) data rate for single input downstream task", from, to);
return getRate(ScalingMetric.NUM_RECORDS_IN, to, metricsHistory);
}
// Case 2: All other inputs of the downstream vertex come from upstream vertices that
// have no other outputs, so we can subtract their output rates from the downstream
// num records in
double inputRateFromOtherVertices = 0;
for (JobVertexID input : toVertexInputs) {
if (input.equals(from)) {
// Exclude the (from -> to) edge itself, we only want to consider the other inputs
continue;
}
if (topology.get(input).getOutputs().size() == 1) {
inputRateFromOtherVertices +=
getRate(ScalingMetric.NUM_RECORDS_OUT, input, metricsHistory);
} else {
// This upstream vertex has multiple outputs, so its output rate cannot be
// attributed to this edge alone
inputRateFromOtherVertices = Double.NaN;
break;
}
}
if (!Double.isNaN(inputRateFromOtherVertices)) {
LOG.debug(
"Computing edge ({}, {}) data rate by subtracting upstream input rates",
from,
to);
return getRate(ScalingMetric.NUM_RECORDS_IN, to, metricsHistory)
- inputRateFromOtherVertices;
}
// Case 3: Fall back to the upstream num records out, this is the least reliable
LOG.debug(
"Computing edge ({}, {}) data rate by falling back to the upstream num records out",
from,
to);
return getRate(ScalingMetric.NUM_RECORDS_OUT, from, metricsHistory);
}
}