blob: 720c37153ddf2a8251136c31a751dbea0192395a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink.flume;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Context;
import org.apache.flume.FlumeException;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Flume {@link MonitorService} that periodically polls the Flume JMX MBeans and
 * emits every numeric attribute as a timeline metric through
 * {@link AbstractTimelineMetricsSink}.
 *
 * <p>Life cycle: {@link #configure(Context)} loads the sink settings from
 * {@code /flume-metrics2.properties}, {@link #start()} schedules the polling
 * worker and {@link #stop()} shuts the scheduler down.
 */
public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
  private final static String COUNTER_METRICS_PROPERTY = "counters";

  private String collectorUri;
  private String protocol;
  // Key - component(instance_id)
  private Map<String, TimelineMetricsCache> metricsCaches;
  private int maxRowCacheSize;
  private int metricsSendInterval;
  private ScheduledExecutorService scheduledExecutorService;
  private long pollFrequency;
  private String hostname;
  private String port;
  private Collection<String> collectorHosts;
  private String zookeeperQuorum;
  // Names of the attributes that are handed to the cache flagged as counters.
  private final Set<String> counterMetrics = new HashSet<String>();
  private int timeoutSeconds = 10;
  private boolean setInstanceId;
  private String instanceId;
  private boolean hostInMemoryAggregationEnabled;
  private int hostInMemoryAggregationPort;
  private String hostInMemoryAggregationProtocol;

  /**
   * Schedules the JMX polling worker. The first run happens immediately and
   * subsequent runs are spaced {@code pollFrequency} milliseconds apart
   * (fixed delay). A new single-threaded scheduler is created when none exists
   * yet or the previous one has been shut down.
   */
  @Override
  public void start() {
    LOG.info("Starting Flume Metrics Sink");
    TimelineMetricsCollector timelineMetricsCollector = new TimelineMetricsCollector();
    // A terminated executor is always shut down, so isShutdown() covers both states.
    if (scheduledExecutorService == null || scheduledExecutorService.isShutdown()) {
      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    }
    scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector, 0,
        pollFrequency, TimeUnit.MILLISECONDS);
  }

  /**
   * Shuts the polling scheduler down. Safe to call when {@link #start()} was
   * never invoked.
   */
  @Override
  public void stop() {
    LOG.info("Stopping Flume Metrics Sink");
    if (scheduledExecutorService != null) {
      scheduledExecutorService.shutdown();
    }
  }

  /**
   * Resolves the local host name and loads all sink settings from
   * {@code /flume-metrics2.properties}.
   *
   * <p>The {@code collectionFrequency} property has no default here and, when
   * https is configured, neither do the keystore properties; a missing value
   * makes this method fail (NOTE(review): assuming {@code getProperty} returns
   * {@code null} for absent keys). A missing {@code counters} property simply
   * results in no counter metrics.
   *
   * @param context the Flume context; only logged by this implementation
   * @throws FlumeException if the local host name cannot be determined
   */
  @Override
  public void configure(Context context) {
    LOG.info("Context parameters " + context);
    hostname = detectLocalHostname();

    Configuration configuration = new Configuration("/flume-metrics2.properties");
    timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
    maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
        String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
    metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
        String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
    metricsCaches = new HashMap<String, TimelineMetricsCache>();
    collectorHosts = parseHostsStringIntoCollection(configuration.getProperty(COLLECTOR_HOSTS_PROPERTY));
    zookeeperQuorum = configuration.getProperty("zookeeper.quorum");
    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
    port = configuration.getProperty(COLLECTOR_PORT, "6188");
    setInstanceId = Boolean.parseBoolean(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
    // Boolean.parseBoolean, not Boolean.getBoolean: the latter looks up a JVM
    // system property with the given name instead of parsing the value.
    hostInMemoryAggregationEnabled = Boolean.parseBoolean(
        configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
    hostInMemoryAggregationPort = Integer.parseInt(
        configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
    hostInMemoryAggregationProtocol =
        configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");

    // Initialize the collector write strategy
    super.init();

    if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
      String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
      String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
      String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
      loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
    }

    collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
    pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency"));
    registerCounterMetrics(configuration.getProperty(COUNTER_METRICS_PROPERTY));
  }

  /**
   * Determines the lower-cased local host name, falling back to the canonical
   * host name when the plain name is not fully qualified.
   *
   * @return the lower-cased host name
   * @throws FlumeException if the local host cannot be resolved
   */
  private String detectLocalHostname() {
    try {
      String name = InetAddress.getLocalHost().getHostName();
      // If not FQDN, call DNS
      if (name == null || !name.contains(".")) {
        name = InetAddress.getLocalHost().getCanonicalHostName();
      }
      return name.toLowerCase();
    } catch (UnknownHostException e) {
      LOG.error("Could not identify hostname.", e);
      throw new FlumeException("Could not identify hostname.", e);
    }
  }

  /**
   * Replaces the set of counter metric names with the entries of a
   * comma-separated list. Entries are trimmed and blank entries are ignored so
   * that {@code "a, b"} registers {@code "a"} and {@code "b"}.
   *
   * @param commaSeparatedNames the raw property value; {@code null} clears the set
   */
  private void registerCounterMetrics(String commaSeparatedNames) {
    // Drop names left over from an earlier configure() call.
    counterMetrics.clear();
    if (commaSeparatedNames == null) {
      return;
    }
    for (String rawName : commaSeparatedNames.split(",")) {
      String name = rawName.trim();
      if (!name.isEmpty()) {
        counterMetrics.add(name);
      }
    }
  }

  @Override
  public String getCollectorUri(String host) {
    return constructTimelineMetricUri(protocol, host, port);
  }

  @Override
  protected String getCollectorProtocol() {
    return protocol;
  }

  @Override
  protected String getCollectorPort() {
    return port;
  }

  @Override
  protected int getTimeoutSeconds() {
    return timeoutSeconds;
  }

  @Override
  protected String getZookeeperQuorum() {
    return zookeeperQuorum;
  }

  @Override
  protected Collection<String> getConfiguredCollectorHosts() {
    return collectorHosts;
  }

  @Override
  protected String getHostname() {
    return hostname;
  }

  @Override
  protected boolean isHostInMemoryAggregationEnabled() {
    return hostInMemoryAggregationEnabled;
  }

  @Override
  protected int getHostInMemoryAggregationPort() {
    return hostInMemoryAggregationPort;
  }

  @Override
  protected String getHostInMemoryAggregationProtocol() {
    return hostInMemoryAggregationProtocol;
  }

  /**
   * Sets the delay between two polling runs.
   *
   * @param pollFrequency delay in milliseconds; takes effect on the next {@link #start()}
   */
  public void setPollFrequency(long pollFrequency) {
    this.pollFrequency = pollFrequency;
  }

  // Test helper method
  protected void setMetricsCaches(Map<String, TimelineMetricsCache> metricsCaches) {
    this.metricsCaches = metricsCaches;
  }

  /**
   * Worker which polls JMX for all mbeans with
   * {@link javax.management.ObjectName} within the flume namespace:
   * org.apache.flume. All attributes of such beans are sent
   * to the metrics collector service.
   */
  class TimelineMetricsCollector implements Runnable {

    /**
     * Performs one polling cycle. Every exception is caught and logged so that
     * a failing cycle does not cancel the periodic schedule.
     */
    @Override
    public void run() {
      LOG.debug("Collecting Metrics for Flume");
      try {
        Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
        long currentTimeMillis = System.currentTimeMillis();
        for (Map.Entry<String, Map<String, String>> componentEntry : metricsMap.entrySet()) {
          String component = componentEntry.getKey();
          LOG.debug("Attributes for component " + component);
          processComponentAttributes(currentTimeMillis, component, componentEntry.getValue());
        }
      } catch (UnableToConnectException uce) {
        LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
      } catch (Exception e) {
        LOG.error("Unexpected error", e);
      }
      LOG.debug("Finished collecting Metrics for Flume");
    }

    /**
     * Caches every numeric attribute of one component and emits whatever the
     * cache hands back for the attribute.
     *
     * @param currentTimeMillis timestamp shared by all metrics of this cycle
     * @param component the component name, used as cache key and instance id
     * @param attributeMap attribute name to attribute value, as read from JMX
     * @throws IOException declared for failures while emitting the metrics
     */
    private void processComponentAttributes(long currentTimeMillis, String component,
        Map<String, String> attributeMap) throws IOException {
      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();

      TimelineMetricsCache metricsCache = metricsCaches.get(component);
      if (metricsCache == null) {
        metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
        metricsCaches.put(component, metricsCache);
      }

      for (Map.Entry<String, String> attribute : attributeMap.entrySet()) {
        String attributeName = attribute.getKey();
        String attributeValue = attribute.getValue();
        if (!NumberUtils.isNumber(attributeValue)) {
          continue;
        }
        LOG.debug(attributeName + " = " + attributeValue);

        TimelineMetric timelineMetric;
        try {
          timelineMetric = createTimelineMetric(currentTimeMillis,
              component, attributeName, attributeValue);
        } catch (NumberFormatException nfe) {
          // NumberUtils.isNumber also accepts forms such as "0x1F" or "10L"
          // that Double.parseDouble rejects; skip the attribute instead of
          // aborting the whole polling cycle.
          LOG.debug("Skipping attribute " + attributeName
              + " with unparsable value " + attributeValue);
          continue;
        }

        // Put intermediate values into the cache until it is time to send
        metricsCache.putTimelineMetric(timelineMetric, isCounterMetric(attributeName));
        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(attributeName);
        if (cachedMetric != null) {
          metricList.add(cachedMetric);
        }
      }

      if (!metricList.isEmpty()) {
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        timelineMetrics.setMetrics(metricList);
        emitMetrics(timelineMetrics);
      }
    }

    /**
     * Builds a single-value metric for one attribute of a component.
     *
     * @param currentTimeMillis start time of the metric and key of its value
     * @param component the component name; prefixed with the configured
     *     instance id when {@code setInstanceId} is enabled
     * @param attributeName the metric name
     * @param attributeValue the value, parsed with {@link Double#parseDouble(String)}
     * @return the populated metric
     * @throws NumberFormatException if {@code attributeValue} is not a parsable double
     */
    private TimelineMetric createTimelineMetric(long currentTimeMillis, String component,
        String attributeName, String attributeValue) {
      TimelineMetric timelineMetric = new TimelineMetric();
      timelineMetric.setMetricName(attributeName);
      timelineMetric.setHostName(hostname);
      if (setInstanceId) {
        timelineMetric.setInstanceId(instanceId + component);
      } else {
        timelineMetric.setInstanceId(component);
      }
      timelineMetric.setAppId("FLUME_HANDLER");
      timelineMetric.setStartTime(currentTimeMillis);
      timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
      return timelineMetric;
    }
  }

  /**
   * Tells whether an attribute was listed in the {@code counters} property.
   *
   * @param attributeName the attribute name to look up
   * @return {@code true} if the attribute is a configured counter metric
   */
  private boolean isCounterMetric(String attributeName) {
    return counterMetrics.contains(attributeName);
  }
}