blob: b1f60fa10d38552bb4ffd3a38b435d627b99ba14 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.host.aggregator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
 * Abstract class that runs a thread that publishes metrics data to the AMS collector in specified intervals.
 *
 * <p>Each cycle sleeps for {@code publishIntervalInSeconds}, fetches metrics through
 * {@link #getMetricsFromCache()}, converts them to JSON through {@link #processMetrics(Map)} and
 * POSTs the result to {@code publishURL}. A failure in one cycle is logged and does not stop the
 * thread; the thread stops once it is interrupted (see {@link #stopPublisher()}).
 */
public abstract class AbstractMetricPublisherThread extends Thread {
  /** Timeout, in milliseconds, used both for connecting to the collector and for reading its response. */
  private static final int COLLECTOR_TIMEOUT_MILLIS = 5 * 1000;

  protected int publishIntervalInSeconds;
  protected String publishURL;
  protected ObjectMapper objectMapper;
  // Created per instance from the runtime class so log lines carry the concrete subclass name.
  private final Log LOG;
  protected TimelineMetricsHolder timelineMetricsHolder;

  /**
   * Creates the publisher and configures the JSON mapper (JAXB annotations, null fields omitted).
   * The thread is not started here.
   *
   * @param timelineMetricsHolder holder stored for use by subclasses
   * @param publishURL collector URL that metrics are POSTed to
   * @param publishIntervalInSeconds pause between two publish cycles, in seconds
   */
  public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
    LOG = LogFactory.getLog(this.getClass());
    this.publishURL = publishURL;
    this.publishIntervalInSeconds = publishIntervalInSeconds;
    this.timelineMetricsHolder = timelineMetricsHolder;
    objectMapper = new ObjectMapper();
    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
    objectMapper.setAnnotationIntrospector(introspector);
    // SerializationConfig#withSerializationInclusion returns a NEW config and leaves the mapper's
    // own config untouched, so the inclusion rule has to be set on the mapper itself.
    objectMapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
  }

  /**
   * Publishes metrics to collector in specified intervals while not interrupted.
   * An interrupt received while sleeping ends the loop without a further publish.
   */
  @Override
  public void run() {
    while (!isInterrupted()) {
      try {
        // 1000L forces long arithmetic so large intervals cannot overflow int.
        sleep(this.publishIntervalInSeconds * 1000L);
      } catch (InterruptedException e) {
        // sleep() clears the interrupt flag when it throws; restore it and stop,
        // otherwise the loop condition would never observe the stop request.
        Thread.currentThread().interrupt();
        return;
      }
      try {
        processAndPublishMetrics(getMetricsFromCache());
      } catch (Exception e) {
        // Keep the publisher alive: one failed cycle must not end the thread.
        LOG.error("Couldn't process and send metrics : ", e);
      }
    }
  }

  /**
   * Processes and sends metrics to collector. Does nothing when there are no metrics.
   *
   * @param metricsFromCache metrics to publish; {@code null} or empty means nothing to do
   * @throws Exception if processing or publishing fails
   */
  protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
    if (metricsFromCache == null || metricsFromCache.isEmpty()) {
      return;
    }
    LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
    publishMetricsJson(processMetrics(metricsFromCache));
  }

  /**
   * Returns metrics map. Source is based on implementation.
   *
   * @return metrics to process and publish in the current cycle
   */
  protected abstract Map<Long, TimelineMetrics> getMetricsFromCache();

  /**
   * Processes given metrics (aggregates or merges them) and converts them into a json string
   * that will be sent to the collector.
   *
   * @param metricValues metrics obtained from {@link #getMetricsFromCache()}
   * @return json payload to POST to the collector
   */
  protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);

  /**
   * POSTs the given JSON to {@code publishURL} and expects HTTP 200 in return.
   * The response is always drained and closed so the connection's resources are released.
   *
   * @param jsonData request body; when {@code null} the POST is sent without a body
   * @throws Exception if the URL is unknown, the request fails, or the response code is not 200
   */
  protected void publishMetricsJson(String jsonData) throws Exception {
    if (this.publishURL == null) {
      throw new IOException("Unknown URL. Unable to connect to metrics collector.");
    }
    LOG.info("Collector URL : " + publishURL);
    HttpURLConnection connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
    // Stays negative until a valid HTTP response has been received.
    int responseCode = -1;
    try {
      connection.setRequestMethod("POST");
      connection.setRequestProperty("Content-Type", "application/json");
      connection.setRequestProperty("Connection", "Keep-Alive");
      connection.setConnectTimeout(COLLECTOR_TIMEOUT_MILLIS);
      connection.setReadTimeout(COLLECTOR_TIMEOUT_MILLIS);
      connection.setDoOutput(true);
      if (jsonData != null) {
        try (OutputStream os = connection.getOutputStream()) {
          os.write(jsonData.getBytes(StandardCharsets.UTF_8));
        }
      }
      responseCode = connection.getResponseCode();
      if (responseCode != 200) {
        throw new IOException("responseCode is " + responseCode);
      }
      LOG.info("Successfully sent metrics.");
    } finally {
      releaseConnection(connection, responseCode);
    }
  }

  /**
   * Frees the resources held by a finished request. When a response was received its body is read
   * to the end and closed; otherwise, or if draining fails, the connection is disconnected.
   * Never throws, so it cannot mask the outcome of the request itself.
   */
  private void releaseConnection(HttpURLConnection connection, int responseCode) {
    if (responseCode < 0) {
      connection.disconnect();
      return;
    }
    // Error responses expose their body through the error stream; it may be null when there is none.
    try (InputStream body = responseCode >= 400 ? connection.getErrorStream() : connection.getInputStream()) {
      if (body != null) {
        byte[] buffer = new byte[1024];
        while (body.read(buffer) != -1) {
          // Discard: the content is not needed, only the stream has to be consumed.
        }
      }
    } catch (IOException e) {
      LOG.debug("Couldn't drain collector response : ", e);
      connection.disconnect();
    }
  }

  /**
   * Interrupts the thread, which makes {@link #run()} leave its publishing loop.
   */
  protected void stopPublisher() {
    this.interrupt();
  }
}