AMBARI-21106 : ML-Prototype: Detect timeseries anomaly for a metric. (Refine PIT & Trend subsystems, Integrate with AMS, Ambari Alerts.)
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
index 0c1c6fc..b98f04c 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
@@ -76,7 +76,6 @@
       JSONArray array = jsonObject.getJSONArray("items");
       for(int i = 0 ; i < array.length() ; i++){
         JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition");
-        LOG.info("alertDefn : " + alertDefn.get("name"));
         if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) {
           JSONObject sourceNode = alertDefn.getJSONObject("source");
           JSONArray params = sourceNode.getJSONArray("parameters");
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
index 7735d6c..61b3dee 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
@@ -37,6 +37,12 @@
 import scala.Tuple2;
 
 import java.util.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class MetricSparkConsumer {
 
@@ -47,38 +53,75 @@
   private static long pitStartTime = System.currentTimeMillis();
   private static long ksStartTime = pitStartTime;
   private static long hdevStartTime = ksStartTime;
+  private static Set<Pattern> includeMetricPatterns = new HashSet<>();
+  private static Set<String> includedHosts = new HashSet<>();
+  private static Set<TrendMetric> trendMetrics = new HashSet<>();
 
   public MetricSparkConsumer() {
   }
 
+  public static Properties readProperties(String propertiesFile) {
+    try {
+      Properties properties = new Properties();
+      InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+      if (inputStream == null) {
+        inputStream = new FileInputStream(propertiesFile);
+      }
+      properties.load(inputStream);
+      return properties;
+    } catch (IOException ioEx) {
+      LOG.error("Error reading properties file for jmeter");
+      return null;
+    }
+  }
+
   public static void main(String[] args) throws InterruptedException {
 
-    if (args.length < 5) {
-      System.err.println("Usage: MetricSparkConsumer <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
+    if (args.length < 1) {
+      System.err.println("Usage: MetricSparkConsumer <input-config-file>");
       System.exit(1);
     }
 
-    List<String> appIds = Arrays.asList(args[0].split(","));
-    String collectorHost = args[1];
-    String collectorPort = args[2];
-    String collectorProtocol = args[3];
-    String zkQuorum = args[4];
+    Properties properties = readProperties(args[0]);
 
-    double emaW = StringUtils.isNotEmpty(args[5]) ? Double.parseDouble(args[5]) : 0.5;
-    double emaN = StringUtils.isNotEmpty(args[8]) ? Double.parseDouble(args[6]) : 3;
-    double tukeysN = StringUtils.isNotEmpty(args[7]) ? Double.parseDouble(args[7]) : 3;
+    List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(","));
 
-    long pitTestInterval = StringUtils.isNotEmpty(args[8]) ? Long.parseLong(args[8]) : 5 * 60 * 1000;
-    long pitTrainInterval = StringUtils.isNotEmpty(args[9]) ? Long.parseLong(args[9]) : 15 * 60 * 1000;
+    String collectorHost = properties.getProperty("collectorHost");
+    String collectorPort = properties.getProperty("collectorPort");
+    String collectorProtocol = properties.getProperty("collectorProtocol");
 
-    String fileName = args[10];
-    long ksTestInterval = StringUtils.isNotEmpty(args[11]) ? Long.parseLong(args[11]) : 10 * 60 * 1000;
-    long ksTrainInterval = StringUtils.isNotEmpty(args[12]) ? Long.parseLong(args[12]) : 10 * 60 * 1000;
-    int hsdevNhp = StringUtils.isNotEmpty(args[13]) ? Integer.parseInt(args[13]) : 3;
-    long hsdevInterval = StringUtils.isNotEmpty(args[14]) ? Long.parseLong(args[14]) : 30 * 60 * 1000;
+    String zkQuorum = properties.getProperty("zkQuorum");
 
-    String ambariServerHost = args[15];
-    String clusterName = args[16];
+    double emaW = Double.parseDouble(properties.getProperty("emaW"));
+    double emaN = Double.parseDouble(properties.getProperty("emaN"));
+    int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold"));
+    double tukeysN = Double.parseDouble(properties.getProperty("tukeysN"));
+
+    long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval"));
+    long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval"));
+
+    long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval"));
+    long ksTrainInterval = Long.parseLong(properties.getProperty("ksTrainInterval"));
+    int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp"));
+    long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval"));
+
+    String ambariServerHost = properties.getProperty("ambariServerHost");
+    String clusterName = properties.getProperty("clusterName");
+
+    String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns");
+    if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) {
+      String[] patterns = includeMetricPatternStrings.split(",");
+      for (String p : patterns) {
+        LOG.info("Included Pattern : " + p);
+        includeMetricPatterns.add(Pattern.compile(p));
+      }
+    }
+
+    String includedHostList = properties.getProperty("hosts");
+    if (includedHostList != null && !includedHostList.isEmpty()) {
+      String[] hosts = includedHostList.split(",");
+      includedHosts.addAll(Arrays.asList(hosts));
+    }
 
     MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
 
@@ -86,7 +129,7 @@
 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
 
-    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN);
+    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
     PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
       tukeysN,
       pitTestInterval,
@@ -97,13 +140,14 @@
     TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
       ksTestInterval,
       ksTrainInterval,
-      hsdevNhp,
-      fileName);
+      hsdevNhp);
 
     Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
     Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
     Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
     Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
