#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import httplib
import imp
import time
import urllib
from alerts.base_alert import BaseAlert
from alerts.metric_alert import MetricAlert
import ambari_simplejson as json
import logging
import re
import uuid
from resource_management.libraries.functions.get_port_from_url import get_port_from_url
logger = logging.getLogger()
AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
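
# Example (illustrative values only) of a request path built from AMS_METRICS_GET_URL
# by AmsAlert._load_metric below; the actual parameter order from urlencode may differ:
#   /ws/v1/timeline/metrics?metricNames=cpu_user&appId=HOST&hostname=host1.example.com
#     &startTime=1400000000000&endTime=1400000300000&precision=seconds&grouped=true
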
class AmsAlert(MetricAlert):
"""
Allow alerts to fire based on an AMS metrics.
Alert is triggered if the aggregated function of the specified metric has
grown beyond the specified threshold within a given time interval.
"""
  def __init__(self, alert_meta, alert_source_meta, config):
    super(AmsAlert, self).__init__(alert_meta, alert_source_meta, config)

    self.metric_info = None
    if 'ams' in alert_source_meta:
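      # Illustrative (hypothetical) shape of alert_source_meta['ams']; the keys match
      # what AmsMetric.__init__ reads below, but the metric name and values are made up:
      #   {
      #     "metric_list": ["regionserver.Server.totalRequestCount"],
      #     "interval": 5,            # minutes of history to query
      #     "app_id": "HBASE",
      #     "minimum_value": -1,
      #     "value": "{0}",
      #     "compute": "mean"
      #   }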
      self.metric_info = AmsMetric(alert_source_meta['ams'])
  def _collect(self):
    """
    Low level function to collect alert data. The result is a tuple as:
      res[0] = the result code
      res[1] = the list of arguments supplied to the reporting text for the result code
    """
    if self.metric_info is None:
      raise Exception("Could not determine result. Specific metric collector is not defined.")

    if self.uri_property_keys is None:
      raise Exception("Could not determine result. URL(s) were not defined.")

    # use the URI lookup keys to get a final URI value to query
    alert_uri = self._get_uri_from_structure(self.uri_property_keys)

    logger.debug("[Alert][{0}] Calculated metric URI to be {1} (ssl={2})".format(
      self.get_name(), alert_uri.uri, str(alert_uri.is_ssl_enabled)))

    host = BaseAlert.get_host_from_url(alert_uri.uri)
    if host is None:
      host = self.host_name

    try:
      port = int(get_port_from_url(alert_uri.uri))
    except:
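      # fall back to the default port of the AMS Metrics Collector REST API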
      port = 6188

    collect_result = None
    value_list = []

    if isinstance(self.metric_info, AmsMetric):
      raw_data_points, http_code = self._load_metric(alert_uri.is_ssl_enabled, host, port, self.metric_info)

      if not raw_data_points and http_code not in [200, 307]:
        collect_result = self.RESULT_UNKNOWN
        value_list.append('HTTP {0} response (metrics unavailable)'.format(str(http_code)))
      elif not raw_data_points and http_code in [200, 307]:
        raise Exception("[Alert][{0}] Unable to extract JSON from HTTP response".format(self.get_name()))
      else:
        data_points = self.metric_info.calculate_value(raw_data_points)
        compute_result = self.metric_info.calculate_compute(data_points)
        value_list.append(compute_result)

        collect_result = self._get_result(value_list[0] if compute_result is None else compute_result)

      logger.debug("[Alert][{0}] Computed result = {1}".format(self.get_name(), str(value_list)))

    return (collect_result, value_list)
  def _load_metric(self, ssl, host, port, ams_metric):
    """
    Queries AMS for the metrics described by the given AmsMetric and returns a
    tuple of (data points per metric, HTTP status code).
    """
    if "0.0.0.0" in str(host):
      host = self.host_name

    current_time = int(time.time()) * 1000
    interval = ams_metric.interval
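
    # AMS expects epoch milliseconds; the query covers the last <interval> minutes of data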
    get_metrics_parameters = {
      "metricNames": ",".join(ams_metric.metric_list),
      "appId": ams_metric.app_id,
      "hostname": self.host_name,
      "startTime": current_time - 60 * 1000 * interval,
      "endTime": current_time,
      "precision": "seconds",
      "grouped": "true",
    }
    encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)

    url = AMS_METRICS_GET_URL % encoded_get_metrics_parameters
    conn = None
    data = None

    try:
      # TODO Implement HTTPS support
      conn = httplib.HTTPConnection(host, port,
                                    timeout=self.connection_timeout)

      conn.request("GET", url)
      response = conn.getresponse()
      data = response.read()
    except Exception, exception:
      if logger.isEnabledFor(logging.DEBUG):
        logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS: {1}".format(self.get_name(), str(exception)))

      status = response.status if 'response' in vars() else None
      return (None, status)
    finally:
      if logger.isEnabledFor(logging.DEBUG):
        logger.debug("""
        AMS request parameters - {0}
        AMS response - {1}
        """.format(encoded_get_metrics_parameters, data))

      # explicitly close the connection as we've seen python hold onto these
      if conn is not None:
        try:
          conn.close()
        except:
          logger.debug("[Alert][{0}] Unable to close URL connection to {1}".format(self.get_name(), url))
    json_is_valid = True
    try:
      data_json = json.loads(data)
    except Exception, exception:
      json_is_valid = False
      if logger.isEnabledFor(logging.DEBUG):
        logger.exception("[Alert][{0}] Unable to convert the response to JSON, or the JSON does not contain the needed data: {1}".
                         format(self.get_name(), str(exception)))

    metrics = []

    if json_is_valid:
      metric_dict = {}
      for metrics_data in data_json["metrics"]:
        metric_dict[metrics_data["metricname"]] = metrics_data["metrics"]
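
      # metric_dict now maps metric name -> {timestamp: value}, as returned by AMS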
      for metric_name in self.metric_info.metric_list:
        if metric_name in metric_dict:
          # TODO sort data points by timestamp
          # OrderedDict was implemented in Python 2.7
          sorted_data_points = metric_dict[metric_name]
          metrics.append(sorted_data_points)

    return (metrics, response.status)
class AmsMetric:
  DYNAMIC_CODE_VALUE_TEMPLATE = """
# ensure that division yields a float, use // for integer division
from __future__ import division

def f(args):
  l = []
  for k in args[0]:
    try:
      data_point = {0}
      l.append(data_point)
    except:
      continue

  return l
"""
  DYNAMIC_CODE_COMPUTE_TEMPLATE = """
# ensure that division yields a float, use // for integer division
from __future__ import division
from ambari_commons.aggregate_functions import sample_standard_deviation_percentage
from ambari_commons.aggregate_functions import sample_standard_deviation
from ambari_commons.aggregate_functions import mean
from ambari_commons.aggregate_functions import count

def f(args):
  func = {0}
  return func(args)
"""
  def __init__(self, metric_info):
    self.custom_value_module = None
    self.custom_compute_module = None
    self.metric_list = metric_info['metric_list']
    self.interval = metric_info['interval'] # in minutes
    self.app_id = metric_info['app_id']
    self.minimum_value = metric_info['minimum_value']

    if 'value' in metric_info:
      realcode = re.sub('(\{(\d+)\})', 'args[\g<2>][k]', metric_info['value'])

      self.custom_value_module = imp.new_module(str(uuid.uuid4()))
      code = self.DYNAMIC_CODE_VALUE_TEMPLATE.format(realcode)
      exec code in self.custom_value_module.__dict__

    if 'compute' in metric_info:
      realcode = metric_info['compute']

      self.custom_compute_module = imp.new_module(str(uuid.uuid4()))
      code = self.DYNAMIC_CODE_COMPUTE_TEMPLATE.format(realcode)
      exec code in self.custom_compute_module.__dict__
  def calculate_value(self, args):
    data_points = None
    if self.custom_value_module is not None:
      data_points = self.custom_value_module.f(args)
      if self.minimum_value:
        data_points = [data_point for data_point in data_points if data_point > self.minimum_value]
    return data_points
  def calculate_compute(self, args):
    compute_result = None
    if self.custom_compute_module is not None:
      compute_result = self.custom_compute_module.f(args)
    return compute_result
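
# Illustrative (hypothetical) end-to-end use of AmsMetric, assuming 'mean' from
# ambari_commons.aggregate_functions is the arithmetic mean; the metric name and
# values below are made up, and dict iteration order may vary under Python 2:
#
#   metric = AmsMetric({'metric_list': ['cpu_user'], 'interval': 5, 'app_id': 'HOST',
#                       'minimum_value': 0, 'value': '{0} * 100', 'compute': 'mean'})
#   points = metric.calculate_value([{1000: 0.1, 2000: 0.2}])  # -> [10.0, 20.0]
#   result = metric.calculate_compute(points)                  # -> 15.0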