blob: 8f2adadf2e1950ff1836a05ad03c21998eb6930a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.controller.ganglia;
import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.spi.*;
import org.apache.ambari.server.controller.utilities.StreamProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.*;
/**
* Abstract property provider implementation for a Ganglia source.
*/
public abstract class GangliaPropertyProvider extends AbstractPropertyProvider {
/** Source of the input streams that the Ganglia responses are read from. */
private final StreamProvider streamProvider;
/** Supplies the Ganglia collector host name for a given cluster name. */
private final GangliaHostProvider hostProvider;
/** Id of the resource property that the cluster name is read from. */
private final String clusterNamePropertyId;
/** Id of the host name property; made available to subclasses through its getter. */
private final String hostNamePropertyId;
/** Id of the component name property; made available to subclasses through its getter. */
private final String componentNamePropertyId;
/**
 * Map of Ganglia cluster names keyed by component type.
 * <p>
 * NOTE(review): the map is public and backed by a plain HashMap, so any
 * code that can see this class can modify it - confirm that is intended.
 */
public static final Map<String, String> GANGLIA_CLUSTER_NAME_MAP = new HashMap<String, String>();
static {
// Several component types share the "HDPSlaves" Ganglia cluster.
GANGLIA_CLUSTER_NAME_MAP.put("NAMENODE", "HDPNameNode");
GANGLIA_CLUSTER_NAME_MAP.put("DATANODE", "HDPSlaves");
GANGLIA_CLUSTER_NAME_MAP.put("JOBTRACKER", "HDPJobTracker");
GANGLIA_CLUSTER_NAME_MAP.put("TASKTRACKER", "HDPSlaves");
GANGLIA_CLUSTER_NAME_MAP.put("HBASE_MASTER", "HDPHBaseMaster");
GANGLIA_CLUSTER_NAME_MAP.put("HBASE_CLIENT", "HDPSlaves");
GANGLIA_CLUSTER_NAME_MAP.put("HBASE_REGIONSERVER", "HDPSlaves");
}
/** Logger for this class; protected, so it is also visible to subclasses. */
protected final static Logger LOG =
LoggerFactory.getLogger(GangliaPropertyProvider.class);
// ----- Constructors ------------------------------------------------------
/**
 * Construct a Ganglia property provider.
 *
 * @param componentPropertyInfoMap  the property info maps keyed by component name;
 *                                  handed to the superclass
 * @param streamProvider            the provider of the streams that responses are read from
 * @param hostProvider              the provider of the Ganglia collector host name
 * @param clusterNamePropertyId     the id of the property holding the cluster name
 * @param hostNamePropertyId        the id of the property holding the host name
 * @param componentNamePropertyId   the id of the property holding the component name
 */
public GangliaPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
                               StreamProvider streamProvider,
                               GangliaHostProvider hostProvider,
                               String clusterNamePropertyId,
                               String hostNamePropertyId,
                               String componentNamePropertyId) {
  super(componentPropertyInfoMap);

  // property ids used to interrogate resources
  this.clusterNamePropertyId   = clusterNamePropertyId;
  this.hostNamePropertyId      = hostNamePropertyId;
  this.componentNamePropertyId = componentNamePropertyId;

  // collaborators used to reach the Ganglia collector
  this.hostProvider   = hostProvider;
  this.streamProvider = streamProvider;
}
// ----- PropertyProvider --------------------------------------------------
/**
 * Populate the given resources with the requested Ganglia properties.
 * One rrd request is made per cluster name / temporal info combination.
 * When no requested property id is relevant no request is made at all.
 *
 * @param resources  the resources to populate
 * @param request    the request
 * @param predicate  the predicate
 *
 * @return the same set of resources that was passed in
 *
 * @throws SystemException if a request can not be built
 */
@Override
public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
    throws SystemException {

  Set<String> ids = getRequestPropertyIds(request, predicate);

  if (!ids.isEmpty()) {
    Set<Resource> keepers = new HashSet<Resource>();

    // One map of requests per cluster, one request per temporal info.
    for (Map<TemporalInfo, RRDRequest> clusterRequests : getRRDRequests(resources, request, ids).values()) {
      for (RRDRequest rrdRequest : clusterRequests.values()) {
        //todo: property provider can reduce set of resources
        keepers.addAll(rrdRequest.populateResources());
      }
    }
  }
  //todo: ignoring keepers returned by the provider
  return resources;
}
// ----- GangliaPropertyProvider -------------------------------------------
/**
 * Get the host name for the given resource.
 * <p>
 * Within this class the returned name is combined with a Ganglia cluster
 * name to form the key that ties a metric in a response back to the resource.
 *
 * @param resource the resource
 *
 * @return the host name
 */