+    Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns);
+    Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts);
 
     JavaPairReceiverInputDStream<String, String> messages =
       KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
@@ -150,7 +194,7 @@
 
           if (currentTime > ksStartTime + ksTestInterval) {
             LOG.info("Running KS Test....");
-            trendADSystemBroadcast.getValue().runKSTest(currentTime);
+            trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
             ksStartTime = ksStartTime + ksTestInterval;
           }
 
@@ -162,8 +206,27 @@
 
           TimelineMetrics metrics = tuple2._2();
           for (TimelineMetric timelineMetric : metrics.getMetrics()) {
-            List<MetricAnomaly> anomalies = ema.test(timelineMetric);
-            metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+
+            boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
+            boolean includeMetric = false;
+            if (includeHost) {
+              if (includePatternBroadcast.getValue().isEmpty()) {
+                includeMetric = true;
+              }
+              for (Pattern p : includePatternBroadcast.getValue()) {
+                Matcher m = p.matcher(timelineMetric.getMetricName());
+                if (m.find()) {
+                  includeMetric = true;
+                }
+              }
+            }
+
+            if (includeMetric) {
+              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
+                timelineMetric.getHostName()));
+              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+            }
           }
         });
     });
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
index 7b3f63d..dab4a0a 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
@@ -96,7 +96,7 @@
         emitMetrics(timelineMetrics);
       }
     } else {
-      LOG.info("No anomalies to send.");
+      LOG.debug("No anomalies to send.");
     }
   }
 
@@ -130,7 +130,7 @@
   public boolean emitMetrics(TimelineMetrics metrics) {
     String connectUrl = constructTimelineMetricUri();
     String jsonData = null;
-    LOG.info("EmitMetrics connectUrl = " + connectUrl);
+    LOG.debug("EmitMetrics connectUrl = " + connectUrl);
     try {
       jsonData = mapper.writeValueAsString(metrics);
       LOG.info(jsonData);
@@ -202,7 +202,7 @@
 
     String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
       "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
-    LOG.info("Fetch metrics URL : " + url);
+    LOG.debug("Fetch metrics URL : " + url);
 
     URL obj = null;
     BufferedReader in = null;
@@ -213,8 +213,8 @@
       HttpURLConnection con = (HttpURLConnection) obj.openConnection();
       con.setRequestMethod("GET");
       int responseCode = con.getResponseCode();
-      LOG.info("Sending 'GET' request to URL : " + url);
-      LOG.info("Response Code : " + responseCode);
+      LOG.debug("Sending 'GET' request to URL : " + url);
+      LOG.debug("Response Code : " + responseCode);
 
       in = new BufferedReader(
         new InputStreamReader(con.getInputStream()));
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
index b4a8593..b3e7bd3 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
@@ -49,7 +49,7 @@
   private AmbariServerInterface ambariServerInterface;
   private int sensitivity = 50;
   private int minSensitivity = 0;
-  private int maxSensitivity = 10;
+  private int maxSensitivity = 100;
 
   public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
                              long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
@@ -73,13 +73,13 @@
       if (requiredSensivity > sensitivity) {
         int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
         while (sensitivity < targetSensitivity) {
-          defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.1;
+          defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
           sensitivity++;
         }
       } else {
         int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
         while (sensitivity > targetSensitivity) {
-          defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.1;
+          defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
           sensitivity--;
         }
       }
@@ -201,10 +201,10 @@
 
       if (recall < 0.5) {
         LOG.info("Increasing EMA sensitivity by 10%");
-        emaModel.updateModel(true, 10);
+        emaModel.updateModel(true, 5);
       } else if (precision < 0.5) {
         LOG.info("Decreasing EMA sensitivity by 10%");
-        emaModel.updateModel(false, 10);
+        emaModel.updateModel(false, 5);
       }
 
     }
@@ -233,7 +233,7 @@
       double[] anomalyScore = result.resultset.get(2);
       for (int i = 0; i < ts.length; i++) {
         TimelineMetric timelineMetric = new TimelineMetric();
-        timelineMetric.setMetricName(metricName + "_" + appId + "_" + hostname);
+        timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
         timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
         timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
         timelineMetric.setInstanceId(null);
@@ -243,7 +243,11 @@
 
         HashMap<String, String> metadata = new HashMap<>();
         metadata.put("method", "tukeys");
-        metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+        if (String.valueOf(anomalyScore[i]).equals("infinity")) {
+          LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
+        } else {
+          metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+        }
         timelineMetric.setMetadata(metadata);
 
         timelineMetric.setMetricValues(metricValues);
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
index 1534b55..df36a4a 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
@@ -31,11 +31,11 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 public class TrendADSystem implements Serializable {
@@ -57,8 +57,7 @@
   public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
                        long ksTestIntervalMillis,
                        long ksTrainIntervalMillis,
-                       int hsdevNumHistoricalPeriods,
-                       String inputFileName) {
+                       int hsdevNumHistoricalPeriods) {
 
     this.metricsCollectorInterface = metricsCollectorInterface;
     this.ksTestIntervalMillis = ksTestIntervalMillis;
@@ -69,11 +68,9 @@
     this.hsdevTechnique = new HsdevTechnique();
 
     trendMetrics = new ArrayList<>();
-    this.inputFile = inputFileName;
-    readInputFile(inputFileName);
   }
 
-  public void runKSTest(long currentEndTime) {
+  public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
     readInputFile(inputFile);
 
     long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
@@ -85,7 +82,7 @@
       String metricName = metric.metricName;
       String appId = metric.appId;
       String hostname = metric.hostname;
-      String key = metricName + "_" + appId + "_" + hostname;
+      String key = metricName + ":" + appId + ":" + hostname;
 
       TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis,
         currentEndTime);
@@ -112,6 +109,7 @@
         }
       }
 
