AMBARI-21106 : ML-Prototype: Detect timeseries anomaly for a metric. (Refine PIT & Trend subsystems, Integrate with AMS, Ambari Alerts.)
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
index 0c1c6fc..b98f04c 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
@@ -76,7 +76,6 @@
JSONArray array = jsonObject.getJSONArray("items");
for(int i = 0 ; i < array.length() ; i++){
JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition");
- LOG.info("alertDefn : " + alertDefn.get("name"));
if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) {
JSONObject sourceNode = alertDefn.getJSONObject("source");
JSONArray params = sourceNode.getJSONArray("parameters");
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
index 7735d6c..61b3dee 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
@@ -37,6 +37,12 @@
import scala.Tuple2;
import java.util.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class MetricSparkConsumer {
@@ -47,38 +53,75 @@
private static long pitStartTime = System.currentTimeMillis();
private static long ksStartTime = pitStartTime;
private static long hdevStartTime = ksStartTime;
+ private static Set<Pattern> includeMetricPatterns = new HashSet<>();
+ private static Set<String> includedHosts = new HashSet<>();
+ private static Set<TrendMetric> trendMetrics = new HashSet<>();
public MetricSparkConsumer() {
}
+  /** Loads properties from the classpath, falling back to the filesystem. Returns null on I/O error. */
+  public static Properties readProperties(String propertiesFile) {
+    // Prefer a classpath resource; fall back to a plain file path.
+    InputStream systemStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+    try (InputStream inputStream = systemStream != null
+        ? systemStream : new FileInputStream(propertiesFile)) {
+      Properties properties = new Properties();
+      properties.load(inputStream);
+      return properties;
+    } catch (IOException ioEx) {
+      LOG.error("Error reading properties file : " + propertiesFile, ioEx);
+      return null;
+    }
+  }
+
public static void main(String[] args) throws InterruptedException {
- if (args.length < 5) {
- System.err.println("Usage: MetricSparkConsumer <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
+ if (args.length < 1) {
+ System.err.println("Usage: MetricSparkConsumer <input-config-file>");
System.exit(1);
}
- List<String> appIds = Arrays.asList(args[0].split(","));
- String collectorHost = args[1];
- String collectorPort = args[2];
- String collectorProtocol = args[3];
- String zkQuorum = args[4];
+ Properties properties = readProperties(args[0]);
- double emaW = StringUtils.isNotEmpty(args[5]) ? Double.parseDouble(args[5]) : 0.5;
- double emaN = StringUtils.isNotEmpty(args[8]) ? Double.parseDouble(args[6]) : 3;
- double tukeysN = StringUtils.isNotEmpty(args[7]) ? Double.parseDouble(args[7]) : 3;
+ List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(","));
- long pitTestInterval = StringUtils.isNotEmpty(args[8]) ? Long.parseLong(args[8]) : 5 * 60 * 1000;
- long pitTrainInterval = StringUtils.isNotEmpty(args[9]) ? Long.parseLong(args[9]) : 15 * 60 * 1000;
+ String collectorHost = properties.getProperty("collectorHost");
+ String collectorPort = properties.getProperty("collectorPort");
+ String collectorProtocol = properties.getProperty("collectorProtocol");
- String fileName = args[10];
- long ksTestInterval = StringUtils.isNotEmpty(args[11]) ? Long.parseLong(args[11]) : 10 * 60 * 1000;
- long ksTrainInterval = StringUtils.isNotEmpty(args[12]) ? Long.parseLong(args[12]) : 10 * 60 * 1000;
- int hsdevNhp = StringUtils.isNotEmpty(args[13]) ? Integer.parseInt(args[13]) : 3;
- long hsdevInterval = StringUtils.isNotEmpty(args[14]) ? Long.parseLong(args[14]) : 30 * 60 * 1000;
+ String zkQuorum = properties.getProperty("zkQuorum");
- String ambariServerHost = args[15];
- String clusterName = args[16];
+ double emaW = Double.parseDouble(properties.getProperty("emaW"));
+ double emaN = Double.parseDouble(properties.getProperty("emaN"));
+ int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold", "100"));
+ double tukeysN = Double.parseDouble(properties.getProperty("tukeysN"));
+
+ long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval"));
+ long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval"));
+
+ long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval"));
+ long ksTrainInterval = Long.parseLong(properties.getProperty("ksTrainInterval"));
+ int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp"));
+ long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval"));
+
+ String ambariServerHost = properties.getProperty("ambariServerHost");
+ String clusterName = properties.getProperty("clusterName");
+
+ String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns");
+ if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) {
+ String[] patterns = includeMetricPatternStrings.split(",");
+ for (String p : patterns) {
+ LOG.info("Included Pattern : " + p);
+ includeMetricPatterns.add(Pattern.compile(p));
+ }
+ }
+
+ String includedHostList = properties.getProperty("hosts");
+ if (includedHostList != null && !includedHostList.isEmpty()) {
+ String[] hosts = includedHostList.split(",");
+ includedHosts.addAll(Arrays.asList(hosts));
+ }
MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
@@ -86,7 +129,7 @@
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
- EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN);
+ EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
tukeysN,
pitTestInterval,
@@ -97,13 +140,14 @@
TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
ksTestInterval,
ksTrainInterval,
- hsdevNhp,
- fileName);
+ hsdevNhp);
Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
+ Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns);
+ Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts);
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
@@ -150,7 +194,7 @@
if (currentTime > ksStartTime + ksTestInterval) {
LOG.info("Running KS Test....");
- trendADSystemBroadcast.getValue().runKSTest(currentTime);
+ trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
ksStartTime = ksStartTime + ksTestInterval;
}
@@ -162,8 +206,27 @@
TimelineMetrics metrics = tuple2._2();
for (TimelineMetric timelineMetric : metrics.getMetrics()) {
- List<MetricAnomaly> anomalies = ema.test(timelineMetric);
- metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+
+ boolean includeHost = includedHostBroadcast.getValue().isEmpty() || includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
+ boolean includeMetric = false;
+ if (includeHost) {
+ if (includePatternBroadcast.getValue().isEmpty()) {
+ includeMetric = true;
+ }
+ for (Pattern p : includePatternBroadcast.getValue()) {
+ Matcher m = p.matcher(timelineMetric.getMetricName());
+ if (m.find()) {
+ includeMetric = true;
+ }
+ }
+ }
+
+ if (includeMetric) {
+ trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
+ timelineMetric.getHostName()));
+ List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+ metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+ }
}
});
});
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
index 7b3f63d..dab4a0a 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
@@ -96,7 +96,7 @@
emitMetrics(timelineMetrics);
}
} else {
- LOG.info("No anomalies to send.");
+ LOG.debug("No anomalies to send.");
}
}
@@ -130,7 +130,7 @@
public boolean emitMetrics(TimelineMetrics metrics) {
String connectUrl = constructTimelineMetricUri();
String jsonData = null;
- LOG.info("EmitMetrics connectUrl = " + connectUrl);
+ LOG.debug("EmitMetrics connectUrl = " + connectUrl);
try {
jsonData = mapper.writeValueAsString(metrics);
LOG.info(jsonData);
@@ -202,7 +202,7 @@
String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
"&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
- LOG.info("Fetch metrics URL : " + url);
+ LOG.debug("Fetch metrics URL : " + url);
URL obj = null;
BufferedReader in = null;
@@ -213,8 +213,8 @@
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
con.setRequestMethod("GET");
int responseCode = con.getResponseCode();
- LOG.info("Sending 'GET' request to URL : " + url);
- LOG.info("Response Code : " + responseCode);
+ LOG.debug("Sending 'GET' request to URL : " + url);
+ LOG.debug("Response Code : " + responseCode);
in = new BufferedReader(
new InputStreamReader(con.getInputStream()));
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
index b4a8593..b3e7bd3 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
@@ -49,7 +49,7 @@
private AmbariServerInterface ambariServerInterface;
private int sensitivity = 50;
private int minSensitivity = 0;
- private int maxSensitivity = 10;
+ private int maxSensitivity = 100;
public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
@@ -73,13 +73,13 @@
if (requiredSensivity > sensitivity) {
int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
while (sensitivity < targetSensitivity) {
- defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.1;
+ defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
sensitivity++;
}
} else {
int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
while (sensitivity > targetSensitivity) {
- defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.1;
+ defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
sensitivity--;
}
}
@@ -201,10 +201,10 @@
if (recall < 0.5) {
LOG.info("Increasing EMA sensitivity by 10%");
- emaModel.updateModel(true, 10);
+ emaModel.updateModel(true, 5);
} else if (precision < 0.5) {
LOG.info("Decreasing EMA sensitivity by 10%");
- emaModel.updateModel(false, 10);
+ emaModel.updateModel(false, 5);
}
}
@@ -233,7 +233,7 @@
double[] anomalyScore = result.resultset.get(2);
for (int i = 0; i < ts.length; i++) {
TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(metricName + "_" + appId + "_" + hostname);
+ timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
timelineMetric.setInstanceId(null);
@@ -243,7 +243,11 @@
HashMap<String, String> metadata = new HashMap<>();
metadata.put("method", "tukeys");
- metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+ if (Double.isInfinite(anomalyScore[i]) || Double.isNaN(anomalyScore[i])) {
+ LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
+ } else {
+ metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+ }
timelineMetric.setMetadata(metadata);
timelineMetric.setMetricValues(metricValues);
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
index 1534b55..df36a4a 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
@@ -31,11 +31,11 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
public class TrendADSystem implements Serializable {
@@ -57,8 +57,7 @@
public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
long ksTestIntervalMillis,
long ksTrainIntervalMillis,
- int hsdevNumHistoricalPeriods,
- String inputFileName) {
+ int hsdevNumHistoricalPeriods) {
this.metricsCollectorInterface = metricsCollectorInterface;
this.ksTestIntervalMillis = ksTestIntervalMillis;
@@ -69,11 +68,9 @@
this.hsdevTechnique = new HsdevTechnique();
trendMetrics = new ArrayList<>();
- this.inputFile = inputFileName;
- readInputFile(inputFileName);
}
- public void runKSTest(long currentEndTime) {
+ public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
readInputFile(inputFile);
long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
@@ -85,7 +82,7 @@
String metricName = metric.metricName;
String appId = metric.appId;
String hostname = metric.hostname;
- String key = metricName + "_" + appId + "_" + hostname;
+ String key = metricName + ":" + appId + ":" + hostname;
TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis,
currentEndTime);
@@ -112,6 +109,7 @@
}
}
+ LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
LOG.info("Not enough train/test data to perform KS analysis.");
continue;
@@ -184,6 +182,7 @@
return timelineMetric;
}
+
public void runHsdevMethod() {
List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
@@ -315,17 +314,4 @@
this.hostname = hostname;
}
}
-
- /*
- boolean isPresent = false;
- for (TrendMetric trendMetric : trendMetrics) {
- if (trendMetric.metricName.equalsIgnoreCase(splits[0])) {
- isPresent = true;
- }
- }
- if (!isPresent) {
- LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
- trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
- }
- */
}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
index 5e1f76b..a31410d 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
@@ -23,6 +23,8 @@
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
+import static org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold;
+
@XmlRootElement
public class EmaModel implements Serializable {
@@ -35,7 +37,6 @@
private double timessdev;
private int ctr = 0;
- private static final int suppressAnomaliesTheshold = 30;
private static final Log LOG = LogFactory.getLog(EmaModel.class);
@@ -64,30 +65,36 @@
public double testAndUpdate(double metricValue) {
double anomalyScore = 0.0;
+ LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev);
+ update(metricValue);
if (ctr > suppressAnomaliesTheshold) {
anomalyScore = test(metricValue);
- }
- if (Math.abs(anomalyScore) < 2 * timessdev) {
- update(metricValue);
+ if (anomalyScore > 0.0) {
+ LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+ ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+ } else {
+ LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+ ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+ }
} else {
- LOG.info("Not updating model for this value");
+ ctr++;
+ if (ctr > suppressAnomaliesTheshold) {
+ LOG.info("Ema Model for " + metricName + ":" + appId + ":" + hostname + " is ready for testing data.");
+ }
}
- ctr++;
- LOG.info("Counter : " + ctr);
- LOG.info("Anomaly Score for " + metricValue + " : " + anomalyScore);
return anomalyScore;
}
public void update(double metricValue) {
ema = weight * ema + (1 - weight) * metricValue;
ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
- LOG.info("In update : ema = " + ema + ", ems = " + ems);
+ LOG.debug("In update : ema = " + ema + ", ems = " + ems);
}
public double test(double metricValue) {
- LOG.info("In test : ema = " + ema + ", ems = " + ems);
+ LOG.debug("In test : ema = " + ema + ", ems = " + ems);
double diff = Math.abs(ema - metricValue) - (timessdev * ems);
- LOG.info("diff = " + diff);
+ LOG.debug("diff = " + diff);
if (diff > 0) {
return Math.abs((metricValue - ema) / ems); //Z score
} else {
@@ -102,7 +109,7 @@
delta = delta * -1;
}
this.timessdev = timessdev + delta * timessdev;
- this.weight = Math.min(1.0, weight + delta * weight);
+ //this.weight = Math.min(1.0, weight + delta * weight);
LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
}
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
index c005e6f..52c6cf3 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
@@ -49,6 +49,15 @@
private double startingWeight = 0.5;
private double startTimesSdev = 3.0;
private String methodType = "ema";
+ public static int suppressAnomaliesTheshold = 100;
+
+ public EmaTechnique(double startingWeight, double startTimesSdev, int suppressAnomaliesTheshold) {
+ trackedEmas = new HashMap<>();
+ this.startingWeight = startingWeight;
+ this.startTimesSdev = startTimesSdev;
+ EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold;
+ LOG.info("New EmaTechnique......");
+ }
public EmaTechnique(double startingWeight, double startTimesSdev) {
trackedEmas = new HashMap<>();
@@ -61,16 +70,16 @@
String metricName = metric.getMetricName();
String appId = metric.getAppId();
String hostname = metric.getHostName();
- String key = metricName + "_" + appId + "_" + hostname;
+ String key = metricName + ":" + appId + ":" + hostname;
EmaModel emaModel = trackedEmas.get(key);
if (emaModel == null) {
- LOG.info("EmaModel not present for " + key);
- LOG.info("Number of tracked Emas : " + trackedEmas.size());
+ LOG.debug("EmaModel not present for " + key);
+ LOG.debug("Number of tracked Emas : " + trackedEmas.size());
emaModel = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev);
trackedEmas.put(key, emaModel);
} else {
- LOG.info("EmaModel already present for " + key);
+ LOG.debug("EmaModel already present for " + key);
}
List<MetricAnomaly> anomalies = new ArrayList<>();
@@ -79,11 +88,11 @@
double metricValue = metric.getMetricValues().get(timestamp);
double anomalyScore = emaModel.testAndUpdate(metricValue);
if (anomalyScore > 0.0) {
- LOG.info("Found anomaly for : " + key);
+ LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore);
MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore);
anomalies.add(metricAnomaly);
} else {
- LOG.info("Discarding non-anomaly for : " + key);
+ LOG.debug("Discarding non-anomaly for : " + key);
}
}
return anomalies;
diff --git a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
index 50bf9f2..04f4a73 100644
--- a/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
+++ b/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -58,19 +58,23 @@
double historicMedian = median(trainData.values);
double currentMedian = median(testData.values);
- double diff = Math.abs(currentMedian - historicMedian);
- LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
- LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
- if (diff > n * historicSd) {
- double zScore = diff / historicSd;
- LOG.info("Z Score of current series : " + zScore);
- return new MetricAnomaly(key,
- (long) testData.ts[testLength - 1],
- testData.values[testLength - 1],
- methodType,
- zScore);
+ if (historicSd > 0) {
+ double diff = Math.abs(currentMedian - historicMedian);
+ LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
+ LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
+
+ if (diff > n * historicSd) {
+ double zScore = diff / historicSd;
+ LOG.info("Z Score of current series : " + zScore);
+ return new MetricAnomaly(key,
+ (long) testData.ts[testLength - 1],
+ testData.values[testLength - 1],
+ methodType,
+ zScore);
+ }
}
+
return null;
}
diff --git a/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
index f33b6ec..0312226 100644
--- a/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
+++ b/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
@@ -26,20 +26,23 @@
anomalies <- data.frame()
quantiles <- quantile(train_data[,2])
iqr <- quantiles[4] - quantiles[2]
+ niqr <- 0
for ( i in 1:length(test_data[,1])) {
x <- test_data[i,2]
lb <- quantiles[2] - n*iqr
ub <- quantiles[4] + n*iqr
if ( (x < lb) || (x > ub) ) {
- if (x < lb) {
- niqr <- (quantiles[2] - x) / iqr
- } else {
- niqr <- (x - quantiles[4]) / iqr
+ if (iqr != 0) {
+ if (x < lb) {
+ niqr <- (quantiles[2] - x) / iqr
+ } else {
+ niqr <- (x - quantiles[4]) / iqr
+ }
}
- anomaly <- c(test_data[i,1], x, niqr)
- anomalies <- rbind(anomalies, anomaly)
- }
+ anomaly <- c(test_data[i,1], x, niqr)
+ anomalies <- rbind(anomalies, anomaly)
+ }
}
if(length(anomalies) > 0) {
names(anomalies) <- c("TS", "Value", "niqr")
diff --git a/ambari-metrics-alertservice/src/main/resources/input-config.properties b/ambari-metrics-alertservice/src/main/resources/input-config.properties
new file mode 100644
index 0000000..88304c7
--- /dev/null
+++ b/ambari-metrics-alertservice/src/main/resources/input-config.properties
@@ -0,0 +1,24 @@
+appIds=HOST
+
+collectorHost=localhost
+collectorPort=6188
+collectorProtocol=http
+
+zkQuorum=localhost:2181
+
+ambariServerHost=localhost
+clusterName=c1
+
+emaW=0.8
+emaN=3
+tukeysN=3
+pointInTimeTestInterval=300000
+pointInTimeTrainInterval=900000
+
+ksTestInterval=600000
+ksTrainInterval=600000
+hsdevNhp=3
+hsdevInterval=1800000
+
+# NOTE: the consumer reads 'includeMetricPatterns' (an include-list); 'skipMetricPatterns' is not supported
+hosts=avijayan-ad-1.openstacklocal
\ No newline at end of file
diff --git a/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
index 539ca40..d1e2b41 100644
--- a/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
+++ b/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
@@ -21,21 +21,41 @@
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.util.List;
import java.util.TreeMap;
+import static org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS;
+
public class TestEmaTechnique {
+ private static double[] ts;
+ private static String fullFilePath;
+
+ @BeforeClass
+ public static void init() throws URISyntaxException {
+
+ Assume.assumeTrue(System.getenv("R_HOME") != null);
+ ts = getTS(1000);
+ URL url = ClassLoader.getSystemResource("R-scripts");
+ fullFilePath = new File(url.toURI()).getAbsolutePath();
+ RFunctionInvoker.setScriptsDir(fullFilePath);
+ }
+
@Test
public void testEmaInitialization() {
EmaTechnique ema = new EmaTechnique(0.5, 3);
Assert.assertTrue(ema.getTrackedEmas().isEmpty());
Assert.assertTrue(ema.getStartingWeight() == 0.5);
- Assert.assertTrue(ema.getStartTimesSdev() == 3);
+ Assert.assertTrue(ema.getStartTimesSdev() == 3);
}
@Test
diff --git a/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java b/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
index bb409cf..ef0125f 100644
--- a/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
+++ b/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
@@ -21,7 +21,6 @@
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
diff --git a/ambari-metrics-grafana/src/main/scripted.js b/ambari-metrics-grafana/src/main/scripted.js
new file mode 100644
index 0000000..298535f
--- /dev/null
+++ b/ambari-metrics-grafana/src/main/scripted.js
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (in the ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+'use strict';
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Intialize a skeleton with nothing but a rows array and service object
+dashboard = {
+ rows : [],
+};
+
+// Set a title
+dashboard.title = 'Scripted dash';
+
+// Set default time
+// time can be overriden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+
+
+var obj = JSON.parse(ARGS.anomalies);
+var metrics = obj.metrics;
+var rows = metrics.length;
+
+dashboard.time = {
+ from: "now-1h",
+ to: "now"
+};
+
+var metricSet = new Set();
+
+for (var i = 0; i < rows; i++) {
+
+ var key = metrics[i].metricname;
+ if (metricSet.has(key)) {
+ continue;
+ }
+ metricSet.add(key);
+ var metricKeyElements = key.split(":");
+ var metricName = metricKeyElements[0];
+ var appId = metricKeyElements[1];
+ var hostname = metricKeyElements[2];
+
+ dashboard.rows.push({
+ title: 'Chart',
+ height: '300px',
+ panels: [
+ {
+ title: metricName,
+ type: 'graph',
+ span: 12,
+ fill: 1,
+ linewidth: 2,
+ targets: [
+ {
+ "aggregator": "none",
+ "alias": metricName,
+ "app": appId,
+ "errors": {},
+ "metric": metricName,
+ "precision": "default",
+ "refId": "A",
+ "hosts": hostname
+ }
+ ],
+ seriesOverrides: [
+ {
+ alias: '/random/',
+ yaxis: 2,
+ fill: 0,
+ linewidth: 5
+ }
+ ],
+ tooltip: {
+ shared: true
+ }
+ }
+ ]
+ });
+}
+
+
+return dashboard;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java
new file mode 100644
index 0000000..2420ef3
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java
@@ -0,0 +1,87 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics;
+
+import org.apache.ambari.metrics.alertservice.prototype.TestSeriesInputRequest;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.AbstractMetricSeries;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TestMetricSeriesGenerator implements Runnable {
+
+ private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>();
+ private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class);
+ private TimelineMetricStore metricStore;
+ private String hostname;
+
+ public TestMetricSeriesGenerator(TimelineMetricStore metricStore) {
+ this.metricStore = metricStore;
+ try {
+ this.hostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void addSeries(TestSeriesInputRequest inputRequest) {
+ if (!configuredSeries.containsKey(inputRequest)) {
+ AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs());
+ configuredSeries.put(inputRequest, metricSeries);
+ LOG.info("Added series " + inputRequest.getSeriesName());
+ }
+ }
+
+ public void removeSeries(String seriesName) {
+ boolean isPresent = false;
+ TestSeriesInputRequest tbd = null;
+ for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) {
+ if (inputRequest.getSeriesName().equals(seriesName)) {
+ isPresent = true;
+ tbd = inputRequest;
+ }
+ }
+ if (isPresent) {
+ LOG.info("Removing series " + seriesName);
+ configuredSeries.remove(tbd);
+ } else {
+ LOG.info("Series not found : " + seriesName);
+ }
+ }
+
+ @Override
+ public void run() {
+ long currentTime = System.currentTimeMillis();
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+ for (TestSeriesInputRequest input : configuredSeries.keySet()) {
+ AbstractMetricSeries metricSeries = configuredSeries.get(input);
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(input.getSeriesName());
+ timelineMetric.setAppId("anomaly-engine-test-metric");
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setStartTime(currentTime);
+ timelineMetric.setHostName(hostname);
+ TreeMap<Long, Double> metricValues = new TreeMap();
+ metricValues.put(currentTime, metricSeries.nextValue());
+ timelineMetric.setMetricValues(metricValues);
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ LOG.info("Emitting metric with appId = " + timelineMetric.getAppId());
+ }
+ try {
+ LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series.");
+ metricStore.putMetrics(timelineMetrics);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 95682f9..4450d65 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -157,6 +157,10 @@
"start cache node", e);
}
}
+// String kafkaServers = configuration.getKafkaServers();
+// if (kafkaServers != null) {
+// metricKafkaProducer = new MetricKafkaProducer(kafkaServers);
+// }
defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
@@ -235,6 +239,11 @@
}
@Override
+ // Delegates anomaly metric lookups straight to the Phoenix HBase accessor.
+ public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException {
+ return hBaseAccessor.getAnomalyMetricRecords(method, startTime, endTime, limit);
+ }
+
+ @Override
public TimelineMetrics getTimelineMetrics(List<String> metricNames,
List<String> hostnames, String applicationId, String instanceId,
Long startTime, Long endTime, Precision precision, Integer limit,
@@ -403,10 +412,17 @@
cache.putMetrics(metrics.getMetrics(), metricMetadataManager);
}
+// try {
+// metricKafkaProducer.sendMetrics(metrics);
+//// if (metrics.getMetrics().size() != 0 && metrics.getMetrics().get(0).getAppId().equals("anomaly-engine-test-metric")) {
+//// }
+// } catch (Exception e) {
+// LOG.error(e);
+// }
+
return response;
}
-
@Override
public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
throws SQLException, IOException {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index da14fd1..f470c58 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
@@ -35,7 +34,6 @@
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
@@ -50,15 +48,18 @@
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ANOMALY_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_ANOMALY_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL;
@@ -73,7 +74,9 @@
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.TREND_ANOMALY_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_ANOMALY_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
@@ -81,6 +84,7 @@
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_TREND_ANOMALY_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
import java.io.IOException;
@@ -309,11 +313,63 @@
commitMetrics(Collections.singletonList(timelineMetrics));
}
+  /**
+   * Persists a single anomaly metric into the appropriate Phoenix table:
+   * trend anomalies (ks / hsdev detectors) carry train/test windows and go to
+   * the trend table; all other detectors go to the point-in-time anomaly
+   * table keyed by server time. Errors are logged, never propagated.
+   */
+  private void commitAnomalyMetric(Connection conn, TimelineMetric metric) {
+    PreparedStatement metricRecordStmt = null;
+    try {
+      Map<String, String> metricMetadata = metric.getMetadata();
+
+      byte[] uuid = metadataManagerInstance.getUuid(metric);
+      if (uuid == null) {
+        LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
+        return;
+      }
+
+      // Serialized values and detector score are common to both tables.
+      String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+      double anomalyScore = metricMetadata.containsKey("anomaly-score")
+        ? Double.parseDouble(metricMetadata.get("anomaly-score")) : 0.0;
+
+      if (metric.getAppId().equals("anomaly-engine-ks") || metric.getAppId().equals("anomaly-engine-hsdev")) {
+        metricRecordStmt = conn.prepareStatement(String.format(UPSERT_TREND_ANOMALY_METRICS_SQL,
+          TREND_ANOMALY_METRICS_TABLE_NAME));
+
+        // NOTE(review): positions 2-5 mirror the original mapping; TEST_END_TIME
+        // (position 3) is filled from "test-start-time" -- verify this is intended.
+        metricRecordStmt.setBytes(1, uuid);
+        metricRecordStmt.setLong(2, metric.getStartTime());
+        metricRecordStmt.setLong(3, Long.parseLong(metricMetadata.get("test-start-time")));
+        metricRecordStmt.setLong(4, Long.parseLong(metricMetadata.get("train-start-time")));
+        metricRecordStmt.setLong(5, Long.parseLong(metricMetadata.get("train-end-time")));
+        metricRecordStmt.setString(6, json);
+        metricRecordStmt.setString(7, metricMetadata.get("method"));
+        metricRecordStmt.setDouble(8, anomalyScore);
+      } else {
+        metricRecordStmt = conn.prepareStatement(String.format(
+          UPSERT_ANOMALY_METRICS_SQL, ANOMALY_METRICS_TABLE_NAME));
+
+        metricRecordStmt.setBytes(1, uuid);
+        metricRecordStmt.setLong(2, metric.getStartTime());
+        metricRecordStmt.setString(3, json);
+        metricRecordStmt.setString(4, metricMetadata.get("method"));
+        metricRecordStmt.setDouble(5, anomalyScore);
+      }
+
+      try {
+        metricRecordStmt.executeUpdate();
+      } catch (SQLException sql) {
+        LOG.error("Failed on insert records to store.", sql);
+      }
+    } catch (Exception e) {
+      LOG.error("Failed on insert records to anomaly table.", e);
+    } finally {
+      // The original code leaked the statement; always release it.
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          LOG.warn("Failed to close statement.", e);
+        }
+      }
+    }
+  }
+
+
public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
LOG.debug("Committing metrics to store");
Connection conn = null;
PreparedStatement metricRecordStmt = null;
- long currentTime = System.currentTimeMillis();
try {
conn = getConnection();
@@ -321,6 +377,10 @@
UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+ if (metric.getAppId().startsWith("anomaly-engine") && !metric.getAppId().equals("anomaly-engine-test-metric")) {
+ commitAnomalyMetric(conn, metric);
+ }
+
metricRecordStmt.clearParameters();
if (LOG.isTraceEnabled()) {
@@ -469,6 +529,20 @@
stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
+ //Anomaly Metrics
+ stmt.executeUpdate(String.format(CREATE_ANOMALY_METRICS_TABLE_SQL,
+ ANOMALY_METRICS_TABLE_NAME,
+ encoding,
+ tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
+ compression));
+
+ //Trend Anomaly Metrics
+ stmt.executeUpdate(String.format(CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
+ TREND_ANOMALY_METRICS_TABLE_NAME,
+ encoding,
+ tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
+ compression));
+
// Host level
String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
@@ -842,6 +916,48 @@
insertMetricRecords(metrics, false);
}
+  /**
+   * Fetches anomaly metrics written by the given detection method within
+   * (startTime, endTime], ordered by descending anomaly score.
+   *
+   * @param method detector name: "ks"/"hsdev" read the trend table, anything
+   *               else reads the point-in-time table
+   * @param limit  optional maximum number of rows; null for no limit
+   */
+  public TimelineMetrics getAnomalyMetricRecords(String method, long startTime, long endTime, Integer limit) throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+    try {
+      stmt = PhoenixTransactSQL.prepareAnomalyMetricsGetSqlStatement(conn, method, startTime, endTime, limit);
+      rs = stmt.executeQuery();
+      while (rs.next()) {
+
+        byte[] uuid = rs.getBytes("UUID");
+        TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid);
+
+        // Trend anomalies are keyed by their test window; others by server time.
+        if (method.equals("ks") || method.equals("hsdev")) {
+          metric.setStartTime(rs.getLong("TEST_END_TIME"));
+        } else {
+          metric.setStartTime(rs.getLong("SERVER_TIME"));
+        }
+        metric.setInstanceId(null);
+
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("method", rs.getString("METHOD"));
+        metadata.put("anomaly-score", String.valueOf(rs.getDouble("ANOMALY_SCORE")));
+        if (method.equals("ks") || method.equals("hsdev")) {
+          metadata.put("test-start-time", String.valueOf(rs.getLong("TEST_START_TIME")));
+          metadata.put("train-start-time", String.valueOf(rs.getLong("TRAIN_START_TIME")));
+          metadata.put("train-end-time", String.valueOf(rs.getLong("TRAIN_END_TIME")));
+        }
+        metric.setMetadata(metadata);
+
+        TreeMap<Long, Double> sortedByTimeMetrics = readMetricFromJSON(rs.getString("METRICS"));
+        metric.setMetricValues(sortedByTimeMetrics);
+
+        metrics.getMetrics().add(metric);
+      }
+    } catch (Exception ex) {
+      LOG.error(ex);
+    } finally {
+      // The original code leaked the result set, statement and connection.
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.warn("Failed to close result set.", e);
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          LOG.warn("Failed to close statement.", e);
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          LOG.warn("Failed to close connection.", e);
+        }
+      }
+    }
+    return metrics;
+  }
+
+
@SuppressWarnings("unchecked")
public TimelineMetrics getMetricRecords(
final Condition condition, Multimap<String, List<Function>> metricFunctions)
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 258e9c6..85dad1f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -322,16 +322,14 @@
public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
"timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
-<<<<<<< HEAD
public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
"timeline.metrics.support.multiple.clusters";
public static final String TIMELINE_METRICS_EVENT_METRIC_PATTERNS =
"timeline.metrics.downsampler.event.metric.patterns";
-=======
+
public static final String TIMELINE_METRICS_UUID_GEN_STRATEGY =
"timeline.metrics.uuid.gen.strategy";
->>>>>>> AMBARI-21214 : Use a uuid vs long row key for metrics in AMS schema. (avijayan)
public static final String HOST_APP_ID = "HOST";
@@ -534,6 +532,13 @@
return defaultRpcAddress;
}
+  /**
+   * Kafka broker list configured for metrics publishing via
+   * "timeline.metrics.kafka.servers", or null when unset or when the
+   * metrics configuration has not been loaded.
+   */
+  public String getKafkaServers() {
+    return (metricsConf == null) ? null : metricsConf.get("timeline.metrics.kafka.servers", null);
+  }
+
+
public boolean isDistributedCollectorModeDisabled() {
try {
if (getMetricsConf() != null) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index dab4494..cdeefdc 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -107,4 +107,6 @@
* @return [ hostname ]
*/
List<String> getLiveInstances();
+
+ TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException;
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 2478fb1..75a9d28 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -27,6 +27,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,27 @@
public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+ /**
+  * Table holding point-in-time anomalies (e.g. ema / tukeys detectors),
+  * keyed by metric UUID and the server time of the anomaly.
+  */
+ public static final String CREATE_ANOMALY_METRICS_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS %s " +
+ "(UUID BINARY(20) NOT NULL, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "METRICS VARCHAR, " +
+ "METHOD VARCHAR, " +
+ "ANOMALY_SCORE DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (UUID, SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'";
+
+ /**
+  * Table holding trend anomalies (ks / hsdev detectors), keyed by metric
+  * UUID and the test window over which the anomaly was evaluated.
+  */
+ public static final String CREATE_TREND_ANOMALY_METRICS_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS %s " +
+ "(UUID BINARY(20) NOT NULL, " +
+ "TEST_START_TIME UNSIGNED_LONG NOT NULL, " +
+ "TEST_END_TIME UNSIGNED_LONG NOT NULL, " +
+ "TRAIN_START_TIME UNSIGNED_LONG, " +
+ "TRAIN_END_TIME UNSIGNED_LONG, " +
+ "METRICS VARCHAR, " +
+ "METHOD VARCHAR, " +
+ "ANOMALY_SCORE DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (UUID, TEST_START_TIME, TEST_END_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'";
+
/**
* Create table to store individual metric records.
*/
@@ -146,6 +168,25 @@
*/
public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";
+ /**
+  * Insert a point-in-time anomaly record.
+  */
+ public static final String UPSERT_ANOMALY_METRICS_SQL = "UPSERT INTO %s " +
+ "(UUID, " +
+ "SERVER_TIME, " +
+ "METRICS, " +
+ "METHOD, " +
+ "ANOMALY_SCORE) VALUES " +
+ "(?, ?, ?, ?, ?)";
+
+ /**
+  * Insert a trend anomaly record including its train/test windows.
+  */
+ public static final String UPSERT_TREND_ANOMALY_METRICS_SQL = "UPSERT INTO %s " +
+ "(UUID, " +
+ "TEST_START_TIME, " +
+ "TEST_END_TIME, " +
+ "TRAIN_START_TIME, " +
+ "TRAIN_END_TIME, " +
+ "METRICS, " +
+ "METHOD, " +
+ "ANOMALY_SCORE) VALUES " +
+ "(?, ?, ?, ?, ?, ?, ?, ?)";
+
/**
* Insert into metric records table.
*/
@@ -221,6 +262,22 @@
public static final String UPSERT_INSTANCE_HOST_METADATA_SQL =
"UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)";
+ /**
+  * Select point-in-time anomalies for one method within a time window,
+  * highest anomaly score first.
+  */
+ public static final String GET_ANOMALY_METRIC_SQL = "SELECT UUID, SERVER_TIME, " +
+ "METRICS, " +
+ "METHOD, " +
+ "ANOMALY_SCORE " +
+ "FROM %s " +
+ "WHERE METHOD = ? AND SERVER_TIME > ? AND SERVER_TIME <= ? ORDER BY ANOMALY_SCORE DESC";
+
+ /**
+  * Select trend anomalies for one method whose test window ends in the
+  * requested interval, highest anomaly score first.
+  */
+ public static final String GET_TREND_ANOMALY_METRIC_SQL = "SELECT UUID, " +
+ "TEST_START_TIME, TEST_END_TIME, " +
+ "TRAIN_START_TIME, TRAIN_END_TIME, " +
+ "METRICS, " +
+ "METHOD, " +
+ "ANOMALY_SCORE " +
+ "FROM %s " +
+ "WHERE METHOD = ? AND TEST_END_TIME > ? AND TEST_END_TIME <= ? ORDER BY ANOMALY_SCORE DESC";
+
/**
* Retrieve a set of rows from metrics records table.
*/
@@ -345,6 +402,9 @@
"MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND " +
"SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";
+ public static final String ANOMALY_METRICS_TABLE_NAME = "METRIC_ANOMALIES";
+ public static final String TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES";
+
public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS";
@@ -407,6 +467,40 @@
PhoenixTransactSQL.sortMergeJoinEnabled = sortMergeJoinEnabled;
}
+ /**
+  * Builds and parameterizes the anomaly-fetch statement for the given
+  * detection method: "ema"/"tukeys" query the point-in-time table, anything
+  * else queries the trend table. The three bind parameters are always
+  * (method, startTime, endTime); LIMIT, when present, is appended literally.
+  * On a bind failure the statement is closed before rethrowing so it is
+  * never leaked; otherwise the caller owns closing the returned statement.
+  */
+ public static PreparedStatement prepareAnomalyMetricsGetSqlStatement(Connection connection, String method,
+ long startTime, long endTime, Integer limit) throws SQLException {
+ StringBuilder sb = new StringBuilder();
+ if (method.equals("ema") || method.equals("tukeys")) {
+ sb.append(String.format(GET_ANOMALY_METRIC_SQL, ANOMALY_METRICS_TABLE_NAME));
+ } else {
+ sb.append(String.format(GET_TREND_ANOMALY_METRIC_SQL, TREND_ANOMALY_METRICS_TABLE_NAME));
+ }
+ // limit is an Integer, so literal concatenation cannot inject arbitrary SQL.
+ if (limit != null) {
+ sb.append(" LIMIT " + limit);
+ }
+ PreparedStatement stmt = null;
+ try {
+ stmt = connection.prepareStatement(sb.toString());
+ int pos = 1;
+
+ stmt.setString(pos++, method);
+ stmt.setLong(pos++, startTime);
+ stmt.setLong(pos, endTime);
+ if (limit != null) {
+ stmt.setFetchSize(limit);
+ }
+
+ } catch (SQLException e) {
+ if (stmt != null) {
+ stmt.close();
+ }
+ throw e;
+ }
+
+ return stmt;
+ }
+
+
public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection,
Condition condition) throws SQLException {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java
new file mode 100644
index 0000000..6f7b14a
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * Test-only REST endpoints for exercising the metric anomaly detection
+ * prototype. Both handlers are currently placeholders that return null.
+ */
+@Singleton
+@Path("/ws/v1/metrictestservice")
+public class MetricAnomalyDetectorTestService {
+
+ private static final Log LOG = LogFactory.getLog(MetricAnomalyDetectorTestService.class);
+
+ @Inject
+ public MetricAnomalyDetectorTestService() {
+ }
+
+ // Clears the content type so the JAX-RS runtime picks it from @Produces.
+ private void init(HttpServletResponse response) {
+ response.setContentType(null);
+ }
+
+ @Path("/anomaly")
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postAnomalyDetectionRequest(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ MetricAnomalyDetectorTestInput input) {
+
+ init(res);
+ if (input == null) {
+ return new TimelinePutResponse();
+ }
+
+ try {
+ // Placeholder: detection-on-demand is not implemented yet.
+ return null;
+ } catch (Exception e) {
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ @GET
+ @Path("/dataseries")
+ @Produces({MediaType.APPLICATION_JSON})
+ public TimelineMetrics getTestDataSeries(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @QueryParam("type") String seriesType,
+ @QueryParam("configs") String config
+ ) {
+ // Placeholder: series preview generation is not implemented yet.
+ return null;
+ }
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 472a787..20aba23 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -24,7 +24,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
@@ -37,6 +36,7 @@
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.TestMetricSeriesGenerator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.EntityIdentifier;
@@ -50,6 +50,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -75,6 +76,10 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
@@ -389,7 +394,7 @@
}
return timelineMetricStore.getTimelineMetrics(
- parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, instanceId,
+ parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, parseStr(instanceId),
parseLongStr(startTime), parseLongStr(endTime),
Precision.getPrecision(precision), parseIntStr(limit),
parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN),
@@ -412,6 +417,25 @@
}
@GET
+ @Path("/metrics/anomalies")
+ @Produces({ MediaType.APPLICATION_JSON })
+ // Returns anomalies detected by the given method within (startTime, endTime],
+ // delegating to the metric store; limit caps the number of rows returned.
+ public TimelineMetrics getAnomalyMetrics(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @QueryParam("method") String method,
+ @QueryParam("startTime") String startTime,
+ @QueryParam("endTime") String endTime,
+ @QueryParam("limit") String limit
+ ) {
+ init(res);
+
+ try {
+ return timelineMetricStore.getAnomalyMetrics(method, parseLongStr(startTime), parseLongStr(endTime), parseIntStr(limit));
+ } catch (Exception e) {
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+ @GET
@Path("/metrics/metadata")
@Produces({ MediaType.APPLICATION_JSON })
public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(
@@ -660,6 +684,12 @@
}
private static String parseStr(String str) {
- return str == null ? null : str.trim();
+ // Trim, then map empty strings or the literal "undefined" to null
+ // (presumably sent by JS clients when a query param is unset -- confirm).
+ String trimmedInstance = (str == null) ? null : str.trim();
+ if (trimmedInstance != null) {
+ if (trimmedInstance.isEmpty() || trimmedInstance.equalsIgnoreCase("undefined")) {
+ trimmedInstance = null;
+ }
+ }
+ return trimmedInstance;
}
}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index 07e0daa..7c879e1 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -119,4 +119,9 @@
return null;
}
+ // Test stub: anomaly retrieval is not exercised by these tests; always returns null.
+ @Override
+ public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) {
+ return null;
+ }
+
}