protected abstract String getHostName(Resource resource);
/**
 * Get the component name for the given resource.
 * <p>
 * Within this class the returned name is used to look up the property
 * information and the component metrics that apply to the resource.
 *
 * @param resource the resource
 *
 * @return the component name
 */
protected abstract String getComponentName(Resource resource);
/**
 * Get the Ganglia cluster names for the given resource.
 *
 * @param resource    the resource
 * @param clusterName the cluster name read from the resource
 *
 * @return the set of Ganglia cluster names; a resource key is created
 *         for each name in the set
 */
protected abstract Set<String> getGangliaClusterNames(Resource resource, String clusterName);
/**
 * Obtain the id of the property that holds the component name.
 *
 * @return the component name property id given at construction
 */
protected String getComponentNamePropertyId() {
  return this.componentNamePropertyId;
}
/**
 * Obtain the id of the property that holds the host name.
 *
 * @return the host name property id given at construction
 */
protected String getHostNamePropertyId() {
  return this.hostNamePropertyId;
}
/**
 * Obtain the stream provider that Ganglia responses are read from.
 *
 * @return the stream provider given at construction
 */
public StreamProvider getStreamProvider() {
  return this.streamProvider;
}
// ----- helper methods ----------------------------------------------------
/**
 * Build the request objects that carry everything needed to make
 * single requests to the Ganglia rrd script.
 * A request exists per cluster name / temporal information pair and
 * may cover many resources and many metrics.
 *
 * @param resources  the resources being populated
 * @param request    the request
 * @param ids        the relevant property ids
 *
 * @return a map of maps of rrd requests keyed by cluster name / temporal info
 */
private Map<String, Map<TemporalInfo, RRDRequest>> getRRDRequests(Set<Resource> resources,
                                                                 Request request,
                                                                 Set<String> ids) {
  Map<String, Map<TemporalInfo, RRDRequest>> requestsByCluster =
      new HashMap<String, Map<TemporalInfo, RRDRequest>>();

  for (Resource resource : resources) {
    String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);

    // Lazily create the per-cluster request map.
    Map<TemporalInfo, RRDRequest> clusterRequests = requestsByCluster.get(clusterName);
    if (clusterRequests == null) {
      clusterRequests = new HashMap<TemporalInfo, RRDRequest>();
      requestsByCluster.put(clusterName, clusterRequests);
    }

    for (String gangliaClusterName : getGangliaClusterNames(resource, clusterName)) {
      ResourceKey resourceKey = new ResourceKey(getHostName(resource), gangliaClusterName);

      for (String id : ids) {
        Map<String, PropertyInfo> infoMap = getPropertyInfoMap(getComponentName(resource), id);

        for (Map.Entry<String, PropertyInfo> infoEntry : infoMap.entrySet()) {
          String       propertyId   = infoEntry.getKey();
          PropertyInfo info         = infoEntry.getValue();
          TemporalInfo temporalInfo = request.getTemporalInfo(id);

          // Without temporal info only point in time properties qualify;
          // with temporal info only temporal properties qualify.
          boolean applicable = (temporalInfo == null) ? info.isPointInTime() : info.isTemporal();
          if (!applicable) {
            continue;
          }

          RRDRequest rrdRequest = clusterRequests.get(temporalInfo);
          if (rrdRequest == null) {
            rrdRequest = new RRDRequest(clusterName, temporalInfo);
            clusterRequests.put(temporalInfo, rrdRequest);
          }
          rrdRequest.putResource(resourceKey, resource);
          rrdRequest.putPropertyId(info.getPropertyId(), propertyId);
        }
      }
    }
  }
  return requestsByCluster;
}
/**
 * Build the spec (URL) used to locate the Ganglia stream for the
 * given request information.
 * <p>
 * The host list is left out when there are more than 100 hosts and the
 * metric list is left out when there are more than 50 metrics. Without
 * temporal information a point in time request ending "now" is built.
 *
 * @param clusterName   the cluster name
 * @param clusterSet    the set of ganglia cluster names
 * @param hostSet       the set of host names
 * @param metricSet     the set of metric names
 * @param temporalInfo  the temporal information; may be null
 *
 * @return the spec
 *
 * @throws SystemException if unable to get the Ganglia Collector host name
 */