+      LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
       if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
         LOG.info("Not enough train/test data to perform KS analysis.");
         continue;
@@ -184,6 +182,7 @@
     return timelineMetric;
 
   }
+
   public void runHsdevMethod() {
 
     List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
@@ -315,17 +314,4 @@
       this.hostname = hostname;
     }
   }
-
-  /*
-          boolean isPresent = false;
-        for (TrendMetric trendMetric : trendMetrics) {
-          if (trendMetric.metricName.equalsIgnoreCase(splits[0])) {
-            isPresent = true;
-          }
-        }
-        if (!isPresent) {
-          LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
-          trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
-        }
-   */
 }
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
index 5e1f76b..a31410d 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
@@ -23,6 +23,8 @@
 import javax.xml.bind.annotation.XmlRootElement;
 import java.io.Serializable;
 
+import static org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold;
+
 @XmlRootElement
 public class EmaModel implements Serializable {
 
@@ -35,7 +37,6 @@
   private double timessdev;
 
   private int ctr = 0;
-  private static final int suppressAnomaliesTheshold = 30;
 
   private static final Log LOG = LogFactory.getLog(EmaModel.class);
 
@@ -64,30 +65,36 @@
   public double testAndUpdate(double metricValue) {
 
     double anomalyScore = 0.0;
+    LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev);
+    update(metricValue);
     if (ctr > suppressAnomaliesTheshold) {
       anomalyScore = test(metricValue);
-    }
-    if (Math.abs(anomalyScore) < 2 * timessdev) {
-      update(metricValue);
+      if (anomalyScore > 0.0) {
+        LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+          ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+      } else {
+        LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+          ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+      }
     } else {
-      LOG.info("Not updating model for this value");
+      ctr++;
+      if (ctr > suppressAnomaliesTheshold) {
+        LOG.info("Ema Model for " + metricName + ":" + appId + ":" + hostname + " is ready for testing data.");
+      }
     }
-    ctr++;
-    LOG.info("Counter : " + ctr);
-    LOG.info("Anomaly Score for " + metricValue + " : " + anomalyScore);
     return anomalyScore;
   }
 
   public void update(double metricValue) {
     ema = weight * ema + (1 - weight) * metricValue;
     ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
-    LOG.info("In update : ema = " + ema + ", ems = " + ems);
+    LOG.debug("In update : ema = " + ema + ", ems = " + ems);
   }
 
   public double test(double metricValue) {
-    LOG.info("In test : ema = " + ema + ", ems = " + ems);
+    LOG.debug("In test : ema = " + ema + ", ems = " + ems);
     double diff = Math.abs(ema - metricValue) - (timessdev * ems);
-    LOG.info("diff = " + diff);
+    LOG.debug("diff = " + diff);
     if (diff > 0) {
       return Math.abs((metricValue - ema) / ems); //Z score
     } else {
@@ -102,7 +109,7 @@
       delta = delta * -1;
     }
     this.timessdev = timessdev + delta * timessdev;
-    this.weight = Math.min(1.0, weight + delta * weight);
+    //this.weight = Math.min(1.0, weight + delta * weight);
     LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
   }
 
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
index c005e6f..52c6cf3 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
@@ -49,6 +49,15 @@
   private double startingWeight = 0.5;
   private double startTimesSdev = 3.0;
   private String methodType = "ema";
+  public static int suppressAnomaliesTheshold = 100;
+
+  public EmaTechnique(double startingWeight, double startTimesSdev, int suppressAnomaliesTheshold) {
+    trackedEmas = new HashMap<>();
+    this.startingWeight = startingWeight;
+    this.startTimesSdev = startTimesSdev;
+    EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold;
+    LOG.info("New EmaTechnique......");
+  }
 
   public EmaTechnique(double startingWeight, double startTimesSdev) {
     trackedEmas = new HashMap<>();
@@ -61,16 +70,16 @@
     String metricName = metric.getMetricName();
     String appId = metric.getAppId();
     String hostname = metric.getHostName();
-    String key = metricName + "_" + appId + "_" + hostname;
+    String key = metricName + ":" + appId + ":" + hostname;
 
     EmaModel emaModel = trackedEmas.get(key);
     if (emaModel == null) {
-      LOG.info("EmaModel not present for " + key);
-      LOG.info("Number of tracked Emas : " + trackedEmas.size());
+      LOG.debug("EmaModel not present for " + key);
+      LOG.debug("Number of tracked Emas : " + trackedEmas.size());
       emaModel  = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev);
       trackedEmas.put(key, emaModel);
     } else {
-      LOG.info("EmaModel already present for " + key);
+      LOG.debug("EmaModel already present for " + key);
     }
 
     List<MetricAnomaly> anomalies = new ArrayList<>();
