#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import threading
import subprocess
import logging
import urllib2

logger = logging.getLogger()


class Aggregator(threading.Thread):
  """Thread that launches the in-memory host aggregator JVM process and
  keeps it running until the stop handler signals shutdown."""

  def __init__(self, config, stop_handler):
    threading.Thread.__init__(self)
    self._config = config
    self._stop_handler = stop_handler
    self._aggregator_process = None
    self._sleep_interval = config.get_collector_sleep_interval()
    self.stopped = False

  def run(self):
    java_home = self._config.get_java_home()
    collector_hosts = self._config.get_metrics_collector_hosts_as_string()
    jvm_args = self._config.get_aggregator_jvm_agrs()
    config_dir = self._config.get_config_dir()
    class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
    ams_log_file = "ambari-metrics-aggregator.log"
    additional_classpath = ':{0}'.format(config_dir)
    ams_log_dir = self._config.ams_monitor_log_dir()
    hostname = self._config.get_hostname_config()

    logger.info('Starting Aggregator thread.')
    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6} {7}"\
      .format(java_home, jvm_args, ams_log_dir, ams_log_file, additional_classpath, class_name, hostname, collector_hosts)

    logger.info("Executing : {0}".format(cmd))
    self._aggregator_process = subprocess.Popen([cmd], stdout=None, stderr=None, shell=True)

    # Block until the stop handler fires (wait() returns 0) or stop() is called.
    while not self.stopped:
      if 0 == self._stop_handler.wait(self._sleep_interval):
        break
    self.stop()

  def stop(self):
    self.stopped = True
    if self._aggregator_process:
      logger.info('Shutting down Aggregator thread.')
      self._aggregator_process.terminate()
      self._aggregator_process = None


class AggregatorWatchdog(threading.Thread):
  """Thread that periodically probes the aggregator's metrics endpoint and
  exposes its health through an event flag."""

  SLEEP_TIME = 30
  CONNECTION_TIMEOUT = 5
  AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"

  def __init__(self, config, stop_handler):
    threading.Thread.__init__(self)
    self._config = config
    self._stop_handler = stop_handler
    self.URL = self._config.get_inmemory_aggregation_protocol() + '://localhost:' + \
               self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
    self._is_ok = threading.Event()
    self.set_is_ok(True)
    self.stopped = False

  def run(self):
    logger.info('Starting Aggregator Watchdog thread.')
    while not self.stopped:
      if 0 == self._stop_handler.wait(self.SLEEP_TIME):
        break
      try:
        conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
      except (KeyboardInterrupt, SystemExit):
        raise
      except Exception:
        self.set_is_ok(False)
        continue
      try:
        # The aggregator is considered healthy only on an HTTP 200 response.
        self.set_is_ok(conn.code == 200)
      finally:
        conn.close()

  def is_ok(self):
    return self._is_ok.is_set()

  def set_is_ok(self, value):
    if value:
      self._is_ok.set()
    else:
      # Warn only on the transition from healthy to unhealthy.
      if self.is_ok():
        logger.warning("Watcher couldn't connect to aggregator.")
      self._is_ok.clear()

  def stop(self):
    logger.info('Shutting down watcher thread.')
    self.stopped = True
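

# ---------------------------------------------------------------------------
# Hedged usage sketch (illustrative only, not part of the Ambari module):
# shows how AggregatorWatchdog might be wired up with a stop handler.
# _StubConfig and _StubStopHandler are hypothetical stand-ins for the real
# configuration object and signal-based stop handler the monitor normally
# supplies; only the methods the watchdog actually calls are stubbed, and
# Python 2.7 is assumed (Event.wait returning a boolean).
if __name__ == '__main__':
  import time

  class _StubStopHandler(object):
    """Hypothetical stop handler: wait() returns 0 once stop was requested,
    otherwise -1 after the timeout elapses (matching how the threads above
    interpret the return value)."""
    def __init__(self):
      self._event = threading.Event()

    def wait(self, timeout):
      return 0 if self._event.wait(timeout) else -1

    def set_stop(self):
      self._event.set()

  class _StubConfig(object):
    """Hypothetical config exposing only what AggregatorWatchdog needs."""
    def get_inmemory_aggregation_protocol(self):
      return 'http'

    def get_inmemory_aggregation_port(self):
      return '61888'

  logging.basicConfig(level=logging.INFO)
  stop_handler = _StubStopHandler()
  watchdog = AggregatorWatchdog(_StubConfig(), stop_handler)
  watchdog.start()
  time.sleep(1)
  print('watchdog health flag: {0}'.format(watchdog.is_ok()))
  # Signal shutdown and wait for the watchdog thread to exit.
  stop_handler.set_stop()
  watchdog.stop()
  watchdog.join()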