// blob: 8d3c77ba865e196622c997c62707ef95049fafc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.View;
import org.apache.flink.runtime.metrics.ViewUpdater;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
 * Utility class for metrics testing.
 *
 * <p>Wraps a {@link KubernetesOperatorMetricGroup} backed by a {@link TestingMetricRegistry} and
 * records every metric registered through it, keyed by the identifier reported by the
 * registering group, so that tests can look metrics up afterwards.
 */
public class TestingMetricListener {

    /** Delimiter used by the registry and when joining identifier parts. */
    public static final String DELIMITER = ".";

    private static final String NAMESPACE = "test-op-ns";
    private static final String NAME = "test-op-name";
    private static final String HOST = "test-op-host";

    private final KubernetesOperatorMetricGroup metricGroup;
    private final Map<String, Metric> metrics = new HashMap<>();
    private final ScheduledExecutorService executor;
    private final Configuration configuration;

    /** Created lazily, on registration of the first {@link View} metric. */
    private ViewUpdater viewUpdater;

    /**
     * Creates a listener whose operator metric group is built from the given configuration.
     *
     * <p>Each registered metric is stored under {@code group.getMetricIdentifier(name)}. Metrics
     * that implement {@link View} are additionally handed to a {@link ViewUpdater} running on
     * this listener's single-threaded scheduled executor.
     *
     * @param configuration configuration used to create the operator metric group and, later,
     *     the namespace/resource groups used for identifier computation
     */
    public TestingMetricListener(Configuration configuration) {
        this.executor =
                Executors.newSingleThreadScheduledExecutor(
                        new ExecutorThreadFactory("Flink-TestingMetricRegistry"));
        this.configuration = configuration;

        TestingMetricRegistry registry =
                TestingMetricRegistry.builder()
                        .setDelimiter(DELIMITER.charAt(0))
                        .setRegisterConsumer(
                                (metric, name, group) -> {
                                    this.metrics.put(group.getMetricIdentifier(name), metric);
                                    if (metric instanceof View) {
                                        if (viewUpdater == null) {
                                            viewUpdater = new ViewUpdater(executor);
                                        }
                                        viewUpdater.notifyOfAddedView((View) metric);
                                    }
                                })
                        .build();

        this.metricGroup =
                KubernetesOperatorMetricGroup.create(
                        registry, configuration, NAMESPACE, NAME, HOST);
    }

    /** Returns the operator metric group that feeds this listener. */
    public KubernetesOperatorMetricGroup getMetricGroup() {
        return this.metricGroup;
    }

    /**
     * Looks up a registered {@link Counter}.
     *
     * @param identifier full metric identifier, e.g. as produced by {@link #getMetricId}
     * @return the metric, or an empty {@link Optional} if nothing is registered under that id
     */
    public Optional<Counter> getCounter(String identifier) {
        return this.getMetric(identifier);
    }

    /**
     * Looks up a registered {@link Histogram}.
     *
     * @param identifier full metric identifier
     * @return the metric, or an empty {@link Optional} if nothing is registered under that id
     */
    public Optional<Histogram> getHistogram(String identifier) {
        return this.getMetric(identifier);
    }

    /**
     * Looks up a registered {@link Meter}.
     *
     * @param identifier full metric identifier
     * @return the metric, or an empty {@link Optional} if nothing is registered under that id
     */
    public Optional<Meter> getMeter(String identifier) {
        return this.getMetric(identifier);
    }

    /**
     * Looks up a registered {@link Gauge}.
     *
     * @param identifier full metric identifier
     * @param <T> value type the caller expects the gauge to report; not checked at lookup time
     * @return the metric, or an empty {@link Optional} if nothing is registered under that id
     */
    // The map stores plain Metric instances; the caller asserts the concrete type. A mismatch
    // surfaces as a ClassCastException at the use site, which is acceptable for a test helper.
    @SuppressWarnings("unchecked")
    public <T> Optional<Gauge<T>> getGauge(String identifier) {
        return Optional.ofNullable((Gauge<T>) this.metrics.get(identifier));
    }

    // Same reasoning as getGauge: the typed public accessors choose T, the map cannot verify it.
    @SuppressWarnings("unchecked")
    private <T extends Metric> Optional<T> getMetric(String identifier) {
        return Optional.ofNullable((T) this.metrics.get(identifier));
    }

    /**
     * Returns the identifier the operator metric group assigns to the given name parts.
     *
     * @param identifiers name parts, joined with {@link #DELIMITER}
     */
    public String getMetricId(String... identifiers) {
        return metricGroup.getMetricIdentifier(String.join(DELIMITER, identifiers));
    }

    /**
     * Returns the identifier that a resource-namespace group, created from the operator metric
     * group, assigns to the given name parts.
     *
     * @param resourceClass resource type the namespace group is created for
     * @param resourceNs namespace the group is created for
     * @param identifiers name parts, joined with {@link #DELIMITER}
     */
    public String getNamespaceMetricId(
            Class<? extends AbstractFlinkResource<?, ?>> resourceClass,
            String resourceNs,
            String... identifiers) {
        return metricGroup
                .createResourceNamespaceGroup(configuration, resourceClass, resourceNs)
                .getMetricIdentifier(String.join(DELIMITER, identifiers));
    }

    /**
     * Returns the identifier that a per-resource group, created beneath the resource-namespace
     * group, assigns to the given name parts.
     *
     * @param resourceClass resource type the namespace group is created for
     * @param resourceNs namespace the group is created for
     * @param resourceName name of the resource the innermost group is created for
     * @param identifiers name parts, joined with {@link #DELIMITER}
     */
    public String getResourceMetricId(
            Class<? extends AbstractFlinkResource<?, ?>> resourceClass,
            String resourceNs,
            String resourceName,
            String... identifiers) {
        return metricGroup
                .createResourceNamespaceGroup(configuration, resourceClass, resourceNs)
                // The namespace group's child is the per-resource group, not another
                // namespace group.
                .createResourceGroup(configuration, resourceName)
                .getMetricIdentifier(String.join(DELIMITER, identifiers));
    }

    /** Returns the number of metrics recorded so far. */
    public int size() {
        return metrics.size();
    }
}