@@ -79,11 +88,11 @@
       double metricValue = metric.getMetricValues().get(timestamp);
       double anomalyScore = emaModel.testAndUpdate(metricValue);
       if (anomalyScore > 0.0) {
-        LOG.info("Found anomaly for : " + key);
+        LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore);
         MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore);
         anomalies.add(metricAnomaly);
       } else {
-        LOG.info("Discarding non-anomaly for : " + key);
+        LOG.debug("Discarding non-anomaly for : " + key);
       }
     }
     return anomalies;
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
index 50bf9f2..04f4a73 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -58,19 +58,23 @@
     double historicMedian = median(trainData.values);
     double currentMedian = median(testData.values);
 
-    double diff = Math.abs(currentMedian - historicMedian);
-    LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
-    LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
 
-    if (diff > n * historicSd) {
-      double zScore = diff / historicSd;
-      LOG.info("Z Score of current series : " + zScore);
-      return new MetricAnomaly(key,
-        (long) testData.ts[testLength - 1],
-        testData.values[testLength - 1],
-        methodType,
-        zScore);
+    if (historicSd > 0) {
+      double diff = Math.abs(currentMedian - historicMedian);
+      LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
+      LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
+
+      if (diff > n * historicSd) {
+        double zScore = diff / historicSd;
+        LOG.info("Z Score of current series : " + zScore);
+        return new MetricAnomaly(key,
+          (long) testData.ts[testLength - 1],
+          testData.values[testLength - 1],
+          methodType,
+          zScore);
+      }
     }
+
     return null;
   }
 
diff --git a/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
index f33b6ec..0312226 100644
--- a/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
+++ b/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
@@ -26,20 +26,23 @@
   anomalies <- data.frame()
   quantiles <- quantile(train_data[,2])
   iqr <- quantiles[4] - quantiles[2]
