blob: 5d20495c0bbe144c470f68d5bacc23dd519a8c45 [file] [log] [blame]
#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import threading
from ambari_agent import Constants
from ambari_agent.LiveStatus import LiveStatus
from collections import defaultdict
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
logger = logging.getLogger(__name__)
class ComponentStatusExecutor(threading.Thread):
  """
  Background thread which periodically runs status commands for every
  component installed on the current host and sends status reports to the
  ambari-server. Only status *changes* (relative to what the server has
  already acknowledged) trigger recovery-manager notifications.
  """
  def __init__(self, initializer_module):
    self.initializer_module = initializer_module
    self.status_commands_run_interval = initializer_module.config.status_commands_run_interval
    self.metadata_cache = initializer_module.metadata_cache
    self.topology_cache = initializer_module.topology_cache
    self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
    self.stop_event = initializer_module.stop_event
    self.recovery_manager = initializer_module.recovery_manager
    # cluster_id -> component_name -> command_name -> last status the server
    # acknowledged (None until the first successful report)
    self.reported_component_status = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda:None)))
    self.server_responses_listener = initializer_module.server_responses_listener
    threading.Thread.__init__(self)

  def run(self):
    """
    Run an endless loop which executes all status commands every 'status_commands_run_interval' seconds.
    """
    if self.status_commands_run_interval == 0:
      # warn() is a deprecated alias of warning()
      logger.warning("ComponentStatusExecutor is turned off. Some functionality might not work correctly.")
      return

    while not self.stop_event.is_set():
      try:
        self.clean_not_existing_clusters_info()
        cluster_reports = defaultdict(list)
        for cluster_id in self.topology_cache.get_cluster_ids():
          # TODO: check if we can make clusters immutable too
          try:
            topology_cache = self.topology_cache[cluster_id]
            metadata_cache = self.metadata_cache[cluster_id]
          except KeyError:
            # multithreading: if cluster was deleted during iteration
            continue

          if 'status_commands_to_run' not in metadata_cache:
            continue
          status_commands_to_run = metadata_cache.status_commands_to_run

          if 'components' not in topology_cache:
            continue

          current_host_id = self.topology_cache.get_current_host_id(cluster_id)
          if current_host_id is None:
            continue

          cluster_components = topology_cache.components
          for component_dict in cluster_components:
            for command_name in status_commands_to_run:
              if self.stop_event.is_set():
                break

              # cluster was already removed
              if cluster_id not in self.topology_cache.get_cluster_ids():
                break

              # check if component is installed on current host
              if current_host_id not in component_dict.hostIds:
                break

              service_name = component_dict.serviceName
              component_name = component_dict.componentName

              # do not run status commands for the component which is starting/stopping or doing other action
              if self.customServiceOrchestrator.commandsRunningForComponent(cluster_id, component_name):
                logger.info("Skipping status command for {0}. Since command for it is running".format(component_name))
                continue

              result = self.check_component_status(cluster_id, service_name, component_name, command_name)
              if result:
                cluster_reports[cluster_id].append(result)

        self.send_updates_to_server(cluster_reports)
      except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
        pass
      except Exception:
        # keep the thread alive on unexpected errors, but never swallow
        # SystemExit/KeyboardInterrupt (a bare 'except:' would)
        logger.exception("Exception in ComponentStatusExecutor. Re-running it")

      self.stop_event.wait(self.status_commands_run_interval)
    logger.info("ComponentStatusExecutor has successfully finished")

  def check_component_status(self, cluster_id, service_name, component_name, command_name, report=False):
    """
    Run a single status command via the orchestrator.

    Returns the component's status report (dict) if the status has changed
    compared to what the server already acknowledged, otherwise None.
    If report=True the change is also sent to the server immediately.
    """
    # if not a component
    if self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) is None:
      return None

    command_dict = {
      'serviceName': service_name,
      'role': component_name,
      'clusterId': cluster_id,
      'commandType': 'STATUS_COMMAND',
    }

    component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
    status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS

    # log if status command failed
    if status == LiveStatus.DEAD_STATUS:
      stderr = component_status_result['stderr']
      # these two markers indicate an expected "not running" answer, not a failure
      if "ComponentIsNotRunning" not in stderr and "ClientComponentHasNoStatus" not in stderr:
        logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr))

    result = {
      'serviceName': service_name,
      'componentName': component_name,
      'command': command_name,
      'status': status,
      'clusterId': cluster_id,
    }

    if status != self.reported_component_status[cluster_id][component_name][command_name]:
      # use the module logger, not the root 'logging' module (was logging.info)
      logger.info("Status for {0} has changed to {1}".format(component_name, status))
      self.recovery_manager.handle_status_change(component_name, status)

      if report:
        self.send_updates_to_server({cluster_id: [result]})

      return result
    return None

  def send_updates_to_server(self, cluster_reports):
    """
    Send accumulated per-cluster status reports to the server; on the server's
    success acknowledgement, remember the statuses as reported.
    """
    if not cluster_reports or not self.initializer_module.is_registered:
      return

    correlation_id = self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
    self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_reported_component_status(cluster_reports)

  def save_reported_component_status(self, cluster_reports):
    """
    Record the statuses the server acknowledged, so unchanged statuses
    are not re-reported on the next iteration.
    """
    # items() works on both Python 2 and 3 (iteritems() is Python-2-only)
    for cluster_id, reports in cluster_reports.items():
      for report in reports:
        component_name = report['componentName']
        command = report['command']
        status = report['status']

        self.reported_component_status[cluster_id][component_name][command] = status

  def clean_not_existing_clusters_info(self):
    """
    This needs to be done to remove information about clusters which where deleted (e.g. ambari-server reset)
    """
    # iterate a copy: deleting while iterating a dict view raises on Python 3
    for cluster_id in list(self.reported_component_status.keys()):
      if cluster_id not in self.topology_cache.get_cluster_ids():
        del self.reported_component_status[cluster_id]