AMBARI-21106 : Ambari Metrics Anomaly detection prototype.(avijayan)
diff --git a/ambari-metrics-alertservice/pom.xml b/ambari-metrics-alertservice/pom.xml
new file mode 100644
index 0000000..3a3545b
--- /dev/null
+++ b/ambari-metrics-alertservice/pom.xml
@@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ambari-metrics</artifactId>
        <groupId>org.apache.ambari</groupId>
        <version>2.5.1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>ambari-metrics-alertservice</artifactId>
    <version>2.5.1.0.0</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <name>Ambari Metrics Alert Service</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.ambari</groupId>
            <artifactId>ambari-metrics-common</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>com.github.lucarosellini.rJava</groupId>
            <artifactId>JRI</artifactId>
            <version>0.9-7</version>
        </dependency>
        <!-- All Spark artifacts must agree on ONE Scala binary version and ONE
             Spark release.  The original mixed spark-streaming_2.11 (2.1.1) with
             _2.10 artifacts at 1.6.3 and mllib at 1.3.0, which fails at runtime
             with binary-incompatibility errors.  Aligned here on Scala 2.10 /
             Spark 1.6.3 to match spark-streaming-kafka_2.10 and spark-sql_2.10. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jmx</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-json</artifactId>
            <version>0.10.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.7.0-HBase-1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>1.6.3</version>
        </dependency>
    </dependencies>
