blob: 5a6bb6161bb0878fd69af9ce5934a9c5ec2c36c4 [file] [log] [blame]
package org.apache.ambari.metrics.alertservice.spark;
import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.*;
/**
 * Converts detected {@link MetricAnomaly} instances into {@link TimelineMetrics}
 * and POSTs them as JSON to a metrics collector at
 * {@code <protocol>://<collectorHost>:<port>/ws/v1/timeline/metrics}.
 *
 * <p>Every emitted metric is tagged with the app id {@code "anomaly-engine"},
 * a {@code null} instance id, and the canonical name of the local host (or a
 * placeholder when the local host name cannot be resolved).
 */
public class AnomalyMetricPublisher implements Serializable {

  /** Host name reported when the local host name cannot be resolved. */
  private static final String DEFAULT_HOST_NAME = "UNKNOWN.example.com";
  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
  /** Connect and read timeout for the collector connection, in milliseconds. */
  private static final int TIMEOUT_MS = 10000;
  private static final Log LOG = LogFactory.getLog(AnomalyMetricPublisher.class);
  private static final ObjectMapper mapper;

  static {
    mapper = new ObjectMapper();
    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
    mapper.setAnnotationIntrospector(introspector);
    // SerializationConfig.withSerializationInclusion(...) returns a NEW config and
    // leaves the mapper untouched, so the inclusion has to be set on the mapper itself.
    mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
  }

  private String hostName = DEFAULT_HOST_NAME;
  private String instanceId = null;
  private String serviceName = "anomaly-engine";
  private String collectorHost;
  private String protocol;
  private String port;

  /**
   * Creates a publisher that targets the given collector endpoint.
   *
   * @param collectorHost host of the metrics collector
   * @param protocol URL scheme placed in front of {@code "://"} when building the collector URL
   * @param port port of the metrics collector
   */
  public AnomalyMetricPublisher(String collectorHost, String protocol, String port) {
    this.collectorHost = collectorHost;
    this.protocol = protocol;
    this.port = port;
    this.hostName = getDefaultLocalHostName();
  }

  /**
   * Resolves the canonical host name of the local machine.
   *
   * @return the canonical local host name, or {@link #DEFAULT_HOST_NAME} when it
   *         cannot be resolved (never {@code null})
   */
  private String getDefaultLocalHostName() {
    try {
      return InetAddress.getLocalHost().getCanonicalHostName();
    } catch (UnknownHostException e) {
      LOG.warn("Error getting host address", e);
      // Fall back to the placeholder instead of null so emitted metrics always carry a host name.
      return DEFAULT_HOST_NAME;
    }
  }

  /**
   * Converts the given anomalies to timeline metrics and sends them to the collector.
   * Nothing is sent when the list is {@code null} or yields no metrics.
   *
   * @param metricAnomalies anomalies to publish; consecutive entries that share a metric key
   *                        are folded into a single timeline metric
   */
  public void publish(List<MetricAnomaly> metricAnomalies) {
    if (metricAnomalies == null) {
      LOG.warn("No metric anomalies supplied, nothing to publish");
      return;
    }

    LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
    List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
    LOG.info("Sending TimelineMetric list of size : " + metricList.size());

    if (metricList.isEmpty()) {
      return;
    }
    TimelineMetrics timelineMetrics = new TimelineMetrics();
    timelineMetrics.setMetrics(metricList);
    emitMetrics(timelineMetrics);
  }

  /**
   * Groups consecutive anomalies that share a metric key into one {@link TimelineMetric}
   * whose values map anomaly timestamp to anomaly metric value.
   *
   * @param metricAnomalies anomalies to convert
   * @return the resulting timeline metrics; empty when the input is empty
   */
  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
    List<TimelineMetric> metrics = new ArrayList<>();
    if (metricAnomalies.isEmpty()) {
      return metrics;
    }

    // All metrics produced by one call share the same start time.
    long currentTime = System.currentTimeMillis();

    MetricAnomaly prevAnomaly = metricAnomalies.get(0);
    TimelineMetric timelineMetric = newTimelineMetric(prevAnomaly, currentTime);
    TreeMap<Long, Double> metricValues = new TreeMap<>();
    metricValues.put(prevAnomaly.getTimestamp(), prevAnomaly.getMetricValue());