private String getSpec(String clusterName,
                       Set<String> clusterSet,
                       Set<String> hostSet,
                       Set<String> metricSet,
                       TemporalInfo temporalInfo) throws SystemException {

  String clusterList = getSetString(clusterSet, -1);
  String hostList    = getSetString(hostSet, 100);
  String metricList  = getSetString(metricSet, 50);

  StringBuilder spec = new StringBuilder();

  spec.append("http://");
  spec.append(hostProvider.getGangliaCollectorHostName(clusterName));
  spec.append("/cgi-bin/rrd.py?c=");
  spec.append(clusterList);

  if (!hostList.isEmpty()) {
    spec.append("&h=").append(hostList);
  }

  if (!metricList.isEmpty()) {
    spec.append("&m=").append(metricList);
  }

  if (temporalInfo == null) {
    // point in time request
    spec.append("&e=now");
    spec.append("&pt=true");
  } else {
    // a value of -1 means "not specified" and the parameter is skipped
    long startTime = temporalInfo.getStartTime();
    if (startTime != -1) {
      spec.append("&s=").append(startTime);
    }

    long endTime = temporalInfo.getEndTime();
    if (endTime != -1) {
      spec.append("&e=").append(endTime);
    }

    long step = temporalInfo.getStep();
    if (step != -1) {
      spec.append("&r=").append(step);
    }
  }
  return spec.toString();
}
/**
 * Extract the value to report from the given metric.
 *
 * @param metric      the metric
 * @param isTemporal  indicates whether or not this is a temporal metric
 *
 * @return all of the data points if temporal; otherwise the value of the
 *         last data point, or 0 if there are no data points
 */
private static Object getValue(GangliaMetric metric, boolean isTemporal) {
  Number[][] points = metric.getDatapoints();

  if (isTemporal) {
    return points;
  }

  // point in time : report the value of the last data point
  int lastIndex = points.length - 1;
  if (lastIndex < 0) {
    return 0;
  }
  return points[lastIndex][0];
}
/**
 * Join the given set of strings into a comma delimited string.
 * An empty string is returned when the set holds more elements than
 * the given limit; a limit of -1 means that there is no limit.
 *
 * @param set    the set of strings
 * @param limit  the upper size limit for the list, or -1 for no limit
 *
 * @return a comma delimited string of strings
 */
