/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.prototype.core;

import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaModel;
import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
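
/**
 * Runs Tukey's method over a sliding train/test window for every metric tracked by an
 * {@link EmaTechnique}, treats the Tukey's anomalies as a reference, and uses the resulting
 * precision/recall to tune the corresponding EMA models. Tukey's anomalies are also emitted
 * back to the metrics collector, and the Tukey's N multiplier is adjusted to follow the
 * sensitivity value reported by the Ambari server.
 *
 * A minimal usage sketch (assumes a reachable metrics collector and Ambari server; the host
 * and cluster names below are placeholders):
 *
 * <pre>
 *   MetricsCollectorInterface collector = ...; // however the collector client is constructed
 *   EmaTechnique ema = ...;                    // EMA models tracked elsewhere
 *   PointInTimeADSystem adSystem = new PointInTimeADSystem(collector, 3,
 *       5 * 60 * 1000, 15 * 60 * 1000, "ambari-host.example.com", "my_cluster");
 *   adSystem.runTukeysAndRefineEma(ema, System.currentTimeMillis());
 * </pre>
 */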
public class PointInTimeADSystem implements Serializable {
//private EmaTechnique emaTechnique;
private MetricsCollectorInterface metricsCollectorInterface;
private Map<String, Double> tukeysNMap;
private double defaultTukeysN = 3;
private long testIntervalMillis = 5*60*1000; //5 mins
private long trainIntervalMillis = 15*60*1000; //15 mins
private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);
private AmbariServerInterface ambariServerInterface;
private int sensitivity = 50;
private int minSensitivity = 0;
private int maxSensitivity = 100;
public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
this.metricsCollectorInterface = metricsCollectorInterface;
this.defaultTukeysN = defaultTukeysN;
this.tukeysNMap = new HashMap<>();
this.testIntervalMillis = testIntervalMillis;
this.trainIntervalMillis = trainIntervalMillis;
this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
LOG.info("Starting PointInTimeADSystem...");
}
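
/**
 * For each metric key tracked by the given EMA technique: fetch the last (train + test)
 * interval of data, run Tukey's method on the test window using the train window as the
 * baseline, compare the Tukey's anomalies with the EMA anomalies already emitted for the
 * same window, and nudge the EMA model's sensitivity based on the resulting precision and
 * recall. Tukey's anomalies are emitted back to the collector at the end.
 *
 * @param emaTechnique the EMA technique whose tracked models should be refined
 * @param startTime    end of the test window, in epoch milliseconds
 */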
public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");
int requiredSensitivity = ambariServerInterface.getPointInTimeSensitivity();
if (requiredSensitivity == -1 || requiredSensitivity == sensitivity) {
LOG.info("No change in sensitivity needed.");
} else {
LOG.info("Current tukey's N value = " + defaultTukeysN);
if (requiredSensitivity > sensitivity) {
int targetSensitivity = Math.min(maxSensitivity, requiredSensitivity);
while (sensitivity < targetSensitivity) {
defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
sensitivity++;
}
} else {
int targetSensitivity = Math.max(minSensitivity, requiredSensitivity);
while (sensitivity > targetSensitivity) {
defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
sensitivity--;
}
}
LOG.info("New tukey's N value = " + defaultTukeysN);
}
TimelineMetrics timelineMetrics = new TimelineMetrics();
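// Evaluate every metric key currently tracked by the EMA technique.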
for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
LOG.info("EMA key = " + metricKey);
EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
String metricName = emaModel.getMetricName();
String appId = emaModel.getAppId();
String hostname = emaModel.getHostname();
TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
startTime);
if (tukeysData.getMetrics().isEmpty()) {
LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
continue;
}
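// Split the fetched series into train and test windows: points older than
// (startTime - testIntervalMillis) are training data, the rest are test data.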
List<Double> trainTsList = new ArrayList<>();
List<Double> trainDataList = new ArrayList<>();
List<Double> testTsList = new ArrayList<>();
List<Double> testDataList = new ArrayList<>();
for (TimelineMetric metric : tukeysData.getMetrics()) {
for (Long timestamp : metric.getMetricValues().keySet()) {
if (timestamp <= (startTime - testIntervalMillis)) {
trainDataList.add(metric.getMetricValues().get(timestamp));
trainTsList.add((double)timestamp);
} else {
testDataList.add(metric.getMetricValues().get(timestamp));
testTsList.add((double)timestamp);
}
}
}
if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
LOG.info("Not enough train/test data to perform analysis.");
continue;
}
String tukeysTrainSeries = "tukeysTrainSeries";
double[] trainTs = new double[trainTsList.size()];
double[] trainData = new double[trainTsList.size()];
for (int i = 0; i < trainTs.length; i++) {
trainTs[i] = trainTsList.get(i);
trainData[i] = trainDataList.get(i);
}
String tukeysTestSeries = "tukeysTestSeries";
double[] testTs = new double[testTsList.size()];
double[] testData = new double[testTsList.size()];
for (int i = 0; i < testTs.length; i++) {
testTs[i] = testTsList.get(i);
testData[i] = testDataList.get(i);
}
LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData);
DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData);
if (!tukeysNMap.containsKey(metricKey)) {
tukeysNMap.put(metricKey, defaultTukeysN);
}
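// Invoke Tukey's method in R over the train/test series, using this metric's N value.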
Map<String, String> configs = new HashMap<>();
configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));
ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);
List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();
for (TimelineMetric tukeysMetric : tukeysMetrics) {
tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
}
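// Fetch the EMA anomalies already emitted for the same test window so they can be
// compared against the Tukey's anomalies.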
TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
TreeMap<Long, Double> emaMetricValues = new TreeMap<>();
if (!emaData.getMetrics().isEmpty()) {
emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
}
LOG.info("Ema anomalies size : " + emaMetricValues.size());
int tp = 0;
int tn = 0;
int fp = 0;
int fn = 0;
for (double ts : testTs) {
long timestamp = (long) ts;
if (tukeysMetricValues.containsKey(timestamp)) {
if (emaMetricValues.containsKey(timestamp)) {
tp++;
} else {
fn++;
}
} else {
if (emaMetricValues.containsKey(timestamp)) {
fp++;
} else {
tn++;
}
}
}
double recall = (double) tp / (double) (tp + fn);
double precision = (double) tp / (double) (tp + fp);
LOG.info("----------------------------");
LOG.info("Precision Recall values for " + metricKey);
LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
LOG.info("----------------------------");
if (recall < 0.5) {
LOG.info("Increasing EMA sensitivity by 10%");
emaModel.updateModel(true, 5);
} else if (precision < 0.5) {
LOG.info("Decreasing EMA sensitivity by 10%");
emaModel.updateModel(false, 5);
}
}
if (emaTechnique.getTrackedEmas().isEmpty()){
LOG.info("No EMA Technique keys tracked!!!!");
}
if (!timelineMetrics.getMetrics().isEmpty()) {
metricsCollectorInterface.emitMetrics(timelineMetrics);
}
}
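
/**
 * Converts the ResultSet returned by the R Tukey's call into TimelineMetric instances,
 * one per anomalous point, tagged with the "tukeys" method and its anomaly score.
 */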
private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {
List<TimelineMetric> timelineMetrics = new ArrayList<>();
if (result == null) {
LOG.info("ResultSet from R call is null.");
return timelineMetrics;
}
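// The R result is expected to hold three parallel arrays: timestamps, metric values and
// anomaly scores.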
if (result.resultset.size() >= 3) {
double[] ts = result.resultset.get(0);
double[] metrics = result.resultset.get(1);
double[] anomalyScore = result.resultset.get(2);
for (int i = 0; i < ts.length; i++) {
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
timelineMetric.setInstanceId(null);
timelineMetric.setStartTime((long) ts[i]);
TreeMap<Long, Double> metricValues = new TreeMap<>();
metricValues.put((long) ts[i], metrics[i]);
HashMap<String, String> metadata = new HashMap<>();
metadata.put("method", "tukeys");
if (Double.isInfinite(anomalyScore[i])) {
LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
} else {
metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
}
timelineMetric.setMetadata(metadata);
timelineMetric.setMetricValues(metricValues);
timelineMetrics.add(timelineMetric);
}
}
return timelineMetrics;
}
}