+  niqr <- 0
 
   for ( i in 1:length(test_data[,1])) {
     x <- test_data[i,2]
     lb <- quantiles[2] - n*iqr
     ub <- quantiles[4] + n*iqr
     if ( (x < lb)  || (x > ub) ) {
-      if (x < lb) {
-        niqr <- (quantiles[2] - x) / iqr
-      } else {
-        niqr <- (x - quantiles[4]) / iqr
+      if (iqr != 0) {
+        if (x < lb) {
+          niqr <- (quantiles[2] - x) / iqr
+        } else {
+          niqr <- (x - quantiles[4]) / iqr
+        }
       }
-      anomaly <- c(test_data[i,1], x, niqr)
-      anomalies <- rbind(anomalies, anomaly)
-    }
+        anomaly <- c(test_data[i,1], x, niqr)
+        anomalies <- rbind(anomalies, anomaly)
+      }
   }
   if(length(anomalies) > 0) {
     names(anomalies) <- c("TS", "Value", "niqr")
diff --git a/ambari-metrics-alertservice/src/main/resources/input-config.properties b/ambari-metrics-alertservice/src/main/resources/input-config.properties
new file mode 100644
index 0000000..88304c7
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/resources/input-config.properties
@@ -0,0 +1,24 @@
+appIds=HOST
+
+collectorHost=localhost
+collectorPort=6188
+collectorProtocol=http
+
+zkQuorum=localhost:2181
+
+ambariServerHost=localhost
+clusterName=c1
+
+emaW=0.8
+emaN=3
+tukeysN=3
+pointInTimeTestInterval=300000
+pointInTimeTrainInterval=900000
+
+ksTestInterval=600000
+ksTrainInterval=600000
+hsdevNhp=3
+hsdevInterval=1800000;
+
+skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime
+hosts=avijayan-ad-1.openstacklocal
\ No newline at end of file
diff --git a/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
index 539ca40..d1e2b41 100644
--- a/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
+++ b/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
@@ -21,21 +21,41 @@
 import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.List;
 import java.util.TreeMap;
 
+import static org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS;
+
 public class TestEmaTechnique {
 
+  private static double[] ts;
+  private static String fullFilePath;
+
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+
+    Assume.assumeTrue(System.getenv("R_HOME") != null);
+    ts = getTS(1000);
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+  }
+
   @Test
   public void testEmaInitialization() {
 
     EmaTechnique ema = new EmaTechnique(0.5, 3);
     Assert.assertTrue(ema.getTrackedEmas().isEmpty());
     Assert.assertTrue(ema.getStartingWeight() == 0.5);
-    Assert.assertTrue(ema.getStartTimesSdev() == 3);
+    Assert.assertTrue(ema.getStartTimesSdev() == 2);
   }
 
   @Test
diff --git a/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java b/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
index bb409cf..ef0125f 100644
--- a/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
+++ b/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
@@ -21,7 +21,6 @@
 import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
diff --git a/ambari-metrics-grafana/src/main/scripted.js b/ambari-metrics-grafana/src/main/scripted.js
new file mode 100644
index 0000000..298535f
--- /dev/null
+++ b/ambari-metrics-grafana/src/main/scripted.js
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (in the ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+'use strict';
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Intialize a skeleton with nothing but a rows array and service object
+dashboard = {
+    rows : [],
+};
+
+// Set a title
+dashboard.title = 'Scripted dash';
+
+// Set default time
+// time can be overriden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+
+
+var obj = JSON.parse(ARGS.anomalies);
+var metrics = obj.metrics;
+var rows = metrics.length
+
+dashboard.time = {
+    from: "now-1h",
+    to: "now"
+};
+
+var metricSet = new Set();
+
+for (var i = 0; i < rows; i++) {
+
+    var key = metrics[i].metricname;
+    if (metricSet.has(key)) {
+        continue;
+    }
+    metricSet.add(key)
+    var metricKeyElements = key.split(":");
+    var metricName = metricKeyElements[0];
+    var appId = metricKeyElements[1];
+    var hostname = metricKeyElements[2];
+
+    dashboard.rows.push({
+        title: 'Chart',
+        height: '300px',
+        panels: [
+            {
+                title: metricName,
+                type: 'graph',
+                span: 12,
+                fill: 1,
+                linewidth: 2,
+                targets: [
+                    {
+                        "aggregator": "none",
+                        "alias": metricName,
+                        "app": appId,
+                        "errors": {},
+                        "metric": metricName,
+                        "precision": "default",
+                        "refId": "A",
+                        "hosts": hostname
+                    }
+                ],
+                seriesOverrides: [
+                    {
+                        alias: '/random/',
+                        yaxis: 2,
+                        fill: 0,
+                        linewidth: 5
+                    }
+                ],
+                tooltip: {
+                    shared: true
+                }
+            }
+        ]
+    });
+}
+
+
+return dashboard;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java
new file mode 100644
index 0000000..2420ef3
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java
@@ -0,0 +1,87 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics;
+
+import org.apache.ambari.metrics.alertservice.prototype.TestSeriesInputRequest;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.AbstractMetricSeries;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TestMetricSeriesGenerator implements Runnable {
+
+  private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>();
+  private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class);
+  private TimelineMetricStore metricStore;
+  private String hostname;
+
+  public TestMetricSeriesGenerator(TimelineMetricStore metricStore) {
+    this.metricStore = metricStore;
+    try {
+      this.hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void addSeries(TestSeriesInputRequest inputRequest) {
+    if (!configuredSeries.containsKey(inputRequest)) {
+      AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs());
+      configuredSeries.put(inputRequest, metricSeries);
+      LOG.info("Added series " + inputRequest.getSeriesName());
+    }
+  }
+
+  public void removeSeries(String seriesName) {
+    boolean isPresent = false;
+    TestSeriesInputRequest tbd = null;
+    for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) {
+      if (inputRequest.getSeriesName().equals(seriesName)) {
+        isPresent = true;
+        tbd = inputRequest;
+      }
+    }
+    if (isPresent) {
+      LOG.info("Removing series " + seriesName);
+      configuredSeries.remove(tbd);
+    } else {
+      LOG.info("Series not found : " + seriesName);
+    }
+  }
+
+  @Override
+  public void run() {
+    long currentTime = System.currentTimeMillis();
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    for (TestSeriesInputRequest input : configuredSeries.keySet()) {
+      AbstractMetricSeries metricSeries = configuredSeries.get(input);
+      TimelineMetric timelineMetric = new TimelineMetric();
+      timelineMetric.setMetricName(input.getSeriesName());
+      timelineMetric.setAppId("anomaly-engine-test-metric");
+      timelineMetric.setInstanceId(null);
+      timelineMetric.setStartTime(currentTime);
+      timelineMetric.setHostName(hostname);
+      TreeMap<Long, Double> metricValues = new TreeMap();
+      metricValues.put(currentTime, metricSeries.nextValue());
+      timelineMetric.setMetricValues(metricValues);
+      timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+      LOG.info("Emitting metric with appId = " + timelineMetric.getAppId());
+    }
+    try {
+      LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series.");
+      metricStore.putMetrics(timelineMetrics);
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 95682f9..4450d65 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -157,6 +157,10 @@
               "start cache node", e);
         }
       }
+//      String kafkaServers = configuration.getKafkaServers();
+//      if (kafkaServers != null) {
+//        metricKafkaProducer = new MetricKafkaProducer(kafkaServers);
+//      }
 
       defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
       if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
@@ -235,6 +239,11 @@
   }
 
   @Override
+  public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException {
+    return hBaseAccessor.getAnomalyMetricRecords(method, startTime, endTime, limit);
+  }
+
+  @Override
   public TimelineMetrics getTimelineMetrics(List<String> metricNames,
       List<String> hostnames, String applicationId, String instanceId,
       Long startTime, Long endTime, Precision precision, Integer limit,
@@ -403,10 +412,17 @@
       cache.putMetrics(metrics.getMetrics(), metricMetadataManager);
     }
 
