blob: aa4c742678f32cc472b69cb8826b0070720046c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils.SynchronizedMeterView;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Kubernetes client metrics.
 *
 * <p>An OkHttp {@link Interceptor} that records metrics for every HTTP call passing through it.
 * All metrics are registered below the {@value #KUBE_CLIENT_GROUP} group of the parent group
 * given to the constructor:
 *
 * <ul>
 *   <li>{@code HttpRequest}: request count/rate, plus one counter per HTTP method.
 *   <li>{@code HttpRequest.Failed}: count/rate of calls that finished without a response (i.e.
 *       {@code chain.proceed} threw).
 *   <li>{@code HttpResponse}: response count/rate, latency histogram, one meter per concrete
 *       status code and, when enabled in the configuration, one meter per status class ({@code
 *       1xx} .. {@code 5xx}).
 * </ul>
 *
 * <p>Per-method counters and per-status-code meters are created lazily on first use and cached in
 * concurrent maps.
 */
public class KubernetesClientMetrics implements Interceptor {

    public static final String KUBE_CLIENT_GROUP = "KubeClient";
    public static final String HTTP_REQUEST_GROUP = "HttpRequest";
    public static final String HTTP_REQUEST_FAILED_GROUP = "Failed";
    public static final String HTTP_RESPONSE_GROUP = "HttpResponse";
    public static final String HTTP_RESPONSE_1XX = "1xx";
    public static final String HTTP_RESPONSE_2XX = "2xx";
    public static final String HTTP_RESPONSE_3XX = "3xx";
    public static final String HTTP_RESPONSE_4XX = "4xx";
    public static final String HTTP_RESPONSE_5XX = "5xx";
    public static final String COUNTER = "Count";
    public static final String METER = "NumPerSecond";
    public static final String HISTO = "TimeNanos";

    /**
     * Status class group names, ordered so that the group for status code {@code c} sits at index
     * {@code c / 100 - 1}.
     */
    private static final String[] RESPONSE_CODE_GROUP_NAMES = {
        HTTP_RESPONSE_1XX,
        HTTP_RESPONSE_2XX,
        HTTP_RESPONSE_3XX,
        HTTP_RESPONSE_4XX,
        HTTP_RESPONSE_5XX
    };

    private final Histogram responseLatency;

    private final MetricGroup requestMetricGroup;
    private final MetricGroup failedRequestMetricGroup;
    private final MetricGroup responseMetricGroup;

    private final Counter requestCounter;
    private final Counter failedRequestCounter;
    private final Counter responseCounter;

    private final SynchronizedMeterView requestRateMeter;
    private final SynchronizedMeterView requestFailedRateMeter;
    private final SynchronizedMeterView responseRateMeter;

    private final boolean httpResponseCodeGroupsEnabled;

    /** Status class meters (1xx..5xx); stays empty when the feature is disabled. */
    private final List<SynchronizedMeterView> responseCodeGroupMeters = new ArrayList<>(5);

    /** Lazily created meters keyed by concrete HTTP status code. */
    private final Map<Integer, SynchronizedMeterView> responseCodeMeters =
            new ConcurrentHashMap<>();

    /** Lazily created counters keyed by HTTP method name. */
    private final Map<String, Counter> requestMethodCounter = new ConcurrentHashMap<>();

    /**
     * Registers the static part of the metric tree below {@code parentGroup}.
     *
     * @param parentGroup group under which the {@value #KUBE_CLIENT_GROUP} group is created
     * @param flinkOperatorConfiguration operator configuration; supplies the histogram settings
     *     and the flag that enables the status class ({@code 1xx}..{@code 5xx}) meters
     */
    public KubernetesClientMetrics(
            MetricGroup parentGroup, FlinkOperatorConfiguration flinkOperatorConfiguration) {
        MetricGroup metricGroup = parentGroup.addGroup(KUBE_CLIENT_GROUP);

        this.requestMetricGroup = metricGroup.addGroup(HTTP_REQUEST_GROUP);
        this.failedRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_FAILED_GROUP);
        this.responseMetricGroup = metricGroup.addGroup(HTTP_RESPONSE_GROUP);

        this.requestCounter =
                OperatorMetricUtils.synchronizedCounter(requestMetricGroup.counter(COUNTER));
        this.failedRequestCounter =
                OperatorMetricUtils.synchronizedCounter(failedRequestMetricGroup.counter(COUNTER));
        this.responseCounter =
                OperatorMetricUtils.synchronizedCounter(responseMetricGroup.counter(COUNTER));

        // Each rate meter is built on top of the counter registered in the same group.
        // NOTE(review): this relies on MeterView advancing its backing counter on markEvent(),
        // which is why the counters are never incremented directly in this class.
        this.requestRateMeter =
                OperatorMetricUtils.synchronizedMeterView(
                        requestMetricGroup.meter(METER, new MeterView(requestCounter)));
        this.requestFailedRateMeter =
                OperatorMetricUtils.synchronizedMeterView(
                        failedRequestMetricGroup.meter(METER, new MeterView(failedRequestCounter)));
        this.responseRateMeter =
                OperatorMetricUtils.synchronizedMeterView(
                        responseMetricGroup.meter(METER, new MeterView(responseCounter)));

        this.responseLatency =
                responseMetricGroup.histogram(
                        HISTO, OperatorMetricUtils.createHistogram(flinkOperatorConfiguration));

        this.httpResponseCodeGroupsEnabled =
                flinkOperatorConfiguration.isKubernetesClientMetricsHttpResponseCodeGroupsEnabled();
        if (this.httpResponseCodeGroupsEnabled) {
            // Registration order defines the list index used when a response is recorded.
            for (String groupName : RESPONSE_CODE_GROUP_NAMES) {
                this.responseCodeGroupMeters.add(
                        createMeterViewForMetricsGroup(responseMetricGroup.addGroup(groupName)));
            }
        }
    }

    /**
     * Records request metrics, forwards the call down the chain and records response (or
     * failure) metrics once the call has finished.
     *
     * @param chain the interceptor chain supplied by OkHttp
     * @return the response produced by the rest of the chain, unmodified
     * @throws IOException propagated unchanged from {@code chain.proceed}
     */
    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        updateRequestMetrics(request);
        Response response = null;
        final long startTime = System.nanoTime();
        try {
            response = chain.proceed(request);
            return response;
        } finally {
            // Runs on both paths: response stays null when proceed() threw, which is what marks
            // the call as failed.
            updateResponseMetrics(response, startTime);
        }
    }

    /** Marks one request on the overall request meter and on the counter of its HTTP method. */
    private void updateRequestMetrics(Request request) {
        this.requestRateMeter.markEvent();
        getCounterByRequestMethod(request.method()).inc();
    }

    /**
     * Records the outcome of a call.
     *
     * @param response the received response, or {@code null} if the call failed without one
     * @param startTimeNanos {@link System#nanoTime()} value captured right before the call
     */
    private void updateResponseMetrics(Response response, long startTimeNanos) {
        final long latency = System.nanoTime() - startTimeNanos;
        if (response == null) {
            this.requestFailedRateMeter.markEvent();
            return;
        }

        this.responseRateMeter.markEvent();
        this.responseLatency.update(latency);

        final int code = response.code();
        getMeterViewByResponseCode(code).markEvent();

        if (this.httpResponseCodeGroupsEnabled) {
            // Status codes outside 100..599 have no status class group. This method runs in the
            // finally block of intercept(), so an unchecked exception here would replace an
            // otherwise valid response; skip the group meter instead of indexing out of bounds.
            final int groupIndex = code / 100 - 1;
            if (groupIndex >= 0 && groupIndex < responseCodeGroupMeters.size()) {
                responseCodeGroupMeters.get(groupIndex).markEvent();
            }
        }
    }

    /**
     * Returns the counter for the given HTTP method, registering it under a sub-group of the
     * request group named after the method on first use.
     */
    private Counter getCounterByRequestMethod(String method) {
        return requestMethodCounter.computeIfAbsent(
                method,
                key ->
                        OperatorMetricUtils.synchronizedCounter(
                                requestMetricGroup.addGroup(key).counter(COUNTER)));
    }

    /**
     * Returns the meter for the given HTTP status code, registering it under a sub-group of the
     * response group named after the code on first use.
     */
    private SynchronizedMeterView getMeterViewByResponseCode(int code) {
        return responseCodeMeters.computeIfAbsent(
                code, key -> createMeterViewForMetricsGroup(responseMetricGroup.addGroup(key)));
    }

    /**
     * Registers a {@value #COUNTER} counter and a {@value #METER} meter built on that counter in
     * the given group and returns the meter.
     */
    private SynchronizedMeterView createMeterViewForMetricsGroup(MetricGroup metricGroup) {
        return OperatorMetricUtils.synchronizedMeterView(
                metricGroup.meter(
                        METER,
                        new MeterView(
                                OperatorMetricUtils.synchronizedCounter(
                                        metricGroup.counter(COUNTER)))));
    }
}