private static String getSetString(Set<String> set, int limit) {
  boolean tooMany = limit != -1 && set.size() > limit;
  if (tooMany) {
    return "";
  }

  StringBuilder joined = new StringBuilder();
  Iterator<String> elements = set.iterator();
  while (elements.hasNext()) {
    String element = elements.next();
    // a separator is only needed once something has been written
    if (joined.length() != 0) {
      joined.append(",");
    }
    joined.append(element);
  }
  return joined.toString();
}
// ----- inner classes -----------------------------------------------------
// ----- RRDRequest ----------------------------------------------------
/**
* The information required to make a single RRD request.
*/
private class RRDRequest {
// The cluster name used to look up the Ganglia collector host.
private final String clusterName;
// The temporal information for the request; null means point in time.
private final TemporalInfo temporalInfo;
// The resources to populate, keyed by host name / Ganglia cluster name.
private final Map<ResourceKey, Set<Resource>> resources = new HashMap<ResourceKey, Set<Resource>>();
// The property ids to set, keyed by Ganglia metric name.
private final Map<String, Set<String>> metrics = new HashMap<String, Set<String>>();
// The Ganglia cluster names collected from the resource keys.
private final Set<String> clusterSet = new HashSet<String>();
// The host names collected from the resource keys.
private final Set<String> hostSet = new HashSet<String>();
/**
 * Create an empty request for the given cluster and temporal information.
 *
 * @param clusterName   the cluster name
 * @param temporalInfo  the temporal information; null for point in time
 */
private RRDRequest(String clusterName, TemporalInfo temporalInfo) {
  this.temporalInfo = temporalInfo;
  this.clusterName  = clusterName;
}
/**
 * Register a resource to be populated by this request.
 * The Ganglia cluster name and host name of the key are added to the
 * sets that are used to build the request.
 *
 * @param key       the host name / Ganglia cluster name key
 * @param resource  the resource
 */
public void putResource(ResourceKey key, Resource resource) {
  clusterSet.add(key.getClusterName());
  hostSet.add(key.getHostName());

  Set<Resource> keyedResources = resources.get(key);
  if (keyedResources != null) {
    keyedResources.add(resource);
    return;
  }

  // first resource seen for this key
  keyedResources = new HashSet<Resource>();
  keyedResources.add(resource);
  resources.put(key, keyedResources);
}
/**
 * Associate a property id with a Ganglia metric name.
 *
 * @param metric  the Ganglia metric name
 * @param id      the property id to set from that metric
 */
public void putPropertyId(String metric, String id) {
  Set<String> idsForMetric = metrics.get(metric);
  if (idsForMetric != null) {
    idsForMetric.add(id);
    return;
  }

  // first property id seen for this metric
  idsForMetric = new HashSet<String>();
  idsForMetric.add(id);
  metrics.put(metric, idsForMetric);
}
/**
 * Populate the associated resources by making the rrd request.
 * <p>
 * The response is read line by line: a start time, then zero or more
 * metric sections, then the "[AMBARI_END]" marker followed by an end time.
 * Each metric section consists of the ds name, cluster name, host name,
 * metric name, first time stamp, step and the values, terminated by
 * the "[AMBARI_DP_END]" marker.
 * <p>
 * Population is best effort. An I/O failure, a response that ends
 * prematurely or a value that can not be parsed as a number is logged
 * and ends the processing of this request without an exception being
 * thrown; resources populated before the failure keep their values.
 *
 * @return a collection of populated resources (currently always empty)
 *
 * @throws SystemException if unable to build the request spec
 */
public Collection<Resource> populateResources() throws SystemException {
  String spec = getSpec(clusterName, clusterSet, hostSet, metrics.keySet(), temporalInfo);
  BufferedReader reader = null;
  try {
    reader = new BufferedReader(new InputStreamReader(
        getStreamProvider().readFrom(spec)));

    int startTime = convertToNumber(readRequiredLine(reader)).intValue();

    String dsName = readRequiredLine(reader);
    while (!dsName.equals("[AMBARI_END]")) {
      GangliaMetric metric = new GangliaMetric();
      List<GangliaMetric.TemporalMetric> listTemporalMetrics =
          new ArrayList<GangliaMetric.TemporalMetric>();

      metric.setDs_name(dsName);
      metric.setCluster_name(readRequiredLine(reader));
      metric.setHost_name(readRequiredLine(reader));
      metric.setMetric_name(readRequiredLine(reader));

      int time = convertToNumber(readRequiredLine(reader)).intValue();
      int step = convertToNumber(readRequiredLine(reader)).intValue();

      // Each value line is one data point; consecutive points are step apart.
      String val = readRequiredLine(reader);
      while (!val.equals("[AMBARI_DP_END]")) {
        listTemporalMetrics.add(
            new GangliaMetric.TemporalMetric(convertToNumber(val), time));
        time += step;
        val = readRequiredLine(reader);
      }

      //todo: change setter in GangliaMetric to take collection
      Number[][] datapointsArray = new Number[listTemporalMetrics.size()][2];
      for (int i = 0; i < listTemporalMetrics.size(); ++i) {
        GangliaMetric.TemporalMetric m = listTemporalMetrics.get(i);
        datapointsArray[i][0] = m.getValue();
        datapointsArray[i][1] = m.getTime();
      }
      metric.setDatapoints(datapointsArray);

      // Metrics for a host / cluster nobody asked about are skipped.
      ResourceKey key = new ResourceKey(metric.getHost_name(), metric.getCluster_name());
      Set<Resource> resourceSet = resources.get(key);
      if (resourceSet != null) {
        for (Resource resource : resourceSet) {
          populateResource(resource, metric);
        }
      }

      dsName = readRequiredLine(reader);
    }

    int endTime = convertToNumber(readRequiredLine(reader)).intValue();

    if (LOG.isInfoEnabled()) {
      LOG.info("Ganglia resource population time: " + (endTime - startTime));
    }
  } catch (IOException e) {
    if (LOG.isErrorEnabled()) {
      LOG.error("Caught exception getting Ganglia metrics : spec=" + spec, e);
    }
  } catch (NumberFormatException e) {
    // A malformed number is handled like an I/O failure instead of
    // escaping as an unchecked exception and aborting the caller.
    if (LOG.isErrorEnabled()) {
      LOG.error("Caught exception parsing Ganglia metrics : spec=" + spec, e);
    }
  } finally {
    if (reader != null) {
      try {
        reader.close();
      } catch (IOException e) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Unable to close http input steam : spec=" + spec, e);
        }
      }
    }
  }
  //todo: filter out resources and return keepers
  return Collections.emptySet();
}

