blob: da3999a1c800266d7886ba5b241b2ac6e8dd8a25 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.adservice.prototype.core;
import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeMap;
public class MetricsCollectorInterface implements Serializable {
private static String hostName = null;
private String instanceId = null;
public final static String serviceName = "anomaly-engine";
private String collectorHost;
private String protocol;
private String port;
private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
private static ObjectMapper mapper;
private final static ObjectReader timelineObjectReader;
static {
mapper = new ObjectMapper();
AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
mapper.setAnnotationIntrospector(introspector);
mapper.getSerializationConfig()
.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
timelineObjectReader = mapper.reader(TimelineMetrics.class);
}
public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
this.collectorHost = collectorHost;
this.protocol = protocol;
this.port = port;
this.hostName = getDefaultLocalHostName();
}
public static String getDefaultLocalHostName() {
if (hostName != null) {
return hostName;
}
try {
return InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
LOG.info("Error getting host address");
}
return null;
}
public void publish(List<MetricAnomaly> metricAnomalies) {
if (CollectionUtils.isNotEmpty(metricAnomalies)) {
LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
if (!metricList.isEmpty()) {
TimelineMetrics timelineMetrics = new TimelineMetrics();
timelineMetrics.setMetrics(metricList);
emitMetrics(timelineMetrics);
}
} else {
LOG.debug("No anomalies to send.");
}
}
private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
List<TimelineMetric> metrics = new ArrayList<>();
if (metricAnomalies.isEmpty()) {
return metrics;
}
for (MetricAnomaly anomaly : metricAnomalies) {
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(anomaly.getMetricKey());
timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
timelineMetric.setInstanceId(null);
timelineMetric.setHostName(getDefaultLocalHostName());
timelineMetric.setStartTime(anomaly.getTimestamp());
HashMap<String, String> metadata = new HashMap<>();
metadata.put("method", anomaly.getMethodType());
metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
timelineMetric.setMetadata(metadata);
TreeMap<Long,Double> metricValues = new TreeMap<>();
metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
timelineMetric.setMetricValues(metricValues);
metrics.add(timelineMetric);
}
return metrics;
}
public boolean emitMetrics(TimelineMetrics metrics) {
String connectUrl = constructTimelineMetricUri();
String jsonData = null;
LOG.debug("EmitMetrics connectUrl = " + connectUrl);
try {
jsonData = mapper.writeValueAsString(metrics);
LOG.info(jsonData);
} catch (IOException e) {
LOG.error("Unable to parse metrics", e);
}
if (jsonData != null) {
return emitMetricsJson(connectUrl, jsonData);
}
return false;
}
private HttpURLConnection getConnection(String spec) throws IOException {
return (HttpURLConnection) new URL(spec).openConnection();
}
private boolean emitMetricsJson(String connectUrl, String jsonData) {
int timeout = 10000;
HttpURLConnection connection = null;
try {
if (connectUrl == null) {
throw new IOException("Unknown URL. Unable to connect to metrics collector.");
}
connection = getConnection(connectUrl);
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Connection", "Keep-Alive");
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
connection.setDoOutput(true);
if (jsonData != null) {
try (OutputStream os = connection.getOutputStream()) {
os.write(jsonData.getBytes("UTF-8"));
}
}
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
"statusCode = " + statusCode);
} else {
LOG.info("Metrics posted to Collector " + connectUrl);
}
return true;
} catch (IOException ioe) {
LOG.error(ioe.getMessage());
}
return false;
}
private String constructTimelineMetricUri() {
StringBuilder sb = new StringBuilder(protocol);
sb.append("://");
sb.append(collectorHost);
sb.append(":");
sb.append(port);
sb.append(WS_V1_TIMELINE_METRICS);
return sb.toString();
}
public TimelineMetrics fetchMetrics(String metricName,
String appId,
String hostname,
long startime,
long endtime) {
String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
"&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
LOG.debug("Fetch metrics URL : " + url);
URL obj = null;
BufferedReader in = null;
TimelineMetrics timelineMetrics = new TimelineMetrics();
try {
obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
con.setRequestMethod("GET");
int responseCode = con.getResponseCode();
LOG.debug("Sending 'GET' request to URL : " + url);
LOG.debug("Response Code : " + responseCode);
in = new BufferedReader(
new InputStreamReader(con.getInputStream()));
timelineMetrics = timelineObjectReader.readValue(in);
} catch (Exception e) {
LOG.error(e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
LOG.warn(e);
}
}
}
LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
return timelineMetrics;
}
}