[FLINK-29939]Add metrics for Kubernetes Client Response 5xx count and rate
diff --git a/docs/content/docs/operations/metrics-logging.md b/docs/content/docs/operations/metrics-logging.md
index 75d97ae..56c33fe 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -62,17 +62,35 @@
The Operator gathers various metrics related to Kubernetes API server access.
-| Scope | Metrics | Description | Type |
-|--------|----------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
-| System | KubeClient.HttpRequest.Count | Number of HTTP request sent to the Kubernetes API Server | Counter |
-| System | KubeClient.HttpRequest.<RequestMethod>.Count | Number of HTTP request sent to the Kubernetes API Server per request method. <RequestMethod> can take values from: GET, POST, PUT, PATCH, DELETE, etc. | Counter |
-| System | KubeClient.HttpRequest.Failed.Count | Number of failed HTTP requests that has no response from the Kubernetes API Server | Counter |
-| System | KubeClient.HttpResponse.Count | Number of HTTP responses received from the Kubernetes API Server | Counter |
-| System | KubeClient.HttpResponse.<ResponseCode>.Count | Number of HTTP responses received from the Kubernetes API Server per response code. <ResponseCode> can take values from: 200, 404, 503, etc. | Counter |
-| System | KubeClient.HttpRequest.NumPerSecond | Number of HTTP requests sent to the Kubernetes API Server per second | Meter |
-| System | KubeClient.HttpRequest.Failed.NumPerSecond | Number of failed HTTP requests sent to the Kubernetes API Server per second | Meter |
-| System | KubeClient.HttpResponse.NumPerSecond | Number of HTTP responses received from the Kubernetes API Server per second | Meter |
-| System | KubeClient.HttpResponse.TimeNanos | Latency statistics obtained from the HTTP responses received from the Kubernetes API Server | Histogram |
+| Scope | Metrics | Description | Type |
+|--------|-----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
+| System | KubeClient.HttpRequest.Count | Number of HTTP request sent to the Kubernetes API Server | Counter |
+| System | KubeClient.HttpRequest.<RequestMethod>.Count | Number of HTTP request sent to the Kubernetes API Server per request method. <RequestMethod> can take values from: GET, POST, PUT, PATCH, DELETE, etc. | Counter |
+| System | KubeClient.HttpRequest.Failed.Count | Number of failed HTTP requests that has no response from the Kubernetes API Server | Counter |
+| System | KubeClient.HttpResponse.Count | Number of HTTP responses received from the Kubernetes API Server | Counter |
+| System | KubeClient.HttpResponse.<ResponseCode>.Count | Number of HTTP responses received from the Kubernetes API Server per response code. <ResponseCode> can take values from: 200, 404, 503, etc. | Counter |
+| System | KubeClient.HttpResponse.<ResponseCode>.NumPerSecond | Number of HTTP responses received from the Kubernetes API Server per response code per second. <ResponseCode> can take values from: 200, 404, 503, etc.| Meter |
+| System | KubeClient.HttpRequest.NumPerSecond | Number of HTTP requests sent to the Kubernetes API Server per second | Meter |
+| System | KubeClient.HttpRequest.Failed.NumPerSecond | Number of failed HTTP requests sent to the Kubernetes API Server per second | Meter |
+| System | KubeClient.HttpResponse.NumPerSecond | Number of HTTP responses received from the Kubernetes API Server per second | Meter |
+| System | KubeClient.HttpResponse.TimeNanos | Latency statistics obtained from the HTTP responses received from the Kubernetes API Server | Histogram |
+
+#### Kubernetes client metrics by Http Response Code
+
+It's possible to publish additional metrics by Http response code received from API server by setting `kubernetes.client.metrics.http.response.code.groups.enabled` to `true` .
+
+| Scope | Metrics | Description | Type |
+|--------|-----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
+| System | KubeClient.HttpResponse.1xx.Count | Number of HTTP Code 1xx responses (informational) received from the Kubernetes API Server per response code. | Counter |
+| System | KubeClient.HttpResponse.2xx.Count | Number of HTTP Code 2xx responses (success) received from the Kubernetes API Server per response code. | Counter |
+| System | KubeClient.HttpResponse.3xx.Count | Number of HTTP Code 3xx responses (redirection) received from the Kubernetes API Server per response code. | Counter |
+| System | KubeClient.HttpResponse.4xx.Count | Number of HTTP Code 4xx responses (client error) received from the Kubernetes API Server per response code. | Counter |
+| System | KubeClient.HttpResponse.5xx.Count | Number of HTTP Code 5xx responses (server error) received from the Kubernetes API Server per response code. | Counter |
+| System | KubeClient.HttpResponse.1xx.NumPerSecond | Number of HTTP Code 1xx responses (informational) received from the Kubernetes API Server per response code per second. | Meter |
+| System | KubeClient.HttpResponse.2xx.NumPerSecond | Number of HTTP Code 2xx responses (success) received from the Kubernetes API Server per response code per second. | Meter |
+| System | KubeClient.HttpResponse.3xx.NumPerSecond | Number of HTTP Code 3xx responses (redirection) received from the Kubernetes API Server per response code per second. | Meter |
+| System | KubeClient.HttpResponse.4xx.NumPerSecond | Number of HTTP Code 4xx responses (client error) received from the Kubernetes API Server per response code per second. | Meter |
+| System | KubeClient.HttpResponse.5xx.NumPerSecond | Number of HTTP Code 5xx responses (server error) received from the Kubernetes API Server per response code per second. | Meter |
### JVM Metrics
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 939339e..a326654 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -27,6 +27,12 @@
<td>Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.</td>
</tr>
<tr>
+ <td><h5>kubernetes.operator.kubernetes.client.metrics.http.response.code.groups.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server by response code group, e.g. 1xx, 2xx.</td>
+ </tr>
+ <tr>
<td><h5>kubernetes.operator.metrics.histogram.sample.size</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index f65a2f0..ce6b97c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -54,6 +54,7 @@
boolean josdkMetricsEnabled;
int metricsHistogramSampleSize;
boolean kubernetesClientMetricsEnabled;
+ boolean kubernetesClientMetricsHttpResponseCodeGroupsEnabled;
Duration flinkCancelJobTimeout;
Duration flinkShutdownClusterTimeout;
String artifactsBaseDir;
@@ -157,6 +158,11 @@
operatorConfig.get(
KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED);
+ boolean kubernetesClientMetricsHttpResponseCodeGroupsEnabled =
+ operatorConfig.get(
+ KubernetesOperatorMetricOptions
+ .OPERATOR_KUBERNETES_CLIENT_METRICS_HTTP_RESPONSE_CODE_GROUPS_ENABLED);
+
int metricsHistogramSampleSize =
operatorConfig.get(
KubernetesOperatorMetricOptions.OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE);
@@ -178,6 +184,7 @@
josdkMetricsEnabled,
metricsHistogramSampleSize,
kubernetesClientMetricsEnabled,
+ kubernetesClientMetricsHttpResponseCodeGroupsEnabled,
flinkCancelJobTimeout,
flinkShutdownClusterTimeout,
artifactsBaseDir,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
index 1dc6d3d..aa4c742 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
@@ -30,6 +30,8 @@
import okhttp3.Response;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -40,6 +42,11 @@
public static final String HTTP_REQUEST_GROUP = "HttpRequest";
public static final String HTTP_REQUEST_FAILED_GROUP = "Failed";
public static final String HTTP_RESPONSE_GROUP = "HttpResponse";
+ public static final String HTTP_RESPONSE_1XX = "1xx";
+ public static final String HTTP_RESPONSE_2XX = "2xx";
+ public static final String HTTP_RESPONSE_3XX = "3xx";
+ public static final String HTTP_RESPONSE_4XX = "4xx";
+ public static final String HTTP_RESPONSE_5XX = "5xx";
public static final String COUNTER = "Count";
public static final String METER = "NumPerSecond";
public static final String HISTO = "TimeNanos";
@@ -57,7 +64,10 @@
private final SynchronizedMeterView requestFailedRateMeter;
private final SynchronizedMeterView responseRateMeter;
- private final Map<Integer, Counter> responseCodeCounters = new ConcurrentHashMap<>();
+ private final boolean httpResponseCodeGroupsEnabled;
+ private final List<SynchronizedMeterView> responseCodeGroupMeters = new ArrayList<>(5);
+ private final Map<Integer, SynchronizedMeterView> responseCodeMeters =
+ new ConcurrentHashMap<>();
private final Map<String, Counter> requestMethodCounter = new ConcurrentHashMap<>();
public KubernetesClientMetrics(
@@ -88,6 +98,26 @@
this.responseLatency =
responseMetricGroup.histogram(
HISTO, OperatorMetricUtils.createHistogram(flinkOperatorConfiguration));
+
+ this.httpResponseCodeGroupsEnabled =
+ flinkOperatorConfiguration.isKubernetesClientMetricsHttpResponseCodeGroupsEnabled();
+ if (this.httpResponseCodeGroupsEnabled) {
+ this.responseCodeGroupMeters.add(
+ createMeterViewForMetricsGroup(
+ responseMetricGroup.addGroup(HTTP_RESPONSE_1XX)));
+ this.responseCodeGroupMeters.add(
+ createMeterViewForMetricsGroup(
+ responseMetricGroup.addGroup(HTTP_RESPONSE_2XX)));
+ this.responseCodeGroupMeters.add(
+ createMeterViewForMetricsGroup(
+ responseMetricGroup.addGroup(HTTP_RESPONSE_3XX)));
+ this.responseCodeGroupMeters.add(
+ createMeterViewForMetricsGroup(
+ responseMetricGroup.addGroup(HTTP_RESPONSE_4XX)));
+ this.responseCodeGroupMeters.add(
+ createMeterViewForMetricsGroup(
+ responseMetricGroup.addGroup(HTTP_RESPONSE_5XX)));
+ }
}
@Override
@@ -114,7 +144,10 @@
if (response != null) {
this.responseRateMeter.markEvent();
this.responseLatency.update(latency);
- getCounterByResponseCode(response.code()).inc();
+ getMeterViewByResponseCode(response.code()).markEvent();
+ if (this.httpResponseCodeGroupsEnabled) {
+ responseCodeGroupMeters.get(response.code() / 100 - 1).markEvent();
+ }
} else {
this.requestFailedRateMeter.markEvent();
}
@@ -128,11 +161,17 @@
requestMetricGroup.addGroup(key).counter(COUNTER)));
}
- private Counter getCounterByResponseCode(int code) {
- return responseCodeCounters.computeIfAbsent(
- code,
- key ->
- OperatorMetricUtils.synchronizedCounter(
- responseMetricGroup.addGroup(key).counter(COUNTER)));
+ private SynchronizedMeterView getMeterViewByResponseCode(int code) {
+ return responseCodeMeters.computeIfAbsent(
+ code, key -> createMeterViewForMetricsGroup(responseMetricGroup.addGroup(key)));
+ }
+
+ private SynchronizedMeterView createMeterViewForMetricsGroup(MetricGroup metricGroup) {
+ return OperatorMetricUtils.synchronizedMeterView(
+ metricGroup.meter(
+ METER,
+ new MeterView(
+ OperatorMetricUtils.synchronizedCounter(
+ metricGroup.counter(COUNTER)))));
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index dfc876d..bf13380 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -44,6 +44,14 @@
.withDescription(
"Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.");
+ public static final ConfigOption<Boolean>
+ OPERATOR_KUBERNETES_CLIENT_METRICS_HTTP_RESPONSE_CODE_GROUPS_ENABLED =
+ operatorConfig("kubernetes.client.metrics.http.response.code.groups.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server by response code group, e.g. 1xx, 2xx.");
+
public static final ConfigOption<Boolean> OPERATOR_RESOURCE_METRICS_ENABLED =
operatorConfig("resource.metrics.enabled")
.booleanType()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
index 64f27da..2dd86a5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
@@ -72,8 +72,20 @@
String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, METER);
private static final String RESPONSE_201_COUNTER_ID =
String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "201", COUNTER);
+ private static final String RESPONSE_201_METER_ID =
+ String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "201", METER);
+ private static final String RESPONSE_2xx_COUNTER_ID =
+ String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "2xx", COUNTER);
+ private static final String RESPONSE_2xx_METER_ID =
+ String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "2xx", METER);
private static final String RESPONSE_404_COUNTER_ID =
String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "404", COUNTER);
+ private static final String RESPONSE_404_METER_ID =
+ String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "404", METER);
+ private static final String RESPONSE_4xx_COUNTER_ID =
+ String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "4xx", COUNTER);
+ private static final String RESPONSE_4xx_METER_ID =
+ String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "4xx", METER);
private static final String RESPONSE_LATENCY_ID =
String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, HISTO);
@@ -203,6 +215,160 @@
&& listener.getMeter(listener.getMetricId(RESPONSE_METER_ID))
.get()
.getRate()
+ > 0.01
+ && listener.getMeter(
+ listener.getMetricId(
+ RESPONSE_201_METER_ID))
+ .get()
+ .getRate()
+ > 0.01
+ && listener.getMeter(
+ listener.getMetricId(
+ RESPONSE_404_METER_ID))
+ .get()
+ .getRate()
+ > 0.01;
+ });
+ }
+
+ @Test
+ @Order(3)
+ public void testMetricsHttpResponseCodeGroupsEnabled() {
+ var configuration = new Configuration();
+ configuration.set(
+ KubernetesOperatorMetricOptions
+ .OPERATOR_KUBERNETES_CLIENT_METRICS_HTTP_RESPONSE_CODE_GROUPS_ENABLED,
+ true);
+ var flinkConfigManager = new FlinkConfigManager(configuration);
+ var listener = new TestingMetricListener(configuration);
+ var kubernetesClient =
+ KubernetesClientUtils.getKubernetesClient(
+ flinkConfigManager.getOperatorConfiguration(),
+ listener.getMetricGroup(),
+ mockServer.createClient().getConfiguration());
+
+ var deployment = TestUtils.buildApplicationCluster();
+ assertEquals(
+ 0, listener.getCounter(listener.getMetricId(REQUEST_COUNTER_ID)).get().getCount());
+ assertEquals(
+ 0.0, listener.getMeter(listener.getMetricId(REQUEST_METER_ID)).get().getRate());
+ assertEquals(
+ 0,
+ listener.getCounter(listener.getMetricId(REQUEST_FAILED_COUNTER_ID))
+ .get()
+ .getCount());
+ assertEquals(
+ 0.0,
+ listener.getMeter(listener.getMetricId(REQUEST_FAILED_METER_ID)).get().getRate());
+ assertEquals(
+ 0, listener.getCounter(listener.getMetricId(RESPONSE_COUNTER_ID)).get().getCount());
+ assertEquals(
+ 0.0, listener.getMeter(listener.getMetricId(RESPONSE_METER_ID)).get().getRate());
+ assertEquals(
+ 0,
+ listener.getHistogram(listener.getMetricId(RESPONSE_LATENCY_ID))
+ .get()
+ .getStatistics()
+ .getMin());
+ assertEquals(
+ 0,
+ listener.getHistogram(listener.getMetricId(RESPONSE_LATENCY_ID))
+ .get()
+ .getStatistics()
+ .getMax());
+
+ kubernetesClient.resource(deployment).createOrReplace();
+ assertEquals(
+ 1, listener.getCounter(listener.getMetricId(REQUEST_COUNTER_ID)).get().getCount());
+ assertEquals(
+ 1,
+ listener.getCounter(listener.getMetricId(REQUEST_POST_COUNTER_ID))
+ .get()
+ .getCount());
+ assertEquals(
+ 1, listener.getCounter(listener.getMetricId(RESPONSE_COUNTER_ID)).get().getCount());
+ assertEquals(
+ 1,
+ listener.getCounter(listener.getMetricId(RESPONSE_201_COUNTER_ID))
+ .get()
+ .getCount());
+ assertEquals(
+ 1,
+ listener.getCounter(listener.getMetricId(RESPONSE_2xx_COUNTER_ID))
+ .get()
+ .getCount());
+ assertTrue(
+ listener.getHistogram(listener.getMetricId(RESPONSE_LATENCY_ID))
+ .get()
+ .getStatistics()
+ .getMin()
+ > 0);
+ assertTrue(
+ listener.getHistogram(listener.getMetricId(RESPONSE_LATENCY_ID))
+ .get()
+ .getStatistics()
+ .getMax()
+ > 0);
+
+ kubernetesClient.resource(deployment).delete();
+ assertEquals(
+ 1,
+ listener.getCounter(listener.getMetricId(REQUEST_DELETE_COUNTER_ID))
+ .get()
+ .getCount());
+
+ kubernetesClient.resource(deployment).delete();
+ assertEquals(
+ 2,
+ listener.getCounter(listener.getMetricId(REQUEST_DELETE_COUNTER_ID))
+ .get()
+ .getCount());
+ assertEquals(
+ 1,
+ listener.getCounter(listener.getMetricId(RESPONSE_404_COUNTER_ID))
+ .get()
+ .getCount());
+ assertEquals(
+ 1,
+ listener.getCounter(listener.getMetricId(RESPONSE_4xx_COUNTER_ID))
+ .get()
+ .getCount());
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .until(
+ () -> {
+ kubernetesClient.resource(deployment).createOrReplace();
+ return listener.getMeter(listener.getMetricId(REQUEST_METER_ID))
+ .get()
+ .getRate()
+ > 0.01
+ && listener.getMeter(listener.getMetricId(RESPONSE_METER_ID))
+ .get()
+ .getRate()
+ > 0.01
+ && listener.getMeter(
+ listener.getMetricId(
+ RESPONSE_201_METER_ID))
+ .get()
+ .getRate()
+ > 0.01
+ && listener.getMeter(
+ listener.getMetricId(
+ RESPONSE_404_METER_ID))
+ .get()
+ .getRate()
+ > 0.01
+ && listener.getMeter(
+ listener.getMetricId(
+ RESPONSE_2xx_METER_ID))
+ .get()
+ .getRate()
+ > 0.01
+ && listener.getMeter(
+ listener.getMetricId(
+ RESPONSE_4xx_METER_ID))
+ .get()
+ .getRate()
> 0.01;
});
}