blob: dd17c5ab77aee524e83e34d2aeef80b02fbb1df2 [file] [log] [blame]
#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import threading
from collections import defaultdict
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
from ambari_agent import Constants
logger = logging.getLogger(__name__)
class AlertStatusReporter(threading.Thread):
"""
Thread sends alert reports to server. The report is only sent if its 'text' or 'state' has changed.
This is done to reduce bandwidth usage on large clusters and number of operations (DB and others) done by server.
"""
FIELDS_CHANGED_RESEND_ALERT = ['text', 'state']
def __init__(self, initializer_module):
self.initializer_module = initializer_module
self.collector = initializer_module.alert_scheduler_handler.collector()
self.stop_event = initializer_module.stop_event
self.alert_reports_interval = initializer_module.config.alert_reports_interval
self.alert_definitions_cache = initializer_module.alert_definitions_cache
self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
self.server_responses_listener = initializer_module.server_responses_listener
self.reported_alerts = defaultdict(lambda:defaultdict(lambda:[]))
self.send_alert_changes_only = initializer_module.config.send_alert_changes_only
threading.Thread.__init__(self)
def run(self):
"""
Run an endless loop which reports all the alert statuses got from collector
"""
if self.alert_reports_interval == 0:
logger.warn("AlertStatusReporter is turned off. Some functionality might not work correctly.")
return
while not self.stop_event.is_set():
try:
if self.initializer_module.is_registered:
self.clean_not_existing_clusters_info()
alerts = self.collector.alerts()
self.stale_alerts_monitor.save_executed_alerts(alerts)
alerts_to_send = self.get_changed_alerts(alerts) if self.send_alert_changes_only else alerts
if alerts_to_send and self.initializer_module.is_registered:
correlation_id = self.initializer_module.connection.send(message=alerts_to_send, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT, log_message_function=AlertStatusReporter.log_sending)
self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_results(alerts_to_send)
except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
pass
except:
logger.exception("Exception in AlertStatusReporter. Re-running it")
self.stop_event.wait(self.alert_reports_interval)
logger.info("AlertStatusReporter has successfully finished")
def save_results(self, alerts):
"""
Save alert reports which were synced to server
"""
for alert in alerts:
cluster_id = alert['clusterId']
alert_name = alert['name']
self.reported_alerts[cluster_id][alert_name] = [alert[field] for field in self.FIELDS_CHANGED_RESEND_ALERT]
def get_changed_alerts(self, alerts):
"""
Get alert reports, which changed since last successful report to server
"""
changed_alerts = []
for alert in alerts:
cluster_id = alert['clusterId']
alert_name = alert['name']
if [alert[field] for field in self.FIELDS_CHANGED_RESEND_ALERT] != self.reported_alerts[cluster_id][alert_name]:
changed_alerts.append(alert)
return changed_alerts
def clean_not_existing_clusters_info(self):
"""
This needs to be done to remove information about clusters which where deleted (e.g. ambari-server reset)
"""
for cluster_id in self.reported_alerts.keys():
if not cluster_id in self.alert_definitions_cache.get_cluster_ids():
del self.reported_alerts[cluster_id]
@staticmethod
def log_sending(message_dict):
"""
Returned dictionary will be used while logging sent alert status.
Used because full dict is too big for logs and should be shortened
"""
try:
for alert_status in message_dict:
if 'text' in alert_status:
alert_status['text'] = '...'
except KeyError:
pass
return message_dict