[AMBARI-24797] Support regex based metric inclusion in KafkaTimelineMetricsReporter. (#8)
diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index f07d508..cf529ea 100644
--- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -45,6 +45,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -80,6 +81,7 @@
private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
+ private static final String INCLUDED_METRICS_REGEX_PROPERTY = "external.kafka.metrics.include.regex";
private volatile boolean initialized = false;
private boolean running = false;
@@ -97,6 +99,7 @@
private String[] excludedMetricsPrefixes;
private String[] includedMetricsPrefixes;
+ private String[] includedMetricsRegex;
// Local cache to avoid prefix matching everytime
private Set<String> excludedMetrics = new HashSet<>();
private boolean hostInMemoryAggregationEnabled;
@@ -214,6 +217,13 @@
includedMetricsPrefixes = includedMetricsStr.trim().split(",");
}
+ // Inclusion override
+ String includedMetricsRegexStr = props.getString(INCLUDED_METRICS_REGEX_PROPERTY, "");
+ if (!StringUtils.isEmpty(includedMetricsRegexStr.trim())) {
+ LOG.info("Including metrics which match the following regex patterns : " + includedMetricsRegexStr);
+ includedMetricsRegex = includedMetricsRegexStr.trim().split(",");
+ }
+
initializeReporter();
if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
startReporter(metricsConfig.pollingIntervalSecs());
@@ -273,7 +283,7 @@
", include: " + StringUtils.startsWithAny(metricName, includedMetricsPrefixes));
}
if (StringUtils.startsWithAny(metricName, excludedMetricsPrefixes)) {
- if (!StringUtils.startsWithAny(metricName, includedMetricsPrefixes)) {
+ if (!(StringUtils.startsWithAny(metricName, includedMetricsPrefixes) || Arrays.stream(includedMetricsRegex).anyMatch(metricName::matches))) {
excludedMetrics.add(metricName);
return true;
}
diff --git a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index b05190c..cc56227 100644
--- a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -84,6 +84,7 @@
properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+ properties.setProperty("external.kafka.metrics.include.regex", "a.b.c.*.f");
properties.setProperty("kafka.timeline.metrics.instanceId", "cluster");
properties.setProperty("kafka.timeline.metrics.set.instanceId", "false");
props = new VerifiableProperties(properties);
@@ -118,6 +119,7 @@
properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+ properties.setProperty("external.kafka.metrics.include.regex", "a.b.c.*.f");
properties.setProperty("kafka.timeline.metrics.protocol", "https");
properties.setProperty("kafka.timeline.metrics.truststore.path", "");
properties.setProperty("kafka.timeline.metrics.truststore.type", "");
@@ -143,6 +145,7 @@
Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b"));
Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d"));
Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d.e"));
+ Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.e.f"));
kafkaTimelineMetricsReporter.stopReporter();
verifyAll();