+//    try {
+//      metricKafkaProducer.sendMetrics(metrics);
+////      if (metrics.getMetrics().size() != 0 && metrics.getMetrics().get(0).getAppId().equals("anomaly-engine-test-metric")) {
+////      }
+//    } catch (Exception e) {
+//      LOG.error(e);
+//    }
+
     return response;
   }
 
-
   @Override
   public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
       throws SQLException, IOException {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index da14fd1..f470c58 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
@@ -35,7 +34,6 @@
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
@@ -50,15 +48,18 @@
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ANOMALY_METRICS_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_ANOMALY_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL;
@@ -73,7 +74,9 @@
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.TREND_ANOMALY_METRICS_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_ANOMALY_METRICS_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
@@ -81,6 +84,7 @@
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_TREND_ANOMALY_METRICS_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
 
 import java.io.IOException;
@@ -309,11 +313,63 @@
     commitMetrics(Collections.singletonList(timelineMetrics));
   }
 
+  private void commitAnomalyMetric(Connection conn, TimelineMetric metric) {
+    PreparedStatement metricRecordStmt = null;
+    try {
+
+      Map<String, String> metricMetadata = metric.getMetadata();
+
+
+      byte[] uuid = metadataManagerInstance.getUuid(metric);
+      if (uuid == null) {
+        LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
+        return;
+      }
+
+      if (metric.getAppId().equals("anomaly-engine-ks") || metric.getAppId().equals("anomaly-engine-hsdev")) {
+        metricRecordStmt = conn.prepareStatement(String.format(UPSERT_TREND_ANOMALY_METRICS_SQL,
+          TREND_ANOMALY_METRICS_TABLE_NAME));
+
+        metricRecordStmt.setBytes(1, uuid);
+        metricRecordStmt.setLong(2, metric.getStartTime());
+        metricRecordStmt.setLong(3, Long.parseLong(metricMetadata.get("test-start-time")));
+        metricRecordStmt.setLong(4, Long.parseLong(metricMetadata.get("train-start-time")));
+        metricRecordStmt.setLong(5, Long.parseLong(metricMetadata.get("train-end-time")));
+        String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setString(6, json);
+        metricRecordStmt.setString(7, metric.getMetadata().get("method"));
+        double anomalyScore = metric.getMetadata().containsKey("anomaly-score") ? Double.parseDouble(metric.getMetadata().get("anomaly-score"))  : 0.0;
+        metricRecordStmt.setDouble(8, anomalyScore);
+
+      } else {
+        metricRecordStmt = conn.prepareStatement(String.format(
+          UPSERT_ANOMALY_METRICS_SQL, ANOMALY_METRICS_TABLE_NAME));
+
+        metricRecordStmt.setBytes(1, uuid);
+        metricRecordStmt.setLong(2, metric.getStartTime());
+        String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setString(3, json);
+        metricRecordStmt.setString(4, metric.getMetadata().get("method"));
+        double anomalyScore = metric.getMetadata().containsKey("anomaly-score") ? Double.parseDouble(metric.getMetadata().get("anomaly-score"))  : 0.0;
+        metricRecordStmt.setDouble(5, anomalyScore);
+      }
+
+      try {
+        metricRecordStmt.executeUpdate();
+      } catch (SQLException sql) {
+        LOG.error("Failed on insert records to store.", sql);
+      }
+
+    } catch (Exception e) {
+      LOG.error("Failed on insert records to anomaly table.", e);
+    }
+
+  }
+
   public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
     LOG.debug("Committing metrics to store");
     Connection conn = null;
     PreparedStatement metricRecordStmt = null;
-    long currentTime = System.currentTimeMillis();
 
     try {
       conn = getConnection();
@@ -321,6 +377,10 @@
               UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
       for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
         for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+          if (metric.getAppId().startsWith("anomaly-engine") && !metric.getAppId().equals("anomaly-engine-test-metric")) {
+            commitAnomalyMetric(conn, metric);
+          }
+
           metricRecordStmt.clearParameters();
 
           if (LOG.isTraceEnabled()) {
@@ -469,6 +529,20 @@
       stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
         encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
 
+      //Anomaly Metrics
+      stmt.executeUpdate(String.format(CREATE_ANOMALY_METRICS_TABLE_SQL,
+        ANOMALY_METRICS_TABLE_NAME,
+        encoding,
+        tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
+        compression));
+
+      //Trend Anomaly Metrics
+      stmt.executeUpdate(String.format(CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
+        TREND_ANOMALY_METRICS_TABLE_NAME,
+        encoding,
+        tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
+        compression));
+
       // Host level
       String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
         encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
@@ -842,6 +916,48 @@
     insertMetricRecords(metrics, false);
   }
 
+  public TimelineMetrics getAnomalyMetricRecords(String method, long startTime, long endTime, Integer limit) throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+    try {
+      stmt = PhoenixTransactSQL.prepareAnomalyMetricsGetSqlStatement(conn, method, startTime, endTime, limit);
+      rs = stmt.executeQuery();
+      while (rs.next()) {
+
+        byte[] uuid = rs.getBytes("UUID");
+        TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid);
+
+        if (method.equals("ks") || method.equals("hsdev")) {
+          metric.setStartTime(rs.getLong("TEST_END_TIME"));
+        } else {
+          metric.setStartTime(rs.getLong("SERVER_TIME"));
+        }
+        metric.setInstanceId(null);
+
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("method", rs.getString("METHOD"));
+        metadata.put("anomaly-score", String.valueOf(rs.getDouble("ANOMALY_SCORE")));
+        if (method.equals("ks") || method.equals("hsdev")) {
+          metadata.put("test-start-time", String.valueOf(rs.getLong("TEST_START_TIME")));
+          metadata.put("train-start-time", String.valueOf(rs.getLong("TRAIN_START_TIME")));
+          metadata.put("train-end-time", String.valueOf(rs.getLong("TRAIN_END_TIME")));
+        }
+        metric.setMetadata(metadata);
+
+        TreeMap<Long, Double> sortedByTimeMetrics = readMetricFromJSON(rs.getString("METRICS"));
+        metric.setMetricValues(sortedByTimeMetrics);
+
+        metrics.getMetrics().add(metric);
+      }
+    } catch (Exception ex) {
+      LOG.error(ex);
+    }
+    return metrics;
+  }
+
+
   @SuppressWarnings("unchecked")
   public TimelineMetrics getMetricRecords(
     final Condition condition, Multimap<String, List<Function>> metricFunctions)
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 258e9c6..85dad1f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -322,16 +322,14 @@
   public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
     "timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
 