/**
 * Read the next line of the response, treating the end of the stream
 * as an error since every read in the protocol expects a line.
 *
 * @param reader  the response reader
 *
 * @return the line read; never null
 *
 * @throws IOException if the read fails or the stream has ended
 */
private String readRequiredLine(BufferedReader reader) throws IOException {
  String line = reader.readLine();
  if (line == null) {
    throw new IOException("Unexpected end of Ganglia response");
  }
  return line;
}
/**
 * Set the properties on the given resource that are fed by the given
 * Ganglia metric. Nothing is set when no property id was requested for
 * the metric or when there are no metrics for the resource's component.
 *
 * @param resource       the resource
 * @param gangliaMetric  the Ganglia metrics
 */
private void populateResource(Resource resource, GangliaMetric gangliaMetric) {
  Set<String> propertyIdSet = metrics.get(gangliaMetric.getMetric_name());
  if (propertyIdSet == null) {
    return;
  }

  Map<String, PropertyInfo> metricsMap = getComponentMetrics().get(getComponentName(resource));
  if (metricsMap == null) {
    return;
  }

  for (String propertyId : propertyIdSet) {
    // only property ids known for the component are set
    if (propertyId != null && metricsMap.containsKey(propertyId)) {
      resource.setProperty(propertyId, getValue(gangliaMetric, temporalInfo != null));
    }
  }
}
/**
 * Convert the given string to a number.
 * A string containing a decimal point is converted to a Double; any
 * other string is converted to a Long.
 * <p>
 * An if / else is used on purpose. A conditional expression with a
 * double and a long operand is promoted to double, which would turn
 * every result into a Double and lose precision for large integral values.
 *
 * @param s  the string to convert
 *
 * @return a Double if the string contains a decimal point; otherwise a Long
 *
 * @throws NumberFormatException if the string can not be parsed
 */
private Number convertToNumber(String s) {
  if (s.contains(".")) {
    return Double.valueOf(s);
  }
  return Long.valueOf(s);
}
}
// ----- ResourceKey ---------------------------------------------------
/**
 * Key that ties the information of a Ganglia metric to a resource.
 * Two keys are equal when both their host names and their Ganglia
 * cluster names are equal; either value may be null.
 */
private static class ResourceKey {
  private final String hostName;
  private final String gangliaClusterName;

  private ResourceKey(String hostName, String gangliaClusterName) {
    this.hostName = hostName;
    this.gangliaClusterName = gangliaClusterName;
  }

  /**
   * @return the host name part of the key
   */
  public String getHostName() {
    return hostName;
  }

  /**
   * @return the Ganglia cluster name part of the key
   */
  public String getClusterName() {
    return gangliaClusterName;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (o == null || o.getClass() != getClass()) {
      return false;
    }
    ResourceKey other = (ResourceKey) o;
    return sameValue(gangliaClusterName, other.gangliaClusterName) &&
        sameValue(hostName, other.hostName);
  }

  @Override
  public int hashCode() {
    int hostHash    = hostName == null ? 0 : hostName.hashCode();
    int clusterHash = gangliaClusterName == null ? 0 : gangliaClusterName.hashCode();
    return 31 * hostHash + clusterHash;
  }

  // Null tolerant string comparison : two nulls are considered the same.
  private static boolean sameValue(String left, String right) {
    return left == null ? right == null : left.equals(right);
  }
}
}