AMBARI-21106 : Ambari Metrics Anomaly detection prototype.(avijayan)
diff --git a/ambari-metrics-alertservice/pom.xml b/ambari-metrics-alertservice/pom.xml
new file mode 100644
index 0000000..3a3545b
--- /dev/null
+++ b/ambari-metrics-alertservice/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.5.1.0.0</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>ambari-metrics-alertservice</artifactId>
+ <version>2.5.1.0.0</version>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <name>Ambari Metrics Alert Service</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.lucarosellini.rJava</groupId>
+ <artifactId>JRI</artifactId>
+ <version>0.9-7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.11</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <!-- Scala 2.11 suffix to match spark-streaming_2.11 above; mixing Scala binary versions breaks at runtime -->
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jmx</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-json</artifactId>
+ <version>0.10.1.0</version>
+ </dependency>
+ <!-- All Spark artifacts aligned to Spark 2.1.1 / Scala 2.11 to match spark-streaming_2.11 2.1.1;
+ the previous mix of _2.10 artifacts at 1.6.3/1.3.0 is binary-incompatible on one classpath -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-spark</artifactId>
+ <version>4.7.0-HBase-1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-mllib_2.11</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java
new file mode 100644
index 0000000..0929f4c
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java
@@ -0,0 +1,130 @@
+package org.apache.ambari.metrics.alertservice.R;
+
+import org.apache.ambari.metrics.alertservice.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.common.DataSet;
+import org.apache.commons.lang.ArrayUtils;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class AmsRTest {
+
+ public static void main(String[] args) {
+
+ String metricName = "TestMetric";
+ double[] ts = getTS(1000);
+
+ double[] train_ts = ArrayUtils.subarray(ts, 0,750);
+ double[] train_x = getData(750);
+ DataSet trainData = new DataSet(metricName, train_ts, train_x);
+
+ double[] test_ts = ArrayUtils.subarray(ts, 750,1000);
+ double[] test_x = getData(250);
+ test_x[50] = 5.5; //Anomaly
+ DataSet testData = new DataSet(metricName, test_ts, test_x);
+ ResultSet rs;
+
+ Map<String, String> configs = new HashMap();
+
+ System.out.println("TUKEYS");
+ configs.put("tukeys.n", "3");
+ rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+ rs.print();
+ System.out.println("--------------");
+
+ System.out.println("EMA Global");
+ configs.put("ema.n", "3");
+ configs.put("ema.w", "0.8");
+ rs = RFunctionInvoker.ema_global(trainData, testData, configs);
+ rs.print();
+ System.out.println("--------------");
+
+ System.out.println("EMA Daily");
+ rs = RFunctionInvoker.ema_daily(trainData, testData, configs);
+ rs.print();
+ System.out.println("--------------");
+
+ configs.put("ks.p_value", "0.05");
+ System.out.println("KS Test");
+ rs = RFunctionInvoker.ksTest(trainData, testData, configs);
+ rs.print();
+ System.out.println("--------------");
+
+ ts = getTS(5000);
+ train_ts = ArrayUtils.subarray(ts, 30,4800);
+ train_x = getData(4800);
+ trainData = new DataSet(metricName, train_ts, train_x);
+ test_ts = ArrayUtils.subarray(ts, 4800,5000);
+ test_x = getData(200);
+ for (int i =0; i<200;i++) {
+ test_x[i] = test_x[i]*5;
+ }
+ testData = new DataSet(metricName, test_ts, test_x);
+ configs.put("hsdev.n", "3");
+ configs.put("hsdev.nhp", "3");
+ configs.put("hsdev.interval", "86400000");
+ configs.put("hsdev.period", "604800000");
+ System.out.println("HSdev");
+ rs = RFunctionInvoker.hsdev(trainData, testData, configs);
+ rs.print();
+ System.out.println("--------------");
+
+ }
+
+ static double[] getTS(int n) {
+ long currentTime = System.currentTimeMillis();
+ double[] ts = new double[n];
+ currentTime = currentTime - (currentTime % (5*60*1000));
+
+ for (int i = 0,j=n-1; i<n; i++,j--) {
+ ts[j] = currentTime;
+ currentTime = currentTime - (5*60*1000);
+ }
+ return ts;
+ }
+
+ static void testBasic() {
+ Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+ try {
+ r.eval("library(ambarimetricsAD)");
+ r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/test.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+ r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/util.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+ r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+ double[] ts = getTS(5000);
+ double[] x = getData(5000);
+ r.assign("ts", ts);
+ r.assign("x", x);
+ r.eval("x[1000] <- 4.5");
+ r.eval("x[2000] <- 4.75");
+ r.eval("x[3000] <- 3.5");
+ r.eval("x[4000] <- 5.5");
+ r.eval("x[5000] <- 5.0");
+ r.eval("data <- data.frame(ts,x)");
+ r.eval("names(data) <- c(\"TS\", \"Metric\")");
+ System.out.println(r.eval("data"));
+ REXP exp = r.eval("t_an <- test_methods(data)");
+ exp = r.eval("t_an");
+ String strExp = exp.asString();
+ System.out.println("result:" + exp);
+ RVector cont = (RVector) exp.getContent();
+ double[] an_ts = cont.at(0).asDoubleArray();
+ double[] an_x = cont.at(1).asDoubleArray();
+ System.out.println("result:" + strExp);
+ }
+ finally {
+ r.end();
+ }
+ }
+ static double[] getData(int n) {
+ double[] metrics = new double[n];
+ Random random = new Random();
+ for (int i = 0; i<n; i++) {
+ metrics[i] = random.nextDouble();
+ }
+ return metrics;
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
new file mode 100644
index 0000000..8d1e520
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
@@ -0,0 +1,180 @@
+package org.apache.ambari.metrics.alertservice.R;
+
+
+import org.apache.ambari.metrics.alertservice.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.common.DataSet;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RFunctionInvoker {
+
+ public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+
+
+ private static void loadDataSets(Rengine r, DataSet trainData, DataSet testData) {
+ r.assign("train_ts", trainData.ts);
+ r.assign("train_x", trainData.values);
+ r.eval("train_data <- data.frame(train_ts,train_x)");
+ r.eval("names(train_data) <- c(\"TS\", " + trainData.metricName + ")");
+
+ r.assign("test_ts", testData.ts);
+ r.assign("test_x", testData.values);
+ r.eval("test_data <- data.frame(test_ts,test_x)");
+ r.eval("names(test_data) <- c(\"TS\", " + testData.metricName + ")");
+ }
+
+
+ public static ResultSet tukeys(DataSet trainData, DataSet testData, Map<String, String> configs) {
+ try {
+ r.eval("library(ambarimetricsAD)");
+ r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+ int n = Integer.parseInt(configs.get("tukeys.n"));
+ r.eval("n <- " + n);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ams_tukeys(train_data, test_data, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i< cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+ } catch(Exception e) {
+ e.printStackTrace();
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ema_global(DataSet trainData, DataSet testData, Map<String, String> configs) {
+ try {
+ r.eval("library(ambarimetricsAD)");
+ r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+ int n = Integer.parseInt(configs.get("ema.n"));
+ r.eval("n <- " + n);
+
+ double w = Double.parseDouble(configs.get("ema.w"));
+ r.eval("w <- " + w);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ema_global(train_data, test_data, w, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i< cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ema_daily(DataSet trainData, DataSet testData, Map<String, String> configs) {
+ try {
+ r.eval("library(ambarimetricsAD)");
+ r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+ int n = Integer.parseInt(configs.get("ema.n"));
+ r.eval("n <- " + n);
+
+ double w = Double.parseDouble(configs.get("ema.w"));
+ r.eval("w <- " + w);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ema_daily(train_data, test_data, w, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i< cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ksTest(DataSet trainData, DataSet testData, Map<String, String> configs) {
+ try {
+ r.eval("library(ambarimetricsAD)");
+ r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/kstest.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+ double p_value = Double.parseDouble(configs.get("ks.p_value"));
+ r.eval("p_value <- " + p_value);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ams_ks(train_data, test_data, p_value)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i< cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet hsdev(DataSet trainData, DataSet testData, Map<String, String> configs) {
+ try {
+ r.eval("library(ambarimetricsAD)");
+ r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/hsdev.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+ int n = Integer.parseInt(configs.get("hsdev.n"));
+ r.eval("n <- " + n);
+
+ int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
+ r.eval("nhp <- " + nhp);
+
+ long interval = Long.parseLong(configs.get("hsdev.interval"));
+ r.eval("interval <- " + interval);
+
+ long period = Long.parseLong(configs.get("hsdev.period"));
+ r.eval("period <- " + period);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
+ REXP exp = r.eval("an2");
+ RVector cont = (RVector) exp.getContent();
+
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i< cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+ } catch(Exception e) {
+ e.printStackTrace();
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java
new file mode 100644
index 0000000..47bf9b6
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java
@@ -0,0 +1,21 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+import java.util.Arrays;
+
+public class DataSet {
+
+ public String metricName;
+ public double[] ts;
+ public double[] values;
+
+ public DataSet(String metricName, double[] ts, double[] values) {
+ this.metricName = metricName;
+ this.ts = ts;
+ this.values = values;
+ }
+
+ @Override
+ public String toString() {
+ return metricName + Arrays.toString(ts) + Arrays.toString(values);
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java
new file mode 100644
index 0000000..915da4c
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java
@@ -0,0 +1,10 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+public abstract class MethodResult {
+ protected String methodType;
+ public abstract String prettyPrint();
+
+ public String getMethodType() {
+ return methodType;
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java
new file mode 100644
index 0000000..d237bee
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java
@@ -0,0 +1,52 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+public class MetricAnomaly {
+
+ private String metricKey;
+ private long timestamp;
+ private double metricValue;
+ private MethodResult methodResult;
+
+ public MetricAnomaly(String metricKey, long timestamp, double metricValue, MethodResult methodResult) {
+ this.metricKey = metricKey;
+ this.timestamp = timestamp;
+ this.metricValue = metricValue;
+ this.methodResult = methodResult;
+ }
+
+ public String getMetricKey() {
+ return metricKey;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricKey = metricName;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public double getMetricValue() {
+ return metricValue;
+ }
+
+ public void setMetricValue(double metricValue) {
+ this.metricValue = metricValue;
+ }
+
+ public MethodResult getMethodResult() {
+ return methodResult;
+ }
+
+ public void setMethodResult(MethodResult methodResult) {
+ this.methodResult = methodResult;
+ }
+
+ public String getAnomalyAsString() {
+ return metricKey + ":" + timestamp + ":" + metricValue + ":" + methodResult.prettyPrint();
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java
new file mode 100644
index 0000000..96b74e0
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java
@@ -0,0 +1,26 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ResultSet {
+
+ List<double[]> resultset = new ArrayList<>();
+
+ public ResultSet(List<double[]> resultset) {
+ this.resultset = resultset;
+ }
+
+ public void print() {
+ System.out.println("Result : ");
+ if (!resultset.isEmpty()) {
+ for (int i = 0; i<resultset.get(0).length;i++) {
+ for (double[] entity : resultset) {
+ System.out.print(entity[i] + " ");
+ }
+ System.out.println();
+ }
+ }
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java
new file mode 100644
index 0000000..5118225
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java
@@ -0,0 +1,86 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+
+public class SingleValuedTimelineMetric {
+ private Long timestamp;
+ private Double value;
+ private String metricName;
+ private String appId;
+ private String instanceId;
+ private String hostName;
+ private Long startTime;
+ private String type;
+
+ public void setSingleTimeseriesValue(Long timestamp, Double value) {
+ this.timestamp = timestamp;
+ this.value = value;
+ }
+
+ public SingleValuedTimelineMetric(String metricName, String appId,
+ String instanceId, String hostName,
+ long timestamp, long startTime, String type) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.hostName = hostName;
+ this.timestamp = timestamp;
+ this.startTime = startTime;
+ this.type = type;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public Double getValue() {
+ return value;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public boolean equalsExceptTime(TimelineMetric metric) {
+ if (!metricName.equals(metric.getMetricName())) return false;
+ if (hostName != null ? !hostName.equals(metric.getHostName()) : metric.getHostName() != null)
+ return false;
+ if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() != null)
+ return false;
+ if (instanceId != null ? !instanceId.equals(metric.getInstanceId()) : metric.getInstanceId() != null) return false;
+
+ return true;
+ }
+
+ public TimelineMetric getTimelineMetric() {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(this.metricName);
+ metric.setAppId(this.appId);
+ metric.setHostName(this.hostName);
+ metric.setType(this.type);
+ metric.setInstanceId(this.instanceId);
+ metric.setStartTime(this.startTime);
+ metric.setTimestamp(this.timestamp);
+ metric.getMetricValues().put(timestamp, value);
+ return metric;
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java
new file mode 100644
index 0000000..dff56e6
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java
@@ -0,0 +1,60 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+public class StatisticUtils {
+
+ public static double mean(Collection<Double> values) {
+ double sum = 0;
+ for (double d : values) {
+ sum += d;
+ }
+ return sum / values.size();
+ }
+
+ public static double variance(Collection<Double> values) {
+ double avg = mean(values);
+ double variance = 0;
+ for (double d : values) {
+ variance += Math.pow(d - avg, 2.0);
+ }
+ return variance;
+ }
+
+ public static double sdev(Collection<Double> values, boolean useBesselsCorrection) {
+ double variance = variance(values);
+ int n = (useBesselsCorrection) ? values.size() - 1 : values.size();
+ return Math.sqrt(variance / n);
+ }
+
+ public static double median(Collection<Double> values) {
+ ArrayList<Double> clonedValues = new ArrayList<Double>(values);
+ Collections.sort(clonedValues);
+ int n = values.size();
+
+ if (n % 2 != 0) {
+ return clonedValues.get((n-1)/2);
+ } else {
+ return ( clonedValues.get((n-1)/2) + clonedValues.get(n/2) ) / 2;
+ }
+ }
+
+
+
+// public static void main(String[] args) {
+//
+// Collection<Double> values = new ArrayList<>();
+// values.add(1.0);
+// values.add(2.0);
+// values.add(3.0);
+// values.add(4.0);
+// values.add(5.0);
+//
+// System.out.println(mean(values));
+// System.out.println(sdev(values, false));
+// System.out.println(median(values));
+// }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java
new file mode 100644
index 0000000..2a73855
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java
@@ -0,0 +1,221 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetric implements Comparable<TimelineMetric>, Serializable {
+
+ private String metricName;
+ private String appId;
+ private String instanceId;
+ private String hostName;
+ private long timestamp;
+ private long startTime;
+ private String type;
+ private String units;
+ private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+ private Map<String, String> metadata = new HashMap<>();
+
+ // default
+ public TimelineMetric() {
+
+ }
+
+ public TimelineMetric(String metricName, String appId, String hostName, TreeMap<Long,Double> metricValues) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.hostName = hostName;
+ this.metricValues.putAll(metricValues);
+ }
+
+ // copy constructor
+ public TimelineMetric(TimelineMetric metric) {
+ setMetricName(metric.getMetricName());
+ setType(metric.getType());
+ setUnits(metric.getUnits());
+ setTimestamp(metric.getTimestamp());
+ setAppId(metric.getAppId());
+ setInstanceId(metric.getInstanceId());
+ setHostName(metric.getHostName());
+ setStartTime(metric.getStartTime());
+ setMetricValues(new TreeMap<Long, Double>(metric.getMetricValues()));
+ }
+
+ @XmlElement(name = "metricname")
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricName = metricName;
+ }
+
+ @XmlElement(name = "appid")
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ @XmlElement(name = "instanceid")
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public void setInstanceId(String instanceId) {
+ this.instanceId = instanceId;
+ }
+
+ @XmlElement(name = "hostname")
+ public String getHostName() {
+ return hostName;
+ }
+
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
+
+ @XmlElement(name = "timestamp")
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @XmlElement(name = "starttime")
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ @XmlElement(name = "type", defaultValue = "UNDEFINED")
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ @XmlElement(name = "units")
+ public String getUnits() {
+ return units;
+ }
+
+ public void setUnits(String units) {
+ this.units = units;
+ }
+
+ @XmlElement(name = "metrics")
+ public TreeMap<Long, Double> getMetricValues() {
+ return metricValues;
+ }
+
+ public void setMetricValues(TreeMap<Long, Double> metricValues) {
+ this.metricValues = metricValues;
+ }
+
+ public void addMetricValues(Map<Long, Double> metricValues) {
+ this.metricValues.putAll(metricValues);
+ }
+
+ @XmlElement(name = "metadata")
+ public Map<String,String> getMetadata () {
+ return metadata;
+ }
+
+ public void setMetadata (Map<String,String> metadata) {
+ this.metadata = metadata;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TimelineMetric metric = (TimelineMetric) o;
+
+ if (!metricName.equals(metric.metricName)) return false;
+ if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+ return false;
+ if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+ return false;
+ if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+ return false;
+ if (timestamp != metric.timestamp) return false;
+ if (startTime != metric.startTime) return false;
+
+ return true;
+ }
+
+ public boolean equalsExceptTime(TimelineMetric metric) {
+ if (!metricName.equals(metric.metricName)) return false;
+ if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+ return false;
+ if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+ return false;
+ if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = metricName.hashCode();
+ result = 31 * result + (appId != null ? appId.hashCode() : 0);
+ result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+ result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+ result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public int compareTo(TimelineMetric other) {
+ if (timestamp > other.timestamp) {
+ return -1;
+ } else if (timestamp < other.timestamp) {
+ return 1;
+ } else {
+ return metricName.compareTo(other.metricName);
+ }
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java
new file mode 100644
index 0000000..500e1e9
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java
@@ -0,0 +1,112 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The class that hosts a list of timeline entities.
+ */
+@XmlRootElement(name = "metrics")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetrics implements Serializable {
+
+ private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+
+ public TimelineMetrics() {}
+
+ @XmlElement(name = "metrics")
+ public List<TimelineMetric> getMetrics() {
+ return allMetrics;
+ }
+
+ public void setMetrics(List<TimelineMetric> allMetrics) {
+ this.allMetrics = allMetrics;
+ }
+
+ private boolean isEqualTimelineMetrics(TimelineMetric metric1,
+ TimelineMetric metric2) {
+
+ boolean isEqual = true;
+
+ if (!metric1.getMetricName().equals(metric2.getMetricName())) {
+ return false;
+ }
+
+ if (metric1.getHostName() != null) {
+ isEqual = metric1.getHostName().equals(metric2.getHostName());
+ }
+
+ if (metric1.getAppId() != null) {
+ isEqual = metric1.getAppId().equals(metric2.getAppId());
+ }
+
+ return isEqual;
+ }
+
+ /**
+ * Merge with existing TimelineMetric if everything except startTime is
+ * the same.
+ * @param metric {@link TimelineMetric}
+ */
+ public void addOrMergeTimelineMetric(TimelineMetric metric) {
+ TimelineMetric metricToMerge = null;
+
+ if (!allMetrics.isEmpty()) {
+ for (TimelineMetric timelineMetric : allMetrics) {
+ if (timelineMetric.equalsExceptTime(metric)) {
+ metricToMerge = timelineMetric;
+ break;
+ }
+ }
+ }
+
+ if (metricToMerge != null) {
+ metricToMerge.addMetricValues(metric.getMetricValues());
+ if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+ metricToMerge.setTimestamp(metric.getTimestamp());
+ }
+ if (metricToMerge.getStartTime() > metric.getStartTime()) {
+ metricToMerge.setStartTime(metric.getStartTime());
+ }
+ } else {
+ allMetrics.add(metric);
+ }
+ }
+
+ // Optimization that addresses too many TreeMaps from getting created.
+ public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) {
+ TimelineMetric metricToMerge = null;
+
+ if (!allMetrics.isEmpty()) {
+ for (TimelineMetric timelineMetric : allMetrics) {
+ if (metric.equalsExceptTime(timelineMetric)) {
+ metricToMerge = timelineMetric;
+ break;
+ }
+ }
+ }
+
+ if (metricToMerge != null) {
+ metricToMerge.getMetricValues().put(metric.getTimestamp(), metric.getValue());
+ if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+ metricToMerge.setTimestamp(metric.getTimestamp());
+ }
+ if (metricToMerge.getStartTime() > metric.getStartTime()) {
+ metricToMerge.setStartTime(metric.getStartTime());
+ }
+ } else {
+ allMetrics.add(metric.getTimelineMetric());
+ }
+ }
+}
+
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java
new file mode 100644
index 0000000..7ae91a3
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java
@@ -0,0 +1,12 @@
+package org.apache.ambari.metrics.alertservice.methods;
+
+import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+
+import java.util.List;
+
+// Contract implemented by anomaly detection methods (e.g. EMA) that score
+// incoming timeline metrics.
+public interface MetricAnomalyModel {
+
+ // Feeds a new metric into the model (updating model state) and returns the
+ // anomalies its datapoints triggered; empty list when none.
+ public List<MetricAnomaly> onNewMetric(TimelineMetric metric);
+ // Scores a metric against the current model state; see EmaModel.test for
+ // the reference implementation (state is not updated there).
+ public List<MetricAnomaly> test(TimelineMetric metric);
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java
new file mode 100644
index 0000000..ec548c8
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java
@@ -0,0 +1,56 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+// BUG FIX: was importing com.sun.org.apache.commons.logging.*, a JDK-internal
+// path that does not exist; the rest of the project uses org.apache.commons.logging.
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+/**
+ * State holder for an Exponential Moving Average (EMA) anomaly tracker of a
+ * single (metricName, appId, hostname) series.
+ *
+ * Tracks a running mean (ema) and running deviation (ems); a value is deemed
+ * anomalous when its distance from the mean exceeds timessdev * ems.
+ */
+@XmlRootElement
+public class EmaDS implements Serializable {
+
+  String metricName;
+  String appId;
+  String hostname;
+  double ema;       // exponentially weighted moving average of the series
+  double ems;       // exponentially weighted moving deviation of the series
+  double weight;    // smoothing factor; higher => slower to adapt to new values
+  int timessdev;    // anomaly threshold, in multiples of ems
+  private static final Log LOG = LogFactory.getLog(EmaDS.class);
+
+  public EmaDS(String metricName, String appId, String hostname, double weight, int timessdev) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.hostname = hostname;
+    this.weight = weight;
+    this.timessdev = timessdev;
+    this.ema = 0.0;
+    this.ems = 0.0;
+  }
+
+  /**
+   * Tests the value against the current estimate, then folds it into the
+   * running average/deviation.
+   *
+   * @return an {@link EmaResult} carrying the excess deviation when the value
+   *         is anomalous, or null otherwise
+   */
+  public EmaResult testAndUpdate(double metricValue) {
+    // Score against the state *before* this value is absorbed.
+    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+    update(metricValue);
+    return diff > 0 ? new EmaResult(diff) : null;
+  }
+
+  /**
+   * Folds the value into the running average and deviation without testing it.
+   * (Previously also printed to stdout; now logged at debug level only.)
+   */
+  public void update(double metricValue) {
+    ema = weight * ema + (1 - weight) * metricValue;
+    ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
+    LOG.debug(ema + ", " + ems);
+  }
+
+  /**
+   * Tests the value against the current estimate without mutating any state.
+   */
+  public EmaResult test(double metricValue) {
+    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+    return diff > 0 ? new EmaResult(diff) : null;
+  }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java
new file mode 100644
index 0000000..4aae543
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java
@@ -0,0 +1,114 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import com.google.gson.Gson;
+// BUG FIX: was importing com.sun.org.apache.commons.logging.*, a JDK-internal
+// path that does not exist; use the standard commons-logging package.
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ambari.metrics.alertservice.common.MethodResult;
+import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Saveable;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * EMA based anomaly detection model. Keeps one {@link EmaDS} tracker per
+ * (metricName, appId, hostname) series, keyed by the underscore-joined triple.
+ */
+@XmlRootElement
+public class EmaModel implements MetricAnomalyModel, Saveable, Serializable {
+
+  @XmlElement(name = "trackedEmas")
+  private Map<String, EmaDS> trackedEmas = new HashMap<>();
+  private static final Log LOG = LogFactory.getLog(EmaModel.class);
+
+  // Defaults applied when a series is first seen via onNewMetric().
+  private static final double DEFAULT_WEIGHT = 0.8;
+  private static final int DEFAULT_TIMES_SDEV = 3;
+
+  /** Key identifying one tracked metric series. */
+  private static String seriesKey(TimelineMetric metric) {
+    return metric.getMetricName() + "_" + metric.getAppId() + "_" + metric.getHostName();
+  }
+
+  /**
+   * Scores every value of the metric against its tracker (creating one with
+   * default parameters on first sight) and updates the tracker as it goes.
+   *
+   * @return anomalies triggered by the metric's datapoints; empty list if none
+   */
+  public List<MetricAnomaly> onNewMetric(TimelineMetric metric) {
+    String key = seriesKey(metric);
+    List<MetricAnomaly> anomalies = new ArrayList<>();
+
+    // BUG FIX: the lookup previously used metricName while the map is keyed
+    // by the composite key, so a fresh tracker was created on every call and
+    // state never accumulated.
+    if (!trackedEmas.containsKey(key)) {
+      trackedEmas.put(key, new EmaDS(metric.getMetricName(), metric.getAppId(),
+        metric.getHostName(), DEFAULT_WEIGHT, DEFAULT_TIMES_SDEV));
+    }
+
+    EmaDS emaDS = trackedEmas.get(key);
+    for (Map.Entry<Long, Double> entry : metric.getMetricValues().entrySet()) {
+      double metricValue = entry.getValue();
+      MethodResult result = emaDS.testAndUpdate(metricValue);
+      if (result != null) {
+        anomalies.add(new MetricAnomaly(key, entry.getKey(), metricValue, result));
+      }
+    }
+    return anomalies;
+  }
+
+  /**
+   * (Re)trains the tracker for this metric's series from all of its values
+   * and registers it in the model.
+   *
+   * @param weight    EMA smoothing factor
+   * @param timessdev anomaly threshold in multiples of the running deviation
+   * @return the trained tracker
+   */
+  public EmaDS train(TimelineMetric metric, double weight, int timessdev) {
+    EmaDS emaDS = new EmaDS(metric.getMetricName(), metric.getAppId(),
+      metric.getHostName(), weight, timessdev);
+    LOG.info("In EMA Train step");
+    for (Map.Entry<Long, Double> entry : metric.getMetricValues().entrySet()) {
+      LOG.debug(entry.getKey() + " : " + entry.getValue());
+      emaDS.update(entry.getValue());
+    }
+    trackedEmas.put(seriesKey(metric), emaDS);
+    return emaDS;
+  }
+
+  /**
+   * Scores the metric's values against its tracker without updating it.
+   * Returns an empty list when the series has never been trained or seen.
+   */
+  public List<MetricAnomaly> test(TimelineMetric metric) {
+    String key = seriesKey(metric);
+    EmaDS emaDS = trackedEmas.get(key);
+    if (emaDS == null) {
+      return new ArrayList<>();
+    }
+
+    List<MetricAnomaly> anomalies = new ArrayList<>();
+    for (Map.Entry<Long, Double> entry : metric.getMetricValues().entrySet()) {
+      double metricValue = entry.getValue();
+      MethodResult result = emaDS.test(metricValue);
+      if (result != null) {
+        anomalies.add(new MetricAnomaly(key, entry.getKey(), metricValue, result));
+      }
+    }
+    return anomalies;
+  }
+
+  /**
+   * Serializes this model as JSON to a local file at {@code path}.
+   * The SparkContext is unused; required by the {@link Saveable} contract.
+   */
+  @Override
+  public void save(SparkContext sc, String path) {
+    String json = new Gson().toJson(this);
+    try (Writer writer = new BufferedWriter(new OutputStreamWriter(
+        new FileOutputStream(path), "utf-8"))) {
+      writer.write(json);
+    } catch (IOException e) {
+      LOG.error("Unable to save EMA model to " + path, e);
+    }
+  }
+
+  @Override
+  public String formatVersion() {
+    return "1.0";
+  }
+}
+
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java
new file mode 100644
index 0000000..f0ef340
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java
@@ -0,0 +1,29 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import com.google.gson.Gson;
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Loader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class EmaModelLoader implements Loader<EmaModel> {
+ private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);
+
+ @Override
+ public EmaModel load(SparkContext sc, String path) {
+ Gson gson = new Gson();
+ try {
+ String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+ return gson.fromJson(fileString, EmaModel.class);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ return null;
+ }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java
new file mode 100644
index 0000000..23f1793
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java
@@ -0,0 +1,19 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import org.apache.ambari.metrics.alertservice.common.MethodResult;
+
+/**
+ * Result of an EMA test: carries how far a value exceeded the allowed
+ * deviation band (always positive, see EmaDS.test).
+ */
+public class EmaResult extends MethodResult {
+
+  // Amount by which |value - ema| exceeded timessdev * ems.
+  double diff;
+
+  public EmaResult(double diff) {
+    this.methodType = "EMA";
+    this.diff = diff;
+  }
+
+  @Override
+  public String prettyPrint() {
+    // BUG FIX: format string was garbled ("(` = "); label the value instead.
+    return methodType + "(diff = " + diff + ")";
+  }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java
new file mode 100644
index 0000000..a090786
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java
@@ -0,0 +1,51 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+/**
+ * Simple manual driver that trains an {@link EmaModel} on a synthetic series.
+ * (Unused imports and the dead commented-out fixture block were removed.)
+ */
+public class TestEmaModel {
+
+  public static void main(String[] args) throws IOException {
+
+    long now = System.currentTimeMillis();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("dummy_metric");
+    metric1.setHostName("dummy_host");
+    metric1.setTimestamp(now);
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("HOST");
+    metric1.setType("Integer");
+
+    // 20 points in [9, 10), one every 100 ms going back in time.
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 20; i++) {
+      metricValues.put(now - i * 100, 9 + Math.random());
+    }
+    metric1.setMetricValues(metricValues);
+
+    new EmaModel().train(metric1, 0.8, 3);
+  }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
new file mode 100644
index 0000000..de56825
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
@@ -0,0 +1,75 @@
+package org.apache.ambari.metrics.alertservice.spark;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
+import org.apache.kafka.clients.producer.*;
+
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Publishes {@link TimelineMetrics} as a JSON tree to the ambari-metrics
+ * Kafka topic.
+ */
+public class AmsKafkaProducer {
+
+  // FIX: was a raw Producer; use the typed, final form matching the
+  // configured String key / JsonNode value serializers.
+  private final Producer<String, JsonNode> producer;
+  private static final String TOPIC_NAME = "ambari-metrics-topic";
+
+  /**
+   * @param kafkaServers Kafka bootstrap servers, e.g. "host:6667"
+   */
+  public AmsKafkaProducer(String kafkaServers) {
+    Properties configProperties = new Properties();
+    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
+    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+      "org.apache.kafka.common.serialization.ByteArraySerializer");
+    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+      "org.apache.kafka.connect.json.JsonSerializer");
+    producer = new KafkaProducer<>(configProperties);
+  }
+
+  /**
+   * Serializes the metrics to JSON and sends them, blocking until the broker
+   * acknowledges the record so send failures surface to the caller.
+   */
+  public void sendMetrics(TimelineMetrics timelineMetrics)
+      throws InterruptedException, ExecutionException {
+    ObjectMapper objectMapper = new ObjectMapper();
+    JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics);
+    ProducerRecord<String, JsonNode> rec = new ProducerRecord<>(TOPIC_NAME, jsonNode);
+    Future<RecordMetadata> kafkaFuture = producer.send(rec);
+    // get() blocks until the send completes.
+    System.out.println(kafkaFuture.get().topic());
+  }
+
+  /** Manual smoke test: sends one synthetic mem_free metric. */
+  public static void main(String[] args) throws ExecutionException, InterruptedException {
+    final long now = System.currentTimeMillis();
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("mem_free");
+    metric1.setHostName("avijayan-ams-3.openstacklocal");
+    metric1.setTimestamp(now);
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("HOST");
+    metric1.setType("Integer");
+
+    // 20 points around 20000, one every 100 ms going back in time.
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 20; i++) {
+      metricValues.put(now - i * 100, 20000 + Math.random());
+    }
+    metric1.setMetricValues(metricValues);
+    timelineMetrics.getMetrics().add(metric1);
+
+    // Previously a 1-iteration loop with a sleep; a single send is equivalent.
+    new AmsKafkaProducer("avijayan-ams-2.openstacklocal:6667").sendMetrics(timelineMetrics);
+  }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
new file mode 100644
index 0000000..5a6bb61
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
@@ -0,0 +1,181 @@
+package org.apache.ambari.metrics.alertservice.spark;
+
+import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.*;
+
+/**
+ * Converts detected {@link MetricAnomaly}s into {@link TimelineMetrics} and
+ * POSTs them to the Ambari Metrics Collector timeline endpoint.
+ */
+public class AnomalyMetricPublisher implements Serializable {
+
+  private String hostName = "UNKNOWN.example.com";
+  private String instanceId = null;
+  private String serviceName = "anomaly-engine";
+  private String collectorHost;
+  private String protocol;
+  private String port;
+  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
+  private static final Log LOG = LogFactory.getLog(AnomalyMetricPublisher.class);
+  private static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    // BUG FIX: getSerializationConfig().withSerializationInclusion(...) returns
+    // a new config that was being discarded, so NON_NULL was never applied.
+    // Configure the mapper itself instead.
+    mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+  }
+
+  /**
+   * @param collectorHost collector hostname
+   * @param protocol      "http" or "https"
+   * @param port          collector port
+   */
+  public AnomalyMetricPublisher(String collectorHost, String protocol, String port) {
+    this.collectorHost = collectorHost;
+    this.protocol = protocol;
+    this.port = port;
+    this.hostName = getDefaultLocalHostName();
+  }
+
+  // Canonical local hostname used as the emitting host; null on lookup failure.
+  private String getDefaultLocalHostName() {
+    try {
+      return InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      LOG.info("Error getting host address");
+    }
+    return null;
+  }
+
+  /**
+   * Converts the anomalies into timeline metrics and POSTs them to the
+   * collector; no-op for an empty list.
+   */
+  public void publish(List<MetricAnomaly> metricAnomalies) {
+    LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
+    List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
+    LOG.info("Sending TimelineMetric list of size : " + metricList.size());
+    if (!metricList.isEmpty()) {
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+      emitMetrics(timelineMetrics);
+    }
+  }
+
+  /** Creates an empty timeline metric named after the anomaly's key and method. */
+  private TimelineMetric newTimelineMetric(MetricAnomaly anomaly, long startTime) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(anomaly.getMetricKey() + "_"
+      + anomaly.getMethodResult().getMethodType());
+    timelineMetric.setAppId(serviceName);
+    timelineMetric.setInstanceId(instanceId);
+    timelineMetric.setHostName(hostName);
+    timelineMetric.setStartTime(startTime);
+    return timelineMetric;
+  }
+
+  /**
+   * Folds a list of anomalies into one TimelineMetric per metric key.
+   * NOTE(review): assumes anomalies with the same key are contiguous in the
+   * input; non-contiguous duplicates produce separate metrics — confirm with
+   * callers that the input is grouped by key.
+   */
+  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
+    List<TimelineMetric> metrics = new ArrayList<>();
+
+    if (metricAnomalies.isEmpty()) {
+      return metrics;
+    }
+
+    long currentTime = System.currentTimeMillis();
+    MetricAnomaly prevAnomaly = metricAnomalies.get(0);
+
+    TimelineMetric timelineMetric = newTimelineMetric(prevAnomaly, currentTime);
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(prevAnomaly.getTimestamp(), prevAnomaly.getMetricValue());
+
+    for (int i = 1; i < metricAnomalies.size(); i++) {
+      MetricAnomaly currentAnomaly = metricAnomalies.get(i);
+      if (currentAnomaly.getMetricKey().equals(prevAnomaly.getMetricKey())) {
+        metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
+      } else {
+        // Key changed: flush the metric built so far and start a new one.
+        timelineMetric.setMetricValues(metricValues);
+        metrics.add(timelineMetric);
+
+        timelineMetric = newTimelineMetric(currentAnomaly, currentTime);
+        metricValues = new TreeMap<>();
+        metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
+        prevAnomaly = currentAnomaly;
+      }
+    }
+
+    timelineMetric.setMetricValues(metricValues);
+    metrics.add(timelineMetric);
+    return metrics;
+  }
+
+  /** Serializes and POSTs the metrics; true only on a successful POST. */
+  private boolean emitMetrics(TimelineMetrics metrics) {
+    String connectUrl = constructTimelineMetricUri();
+    String jsonData = null;
+    LOG.info("EmitMetrics connectUrl = " + connectUrl);
+    try {
+      jsonData = mapper.writeValueAsString(metrics);
+    } catch (IOException e) {
+      LOG.error("Unable to parse metrics", e);
+    }
+    if (jsonData != null) {
+      return emitMetricsJson(connectUrl, jsonData);
+    }
+    return false;
+  }
+
+  private HttpURLConnection getConnection(String spec) throws IOException {
+    return (HttpURLConnection) new URL(spec).openConnection();
+  }
+
+  // POSTs the JSON payload; logs and returns false on connection errors.
+  private boolean emitMetricsJson(String connectUrl, String jsonData) {
+    LOG.info("Metrics Data : " + jsonData);
+    int timeout = 10000;
+    HttpURLConnection connection = null;
+    try {
+      if (connectUrl == null) {
+        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+      }
+      connection = getConnection(connectUrl);
+
+      connection.setRequestMethod("POST");
+      connection.setRequestProperty("Content-Type", "application/json");
+      connection.setRequestProperty("Connection", "Keep-Alive");
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+      connection.setDoOutput(true);
+
+      if (jsonData != null) {
+        try (OutputStream os = connection.getOutputStream()) {
+          os.write(jsonData.getBytes("UTF-8"));
+        }
+      }
+
+      int statusCode = connection.getResponseCode();
+
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+          "statusCode = " + statusCode);
+      } else {
+        LOG.info("Metrics posted to Collector " + connectUrl);
+      }
+      // BUG FIX: previously returned true even for non-200 responses.
+      return statusCode == 200;
+    } catch (IOException ioe) {
+      LOG.error(ioe.getMessage());
+    }
+    return false;
+  }
+
+  // protocol://host:port/ws/v1/timeline/metrics
+  private String constructTimelineMetricUri() {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(collectorHost);
+    sb.append(":");
+    sb.append(port);
+    sb.append(WS_V1_TIMELINE_METRICS);
+    return sb.toString();
+  }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
new file mode 100644
index 0000000..ab87a95
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
@@ -0,0 +1,134 @@
+package org.apache.ambari.metrics.alertservice.spark;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
+import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel;
+import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
+import org.apache.ambari.metrics.alertservice.methods.ema.EmaModelLoader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+import java.util.*;
+
+// Spark Streaming driver: consumes TimelineMetrics JSON from a Kafka topic and
+// runs each metric through the configured anomaly detection models, publishing
+// detected anomalies back to the metrics collector.
+public class MetricAnomalyDetector {
+
+ private static final Log LOG = LogFactory.getLog(MetricAnomalyDetector.class);
+ private static String groupId = "ambari-metrics-group";
+ private static String topicName = "ambari-metrics-topic";
+ private static int numThreads = 1;
+
+ //private static String zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181";
+ //private static Map<String, String> kafkaParams = new HashMap<>();
+ //static {
+ // kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "avijayan-ams-2.openstacklocal:6667");
+ // kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ // kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer");
+ // kafkaParams.put("metadata.broker.list", "avijayan-ams-2.openstacklocal:6667");
+ //}
+
+ public MetricAnomalyDetector() {
+ }
+
+ // args: <method1,method2> <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>
+ public static void main(String[] args) throws InterruptedException {
+
+
+ if (args.length < 6) {
+ System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
+ System.exit(1);
+ }
+
+ List<String> appIds = Arrays.asList(args[1].split(","));
+ String collectorHost = args[2];
+ String collectorPort = args[3];
+ String collectorProtocol = args[4];
+ String zkQuorum = args[5];
+
+ List<MetricAnomalyModel> anomalyDetectionModels = new ArrayList<>();
+ AnomalyMetricPublisher anomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort);
+
+ SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
+
+ // 10-second micro-batches.
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
+
+ // Instantiate the requested models from a fixed on-disk location.
+ // NOTE(review): EmaModelLoader.load can return null on a read failure,
+ // which is added to the list unchecked and would NPE in model.test() below.
+ for (String method : args[0].split(",")) {
+ if (method.equals("ema")) {
+ LOG.info("Model EMA requested.");
+ EmaModel emaModel = new EmaModelLoader().load(jssc.sparkContext().sc(), "/tmp/model/ema");
+ anomalyDetectionModels.add(emaModel);
+ }
+ }
+
+ // Receiver-based Kafka stream: one topic, numThreads consumer threads.
+ JavaPairReceiverInputDStream<String, String> messages =
+ KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
+
+ //Convert JSON string to TimelineMetrics.
+ JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
+ @Override
+ public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+ LOG.info(message._2());
+ ObjectMapper mapper = new ObjectMapper();
+ TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+ return metrics;
+ }
+ });
+
+ //Group TimelineMetric by AppId.
+ // NOTE(review): keys on the appId of the first metric only; assumes every
+ // metric in one TimelineMetrics batch shares an appId - confirm upstream.
+ JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+ timelineMetrics -> new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(),timelineMetrics)
+ );
+
+ appMetricStream.print();
+
+ //Filter AppIds that are not needed.
+ JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+ return appIds.contains(appMetricTuple._1);
+ }
+ });
+
+ filteredAppMetricStream.print();
+
+ // Score every metric of each batch against every configured model and
+ // publish any anomalies to the collector.
+ filteredAppMetricStream.foreachRDD(rdd -> {
+ rdd.foreach(
+ tuple2 -> {
+ TimelineMetrics metrics = tuple2._2();
+ LOG.info("Received Metric : " + metrics.getMetrics().get(0).getMetricName());
+ for (TimelineMetric metric : metrics.getMetrics()) {
+
+ TimelineMetric timelineMetric =
+ new TimelineMetric(metric.getMetricName(), metric.getAppId(), metric.getHostName(), metric.getMetricValues());
+ LOG.info("Models size : " + anomalyDetectionModels.size());
+
+ for (MetricAnomalyModel model : anomalyDetectionModels) {
+ LOG.info("Testing against Model : " + model.getClass().getCanonicalName());
+ List<MetricAnomaly> anomalies = model.test(timelineMetric);
+ anomalyMetricPublisher.publish(anomalies);
+ for (MetricAnomaly anomaly : anomalies) {
+ LOG.info(anomaly.getAnomalyAsString());
+ }
+
+ }
+ }
+ });
+ });
+
+ // Start the streaming job and block the driver thread.
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
+
+
+
+
diff --git a/ambari-metrics-spark/pom.xml b/ambari-metrics-spark/pom.xml
new file mode 100644
index 0000000..33b4257
--- /dev/null
+++ b/ambari-metrics-spark/pom.xml
@@ -0,0 +1,133 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.5.1.0.0</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>ambari-metrics-spark</artifactId>
+ <version>2.5.1.0.0</version>
+ <properties>
+ <scala.version>2.10.4</scala.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>scala-tools.org</id>
+ <name>Scala-Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </repository>
+ </repositories>
+
+ <pluginRepositories>
+ <pluginRepository>
+ <id>scala-tools.org</id>
+ <name>Scala-Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </pluginRepository>
+ </pluginRepositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.specs</groupId>
+ <artifactId>specs</artifactId>
+ <version>1.2.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <version>1.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
+ <version>1.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-spark</artifactId>
+ <version>4.7.0-HBase-1.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-alertservice</artifactId>
+ <version>2.5.1.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api-scala_2.10</artifactId>
+ <version>2.8.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-mllib_2.10</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <args>
+ <arg>-target:jvm-1.5</arg>
+ </args>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <buildcommands>
+ <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <additionalProjectnatures>
+ <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
+ </additionalProjectnatures>
+ <classpathContainers>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ </classpathContainers>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
diff --git a/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala b/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
new file mode 100644
index 0000000..d4ed31a
--- /dev/null
+++ b/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
@@ -0,0 +1,97 @@
+package org.apache.ambari.metrics.spark
+
+
+import java.util
+import java.util.logging.LogManager
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.ambari.metrics.alertservice.common.{MetricAnomaly, TimelineMetrics}
+import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel
+import org.apache.ambari.metrics.alertservice.methods.ema.{EmaModel, EmaModelLoader}
+import org.apache.ambari.metrics.alertservice.spark.AnomalyMetricPublisher
+import org.apache.log4j.Logger
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.JavaConversions._
+import org.apache.logging.log4j.scala.Logging
+
+object MetricAnomalyDetector extends Logging {
+
+  // Kafka / ZooKeeper connection defaults for the incoming metric stream.
+  var zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181"
+  var groupId = "ambari-metrics-group"
+  var topicName = "ambari-metrics-topic"
+  var numThreads = 1
+
+  // Anomaly-detection models selected via the first CLI argument.
+  // Declared as a var so registration can reassign it: the original
+  // `anomalyDetectionModels :+ new EmaModel()` built a new array and
+  // discarded it, leaving this collection permanently empty.
+  var anomalyDetectionModels: Array[MetricAnomalyModel] = Array[MetricAnomalyModel]()
+
+  def main(args: Array[String]): Unit = {
+
+    // Executor-side logger: @transient + lazy so it is re-created on each
+    // worker instead of being serialized into the foreachRDD closure.
+    @transient
+    lazy val log: Logger = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger")
+
+    if (args.length < 5) {
+      System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol>")
+      System.exit(1)
+    }
+
+    // Register the requested detection methods. ":+=" reassigns the array;
+    // a bare ":+" would throw the appended result away.
+    for (method <- args(0).split(",")) {
+      if (method == "ema") anomalyDetectionModels :+= new EmaModel()
+    }
+
+    // Splat the split array into asList. The original asList(array) produced
+    // a single-element List[Array[String]], so the appId filter below could
+    // never match an individual appId string.
+    val appIds = util.Arrays.asList(args(1).split(","): _*)
+
+    val collectorHost = args(2)
+    val collectorPort = args(3)
+    val collectorProtocol = args(4)
+
+    val anomalyMetricPublisher: AnomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort)
+
+    val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")
+
+    // 10-second micro-batches.
+    val streamingContext = new StreamingContext(sparkConf, Duration(10000))
+
+    // Load the EMA model previously trained and saved by SparkPhoenixReader.
+    val emaModel = new EmaModelLoader().load(streamingContext.sparkContext, "/tmp/model/ema")
+
+    val kafkaStream = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2)
+    kafkaStream.print()
+
+    // Deserialize each Kafka payload (message._2 is the value) into TimelineMetrics.
+    val timelineMetricsStream = kafkaStream.map( message => {
+      val mapper = new ObjectMapper
+      mapper.readValue(message._2, classOf[TimelineMetrics])
+    })
+    timelineMetricsStream.print()
+
+    // Key each TimelineMetrics batch by the appId of its first metric.
+    val appMetricStream = timelineMetricsStream.map( timelineMetrics => {
+      (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics)
+    })
+    appMetricStream.print()
+
+    // Keep only the appIds requested on the command line.
+    val filteredAppMetricStream = appMetricStream.filter( appMetricTuple => {
+      appIds.contains(appMetricTuple._1)
+    })
+    filteredAppMetricStream.print()
+
+    // Score every metric against the EMA model and publish detected anomalies
+    // back to the collector. This closure runs on the executors.
+    filteredAppMetricStream.foreachRDD( rdd => {
+      rdd.foreach( appMetricTuple => {
+        val timelineMetrics = appMetricTuple._2
+        logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName)
+        log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName)
+        for (timelineMetric <- timelineMetrics.getMetrics) {
+          val anomalies = emaModel.test(timelineMetric)
+          anomalyMetricPublisher.publish(anomalies)
+          for (anomaly <- anomalies) {
+            val an = anomaly : MetricAnomaly
+            logger.info(an.getAnomalyAsString)
+          }
+        }
+      })
+    })
+
+    streamingContext.start()
+    streamingContext.awaitTermination()
+  }
+}
diff --git a/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala b/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
new file mode 100644
index 0000000..5ca7b17
--- /dev/null
+++ b/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
@@ -0,0 +1,67 @@
+package org.apache.ambari.metrics.spark
+
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric
+import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.rdd.RDD
+
+object SparkPhoenixReader {
+
+  def main(args: Array[String]) {
+
+    // Seven positional arguments are consumed (indices 0..6), so require 7.
+    // The original "< 6" check allowed exactly six args through and then
+    // crashed on args(6) with ArrayIndexOutOfBoundsException.
+    if (args.length < 7) {
+      System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
+      System.exit(1)
+    }
+
+    val metricName = args(0)
+    val appId = args(1)
+    val hostname = args(2)
+    val weight = args(3).toDouble
+    val timessdev = args(4).toInt
+    val phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure
+    val modelDir = args(6)
+
+    val conf = new SparkConf()
+    conf.set("spark.app.name", "AMSAnomalyModelBuilder")
+    //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")
+
+    val sc = new SparkContext(conf)
+    val sqlContext = new SQLContext(sc)
+
+    // Train on the trailing 24 hours of data.
+    val currentTime = System.currentTimeMillis()
+    val oneDayBack = currentTime - 24*60*60*1000
+
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
+    df.registerTempTable("METRIC_RECORD")
+    // NOTE(review): the query is assembled by string concatenation from CLI
+    // input; tolerable for a prototype but should move to parameter binding.
+    val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " +
+      "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack)
+
+    // Average each data point (SUM / COUNT), keyed and ordered by server time.
+    val metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
+    result.collect().foreach(
+      t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
+    )
+
+    val timelineMetric = new TimelineMetric(metricName, appId, hostname, metricValues)
+
+    // Fit the EMA model on the day's series and persist it so the streaming
+    // detector (MetricAnomalyDetector) can load it.
+    val emaModel = new EmaModel()
+    emaModel.train(timelineMetric, weight, timessdev)
+    emaModel.save(sc, modelDir)
+
+    // Release cluster resources; the original leaked the SparkContext.
+    sc.stop()
+  }
+
+}
diff --git a/ambari-metrics-timelineservice/pom.xml b/ambari-metrics-timelineservice/pom.xml
index fc67cb1..67b7f4b 100644
--- a/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics-timelineservice/pom.xml
@@ -697,6 +697,11 @@
<version>1.0.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-alertservice</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<profiles>
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
similarity index 92%
rename from ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
rename to ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 0836a72..3558f87 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -63,10 +64,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
@@ -85,6 +83,7 @@
private Integer defaultTopNHostsLimit;
private MetricCollectorHAController haController;
private boolean containerMetricsDisabled = false;
+ private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667");
/**
* Construct the service.
@@ -372,11 +371,43 @@
// Error indicated by the Sql exception
TimelinePutResponse response = new TimelinePutResponse();
+ try {
+ if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) {
+ kafkaProducer.sendMetrics(fromTimelineMetrics(metrics));
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error(e);
+ }
hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
return response;
}
+
+ /**
+ * Converts the timeline service's {@link TimelineMetrics} wrapper into the
+ * alertservice module's equivalent type so it can be sent to Kafka by
+ * {@code kafkaProducer}. Each contained metric is converted via
+ * {@code fromTimelineMetric}.
+ *
+ * @param timelineMetrics the collector-side metrics batch; never modified
+ * @return a new alertservice TimelineMetrics holding converted copies
+ */
+ private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) {
+ org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
+
+ List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new ArrayList<>();
+ for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+ timelineMetricList.add(fromTimelineMetric(timelineMetric));
+ }
+ otherMetrics.setMetrics(timelineMetricList);
+ return otherMetrics;
+ }
+
+ /**
+ * Copies a single collector-side {@link TimelineMetric} into the
+ * alertservice module's TimelineMetric type, field by field (values,
+ * start time, host, instance id, app id, metric name).
+ *
+ * NOTE(review): the metric-values map is shared, not deep-copied —
+ * presumably safe because the source metric is not mutated afterwards;
+ * confirm against the caller.
+ *
+ * @param timelineMetric source metric to convert
+ * @return a new alertservice TimelineMetric mirroring the input
+ */
+ private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) {
+
+ org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
+ otherMetric.setMetricValues(timelineMetric.getMetricValues());
+ otherMetric.setStartTime(timelineMetric.getStartTime());
+ otherMetric.setHostName(timelineMetric.getHostName());
+ otherMetric.setInstanceId(timelineMetric.getInstanceId());
+ otherMetric.setAppId(timelineMetric.getAppId());
+ otherMetric.setMetricName(timelineMetric.getMetricName());
+
+ return otherMetric;
+ }
+
@Override
public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
throws SQLException, IOException {
diff --git a/pom.xml b/pom.xml
index 47255ea..79ea06f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,6 +34,8 @@
<module>ambari-metrics-grafana</module>
<module>ambari-metrics-assembly</module>
<module>ambari-metrics-host-aggregator</module>
+ <module>ambari-metrics-alertservice</module>
+ <module>ambari-metrics-spark</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>