-<<<<<<< HEAD
   public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
     "timeline.metrics.support.multiple.clusters";
 
   public static final String TIMELINE_METRICS_EVENT_METRIC_PATTERNS =
     "timeline.metrics.downsampler.event.metric.patterns";
-=======
+
   public static final String TIMELINE_METRICS_UUID_GEN_STRATEGY =
     "timeline.metrics.uuid.gen.strategy";
->>>>>>> AMBARI-21214 : Use a uuid vs long row key for metrics in AMS schema. (avijayan)
 
   public static final String HOST_APP_ID = "HOST";
 
@@ -534,6 +532,13 @@
     return defaultRpcAddress;
   }
 
+  public String getKafkaServers() {
+    if (metricsConf != null) {
+      return metricsConf.get("timeline.metrics.kafka.servers", null);
+    }
+    return null;
+  }
+
   public boolean isDistributedCollectorModeDisabled() {
     try {
       if (getMetricsConf() != null) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index dab4494..cdeefdc 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -107,4 +107,6 @@
      * @return [ hostname ]
      */
   List<String> getLiveInstances();
+
+  TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException;
 }
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 2478fb1..75a9d28 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -27,6 +27,7 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -37,6 +38,27 @@
 
   public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
 
+  public static final String CREATE_ANOMALY_METRICS_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS %s " +
+      "(UUID BINARY(20) NOT NULL, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "METRICS VARCHAR, " +
+      "METHOD VARCHAR, " +
+      "ANOMALY_SCORE DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (UUID, SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_TREND_ANOMALY_METRICS_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS %s " +
+      "(UUID BINARY(20) NOT NULL, " +
+      "TEST_START_TIME UNSIGNED_LONG NOT NULL, " +
+      "TEST_END_TIME UNSIGNED_LONG NOT NULL, " +
+      "TRAIN_START_TIME UNSIGNED_LONG, " +
+      "TRAIN_END_TIME UNSIGNED_LONG, " +
+      "METRICS VARCHAR, " +
+      "METHOD VARCHAR, " +
+      "ANOMALY_SCORE DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (UUID, TEST_START_TIME, TEST_END_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'";
+
   /**
    * Create table to store individual metric records.
    */
@@ -146,6 +168,25 @@
    */
   public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";
 
+  public static final String UPSERT_ANOMALY_METRICS_SQL = "UPSERT INTO %s " +
+    "(UUID, " +
+    "SERVER_TIME, " +
+    "METRICS, " +
+    "METHOD, " +
+    "ANOMALY_SCORE) VALUES " +
+    "(?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_TREND_ANOMALY_METRICS_SQL = "UPSERT INTO %s " +
+    "(UUID, " +
+    "TEST_START_TIME, " +
+    "TEST_END_TIME, " +
+    "TRAIN_START_TIME, " +
+    "TRAIN_END_TIME, " +
+    "METRICS, " +
+    "METHOD, " +
+    "ANOMALY_SCORE) VALUES " +
+    "(?, ?, ?, ?, ?, ?, ?, ?)";
+
   /**
    * Insert into metric records table.
    */
@@ -221,6 +262,22 @@
   public static final String UPSERT_INSTANCE_HOST_METADATA_SQL =
     "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)";
 
+  public static final String GET_ANOMALY_METRIC_SQL = "SELECT UUID, SERVER_TIME, " +
+    "METRICS, " +
+    "METHOD, " +
+    "ANOMALY_SCORE " +
+    "FROM %s " +
+    "WHERE METHOD = ? AND SERVER_TIME > ? AND SERVER_TIME <= ? ORDER BY ANOMALY_SCORE DESC";
+
+  public static final String GET_TREND_ANOMALY_METRIC_SQL = "SELECT UUID, " +
+    "TEST_START_TIME, TEST_END_TIME, " +
+    "TRAIN_START_TIME, TRAIN_END_TIME, " +
+    "METRICS, " +
+    "METHOD, " +
+    "ANOMALY_SCORE " +
+    "FROM %s " +
+    "WHERE METHOD = ? AND TEST_END_TIME > ? AND TEST_END_TIME <= ? ORDER BY ANOMALY_SCORE DESC";
+
   /**
    * Retrieve a set of rows from metrics records table.
    */
@@ -345,6 +402,9 @@
     "MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND " +
     "SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";
 
+  public static final String ANOMALY_METRICS_TABLE_NAME = "METRIC_ANOMALIES";
+  public static final String TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES";
+
   public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
 
   public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS";
@@ -407,6 +467,40 @@
     PhoenixTransactSQL.sortMergeJoinEnabled = sortMergeJoinEnabled;
   }
 