    for (int i = 1; i < metricAnomalies.size(); i++) {
      MetricAnomaly currentAnomaly = metricAnomalies.get(i);
      // NOTE(review): grouping compares the metric key only, while the metric name also
      // contains the method type; consecutive anomalies with the same key but a different
      // method type end up under the first one's name - confirm this is intended.
      if (!currentAnomaly.getMetricKey().equals(prevAnomaly.getMetricKey())) {
        // Key changed: close the metric built so far and start a new one.
        timelineMetric.setMetricValues(metricValues);
        metrics.add(timelineMetric);

        timelineMetric = newTimelineMetric(currentAnomaly, currentTime);
        metricValues = new TreeMap<>();
        prevAnomaly = currentAnomaly;
      }
      metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
    }

    // Close the last open group.
    timelineMetric.setMetricValues(metricValues);
    metrics.add(timelineMetric);
    return metrics;
  }

  /**
   * Creates a timeline metric named {@code <metricKey>_<methodType>} for the given anomaly,
   * tagged with this publisher's app id, instance id and host name.
   */
  private TimelineMetric newTimelineMetric(MetricAnomaly anomaly, long startTime) {
    TimelineMetric timelineMetric = new TimelineMetric();
    timelineMetric.setMetricName(anomaly.getMetricKey() + "_" + anomaly.getMethodResult().getMethodType());
    timelineMetric.setAppId(serviceName);
    timelineMetric.setInstanceId(instanceId);
    timelineMetric.setHostName(hostName);
    timelineMetric.setStartTime(startTime);
    return timelineMetric;
  }

  /**
   * Serializes the metrics to JSON and posts them to the collector.
   *
   * @param metrics metrics to send
   * @return {@code true} when the collector answered with HTTP 200; {@code false} when
   *         serialization or the POST failed
   */
  private boolean emitMetrics(TimelineMetrics metrics) {
    String connectUrl = constructTimelineMetricUri();
    LOG.info("EmitMetrics connectUrl = " + connectUrl);

    String jsonData;
    try {
      jsonData = mapper.writeValueAsString(metrics);
    } catch (IOException e) {
      LOG.error("Unable to parse metrics", e);
      return false;
    }
    if (jsonData == null) {
      return false;
    }
    return emitMetricsJson(connectUrl, jsonData);
  }

  /** Opens (without connecting) an HTTP connection for the given URL spec. */
  private HttpURLConnection getConnection(String spec) throws IOException {
    return (HttpURLConnection) new URL(spec).openConnection();
  }

  /**
   * POSTs the JSON payload to the collector URL.
   *
   * @param connectUrl collector URL; a {@code null} value is reported as an I/O failure
   * @param jsonData JSON body to send; no body is written when {@code null}
   * @return {@code true} only when the collector answered with HTTP 200
   */
  private boolean emitMetricsJson(String connectUrl, String jsonData) {
    LOG.info("Metrics Data : " + jsonData);
    try {
      if (connectUrl == null) {
        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
      }
      HttpURLConnection connection = getConnection(connectUrl);
      connection.setRequestMethod("POST");
      connection.setRequestProperty("Content-Type", "application/json");
      connection.setRequestProperty("Connection", "Keep-Alive");
      connection.setConnectTimeout(TIMEOUT_MS);
      connection.setReadTimeout(TIMEOUT_MS);
      connection.setDoOutput(true);

      if (jsonData != null) {
        try (OutputStream os = connection.getOutputStream()) {
          os.write(jsonData.getBytes("UTF-8"));
        }
      }

      int statusCode = connection.getResponseCode();
      if (statusCode != HttpURLConnection.HTTP_OK) {
        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
            "statusCode = " + statusCode);
        // A rejected POST is a failure; do not report it as success.
        return false;
      }
      LOG.info("Metrics posted to Collector " + connectUrl);
      return true;
    } catch (IOException ioe) {
      // Log with the exception so the stack trace and cause are preserved.
      LOG.error("Unable to POST metrics to collector, " + connectUrl, ioe);
      return false;
    }
  }

  /** Builds the collector URL {@code <protocol>://<collectorHost>:<port>/ws/v1/timeline/metrics}. */
  private String constructTimelineMetricUri() {
    return new StringBuilder(protocol)
        .append("://")
        .append(collectorHost)
        .append(":")
        .append(port)
        .append(WS_V1_TIMELINE_METRICS)
        .toString();
  }
}