| #!/usr/bin/env python |
| |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import json |
| import logging |
| import copy |
| import os |
| import time |
| import threading |
| import pprint |
| |
| from ambari_agent.ActionQueue import ActionQueue |
| from ambari_agent.LiveStatus import LiveStatus |
| |
| |
# logging.getLogger() without a name returns the root logger.
logger = logging.getLogger()


class RecoveryManager:
    """
    Tracks the current/desired state of host components and computes the
    commands needed to bring a component back to its desired state.

    RecoveryManager has the following capabilities:
      * Store data needed for execution commands extracted from STATUS command
      * Generate INSTALL command
      * Generate START command
      * Generate STOP and RESTART commands

    Recovery attempts are rate limited per component: at most max_count
    attempts per window_in_min minutes, at least retry_gap minutes apart,
    and at most max_lifetime_count attempts overall. The counters are
    written to <cache_dir>/recovery.json after every attempt.
    """

    # Keys read from / written to command dictionaries.
    COMMAND_TYPE = "commandType"
    PAYLOAD_LEVEL = "payloadLevel"
    COMPONENT_NAME = "componentName"
    ROLE = "role"
    TASK_ID = "taskId"
    DESIRED_STATE = "desiredState"
    HAS_STALE_CONFIG = "hasStaleConfigs"
    EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
    ROLE_COMMAND = "roleCommand"
    HOST_LEVEL_PARAMS = "hostLevelParams"

    # Values of the payloadLevel field.
    PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
    PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
    PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND"

    # Component states.
    STARTED = "STARTED"
    INSTALLED = "INSTALLED"
    INIT = "INIT"  # TODO: What is the state when machine is reset
    INSTALL_FAILED = "INSTALL_FAILED"

    # stored_exec_commands keeps, next to each component's command, the time
    # it was stored under this derived key.
    COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
    COMMAND_REFRESH_DELAY_SEC = 600  # 10 minutes

    FILENAME = "recovery.json"

    # Template for a per-component attempt counter; always deep-copied.
    default_action_counter = {
        "lastAttempt": 0,
        "count": 0,
        "lastReset": 0,
        "lifetimeCount": 0,
        "warnedLastAttempt": False,
        "warnedLastReset": False,
        "warnedThresholdReached": False
    }

    # Template for a per-component status record; always deep-copied.
    default_component_status = {
        "current": "",
        "desired": "",
        "stale_config": False
    }

    # (desired, current) -> name of the method that builds the recovery
    # command. Method names (not bound methods) are stored so that the lookup
    # happens on the instance at call time.
    _AUTO_START_TRANSITIONS = {
        (STARTED, INSTALLED): "get_start_command",
    }
    _AUTO_INSTALL_START_TRANSITIONS = {
        (STARTED, INSTALLED): "get_start_command",
        (STARTED, INSTALL_FAILED): "get_install_command",
        (INSTALLED, INSTALL_FAILED): "get_install_command",
    }
    _FULL_TRANSITIONS = {
        (STARTED, INSTALLED): "get_start_command",
        (STARTED, INIT): "get_install_command",
        (STARTED, INSTALL_FAILED): "get_install_command",
        (INSTALLED, INIT): "get_install_command",
        (INSTALLED, INSTALL_FAILED): "get_install_command",
        (INSTALLED, STARTED): "get_stop_command",
        # desired == current is only reached when requires_recovery() accepted
        # the component because its configuration is stale.
        (INSTALLED, INSTALLED): "get_install_command",
        (STARTED, STARTED): "get_restart_command",
    }

    def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
        """
        cache_dir - directory that holds recovery.json; created if missing.
        recovery_enabled - whether recovery is enabled at all.
        auto_start_only - restrict recovery to INSTALLED --> STARTED.
        auto_install_start - restrict recovery to the AUTO_INSTALL_START transitions.
        """
        self.recovery_enabled = recovery_enabled
        self.auto_start_only = auto_start_only
        self.auto_install_start = auto_install_start
        self.max_count = 6
        self.window_in_min = 60
        self.retry_gap = 5
        self.max_lifetime_count = 12
        # Derived values; update_config() recomputes them whenever it accepts
        # a configuration. Set here so they always exist.
        self.window_in_sec = self.window_in_min * 60
        self.retry_gap_in_sec = self.retry_gap * 60

        self.stored_exec_commands = {}
        self.id = int(time.time())
        self.allowed_desired_states = [self.STARTED, self.INSTALLED]
        self.allowed_current_states = [self.INIT, self.INSTALLED]
        self.enabled_components = []
        self.statuses = {}
        self.__status_lock = threading.RLock()
        self.__command_lock = threading.RLock()
        self.__active_command_lock = threading.RLock()
        self.__cache_lock = threading.RLock()
        self.active_command_count = 0
        self.paused = False
        self.recovery_timestamp = -1

        if not os.path.exists(cache_dir):
            try:
                os.makedirs(cache_dir)
            except Exception:
                # Best effort. Stay quiet if the directory exists after all
                # (e.g. it was created concurrently between the two calls).
                if not os.path.isdir(cache_dir):
                    logger.critical("[RecoveryManager] Could not create the cache directory {0}".format(cache_dir))

        self.__actions_json_file = os.path.join(cache_dir, self.FILENAME)

        self.actions = {}

        self.update_config(self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count,
                           recovery_enabled, auto_start_only, auto_install_start, "", -1)

    def start_execution_command(self):
        """Increment the number of active commands."""
        with self.__active_command_lock:
            self.active_command_count += 1

    def stop_execution_command(self):
        """Decrement the number of active commands."""
        with self.__active_command_lock:
            self.active_command_count -= 1

    def has_active_command(self):
        """Return True while the active command count is above zero."""
        return self.active_command_count > 0

    def set_paused(self, paused):
        """Set the paused flag; while it is set the get_*_command methods return None."""
        if self.paused != paused:
            logger.debug("RecoveryManager is transitioning from isPaused = " + str(self.paused) + " to " + str(paused))
        self.paused = paused

    def enabled(self):
        """Return whether recovery is enabled."""
        return self.recovery_enabled

    def get_current_status(self, component):
        """Return the recorded current state of component, or None if it is unknown."""
        if component in self.statuses:
            return self.statuses[component]["current"]
        return None

    def get_desired_status(self, component):
        """Return the recorded desired state of component, or None if it is unknown."""
        if component in self.statuses:
            return self.statuses[component]["desired"]
        return None

    def _set_status_field(self, component, field, value, label):
        """
        Set one field of a component's status record, creating the record
        first if needed, and drop the stored command once the component is in
        its desired state with non-stale configuration.

        label - name used in the log messages ("current"/"desired"), or None
                to update without logging.
        """
        if component not in self.statuses:
            with self.__status_lock:
                # Re-check under the lock: another thread may have created the
                # record between the test above and acquiring the lock.
                if component not in self.statuses:
                    new_status = copy.deepcopy(self.default_component_status)
                    new_status[field] = value
                    self.statuses[component] = new_status
                    if label:
                        logger.info("New status, %s status is set to %s for %s", label, value, component)

        status = self.statuses[component]
        if label and status[field] != value:
            logger.info("%s status is set to %s for %s", label, value, component)
        status[field] = value

        if status["current"] == status["desired"] and not status["stale_config"]:
            self.remove_command(component)

    def update_config_staleness(self, component, is_config_stale):
        """
        Updates staleness of config
        """
        self._set_status_field(component, "stale_config", is_config_stale, None)

    def update_current_status(self, component, state):
        """
        Updates the current status of a host component managed by the agent
        """
        self._set_status_field(component, "current", state, "current")

    def update_desired_status(self, component, state):
        """
        Updates the desired status of a host component managed by the agent
        """
        self._set_status_field(component, "desired", state, "desired")

    def configured_for_recovery(self, component):
        """
        Whether specific components are enabled for recovery.
        """
        return component in self.enabled_components

    def requires_recovery(self, component):
        """
        Recovery is allowed for:
        INSTALLED --> STARTED
        INIT --> INSTALLED --> STARTED
        RE-INSTALLED (if configs do not match)

        Returns False when recovery is disabled, the component is not enabled
        for recovery, its status is unknown, or its desired/current states are
        outside the states allowed by the active recovery type.
        """
        if not self.enabled():
            return False

        if not self.configured_for_recovery(component):
            return False

        if component not in self.statuses:
            return False

        status = self.statuses[component]
        in_desired_state = status["current"] == status["desired"]
        if self.auto_start_only or self.auto_install_start:
            # The auto-start types ignore stale configuration.
            if in_desired_state:
                return False
        elif in_desired_state and not status["stale_config"]:
            return False

        if status["desired"] not in self.allowed_desired_states or \
                status["current"] not in self.allowed_current_states:
            return False

        logger.info("%s needs recovery, desired = %s, and current = %s.", component, status["desired"], status["current"])
        return True

    def get_recovery_status(self):
        """
        Creates a status in the form
        {
          "summary" : "RECOVERABLE|DISABLED|PARTIALLY_RECOVERABLE|UNRECOVERABLE",
          "componentReports" : [
            {
              "name": "component_name",
              "numAttempts" : x,
              "limitReached" : True|False
            }
          ]
        }
        "componentReports" is only present when recovery is enabled.
        """
        report = {"summary": "DISABLED"}
        if not self.enabled():
            return report

        report["summary"] = "RECOVERABLE"
        num_limits_reached = 0
        recovery_states = []
        report["componentReports"] = recovery_states
        with self.__status_lock:
            for component, action in self.actions.items():
                limit_reached = self.max_lifetime_count <= action["lifetimeCount"]
                recovery_states.append({
                    "name": component,
                    "numAttempts": action["lifetimeCount"],
                    "limitReached": limit_reached
                })
                if limit_reached:
                    num_limits_reached += 1

        if num_limits_reached > 0:
            report["summary"] = "PARTIALLY_RECOVERABLE"
            if num_limits_reached == len(recovery_states):
                report["summary"] = "UNRECOVERABLE"

        return report

    def get_recovery_commands(self):
        """
        This method computes the recovery commands for the following transitions
        INSTALLED --> STARTED
        INIT --> INSTALLED
        INSTALL_FAILED --> INSTALLED
        INSTALL_FAILED --> STARTED
        and, when no auto-start type is active, also STARTED --> INSTALLED
        (stop) and re-install/restart of components with stale configuration.

        A recovery attempt is recorded (via execute) only for components for
        which a command could actually be built.
        """
        if self.auto_start_only:
            transitions = self._AUTO_START_TRANSITIONS
        elif self.auto_install_start:
            transitions = self._AUTO_INSTALL_START_TRANSITIONS
        else:
            transitions = self._FULL_TRANSITIONS

        commands = []
        # Iterate over a snapshot of the keys so that a component added
        # meanwhile cannot break the iteration.
        for component in list(self.statuses.keys()):
            if not (self.requires_recovery(component) and self.may_execute(component)):
                continue

            status = self.statuses[component]
            builder_name = transitions.get((status["desired"], status["current"]))
            if builder_name is None:
                continue

            command = getattr(self, builder_name)(component)
            if command:
                self.execute(component)
                commands.append(command)
        return commands

    def _ensure_action_counter(self, action):
        """Create the attempt counter for action if it does not exist yet."""
        if action not in self.actions:
            with self.__status_lock:
                # Re-check under the lock so a counter created (and possibly
                # already incremented) by another thread is not overwritten.
                if action not in self.actions:
                    self.actions[action] = copy.deepcopy(self.default_action_counter)

    def may_execute(self, action):
        """
        Check if an action can be executed
        """
        if not action or action.strip() == "":
            return False

        self._ensure_action_counter(action)
        return self._execute_action_chk_only(action)

    def execute(self, action):
        """
        Record an attempt for an action. Returns True if the attempt was
        within the configured limits and has been counted, False otherwise.
        """
        if not action or action.strip() == "":
            return False

        self._ensure_action_counter(action)
        return self._execute_action_(action)

    def _execute_action_(self, action_name):
        """
        _private_ implementation of execute: updates the attempt counter of
        action_name, persists all counters and returns whether the attempt
        was counted.
        """
        action_counter = self.actions[action_name]
        now = self._now_()
        executed = False
        seconds_since_last_attempt = now - action_counter["lastAttempt"]

        if action_counter["lifetimeCount"] < self.max_lifetime_count:
            # reset if window_in_sec seconds passed since last attempt
            if seconds_since_last_attempt > self.window_in_sec:
                action_counter["count"] = 0
                action_counter["lastReset"] = now
                action_counter["warnedLastReset"] = False

            if action_counter["count"] < self.max_count:
                if seconds_since_last_attempt > self.retry_gap_in_sec:
                    action_counter["count"] += 1
                    action_counter["lifetimeCount"] += 1
                    if self.retry_gap > 0:
                        action_counter["lastAttempt"] = now
                    action_counter["warnedLastAttempt"] = False
                    if action_counter["count"] == 1:
                        # First attempt of a window marks the window start.
                        action_counter["lastReset"] = now
                    executed = True
                elif not action_counter["warnedLastAttempt"]:
                    # Warn once, then only log at debug level.
                    action_counter["warnedLastAttempt"] = True
                    logger.warning(
                        "%s seconds has not passed since last occurrence %s seconds back for %s. "
                        "Will silently skip execution without warning till retry gap is passed",
                        self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
                else:
                    logger.debug(
                        "%s seconds has not passed since last occurrence %s seconds back for %s",
                        self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
            else:
                sec_since_last_reset = now - action_counter["lastReset"]
                if sec_since_last_reset > self.window_in_sec:
                    action_counter["count"] = 1
                    action_counter["lifetimeCount"] += 1
                    if self.retry_gap > 0:
                        action_counter["lastAttempt"] = now
                    action_counter["lastReset"] = now
                    action_counter["warnedLastReset"] = False
                    executed = True
                elif not action_counter["warnedLastReset"]:
                    action_counter["warnedLastReset"] = True
                    logger.warning(
                        "%s occurrences in %s minutes reached the limit for %s. "
                        "Will silently skip execution without warning till window is reset",
                        action_counter["count"], self.window_in_min, action_name)
                else:
                    logger.debug("%s occurrences in %s minutes reached the limit for %s",
                                 action_counter["count"], self.window_in_min, action_name)
        elif not action_counter["warnedThresholdReached"]:
            action_counter["warnedThresholdReached"] = True
            logger.warning(
                "%s occurrences in agent life time reached the limit for %s. "
                "Will silently skip execution without warning till window is reset",
                action_counter["lifetimeCount"], action_name)
        else:
            logger.error("%s occurrences in agent life time reached the limit for %s",
                         action_counter["lifetimeCount"], action_name)

        self._dump_actions()
        return executed

    def _dump_actions(self):
        """
        Dump recovery actions to FS. Returns True on success, False if the
        file could not be written (the error is logged, not raised).
        """
        with self.__cache_lock:
            try:
                with open(self.__actions_json_file, 'w') as f:
                    json.dump(self.actions, f, indent=2)
            except Exception:
                logger.exception("Unable to dump actions to {0}".format(self.__actions_json_file))
                return False
        return True

    def _load_actions(self):
        """
        Loads recovery actions from FS. Returns an empty dict when the file
        does not exist or cannot be read/parsed.
        """
        with self.__cache_lock:
            try:
                if os.path.isfile(self.__actions_json_file):
                    with open(self.__actions_json_file, 'r') as fp:
                        return json.load(fp)
            except Exception:
                logger.warning("Unable to load recovery actions from {0}.".format(self.__actions_json_file))
        return {}

    def get_actions_copy(self):
        """
        :return: recovery actions copy
        """
        with self.__status_lock:
            return copy.deepcopy(self.actions)

    def is_action_info_stale(self, action_name):
        """
        Checks if the action info is stale
        :param action_name:
        :return: True if the last attempt for action_name is more than
                 window_in_sec seconds old; False otherwise or if the action
                 is unknown
        """
        if action_name in self.actions:
            action_counter = self.actions[action_name]
            seconds_since_last_attempt = self._now_() - action_counter["lastAttempt"]
            return seconds_since_last_attempt > self.window_in_sec
        return False

    def _execute_action_chk_only(self, action_name):
        """
        _private_ implementation of may_execute: same limits as
        _execute_action_ but without modifying the counter.
        """
        action_counter = self.actions[action_name]
        now = self._now_()

        if action_counter["lifetimeCount"] >= self.max_lifetime_count:
            return False

        if action_counter["count"] < self.max_count:
            return now - action_counter["lastAttempt"] > self.retry_gap_in_sec

        return now - action_counter["lastReset"] > self.window_in_sec

    def _now_(self):
        """Return the current time as whole seconds since the epoch."""
        return int(time.time())

    def update_configuration_from_registration(self, reg_resp):
        """
        Parse the recovery configuration of a registration response and apply
        it through update_config. Missing or unparsable values fall back to
        the defaults below.

        "recoveryConfig": {
          "type" : "DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL",
          "maxCount" : 10,
          "windowInMinutes" : 60,
          "retryGap" : 0,
          "maxLifetimeCount" : 12,
          "components" : "a,b",
          "recoveryTimestamp" : 1458150424380
        }
        """
        recovery_enabled = False
        auto_start_only = False
        auto_install_start = False
        max_count = 6
        window_in_min = 60
        retry_gap = 5
        max_lifetime_count = 12
        enabled_components = ""
        recovery_timestamp = -1  # Default value if recoveryTimestamp is not available.

        if reg_resp and "recoveryConfig" in reg_resp:
            config = reg_resp["recoveryConfig"]
            logger.info("RecoverConfig = " + pprint.pformat(config))
            if config is None:
                # "recoveryConfig": null - nothing to parse, keep the defaults.
                config = {}

            recovery_type = config.get("type")
            if recovery_type in ("AUTO_INSTALL_START", "AUTO_START", "FULL"):
                recovery_enabled = True
                auto_start_only = recovery_type == "AUTO_START"
                auto_install_start = recovery_type == "AUTO_INSTALL_START"

            if "maxCount" in config:
                max_count = self._read_int_(config["maxCount"], max_count)
            if "windowInMinutes" in config:
                window_in_min = self._read_int_(config["windowInMinutes"], window_in_min)
            if "retryGap" in config:
                retry_gap = self._read_int_(config["retryGap"], retry_gap)
            if "maxLifetimeCount" in config:
                max_lifetime_count = self._read_int_(config["maxLifetimeCount"], max_lifetime_count)

            if "components" in config:
                enabled_components = config["components"]

            if "recoveryTimestamp" in config:
                recovery_timestamp = config["recoveryTimestamp"]

        self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
                           auto_start_only, auto_install_start, enabled_components, recovery_timestamp)

    def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
                      auto_start_only, auto_install_start, enabled_components, recovery_timestamp):
        """
        Update recovery configuration with the specified values. Recovery is
        disabled, and the previous values are kept, if the configuration
        values are not correct.

        max_count - Configured maximum count of recovery attempt allowed per host component in a window.
        window_in_min - Configured window size in minutes.
        retry_gap - Configured retry gap between tries per host component
        max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
        recovery_enabled - True or False. Indicates whether recovery is enabled or not.
        auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
        auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
        enabled_components - CSV of components enabled for auto start.
        recovery_timestamp - Timestamp when the recovery values were last updated. -1 on start up.
        """
        # Stay disabled until the whole configuration has been validated.
        self.recovery_enabled = False
        if max_count <= 0:
            logger.warning("Recovery disabled: max_count must be a positive number")
            return

        if window_in_min <= 0:
            logger.warning("Recovery disabled: window_in_min must be a positive number")
            return

        if retry_gap < 1:
            logger.warning("Recovery disabled: retry_gap must be a positive number and at least 1")
            return
        if retry_gap >= window_in_min:
            logger.warning("Recovery disabled: retry_gap must be smaller than window_in_min")
            return
        if max_lifetime_count < 0 or max_lifetime_count < max_count:
            logger.warning("Recovery disabled: max_lifetime_count must be more than 0 and >= max_count")
            return

        self.max_count = max_count
        self.window_in_min = window_in_min
        self.retry_gap = retry_gap
        self.window_in_sec = window_in_min * 60
        self.retry_gap_in_sec = retry_gap * 60
        self.auto_start_only = auto_start_only
        self.auto_install_start = auto_install_start
        self.max_lifetime_count = max_lifetime_count
        self.recovery_timestamp = recovery_timestamp

        if self.auto_start_only:
            self.allowed_desired_states = [self.STARTED]
            self.allowed_current_states = [self.INSTALLED]
        elif self.auto_install_start:
            self.allowed_desired_states = [self.INSTALLED, self.STARTED]
            self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED]
        else:
            self.allowed_desired_states = [self.STARTED, self.INSTALLED]
            self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED]

        # Split the CSV, dropping blank entries and surrounding whitespace.
        self.enabled_components = []
        if enabled_components:
            for component in enabled_components.split(","):
                component = component.strip()
                if component:
                    self.enabled_components.append(component)

        self.recovery_enabled = recovery_enabled
        if self.recovery_enabled:
            logger.info(
                "==> Auto recovery is enabled with maximum %s in %s minutes with gap of %s minutes between and"
                " lifetime max being %s. Enabled components - %s",
                self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count,
                ', '.join(self.enabled_components))

    def get_unique_task_id(self):
        """Return the next task id (a counter seeded with the construction time)."""
        self.id += 1
        return self.id

    def process_status_commands(self, commands):
        """Store or update the details of every command in commands; no-op when recovery is disabled."""
        if not self.enabled() or not commands:
            return

        for command in commands:
            self.store_or_update_command(command)
            # Pretty-printing is not free; only do it when it will be logged.
            if self.EXECUTION_COMMAND_DETAILS in command and logger.isEnabledFor(logging.DEBUG):
                logger.debug("Details to construct exec commands: " +
                             pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS]))

    def process_execution_commands(self, commands):
        """
        Update the desired state of components from execution commands:
        INSTALL/STOP set it to LiveStatus.DEAD_STATUS, START and the RESTART
        custom command set it to LiveStatus.LIVE_STATUS. Commands of other
        types, without a role, or for components not enabled for recovery are
        ignored. No-op when recovery is disabled.
        """
        if not self.enabled() or not commands:
            return

        for command in commands:
            if command.get(self.COMMAND_TYPE) != ActionQueue.EXECUTION_COMMAND:
                continue
            if self.ROLE not in command:
                continue

            role = command[self.ROLE]
            if not self.configured_for_recovery(role):
                continue

            # roleCommand / hostLevelParams may be absent; read them defensively.
            role_command = command.get(self.ROLE_COMMAND)
            host_level_params = command.get(self.HOST_LEVEL_PARAMS) or {}

            if role_command in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP):
                desired_state, label = LiveStatus.DEAD_STATUS, "STOP/INSTALL"
            elif role_command == ActionQueue.ROLE_COMMAND_START:
                desired_state, label = LiveStatus.LIVE_STATUS, "START"
            elif 'custom_command' in host_level_params and \
                    host_level_params['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
                desired_state, label = LiveStatus.LIVE_STATUS, "RESTART"
            else:
                continue

            self.update_desired_status(role, desired_state)
            logger.info("Received EXECUTION_COMMAND (%s), desired state of %s to %s",
                        label, role, self.get_desired_status(role))

    def store_or_update_command(self, command):
        """
        Stores command details by reading them from the STATUS_COMMAND
        Update desired state as well
        """
        if not self.enabled():
            return

        logger.debug("Inspecting command to store/update details")
        if command.get(self.COMMAND_TYPE) != ActionQueue.STATUS_COMMAND:
            return

        payload_level = command.get(self.PAYLOAD_LEVEL, self.PAYLOAD_LEVEL_DEFAULT)

        component = command[self.COMPONENT_NAME]
        self.update_desired_status(component, command[self.DESIRED_STATE])
        self.update_config_staleness(component, command[self.HAS_STALE_CONFIG])

        if payload_level == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
            if self.EXECUTION_COMMAND_DETAILS in command:
                # Store the execution command details
                self.remove_command(component)
                self.add_command(component, command[self.EXECUTION_COMMAND_DETAILS])
                logger.debug("Stored command details for " + component)
            else:
                logger.warning("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")

    def _build_recovery_command(self, component, role_command, label):
        """
        Build an AUTO_EXECUTION_COMMAND from the command stored for component.

        role_command - value written to the roleCommand field.
        label - command name used in the log messages.
        Returns a deep copy of the stored command with roleCommand,
        commandType and a fresh taskId set, or None when recovery is paused
        or disabled or no (fresh enough) stored command exists.
        """
        if self.paused:
            logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
            return None

        if not self.enabled():
            logger.info("Recovery is not enabled. %s command will not be computed.", label)
            return None

        logger.debug("Using stored %s command for %s", label, component)
        if not self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
            logger.info("%s command cannot be computed as details are not received from Server.", label)
            return None

        command = copy.deepcopy(self.stored_exec_commands[component])
        command[self.ROLE_COMMAND] = role_command
        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
        command[self.TASK_ID] = self.get_unique_task_id()
        return command

    def get_install_command(self, component):
        """Return an INSTALL recovery command for component, or None if it cannot be built."""
        return self._build_recovery_command(component, "INSTALL", "INSTALL")

    def get_stop_command(self, component):
        """Return a STOP recovery command for component, or None if it cannot be built."""
        return self._build_recovery_command(component, "STOP", "STOP")

    def get_restart_command(self, component):
        """Return a RESTART custom command for component, or None if it cannot be built."""
        command = self._build_recovery_command(component, "CUSTOM_COMMAND", "RESTART")
        if command is not None:
            # The stored details may lack hostLevelParams; create it if so.
            command.setdefault(self.HOST_LEVEL_PARAMS, {})['custom_command'] = 'RESTART'
        return command

    def get_start_command(self, component):
        """Return a START recovery command for component, or None if it cannot be built."""
        return self._build_recovery_command(component, "START", "START")

    def command_exists(self, component, command_type):
        """
        Return True if command_type is EXECUTION_COMMAND and a command is
        stored for component; a stored command that is too old is discarded
        first.
        """
        if command_type == ActionQueue.EXECUTION_COMMAND:
            self.remove_stale_command(component)
            return component in self.stored_exec_commands
        return False

    def remove_stale_command(self, component):
        """Remove the stored command of component if it is older than COMMAND_REFRESH_DELAY_SEC."""
        component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
        insert_time = self.stored_exec_commands.get(component_update_key)
        if component not in self.stored_exec_commands or insert_time is None:
            return

        age = self._now_() - insert_time
        if self.COMMAND_REFRESH_DELAY_SEC < age:
            logger.debug("Removing stored command for component : " + str(component) +
                         " as its " + str(age) + " sec old")
            self.remove_command(component)

    def remove_command(self, component):
        """Remove the stored command of component. Returns True if one was removed."""
        # Cheap unlocked test first; this is called on every in-sync status update.
        if component not in self.stored_exec_commands:
            return False

        with self.__status_lock:
            # Re-check under the lock: another thread may have removed it.
            if component not in self.stored_exec_commands:
                return False
            component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
            del self.stored_exec_commands[component]
            self.stored_exec_commands.pop(component_update_key, None)
            logger.debug("Removed stored command for component : " + str(component))
            return True

    def add_command(self, component, command):
        """Store command for component together with the time it was stored."""
        with self.__status_lock:
            component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
            self.stored_exec_commands[component] = command
            self.stored_exec_commands[component_update_key] = self._now_()
            logger.debug("Added command for component : " + str(component))

    def _read_int_(self, value, default_value=0):
        """Return int(value), or default_value when value cannot be converted (including None)."""
        try:
            return int(value)
        except (ValueError, TypeError):
            return default_value
| |
| |
def main(argv=None):
    """
    Script entry point: constructs a RecoveryManager that uses '/tmp' as its
    cache directory and discards it. argv is accepted but not read.
    """
    RecoveryManager('/tmp')
| |
| |
# Run main() only when this file is executed as a script, not when imported.
if "__main__" == __name__:
    main()