+  public static PreparedStatement prepareAnomalyMetricsGetSqlStatement(Connection connection, String method,
+                                                                       long startTime, long endTime, Integer limit) throws SQLException {
+    StringBuilder sb = new StringBuilder();
+    if (method.equals("ema") || method.equals("tukeys")) {
+      sb.append(String.format(GET_ANOMALY_METRIC_SQL, ANOMALY_METRICS_TABLE_NAME));
+    } else {
+      sb.append(String.format(GET_TREND_ANOMALY_METRIC_SQL, TREND_ANOMALY_METRICS_TABLE_NAME));
+    }
+    if (limit != null) {
+      sb.append(" LIMIT " + limit);
+    }
+    PreparedStatement stmt = null;
+    try {
+      stmt = connection.prepareStatement(sb.toString());
+      int pos = 1;
+
+      stmt.setString(pos++, method);
+      stmt.setLong(pos++, startTime);
+      stmt.setLong(pos, endTime);
+      if (limit != null) {
+        stmt.setFetchSize(limit);
+      }
+
+    } catch (SQLException e) {
+      if (stmt != null) {
+        stmt.close();
+      }
+      throw e;
+    }
+
+    return stmt;
+  }
+
+
   public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection,
                                                            Condition condition) throws SQLException {
 
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java
new file mode 100644
index 0000000..6f7b14a
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Singleton
+@Path("/ws/v1/metrictestservice")
+public class MetricAnomalyDetectorTestService {
+
+  private static final Log LOG = LogFactory.getLog(MetricAnomalyDetectorTestService.class);
+
+  @Inject
+  public MetricAnomalyDetectorTestService() {
+  }
+
+  private void init(HttpServletResponse response) {
+    response.setContentType(null);
+  }
+
+  @Path("/anomaly")
+  @POST
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelinePutResponse postAnomalyDetectionRequest(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res,
+    MetricAnomalyDetectorTestInput input) {
+
+    init(res);
+    if (input == null) {
+      return new TimelinePutResponse();
+    }
+
+    try {
+      return null;
+    } catch (Exception e) {
+      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  @GET
+  @Path("/dataseries")
+  @Produces({MediaType.APPLICATION_JSON})
+  public TimelineMetrics getTestDataSeries(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res,
+    @QueryParam("type") String seriesType,
+    @QueryParam("configs") String config
+  ) {
+    return null;
+  }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 472a787..20aba23 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -24,7 +24,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
@@ -37,6 +36,7 @@
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.TestMetricSeriesGenerator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.EntityIdentifier;
@@ -50,6 +50,7 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -75,6 +76,10 @@
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
 
@@ -389,7 +394,7 @@
       }
 
       return timelineMetricStore.getTimelineMetrics(
-        parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, instanceId,
+        parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, parseStr(instanceId),
         parseLongStr(startTime), parseLongStr(endTime),
         Precision.getPrecision(precision), parseIntStr(limit),
         parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN),
@@ -412,6 +417,25 @@
   }
 
   @GET
+  @Path("/metrics/anomalies")
+  @Produces({ MediaType.APPLICATION_JSON })
+  public TimelineMetrics getAnomalyMetrics(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res,
+    @QueryParam("method") String method,
+    @QueryParam("startTime") String startTime,
+    @QueryParam("endTime") String endTime,
+    @QueryParam("limit") String limit
+    ) {
+    init(res);
+
+    try {
+      return timelineMetricStore.getAnomalyMetrics(method, parseLongStr(startTime), parseLongStr(endTime), parseIntStr(limit));
+    } catch (Exception e) {
+      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+  @GET
   @Path("/metrics/metadata")
   @Produces({ MediaType.APPLICATION_JSON })
   public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(
@@ -660,6 +684,12 @@
   }
 
   private static String parseStr(String str) {
-    return str == null ? null : str.trim();
+    String trimmedInstance = (str == null) ? null : str.trim();
+    if (trimmedInstance != null) {
+      if (trimmedInstance.isEmpty() || trimmedInstance.equalsIgnoreCase("undefined")) {
+        trimmedInstance = null;
+      }
+    }
+    return trimmedInstance;
   }
 }
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index 07e0daa..7c879e1 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -119,4 +119,9 @@
     return null;
   }
 
+  @Override
+  public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) {
+    return null;
+  }
+
 }