#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
import time
import json
import urllib2
import sys
import socket
import httplib
import logging
import threading
import fnmatch
import os
import multiprocessing
# load six
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lib/six'))
import six
# load kafka-python
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lib/kafka-python'))
from kafka import KafkaClient, SimpleProducer, SimpleConsumer
class Helper:
def __init__(self):
pass
@staticmethod
def load_config(config_file="config.json"):
"""
:param config_file:
:return:
"""
        abs_file_path = config_file
        if not os.path.isfile(abs_file_path):
            script_dir = os.path.dirname(__file__)
            abs_file_path = os.path.join(script_dir, config_file)
            if not os.path.isfile(abs_file_path):
                raise Exception(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
        with open(abs_file_path, 'r') as f:
            config = json.loads(f.read())
if config["env"].has_key("log_file"):
logging.basicConfig(filename=config["env"]["log_file"], filemode='w',level=logging.INFO,
format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
datefmt='%m-%d %H:%M')
else:
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
datefmt='%m-%d %H:%M')
logging.info("Loaded config from %s", abs_file_path)
return config
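    # An illustrative config.json sketch, assembled from the keys this module
    # reads (env.site, env.log_file, env.metric_prefix, input[], filter.*,
    # output.kafka.*); hosts, ports and topics are placeholders, not a schema:
    #
    # {
    #   "env": {"site": "sandbox", "log_file": "/tmp/collector.log"},
    #   "input": [
    #     {"component": "namenode", "host": "nn.example.com", "port": "50070", "https": false}
    #   ],
    #   "filter": {"bean_group_filter": ["hadoop"], "metric_name_filter": [".*"]},
    #   "output": {
    #     "kafka": {
    #       "default_topic": "hadoop_jmx_metrics",
    #       "broker_list": ["broker1.example.com:9092"],
    #       "debug": false
    #     }
    #   }
    # }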
    @staticmethod
    def is_number(value):
        """
        :param value: value to check
        :return: True if value can be parsed as a number; None and bool are excluded
        """
        try:
            if value is None or isinstance(value, bool):
                return False
            float(value)
            return True
        except (TypeError, ValueError):
            return False
@staticmethod
def http_get(host, port, https=False, path=None):
"""
Read url by GET method
:param path:
:param url:
:param https:
:return:
"""
url = ":".join([host, str(port)])
result = None
response = None
attempts = 0
exception = None
while attempts < 2:
try:
if https:
logging.info("Reading https://" + str(url) + path)
c = httplib.HTTPSConnection(url, timeout=60)
c.request("GET", path)
response = c.getresponse()
else:
logging.info("Reading http://" + str(url) + path)
response = urllib2.urlopen("http://" + str(url) + path, timeout=60)
logging.debug("Got response")
result = response.read()
break
except Exception as e:
logging.warning(e)
exception = e
attempts += 1
try:
if attempts >= 2:
raise exception
finally:
if response is not None:
response.close()
return result
class JmxReader(object):
def __init__(self, host, port, https=False):
self.host = host
self.port = port
self.https = https
self.jmx_json = None
self.jmx_beans = None
self.jmx_raw = None
def open(self):
"""
:return: JmxReader
"""
self.read_raw()
self.set_raw(self.jmx_raw)
return self
def read_raw(self):
"""
transfer the json string into dict
:param host:
:param port:
:param https:
:return: text
"""
self.jmx_raw = Helper.http_get(self.host, self.port, self.https, "/jmx?anonymous=true")
        if self.jmx_raw is None:
            raise Exception("Response from " + self.host + ":" + str(self.port) + " is None")
return self
def read_query(self, qry):
self.jmx_raw = Helper.http_get(self.host, self.port, self.https, qry)
        if self.jmx_raw is None:
            raise Exception("Response from " + self.host + ":" + str(self.port) + " is None")
self.set_raw(self.jmx_raw)
return self
def set_raw(self, text):
self.jmx_json = json.loads(text)
self.jmx_beans = self.jmx_json[u'beans']
self.jmx_raw = text
return self
def get_jmx_beans(self):
return self.jmx_beans
def get_jmx_bean_by_name(self, name):
for bean in self.jmx_beans:
if bean.has_key("name") and bean["name"] == name:
return bean
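# A minimal JmxReader usage sketch (host and port are placeholders):
#
#   reader = JmxReader("nn.example.com", 50070).open()
#   all_beans = reader.get_jmx_beans()
#   memory_bean = reader.get_jmx_bean_by_name("java.lang:type=Memory")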
class YarnWSReader:
def __init__(self, host, port, https=False):
self.host = host
self.port = port
self.https = https
def read_cluster_info(self):
cluster_info = Helper.http_get(self.host, self.port, self.https, "/ws/v1/cluster/info?anonymous=true")
logging.debug(cluster_info)
return json.loads(cluster_info)
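# Usage sketch (host and port are placeholders; field names follow the YARN
# ResourceManager REST API):
#
#   info = YarnWSReader("rm.example.com", 8088).read_cluster_info()
#   state = info["clusterInfo"]["state"]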
class MetricSender(object):
def __init__(self, config):
pass
def open(self):
pass
    def send(self, msg, converter=None):
        raise Exception("send() should be overridden by a subclass")
def close(self):
pass
class KafkaMetricSender(MetricSender):
    def __init__(self, config):
        super(KafkaMetricSender, self).__init__(config)
        self.start_time = time.time()
        self.end_time = self.start_time
kafka_config = config["output"]["kafka"]
# default topic
self.default_topic = None
if kafka_config.has_key("default_topic"):
self.default_topic = kafka_config["default_topic"].encode('utf-8')
self.component_topic_mapping = {}
if kafka_config.has_key("component_topic_mapping"):
self.component_topic_mapping = kafka_config["component_topic_mapping"]
self.metric_topic_mapping = {}
if kafka_config.has_key("metric_topic_mapping"):
self.metric_topic_mapping = kafka_config["metric_topic_mapping"]
if not self.default_topic and not bool(self.component_topic_mapping):
raise Exception("both kafka config 'topic' and 'component_topic_mapping' are empty")
# producer
self.broker_list = kafka_config["broker_list"]
self.kafka_client = None
self.kafka_producer = None
self.debug_enabled = False
self.sent_count = 0
if kafka_config.has_key("debug"):
self.debug_enabled = bool(kafka_config["debug"])
logging.info("Overrode output.kafka.debug: " + str(self.debug_enabled))
    def get_topic_id(self, msg):
        # resolution order: metric mapping > component mapping > default topic
        if "metric" in msg:
            metric = msg["metric"]
            if metric in self.metric_topic_mapping:
                return self.metric_topic_mapping[metric]
        if "component" in msg:
            component = msg["component"]
            if component in self.component_topic_mapping:
                return self.component_topic_mapping[component]
        if not self.default_topic:
            raise Exception("no topic found for msg: " + str(msg))
        return self.default_topic
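    # Example resolution (names are placeholders): with
    #   metric_topic_mapping = {"hadoop.memory.heapmemoryusage.used": "nn_jmx_metric"}
    # a msg carrying that "metric" goes to "nn_jmx_metric"; otherwise the
    # component mapping or default_topic applies.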
def open(self):
logging.info("Opening kafka connection for producer")
self.kafka_client = KafkaClient(self.broker_list, timeout=50)
self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=False, batch_send_every_n=500,
batch_send_every_t=30)
self.start_time = time.time()
    def send(self, msg, converter=None):
if self.debug_enabled:
logging.info("Send message: " + str(msg))
self.sent_count += 1
topic = self.get_topic_id(msg)
if converter is not None:
converter.convert_metric(msg)
self.kafka_producer.send_messages(topic, json.dumps(msg))
def close(self):
logging.info("Closing kafka connection and producer")
if self.kafka_producer is not None:
self.kafka_producer.stop()
if self.kafka_client is not None:
self.kafka_client.close()
self.end_time = time.time()
logging.info("Totally sent " + str(self.sent_count) + " metric events in "+str(self.end_time - self.start_time)+" sec")
class MetricCollector(threading.Thread):
    def __init__(self, config=None):
        threading.Thread.__init__(self)
        # instance attributes, so filters are not shared across collectors
        self.filters = []
        self.closed = False
        self.config = config
        self.sender = None
        self.fqdn = socket.getfqdn()
def init(self, config):
self.config = config
self.sender = KafkaMetricSender(self.config)
self.sender.open()
self.filter(MetricNameFilter())
        for metric_filter in self.filters:
            metric_filter.init(self.config)
    def filter(self, *filters):
        """
        :param filters: MetricFilter instances to register
        :return: None
        """
        logging.debug("Register filters: " + str(filters))
        for metric_filter in filters:
            self.filters.append(metric_filter)
def start(self):
super(MetricCollector, self).start()
    def collect(self, msg, type='float', converter=None):
        if "timestamp" not in msg:
            msg["timestamp"] = int(round(time.time() * 1000))
        if "value" in msg and type == 'float':
            msg["value"] = float(str(msg["value"]))
        elif "value" in msg and type == 'string':
            msg["value"] = str(msg["value"])
        if "host" not in msg or len(msg["host"]) == 0:
            raise Exception("host is null: " + str(msg))
        if "site" not in msg:
            msg["site"] = self.config["env"]["site"]
        if len(self.filters) == 0:
            self.sender.send(msg, converter)
            return
        for metric_filter in self.filters:
            if metric_filter.filter_metric(msg):
                self.sender.send(msg, converter)
                return
        # no filter matched: drop the metric
        if self.sender.debug_enabled:
            logging.info("Drop metric: " + str(msg))
def close(self):
self.sender.close()
self.closed = True
def is_closed(self):
return self.closed
def run(self):
raise Exception("`run` method should be overrode by sub-class before being called")
class Runner(object):
@staticmethod
def worker(collectors, config):
"""
Execute concurrently
:param threads:
:return:
"""
try:
for collector in collectors:
try:
collector.init(config)
collector.start()
except Exception as e:
logging.exception(e)
for collector in collectors:
collector.join(timeout=55)
collector.close()
except BaseException as e:
if not isinstance(e, SystemExit):
logging.exception(e)
finally:
for collector in collectors:
if not collector.is_closed():
collector.close()
@staticmethod
def run_async(*collectors):
config = None
argv = sys.argv
if len(argv) == 1:
config = Helper.load_config()
elif len(argv) == 2:
config = Helper.load_config(argv[1])
else:
raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
current_process = multiprocessing.current_process()
        sub_process = multiprocessing.Process(target=Runner.worker, args=[collectors, config])
sub_process.daemon = False
sub_process.name = "CollectorSubprocess"
try:
logging.info("Starting %s", sub_process)
sub_process.start()
logging.info("Current PID: %s, subprocess PID: %s", current_process.pid, sub_process.pid)
            sub_process.join(timeout=56)
except BaseException as e:
logging.exception(e)
finally:
if sub_process.is_alive():
logging.info("%s is still alive, terminating", sub_process)
sub_process.terminate()
logging.info("%s exit code: %s", sub_process, sub_process.exitcode)
            sys.exit(0)
@staticmethod
def run(*collectors):
config = None
argv = sys.argv
        current_process = multiprocessing.current_process()
if len(argv) == 1:
config = Helper.load_config()
elif len(argv) == 2:
config = Helper.load_config(argv[1])
else:
raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
try:
Runner.worker(collectors, config)
except BaseException as e:
logging.exception(e)
finally:
logging.info("%s (PID: %s) exit", current_process.name, current_process.pid)
            sys.exit(0)
class JmxMetricCollector(MetricCollector):
    selected_domain = None
    metric_prefix = "hadoop."
    def __init__(self, config=None):
        super(JmxMetricCollector, self).__init__(config)
        # instance attributes, so listeners are not shared across collectors
        self.listeners = []
        self.input_components = []
    def init(self, config):
        super(JmxMetricCollector, self).init(config)
        self.input_components = config["input"]
        for source in self.input_components:
            if "host" not in source:
                source["host"] = self.fqdn
            if "component" not in source:
                raise Exception("component not defined in " + str(source))
            if "port" not in source:
                raise Exception("port not defined in " + str(source))
            if "https" not in source:
                source["https"] = False
        self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('bean_group_filter', [])]
        if "metric_prefix" in config["env"]:
            self.metric_prefix = config["env"]["metric_prefix"]
            logging.info("Override env.metric_prefix: " + self.metric_prefix + ", default: hadoop.")
def register(self, *listeners):
"""
:param listeners: type of HadoopJmxListener
:return:
"""
for listener in listeners:
listener.init(self)
self.listeners.append(listener)
    def jmx_reader(self, source):
        host = source["host"]
        if "source_host" in source:
            host = source["source_host"]
        port = source["port"]
        https = source["https"]
protocol = "https" if https else "http"
try:
beans = JmxReader(host, port, https).open().get_jmx_beans()
self.on_beans(source, beans)
except Exception as e:
jmx_url = protocol+"://"+str(host) + ":" + str(port)
logging.error("Failed to read jmx for " + jmx_url)
logging.exception(e)
    def run(self):
        size = str(len(self.input_components))
        logging.info("Starting jmx reading threads (num: " + size + ")")
        reader_threads = []
        for source in self.input_components:
            reader_thread = threading.Thread(target=self.jmx_reader, args=[source])
            reader_thread.daemon = False
            logging.info(reader_thread.name + " starting")
            reader_thread.start()
            reader_threads.append(reader_thread)
        for reader_thread in reader_threads:
            logging.info(reader_thread.name + " stopping")
            reader_thread.join(timeout=45)
        logging.info("Jmx reading threads (num: " + size + ") finished")
def filter_bean(self, bean, mbean_domain):
return mbean_domain in self.selected_domain
def on_beans(self, source, beans):
for bean in beans:
self.on_bean(source, bean)
def on_bean(self, source, bean):
# mbean is of the form "domain:key=value,...,foo=bar"
mbean = bean[u'name']
mbean_domain, mbean_attribute = mbean.rstrip().split(":", 1)
mbean_domain = mbean_domain.lower()
if not self.filter_bean(bean, mbean_domain):
return
context = bean.get("tag.Context", "")
metric_prefix_name = self.__build_metric_prefix(mbean_attribute, context)
for key, value in bean.iteritems():
self.on_bean_kv(metric_prefix_name, source, key, value)
for listener in self.listeners:
try:
listener.on_bean(source, bean.copy())
except Exception as e:
logging.error("Failed to parse bean: " + bean["name"])
logging.exception(e)
def on_bean_kv(self, prefix, source, key, value):
# Skip Tags
if re.match(r'tag.*', key):
return
metric_name = (prefix + '.' + key).lower()
self.on_metric({
"component": source["component"],
"host": source["host"],
"metric": metric_name,
"value": value
})
def on_metric(self, metric):
if Helper.is_number(metric["value"]):
self.collect(metric)
elif isinstance(metric["value"], dict):
for key, value in metric["value"].iteritems():
self.on_bean_kv(metric["metric"], metric, key, value)
for listener in self.listeners:
listener.on_metric(metric.copy())
def __build_metric_prefix(self, mbean_attribute, context):
mbean_list = list(prop.split("=", 1) for prop in mbean_attribute.split(","))
if context == "":
metric_prefix_name = '.'.join([i[1] for i in mbean_list])
else:
name_index = [i[0] for i in mbean_list].index('name')
mbean_list[name_index][1] = context
metric_prefix_name = '.'.join([i[1] for i in mbean_list])
return (self.metric_prefix + metric_prefix_name).replace(" ", "").lower()
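    # Worked example: mbean "Hadoop:service=NameNode,name=FSNamesystem" with an
    # empty context yields "hadoop.namenode.fsnamesystem"; when tag.Context is
    # set, it replaces the value of the "name" property before joining.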
def close(self):
super(JmxMetricCollector, self).close()
# ========================
# Metric Listeners
# ========================
class JmxMetricListener:
def init(self, collector):
self.collector = collector
self.metric_prefix = self.collector.metric_prefix
def on_bean(self, component, bean):
pass
def on_metric(self, metric):
pass
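# A hypothetical listener sketch; on_metric receives a copy of every collected
# metric, so a subclass can react to or derive from specific metrics:
#
#   class MyJmxListener(JmxMetricListener):
#       def on_metric(self, metric):
#           if metric["metric"] == self.metric_prefix + "namenode.fsnamesystem.capacityused":
#               logging.debug("capacity used: %s", metric["value"])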
# ========================
# Metric Filters
# ========================
class MetricFilter:
    def init(self, config={}):
        raise Exception("init() must be overridden by a subclass")
def filter_metric(self, metric):
"""
        Return True to keep the metric, False to drop it.
"""
return True
class MetricNameFilter(MetricFilter):
metric_name_filter = []
    def init(self, config={}):
        if "filter" in config and "metric_name_filter" in config["filter"]:
            self.metric_name_filter = config["filter"]["metric_name_filter"]
            logging.debug("Override filter.metric_name_filter: " + str(self.metric_name_filter))
def filter_metric(self, metric):
if len(self.metric_name_filter) == 0:
return True
else:
for name_filter in self.metric_name_filter:
                # fnmatch is not thread-safe, so match with re instead of:
                # if fnmatch.fnmatch(metric["metric"], name_filter):
if re.match(name_filter, metric['metric']):
return True
return False
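# Example filter section (patterns are regular expressions matched from the
# beginning of the metric name by re.match; values are placeholders):
#
#   "filter": {"metric_name_filter": ["hadoop\\.namenode\\..*", "hadoop\\.memory\\..*"]}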
# ========================
# Metric Converter
# ========================
class MetricConverter:
def convert_metric(self, metric):
"""
Convert metric
"""
return True
class MetricNameConverter(MetricConverter):
def convert_metric(self, metric):
metric["resource"] = metric["metric"]
del metric["metric"]
return True
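# A typical entry-point sketch for a concrete collector script (the listener
# registration is optional and illustrative):
#
#   if __name__ == '__main__':
#       collector = JmxMetricCollector()
#       collector.register(JmxMetricListener())
#       Runner.run(collector)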