AMBARI-22348 : Metric Definition Service V1 Implementation. (avijayan)
diff --git a/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics-anomaly-detection-service/pom.xml
index e96e957..44bdc1f 100644
--- a/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics-anomaly-detection-service/pom.xml
@@ -446,5 +446,11 @@
<artifactId>metrics-core</artifactId>
<version>3.2.5</version>
</dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>2.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
index 7f0aed3..0a22e50 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
+++ b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
@@ -18,10 +18,7 @@
package org.apache.ambari.metrics.adservice.prototype.common;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
public class StatisticUtils {
diff --git a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
index e8257e5..addeda7 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
+++ b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
@@ -35,10 +35,15 @@
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
-import java.util.*;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
diff --git a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
index 0a2271a..f379605 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
+++ b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
@@ -17,8 +17,8 @@
*/
package org.apache.ambari.metrics.adservice.prototype.core;
-import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaModel;
import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
import org.apache.commons.logging.Log;
diff --git a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
index f5ec83a..80212b3 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
+++ b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
@@ -17,9 +17,9 @@
*/
package org.apache.ambari.metrics.adservice.prototype.core;
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.adservice.prototype.methods.hsdev.HsdevTechnique;
-import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.adservice.prototype.methods.kstest.KSTechnique;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
diff --git a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java
index 251603b..60ff11c 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java
+++ b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java
@@ -18,8 +18,6 @@
package org.apache.ambari.metrics.adservice.prototype.methods;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
public class MetricAnomaly implements Serializable{
diff --git a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java
index 6facc99..855cc70 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java
+++ b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -21,14 +21,15 @@
import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.median;
-import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.sdev;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.median;
+import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.sdev;
+
public class HsdevTechnique implements Serializable {
private Map<String, Double> hsdevMap;
diff --git a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java
index 4727c6f..0dc679e 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java
+++ b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java
@@ -20,8 +20,8 @@
import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
-import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker;
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
diff --git a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java
index d079e66..10b3a71 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java
+++ b/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java
@@ -18,23 +18,6 @@
package org.apache.ambari.metrics.adservice.prototype.testing.utilities;
-import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
-import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
-import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface;
-import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
/**
* Class which was originally used to send test series from AMS to Spark through Kafka.
*/
diff --git a/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index bd88d57..6953745 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -1,3 +1,15 @@
+#Licensed under the Apache License, Version 2.0 (the "License");
+#you may not use this file except in compliance with the License.
+#You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
server:
applicationConnectors:
- type: http
@@ -7,3 +19,18 @@
logging:
type: external
+
+metricManagerService:
+ inputDefinitionDirectory: /etc/adservice/conf/input-definitions-directory
+
+metricsCollector:
+ hostPortList: host1:6188,host2:6188
+ metadataEndpoint: /v1/timeline/metrics/metadata/keys
+
+adQueryService:
+ anomalyDataTtl: 604800
+
+#subsystemService:
+# spark:
+# pointInTime:
+# trend:
\ No newline at end of file
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
index b7f217e..8b3a829 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
@@ -28,6 +28,7 @@
import com.fasterxml.jackson.datatype.joda.JodaModule
import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider
import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
import io.dropwizard.Application
import io.dropwizard.setup.Environment
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
index 9e6cc6d..be8d027 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
@@ -1,7 +1,3 @@
-package org.apache.ambari.metrics.adservice.app
-
-import io.dropwizard.Configuration
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +15,55 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+package org.apache.ambari.metrics.adservice.app
+
+import javax.validation.Valid
+
+import org.apache.ambari.metrics.adservice.configuration.{AdServiceConfiguration, HBaseConfiguration, MetricCollectorConfiguration, MetricManagerServiceConfiguration}
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+import io.dropwizard.Configuration
+
+/**
+ * Top Level AD System Manager config items.
+ */
class AnomalyDetectionAppConfig extends Configuration {
+ /*
+ Metric Definition Service configuration
+ */
+ @Valid
+ private val metricManagerServiceConfiguration = new MetricManagerServiceConfiguration
+
+ @Valid
+ private val metricCollectorConfiguration = new MetricCollectorConfiguration
+
+ /*
+ Anomaly Service configuration
+ */
+ @Valid
+ private val adServiceConfiguration = new AdServiceConfiguration
+
+ /*
+ HBase Conf
+ */
+ def getHBaseConf : org.apache.hadoop.conf.Configuration = {
+ HBaseConfiguration.getHBaseConf
+ }
+
+ @JsonProperty("metricManagerService")
+ def getMetricManagerServiceConfiguration: MetricManagerServiceConfiguration = {
+ metricManagerServiceConfiguration
+ }
+
+ @JsonProperty("adQueryService")
+ def getAdServiceConfiguration: AdServiceConfiguration = {
+ adServiceConfiguration
+ }
+
+ @JsonProperty("metricsCollector")
+ def getMetricCollectorConfiguration: MetricCollectorConfiguration = metricCollectorConfiguration
+
}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
index 338c97b..7425a7e 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
@@ -23,6 +23,7 @@
import com.codahale.metrics.health.HealthCheck
import com.google.inject.AbstractModule
import com.google.inject.multibindings.Multibinder
+
import io.dropwizard.setup.Environment
class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environment) extends AbstractModule {
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala
deleted file mode 100644
index 248c74e..0000000
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.adservice.common
-
-import java.net.{MalformedURLException, URISyntaxException}
-
-import org.apache.hadoop.conf.Configuration
-
-object ADServiceConfiguration {
-
- private val AMS_AD_SITE_CONFIGURATION_FILE = "ams-ad-site.xml"
- private val HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml"
-
- val ANOMALY_METRICS_TTL = "timeline.metrics.anomaly.data.ttl"
-
- private var hbaseConf: org.apache.hadoop.conf.Configuration = _
- private var adConf: org.apache.hadoop.conf.Configuration = _
-
- def initConfigs(): Unit = {
-
- var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader
- if (classLoader == null) classLoader = getClass.getClassLoader
-
- try {
- val hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE)
- if (hbaseResUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. No hbase-site present in the classpath.")
-
- hbaseConf = new Configuration(true)
- hbaseConf.addResource(hbaseResUrl.toURI.toURL)
-
- val adSystemConfigUrl = classLoader.getResource(AMS_AD_SITE_CONFIGURATION_FILE)
- if (adSystemConfigUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. No ams-ad-site present in the classpath")
-
- adConf = new Configuration(true)
- adConf.addResource(adSystemConfigUrl.toURI.toURL)
-
- } catch {
- case me : MalformedURLException => println("MalformedURLException")
- case ue : URISyntaxException => println("URISyntaxException")
- }
- }
-
- def getHBaseConf: org.apache.hadoop.conf.Configuration = {
- hbaseConf
- }
-
- def getAdConf: org.apache.hadoop.conf.Configuration = {
- adConf
- }
-
- def getAnomalyDataTtl: Int = {
- if (adConf != null) return adConf.get(ANOMALY_METRICS_TTL, "604800").toInt
- 604800
- }
-
- /**
- * ttl
- *
- */
-}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Range.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Range.scala
new file mode 100644
index 0000000..003c18f
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Range.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.common
+
+/**
+ * Class to capture a Range in a Season.
+ * For example Monday - Wednesday is a 'Range' in a DAY Season.
+ * @param lower lower end
+ * @param higher higher end
+ */
+case class Range (lower: Int, higher: Int) {
+
+ def withinRange(value: Int) : Boolean = {
+ if (lower <= higher) {
+ (value >= lower) && (value <= higher)
+ } else {
+ !(value > higher) && (value < lower)
+ }
+ }
+
+ @Override
+ override def equals(obj: scala.Any): Boolean = {
+ if (obj == null) {
+ return false
+ }
+ val that : Range = obj.asInstanceOf[Range]
+ (lower == that.lower) && (higher == that.higher)
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Season.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Season.scala
new file mode 100644
index 0000000..aba2587
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Season.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.common
+
+import java.time.DayOfWeek
+import java.util.Calendar
+
+import javax.xml.bind.annotation.XmlRootElement
+
+import org.apache.ambari.metrics.adservice.common.SeasonType.SeasonType
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+
+/**
+ * Class to capture a 'Season' for a metric anomaly.
+ * A Season is a combination of DAY Range and HOUR Range.
+ * @param DAY Day Range
+ * @param HOUR Hour Range
+ */
+@XmlRootElement
+case class Season(var DAY: Range, var HOUR: Range) {
+
+ def belongsTo(timestamp : Long) : Boolean = {
+ val c = Calendar.getInstance
+ c.setTimeInMillis(timestamp)
+ val dayOfWeek = c.get(Calendar.DAY_OF_WEEK)
+ val hourOfDay = c.get(Calendar.HOUR_OF_DAY)
+
+ if (DAY.lower != -1 && !DAY.withinRange(dayOfWeek))
+ return false
+ if (HOUR.lower != -1 && !HOUR.withinRange(hourOfDay))
+ return false
+ true
+ }
+
+ @Override
+ override def equals(obj: scala.Any): Boolean = {
+
+ if (obj == null) {
+ return false
+ }
+
+ val that : Season = obj.asInstanceOf[Season]
+ DAY.equals(that.DAY) && HOUR.equals(that.HOUR)
+ }
+
+ @Override
+ override def toString: String = {
+
+ var prettyPrintString = ""
+
+ var dLower: Int = DAY.lower - 1
+ if (dLower == 0) {
+ dLower = 7
+ }
+
+ var dHigher: Int = DAY.higher - 1
+ if (dHigher == 0) {
+ dHigher = 7
+ }
+
+ if (DAY != null) {
+ prettyPrintString = prettyPrintString.concat("DAY : [" + DayOfWeek.of(dLower) + "," + DayOfWeek.of(dHigher)) + "]"
+ }
+
+ if (HOUR != null) {
+ prettyPrintString = prettyPrintString.concat(" HOUR : [" + HOUR.lower + "," + HOUR.higher) + "]"
+ }
+ prettyPrintString
+ }
+}
+
+object Season {
+
+ def apply(DAY: Range, HOUR: Range): Season = new Season(DAY, HOUR)
+
+ def apply(range: Range, seasonType: SeasonType): Season = {
+ if (seasonType.equals(SeasonType.DAY)) {
+ new Season(range, Range(-1,-1))
+ } else {
+ new Season(Range(-1,-1), range)
+ }
+ }
+
+ val mapper = new ObjectMapper() with ScalaObjectMapper
+ mapper.registerModule(DefaultScalaModule)
+
+ def getSeasons(timestamp: Long, seasons : List[Season]) : List[Season] = {
+ val validSeasons : scala.collection.mutable.MutableList[Season] = scala.collection.mutable.MutableList.empty[Season]
+ for ( season <- seasons ) {
+ if (season.belongsTo(timestamp)) {
+ validSeasons += season
+ }
+ }
+ validSeasons.toList
+ }
+
+ def serialize(season: Season) : String = {
+ mapper.writeValueAsString(season)
+ }
+
+ def deserialize(seasonString: String) : Season = {
+ mapper.readValue[Season](seasonString)
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/SeasonType.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/SeasonType.scala
new file mode 100644
index 0000000..067972c
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/SeasonType.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.common
+
+object SeasonType extends Enumeration{
+
+ type SeasonType = Value
+ val DAY,HOUR = Value
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/TimeRange.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/TimeRange.scala
new file mode 100644
index 0000000..50df658
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/TimeRange.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.common
+
+import java.util.Date
+
+/**
+ * A special form of a 'Range' class to denote Time range.
+ */
+case class TimeRange (startTime: Long, endTime: Long) {
+ @Override
+ override def toString: String = {
+ "StartTime=" + new Date(startTime) + ", EndTime=" + new Date(endTime)
+ }
+
+ @Override
+ override def equals(obj: scala.Any): Boolean = {
+ if (obj == null) {
+ return false
+ }
+ val that : TimeRange = obj.asInstanceOf[TimeRange]
+ (startTime == that.startTime) && (endTime == that.endTime)
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/AdServiceConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/AdServiceConfiguration.scala
new file mode 100644
index 0000000..11e9f28
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/AdServiceConfiguration.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.configuration
+
+import javax.validation.constraints.NotNull
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+/**
+ * Class to get Anomaly Service specific configuration.
+ */
+class AdServiceConfiguration {
+
+ @NotNull
+ var anomalyDataTtl: Long = _
+
+ @JsonProperty
+ def getAnomalyDataTtl: Long = anomalyDataTtl
+
+ @JsonProperty
+ def setAnomalyDataTtl(anomalyDataTtl: Long): Unit = {
+ this.anomalyDataTtl = anomalyDataTtl
+ }
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
new file mode 100644
index 0000000..a7bbc66
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.configuration
+
+import java.net.{MalformedURLException, URISyntaxException}
+
+import org.apache.hadoop.conf.Configuration
+
+object HBaseConfiguration {
+
+ val HBASE_SITE_CONFIGURATION_FILE: String = "hbase-site.xml"
+ val hbaseConf: org.apache.hadoop.conf.Configuration = new Configuration(true)
+ var isInitialized: Boolean = false
+
+ def initConfigs(): Unit = {
+ if (!isInitialized) {
+ var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader
+ if (classLoader == null) classLoader = getClass.getClassLoader
+
+ try {
+ val hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE)
+ if (hbaseResUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. No hbase-site present in the classpath.")
+
+ hbaseConf.addResource(hbaseResUrl.toURI.toURL)
+ isInitialized = true
+
+ } catch {
+ case me : MalformedURLException => println("MalformedURLException")
+ case ue : URISyntaxException => println("URISyntaxException")
+ }
+ }
+ }
+
+ def getHBaseConf: org.apache.hadoop.conf.Configuration = {
+ if (!isInitialized) {
+ initConfigs()
+ }
+ hbaseConf
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala
new file mode 100644
index 0000000..50a0b72
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.configuration
+
+import javax.validation.constraints.NotNull
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+/**
+ * Class to capture the Metrics Collector related configuration.
+ */
+class MetricCollectorConfiguration {
+
+ @NotNull
+ private var hostPortList: String = _
+
+ @NotNull
+ private var metadataEndpoint: String = _
+
+ @JsonProperty
+ def getHostPortList: String = hostPortList
+
+ @JsonProperty
+ def getMetadataEndpoint: String = metadataEndpoint
+
+ @JsonProperty
+ def setHostPortList(hostPortList: String): Unit = {
+ this.hostPortList = hostPortList
+ }
+
+ @JsonProperty
+ def setMetadataEndpoint(metadataEndpoint: String): Unit = {
+ this.metadataEndpoint = metadataEndpoint
+ }
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricManagerServiceConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricManagerServiceConfiguration.scala
new file mode 100644
index 0000000..e5960d5
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricManagerServiceConfiguration.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.configuration
+
+import javax.validation.constraints.NotNull
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+/**
+ * Class to capture the Metric Definition Service configuration.
+ */
+class MetricManagerServiceConfiguration {
+
+ @NotNull
+ private val inputDefinitionDirectory: String = ""
+
+ @JsonProperty
+ def getInputDefinitionDirectory: String = inputDefinitionDirectory
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessor.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessor.scala
new file mode 100644
index 0000000..bcdb416
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessor.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition
+
+/**
+ * Trait used to talk to the AD Metadata Store.
+ */
+trait AdMetadataStoreAccessor {
+
+ /**
+ * Return all saved component definitions from DB.
+ * @return
+ */
+ def getSavedInputDefinitions: List[MetricSourceDefinition]
+
+ /**
+ * Save a set of component definitions
+ * @param metricSourceDefinitions Set of component definitions
+ * @return Success / Failure
+ */
+ def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]) : Boolean
+
+ /**
+ * Save a component definition
+ * @param metricSourceDefinition component definition
+ * @return Success / Failure
+ */
+ def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition) : Boolean
+
+ /**
+ * Delete a component definition
+ * @param definitionName component definition
+ * @return
+ */
+ def removeInputDefinition(definitionName: String) : Boolean
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreConstants.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreConstants.scala
new file mode 100644
index 0000000..3d273a3
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreConstants.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+object AdMetadataStoreConstants {
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /* Table Name constants */
+ //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ val METRIC_PROFILE_TABLE_NAME = "METRIC_DEFINITION"
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ /* CREATE statement constants */
+ //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ val CREATE_METRIC_DEFINITION_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" +
+ "DEFINITION_NAME VARCHAR, " +
+ "DEFINITION_JSON VARCHAR, " +
+ "DEFINITION_SOURCE NUMBER, " +
+ "CREATED_TIME TIMESTAMP, " +
+ "UPDATED_TIME TIMESTAMP " +
+ "CONSTRAINT pk PRIMARY KEY (DEFINITION_NAME))"
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
index 6f33e56..1191e90 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
@@ -17,23 +17,36 @@
package org.apache.ambari.metrics.adservice.db
-import java.sql.{Connection, SQLException}
+import java.sql.{Connection, PreparedStatement, ResultSet, SQLException}
+import java.util.concurrent.TimeUnit.SECONDS
-import org.apache.ambari.metrics.adservice.common.{ADServiceConfiguration, PhoenixQueryConstants}
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.common._
+import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, SingleMetricAnomalyInstance}
+import org.apache.ambari.metrics.adservice.subsystem.pointintime.PointInTimeAnomalyInstance
+import org.apache.ambari.metrics.adservice.subsystem.trend.TrendAnomalyInstance
import org.apache.hadoop.hbase.util.RetryCounterFactory
import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider}
-import java.util.concurrent.TimeUnit.SECONDS
+
+import com.google.inject.Inject
object PhoenixAnomalyStoreAccessor {
- private var datasource: PhoenixConnectionProvider = _
+ @Inject
+ var configuration: AnomalyDetectionAppConfig = _
+
+ var datasource: PhoenixConnectionProvider = _
def initAnomalyMetricSchema(): Unit = {
- val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(ADServiceConfiguration.getHBaseConf)
+ val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf)
val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt)
- val ttl = ADServiceConfiguration.getAnomalyDataTtl
+ val ttl = configuration.getAdServiceConfiguration.getAnomalyDataTtl
try {
var conn = datasource.getConnectionRetryingOnException(retryCounterFactory)
var stmt = conn.createStatement
@@ -64,4 +77,89 @@
@throws[SQLException]
def getConnection: Connection = datasource.getConnection
+
+ def getSingleMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : scala.collection.mutable.MutableList[SingleMetricAnomalyInstance] = {
+ val anomalies = scala.collection.mutable.MutableList.empty[SingleMetricAnomalyInstance]
+ val conn : Connection = getConnection
+ var stmt : PreparedStatement = null
+ var rs : ResultSet = null
+ val s : Season = Season(Range(-1,-1), SeasonType.DAY)
+
+ try {
+ stmt = prepareAnomalyMetricsGetSqlStatement(conn, anomalyType, startTime, endTime, limit)
+ rs = stmt.executeQuery
+ if (anomalyType.equals(AnomalyType.POINT_IN_TIME)) {
+ while (rs.next()) {
+ val uuid: Array[Byte] = rs.getBytes("METRIC_UUID")
+ val timestamp: Long = rs.getLong("ANOMALY_TIMESTAMP")
+ val metricValue: Double = rs.getDouble("METRIC_VALUE")
+ val methodType: AnomalyDetectionMethod = AnomalyDetectionMethod.withName(rs.getString("METHOD_NAME"))
+ val season: Season = Season.deserialize(rs.getString("SEASONAL_INFO"))
+ val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
+ val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")
+
+ val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid)
+ val anomalyInstance: SingleMetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp,
+ metricValue, methodType, anomalyScore, season, modelSnapshot)
+ anomalies.+=(anomalyInstance)
+ }
+ } else {
+ while (rs.next()) {
+ val uuid: Array[Byte] = rs.getBytes("METRIC_UUID")
+ val anomalyStart: Long = rs.getLong("ANOMALY_PERIOD_START")
+ val anomalyEnd: Long = rs.getLong("ANOMALY_PERIOD_END")
+ val referenceStart: Long = rs.getLong("TEST_PERIOD_START")
+ val referenceEnd: Long = rs.getLong("TEST_PERIOD_END")
+ val methodType: AnomalyDetectionMethod = AnomalyDetectionMethod.withName(rs.getString("METHOD_NAME"))
+ val season: Season = Season.deserialize(rs.getString("SEASONAL_INFO"))
+ val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
+ val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")
+
+ val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid)
+ val anomalyInstance: SingleMetricAnomalyInstance = TrendAnomalyInstance(metricKey,
+ TimeRange(anomalyStart, anomalyEnd),
+ TimeRange(referenceStart, referenceEnd),
+ methodType, anomalyScore, season, modelSnapshot)
+ anomalies.+=(anomalyInstance)
+ }
+ }
+ } catch {
+ case e: SQLException => throw e
+ }
+
+ anomalies
+ }
+
+ @throws[SQLException]
+ def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = {
+
+ val sb = new StringBuilder
+
+ if (anomalyType.equals(AnomalyType.POINT_IN_TIME)) {
+ sb.++=(String.format(PhoenixQueryConstants.GET_PIT_ANOMALY_METRIC_SQL, PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME))
+ } else {
+ sb.++=(String.format(PhoenixQueryConstants.GET_TREND_ANOMALY_METRIC_SQL, PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME))
+ }
+
+ sb.append(" LIMIT " + limit)
+ var stmt: java.sql.PreparedStatement = null
+ try {
+ stmt = connection.prepareStatement(sb.toString)
+ var pos = 1
+
+ pos += 1
+ stmt.setLong(pos, startTime)
+
+ stmt.setLong(pos, endTime)
+
+ stmt.setFetchSize(limit)
+
+ } catch {
+ case e: SQLException =>
+ if (stmt != null)
+ stmt
+ throw e
+ }
+ stmt
+ }
}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
similarity index 85%
rename from ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala
rename to ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
index 17173ec..5379c91 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ambari.metrics.adservice.common
+package org.apache.ambari.metrics.adservice.db
object PhoenixQueryConstants {
@@ -33,8 +33,6 @@
/* CREATE statement constants */
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- val CREATE_METRIC_PROFILE_TABLE = ""
-
val CREATE_METHOD_PARAMETERS_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" +
"METHOD_NAME VARCHAR, " +
"METHOD_TYPE VARCHAR, " +
@@ -49,7 +47,7 @@
"METRIC_VALUE DOUBLE, " +
"SEASONAL_INFO VARCHAR, " +
"ANOMALY_SCORE DOUBLE, " +
- "MODEL_SNAPSHOT VARCHAR, " +
+ "MODEL_PARAMETERS VARCHAR, " +
"DETECTION_TIME UNSIGNED_LONG " +
"CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_TIMESTAMP)) " +
"DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
@@ -61,8 +59,9 @@
"TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " +
"TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " +
"METHOD_NAME VARCHAR, " +
+ "SEASONAL_INFO VARCHAR, " +
"ANOMALY_SCORE DOUBLE, " +
- "MODEL_SNAPSHOT VARCHAR, " +
+ "MODEL_PARAMETERS VARCHAR, " +
"DETECTION_TIME UNSIGNED_LONG " +
"CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " +
"DATA_BLOCK_ENCODING='FAST_DIFF' IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
@@ -83,10 +82,10 @@
val UPSERT_METHOD_PARAMETERS_SQL: String = "UPSERT INTO %s (METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?,?,?)"
val UPSERT_PIT_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, " +
- "SEASONAL_INFO, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
+ "SEASONAL_INFO, ANOMALY_SCORE, MODEL_PARAMETERS, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
val UPSERT_TREND_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, " +
- "TEST_PERIOD_START, TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
+ "TEST_PERIOD_START, TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_PARAMETERS, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
val UPSERT_MODEL_SNAPSHOT_SQL: String = "UPSERT INTO %s (METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?, ?, ?, ?)"
@@ -94,15 +93,15 @@
/* GET statement constants */
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- val GET_METHOD_PAREMETERS_SQL: String = "SELECT METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METHOD_NAME = %s"
+ val GET_METHOD_PARAMETERS_SQL: String = "SELECT METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METHOD_NAME = %s"
val GET_PIT_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, SEASONAL_INFO, " +
- "ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METRIC_METRIC_UUID = ? AND ANOMALY_TIMESTAMP > ? AND ANOMALY_TIMESTAMP <= ? " +
+ "ANOMALY_SCORE, MODEL_PARAMETERS, DETECTION_TIME FROM %s WHERE ANOMALY_TIMESTAMP > ? AND ANOMALY_TIMESTAMP <= ? " +
"ORDER BY ANOMALY_SCORE DESC"
- val GET_TREND_ANOMALY_METRIC_SQL: String = "SELECT METRIC_METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, " +
- "ANOMALY_PERIOD_START, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METHOD = ? AND ANOMALY_PERIOD_END > ? " +
- "AND TEST_END_TIME <= ? ORDER BY ANOMALY_SCORE DESC"
+ val GET_TREND_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, " +
+ "TEST_PERIOD_END, METHOD_NAME, SEASONAL_INFO, ANOMALY_SCORE, MODEL_PARAMETERS, DETECTION_TIME FROM %s WHERE ANOMALY_PERIOD_END > ? " +
+ "AND ANOMALY_PERIOD_END <= ? ORDER BY ANOMALY_SCORE DESC"
val GET_MODEL_SNAPSHOT_SQL: String = "SELECT METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE UUID = %s AND METHOD_NAME = %s"
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
new file mode 100644
index 0000000..801c5f5
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import java.net.{HttpURLConnection, URL}
+
+import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+
+/**
+ * Class to invoke Metrics Collector metadata API.
+ * TODO : Instantiate a sync thread that regularly updates the internal maps by reading off AMS metadata.
+ */
+class ADMetadataProvider extends MetricMetadataProvider {
+
+ var metricCollectorHostPorts: Array[String] = Array.empty[String]
+ var metricMetadataPath: String = "/v1/timeline/metrics/metadata/keys"
+
+ val connectTimeout: Int = 10000
+ val readTimeout: Int = 10000
+ //TODO: Add retries for metrics collector GET call.
+ //val retries: Long = 5
+
+ def this(configuration: MetricCollectorConfiguration) {
+ this
+ if (StringUtils.isNotEmpty(configuration.getHostPortList)) {
+ metricCollectorHostPorts = configuration.getHostPortList.split(",")
+ }
+ metricMetadataPath = configuration.getMetadataEndpoint
+ }
+
+ override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition,
+ Set[MetricKey]], Set[MetricKey]) = {
+
+ val keysMap = scala.collection.mutable.Map[MetricDefinition, Set[MetricKey]]()
+ val numDefinitions: Int = metricSourceDefinition.metricDefinitions.size
+ val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
+
+ for (metricDef <- metricSourceDefinition.metricDefinitions) {
+ for (hostPort <- metricCollectorHostPorts) {
+ val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(hostPort + metricMetadataPath, metricDef)
+ if (metricKeys != null) {
+ keysMap += (metricDef -> metricKeys)
+ metricKeySet.++(metricKeys)
+ }
+ }
+ }
+ (keysMap.toMap, metricKeySet.toSet)
+ }
+
+ /**
+ * Make Metrics Collector REST API call to fetch keys.
+ *
+ * @param url
+ * @param metricDefinition
+ * @return
+ */
+ def getKeysFromMetricsCollector(url: String, metricDefinition: MetricDefinition): Set[MetricKey] = {
+
+ val mapper = new ObjectMapper() with ScalaObjectMapper
+ try {
+ val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection]
+ connection.setConnectTimeout(connectTimeout)
+ connection.setReadTimeout(readTimeout)
+ connection.setRequestMethod("GET")
+ val inputStream = connection.getInputStream
+ val content = scala.io.Source.fromInputStream(inputStream).mkString
+ if (inputStream != null) inputStream.close()
+ val metricKeySet: Set[MetricKey] = fromTimelineMetricKey(mapper.readValue[java.util.Set[TimelineMetricKey]](content))
+ return metricKeySet
+ } catch {
+ case _: java.io.IOException | _: java.net.SocketTimeoutException => // handle this
+ }
+ null
+ }
+
+ def fromTimelineMetricKey(timelineMetricKeys: java.util.Set[TimelineMetricKey]): Set[MetricKey] = {
+ val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
+ val iter = timelineMetricKeys.iterator()
+ while (iter.hasNext) {
+ val timelineMetricKey: TimelineMetricKey = iter.next()
+ val metricKey: MetricKey = MetricKey(timelineMetricKey.metricName,
+ timelineMetricKey.appId,
+ timelineMetricKey.instanceId,
+ timelineMetricKey.hostName,
+ timelineMetricKey.uuid)
+
+ metricKeySet.add(metricKey)
+ }
+ metricKeySet.toSet
+ }
+
+}
\ No newline at end of file
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
new file mode 100644
index 0000000..cc66c90
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import java.io.File
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+
+object InputMetricDefinitionParser {
+
+ def parseInputDefinitionsFromDirectory(directory: String): List[MetricSourceDefinition] = {
+
+ if (directory == null) {
+ return List.empty[MetricSourceDefinition]
+ }
+ val mapper = new ObjectMapper() with ScalaObjectMapper
+
+ def metricSourceDefinitions: List[MetricSourceDefinition] =
+ for {
+ file <- getFilesInDirectory(directory)
+ definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](file)
+ if definition != null
+ } yield definition
+
+ metricSourceDefinitions
+ }
+
+ private def getFilesInDirectory(directory: String): List[File] = {
+ val dir = new File(directory)
+ if (dir.exists && dir.isDirectory) {
+ dir.listFiles.filter(_.isFile).toList
+ } else {
+ List[File]()
+ }
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
new file mode 100644
index 0000000..0a5e51f
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+/*
+ {
+ "metric-name": "mem_free",
+ "metric-description" : "Free memory on a Host.",
+ "troubleshooting-info" : "Sudden drop / hike in free memory on a host.",
+ "static-threshold" : 10,
+ “app-id” : “HOST”
+}
+ */
+
+case class MetricDefinition (metricName: String,
+ appId: String,
+ hosts: List[String],
+ metricDescription: String,
+ troubleshootingInfo: String,
+ staticThreshold: Double) {
+
+ @Override
+ override def equals(obj: scala.Any): Boolean = {
+
+ if (obj == null || (getClass ne obj.getClass))
+ return false
+
+ val that = obj.asInstanceOf[MetricDefinition]
+
+ if (!(metricName == that.metricName))
+ return false
+
+ if (!(appId == that.appId))
+ return false
+
+ true
+ }
+}
+
+object MetricDefinition {
+
+ def apply(metricName: String,
+ appId: String,
+ hosts: List[String],
+ metricDescription: String,
+ troubleshootingInfo: String,
+ staticThreshold: Double): MetricDefinition =
+ new MetricDefinition(metricName, appId, hosts, metricDescription, troubleshootingInfo, staticThreshold)
+
+ def apply(metricName: String, appId: String, hosts: List[String]): MetricDefinition =
+ new MetricDefinition(metricName, appId, hosts, null, null, -1)
+
+}
\ No newline at end of file
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
new file mode 100644
index 0000000..afad617
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+case class MetricKey (metricName: String, appId: String, instanceId: String, hostname: String, uuid: Array[Byte]) {
+
+ @Override
+ override def toString: String = {
+ "MetricName=" + metricName + ",App=" + appId + ",InstanceId=" + instanceId + ",Host=" + hostname
+ }
+
+ @Override
+ override def equals(obj: scala.Any): Boolean = {
+
+ if (obj == null || (getClass ne obj.getClass))
+ return false
+
+ val that = obj.asInstanceOf[MetricKey]
+
+ if (!(metricName == that.metricName))
+ return false
+
+ if (!(appId == that.appId))
+ return false
+
+ if (!(instanceId == that.instanceId))
+ return false
+
+ if (!(hostname == that.hostname))
+ return false
+
+ true
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerService.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerService.scala
new file mode 100644
index 0000000..12bd7e4
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerService.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+trait MetricManagerService {
+
+ /**
+ * Given a 'UUID', return the metric key associated with it.
+ * @param uuid UUID
+ * @return
+ */
+ def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey
+
+ /**
+ * Given a component definition name, return the definition associated with it.
+ * @param name component definition name
+ * @return
+ */
+ def getDefinitionByName(name: String) : MetricSourceDefinition
+
+ /**
+ * Add a new definition.
+ * @param definition component definition JSON
+ * @return
+ */
+ def addDefinition(definition: MetricSourceDefinition) : Boolean
+
+ /**
+ * Update a component definition by name. Only definitions which were added by API can be modified through API.
+ * @param definition component definition name
+ * @return
+ */
+ def updateDefinition(definition: MetricSourceDefinition) : Boolean
+
+ /**
+ * Delete a component definition by name. Only definitions which were added by API can be deleted through API.
+ * @param name component definition name
+ * @return
+ */
+ def deleteDefinitionByName(name: String) : Boolean
+
+ /**
+ * Given an appId, return set of definitions that are tracked for that appId.
+ * @param appId component definition appId
+ * @return
+ */
+ def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition]
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceImpl.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceImpl.scala
new file mode 100644
index 0000000..ce02775
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceImpl.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
+
+import com.google.inject.{Inject, Singleton}
+
+@Singleton
+class MetricManagerServiceImpl extends MetricManagerService {
+
+ @Inject
+ var adMetadataStoreAccessor: AdMetadataStoreAccessor = _
+
+ var configuration: AnomalyDetectionAppConfig = _
+ var metricMetadataProvider: MetricMetadataProvider = _
+
+ var metricSourceDefinitionMap: Map[String, MetricSourceDefinition] = Map()
+ var metricKeys: Set[MetricKey] = Set.empty[MetricKey]
+ var metricDefinitionMetricKeyMap: Map[MetricDefinition, Set[MetricKey]] = Map()
+
+ @Inject
+ def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig) = {
+ this ()
+ //TODO : Create AD Metadata instance here (or inject)
+ configuration = anomalyDetectionAppConfig
+ initializeService()
+ }
+
+ def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, adMetadataStoreAccessor: AdMetadataStoreAccessor) = {
+ this ()
+ //TODO : Create AD Metadata instance here (or inject). Pass in Schema information.
+ configuration = anomalyDetectionAppConfig
+ this.adMetadataStoreAccessor = adMetadataStoreAccessor
+ initializeService()
+ }
+
+ def initializeService() : Unit = {
+
+ //Create AD Metadata Schema
+ //TODO Make sure AD Metadata DB is initialized here.
+
+ //Initialize Metric Metadata Provider
+ metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration)
+
+ loadMetricSourceDefinitions()
+ }
+
+ def loadMetricSourceDefinitions() : Unit = {
+
+ //Load definitions from metadata store
+ val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions
+
+ //Load definitions from configs
+ val definitionsFromConfig: List[MetricSourceDefinition] = getInputDefinitionsFromConfig
+
+ //Union the 2 sources, with DB taking precedence.
+ //Save new definition list to DB.
+ metricSourceDefinitionMap = metricSourceDefinitionMap.++(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))
+
+ //Reach out to AMS Metadata and get Metric Keys. Pass in List<CD> and get back Map<MD,Set<MK>>
+ for (definition <- metricSourceDefinitionMap.values) {
+ val (definitionKeyMap: Map[MetricDefinition, Set[MetricKey]], keys: Set[MetricKey])= metricMetadataProvider.getMetricKeysForDefinitions(definition)
+ metricDefinitionMetricKeyMap = metricDefinitionMetricKeyMap.++(definitionKeyMap)
+ metricKeys = metricKeys.++(keys)
+ }
+ }
+
+ def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = {
+ var key: MetricKey = null
+ for (metricKey <- metricKeys) {
+ if (metricKey.uuid.sameElements(uuid)) {
+ key = metricKey
+ }
+ }
+ key
+ }
+
+ @Override
+ def getDefinitionByName(name: String): MetricSourceDefinition = {
+ metricSourceDefinitionMap.apply(name)
+ }
+
+ @Override
+ def addDefinition(definition: MetricSourceDefinition): Boolean = {
+ if (metricSourceDefinitionMap.contains(definition.definitionName)) {
+ return false
+ }
+ definition.definitionSource = MetricSourceDefinitionType.API
+
+ val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
+ if (success) {
+ metricSourceDefinitionMap += definition.definitionName -> definition
+ }
+ success
+ }
+
+ @Override
+ def updateDefinition(definition: MetricSourceDefinition): Boolean = {
+ if (!metricSourceDefinitionMap.contains(definition.definitionName)) {
+ return false
+ }
+
+ if (metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != MetricSourceDefinitionType.API) {
+ return false
+ }
+
+ val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
+ if (success) {
+ metricSourceDefinitionMap += definition.definitionName -> definition
+ }
+ success
+ }
+
+ @Override
+ def deleteDefinitionByName(name: String): Boolean = {
+ if (!metricSourceDefinitionMap.contains(name)) {
+ return false
+ }
+
+ val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name)
+ if (definition.definitionSource != MetricSourceDefinitionType.API) {
+ return false
+ }
+
+ val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name)
+ if (success) {
+ metricSourceDefinitionMap += definition.definitionName -> definition
+ }
+ success
+ }
+
+ @Override
+ def getDefinitionByAppId(appId: String): List[MetricSourceDefinition] = {
+
+ val defList : List[MetricSourceDefinition] = metricSourceDefinitionMap.values.toList
+ defList.filter(_.appId == appId)
+ }
+
+ def combineDefinitionSources(configDefinitions: List[MetricSourceDefinition], dbDefinitions: List[MetricSourceDefinition])
+ : Map[String, MetricSourceDefinition] = {
+
+ var combinedDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] =
+ scala.collection.mutable.Map.empty[String, MetricSourceDefinition]
+
+ for (definitionFromDb <- dbDefinitions) {
+ combinedDefinitionMap(definitionFromDb.definitionName) = definitionFromDb
+ }
+
+ for (definition <- configDefinitions) {
+ if (!dbDefinitions.contains(definition)) {
+ adMetadataStoreAccessor.saveInputDefinition(definition)
+ combinedDefinitionMap(definition.definitionName) = definition
+ }
+ }
+ combinedDefinitionMap.toMap
+ }
+
+ def getInputDefinitionsFromConfig: List[MetricSourceDefinition] = {
+ val configDirectory = configuration.getMetricManagerServiceConfiguration.getInputDefinitionDirectory
+ InputMetricDefinitionParser.parseInputDefinitionsFromDirectory(configDirectory)
+ }
+
+ def setAdMetadataStoreAccessor (adMetadataStoreAccessor: AdMetadataStoreAccessor) : Unit = {
+ this.adMetadataStoreAccessor = adMetadataStoreAccessor
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
new file mode 100644
index 0000000..5f9c0a0
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+/**
+ * Metadata provider for maintaining the metric information in the Metric Definition Service.
+ */
+trait MetricMetadataProvider {
+
+ /**
+ * Return the set of Metric Keys for a given component definition.
+ * @param metricSourceDefinition component definition
+ * @return
+ */
+ def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition, Set[MetricKey]], Set[MetricKey])
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala
new file mode 100644
index 0000000..60198e0
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import javax.xml.bind.annotation.XmlRootElement
+
+import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinitionType.MetricSourceDefinitionType
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+
+/*
+{
+ "definition-name": "host-memory",
+ "app-id" : "HOST",
+ "hosts" : [“c6401.ambari.apache.org”],
+ "metric-definitions" : [
+ {
+ "metric-name": "mem_free",
+ "metric-description" : "Free memory on a Host.",
+ "troubleshooting-info" : "Sudden drop / hike in free memory on a host.",
+ "static-threshold" : 10,
+ “app-id” : “HOST”
+} ],
+
+ "related-definition-names" : ["host-cpu", “host-network”],
+ “anomaly-detection-subsystems” : [“point-in-time”, “trend”]
+}
+*/
+
+/*
+
+On Startup
+Read input definitions directory, parse the JSONs
+Create / Update the metric definitions in DB
+Convert metric definitions to Map<MetricKey, MetricDefinition>
+
+What to do want to have in memory?
+Map of Metric Key -> List<Component Definitions>
+
+What do we use metric definitions for?
+Anomaly GET - Associate definition information as well.
+Definition CRUD - Get definition given definition name
+Get set of metrics that are being tracked
+Return definition information for a metric key
+Given a metric definition name, return set of metrics.
+
+*/
+
+@XmlRootElement
+class MetricSourceDefinition {
+
+ var definitionName: String = _
+ var appId: String = _
+ var definitionSource: MetricSourceDefinitionType = MetricSourceDefinitionType.CONFIG
+ var hosts: List[String] = List.empty[String]
+ var relatedDefinitions: List[String] = List.empty[String]
+ var associatedAnomalySubsystems: List[AnomalyType] = List.empty[AnomalyType]
+
+ var metricDefinitions: scala.collection.mutable.MutableList[MetricDefinition] =
+ scala.collection.mutable.MutableList.empty[MetricDefinition]
+
+ def this(definitionName: String, appId: String, source: MetricSourceDefinitionType) = {
+ this
+ this.definitionName = definitionName
+ this.appId = appId
+ this.definitionSource = source
+ }
+
+ def addMetricDefinition(metricDefinition: MetricDefinition): Unit = {
+ if (!metricDefinitions.contains(metricDefinition)) {
+ metricDefinitions.+=(metricDefinition)
+ }
+ }
+
+ def removeMetricDefinition(metricDefinition: MetricDefinition): Unit = {
+ metricDefinitions = metricDefinitions.filter(_ != metricDefinition)
+ }
+
+ @Override
+ override def equals(obj: scala.Any): Boolean = {
+
+ if (obj == null) {
+ return false
+ }
+ val that = obj.asInstanceOf[MetricSourceDefinition]
+ definitionName.equals(that.definitionName)
+ }
+}
+
+object MetricSourceDefinition {
+ val mapper = new ObjectMapper() with ScalaObjectMapper
+ mapper.registerModule(DefaultScalaModule)
+
+ def serialize(definition: MetricSourceDefinition) : String = {
+ mapper.writeValueAsString(definition)
+ }
+
+ def deserialize(definitionString: String) : MetricSourceDefinition = {
+ mapper.readValue[MetricSourceDefinition](definitionString)
+ }
+}
\ No newline at end of file
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionType.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionType.scala
new file mode 100644
index 0000000..04ff95b
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionType.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import javax.xml.bind.annotation.XmlRootElement
+
+@XmlRootElement
+object MetricSourceDefinitionType extends Enumeration{
+ type MetricSourceDefinitionType = Value
+ val CONFIG,API = Value
+}
\ No newline at end of file
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyDetectionMethod.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyDetectionMethod.scala
new file mode 100644
index 0000000..81a7023
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyDetectionMethod.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.model
+
+object AnomalyDetectionMethod extends Enumeration {
+ type AnomalyDetectionMethod = Value
+ val EMA, TUKEYS, KS, HSDEV, UNKOWN = Value
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyType.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyType.scala
new file mode 100644
index 0000000..817180e
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyType.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.model
+
+import javax.xml.bind.annotation.XmlRootElement
+
+@XmlRootElement
+object AnomalyType extends Enumeration {
+ type AnomalyType = Value
+ val POINT_IN_TIME, TREND, UNKNOWN = Value
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
new file mode 100644
index 0000000..981a893
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.model
+
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+
+abstract class SingleMetricAnomalyInstance {
+
+ val metricKey: MetricKey
+ val anomalyType: AnomalyType
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
index fb9921a..c941ac3 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
@@ -17,9 +17,9 @@
*/
package org.apache.ambari.metrics.adservice.resource
-import javax.ws.rs.{GET, Path, Produces}
-import javax.ws.rs.core.Response
import javax.ws.rs.core.MediaType.APPLICATION_JSON
+import javax.ws.rs.core.Response
+import javax.ws.rs.{GET, Path, Produces}
import org.joda.time.DateTime
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
new file mode 100644
index 0000000..aacea79
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.resource
+
+class MetricDefinitionResource {
+
+ /*
+ GET component definition
+ POST component definition
+ DELETE component definition
+ PUT component definition
+ */
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala
index b92a145..22fe0ac 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala
@@ -17,9 +17,9 @@
*/
package org.apache.ambari.metrics.adservice.resource
-import javax.ws.rs.{GET, Path, Produces}
-import javax.ws.rs.core.Response
import javax.ws.rs.core.MediaType.APPLICATION_JSON
+import javax.ws.rs.core.Response
+import javax.ws.rs.{GET, Path, Produces}
import org.joda.time.DateTime
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/SubsystemResource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/SubsystemResource.scala
new file mode 100644
index 0000000..e7d7c9a
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/SubsystemResource.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.resource
+
+class SubsystemResource {
+
+ /*
+ GET / UPDATE - parameters (which subsystem, parameters)
+ POST - Update sensitivity of a subsystem (which subsystem, increase or decrease, factor)
+ */
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala
index 0161166..8e6f511 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala
@@ -17,6 +17,18 @@
*/
package org.apache.ambari.metrics.adservice.service
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance
+
trait ADQueryService {
+ /**
+ * API to return list of single metric anomalies satisfying a set of conditions from the anomaly store.
+ * @param anomalyType Type of the anomaly (Point In Time / Trend)
+ * @param startTime Start of time range
+ * @param endTime End of time range
+ * @param limit Maximim number of anomaly metrics that need to be returned based on anomaly score.
+ * @return
+ */
+ def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance]
}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala
index fe00f58..e5efa44 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala
@@ -16,7 +16,22 @@
* limitations under the License.
*/
package org.apache.ambari.metrics.adservice.service
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance
class ADQueryServiceImpl extends ADQueryService {
+ /**
+ * Implementation to return list of anomalies satisfying a set of conditions from the anomaly store.
+ *
+ * @param anomalyType Type of the anomaly (Point In Time / Trend)
+ * @param startTime Start of time range
+ * @param endTime End of time range
+ * @param limit Maximim number of anomaly metrics that need to be returned based on anomaly score.
+ * @return
+ */
+ override def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance] = {
+ val anomalies = List.empty[SingleMetricAnomalyInstance]
+ anomalies
+ }
}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala
index 6122f5e..90c564e 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala
@@ -16,22 +16,6 @@
*/
package org.apache.ambari.metrics.adservice.spark.prototype
-import java.io.{FileInputStream, IOException, InputStream}
-import java.util
-import java.util.Properties
-import java.util.logging.LogManager
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.kafka._
-import org.apache.ambari.metrics.adservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly}
-import org.apache.ambari.metrics.adservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique}
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
-import org.apache.log4j.Logger
-import org.apache.spark.storage.StorageLevel
-
object MetricAnomalyDetector {
/*
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
index ac00764..466225f 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
@@ -17,11 +17,6 @@
package org.apache.ambari.metrics.adservice.spark.prototype
-import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.{SparkConf, SparkContext}
-
object SparkPhoenixReader {
def main(args: Array[String]) {
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala
new file mode 100644
index 0000000..63cf8c7
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.subsystem.pointintime
+
+import java.util.Date
+
+import org.apache.ambari.metrics.adservice.common.Season
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance}
+
+class PointInTimeAnomalyInstance(val metricKey: MetricKey,
+ val timestamp: Long,
+ val metricValue: Double,
+ val methodType: AnomalyDetectionMethod,
+ val anomalyScore: Double,
+ val anomalousSeason: Season,
+ val modelParameters: String) extends SingleMetricAnomalyInstance {
+
+ override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME
+
+ private def anomalyToString : String = {
+ "Method=" + methodType + ", AnomalyScore=" + anomalyScore + ", Season=" + anomalousSeason.toString +
+ ", Model Parameters=" + modelParameters
+ }
+
+ @Override
+ override def toString: String = {
+ "Metric : [" + metricKey.toString + ", Metric Value=" + metricValue + " @ Time = " + new Date(timestamp) + "], Anomaly : [" + anomalyToString + "]"
+ }
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
new file mode 100644
index 0000000..125da34
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
@@ -0,0 +1,29 @@
+package org.apache.ambari.metrics.adservice.subsystem.trend
+
+import org.apache.ambari.metrics.adservice.common.{Season, TimeRange}
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance}
+
+case class TrendAnomalyInstance (metricKey: MetricKey,
+ anomalousPeriod: TimeRange,
+ referencePeriod: TimeRange,
+ methodType: AnomalyDetectionMethod,
+ anomalyScore: Double,
+ seasonInfo: Season,
+ modelParameters: String) extends SingleMetricAnomalyInstance {
+
+ override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME
+
+ private def anomalyToString : String = {
+ "Method=" + methodType + ", AnomalyScore=" + anomalyScore + ", Season=" + anomalousPeriod.toString +
+ ", Model Parameters=" + modelParameters
+ }
+
+ @Override
+ override def toString: String = {
+ "Metric : [" + metricKey.toString + ", AnomalousPeriod=" + anomalousPeriod + ", ReferencePeriod=" + referencePeriod +
+ "], Anomaly : [" + anomalyToString + "]"
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java b/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java
index 57a6f34..1077a9c 100644
--- a/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java
+++ b/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java
@@ -17,9 +17,9 @@
*/
package org.apache.ambari.metrics.adservice.prototype;
-import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface;
import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker;
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
new file mode 100644
index 0000000..8e3a949
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.app
+
+import java.io.File
+
+import javax.validation.Validator
+
+import org.scalatest.FunSuite
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import io.dropwizard.configuration.YamlConfigurationFactory
+import io.dropwizard.jackson.Jackson
+import io.dropwizard.jersey.validation.Validators
+
+class AnomalyDetectionAppConfigTest extends FunSuite {
+
+ test("testConfiguration") {
+
+ val objectMapper: ObjectMapper = Jackson.newObjectMapper()
+ val validator: Validator = Validators.newValidator
+ val factory: YamlConfigurationFactory[AnomalyDetectionAppConfig] =
+ new YamlConfigurationFactory[AnomalyDetectionAppConfig](classOf[AnomalyDetectionAppConfig], validator, objectMapper, "")
+
+ val classLoader = getClass.getClassLoader
+ val file = new File(classLoader.getResource("config.yml").getFile)
+ val config = factory.build(file)
+
+ assert(config.isInstanceOf[AnomalyDetectionAppConfig])
+
+ assert(config.getMetricManagerServiceConfiguration.getInputDefinitionDirectory == "/etc/adservice/conf/input-definitions-directory")
+
+ assert(config.getMetricCollectorConfiguration.getHostPortList == "host1:6188,host2:6188")
+
+ assert(config.getAdServiceConfiguration.getAnomalyDataTtl == 604800)
+ }
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
index c088855..65cf609 100644
--- a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
@@ -21,8 +21,8 @@
import javax.ws.rs.core.MediaType.APPLICATION_JSON
import org.apache.ambari.metrics.adservice.app.DropwizardAppRuleHelper.withAppRunning
-import org.glassfish.jersey.client.{ClientConfig, JerseyClientBuilder}
import org.glassfish.jersey.client.ClientProperties.{CONNECT_TIMEOUT, READ_TIMEOUT}
+import org.glassfish.jersey.client.{ClientConfig, JerseyClientBuilder}
import org.glassfish.jersey.filter.LoggingFilter
import org.glassfish.jersey.jaxb.internal.XmlJaxbElementProvider
import org.joda.time.DateTime
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala
deleted file mode 100644
index 40b9d6a..0000000
--- a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.adservice.common
-
-import org.scalatest.FlatSpec
-
-import scala.collection.mutable
-
-class ADServiceConfigurationTest extends FlatSpec {
-
- "A Stack" should "pop values in last-in-first-out order" in {
- val stack = new mutable.Stack[Int]
- stack.push(1)
- stack.push(2)
- assert(stack.pop() === 2)
- assert(stack.pop() === 1)
- }
-
- it should "throw NoSuchElementException if an empty stack is popped" in {
- val emptyStack = new mutable.Stack[String]
- assertThrows[NoSuchElementException] {
- emptyStack.pop()
- }
- }
-}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/RangeTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/RangeTest.scala
new file mode 100644
index 0000000..b610b97
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/RangeTest.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.common
+
+import org.scalatest.FlatSpec
+
+class RangeTest extends FlatSpec {
+
+ "A Range " should " return true for inner and boundary values" in {
+ val range : Range = Range(4,6)
+ assert(range.withinRange(5))
+ assert(range.withinRange(6))
+ assert(range.withinRange(4))
+ assert(!range.withinRange(7))
+ }
+
+ it should "accept same lower and higher range values" in {
+ val range : Range = Range(4,4)
+ assert(range.withinRange(4))
+ }
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/SeasonTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/SeasonTest.scala
new file mode 100644
index 0000000..4d542e8
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/SeasonTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.common
+
+import java.util.Calendar
+
+import org.scalatest.FunSuite
+
+class SeasonTest extends FunSuite {
+
+ test("testBelongsTo") {
+
+ //Create Season for weekdays. Mon to Friday and 9AM - 5PM
+ var season : Season = Season(Range(Calendar.MONDAY,Calendar.FRIDAY), Range(9,17))
+
+ //Try with a timestamp on a Monday, @ 9AM.
+ val c = Calendar.getInstance
+ c.set(2017, Calendar.OCTOBER, 30, 9, 0, 0)
+ assert(season.belongsTo(c.getTimeInMillis))
+
+ c.set(2017, Calendar.OCTOBER, 30, 18, 0, 0)
+ assert(!season.belongsTo(c.getTimeInMillis))
+
+ //Try with a timestamp on a Sunday, @ 9AM.
+ c.set(2017, Calendar.OCTOBER, 29, 9, 0, 0)
+ assert(!season.belongsTo(c.getTimeInMillis))
+
+ //Create Season for Monday 11AM - 12Noon.
+ season = Season(Range(Calendar.MONDAY,Calendar.MONDAY), Range(11,12))
+ c.set(2017, Calendar.OCTOBER, 30, 9, 0, 0)
+ assert(!season.belongsTo(c.getTimeInMillis))
+
+ c.set(2017, Calendar.OCTOBER, 30, 11, 30, 0)
+ assert(season.belongsTo(c.getTimeInMillis))
+
+
+ //Create Season from Friday to Monday and 9AM - 5PM
+ season = Season(Range(Calendar.FRIDAY,Calendar.MONDAY), Range(9,17))
+
+ //Try with a timestamp on a Monday, @ 9AM.
+ c.set(2017, Calendar.OCTOBER, 30, 9, 0, 0)
+ assert(season.belongsTo(c.getTimeInMillis))
+
+ //Try with a timestamp on a Sunday, @ 3PM.
+ c.set(2017, Calendar.OCTOBER, 29, 15, 0, 0)
+ assert(season.belongsTo(c.getTimeInMillis))
+
+ //Try with a timestamp on a Wednesday, @ 9AM.
+ c.set(2017, Calendar.NOVEMBER, 1, 9, 0, 0)
+ assert(!season.belongsTo(c.getTimeInMillis))
+ }
+
+ test("testEquals") {
+
+ var season1: Season = Season(Range(4,5), Range(2,3))
+ var season2: Season = Season(Range(4,5), Range(2,3))
+ assert(season1 == season2)
+
+ var season3: Season = Season(Range(4,4), Range(2,3))
+ assert(!(season1 == season3))
+ }
+
+ test("testSerialize") {
+ val season1 : Season = Season(Range(Calendar.MONDAY,Calendar.FRIDAY), Range(9,17))
+
+ val seasonString = Season.serialize(season1)
+
+ val season2 : Season = Season.deserialize(seasonString)
+ assert(season1 == season2)
+
+ val season3 : Season = Season(Range(Calendar.MONDAY,Calendar.THURSDAY), Range(9,17))
+ assert(!(season2 == season3))
+
+ }
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala
new file mode 100644
index 0000000..bd38e9a
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey
+import org.scalatest.FunSuite
+
+class AMSMetadataProviderTest extends FunSuite {
+
+ test("testFromTimelineMetricKey") {
+ val timelineMetricKeys: java.util.Set[TimelineMetricKey] = new java.util.HashSet[TimelineMetricKey]()
+
+ val uuid: Array[Byte] = Array.empty[Byte]
+
+ for (i <- 1 to 3) {
+ val key: TimelineMetricKey = new TimelineMetricKey("M" + i, "App", null, "H", uuid)
+ timelineMetricKeys.add(key)
+ }
+
+ val aMSMetadataProvider : ADMetadataProvider = new ADMetadataProvider(new MetricCollectorConfiguration)
+
+ val metricKeys : Set[MetricKey] = aMSMetadataProvider.fromTimelineMetricKey(timelineMetricKeys)
+ assert(metricKeys.size == 3)
+ }
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceTest.scala
new file mode 100644
index 0000000..8e19a0f
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceTest.scala
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
+import org.easymock.EasyMock.{anyObject, expect, expectLastCall, replay}
+import org.scalatest.FunSuite
+import org.scalatest.easymock.EasyMockSugar
+
+class MetricManagerServiceTest extends FunSuite {
+
+ test("testAddDefinition") {
+
+ val definitions : scala.collection.mutable.MutableList[MetricSourceDefinition] = scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+ for (i <- 1 to 3) {
+ val msd1 : MetricSourceDefinition = new MetricSourceDefinition("TestDefinition" + i, "testAppId", MetricSourceDefinitionType.API)
+ definitions.+=(msd1)
+ }
+
+ val newDef : MetricSourceDefinition = new MetricSourceDefinition("NewDefinition", "testAppId", MetricSourceDefinitionType.API)
+
+ val adMetadataStoreAccessor: AdMetadataStoreAccessor = EasyMockSugar.niceMock[AdMetadataStoreAccessor]
+ expect(adMetadataStoreAccessor.getSavedInputDefinitions).andReturn(definitions.toList).once()
+ expect(adMetadataStoreAccessor.saveInputDefinition(newDef)).andReturn(true).once()
+ replay(adMetadataStoreAccessor)
+
+ val metricManagerService: MetricManagerServiceImpl = new MetricManagerServiceImpl(new AnomalyDetectionAppConfig, adMetadataStoreAccessor)
+
+ metricManagerService.setAdMetadataStoreAccessor(adMetadataStoreAccessor)
+
+ metricManagerService.addDefinition(newDef)
+
+ assert(metricManagerService.metricSourceDefinitionMap.size == 4)
+ assert(metricManagerService.metricSourceDefinitionMap.get("testDefinition") != null)
+ }
+
+ test("testGetDefinitionByName") {
+ val definitions : scala.collection.mutable.MutableList[MetricSourceDefinition] = scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+ for (i <- 1 to 3) {
+ val msd1 : MetricSourceDefinition = new MetricSourceDefinition("TestDefinition" + i, "testAppId", MetricSourceDefinitionType.API)
+ definitions.+=(msd1)
+ }
+
+ val adMetadataStoreAccessor: AdMetadataStoreAccessor = EasyMockSugar.niceMock[AdMetadataStoreAccessor]
+ expect(adMetadataStoreAccessor.getSavedInputDefinitions).andReturn(definitions.toList).once()
+ replay(adMetadataStoreAccessor)
+
+ val metricManagerService: MetricManagerServiceImpl = new MetricManagerServiceImpl(new AnomalyDetectionAppConfig, adMetadataStoreAccessor)
+
+ metricManagerService.setAdMetadataStoreAccessor(adMetadataStoreAccessor)
+ for (i <- 1 to 3) {
+ val definition: MetricSourceDefinition = metricManagerService.getDefinitionByName("TestDefinition" + i)
+ assert(definition != null)
+ }
+ }
+
+ test("testGetDefinitionByAppId") {
+ val definitions : scala.collection.mutable.MutableList[MetricSourceDefinition] = scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+ for (i <- 1 to 3) {
+ var msd1 : MetricSourceDefinition = null
+ if (i == 2) {
+ msd1 = new MetricSourceDefinition("TestDefinition" + i, null, MetricSourceDefinitionType.API)
+ } else {
+ msd1 = new MetricSourceDefinition("TestDefinition" + i, "testAppId", MetricSourceDefinitionType.API)
+ }
+ definitions.+=(msd1)
+ }
+
+ val adMetadataStoreAccessor: AdMetadataStoreAccessor = EasyMockSugar.niceMock[AdMetadataStoreAccessor]
+ expect(adMetadataStoreAccessor.getSavedInputDefinitions).andReturn(definitions.toList).once()
+ replay(adMetadataStoreAccessor)
+
+ val metricManagerService: MetricManagerServiceImpl = new MetricManagerServiceImpl(new AnomalyDetectionAppConfig, adMetadataStoreAccessor)
+
+ metricManagerService.setAdMetadataStoreAccessor(adMetadataStoreAccessor)
+ val definitionsByAppId: List[MetricSourceDefinition] = metricManagerService.getDefinitionByAppId("testAppId")
+ assert(definitionsByAppId.size == 2)
+ }
+
+ test("testDeleteDefinitionByName") {
+ val definitions : scala.collection.mutable.MutableList[MetricSourceDefinition] = scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+ for (i <- 1 to 3) {
+ var msd1 : MetricSourceDefinition = null
+ if (i == 2) {
+ msd1 = new MetricSourceDefinition("TestDefinition" + i, null, MetricSourceDefinitionType.CONFIG)
+ } else {
+ msd1 = new MetricSourceDefinition("TestDefinition" + i, "testAppId", MetricSourceDefinitionType.API)
+ }
+ definitions.+=(msd1)
+ }
+
+ val adMetadataStoreAccessor: AdMetadataStoreAccessor = EasyMockSugar.niceMock[AdMetadataStoreAccessor]
+ expect(adMetadataStoreAccessor.getSavedInputDefinitions).andReturn(definitions.toList).once()
+ expect(adMetadataStoreAccessor.removeInputDefinition(anyObject[String])).andReturn(true).times(2)
+ replay(adMetadataStoreAccessor)
+
+ val metricManagerService: MetricManagerServiceImpl = new MetricManagerServiceImpl(new AnomalyDetectionAppConfig, adMetadataStoreAccessor)
+
+ metricManagerService.setAdMetadataStoreAccessor(adMetadataStoreAccessor)
+
+ var success: Boolean = metricManagerService.deleteDefinitionByName("TestDefinition1")
+ assert(success)
+ success = metricManagerService.deleteDefinitionByName("TestDefinition2")
+ assert(!success)
+ success = metricManagerService.deleteDefinitionByName("TestDefinition3")
+ assert(success)
+ }
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala
new file mode 100644
index 0000000..c4d639c
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import org.scalatest.FunSuite
+
+class MetricSourceDefinitionTest extends FunSuite {
+
+ test("createNewMetricSourceDefinition") {
+ val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API)
+
+ assert(msd.definitionName == "testDefinition")
+ assert(msd.appId == "testAppId")
+ assert(msd.definitionSource == MetricSourceDefinitionType.API)
+
+ assert(msd.hosts.isEmpty)
+ assert(msd.metricDefinitions.isEmpty)
+ assert(msd.associatedAnomalySubsystems.isEmpty)
+ assert(msd.relatedDefinitions.isEmpty)
+ }
+
+ test("testAddMetricDefinition") {
+ val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API)
+ assert(msd.metricDefinitions.isEmpty)
+
+ msd.addMetricDefinition(MetricDefinition("TestMetric", "TestApp", List.empty[String]))
+ assert(msd.metricDefinitions.nonEmpty)
+ }
+
+ test("testEquals") {
+ val msd1 : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API)
+ val msd2 : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId2", MetricSourceDefinitionType.API)
+ assert(msd1 == msd2)
+ }
+
+ test("testRemoveMetricDefinition") {
+ val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API)
+ assert(msd.metricDefinitions.isEmpty)
+
+ msd.addMetricDefinition(MetricDefinition("TestMetric", "TestApp", List.empty[String]))
+ assert(msd.metricDefinitions.nonEmpty)
+
+ msd.removeMetricDefinition(MetricDefinition("TestMetric", "TestApp", List.empty[String]))
+ assert(msd.metricDefinitions.isEmpty)
+ }
+
+ test("serializeDeserialize") {
+ val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API)
+ val msdString: String = MetricSourceDefinition.serialize(msd)
+ assert(msdString.nonEmpty)
+
+ val msd2: MetricSourceDefinition = MetricSourceDefinition.deserialize(msdString)
+ assert(msd2 != null)
+ assert(msd == msd2)
+
+ }
+}
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java
new file mode 100644
index 0000000..7619811
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.lang.StringUtils;
+
+public class TimelineMetricKey {
+ public String metricName;
+ public String appId;
+ public String instanceId = null;
+ public String hostName;
+ public byte[] uuid;
+
+ public TimelineMetricKey(String metricName, String appId, String instanceId, String hostName, byte[] uuid) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.hostName = hostName;
+ this.uuid = uuid;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TimelineMetricKey that = (TimelineMetricKey) o;
+
+ if (!metricName.equals(that.metricName)) return false;
+ if (!appId.equals(that.appId)) return false;
+ if (!hostName.equals(that.hostName)) return false;
+ return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId));
+ }
+
+ @Override
+ public int hashCode() {
+ int result = metricName.hashCode();
+ result = 31 * result + (appId != null ? appId.hashCode() : 0);
+ result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+ result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+ return result;
+ }
+
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 4450d65..a96be30 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -19,6 +19,31 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -29,6 +54,7 @@
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
@@ -51,29 +77,6 @@
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
-
public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore {
static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
@@ -437,11 +440,17 @@
}
@Override
- public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException {
+ public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern,
+ boolean includeBlacklistedMetrics) throws SQLException, IOException {
Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata =
metricMetadataManager.getMetadataCache();
- boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query);
+ boolean filterByAppId = StringUtils.isNotEmpty(appId);
+ boolean filterByMetricName = StringUtils.isNotEmpty(metricPattern);
+ Pattern metricFilterPattern = null;
+ if (filterByMetricName) {
+ metricFilterPattern = Pattern.compile(metricPattern);
+ }
// Group Metadata by AppId
Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>();
@@ -450,10 +459,23 @@
if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) {
continue;
}
- List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId());
+
+ String currentAppId = metricMetadata.getAppId();
+ if (filterByAppId && !currentAppId.equals(appId)) {
+ continue;
+ }
+
+ if (filterByMetricName) {
+ Matcher m = metricFilterPattern.matcher(metricMetadata.getMetricName());
+ if (!m.find()) {
+ continue;
+ }
+ }
+
+ List<TimelineMetricMetadata> metadataList = metadataByAppId.get(currentAppId);
if (metadataList == null) {
metadataList = new ArrayList<>();
- metadataByAppId.put(metricMetadata.getAppId(), metadataList);
+ metadataByAppId.put(currentAppId, metadataList);
}
metadataList.add(metricMetadata);
@@ -463,8 +485,42 @@
}
@Override
- public Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException {
- return metricMetadataManager.getUuidKeyMap();
+ public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException {
+ return metricMetadataManager.getUuid(metricName, appId, instanceId, hostname);
+ }
+
+ /**
+ * Given a metricName, appId, instanceId and optional hostname parameter, return a set of TimelineMetricKey objects
+ * that will have all the unique metric instances for the above parameter filter.
+ *
+ * @param metricName
+ * @param appId
+ * @param instanceId
+ * @param hostname
+ * @return
+ * @throws SQLException
+ * @throws IOException
+ */
+ @Override
+ public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException {
+
+ if (StringUtils.isEmpty(hostname)) {
+ Set<String> hosts = new HashSet<>();
+ for (String host : metricMetadataManager.getHostedAppsCache().keySet()) {
+ if (metricMetadataManager.getHostedAppsCache().get(host).getHostedApps().contains(appId)) {
+ hosts.add(host);
+ }
+ }
+ Set<TimelineMetricKey> timelineMetricKeys = new HashSet<>();
+ for (String host : hosts) {
+ byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, host);
+ timelineMetricKeys.add(new TimelineMetricKey(metricName, appId, instanceId, host, uuid));
+ }
+ return timelineMetricKeys;
+ } else {
+ byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, hostname);
+ return Collections.singleton(new TimelineMetricKey(metricName, appId, instanceId, hostname, uuid));
+ }
}
@Override
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index cdeefdc..f00bd91 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -21,11 +21,11 @@
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import java.io.IOException;
import java.sql.SQLException;
@@ -81,7 +81,8 @@
* @throws SQLException
* @throws IOException
*/
- Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException;
+ Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern,
+ boolean includeBlacklistedMetrics) throws SQLException, IOException;
TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException;
/**
@@ -100,7 +101,7 @@
*/
Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException;
- Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException;
+ byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException;
/**
* Return a list of known live collector nodes
@@ -109,4 +110,7 @@
List<String> getLiveInstances();
TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException;
+
+ Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException;
+
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
index f9ad773..6b926ac 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
@@ -438,6 +438,16 @@
return ArrayUtils.addAll(metricUuid, hostUuid);
}
+ public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) {
+
+ byte[] metricUuid = getUuid(new TimelineClusterMetric(metricName, appId, instanceId, -1l));
+ if (StringUtils.isNotEmpty(hostname)) {
+ byte[] hostUuid = getUuidForHostname(hostname);
+ return ArrayUtils.addAll(metricUuid, hostUuid);
+ }
+ return metricUuid;
+ }
+
public String getMetricNameFromUuid(byte[] uuid) {
byte[] metricUuid = uuid;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 5d9bb35..db35686 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -50,6 +51,7 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -434,18 +436,24 @@
throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
}
+
@GET
@Path("/metrics/metadata")
@Produces({ MediaType.APPLICATION_JSON })
public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
- @QueryParam("query") String query
+ @QueryParam("appId") String appId,
+ @QueryParam("metricName") String metricPattern,
+ @QueryParam("includeAll") String includeBlacklistedMetrics
) {
init(res);
try {
- return timelineMetricStore.getTimelineMetricMetadata(query);
+ return timelineMetricStore.getTimelineMetricMetadata(
+ parseStr(appId),
+ parseStr(metricPattern),
+ parseBoolean(includeBlacklistedMetrics));
} catch (Exception e) {
throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
@@ -486,16 +494,40 @@
}
@GET
- @Path("/metrics/uuids")
+ @Path("/metrics/uuid")
@Produces({ MediaType.APPLICATION_JSON })
- public Map<String, TimelineMetricMetadataKey> getUuids(
+ public byte[] getUuid(
@Context HttpServletRequest req,
- @Context HttpServletResponse res
+ @Context HttpServletResponse res,
+ @QueryParam("metricName") String metricName,
+ @QueryParam("appId") String appId,
+ @QueryParam("instanceId") String instanceId,
+ @QueryParam("hostname") String hostname
+ ) {
+ init(res);
+
+ try {
+ return timelineMetricStore.getUuid(metricName, appId, instanceId, hostname);
+ } catch (Exception e) {
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ @GET
+ @Path("/metrics/metadata/key")
+ @Produces({ MediaType.APPLICATION_JSON })
+ public Set<TimelineMetricKey> getTimelineMetricKey(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @QueryParam("metricName") String metricName,
+ @QueryParam("appId") String appId,
+ @QueryParam("instanceId") String instanceId,
+ @QueryParam("hostname") String hostname
) {
init(res);
try {
- return timelineMetricStore.getUuids();
+ return timelineMetricStore.getTimelineMetricKey(metricName, appId, instanceId, hostname);
} catch (Exception e) {
throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index 7c879e1..42175a7 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
@@ -90,7 +91,8 @@
}
@Override
- public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException {
+ public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern,
+ boolean includeBlacklistedMetrics) throws SQLException, IOException {
return null;
}
@@ -115,7 +117,7 @@
}
@Override
- public Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException {
+ public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException {
return null;
}
@@ -124,4 +126,9 @@
return null;
}
+ @Override
+ public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException {
+ return Collections.emptySet();
+ }
+
}