blob: 4fcf2fb9508d77be6b06952204364d0cdd72cf8e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink.storm;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
import org.apache.storm.metric.api.DataPoint;
import org.apache.storm.metric.api.IClusterMetricsConsumer;
public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
implements IClusterMetricsConsumer {
public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "nimbus";
private String hostname;
private String port;
private Collection<String> collectorHosts;
private String zkQuorum;
private String protocol;
private boolean setInstanceId;
private String instanceId;
private String applicationId;
private int timeoutSeconds;
private boolean hostInMemoryAggregationEnabled;
private int hostInMemoryAggregationPort;
private String hostInMemoryAggregationProtocol;
public StormTimelineMetricsReporter() {
}
@Override
protected String getCollectorUri(String host) {
return constructTimelineMetricUri(protocol, host, port);
}
@Override
protected String getCollectorProtocol() {
return protocol;
}
@Override
protected int getTimeoutSeconds() {
return timeoutSeconds;
}
@Override
protected String getZookeeperQuorum() {
return zkQuorum;
}
@Override
protected String getCollectorPort() {
return port;
}
@Override
protected Collection<String> getConfiguredCollectorHosts() {
return collectorHosts;
}
@Override
protected String getHostname() {
return hostname;
}
@Override
protected boolean isHostInMemoryAggregationEnabled() {
return hostInMemoryAggregationEnabled;
}
@Override
protected int getHostInMemoryAggregationPort() {
return hostInMemoryAggregationPort;
}
@Override
protected String getHostInMemoryAggregationProtocol() {
return hostInMemoryAggregationProtocol;
}
@Override
public void prepare(Object registrationArgument) {
LOG.info("Preparing Storm Metrics Reporter");
try {
try {
hostname = InetAddress.getLocalHost().getHostName();
// If not FQDN , call DNS
if ((hostname == null) || (!hostname.contains("."))) {
hostname = InetAddress.getLocalHost().getCanonicalHostName();
}
} catch (UnknownHostException e) {
LOG.error("Could not identify hostname.");
throw new RuntimeException("Could not identify hostname.", e);
}
Configuration configuration = new Configuration("/storm-metrics2.properties");
collectorHosts = parseHostsStringIntoCollection(configuration.getProperty(COLLECTOR_HOSTS_PROPERTY));
protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
port = configuration.getProperty(COLLECTOR_PORT, "6188");
zkQuorum = StringUtils.isEmpty(configuration.getProperty(COLLECTOR_ZOOKEEPER_QUORUM)) ?
configuration.getProperty(ZOOKEEPER_QUORUM) : configuration.getProperty(COLLECTOR_ZOOKEEPER_QUORUM);
timeoutSeconds = configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ?
Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS)) :
DEFAULT_POST_TIMEOUT_SECONDS;
applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
}
} catch (Exception e) {
LOG.warn("Could not initialize metrics collector, please specify " +
"protocol, host, port, appId, zkQuorum under $STORM_HOME/conf/storm-metrics2.properties ", e);
}
// Initialize the collector write strategy
super.init();
}
@Override
public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
long timestamp = clusterInfo.getTimestamp();
List<TimelineMetric> totalMetrics = new ArrayList<>();
for (DataPoint dataPoint : dataPoints) {
LOG.debug(dataPoint.getName() + " = " + dataPoint.getValue());
List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
for (DataPoint populatedDataPoint : populatedDataPoints) {
LOG.debug("Populated datapoint: " + dataPoint.getName() + " = " + dataPoint.getValue());
try {
StormAmbariMappedMetric mappedMetric = StormAmbariMappedMetric
.valueOf(populatedDataPoint.getName());
TimelineMetric timelineMetric = createTimelineMetric(timestamp * 1000, applicationId,
mappedMetric.getAmbariMetricName(),
Double.valueOf(populatedDataPoint.getValue().toString()));
totalMetrics.add(timelineMetric);
} catch (IllegalArgumentException e) {
// not interested metrics on Ambari, skip
LOG.debug("Not interested metrics, skip: " + populatedDataPoint.getName());
}
}
}
if (totalMetrics.size() <= 0) {
return;
}
TimelineMetrics timelineMetrics = new TimelineMetrics();
timelineMetrics.setMetrics(totalMetrics);
try {
emitMetrics(timelineMetrics);
} catch (UnableToConnectException e) {
LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
}
}
@Override
public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
// Ambari is not interested on metrics on each supervisor
}
@Override
public void cleanup() {
LOG.info("Stopping Storm Metrics Reporter");
}
private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
List<DataPoint> dataPoints = new ArrayList<>();
if (dataPoint.getValue() == null) {
LOG.warn("Data point with name " + dataPoint.getName() + " is null. Discarding." + dataPoint
.getName());
} else if (dataPoint.getValue() instanceof Map) {
Map<String, Object> dataMap = (Map<String, Object>) dataPoint.getValue();
for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
Double value = convertValueToDouble(entry.getKey(), entry.getValue());
if (value != null) {
dataPoints.add(new DataPoint(dataPoint.getName() + "." + entry.getKey(), value));
}
}
} else {
Double value = convertValueToDouble(dataPoint.getName(), dataPoint.getValue());
if (value != null) {
dataPoints.add(new DataPoint(dataPoint.getName(), value));
}
}
return dataPoints;
}
private Double convertValueToDouble(String metricName, Object value) {
try {
Double converted = NumberUtil.convertValueToDouble(value);
if (converted == null) {
LOG.warn("Data point with name " + metricName + " has value " + value +
" which is not supported. Discarding.");
}
return converted;
} catch (NumberFormatException e) {
LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
value + ". Discarding.");
return null;
}
}
private TimelineMetric createTimelineMetric(long currentTimeMillis, String component,
String attributeName, Double attributeValue) {
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostname);
if (setInstanceId) {
timelineMetric.setInstanceId(instanceId);
}
timelineMetric.setAppId(component);
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
return timelineMetric;
}
enum StormAmbariMappedMetric {
supervisors("Supervisors"),
topologies("Topologies"),
slotsTotal("Total Slots"),
slotsUsed("Used Slots"),
slotsFree("Free Slots"),
executorsTotal("Total Executors"),
tasksTotal("Total Tasks");
private String ambariMetricName;
StormAmbariMappedMetric(String ambariMetricName) {
this.ambariMetricName = ambariMetricName;
}
public String getAmbariMetricName() {
return ambariMetricName;
}
}
}