blob: 6c1d29c3af21ee6fcce692670f21e8cb40079424 [file] [log] [blame]
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
http://apscheduler.readthedocs.org/en/v2.1.2
"""
import ambari_simplejson as json
import logging
import os
import sys
import time
from apscheduler.scheduler import Scheduler
from alerts.collector import AlertCollector
from alerts.metric_alert import MetricAlert
from alerts.ams_alert import AmsAlert
from alerts.port_alert import PortAlert
from alerts.script_alert import ScriptAlert
from alerts.web_alert import WebAlert
from alerts.recovery_alert import RecoveryAlert
from ambari_agent.ExitHelper import ExitHelper
logger = logging.getLogger(__name__)
class AlertSchedulerHandler():
  """
  Schedules alert definitions as interval jobs in an APScheduler 2.x
  scheduler (http://apscheduler.readthedocs.org/en/v2.1.2).

  Definitions are persisted as JSON in <cachedir>/definitions.json. Every
  scheduled job is named after its definition's UUID so that the schedule
  can later be reconciled against a new set of definitions.
  """
  FILENAME = 'definitions.json'

  # values of a definition's source 'type' that map to alert classes
  TYPE_PORT = 'PORT'
  TYPE_METRIC = 'METRIC'
  TYPE_AMS = 'AMS'
  TYPE_SCRIPT = 'SCRIPT'
  TYPE_WEB = 'WEB'
  TYPE_RECOVERY = 'RECOVERY'

  def __init__(self, cachedir, stacks_dir, common_services_dir, extensions_dir,
      host_scripts_dir, cluster_configuration, config, recovery_manager,
      in_minutes=True):
    """
    :param cachedir: directory holding the persisted definitions file; it is
      created if it does not exist
    :param stacks_dir: exposed to SCRIPT alerts as 'stacks_directory'
    :param common_services_dir: exposed to SCRIPT alerts as
      'common_services_directory'
    :param extensions_dir: exposed to SCRIPT alerts as 'extensions_directory'
    :param host_scripts_dir: exposed to SCRIPT alerts as
      'host_scripts_directory'
    :param cluster_configuration: handed to every alert through set_helpers()
    :param config: agent configuration; 'agent'/'alert_grace_period' is read
      from it, and it is passed to every alert and to the scheduler thread pool
    :param recovery_manager: handed to RECOVERY alerts
    :param in_minutes: when True, definition intervals are scheduled in
      minutes; otherwise in seconds
    """
    self.cachedir = cachedir
    self.stacks_dir = stacks_dir
    self.common_services_dir = common_services_dir
    self.extensions_dir = extensions_dir
    self.host_scripts_dir = host_scripts_dir
    self._cluster_configuration = cluster_configuration

    # a mapping between a cluster name and a unique hash for all definitions
    self._cluster_hashes = {}

    # the amount of time, in seconds, that an alert can run after its scheduled time
    alert_grace_period = int(config.get('agent', 'alert_grace_period', 5))

    if not os.path.exists(cachedir):
      try:
        os.makedirs(cachedir)
      except Exception:
        # best effort: the constructor continues; the failure is only reported
        logger.critical("[AlertScheduler] Could not create the cache directory {0}".format(cachedir),
                        exc_info=True)

    apscheduler_standalone = False

    self.APS_CONFIG = {
      'apscheduler.threadpool.core_threads': 3,
      'apscheduler.coalesce': True,
      'apscheduler.standalone': apscheduler_standalone,
      'apscheduler.misfire_grace_time': alert_grace_period,
      'apscheduler.threadpool.context_injector': self._job_context_injector if not apscheduler_standalone else None,
      'apscheduler.threadpool.agent_config': config
    }

    self._collector = AlertCollector()
    self.__scheduler = Scheduler(self.APS_CONFIG)
    self.__in_minutes = in_minutes
    self.config = config
    # NOTE: the attribute name is misspelled ("manger"); it is kept as-is in
    # case code outside this class reads it
    self.recovery_manger = recovery_manager

    # register python exit handler
    ExitHelper().register(self.exit_handler)

  def _job_context_injector(self, config):
    """
    Hook registered as 'apscheduler.threadpool.context_injector' to inject
    monkey-patching, context and configuration into jobs when the scheduler
    runs in embedded (non-standalone) mode.

    This runs in the job context, so anything injected here must be cheap.
    When the agent configuration does not use the system proxy setting,
    urllib2 is reconfigured to ignore the system proxy.

    :type config AmbariConfig.AmbariConfig
    """
    if not config.use_system_proxy_setting():
      from ambari_commons.network import reconfigure_urllib2_opener
      reconfigure_urllib2_opener(ignore_system_proxy=True)

  def exit_handler(self):
    """
    Exit handler registered with ExitHelper; stops the scheduler.
    """
    self.stop()

  def update_definitions(self, heartbeat):
    """
    Persists the alert definitions carried in a heartbeat and reschedules
    jobs accordingly.

    If any command names a cluster that is not yet in the cluster-to-hash
    cache, every job is rescheduled. Otherwise only jobs whose definition
    UUIDs changed are touched.

    :param heartbeat: dict expected to contain 'alertDefinitionCommands'
    """
    if 'alertDefinitionCommands' not in heartbeat:
      logger.warning("There are no alert definition commands in the heartbeat; unable to update definitions")
      return

    # prune out things we don't want to store
    alert_definitions = []
    for command in heartbeat['alertDefinitionCommands']:
      command_copy = command.copy()

      # no need to store these since we always use the in-memory cached values
      command_copy.pop('configurations', None)

      alert_definitions.append(command_copy)

    # write out the new definitions
    with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
      json.dump(alert_definitions, f, indent=2)

    # Check every command, not just the last one. This also avoids an
    # unbound variable when the heartbeat carries no commands at all.
    reschedule_all = any(
      'clusterName' in command and command['clusterName'] not in self._cluster_hashes
      for command in alert_definitions)

    if reschedule_all:
      # reschedule all jobs, creating new instances
      self.reschedule_all()
    else:
      # reschedule only the jobs that have changed
      self.reschedule()

  def __make_function(self, alert_def):
    """ Returns a zero-argument callable that runs alert_def.collect(). """
    return lambda: alert_def.collect()

  def start(self):
    """
    Loads definitions from file and starts the scheduler. A scheduler that
    is already running is shut down and replaced first.
    """
    if self.__scheduler is None:
      return

    if self.__scheduler.running:
      self.__scheduler.shutdown(wait=False)
      self.__scheduler = Scheduler(self.APS_CONFIG)

    # schedule each definition
    for _callable in self.__load_definitions():
      self.schedule_definition(_callable)

    logger.info("[AlertScheduler] Starting {0}; currently running: {1}".format(
      str(self.__scheduler), str(self.__scheduler.running)))

    self.__scheduler.start()

  def stop(self):
    """
    Shuts the scheduler down (wait=False) and replaces it with a new, not
    yet started scheduler built from the same configuration.
    """
    if self.__scheduler is not None:
      self.__scheduler.shutdown(wait=False)
      self.__scheduler = Scheduler(self.APS_CONFIG)

    logger.info("[AlertScheduler] Stopped the alert scheduler.")

  def reschedule(self):
    """
    Removes jobs that are scheduled where their UUID no longer is valid.
    Schedules jobs where the definition UUID is not currently scheduled.
    """
    jobs_scheduled = 0
    jobs_removed = 0

    definitions = self.__load_definitions()
    scheduled_jobs = self.__scheduler.get_jobs()

    # jobs are named after their definition UUID (see schedule_definition)
    valid_uuids = set(definition.get_uuid() for definition in definitions)
    scheduled_uuids = set(scheduled_job.name for scheduled_job in scheduled_jobs)

    # jobs without valid UUIDs should be unscheduled
    for scheduled_job in scheduled_jobs:
      if scheduled_job.name not in valid_uuids:
        jobs_removed += 1
        logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
        self._collector.remove_by_uuid(scheduled_job.name)
        self.__scheduler.unschedule_job(scheduled_job)

    # if no jobs are found with the definition's UUID, schedule it
    for definition in definitions:
      if definition.get_uuid() not in scheduled_uuids:
        if self.schedule_definition(definition):
          jobs_scheduled += 1

    logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
      jobs_scheduled, jobs_removed))

  def reschedule_all(self):
    """
    Unschedules every job, then schedules a job for each persisted
    definition using freshly created alert instances.
    """
    logger.info("[AlertScheduler] Rescheduling all jobs...")

    jobs_scheduled = 0
    jobs_removed = 0

    definitions = self.__load_definitions()
    scheduled_jobs = self.__scheduler.get_jobs()

    # unschedule all scheduled jobs
    for scheduled_job in scheduled_jobs:
      jobs_removed += 1
      logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
      self._collector.remove_by_uuid(scheduled_job.name)
      self.__scheduler.unschedule_job(scheduled_job)

    # for every definition, schedule a job
    for definition in definitions:
      if self.schedule_definition(definition):
        jobs_scheduled += 1

    logger.info("[AlertScheduler] Reschedule Summary: {0} unscheduled, {1} rescheduled".format(
      jobs_removed, jobs_scheduled))

  def collector(self):
    """ gets the collector for reporting to the server """
    return self._collector

  def __load_definitions(self):
    """
    Loads all alert definitions from the definitions file. All clusters are
    stored in a single file. This also populates the cluster-to-hash
    dictionary.

    :return: list of alert objects, possibly empty. Definitions that cannot
      be converted are skipped.
    """
    definitions = []

    alerts_definitions_path = os.path.join(self.cachedir, self.FILENAME)
    try:
      with open(alerts_definitions_path) as fp:
        all_commands = json.load(fp)
    except Exception:
      logger.warning('[AlertScheduler] {0} not found or invalid. No alerts will be scheduled until registration occurs.'.format(alerts_definitions_path))
      return definitions

    for command_json in all_commands:
      clusterName = command_json.get('clusterName', '')
      hostName = command_json.get('hostName', '')
      clusterHash = command_json.get('hash')

      # cache the cluster and cluster hash after loading the JSON
      if clusterName != '' and clusterHash is not None:
        logger.info('[AlertScheduler] Caching cluster {0} with alert hash {1}'.format(clusterName, clusterHash))
        self._cluster_hashes[clusterName] = clusterHash

      # a command without definitions contributes nothing; don't abort the load
      for definition in command_json.get('alertDefinitions') or []:
        alert = self.__json_to_callable(clusterName, hostName, definition)

        if alert is None:
          continue

        alert.set_helpers(self._collector, self._cluster_configuration)
        definitions.append(alert)

    return definitions

  def __json_to_callable(self, clusterName, hostName, json_definition):
    """
    Converts the JSON of a single alert definition into the alert object (a
    BaseAlert subclass) that matches its source type.

    :return: the alert, or None if the source type is not recognized or the
      definition could not be loaded (the error is logged)
    """
    alert = None

    try:
      source = json_definition['source']
      source_type = source.get('type', '')

      if logger.isEnabledFor(logging.DEBUG):
        logger.debug("[AlertScheduler] Creating job type {0} with {1}".format(source_type, str(json_definition)))

      if source_type == AlertSchedulerHandler.TYPE_METRIC:
        alert = MetricAlert(json_definition, source, self.config)
      elif source_type == AlertSchedulerHandler.TYPE_AMS:
        alert = AmsAlert(json_definition, source, self.config)
      elif source_type == AlertSchedulerHandler.TYPE_PORT:
        alert = PortAlert(json_definition, source, self.config)
      elif source_type == AlertSchedulerHandler.TYPE_SCRIPT:
        # script alerts need to know where to look for their scripts
        source['stacks_directory'] = self.stacks_dir
        source['common_services_directory'] = self.common_services_dir
        source['extensions_directory'] = self.extensions_dir
        source['host_scripts_directory'] = self.host_scripts_dir
        alert = ScriptAlert(json_definition, source, self.config)
      elif source_type == AlertSchedulerHandler.TYPE_WEB:
        alert = WebAlert(json_definition, source, self.config)
      elif source_type == AlertSchedulerHandler.TYPE_RECOVERY:
        alert = RecoveryAlert(json_definition, source, self.config, self.recovery_manger)

      if alert is not None:
        alert.set_cluster(clusterName, hostName)

    except Exception:
      logger.exception("[AlertScheduler] Unable to load an invalid alert definition. It will be skipped.")

    return alert

  def schedule_definition(self, definition):
    """
    Schedule a definition (callable). Scheduled jobs are given the UUID
    as their name so that they can be identified later on.
    <p/>
    This function can be called with a definition that is disabled; it will
    simply NOOP.

    :return: True if a job was added to the scheduler, False otherwise
    """
    # NOOP if the definition is disabled; don't schedule it
    if not definition.is_enabled():
      logger.info("[AlertScheduler] The alert {0} with UUID {1} is disabled and will not be scheduled".format(
        definition.get_name(), definition.get_uuid()))
      return False

    if self.__in_minutes:
      job = self.__scheduler.add_interval_job(self.__make_function(definition),
        minutes=definition.interval())
    else:
      job = self.__scheduler.add_interval_job(self.__make_function(definition),
        seconds=definition.interval())

    # although the documentation states that Job(kwargs) takes a name
    # key/value pair, it does not actually set the name; do it manually
    if job is not None:
      job.name = definition.get_uuid()

    logger.info("[AlertScheduler] Scheduling {0} with UUID {1}".format(
      definition.get_name(), definition.get_uuid()))

    return job is not None

  def get_job_count(self):
    """
    Gets the number of jobs currently scheduled. This is mainly used for
    test verification of scheduling.
    """
    if self.__scheduler is None:
      return 0

    return len(self.__scheduler.get_jobs())

  def execute_alert(self, execution_commands):
    """
    Executes an alert immediately, ignoring any scheduled jobs. The existing
    jobs remain untouched. The result of this is stored in the alert
    collector for transmission during the next heartbeat.

    :param execution_commands: iterable of dicts, each with an
      'alertDefinition' and optional 'clusterName'/'hostName'; a failure in
      one command is logged and does not stop the others
    """
    if self.__scheduler is None or execution_commands is None:
      return

    for execution_command in execution_commands:
      try:
        alert_definition = execution_command['alertDefinition']

        clusterName = execution_command.get('clusterName', '')
        hostName = execution_command.get('hostName', '')

        alert = self.__json_to_callable(clusterName, hostName, alert_definition)

        if alert is None:
          continue

        logger.info("[AlertScheduler] Executing on-demand alert {0} ({1})".format(alert.get_name(),
          alert.get_uuid()))

        alert.set_helpers(self._collector, self._cluster_configuration)
        alert.collect()
      except Exception:
        logger.exception("[AlertScheduler] Unable to execute the alert outside of the job scheduler")
def main():
  """
  Ad-hoc manual harness. It builds a handler from command-line arguments and
  starts it, waits for up to ten seconds (Ctrl-C ends the wait early), prints
  the result of the collector's alerts() and stops the scheduler.

  NOTE(review): the constructor call below passes only four positional
  arguments. AlertSchedulerHandler.__init__ requires at least eight
  (cachedir, stacks_dir, common_services_dir, extensions_dir,
  host_scripts_dir, cluster_configuration, config, recovery_manager), so as
  written this raises TypeError before anything is scheduled. It also raises
  IndexError when fewer than three arguments are given. It appears to
  predate the current constructor signature. TODO: update or remove.
  """
  args = list(sys.argv)
  # drop the script name so args[0] is the first user-supplied argument
  del args[0]

  ash = AlertSchedulerHandler(args[0], args[1], args[2], False)
  ash.start()

  # sleep one second at a time, ten times, so Ctrl-C can interrupt the wait
  i = 0
  try:
    while i < 10:
      time.sleep(1)
      i += 1
  except KeyboardInterrupt:
    pass

  # Python 2 print statement
  print str(ash.collector().alerts())
  ash.stop()


# run the manual harness only when executed as a script
if __name__ == "__main__":
  main()