[AMBARI-24797] Support regex based metric inclusion in KafkaTimelineMetricsReporter. (#8)

diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index f07d508..cf529ea 100644
--- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -45,6 +45,7 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -80,6 +81,7 @@
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
   private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
   private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
+  private static final String INCLUDED_METRICS_REGEX_PROPERTY = "external.kafka.metrics.include.regex";
 
   private volatile boolean initialized = false;
   private boolean running = false;
@@ -97,6 +99,7 @@
 
   private String[] excludedMetricsPrefixes;
   private String[] includedMetricsPrefixes;
+  private String[] includedMetricsRegex;
   // Local cache to avoid prefix matching everytime
   private Set<String> excludedMetrics = new HashSet<>();
   private boolean hostInMemoryAggregationEnabled;
@@ -214,6 +217,13 @@
           includedMetricsPrefixes = includedMetricsStr.trim().split(",");
         }
 
+        // Inclusion override
+        String includedMetricsRegexStr = props.getString(INCLUDED_METRICS_REGEX_PROPERTY, "");
+        if (!StringUtils.isEmpty(includedMetricsRegexStr.trim())) {
+          LOG.info("Including metrics which match the following regex patterns : " + includedMetricsRegexStr);
+          includedMetricsRegex = includedMetricsRegexStr.trim().split(",");
+        }
+
         initializeReporter();
         if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
           startReporter(metricsConfig.pollingIntervalSecs());
@@ -273,7 +283,7 @@
         ", include: " + StringUtils.startsWithAny(metricName, includedMetricsPrefixes));
     }
     if (StringUtils.startsWithAny(metricName, excludedMetricsPrefixes)) {
-      if (!StringUtils.startsWithAny(metricName, includedMetricsPrefixes)) {
+      if (!(StringUtils.startsWithAny(metricName, includedMetricsPrefixes) || Arrays.stream(includedMetricsRegex).anyMatch(metricName::matches))) {
         excludedMetrics.add(metricName);
         return true;
       }
diff --git a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index b05190c..cc56227 100644
--- a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -84,6 +84,7 @@
     properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
     properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
     properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+    properties.setProperty("external.kafka.metrics.include.regex", "a.b.c.*.f");
     properties.setProperty("kafka.timeline.metrics.instanceId", "cluster");
     properties.setProperty("kafka.timeline.metrics.set.instanceId", "false");
     props = new VerifiableProperties(properties);
@@ -118,6 +119,7 @@
     properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
     properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
     properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+    properties.setProperty("external.kafka.metrics.include.regex", "a.b.c.*.f");
     properties.setProperty("kafka.timeline.metrics.protocol", "https");
     properties.setProperty("kafka.timeline.metrics.truststore.path", "");
     properties.setProperty("kafka.timeline.metrics.truststore.type", "");
@@ -143,6 +145,7 @@
     Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b"));
     Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d"));
     Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d.e"));
+    Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.e.f"));
 
     kafkaTimelineMetricsReporter.stopReporter();
     verifyAll();