#!/usr/bin/env python
import httplib
import imp
import time
import urllib
from alerts.base_alert import BaseAlert
from alerts.metric_alert import MetricAlert
import ambari_simplejson as json
import logging
import re
import uuid
from resource_management.libraries.functions.get_port_from_url import get_port_from_url
logger = logging.getLogger()
AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
class AmsAlert(MetricAlert):
Allow alerts to fire based on an AMS metrics.
Alert is triggered if the aggregated function of the specified metric has
grown beyond the specified threshold within a given time interval.
def __init__(self, alert_meta, alert_source_meta, config):
super(AmsAlert, self).__init__(alert_meta, alert_source_meta, config)
self.metric_info = None
if 'ams' in alert_source_meta:
self.metric_info = AmsMetric(alert_source_meta['ams'])
def _collect(self):
Low level function to collect alert data. The result is a tuple as:
res[0] = the result code
res[1] = the list of arguments supplied to the reporting text for the result code
if self.metric_info is None:
raise Exception("Could not determine result. Specific metric collector is not defined.")
if self.uri_property_keys is None:
raise Exception("Could not determine result. URL(s) were not defined.")
# use the URI lookup keys to get a final URI value to query
alert_uri = self._get_uri_from_structure(self.uri_property_keys)
logger.debug("[Alert][{0}] Calculated metric URI to be {1} (ssl={2})".format(
self.get_name(), alert_uri.uri, str(alert_uri.is_ssl_enabled)))
host = BaseAlert.get_host_from_url(alert_uri.uri)
if host is None:
host = self.host_name
port = int(get_port_from_url(alert_uri.uri))
port = 6188
collect_result = None
value_list = []
if isinstance(self.metric_info, AmsMetric):
raw_data_points, http_code = self._load_metric(alert_uri.is_ssl_enabled, host, port, self.metric_info)
if not raw_data_points and http_code not in [200, 307]:
collect_result = self.RESULT_UNKNOWN
value_list.append('HTTP {0} response (metrics unavailable)'.format(str(http_code)))
elif not raw_data_points and http_code in [200, 307]:
raise Exception("[Alert][{0}] Unable to extract JSON from HTTP response".format(self.get_name()))
data_points = self.metric_info.calculate_value(raw_data_points)
compute_result = self.metric_info.calculate_compute(data_points)
collect_result = self._get_result(value_list[0] if compute_result is None else compute_result)
logger.debug("[Alert][{0}] Computed result = {1}".format(self.get_name(), str(value_list)))
return (collect_result, value_list)
def _load_metric(self, ssl, host, port, ams_metric):
""" creates a AmsMetric object that holds info about ams-based metrics """
if "" in str(host):
host = self.host_name
current_time = int(time.time()) * 1000
interval = ams_metric.interval
get_metrics_parameters = {
"metricNames": ",".join(ams_metric.metric_list),
"appId": ams_metric.app_id,
"hostname": self.host_name,
"startTime": current_time - 60 * 1000 * interval,
"endTime": current_time,
"precision": "seconds",
"grouped": "true",
encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
url = AMS_METRICS_GET_URL % encoded_get_metrics_parameters
# TODO Implement HTTPS support
conn = httplib.HTTPConnection(host, port,
conn.request("GET", url)
response = conn.getresponse()
data =
except Exception, exception:
if logger.isEnabledFor(logging.DEBUG):
logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS: {1}".format(self.get_name(), str(exception)))
status = response.status if 'response' in vars() else None
return (None, status)
if logger.isEnabledFor(logging.DEBUG):
AMS request parameters - {0}
AMS response - {1}
""".format(encoded_get_metrics_parameters, data))
# explicitely close the connection as we've seen python hold onto these
if conn is not None:
logger.debug("[Alert][{0}] Unable to close URL connection to {1}".format(self.get_name(), url))
json_is_valid = True
data_json = json.loads(data)
except Exception, exception:
json_is_valid = False
if logger.isEnabledFor(logging.DEBUG):
logger.exception("[Alert][{0}] Convert response to json failed or json doesn't contain needed data: {1}".
format(self.get_name(), str(exception)))
metrics = []
if json_is_valid:
metric_dict = {}
for metrics_data in data_json["metrics"]:
metric_dict[metrics_data["metricname"]] = metrics_data["metrics"]
for metric_name in self.metric_info.metric_list:
if metric_name in metric_dict:
# TODO sorted data points by timestamp
# OrderedDict was implemented in Python2.7
sorted_data_points = metric_dict[metric_name]
return (metrics, response.status)
class AmsMetric:
# ensure that division yields a float, use // for integer division
from __future__ import division
def f(args):
l = []
for k in args[0]:
data_point = {0}
return l
# ensure that division yields a float, use // for integer division
from __future__ import division
from ambari_commons.aggregate_functions import sample_standard_deviation_percentage
from ambari_commons.aggregate_functions import sample_standard_deviation
from ambari_commons.aggregate_functions import mean
from ambari_commons.aggregate_functions import count
def f(args):
func = {0}
return func(args)
def __init__(self, metric_info):
self.custom_value_module = None
self.custom_compute_module = None
self.metric_list = metric_info['metric_list']
self.interval = metric_info['interval'] # in minutes
self.app_id = metric_info['app_id']
self.minimum_value = metric_info['minimum_value']
if 'value' in metric_info:
realcode = re.sub('(\{(\d+)\})', 'args[\g<2>][k]', metric_info['value'])
self.custom_value_module = imp.new_module(str(uuid.uuid4()))
code = self.DYNAMIC_CODE_VALUE_TEMPLATE.format(realcode)
exec code in self.custom_value_module.__dict__
if 'compute' in metric_info:
realcode = metric_info['compute']
self.custom_compute_module = imp.new_module(str(uuid.uuid4()))
code = self.DYNAMIC_CODE_COMPUTE_TEMPLATE.format(realcode)
exec code in self.custom_compute_module.__dict__
def calculate_value(self, args):
data_points = None
if self.custom_value_module is not None:
data_points = self.custom_value_module.f(args)
if self.minimum_value:
data_points = [data_point for data_point in data_points if data_point > self.minimum_value]
return data_points
def calculate_compute(self, args):
compute_result = None
if self.custom_compute_module is not None:
compute_result = self.custom_compute_module.f(args)
return compute_result