blob: 40485acc92b86f864ad49d8ba883f30522728279 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.stream.Collectors;
/** Metric collector using flink rest api. */
public class RestApiMetricsCollector extends ScalingMetricCollector {
private static final Logger LOG = LoggerFactory.getLogger(RestApiMetricsCollector.class);
@Override
protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics(
AbstractFlinkResource<?, ?> cr,
FlinkService flinkService,
Configuration conf,
Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames) {
return filteredVertexMetricNames.entrySet().stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e ->
queryAggregatedVertexMetrics(
flinkService, cr, conf, e.getKey(), e.getValue())));
}
@SneakyThrows
protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
FlinkService flinkService,
AbstractFlinkResource<?, ?> cr,
Configuration conf,
JobVertexID jobVertexID,
Map<String, FlinkMetric> metrics) {
LOG.debug("Querying metrics {} for {}", metrics, jobVertexID);
var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
var parameters = new AggregatedSubtaskMetricsParameters();
var pathIt = parameters.getPathParameters().iterator();
((JobIDPathParameter) pathIt.next()).resolve(jobId);
((JobVertexIdPathParameter) pathIt.next()).resolve(jobVertexID);
parameters
.getQueryParameters()
.iterator()
.next()
.resolveFromString(StringUtils.join(metrics.keySet(), ","));
try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {
var responseBody =
restClient
.sendRequest(
AggregatedSubtaskMetricsHeaders.getInstance(),
parameters,
EmptyRequestBody.getInstance())
.get();
return responseBody.getMetrics().stream()
.collect(Collectors.toMap(m -> metrics.get(m.getId()), m -> m));
}
}
}