AMBARI-22437 : Create an 'AD Manager' component in Ambari Metrics Service stack side. (avijayan)
diff --git a/ambari-metrics-anomaly-detection-service/conf/unix/ambari-metrics-admanager.sh b/ambari-metrics-anomaly-detection-service/conf/unix/ambari-metrics-admanager.sh
new file mode 100644
index 0000000..f1a1ae3
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/conf/unix/ambari-metrics-admanager.sh
@@ -0,0 +1,194 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+PIDFILE=/var/run//var/run/ambari-metrics-anomaly-detection/ambari-metrics-admanager.pid
+OUTFILE=/var/log/ambari-metrics-anomaly-detection/ambari-metrics-admanager.out
+
+CONF_DIR=/etc/ambari-metrics-anomaly-detection/conf
+DAEMON_NAME=ams_admanager
+
+STOP_TIMEOUT=5
+
+function write_pidfile
+{
+ local pidfile="$1"
+ echo $! > "${pidfile}" 2>/dev/null
+ if [[ $? -gt 0 ]]; then
+ echo "ERROR: Cannot write pid ${pidfile}." | tee -a $STARTUPFILE
+ exit 1;
+ fi
+}
+
+function java_setup
+{
+ # Bail if we did not detect it
+ if [[ -z "${JAVA_HOME}" ]]; then
+ echo "ERROR: JAVA_HOME is not set and could not be found."
+ exit 1
+ fi
+
+ if [[ ! -d "${JAVA_HOME}" ]]; then
+ echo "ERROR: JAVA_HOME ${JAVA_HOME} does not exist."
+ exit 1
+ fi
+
+ JAVA="${JAVA_HOME}/bin/java"
+
+ if [[ ! -x "$JAVA" ]]; then
+ echo "ERROR: $JAVA is not executable."
+ exit 1
+ fi
+}
+
+function daemon_status()
+{
+ #
+ # LSB 4.1.0 compatible status command (1)
+ #
+ # 0 = program is running
+ # 1 = dead, but still a pid (2)
+ # 2 = (not used by us)
+ # 3 = not running
+ #
+ # 1 - this is not an endorsement of the LSB
+ #
+ # 2 - technically, the specification says /var/run/pid, so
+ # we should never return this value, but we're giving
+ # them the benefit of a doubt and returning 1 even if
+ # our pid is not in in /var/run .
+ #
+
+ local pidfile="$1"
+ shift
+
+ local pid
+
+ if [[ -f "${pidfile}" ]]; then
+ pid=$(cat "${pidfile}")
+ if ps -p "${pid}" > /dev/null 2>&1; then
+ return 0
+ fi
+ return 1
+ fi
+ return 3
+}
+
+function start()
+{
+ java_setup
+
+ daemon_status "${PIDFILE}"
+ if [[ $? == 0 ]]; then
+ echo "AMS AD Manager is running as process $(cat "${PIDFILE}"). Exiting" | tee -a $STARTUPFILE
+ exit 0
+ else
+ # stale pid file, so just remove it and continue on
+ rm -f "${PIDFILE}" >/dev/null 2>&1
+ fi
+
+ nohup "${JAVA}" "-Xms$AMS_AD_HEAPSIZE" "-Xmx$AMS_AD_HEAPSIZE" ${AMS_AD_OPTS} "-Dlog4j.configuration=file://$CONF_DIR/log4j.properties" "-jar" "/usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar" "server" "${CONF_DIR}/config.yaml" "$@" > $OUTFILE 2>&1 &
+ PID=$!
+ write_pidfile "${PIDFILE}"
+ sleep 2
+
+ echo "Verifying ${DAEMON_NAME} process status..."
+ if [ -z "`ps ax -o pid | grep ${PID}`" ]; then
+ if [ -s ${OUTFILE} ]; then
+ echo "ERROR: ${DAEMON_NAME} start failed. For more details, see ${OUTFILE}:"
+ echo "===================="
+ tail -n 10 ${OUTFILE}
+ echo "===================="
+ else
+ echo "ERROR: ${DAEMON_NAME} start failed"
+ rm -f ${PIDFILE}
+ fi
+ echo "Anomaly Detection Manager out at: ${OUTFILE}"
+ exit -1
+ fi
+
+ rm -f $STARTUPFILE #Deleting startup file
+ echo "Anomaly Detection Manager successfully started."
+ }
+
+function stop()
+{
+ pidfile=${PIDFILE}
+
+ if [[ -f "${pidfile}" ]]; then
+ pid=$(cat "$pidfile")
+
+ kill "${pid}" >/dev/null 2>&1
+ sleep "${STOP_TIMEOUT}"
+
+ if kill -0 "${pid}" > /dev/null 2>&1; then
+ echo "WARNING: ${DAEMON_NAME} did not stop gracefully after ${STOP_TIMEOUT} seconds: Trying to kill with kill -9"
+ kill -9 "${pid}" >/dev/null 2>&1
+ fi
+
+ if ps -p "${pid}" > /dev/null 2>&1; then
+ echo "ERROR: Unable to kill ${pid}"
+ else
+ rm -f "${pidfile}" >/dev/null 2>&1
+ fi
+ fi
+}
+
+# execute ams-env.sh
+if [[ -f "${CONF_DIR}/ams-admanager-env.sh" ]]; then
+ . "${CONF_DIR}/ams-admanager-env.sh"
+else
+ echo "ERROR: Cannot execute ${CONF_DIR}/ams-admanager-env.sh." 2>&1
+ exit 1
+fi
+
+# set these env variables only if they were not set by ams-env.sh
+: ${AMS_AD_LOG_DIR:=/var/log/ambari-metrics-anomaly-detection}
+
+# set pid dir path
+if [[ -n "${AMS_AD_PID_DIR}" ]]; then
+ PIDFILE=${AMS_AD_PID_DIR}/admanager.pid
+fi
+
+# set out file path
+if [[ -n "${AMS_AD_LOG_DIR}" ]]; then
+ OUTFILE=${AMS_AD_LOG_DIR}/ambari-metrics-admanager.out
+fi
+
+#TODO manage 3 hbase daemons for start/stop/status
+case "$1" in
+
+ start)
+ start
+
+ ;;
+ stop)
+ stop
+
+ ;;
+ status)
+ daemon_status "${PIDFILE}"
+ if [[ $? == 0 ]]; then
+ echo "AMS AD Manager is running as process $(cat "${PIDFILE}")."
+ else
+ echo "AMS AD Manager is not running."
+ fi
+ ;;
+ restart)
+ stop
+ start
+ ;;
+
+esac
diff --git a/ambari-metrics-anomaly-detection-service/conf/unix/log4j.properties b/ambari-metrics-anomaly-detection-service/conf/unix/log4j.properties
new file mode 100644
index 0000000..9dba1da
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/conf/unix/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari-metrics-anomaly-detection/ambari-metrics-admanager.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
+
+
diff --git a/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics-anomaly-detection-service/pom.xml
index cfa8124..142f02f 100644
--- a/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics-anomaly-detection-service/pom.xml
@@ -135,7 +135,7 @@
<version>3.1.0</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
- <minimizeJar>true</minimizeJar>
+ <!--<minimizeJar>true</minimizeJar>-->
<filters>
<filter>
<artifact>*:*</artifact>
@@ -231,6 +231,12 @@
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>0.10.1.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>jackson-databind</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -262,6 +268,10 @@
<artifactId>jersey-json</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -307,7 +317,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
- <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-el</groupId>
@@ -446,7 +455,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
diff --git a/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index 299a472..9402f6e 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -21,10 +21,12 @@
type: external
metricDefinitionService:
- inputDefinitionDirectory: /etc/ambari-metrics-anomaly-detection/conf
+ inputDefinitionDirectory: /etc/ambari-metrics-anomaly-detection/conf/definitionDirectory
metricsCollector:
- hostPortList: host1:6188,host2:6188
+ hosts: host1,host2
+ port: 6188
+ protocol: http
metadataEndpoint: /v1/timeline/metrics/metadata/keys
adQueryService:
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
index aa20223..93f6b28 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
@@ -20,14 +20,16 @@
import javax.validation.Valid
-import org.apache.ambari.metrics.adservice.configuration._
+import org.apache.ambari.metrics.adservice.configuration.{HBaseConfiguration, _}
-import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties, JsonProperty}
+
import io.dropwizard.Configuration
/**
* Top Level AD System Manager config items.
*/
+@JsonIgnoreProperties(ignoreUnknown=true)
class AnomalyDetectionAppConfig extends Configuration {
/*
@@ -54,6 +56,7 @@
/*
HBase Conf
*/
+ @JsonIgnore
def getHBaseConf : org.apache.hadoop.conf.Configuration = {
HBaseConfiguration.getHBaseConf
}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
index 28b2880..a896563 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
@@ -17,14 +17,16 @@
*/
package org.apache.ambari.metrics.adservice.app
-import org.apache.ambari.metrics.adservice.db.MetadataDatasource
+import org.apache.ambari.metrics.adservice.db.{AdMetadataStoreAccessor, LevelDbStoreAccessor, MetadataDatasource}
import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
-import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, RootResource}
+import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricDefinitionServiceImpl}
+import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, MetricDefinitionResource, RootResource}
import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServiceImpl}
import com.codahale.metrics.health.HealthCheck
import com.google.inject.AbstractModule
import com.google.inject.multibindings.Multibinder
+
import io.dropwizard.setup.Environment
class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environment) extends AbstractModule {
@@ -34,8 +36,11 @@
val healthCheckBinder = Multibinder.newSetBinder(binder(), classOf[HealthCheck])
healthCheckBinder.addBinding().to(classOf[DefaultHealthCheck])
bind(classOf[AnomalyResource])
+ bind(classOf[MetricDefinitionResource])
bind(classOf[RootResource])
+ bind(classOf[AdMetadataStoreAccessor]).to(classOf[LevelDbStoreAccessor])
bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
+ bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl])
bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
}
}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
index a7bbc66..a51a959 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
@@ -19,12 +19,14 @@
import java.net.{MalformedURLException, URISyntaxException}
import org.apache.hadoop.conf.Configuration
+import org.slf4j.{Logger, LoggerFactory}
object HBaseConfiguration {
val HBASE_SITE_CONFIGURATION_FILE: String = "hbase-site.xml"
val hbaseConf: org.apache.hadoop.conf.Configuration = new Configuration(true)
var isInitialized: Boolean = false
+ val LOG : Logger = LoggerFactory.getLogger("HBaseConfiguration")
def initConfigs(): Unit = {
if (!isInitialized) {
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala
index 9418897..2530730 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala
@@ -28,13 +28,25 @@
class MetricCollectorConfiguration {
@NotNull
- private var hostPortList: String = _
+ private var hosts: String = _
+
+ @NotNull
+ private var port: String = _
+
+ @NotNull
+ private var protocol: String = _
@NotNull
private var metadataEndpoint: String = _
@JsonProperty
- def getHostPortList: String = hostPortList
+ def getHosts: String = hosts
+
+ @JsonProperty
+ def getPort: String = port
+
+ @JsonProperty
+ def getProtocol: String = protocol
@JsonProperty
def getMetadataEndpoint: String = metadataEndpoint
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala
index 79a350c..ef4e00c 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala
@@ -26,12 +26,14 @@
@NotNull
private var dbDirPath: String = _
+ private var verifyChecksums: Boolean = true
+ private var performParanoidChecks: Boolean = false
@JsonProperty("verifyChecksums")
- def verifyChecksums: Boolean = true
+ def getVerifyChecksums: Boolean = verifyChecksums
@JsonProperty("performParanoidChecks")
- def performParanoidChecks: Boolean = false
+ def getPerformParanoidChecks: Boolean = performParanoidChecks
@JsonProperty("dbDirPath")
def getDbDirPath: String = dbDirPath
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
new file mode 100644
index 0000000..baad57d
--- /dev/null
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
@@ -0,0 +1,56 @@
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition
+
+import com.google.inject.Inject
+
+class LevelDbStoreAccessor extends AdMetadataStoreAccessor{
+
+ @Inject
+ var levelDbDataSource : MetadataDatasource = _
+
+ @Inject
+ def this(levelDbDataSource: MetadataDatasource) = {
+ this
+ this.levelDbDataSource = levelDbDataSource
+ }
+
+ /**
+ * Return all saved component definitions from DB.
+ *
+ * @return
+ */
+ override def getSavedInputDefinitions: List[MetricSourceDefinition] = {
+ List.empty[MetricSourceDefinition]
+ }
+
+ /**
+ * Save a set of component definitions
+ *
+ * @param metricSourceDefinitions Set of component definitions
+ * @return Success / Failure
+ */
+override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = {
+ true
+}
+
+ /**
+ * Save a component definition
+ *
+ * @param metricSourceDefinition component definition
+ * @return Success / Failure
+ */
+ override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = {
+ true
+ }
+
+ /**
+ * Delete a component definition
+ *
+ * @param definitionName component definition
+ * @return
+ */
+ override def removeInputDefinition(definitionName: String): Boolean = {
+ true
+ }
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
index 6d185bf..a34a60a 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
@@ -20,6 +20,8 @@
import java.io.File
+import javax.inject.Inject
+
import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration
import org.apache.ambari.metrics.adservice.db.MetadataDatasource
@@ -29,11 +31,20 @@
import com.google.inject.Singleton
@Singleton
-class LevelDBDataSource(appConfig: AnomalyDetectionAppConfig) extends MetadataDatasource {
+class LevelDBDataSource() extends MetadataDatasource {
private var db: DB = _
@volatile var isInitialized: Boolean = false
+ var appConfig: AnomalyDetectionAppConfig = _
+
+ @Inject
+ def this(appConfig: AnomalyDetectionAppConfig) = {
+ this
+ this.appConfig = appConfig
+ initialize()
+ }
+
override def initialize(): Unit = {
if (isInitialized) return
@@ -41,8 +52,8 @@
db = createDB(new LevelDbConfig {
override val createIfMissing: Boolean = true
- override val verifyChecksums: Boolean = configuration.verifyChecksums
- override val paranoidChecks: Boolean = configuration.performParanoidChecks
+ override val verifyChecksums: Boolean = configuration.getVerifyChecksums
+ override val paranoidChecks: Boolean = configuration.getPerformParanoidChecks
override val path: String = configuration.getDbDirPath
})
isInitialized = true
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
index 3bcf4b0..95b1b63 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
@@ -32,7 +32,9 @@
*/
class ADMetadataProvider extends MetricMetadataProvider {
- var metricCollectorHostPorts: Array[String] = Array.empty[String]
+ var metricCollectorHosts: Array[String] = Array.empty[String]
+ var metricCollectorPort: String = _
+ var metricCollectorProtocol: String = _
var metricMetadataPath: String = "/v1/timeline/metrics/metadata/keys"
val connectTimeout: Int = 10000
@@ -42,9 +44,11 @@
def this(configuration: MetricCollectorConfiguration) {
this
- if (StringUtils.isNotEmpty(configuration.getHostPortList)) {
- metricCollectorHostPorts = configuration.getHostPortList.split(",")
+ if (StringUtils.isNotEmpty(configuration.getHosts)) {
+ metricCollectorHosts = configuration.getHosts.split(",")
}
+ metricCollectorPort = configuration.getPort
+ metricCollectorProtocol = configuration.getProtocol
metricMetadataPath = configuration.getMetadataEndpoint
}
@@ -57,8 +61,8 @@
for (metricDef <- metricSourceDefinition.metricDefinitions) {
if (metricDef.isValid) { //Skip requesting metric keys for invalid definitions.
- for (hostPort <- metricCollectorHostPorts) {
- val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(hostPort + metricMetadataPath, metricDef)
+ for (host <- metricCollectorHosts) {
+ val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(metricCollectorProtocol, host, metricCollectorPort, metricMetadataPath, metricDef)
if (metricKeys != null) {
keysMap += (metricDef -> metricKeys)
metricKeySet.++(metricKeys)
@@ -76,8 +80,9 @@
* @param metricDefinition
* @return
*/
- def getKeysFromMetricsCollector(url: String, metricDefinition: MetricDefinition): Set[MetricKey] = {
+ def getKeysFromMetricsCollector(protocol: String, host: String, port: String, path: String, metricDefinition: MetricDefinition): Set[MetricKey] = {
+ val url: String = protocol + "://" + host + port + "/" + path
val mapper = new ObjectMapper() with ScalaObjectMapper
try {
val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection]
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
index ffa9944..c34d2dd 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
@@ -19,15 +19,16 @@
import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
+import org.slf4j.{Logger, LoggerFactory}
import com.google.inject.{Inject, Singleton}
@Singleton
class MetricDefinitionServiceImpl extends MetricDefinitionService {
- @Inject
- var adMetadataStoreAccessor: AdMetadataStoreAccessor = _
+ val LOG : Logger = LoggerFactory.getLogger(classOf[MetricDefinitionServiceImpl])
+ var adMetadataStoreAccessor: AdMetadataStoreAccessor = _
var configuration: AnomalyDetectionAppConfig = _
var metricMetadataProvider: MetricMetadataProvider = _
@@ -36,21 +37,13 @@
var metricDefinitionMetricKeyMap: Map[MetricDefinition, Set[MetricKey]] = Map()
@Inject
- def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig) = {
+ def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = {
this ()
- //TODO : Create AD Metadata instance here (or inject)
+ adMetadataStoreAccessor = metadataStoreAccessor
configuration = anomalyDetectionAppConfig
initializeService()
}
- def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, adMetadataStoreAccessor: AdMetadataStoreAccessor) = {
- this ()
- //TODO : Create AD Metadata instance here (or inject). Pass in Schema information.
- configuration = anomalyDetectionAppConfig
- this.adMetadataStoreAccessor = adMetadataStoreAccessor
- initializeService()
- }
-
def initializeService() : Unit = {
//Create AD Metadata Schema
@@ -67,13 +60,13 @@
//Load definitions from metadata store
val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions
for (definition <- definitionsFromStore) {
- validateAndSanitizeMetricSourceDefinition(definition)
+ sanitizeMetricSourceDefinition(definition)
}
//Load definitions from configs
val definitionsFromConfig: List[MetricSourceDefinition] = getInputDefinitionsFromConfig
for (definition <- definitionsFromConfig) {
- validateAndSanitizeMetricSourceDefinition(definition)
+ sanitizeMetricSourceDefinition(definition)
}
//Union the 2 sources, with DB taking precedence.
@@ -100,6 +93,9 @@
@Override
def getDefinitionByName(name: String): MetricSourceDefinition = {
+ if (!metricSourceDefinitionMap.contains(name)) {
+ LOG.warn("Metric Source Definition with name " + name + " not found")
+ }
metricSourceDefinitionMap.apply(name)
}
@@ -187,7 +183,13 @@
this.adMetadataStoreAccessor = adMetadataStoreAccessor
}
- def validateAndSanitizeMetricSourceDefinition(metricSourceDefinition: MetricSourceDefinition): Unit = {
+
+ /**
+ * Look into the Metric Definitions inside a Metric Source definition, and push down source level appId &
+ * hosts to Metric definition if they do not have an override.
+ * @param metricSourceDefinition Input Metric Source Definition
+ */
+ def sanitizeMetricSourceDefinition(metricSourceDefinition: MetricSourceDefinition): Unit = {
val sourceLevelAppId: String = metricSourceDefinition.appId
val sourceLevelHostList: List[String] = metricSourceDefinition.hosts
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
index c941ac3..98ce0c4 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
@@ -23,7 +23,7 @@
import org.joda.time.DateTime
-@Path("/topNAnomalies")
+@Path("/anomaly")
class AnomalyResource {
@GET
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
index aacea79..16125fa 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
@@ -17,12 +17,24 @@
package org.apache.ambari.metrics.adservice.resource
+import javax.ws.rs.{GET, Path, Produces}
+import javax.ws.rs.core.MediaType.APPLICATION_JSON
+
+import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricSourceDefinition}
+import org.apache.commons.lang.StringUtils
+
+import com.google.inject.Inject
+
+@Path("/metric-definition")
class MetricDefinitionResource {
- /*
- GET component definition
- POST component definition
- DELETE component definition
- PUT component definition
- */
+ @Inject
+ var metricDefinitionService: MetricDefinitionService = _
+
+ @GET
+ @Produces(Array(APPLICATION_JSON))
+ def getMetricDefinition (definitionName: String) : MetricSourceDefinition = {
+ null
+ }
+
}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
index 125da34..3fc0d6f 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
+++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ambari.metrics.adservice.subsystem.trend
import org.apache.ambari.metrics.adservice.common.{Season, TimeRange}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
index 104ccea..989ba21 100644
--- a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
@@ -44,11 +44,21 @@
assert(config.isInstanceOf[AnomalyDetectionAppConfig])
- assert(config.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory == "/etc/ambari-metrics-anomaly-detection/conf")
+ assert(config.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory ==
+ "/etc/ambari-metrics-anomaly-detection/conf/definitionDirectory")
- assert(config.getMetricCollectorConfiguration.getHostPortList == "host1:6188,host2:6188")
+ assert(config.getMetricCollectorConfiguration.getHosts == "host1,host2")
+
+ assert(config.getMetricCollectorConfiguration.getPort == "6188")
assert(config.getAdServiceConfiguration.getAnomalyDataTtl == 604800)
+
+ assert(config.getMetricDefinitionDBConfiguration.getDbDirPath == "/var/lib/ambari-metrics-anomaly-detection/")
+
+ assert(config.getMetricDefinitionDBConfiguration.getVerifyChecksums)
+
+ assert(!config.getMetricDefinitionDBConfiguration.getPerformParanoidChecks)
+
}
}
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
index 65cf609..2a4999c 100644
--- a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
@@ -32,10 +32,10 @@
class DefaultADResourceSpecTest extends FunSpec with Matchers {
- describe("/topNAnomalies") {
+ describe("/anomaly") {
it("Must return default message") {
withAppRunning(classOf[AnomalyDetectionApp], Resources.getResource("config.yml").getPath) { rule =>
- val json = client.target(s"http://localhost:${rule.getLocalPort}/topNAnomalies")
+ val json = client.target(s"http://localhost:${rule.getLocalPort}/anomaly")
.request().accept(APPLICATION_JSON).buildGet().invoke(classOf[String])
val now = DateTime.now.toString("MM-dd-yyyy hh:mm")
assert(json == "{\"message\":\"Anomaly Detection Service!\"," + "\"today\":\"" + now + "\"}")
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala
index 2ddb7b8..9757d76 100644
--- a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala
+++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala
@@ -36,8 +36,8 @@
val mdConfig : MetricDefinitionDBConfiguration = mock[MetricDefinitionDBConfiguration]
when(appConfig.getMetricDefinitionDBConfiguration).thenReturn(mdConfig)
- when(mdConfig.verifyChecksums).thenReturn(true)
- when(mdConfig.performParanoidChecks).thenReturn(false)
+ when(mdConfig.getVerifyChecksums).thenReturn(true)
+ when(mdConfig.getPerformParanoidChecks).thenReturn(false)
when(mdConfig.getDbDirPath).thenReturn(file.getAbsolutePath)
db = new LevelDBDataSource(appConfig)