</project>
\ No newline at end of file
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java
new file mode 100644
index 0000000..0929f4c
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java
@@ -0,0 +1,130 @@
+package org.apache.ambari.metrics.alertservice.R;
+
+import org.apache.ambari.metrics.alertservice.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.common.DataSet;
+import org.apache.commons.lang.ArrayUtils;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class AmsRTest {
+
+    public static void main(String[] args) {
+
+        String metricName = "TestMetric";
+        double[] ts = getTS(1000);
+
+        double[] train_ts = ArrayUtils.subarray(ts, 0,750);
+        double[] train_x = getData(750);
+        DataSet trainData = new DataSet(metricName, train_ts, train_x);
+
+        double[] test_ts = ArrayUtils.subarray(ts, 750,1000);
+        double[] test_x = getData(250);
+        test_x[50] = 5.5; //Anomaly
+        DataSet testData = new DataSet(metricName, test_ts, test_x);
+        ResultSet rs;
+
+        Map<String, String> configs = new HashMap();
+
+        System.out.println("TUKEYS");
+        configs.put("tukeys.n", "3");
+        rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+        rs.print();
+        System.out.println("--------------");
+
+        System.out.println("EMA Global");
+        configs.put("ema.n", "3");
+        configs.put("ema.w", "0.8");
+        rs = RFunctionInvoker.ema_global(trainData, testData, configs);
+        rs.print();
+        System.out.println("--------------");
+
+        System.out.println("EMA Daily");
+        rs = RFunctionInvoker.ema_daily(trainData, testData, configs);
+        rs.print();
+        System.out.println("--------------");
+
+        configs.put("ks.p_value", "0.05");
+        System.out.println("KS Test");
+        rs = RFunctionInvoker.ksTest(trainData, testData, configs);
+        rs.print();
+        System.out.println("--------------");
+
+        ts = getTS(5000);
+        train_ts = ArrayUtils.subarray(ts, 30,4800);
+        train_x = getData(4800);
+        trainData = new DataSet(metricName, train_ts, train_x);
+        test_ts = ArrayUtils.subarray(ts, 4800,5000);
+        test_x = getData(200);
+        for (int i =0; i<200;i++) {
+            test_x[i] = test_x[i]*5;
+        }
+        testData = new DataSet(metricName, test_ts, test_x);
+        configs.put("hsdev.n", "3");
+        configs.put("hsdev.nhp", "3");
+        configs.put("hsdev.interval", "86400000");
+        configs.put("hsdev.period", "604800000");
+        System.out.println("HSdev");
+        rs = RFunctionInvoker.hsdev(trainData, testData, configs);
+        rs.print();
+        System.out.println("--------------");
+
+    }
+
+    static double[] getTS(int n) {
+        long currentTime = System.currentTimeMillis();
+        double[] ts = new double[n];
+        currentTime = currentTime - (currentTime % (5*60*1000));
+
+        for (int i = 0,j=n-1; i<n; i++,j--) {
+            ts[j] = currentTime;
+            currentTime = currentTime - (5*60*1000);
+        }
+        return ts;
+    }
+
+    static void testBasic() {
+        Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+        try {
+            r.eval("library(ambarimetricsAD)");
+            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/test.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/util.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+            double[] ts = getTS(5000);
+            double[] x = getData(5000);
+            r.assign("ts", ts);
+            r.assign("x", x);
+            r.eval("x[1000] <- 4.5");
+            r.eval("x[2000] <- 4.75");
+            r.eval("x[3000] <- 3.5");
+            r.eval("x[4000] <- 5.5");
+            r.eval("x[5000] <- 5.0");
+            r.eval("data <- data.frame(ts,x)");
+            r.eval("names(data) <- c(\"TS\", \"Metric\")");
+            System.out.println(r.eval("data"));
+            REXP exp = r.eval("t_an <- test_methods(data)");
+            exp = r.eval("t_an");
+            String strExp = exp.asString();
+            System.out.println("result:" + exp);
+            RVector cont = (RVector) exp.getContent();
+            double[] an_ts = cont.at(0).asDoubleArray();
+            double[] an_x = cont.at(1).asDoubleArray();
+            System.out.println("result:" + strExp);
+        }
+        finally {
+            r.end();
+        }
+    }
+    static double[] getData(int n) {
+        double[] metrics = new double[n];
+        Random random = new Random();
+        for (int i = 0; i<n; i++) {
+            metrics[i] = random.nextDouble();
+        }
+        return metrics;
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
new file mode 100644
index 0000000..8d1e520
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
@@ -0,0 +1,180 @@
+package org.apache.ambari.metrics.alertservice.R;
+
+
+import org.apache.ambari.metrics.alertservice.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.common.DataSet;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RFunctionInvoker {
+
+    public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+
+
+    private static void loadDataSets(Rengine r, DataSet trainData, DataSet testData) {
+        r.assign("train_ts", trainData.ts);
+        r.assign("train_x", trainData.values);
+        r.eval("train_data <- data.frame(train_ts,train_x)");
+        r.eval("names(train_data) <- c(\"TS\", " + trainData.metricName + ")");
+
+        r.assign("test_ts", testData.ts);
+        r.assign("test_x", testData.values);
+        r.eval("test_data <- data.frame(test_ts,test_x)");
+        r.eval("names(test_data) <- c(\"TS\", " + testData.metricName + ")");
+    }
+
+
+    public static ResultSet tukeys(DataSet trainData, DataSet testData, Map<String, String> configs) {
+        try {
+            r.eval("library(ambarimetricsAD)");
+            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+            int n = Integer.parseInt(configs.get("tukeys.n"));
+            r.eval("n <- " + n);
+
+            loadDataSets(r, trainData, testData);
+
+            r.eval("an <- ams_tukeys(train_data, test_data, n)");
+            REXP exp = r.eval("an");
+            RVector cont = (RVector) exp.getContent();
+            List<double[]> result = new ArrayList();
+            for (int i = 0; i< cont.size(); i++) {
+                result.add(cont.at(i).asDoubleArray());
+            }
+            return new ResultSet(result);
+        } catch(Exception e) {
+            e.printStackTrace();
+        } finally {
+            r.end();
+        }
+        return null;
+    }
+
+    public static ResultSet ema_global(DataSet trainData, DataSet testData, Map<String, String> configs) {
+        try {
+            r.eval("library(ambarimetricsAD)");
+            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+            int n = Integer.parseInt(configs.get("ema.n"));
+            r.eval("n <- " + n);
+
+            double w = Double.parseDouble(configs.get("ema.w"));
+            r.eval("w <- " + w);
+
+            loadDataSets(r, trainData, testData);
+
+            r.eval("an <- ema_global(train_data, test_data, w, n)");
+            REXP exp = r.eval("an");
+            RVector cont = (RVector) exp.getContent();
+            List<double[]> result = new ArrayList();
+            for (int i = 0; i< cont.size(); i++) {
+                result.add(cont.at(i).asDoubleArray());
+            }
+            return new ResultSet(result);
+
+        } catch(Exception e) {
+            e.printStackTrace();
+        } finally {
+            r.end();
+        }
+        return null;
+    }
+
+    public static ResultSet ema_daily(DataSet trainData, DataSet testData, Map<String, String> configs) {
+        try {
+            r.eval("library(ambarimetricsAD)");
+            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+            int n = Integer.parseInt(configs.get("ema.n"));
+            r.eval("n <- " + n);
+
+            double w = Double.parseDouble(configs.get("ema.w"));
+            r.eval("w <- " + w);
+
+            loadDataSets(r, trainData, testData);
+
+            r.eval("an <- ema_daily(train_data, test_data, w, n)");
+            REXP exp = r.eval("an");
+            RVector cont = (RVector) exp.getContent();
+            List<double[]> result = new ArrayList();
+            for (int i = 0; i< cont.size(); i++) {
+                result.add(cont.at(i).asDoubleArray());
+            }
+            return new ResultSet(result);
+
+        } catch(Exception e) {
+            e.printStackTrace();
+        } finally {
+            r.end();
+        }
+        return null;
+    }
+
+    public static ResultSet ksTest(DataSet trainData, DataSet testData, Map<String, String> configs) {
+        try {
+            r.eval("library(ambarimetricsAD)");
+            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/kstest.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+            double p_value = Double.parseDouble(configs.get("ks.p_value"));
+            r.eval("p_value <- " + p_value);
+
+            loadDataSets(r, trainData, testData);
+
+            r.eval("an <- ams_ks(train_data, test_data, p_value)");
+            REXP exp = r.eval("an");
+            RVector cont = (RVector) exp.getContent();
+            List<double[]> result = new ArrayList();
+            for (int i = 0; i< cont.size(); i++) {
+                result.add(cont.at(i).asDoubleArray());
+            }
+            return new ResultSet(result);
+
+        } catch(Exception e) {
+            e.printStackTrace();
+        } finally {
+            r.end();
+        }
+        return null;
+    }
+
+    public static ResultSet hsdev(DataSet trainData, DataSet testData, Map<String, String> configs) {
+        try {
+            r.eval("library(ambarimetricsAD)");
+            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/hsdev.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+
+            int n = Integer.parseInt(configs.get("hsdev.n"));
+            r.eval("n <- " + n);
+
+            int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
+            r.eval("nhp <- " + nhp);
+
+            long interval = Long.parseLong(configs.get("hsdev.interval"));
+            r.eval("interval <- " + interval);
+
+            long period = Long.parseLong(configs.get("hsdev.period"));
+            r.eval("period <- " + period);
+
+            loadDataSets(r, trainData, testData);
+
+            r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
+            REXP exp = r.eval("an2");
+            RVector cont = (RVector) exp.getContent();
+
+            List<double[]> result = new ArrayList();
+            for (int i = 0; i< cont.size(); i++) {
+                result.add(cont.at(i).asDoubleArray());
+            }
+            return new ResultSet(result);
+        } catch(Exception e) {
+            e.printStackTrace();
+        } finally {
+            r.end();
+        }
+        return null;
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java
new file mode 100644
index 0000000..47bf9b6
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java
@@ -0,0 +1,21 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+import java.util.Arrays;
+
/**
 * Plain holder for one metric's time series: a metric name plus parallel
 * arrays of timestamps and values.
 */
public class DataSet {

    // Metric this series belongs to.
    public String metricName;
    // Timestamps, parallel to {@link #values}.
    public double[] ts;
    // Observed values, one per timestamp.
    public double[] values;

    /**
     * Creates a data set; the arrays are stored by reference, not copied.
     */
    public DataSet(String metricName, double[] ts, double[] values) {
        this.metricName = metricName;
        this.ts = ts;
        this.values = values;
    }

    @Override
    public String toString() {
        StringBuilder rendered = new StringBuilder(metricName);
        rendered.append(Arrays.toString(ts));
        rendered.append(Arrays.toString(values));
        return rendered.toString();
    }
}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java
new file mode 100644
index 0000000..915da4c
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java
@@ -0,0 +1,10 @@
+package org.apache.ambari.metrics.alertservice.common;
+
/**
 * Base type for the outcome of a single anomaly-detection method run.
 * Concrete subclasses supply a printable rendering of their result.
 */
public abstract class MethodResult {
    // Identifier of the detection method that produced this result.
    protected String methodType;

    /** Human-readable rendering of the result, used when reporting anomalies. */
    public abstract String prettyPrint();

    public String getMethodType() {
        return methodType;
    }
}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java
new file mode 100644
index 0000000..d237bee
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java
@@ -0,0 +1,52 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+public class MetricAnomaly {
+
+    private String metricKey;
+    private long timestamp;
+    private double metricValue;
+    private MethodResult methodResult;
+
+    public MetricAnomaly(String metricKey, long timestamp, double metricValue, MethodResult methodResult) {
+        this.metricKey = metricKey;
+        this.timestamp = timestamp;
+        this.metricValue = metricValue;
+        this.methodResult = methodResult;
+    }
+
+    public String getMetricKey() {
+        return metricKey;
+    }
+
+    public void setMetricName(String metricName) {
+        this.metricKey = metricName;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public double getMetricValue() {
+        return metricValue;
+    }
+
+    public void setMetricValue(double metricValue) {
+        this.metricValue = metricValue;
+    }
+
+    public MethodResult getMethodResult() {
+        return methodResult;
+    }
+
+    public void setMethodResult(MethodResult methodResult) {
+        this.methodResult = methodResult;
+    }
+
+    public String getAnomalyAsString() {
+        return metricKey + ":" + timestamp + ":" + metricValue + ":" + methodResult.prettyPrint();
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java
new file mode 100644
index 0000000..96b74e0
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java
@@ -0,0 +1,26 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ResultSet {
+
+    List<double[]> resultset = new ArrayList<>();
+
+    public ResultSet(List<double[]> resultset) {
+        this.resultset = resultset;
+    }
+
+    public void print() {
+        System.out.println("Result : ");
+        if (!resultset.isEmpty()) {
+            for (int i = 0; i<resultset.get(0).length;i++) {
+                for (double[] entity : resultset) {
+                    System.out.print(entity[i] + " ");
+                }
+                System.out.println();
+            }
+        }
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java
new file mode 100644
index 0000000..5118225
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java
@@ -0,0 +1,86 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+
+public class SingleValuedTimelineMetric {
+    private Long timestamp;
+    private Double value;
+    private String metricName;
+    private String appId;
+    private String instanceId;
+    private String hostName;
+    private Long startTime;
+    private String type;
+
+    public void setSingleTimeseriesValue(Long timestamp, Double value) {
+        this.timestamp = timestamp;
+        this.value = value;
+    }
+
+    public SingleValuedTimelineMetric(String metricName, String appId,
+                                      String instanceId, String hostName,
+                                      long timestamp, long startTime, String type) {
+        this.metricName = metricName;
+        this.appId = appId;
+        this.instanceId = instanceId;
+        this.hostName = hostName;
+        this.timestamp = timestamp;
+        this.startTime = startTime;
+        this.type = type;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Double getValue() {
+        return value;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public boolean equalsExceptTime(TimelineMetric metric) {
+        if (!metricName.equals(metric.getMetricName())) return false;
+        if (hostName != null ? !hostName.equals(metric.getHostName()) : metric.getHostName() != null)
+            return false;
+        if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() != null)
+            return false;
+        if (instanceId != null ? !instanceId.equals(metric.getInstanceId()) : metric.getInstanceId() != null) return false;
+
+        return true;
+    }
+
+    public TimelineMetric getTimelineMetric() {
+        TimelineMetric metric = new TimelineMetric();
+        metric.setMetricName(this.metricName);
+        metric.setAppId(this.appId);
+        metric.setHostName(this.hostName);
+        metric.setType(this.type);
+        metric.setInstanceId(this.instanceId);
+        metric.setStartTime(this.startTime);
+        metric.setTimestamp(this.timestamp);
+        metric.getMetricValues().put(timestamp, value);
+        return metric;
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java
new file mode 100644
index 0000000..dff56e6
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java
@@ -0,0 +1,60 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
/**
 * Small statistics helpers over collections of doubles.
 *
 * Bug fixed: variance() previously returned the raw sum of squared deviations
 * without dividing by the count, i.e. not a variance at all. sdev() already
 * divided once more and was therefore correct; both now share one helper.
 */
public class StatisticUtils {

  /**
   * Arithmetic mean of the values. Returns NaN for an empty collection
   * (0.0 / 0), matching the original behavior.
   */
  public static double mean(Collection<Double> values) {
    double sum = 0;
    for (double d : values) {
      sum += d;
    }
    return sum / values.size();
  }

  /**
   * Population variance: the mean of squared deviations from the mean.
   */
  public static double variance(Collection<Double> values) {
    return sumSquaredDeviations(values) / values.size();
  }

  /**
   * Standard deviation. With Bessel's correction the divisor is (n - 1),
   * giving the sample standard deviation; otherwise n (population).
   */
  public static double sdev(Collection<Double> values, boolean useBesselsCorrection) {
    int n = (useBesselsCorrection) ? values.size() - 1 : values.size();
    return Math.sqrt(sumSquaredDeviations(values) / n);
  }

  /**
   * Median of the values: the middle element of the sorted copy, or the
   * average of the two middle elements when the count is even.
   * The input collection is not modified.
   */
  public static double median(Collection<Double> values) {
    ArrayList<Double> clonedValues = new ArrayList<Double>(values);
    Collections.sort(clonedValues);
    int n = clonedValues.size();

    if (n % 2 != 0) {
      return clonedValues.get((n - 1) / 2);
    } else {
      return (clonedValues.get((n - 1) / 2) + clonedValues.get(n / 2)) / 2;
    }
  }

  // Shared accumulator: sum of squared deviations from the mean.
  private static double sumSquaredDeviations(Collection<Double> values) {
    double avg = mean(values);
    double ss = 0;
    for (double d : values) {
      ss += Math.pow(d - avg, 2.0);
    }
    return ss;
  }
}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java
new file mode 100644
index 0000000..2a73855
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java
@@ -0,0 +1,221 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetric implements Comparable<TimelineMetric>, Serializable {
+
+    private String metricName;
+    private String appId;
+    private String instanceId;
+    private String hostName;
+    private long timestamp;
+    private long startTime;
+    private String type;
+    private String units;
+    private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    private Map<String, String> metadata = new HashMap<>();
+
+    // default
+    public TimelineMetric() {
+
+    }
+
+    public TimelineMetric(String metricName, String appId, String hostName, TreeMap<Long,Double> metricValues) {
+        this.metricName = metricName;
+        this.appId = appId;
+        this.hostName = hostName;
+        this.metricValues.putAll(metricValues);
+    }
+
+    // copy constructor
+    public TimelineMetric(TimelineMetric metric) {
+        setMetricName(metric.getMetricName());
+        setType(metric.getType());
+        setUnits(metric.getUnits());
+        setTimestamp(metric.getTimestamp());
+        setAppId(metric.getAppId());
+        setInstanceId(metric.getInstanceId());
+        setHostName(metric.getHostName());
+        setStartTime(metric.getStartTime());
+        setMetricValues(new TreeMap<Long, Double>(metric.getMetricValues()));
+    }
+
+    @XmlElement(name = "metricname")
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public void setMetricName(String metricName) {
+        this.metricName = metricName;
+    }
+
+    @XmlElement(name = "appid")
+    public String getAppId() {
+        return appId;
+    }
+
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    @XmlElement(name = "instanceid")
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    public void setInstanceId(String instanceId) {
+        this.instanceId = instanceId;
+    }
+
+    @XmlElement(name = "hostname")
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+
+    @XmlElement(name = "timestamp")
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @XmlElement(name = "starttime")
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    @XmlElement(name = "type", defaultValue = "UNDEFINED")
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    @XmlElement(name = "units")
+    public String getUnits() {
+        return units;
+    }
+
+    public void setUnits(String units) {
+        this.units = units;
+    }
+
+    @XmlElement(name = "metrics")
+    public TreeMap<Long, Double> getMetricValues() {
+        return metricValues;
+    }
+
+    public void setMetricValues(TreeMap<Long, Double> metricValues) {
+        this.metricValues = metricValues;
+    }
+
+    public void addMetricValues(Map<Long, Double> metricValues) {
+        this.metricValues.putAll(metricValues);
+    }
+
+    @XmlElement(name = "metadata")
+    public Map<String,String> getMetadata () {
+        return metadata;
+    }
+
+    public void setMetadata (Map<String,String> metadata) {
+        this.metadata = metadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TimelineMetric metric = (TimelineMetric) o;
+
+        if (!metricName.equals(metric.metricName)) return false;
+        if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+            return false;
+        if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+            return false;
+        if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+            return false;
+        if (timestamp != metric.timestamp) return false;
+        if (startTime != metric.startTime) return false;
+
+        return true;
+    }
+
+    public boolean equalsExceptTime(TimelineMetric metric) {
+        if (!metricName.equals(metric.metricName)) return false;
+        if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+            return false;
+        if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+            return false;
+        if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = metricName.hashCode();
+        result = 31 * result + (appId != null ? appId.hashCode() : 0);
+        result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+        result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+        result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+        return result;
+    }
+
+    @Override
+    public int compareTo(TimelineMetric other) {
+        if (timestamp > other.timestamp) {
+            return -1;
+        } else if (timestamp < other.timestamp) {
+            return 1;
+        } else {
+            return metricName.compareTo(other.metricName);
+        }
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java
new file mode 100644
index 0000000..500e1e9
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java
@@ -0,0 +1,112 @@
+package org.apache.ambari.metrics.alertservice.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The class that hosts a list of timeline entities.
+ */
+@XmlRootElement(name = "metrics")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetrics implements Serializable {
+
+    private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+
+    public TimelineMetrics() {}
+
+    @XmlElement(name = "metrics")
+    public List<TimelineMetric> getMetrics() {
+        return allMetrics;
+    }
+
+    public void setMetrics(List<TimelineMetric> allMetrics) {
+        this.allMetrics = allMetrics;
+    }
+
+    /**
+     * Returns true when both metrics identify the same series: same metric
+     * name and, where metric1 declares them, the same host and app.
+     */
+    private boolean isEqualTimelineMetrics(TimelineMetric metric1,
+                                           TimelineMetric metric2) {
+        if (!metric1.getMetricName().equals(metric2.getMetricName())) {
+            return false;
+        }
+        // BUG FIX: the previous implementation overwrote the hostname
+        // comparison result with the appId comparison, so metrics on
+        // different hosts with matching appIds were reported as equal.
+        if (metric1.getHostName() != null
+                && !metric1.getHostName().equals(metric2.getHostName())) {
+            return false;
+        }
+        if (metric1.getAppId() != null
+                && !metric1.getAppId().equals(metric2.getAppId())) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Merge with existing TimelineMetric if everything except startTime is
+     * the same.
+     * @param metric {@link TimelineMetric}
+     */
+    public void addOrMergeTimelineMetric(TimelineMetric metric) {
+        TimelineMetric metricToMerge = null;
+
+        if (!allMetrics.isEmpty()) {
+            for (TimelineMetric timelineMetric : allMetrics) {
+                if (timelineMetric.equalsExceptTime(metric)) {
+                    metricToMerge = timelineMetric;
+                    break;
+                }
+            }
+        }
+
+        if (metricToMerge != null) {
+            metricToMerge.addMetricValues(metric.getMetricValues());
+            // Keep the earliest timestamp/startTime of the merged series.
+            if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+                metricToMerge.setTimestamp(metric.getTimestamp());
+            }
+            if (metricToMerge.getStartTime() > metric.getStartTime()) {
+                metricToMerge.setStartTime(metric.getStartTime());
+            }
+        } else {
+            allMetrics.add(metric);
+        }
+    }
+
+    // Optimization that addresses too many TreeMaps from getting created.
+    public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) {
+        TimelineMetric metricToMerge = null;
+
+        if (!allMetrics.isEmpty()) {
+            for (TimelineMetric timelineMetric : allMetrics) {
+                if (metric.equalsExceptTime(timelineMetric)) {
+                    metricToMerge = timelineMetric;
+                    break;
+                }
+            }
+        }
+
+        if (metricToMerge != null) {
+            // Single value: insert directly into the existing value map
+            // instead of allocating a new TreeMap for the merge.
+            metricToMerge.getMetricValues().put(metric.getTimestamp(), metric.getValue());
+            if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+                metricToMerge.setTimestamp(metric.getTimestamp());
+            }
+            if (metricToMerge.getStartTime() > metric.getStartTime()) {
+                metricToMerge.setStartTime(metric.getStartTime());
+            }
+        } else {
+            allMetrics.add(metric.getTimelineMetric());
+        }
+    }
+}
+
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java
new file mode 100644
index 0000000..7ae91a3
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java
@@ -0,0 +1,12 @@
+package org.apache.ambari.metrics.alertservice.methods;
+
+import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+
+import java.util.List;
+
+/**
+ * Contract for a pluggable metric anomaly-detection method.
+ */
+public interface MetricAnomalyModel {
+
+    // Note: interface members are implicitly public; redundant modifiers removed.
+
+    /** Feeds new metric data into the model and returns any anomalies found in it. */
+    List<MetricAnomaly> onNewMetric(TimelineMetric metric);
+
+    /** Tests the metric against the model's current state and returns anomalies. */
+    List<MetricAnomaly> test(TimelineMetric metric);
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java
new file mode 100644
index 0000000..ec548c8
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java
@@ -0,0 +1,56 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+/**
+ * Per-series state for exponential moving average (EMA) anomaly detection:
+ * tracks a running mean (ema) and a running deviation estimate (ems) for one
+ * metric/app/host combination.
+ *
+ * NOTE: imports fixed from com.sun.org.apache.commons.logging (JDK-internal
+ * repackaging) to the real org.apache.commons.logging API.
+ */
+@XmlRootElement
+public class EmaDS implements Serializable {
+
+    String metricName;
+    String appId;
+    String hostname;
+    // Exponentially weighted moving average of the metric value.
+    double ema;
+    // Exponentially weighted moving deviation (sigma estimate).
+    double ems;
+    // Smoothing factor: weight given to history vs. the new sample.
+    double weight;
+    // How many deviations a sample may stray from the mean before it is
+    // flagged as anomalous.
+    int timessdev;
+    private static final Log LOG = LogFactory.getLog(EmaDS.class);
+
+    public EmaDS(String metricName, String appId, String hostname, double weight, int timessdev) {
+        this.metricName = metricName;
+        this.appId = appId;
+        this.hostname = hostname;
+        this.weight = weight;
+        this.timessdev = timessdev;
+        this.ema = 0.0;
+        this.ems = 0.0;
+    }
+
+    /**
+     * Tests the value against the current estimates, then folds it into the
+     * running ema/ems. Returns an EmaResult when the value deviates more than
+     * timessdev deviations from the mean, otherwise null.
+     */
+    public EmaResult testAndUpdate(double metricValue) {
+        double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+        // Delegate to update() so the smoothing formulas live in one place.
+        update(metricValue);
+        return diff > 0 ? new EmaResult(diff) : null;
+    }
+
+    /**
+     * Folds the value into the running mean and deviation without testing it.
+     */
+    public void update(double metricValue) {
+        ema = weight * ema + (1 - weight) * metricValue;
+        ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
+        // Per-sample state is debug-level detail; System.out spam removed.
+        LOG.debug(ema + ", " + ems);
+    }
+
+    /**
+     * Read-only test: returns an EmaResult when the value deviates more than
+     * timessdev deviations from the current mean, otherwise null.
+     */
+    public EmaResult test(double metricValue) {
+        double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+        return diff > 0 ? new EmaResult(diff) : null;
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java
new file mode 100644
index 0000000..4aae543
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java
@@ -0,0 +1,114 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import com.google.gson.Gson;
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.ambari.metrics.alertservice.common.MethodResult;
+import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Saveable;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class EmaModel implements MetricAnomalyModel, Saveable, Serializable {
+
+    @XmlElement(name = "trackedEmas")
+    private Map<String, EmaDS> trackedEmas = new HashMap<>();
+    private static final Log LOG = LogFactory.getLog(EmaModel.class);
+
+    public List<MetricAnomaly> onNewMetric(TimelineMetric metric) {
+
+        String metricName = metric.getMetricName();
+        String appId = metric.getAppId();
+        String hostname = metric.getHostName();
+        String key = metricName + "_" + appId + "_" + hostname;
+        List<MetricAnomaly> anomalies = new ArrayList<>();
+
+        if (!trackedEmas.containsKey(metricName)) {
+            trackedEmas.put(key, new EmaDS(metricName, appId, hostname, 0.8, 3));
+        }
+
+        EmaDS emaDS = trackedEmas.get(key);
+        for (Long timestamp : metric.getMetricValues().keySet()) {
+            double metricValue = metric.getMetricValues().get(timestamp);
+            MethodResult result = emaDS.testAndUpdate(metricValue);
+            if (result != null) {
+                MetricAnomaly metricAnomaly = new MetricAnomaly(key,timestamp, metricValue, result);
+                anomalies.add(metricAnomaly);
+            }
+        }
+        return anomalies;
+    }
+
+    public EmaDS train(TimelineMetric metric, double weight, int timessdev) {
+
+        String metricName = metric.getMetricName();
+        String appId = metric.getAppId();
+        String hostname = metric.getHostName();
+        String key = metricName + "_" + appId + "_" + hostname;
+
+        EmaDS emaDS = new EmaDS(metric.getMetricName(), metric.getAppId(), metric.getHostName(), weight, timessdev);
+        LOG.info("In EMA Train step");
+        for (Long timestamp : metric.getMetricValues().keySet()) {
+            System.out.println(timestamp + " : " + metric.getMetricValues().get(timestamp));
+            LOG.info(timestamp + " : " + metric.getMetricValues().get(timestamp));
+            emaDS.update(metric.getMetricValues().get(timestamp));
+        }
+        trackedEmas.put(key, emaDS);
+        return emaDS;
+    }
+
+    public List<MetricAnomaly> test(TimelineMetric metric) {
+        String metricName = metric.getMetricName();
+        String appId = metric.getAppId();
+        String hostname = metric.getHostName();
+        String key = metricName + "_" + appId + "_" + hostname;
+
+        EmaDS emaDS = trackedEmas.get(key);
+
+        if (emaDS == null) {
+            return new ArrayList<>();
+        }
+
+        List<MetricAnomaly> anomalies = new ArrayList<>();
+
+        for (Long timestamp : metric.getMetricValues().keySet()) {
+            double metricValue = metric.getMetricValues().get(timestamp);
+            MethodResult result = emaDS.test(metricValue);
+            if (result != null) {
+                MetricAnomaly metricAnomaly = new MetricAnomaly(key,timestamp, metricValue, result);
+                anomalies.add(metricAnomaly);
+            }
+        }
+        return anomalies;
+    }
+
+    @Override
+    public void save(SparkContext sc, String path) {
+        Gson gson = new Gson();
+        try {
+            String json = gson.toJson(this);
+            try (Writer writer = new BufferedWriter(new OutputStreamWriter(
+                    new FileOutputStream(path), "utf-8"))) {
+                writer.write(json);
+            }        } catch (IOException e) {
+            LOG.error(e);
+        }
+    }
+
+    @Override
+    public String formatVersion() {
+        return "1.0";
+    }
+
+}
+
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java
new file mode 100644
index 0000000..f0ef340
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java
@@ -0,0 +1,29 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Loader;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * Loads an {@link EmaModel} previously written by {@link EmaModel#save} as a
+ * JSON file on the local filesystem.
+ */
+public class EmaModelLoader implements Loader<EmaModel> {
+    private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);
+
+    /**
+     * Reads the JSON file at {@code path} and deserializes it. Returns null
+     * when the file cannot be read (mirroring the best-effort save side);
+     * callers must null-check the result. The Spark context is unused.
+     */
+    @Override
+    public EmaModel load(SparkContext sc, String path) {
+        Gson gson = new Gson();
+        try {
+            String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+            return gson.fromJson(fileString, EmaModel.class);
+        } catch (IOException e) {
+            LOG.error(e);
+        }
+        return null;
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java
new file mode 100644
index 0000000..23f1793
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java
@@ -0,0 +1,19 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import org.apache.ambari.metrics.alertservice.common.MethodResult;
+
+/**
+ * Result of an EMA test: carries how far the sample lay beyond the allowed
+ * band of timessdev deviations around the running mean.
+ */
+public class EmaResult extends MethodResult {
+
+    // Amount by which |value - ema| exceeded timessdev * ems; > 0 by construction.
+    double diff;
+
+    public EmaResult(double diff) {
+        this.methodType = "EMA";
+        this.diff = diff;
+    }
+
+    @Override
+    public String prettyPrint() {
+        // Was "(` = ..." — an apparent typo; name the field being printed.
+        return methodType + "(diff = " + diff + ")";
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java
new file mode 100644
index 0000000..a090786
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java
@@ -0,0 +1,51 @@
+package org.apache.ambari.metrics.alertservice.methods.ema;
+
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+/**
+ * Manual driver that trains an {@link EmaModel} on 20 synthetic samples of a
+ * dummy metric. Not a unit test; run via main(). Unused imports and dead
+ * commented-out fixtures removed.
+ */
+public class TestEmaModel {
+
+    public static void main(String[] args) throws IOException {
+
+        long now = System.currentTimeMillis();
+        TimelineMetric metric1 = new TimelineMetric();
+        metric1.setMetricName("dummy_metric");
+        metric1.setHostName("dummy_host");
+        metric1.setTimestamp(now);
+        metric1.setStartTime(now - 1000);
+        metric1.setAppId("HOST");
+        metric1.setType("Integer");
+
+        // 20 samples, 100 ms apart, uniformly distributed in [9, 10).
+        TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+        for (int i = 0; i < 20; i++) {
+            double metric = 9 + Math.random();
+            metricValues.put(now - i * 100, metric);
+        }
+        metric1.setMetricValues(metricValues);
+
+        EmaModel emaModel = new EmaModel();
+        emaModel.train(metric1, 0.8, 3);
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
new file mode 100644
index 0000000..de56825
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
@@ -0,0 +1,75 @@
+package org.apache.ambari.metrics.alertservice.spark;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Publishes {@link TimelineMetrics} as JSON records onto the
+ * ambari-metrics-topic Kafka topic.
+ */
+public class AmsKafkaProducer {
+
+    // Typed producer (was a raw type): keys are unused, values are JSON trees.
+    Producer<String, JsonNode> producer;
+    private static String topicName = "ambari-metrics-topic";
+
+    /**
+     * @param kafkaServers Kafka bootstrap servers list, e.g. "host:6667"
+     */
+    public AmsKafkaProducer(String kafkaServers) {
+        Properties configProperties = new Properties();
+        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
+        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer");
+        producer = new KafkaProducer<String, JsonNode>(configProperties);
+    }
+
+    /**
+     * Serializes the metrics to a JSON tree and sends them, blocking until
+     * the broker acknowledges the record (get() surfaces broker errors).
+     */
+    public void sendMetrics(TimelineMetrics timelineMetrics) throws InterruptedException, ExecutionException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics);
+        ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(topicName, jsonNode);
+        Future<RecordMetadata> kafkaFuture = producer.send(rec);
+        kafkaFuture.get();
+    }
+
+    /** Manual driver: sends one batch of 20 synthetic mem_free samples. */
+    public static void main(String[] args) throws ExecutionException, InterruptedException {
+        final long now = System.currentTimeMillis();
+
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        TimelineMetric metric1 = new TimelineMetric();
+        metric1.setMetricName("mem_free");
+        metric1.setHostName("avijayan-ams-3.openstacklocal");
+        metric1.setTimestamp(now);
+        metric1.setStartTime(now - 1000);
+        metric1.setAppId("HOST");
+        metric1.setType("Integer");
+
+        // 20 samples, 100 ms apart, around 20000.
+        TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+        for (int i = 0; i < 20; i++) {
+            metricValues.put(now - i * 100, 20000 + Math.random());
+        }
+        metric1.setMetricValues(metricValues);
+
+        timelineMetrics.getMetrics().add(metric1);
+
+        new AmsKafkaProducer("avijayan-ams-2.openstacklocal:6667").sendMetrics(timelineMetrics);
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
new file mode 100644
index 0000000..5a6bb61
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
@@ -0,0 +1,181 @@
+package org.apache.ambari.metrics.alertservice.spark;
+
+import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * Serializable helper (shipped inside Spark closures) that converts detected
+ * anomalies into TimelineMetric series and POSTs them to the metrics
+ * collector's /ws/v1/timeline/metrics endpoint.
+ */
+public class AnomalyMetricPublisher implements Serializable {
+
+    private String hostName = "UNKNOWN.example.com";
+    private String instanceId = null;
+    // Reported as the appId of every emitted anomaly metric.
+    private String serviceName = "anomaly-engine";
+    private String collectorHost;
+    private String protocol;
+    private String port;
+    private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
+    private static final Log LOG = LogFactory.getLog(AnomalyMetricPublisher.class);
+    private static ObjectMapper mapper;
+
+    static {
+        mapper = new ObjectMapper();
+        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+        mapper.setAnnotationIntrospector(introspector);
+        // BUG FIX: getSerializationConfig().withSerializationInclusion(...)
+        // returns a NEW config object that was silently discarded, so the
+        // NON_NULL inclusion never took effect; setSerializationInclusion
+        // mutates the mapper itself.
+        mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    }
+
+    public AnomalyMetricPublisher(String collectorHost, String protocol, String port) {
+        this.collectorHost = collectorHost;
+        this.protocol = protocol;
+        this.port = port;
+        this.hostName = getDefaultLocalHostName();
+    }
+
+    /** Canonical local host name, or null when resolution fails. */
+    private String getDefaultLocalHostName() {
+        try {
+            return InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            LOG.info("Error getting host address");
+        }
+        return null;
+    }
+
+    /**
+     * Converts the anomalies into TimelineMetrics and sends them to the
+     * collector; a no-op when the resulting metric list is empty.
+     */
+    public void publish(List<MetricAnomaly> metricAnomalies) {
+        LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
+        List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
+        LOG.info("Sending TimelineMetric list of size : " + metricList.size());
+        if (!metricList.isEmpty()) {
+            TimelineMetrics timelineMetrics = new TimelineMetrics();
+            timelineMetrics.setMetrics(metricList);
+            emitMetrics(timelineMetrics);
+        }
+    }
+
+    /**
+     * Groups consecutive anomalies with the same metric key into one
+     * TimelineMetric named key + "_" + methodType.
+     * NOTE(review): assumes the input list is ordered so anomalies of a
+     * series are adjacent — confirm against callers.
+     */
+    private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
+        List<TimelineMetric> metrics = new ArrayList<>();
+
+        if (metricAnomalies.isEmpty()) {
+            return metrics;
+        }
+
+        long currentTime = System.currentTimeMillis();
+        MetricAnomaly prevAnomaly = metricAnomalies.get(0);
+
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(prevAnomaly.getMetricKey() + "_" + prevAnomaly.getMethodResult().getMethodType());
+        timelineMetric.setAppId(serviceName);
+        timelineMetric.setInstanceId(instanceId);
+        timelineMetric.setHostName(hostName);
+        timelineMetric.setStartTime(currentTime);
+
+        TreeMap<Long, Double> metricValues = new TreeMap<>();
+        metricValues.put(prevAnomaly.getTimestamp(), prevAnomaly.getMetricValue());
+        MetricAnomaly currentAnomaly;
+
+        for (int i = 1; i < metricAnomalies.size(); i++) {
+            currentAnomaly = metricAnomalies.get(i);
+            if (currentAnomaly.getMetricKey().equals(prevAnomaly.getMetricKey())) {
+                // Same series: accumulate the value under the running metric.
+                metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
+            } else {
+                // Series changed: flush the running metric and start a new one.
+                timelineMetric.setMetricValues(metricValues);
+                metrics.add(timelineMetric);
+
+                timelineMetric = new TimelineMetric();
+                timelineMetric.setMetricName(currentAnomaly.getMetricKey() + "_" + currentAnomaly.getMethodResult().getMethodType());
+                timelineMetric.setAppId(serviceName);
+                timelineMetric.setInstanceId(instanceId);
+                timelineMetric.setHostName(hostName);
+                timelineMetric.setStartTime(currentTime);
+                metricValues = new TreeMap<>();
+                metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
+                prevAnomaly = currentAnomaly;
+            }
+        }
+
+        // Flush the final series.
+        timelineMetric.setMetricValues(metricValues);
+        metrics.add(timelineMetric);
+        return metrics;
+    }
+
+    /** Serializes the metrics to JSON and POSTs them; false on failure. */
+    private boolean emitMetrics(TimelineMetrics metrics) {
+        String connectUrl = constructTimelineMetricUri();
+        String jsonData = null;
+        LOG.info("EmitMetrics connectUrl = " + connectUrl);
+        try {
+            jsonData = mapper.writeValueAsString(metrics);
+        } catch (IOException e) {
+            LOG.error("Unable to parse metrics", e);
+        }
+        if (jsonData != null) {
+            return emitMetricsJson(connectUrl, jsonData);
+        }
+        return false;
+    }
+
+    private HttpURLConnection getConnection(String spec) throws IOException {
+        return (HttpURLConnection) new URL(spec).openConnection();
+    }
+
+    /**
+     * POSTs the JSON payload with a 10s connect/read timeout. Returns true
+     * when the request completed (a non-200 status is only logged), false on
+     * I/O failure.
+     */
+    private boolean emitMetricsJson(String connectUrl, String jsonData) {
+        LOG.info("Metrics Data : " + jsonData);
+        int timeout = 10000;
+        HttpURLConnection connection = null;
+        try {
+            if (connectUrl == null) {
+                throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+            }
+            connection = getConnection(connectUrl);
+
+            connection.setRequestMethod("POST");
+            connection.setRequestProperty("Content-Type", "application/json");
+            connection.setRequestProperty("Connection", "Keep-Alive");
+            connection.setConnectTimeout(timeout);
+            connection.setReadTimeout(timeout);
+            connection.setDoOutput(true);
+
+            if (jsonData != null) {
+                try (OutputStream os = connection.getOutputStream()) {
+                    os.write(jsonData.getBytes("UTF-8"));
+                }
+            }
+
+            int statusCode = connection.getResponseCode();
+
+            if (statusCode != 200) {
+                LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+                        "statusCode = " + statusCode);
+            } else {
+                LOG.info("Metrics posted to Collector " + connectUrl);
+            }
+            return true;
+        } catch (IOException ioe) {
+            LOG.error(ioe.getMessage());
+        }
+        return false;
+    }
+
+    /** Builds protocol://collectorHost:port/ws/v1/timeline/metrics. */
+    private String constructTimelineMetricUri() {
+        StringBuilder sb = new StringBuilder(protocol);
+        sb.append("://");
+        sb.append(collectorHost);
+        sb.append(":");
+        sb.append(port);
+        sb.append(WS_V1_TIMELINE_METRICS);
+        return sb.toString();
+    }
+}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
new file mode 100644
index 0000000..ab87a95
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
@@ -0,0 +1,134 @@
+package org.apache.ambari.metrics.alertservice.spark;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
+import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel;
+import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
+import org.apache.ambari.metrics.alertservice.methods.ema.EmaModelLoader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+import java.util.*;
+
+/**
+ * Spark Streaming driver that consumes TimelineMetrics JSON from Kafka, runs
+ * each metric through the configured anomaly-detection models, and publishes
+ * any anomalies back to the metrics collector.
+ *
+ * Usage: MetricAnomalyDetector &lt;method1,method2&gt; &lt;appid1,appid2&gt;
+ *        &lt;collector_host&gt; &lt;port&gt; &lt;protocol&gt; &lt;zkQuorum&gt;
+ */
+public class MetricAnomalyDetector {
+
+    private static final Log LOG = LogFactory.getLog(MetricAnomalyDetector.class);
+    // Kafka consumer group and topic this job reads from.
+    private static String groupId = "ambari-metrics-group";
+    private static String topicName = "ambari-metrics-topic";
+    // Number of consumer threads handed to KafkaUtils.createStream.
+    private static int numThreads = 1;
+
+    //private static String zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181";
+    //private static Map<String, String> kafkaParams = new HashMap<>();
+    //static {
+    //    kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "avijayan-ams-2.openstacklocal:6667");
+    //    kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+    //    kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer");
+    //    kafkaParams.put("metadata.broker.list", "avijayan-ams-2.openstacklocal:6667");
+    //}
+
+    public MetricAnomalyDetector() {
+    }
+
+    public static void main(String[] args) throws InterruptedException {
+
+
+        if (args.length < 6) {
+            System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
+            System.exit(1);
+        }
+
+        // CLI arguments; args[0] (detection method names) is parsed below.
+        List<String> appIds = Arrays.asList(args[1].split(","));
+        String collectorHost = args[2];
+        String collectorPort = args[3];
+        String collectorProtocol = args[4];
+        String zkQuorum = args[5];
+
+        List<MetricAnomalyModel> anomalyDetectionModels = new ArrayList<>();
+        AnomalyMetricPublisher anomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort);
+
+        SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
+
+        // 10-second micro-batches.
+        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
+
+        // Instantiate each requested model; only "ema" is handled here,
+        // loaded from a previously saved state file on the driver's disk.
+        // NOTE(review): EmaModelLoader.load can return null, which would be
+        // added to the list and NPE later — confirm /tmp/model/ema exists.
+        for (String method : args[0].split(",")) {
+            if (method.equals("ema")) {
+                LOG.info("Model EMA requested.");
+                EmaModel emaModel = new EmaModelLoader().load(jssc.sparkContext().sc(), "/tmp/model/ema");
+                anomalyDetectionModels.add(emaModel);
+            }
+        }
+
+        // Receiver-based Kafka stream keyed by the given ZooKeeper quorum.
+        JavaPairReceiverInputDStream<String, String> messages =
+                KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
+
+        //Convert JSON string to TimelineMetrics.
+        JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
+            @Override
+            public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+                LOG.info(message._2());
+                ObjectMapper mapper = new ObjectMapper();
+                TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+                return metrics;
+            }
+        });
+
+        //Group TimelineMetric by AppId.
+        // NOTE(review): keys on the appId of the FIRST metric of each batch;
+        // assumes all metrics in one TimelineMetrics share an appId — confirm.
+        JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+                timelineMetrics -> new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(),timelineMetrics)
+        );
+
+        appMetricStream.print();
+
+        //Filter AppIds that are not needed.
+        JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+            @Override
+            public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+                return appIds.contains(appMetricTuple._1);
+            }
+        });
+
+        filteredAppMetricStream.print();
+
+        // Run every surviving metric through every model on the executors and
+        // publish any anomalies straight back to the collector.
+        filteredAppMetricStream.foreachRDD(rdd -> {
+            rdd.foreach(
+                    tuple2 -> {
+                        TimelineMetrics metrics = tuple2._2();
+                        LOG.info("Received Metric : " + metrics.getMetrics().get(0).getMetricName());
+                        for (TimelineMetric metric : metrics.getMetrics()) {
+
+                            TimelineMetric timelineMetric =
+                                    new TimelineMetric(metric.getMetricName(), metric.getAppId(), metric.getHostName(), metric.getMetricValues());
+                            LOG.info("Models size : " + anomalyDetectionModels.size());
+
+                            for (MetricAnomalyModel model : anomalyDetectionModels) {
+                                LOG.info("Testing against Model : " + model.getClass().getCanonicalName());
+                                List<MetricAnomaly> anomalies = model.test(timelineMetric);
+                                anomalyMetricPublisher.publish(anomalies);
+                                for (MetricAnomaly anomaly : anomalies) {
+                                    LOG.info(anomaly.getAnomalyAsString());
+                                }
+
+                            }
+                        }
+                    });
+        });
+
+        // Start the streaming job and block forever.
+        jssc.start();
+        jssc.awaitTermination();
+    }
+}
+
+
+
+
diff --git a/ambari-metrics-spark/pom.xml b/ambari-metrics-spark/pom.xml
new file mode 100644
index 0000000..33b4257
--- /dev/null
+++ b/ambari-metrics-spark/pom.xml
@@ -0,0 +1,133 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <parent>
+        <artifactId>ambari-metrics</artifactId>
+        <groupId>org.apache.ambari</groupId>
+        <version>2.5.1.0.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>ambari-metrics-spark</artifactId>
+    <version>2.5.1.0.0</version>
+    <properties>
+        <scala.version>2.10.4</scala.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>scala-tools.org</id>
+            <name>Scala-Tools Maven2 Repository</name>
+            <url>http://scala-tools.org/repo-releases</url>
+        </repository>
+    </repositories>
+
+    <pluginRepositories>
+        <pluginRepository>
+            <id>scala-tools.org</id>
+            <name>Scala-Tools Maven2 Repository</name>
+            <url>http://scala-tools.org/repo-releases</url>
+        </pluginRepository>
+    </pluginRepositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.specs</groupId>
+            <artifactId>specs</artifactId>
+            <version>1.2.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <version>1.6.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>1.6.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.phoenix</groupId>
+            <artifactId>phoenix-spark</artifactId>
+            <version>4.7.0-HBase-1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ambari</groupId>
+            <artifactId>ambari-metrics-alertservice</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api-scala_2.10</artifactId>
+            <version>2.8.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.10</artifactId>
+            <version>2.1.1</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-target:jvm-1.5</arg>
+                    </args>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-eclipse-plugin</artifactId>
+                <configuration>
+                    <downloadSources>true</downloadSources>
+                    <buildcommands>
+                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
+                    </buildcommands>
+                    <additionalProjectnatures>
+                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
+                    </additionalProjectnatures>
+                    <classpathContainers>
+                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                    </classpathContainers>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+</project>
diff --git a/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala b/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
new file mode 100644
index 0000000..d4ed31a
--- /dev/null
+++ b/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
@@ -0,0 +1,97 @@
+package org.apache.ambari.metrics.spark
+
+
+import java.util
+import java.util.logging.LogManager
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.ambari.metrics.alertservice.common.{MetricAnomaly, TimelineMetrics}
+import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel
+import org.apache.ambari.metrics.alertservice.methods.ema.{EmaModel, EmaModelLoader}
+import org.apache.ambari.metrics.alertservice.spark.AnomalyMetricPublisher
+import org.apache.log4j.Logger
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.JavaConversions._
+import org.apache.logging.log4j.scala.Logging
+
object MetricAnomalyDetector extends Logging {

  // Kafka consumer settings for the metric topic.
  // TODO(review): these should come from CLI args/config rather than being hard-coded.
  var zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181"
  var groupId = "ambari-metrics-group"
  var topicName = "ambari-metrics-topic"
  var numThreads = 1

  // Models selected via args(0). Must be a var: Array's ":+" returns a NEW array,
  // so the result has to be assigned back (the original `val ... :+` silently
  // discarded every registered model).
  var anomalyDetectionModels: Array[MetricAnomalyModel] = Array[MetricAnomalyModel]()

  /**
    * Spark Streaming driver: consumes TimelineMetrics JSON from Kafka, filters to
    * the requested appIds, tests each metric against a pre-trained EMA model and
    * publishes any anomalies back to the collector.
    *
    * args: <method1,method2> <appid1,appid2> <collector_host> <port> <protocol>
    */
  def main(args: Array[String]): Unit = {

    @transient
    lazy val log: Logger = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger")

    if (args.length < 5) {
      System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol>")
      System.exit(1)
    }

    for (method <- args(0).split(",")) {
      if (method == "ema") anomalyDetectionModels = anomalyDetectionModels :+ new EmaModel()
    }

    // NOTE: util.Arrays.asList(array) would create a ONE-element list holding the
    // whole array, so contains(appId) below would never match. Build a proper Seq.
    val appIds = args(1).split(",").toSeq

    val collectorHost = args(2)
    val collectorPort = args(3)
    val collectorProtocol = args(4)

    val anomalyMetricPublisher: AnomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort)

    val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")

    // 10-second micro-batches.
    val streamingContext = new StreamingContext(sparkConf, Duration(10000))

    // Pre-trained EMA model produced by SparkPhoenixReader.
    val emaModel = new EmaModelLoader().load(streamingContext.sparkContext, "/tmp/model/ema")

    val kafkaStream = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2)
    kafkaStream.print()

    // Deserialize each Kafka payload (message value) into TimelineMetrics.
    val timelineMetricsStream = kafkaStream.map( message => {
      val mapper = new ObjectMapper
      mapper.readValue(message._2, classOf[TimelineMetrics])
    })
    timelineMetricsStream.print()

    // Key each payload by its appId (all metrics in one payload share an appId).
    val appMetricStream = timelineMetricsStream.map( timelineMetrics => {
      (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics)
    })
    appMetricStream.print()

    // Keep only the appIds requested on the command line.
    val filteredAppMetricStream = appMetricStream.filter( appMetricTuple => {
      appIds.contains(appMetricTuple._1)
    })
    filteredAppMetricStream.print()

    // Test every metric against the EMA model and publish detected anomalies.
    filteredAppMetricStream.foreachRDD( rdd => {
      rdd.foreach( appMetricTuple => {
        val timelineMetrics = appMetricTuple._2
        logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName)
        log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName)
        for (timelineMetric <- timelineMetrics.getMetrics) {
          val anomalies = emaModel.test(timelineMetric)
          anomalyMetricPublisher.publish(anomalies)
          for (anomaly <- anomalies) {
            val an = anomaly : MetricAnomaly
            logger.info(an.getAnomalyAsString)
          }
        }
      })
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
diff --git a/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala b/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
new file mode 100644
index 0000000..5ca7b17
--- /dev/null
+++ b/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
@@ -0,0 +1,67 @@
+package org.apache.ambari.metrics.spark
+
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric
+import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.rdd.RDD
+
object SparkPhoenixReader {

  /**
    * Trains an EMA anomaly model on the last 24h of data for one metric series
    * (metric name + appId + hostname), read from the AMS Phoenix METRIC_RECORD
    * table, and saves the model to modelDir for MetricAnomalyDetector to load.
    *
    * args: <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>
    */
  def main(args: Array[String]) {

    // Seven positional arguments are consumed (args(0) .. args(6)); the original
    // guard checked for only 6, so a missing model_dir crashed with
    // ArrayIndexOutOfBoundsException instead of printing usage.
    if (args.length < 7) {
      System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
      System.exit(1)
    }

    val metricName = args(0)
    val appId = args(1)
    val hostname = args(2)
    val weight = args(3).toDouble
    val timessdev = args(4).toInt
    val phoenixConnectionString = args(5) // e.g. avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure
    val modelDir = args(6)

    val conf = new SparkConf()
    conf.set("spark.app.name", "AMSAnomalyModelBuilder")
    //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val currentTime = System.currentTimeMillis()
    val oneDayBack = currentTime - 24*60*60*1000

    // Load the Phoenix-backed table and query the last 24h of the requested series.
    // NOTE(review): values are concatenated into the SQL string; acceptable for an
    // operator-run CLI, but do not expose this path to untrusted input.
    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
    df.registerTempTable("METRIC_RECORD")
    val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " +
      "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack)

    // Collapse SUM/COUNT into an average value per server timestamp.
    val metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
    result.collect().foreach(
      t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
    )

    val timelineMetric = new TimelineMetric(metricName, appId, hostname, metricValues)

    // Train and persist the model.
    val emaModel = new EmaModel()
    emaModel.train(timelineMetric, weight, timessdev)
    emaModel.save(sc, modelDir)

    sc.stop()
  }

}
diff --git a/ambari-metrics-timelineservice/pom.xml b/ambari-metrics-timelineservice/pom.xml
index fc67cb1..67b7f4b 100644
--- a/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics-timelineservice/pom.xml
@@ -697,6 +697,11 @@
       <version>1.0.0.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-alertservice</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
similarity index 92%
rename from ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
rename to ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 0836a72..3558f87 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -19,6 +19,7 @@
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
+import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -63,10 +64,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
@@ -85,6 +83,7 @@
   private Integer defaultTopNHostsLimit;
   private MetricCollectorHAController haController;
   private boolean containerMetricsDisabled = false;
+  private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667");
 
   /**
    * Construct the service.
@@ -372,11 +371,43 @@
     // Error indicated by the Sql exception
     TimelinePutResponse response = new TimelinePutResponse();
 
+    try {
+      if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) {
+        kafkaProducer.sendMetrics(fromTimelineMetrics(metrics));
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error(e);
+    }
     hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
 
     return response;
   }
 
+
+  private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) {
+    org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
+
+    List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new ArrayList<>();
+    for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+      timelineMetricList.add(fromTimelineMetric(timelineMetric));
+    }
+    otherMetrics.setMetrics(timelineMetricList);
+    return otherMetrics;
+  }
+
+  private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) {
+
+    org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
+    otherMetric.setMetricValues(timelineMetric.getMetricValues());
+    otherMetric.setStartTime(timelineMetric.getStartTime());
+    otherMetric.setHostName(timelineMetric.getHostName());
+    otherMetric.setInstanceId(timelineMetric.getInstanceId());
+    otherMetric.setAppId(timelineMetric.getAppId());
+    otherMetric.setMetricName(timelineMetric.getMetricName());
+
+    return otherMetric;
+  }
+
   @Override
   public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
       throws SQLException, IOException {
diff --git a/pom.xml b/pom.xml
index 47255ea..79ea06f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,6 +34,8 @@
     <module>ambari-metrics-grafana</module>
     <module>ambari-metrics-assembly</module>
     <module>ambari-metrics-host-aggregator</module>
+    <module>ambari-metrics-alertservice</module>
+    <module>ambari-metrics-spark</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>