#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import multiprocessing
import logging
import ambari_simplejson as json
import sys
import os
import socket
import time
import threading
import urllib2
import pprint
from random import randint
import re
import subprocess
import functools
import hostname
import security
import ssl
import AmbariConfig
from ambari_agent.Heartbeat import Heartbeat
from ambari_agent.Register import Register
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.FileCache import FileCache
from ambari_agent.NetUtil import NetUtil
from ambari_agent.LiveStatus import LiveStatus
from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
from ambari_agent.ClusterConfiguration import ClusterConfiguration
from ambari_agent.RecoveryManager import RecoveryManager
from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
from ambari_agent.ExitHelper import ExitHelper
from ambari_agent.StatusCommandsExecutor import MultiProcessStatusCommandsExecutor, SingleProcessStatusCommandsExecutor
from ambari_commons.network import reconfigure_urllib2_opener
from resource_management.libraries.functions.version import compare_versions
from ambari_commons.os_utils import get_used_ram
logger = logging.getLogger(__name__)
AGENT_AUTO_RESTART_EXIT_CODE = 77
AGENT_RAM_OVERUSE_MESSAGE = "Ambari-agent RAM usage {used_ram} MB went above {config_name}={max_ram} MB. Restarting ambari-agent to clean the RAM."
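

# Controller is the agent-side server-communication thread: it registers this
# host with the Ambari server over the secured agent port, then runs the
# periodic heartbeat loop and dispatches whatever the server returns
# (execution, status, cancel, alert and recovery commands).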
class Controller(threading.Thread):

  def __init__(self, config, server_hostname, heartbeat_stop_callback = None):
    threading.Thread.__init__(self)
    logger.debug('Initializing Controller RPC thread.')
    if heartbeat_stop_callback is None:
      heartbeat_stop_callback = HeartbeatStopHandlers()

    self.version = self.read_agent_version(config)
    self.lock = threading.Lock()
    self.safeMode = True
    self.credential = None
    self.config = config
    self.hostname = hostname.hostname(config)
    self.serverHostname = server_hostname
    server_secured_url = 'https://' + self.serverHostname + \
                         ':' + config.get('server', 'secured_url_port')
    self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
    self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
    self.componentsUrl = server_secured_url + '/agent/v1/components/'
    self.netutil = NetUtil(self.config, heartbeat_stop_callback)
    self.responseId = -1
    self.repeatRegistration = False
    self.isRegistered = False
    self.cachedconnect = None
    self.max_reconnect_retry_delay = int(config.get('server', 'max_reconnect_retry_delay', default=30))
    self.hasMappedComponents = True
    self.statusCommandsExecutor = None

    # Event is used for synchronizing heartbeat iterations (to make possible
    # manual wait() interruption between heartbeats)
    self.heartbeat_stop_callback = heartbeat_stop_callback

    # List of callbacks that are called at agent registration
    self.registration_listeners = []

    # pull config directory out of config
    cache_dir = config.get('agent', 'cache_dir')
    if cache_dir is None:
      cache_dir = '/var/lib/ambari-agent/cache'

    self.max_ram_soft = int(config.get('agent', 'memory_threshold_soft_mb', default=0))
    self.max_ram_hard = int(config.get('agent', 'memory_threshold_hard_mb', default=0))

    stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
    common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
    extensions_cache_dir = os.path.join(cache_dir, FileCache.EXTENSIONS_CACHE_DIRECTORY)
    host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
    alerts_cache_dir = os.path.join(cache_dir, FileCache.ALERTS_CACHE_DIRECTORY)
    cluster_config_cache_dir = os.path.join(cache_dir, FileCache.CLUSTER_CONFIGURATION_CACHE_DIRECTORY)
    recovery_cache_dir = os.path.join(cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)

    self.heartbeat_idle_interval_min = int(self.config.get('heartbeat', 'idle_interval_min')) if self.config.get('heartbeat', 'idle_interval_min') else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC
    if self.heartbeat_idle_interval_min < 1:
      self.heartbeat_idle_interval_min = 1

    self.heartbeat_idle_interval_max = int(self.config.get('heartbeat', 'idle_interval_max')) if self.config.get('heartbeat', 'idle_interval_max') else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
    if self.heartbeat_idle_interval_min > self.heartbeat_idle_interval_max:
      raise Exception("Heartbeat minimum interval={0} seconds can not be greater than the maximum interval={1} seconds !".format(self.heartbeat_idle_interval_min, self.heartbeat_idle_interval_max))

    self.get_heartbeat_interval = functools.partial(self.netutil.get_agent_heartbeat_idle_interval_sec, self.heartbeat_idle_interval_min, self.heartbeat_idle_interval_max)

    self.recovery_manager = RecoveryManager(recovery_cache_dir)
    self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir)
    self.move_data_dir_mount_file()

    if not config.use_system_proxy_setting():
      reconfigure_urllib2_opener(ignore_system_proxy=True)

    self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir,
        stacks_cache_dir, common_services_cache_dir, extensions_cache_dir,
        host_scripts_cache_dir, self.cluster_configuration, config,
        self.recovery_manager)
    self.alert_scheduler_handler.start()
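
  # Reads the agent version string from the 'version' file under the
  # configured agent prefix directory.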
  def read_agent_version(self, config):
    data_dir = config.get('agent', 'prefix')
    ver_file = os.path.join(data_dir, 'version')
    f = open(ver_file, "r")
    version = f.read().strip()
    f.close()
    return version

  def __del__(self):
    logger.info("Server connection disconnected.")
  def registerWithServer(self):
    """
    :return: returning from current method without setting self.isRegistered
    to True will lead to agent termination.
    """
    LiveStatus.SERVICES = []
    LiveStatus.CLIENT_COMPONENTS = []
    LiveStatus.COMPONENTS = []

    ret = {}

    while not self.isRegistered:
      try:
        data = json.dumps(self.register.build(self.version))
        prettyData = pprint.pformat(data)

        try:
          server_ip = socket.gethostbyname(self.hostname)
          logger.info("Registering with %s (%s) (agent=%s)", self.hostname, server_ip, prettyData)
        except socket.error:
          logger.warn("Unable to determine the IP address of '%s', agent registration may fail (agent=%s)",
                      self.hostname, prettyData)

        ret = self.sendRequest(self.registerUrl, data)
        prettyData = pprint.pformat(ret)
        logger.debug("Registration response is %s", prettyData)

        # exitstatus is a code of error which was raised on server side.
        # exitstatus = 0 (OK - Default)
        # exitstatus = 1 (Registration failed because different version of agent and server)
        exitstatus = 0
        if 'exitstatus' in ret.keys():
          exitstatus = int(ret['exitstatus'])

        if exitstatus == 1:
          # log - message, which will be printed to agents log
          if 'log' in ret.keys():
            log = ret['log']
            logger.error(log)
          self.isRegistered = False
          self.repeatRegistration = False
          return ret

        self.responseId = int(ret['responseId'])
        logger.info("Registration Successful (response id = %s)", self.responseId)
        self.isRegistered = True

        # always update cached cluster configurations on registration
        # must be prior to any other operation
        self.cluster_configuration.update_configurations_from_heartbeat(ret)

        self.recovery_manager.update_configuration_from_registration(ret)
        self.config.update_configuration_from_registration(ret)
        logger.debug("Updated config:" + str(self.config))

        # Start StatusCommandExecutor child process or restart it if already running
        # in order to receive up to date agent config.
        self.statusCommandsExecutor.relaunch("REGISTER_WITH_SERVER")

        if 'statusCommands' in ret.keys():
          logger.debug("Got status commands on registration.")
          self.addToStatusQueue(ret['statusCommands'])
        else:
          self.hasMappedComponents = False

        # always update alert definitions on registration
        self.alert_scheduler_handler.update_definitions(ret)
      except ssl.SSLError:
        self.repeatRegistration = False
        self.isRegistered = False
        return
      except Exception, ex:
        # try a reconnect only after a certain amount of random time
        delay = randint(0, self.max_reconnect_retry_delay)
        logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
        logger.error("Error:" + str(ex))
        logger.warn(""" Sleeping for {0} seconds and then trying again """.format(delay,))
        time.sleep(delay)

    return ret
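
  # Cancellation is delegated to the ActionQueue, which also kills the
  # corresponding process if the command is already in progress.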
  def cancelCommandInQueue(self, commands):
    """ Remove from the queue commands, kill the process if it's in progress """
    if commands:
      try:
        self.actionQueue.cancel(commands)
      except Exception, err:
        logger.error("Exception occurred on commands cancel: %s", err.message)

  def addToQueue(self, commands):
    """Put the required execution commands into the ActionQueue."""
    if not commands:
      logger.debug("No commands received from %s", self.serverHostname)
    else:
      # Only add to the queue if the command list is not empty
      logger.info("Adding %s commands. Heartbeat id = %s", len(commands), self.responseId)
      if 'clusterName' in commands[0].keys():
        self.updateComponents(commands[0]['clusterName'])
      self.actionQueue.put(commands)
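
  # Status commands are routed both to the RecoveryManager (to track desired
  # state) and to the ActionQueue's status command queue.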
  def addToStatusQueue(self, commands):
    if not commands:
      logger.debug("No status commands received from %s", self.serverHostname)
    else:
      logger.info("Adding %s status commands. Heartbeat id = %s", len(commands), self.responseId)
      if 'clusterName' in commands[0].keys():
        self.updateComponents(commands[0]['clusterName'])
      self.recovery_manager.process_status_commands(commands)
      self.actionQueue.put_status(commands)
    pass

  # For testing purposes
  DEBUG_HEARTBEAT_RETRIES = 0
  DEBUG_SUCCESSFULL_HEARTBEATS = 0
  DEBUG_STOP_HEARTBEATING = False
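
  # Wakes the heartbeat loop immediately instead of waiting out the full
  # idle interval.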
  def trigger_heartbeat(self):
    self.heartbeat_stop_callback.set_heartbeat()
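
  # Main heartbeat loop. Each iteration builds a heartbeat (including full
  # host state every state_interval_seconds), posts it to heartbeatUrl and
  # processes the response. Keys handled below include, among others
  # (illustrative, not an exhaustive schema):
  #   responseId, exitstatus, clusterSize, hasMappedComponents,
  #   hasPendingTasks, registrationCommand, cancelCommands, executionCommands,
  #   statusCommands, alertDefinitionCommands, alertExecutionCommands,
  #   restartAgent, recoveryConfig
  # A responseId that is not exactly the previous id + 1 is treated as a lost
  # sync with the server and restarts the agent.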
  def heartbeatWithServer(self):
    self.DEBUG_HEARTBEAT_RETRIES = 0
    self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
    retry = False
    certVerifFailed = False
    state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60'))

    # last time when state was successfully sent to server
    last_state_timestamp = 0.0

    # in order to be able to check from logs that heartbeats processing
    # still running we log a message. However to avoid generating too
    # much log when the heartbeat runs at a higher rate (e.g. 1 second intervals)
    # we log the message at the same interval as 'state interval'
    heartbeat_running_msg_timestamp = 0.0

    # Prevent excessive logging by logging only at specific intervals
    getrecoverycommands_timestamp = 0.0
    getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC

    heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC

    while not self.DEBUG_STOP_HEARTBEATING:
      current_time = time.time()
      logging_level = logging.DEBUG
      if current_time - heartbeat_running_msg_timestamp > state_interval:
        # log more steps every minute or so
        logging_level = logging.INFO
        heartbeat_running_msg_timestamp = current_time

      try:
        logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)

        send_state = False
        if not retry:
          if current_time - last_state_timestamp > state_interval:
            send_state = True

          logger.log(logging_level, "Building heartbeat message")

          data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
        else:
          self.DEBUG_HEARTBEAT_RETRIES += 1

        if logger.isEnabledFor(logging.DEBUG):
          logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
        else:
          logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)

        response = self.sendRequest(self.heartbeatUrl, data)
        exitStatus = 0
        if 'exitstatus' in response.keys():
          exitStatus = int(response['exitstatus'])

        if exitStatus != 0:
          raise Exception(response)

        serverId = int(response['responseId'])

        logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)

        cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1

        # TODO: this needs to be revised if hosts can be shared across multiple clusters
        heartbeat_interval = self.get_heartbeat_interval(cluster_size) \
          if cluster_size > 0 \
          else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC

        logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)

        if 'hasMappedComponents' in response.keys():
          self.hasMappedComponents = response['hasMappedComponents'] is not False

        if 'hasPendingTasks' in response.keys():
          has_pending_tasks = bool(response['hasPendingTasks'])
          self.recovery_manager.set_paused(has_pending_tasks)

        if 'registrationCommand' in response.keys():
          # check if the registration command is None. If none skip
          if response['registrationCommand'] is not None:
            logger.info("RegistrationCommand received - repeat agent registration")
            self.isRegistered = False
            self.repeatRegistration = True
            return

        used_ram = get_used_ram()/1000
        # dealing with a possible memory leaks
        if self.max_ram_soft and used_ram >= self.max_ram_soft and not self.actionQueue.tasks_in_progress_or_pending():
          logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_soft_mb", max_ram=self.max_ram_soft))
          self.restartAgent()
        if self.max_ram_hard and used_ram >= self.max_ram_hard:
          logger.error(AGENT_RAM_OVERUSE_MESSAGE.format(used_ram=used_ram, config_name="memory_threshold_hard_mb", max_ram=self.max_ram_hard))
          self.restartAgent()

        if serverId != self.responseId + 1:
          logger.error("Error in responseId sequence - restarting")
          self.restartAgent()
        else:
          self.responseId = serverId
          if send_state:
            last_state_timestamp = current_time

        # if the response contains configurations, update the in-memory and
        # disk-based configuration cache (execution and alert commands have this)
        logger.log(logging_level, "Updating configurations from heartbeat")
        self.cluster_configuration.update_configurations_from_heartbeat(response)

        response_keys = response.keys()

        # there's case when canceled task can be processed in Action Queue.execute before adding rescheduled task to queue
        # this can cause command failure instead result suppression
        # so canceling and putting rescheduled commands should be executed atomically
        if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
          logger.log(logging_level, "Adding cancel/execution commands")
          with self.actionQueue.lock:
            if 'cancelCommands' in response_keys:
              self.cancelCommandInQueue(response['cancelCommands'])

            if 'executionCommands' in response_keys:
              execution_commands = response['executionCommands']
              self.recovery_manager.process_execution_commands(execution_commands)
              self.addToQueue(execution_commands)

        if 'statusCommands' in response_keys:
          # try storing execution command details and desired state
          self.addToStatusQueue(response['statusCommands'])

        if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
          getrecoverycommands_timestamp = current_time
          if not self.actionQueue.tasks_in_progress_or_pending():
            logger.log(logging_level, "Adding recovery commands")
            recovery_commands = self.recovery_manager.get_recovery_commands()
            for recovery_command in recovery_commands:
              logger.info("Adding recovery command %s for component %s",
                          recovery_command['roleCommand'], recovery_command['role'])
              self.addToQueue([recovery_command])

        if 'alertDefinitionCommands' in response_keys:
          logger.log(logging_level, "Updating alert definitions")
          self.alert_scheduler_handler.update_definitions(response)

        if 'alertExecutionCommands' in response_keys:
          logger.log(logging_level, "Executing alert commands")
          self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])

        if "true" == response['restartAgent']:
          logger.error("Received the restartAgent command")
          self.restartAgent()
        else:
          logger.debug("No commands sent from %s", self.serverHostname)

        if retry:
          logger.info("Reconnected to %s", self.heartbeatUrl)

        if "recoveryConfig" in response:
          # update the list of components enabled for recovery
          logger.log(logging_level, "Updating recovery config")
          self.recovery_manager.update_configuration_from_registration(response)

        retry = False
        certVerifFailed = False
        self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
        self.DEBUG_HEARTBEAT_RETRIES = 0
        self.heartbeat_stop_callback.reset_heartbeat()
      except ssl.SSLError:
        self.repeatRegistration = False
        self.isRegistered = False
        logger.exception("SSLError while trying to heartbeat.")
        return
      except Exception, err:
        if "code" in err:
          logger.error(err.code)
        else:
          logException = False
          if logger.isEnabledFor(logging.DEBUG):
            logException = True

          exceptionMessage = str(err)
          errorMessage = "Unable to reconnect to {0} (attempts={1}, details={2})".format(self.heartbeatUrl, self.DEBUG_HEARTBEAT_RETRIES, exceptionMessage)

          if not retry:
            errorMessage = "Connection to {0} was lost (details={1})".format(self.serverHostname, exceptionMessage)

          logger.error(errorMessage, exc_info=logException)

          if 'certificate verify failed' in str(err) and not certVerifFailed:
            logger.warn("Server certificate verify failed. Did you regenerate server certificate?")
            certVerifFailed = True

        self.cachedconnect = None  # Previous connection is broken now
        retry = True

        # randomize the heartbeat
        delay = randint(0, self.max_reconnect_retry_delay)
        time.sleep(delay)

      # Sleep for some time
      timeout = heartbeat_interval - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
      logger.log(logging_level, "Waiting %s for next heartbeat", timeout)

      if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
        # Stop loop when stop event received
        logger.info("Stop event received")
        self.DEBUG_STOP_HEARTBEATING = True

      logger.log(logging_level, "Wait for next heartbeat over")
  def run(self):
    try:
      self.actionQueue = ActionQueue(self.config, controller=self)
      if self.config.get_multiprocess_status_commands_executor_enabled():
        self.statusCommandsExecutor = MultiProcessStatusCommandsExecutor(self.config, self.actionQueue)
      else:
        self.statusCommandsExecutor = SingleProcessStatusCommandsExecutor(self.config, self.actionQueue)
      ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING", can_relaunch=False)
      self.actionQueue.start()
      self.register = Register(self.config)
      self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())

      opener = urllib2.build_opener()
      urllib2.install_opener(opener)

      while True:
        self.repeatRegistration = False
        self.registerAndHeartbeat()
        if not self.repeatRegistration:
          logger.info("Finished heartbeating and registering cycle")
          break
    except:
      logger.exception("Controller thread failed with exception:")
      raise

    logger.info("Controller thread has successfully finished")
  def registerAndHeartbeat(self):
    registerResponse = self.registerWithServer()

    if "response" in registerResponse:
      message = registerResponse["response"]
      logger.info("Registration response from %s was %s", self.serverHostname, message)

      if self.isRegistered:
        # Clearing command queue to stop executing "stale" commands
        # after registration
        logger.info('Resetting ActionQueue...')
        self.actionQueue.reset()

        # Process callbacks
        for callback in self.registration_listeners:
          callback()

        time.sleep(self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC)
        self.heartbeatWithServer()
    else:
      logger.info("Registration response from %s didn't contain 'response' as a key", self.serverHostname)

  def restartAgent(self):
    ExitHelper().exit(AGENT_AUTO_RESTART_EXIT_CODE)
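
  # Posts a JSON payload to the server over a lazily created
  # security.CachedHTTPSConnection and returns the parsed JSON response;
  # raises IOError both on transport errors and on unparseable responses.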
  def sendRequest(self, url, data):
    response = None

    try:
      if self.cachedconnect is None:  # Lazy initialization
        self.cachedconnect = security.CachedHTTPSConnection(self.config, self.serverHostname)
      req = urllib2.Request(url, data, {'Content-Type': 'application/json',
                                        'Accept-encoding': 'gzip'})
      response = self.cachedconnect.request(req)
      return json.loads(response)
    except Exception, exception:
      if response is None:
        raise IOError('Request to {0} failed due to {1}'.format(url, str(exception)))
      else:
        raise IOError('Response parsing failed! Request data: ' + str(data)
                      + '; Response: ' + str(response))
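
  # Lazily populates the LiveStatus services/components maps for a cluster
  # from the server's components endpoint; does nothing if they are already
  # filled.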
  def updateComponents(self, cluster_name):
    if LiveStatus.SERVICES:
      return

    logger.debug("Updating components map of cluster " + cluster_name)

    # May throw IOError on server connection error
    response = self.sendRequest(self.componentsUrl + cluster_name, None)
    logger.debug("Response from %s was %s", self.serverHostname, str(response))

    services, client_components, server_components = [], [], []
    for service, components in response['components'].items():
      services.append(service)
      for component, category in components.items():
        service_component = {"serviceName": service, "componentName": component}
        if category == 'CLIENT':
          client_components.append(service_component)
        else:
          server_components.append(service_component)

    LiveStatus.SERVICES = services
    LiveStatus.CLIENT_COMPONENTS = client_components
    LiveStatus.COMPONENTS = server_components

    logger.debug("Components map updated")
    logger.debug("LiveStatus.SERVICES" + str(LiveStatus.SERVICES))
    logger.debug("LiveStatus.CLIENT_COMPONENTS" + str(LiveStatus.CLIENT_COMPONENTS))
    logger.debug("LiveStatus.COMPONENTS" + str(LiveStatus.COMPONENTS))

  def get_status_commands_executor(self):
    return self.statusCommandsExecutor

  def move_data_dir_mount_file(self):
    """
    In Ambari 2.1.2, we moved the dfs_data_dir_mount.hist to a static location
    because /etc/hadoop/conf points to a symlink'ed location that would change during
    Stack Upgrade.
    """
    try:
      version = self.get_version()
      logger.debug("Ambari Agent version {0}".format(version))
      if compare_versions(version, "2.1.2") >= 0:
        source_file = "/etc/hadoop/conf/dfs_data_dir_mount.hist"
        destination_file = "/var/lib/ambari-agent/data/datanode/dfs_data_dir_mount.hist"
        if os.path.exists(source_file) and not os.path.exists(destination_file):
          command = "mkdir -p %s" % os.path.dirname(destination_file)
          logger.info("Moving Data Dir Mount History file. Executing command: %s" % command)
          return_code = subprocess.call(command, shell=True)
          logger.info("Return code: %d" % return_code)

          command = "mv %s %s" % (source_file, destination_file)
          logger.info("Moving Data Dir Mount History file. Executing command: %s" % command)
          return_code = subprocess.call(command, shell=True)
          logger.info("Return code: %d" % return_code)
    except Exception, e:
      logger.error("Exception in move_data_dir_mount_file(). Error: {0}".format(str(e)))
  def get_version(self):
    version = self.version
    matches = re.findall(r"[\d+.]+", version)
    if not matches:
      logger.warning("No version match result, use original version {0}".format(version))
      return version
    else:
      return matches[0]
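

# Debug entry point: when this module is run directly it attaches a console
# log handler and drives a Controller instance outside of the normal agent
# startup path (a convenience for manual testing, assuming the configuration
# in AmbariConfig.config is usable as-is).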
def main(argv=None):
  # Allow Ctrl-C
  formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s")
  stream_handler = logging.StreamHandler()
  stream_handler.setFormatter(formatter)
  logger.addHandler(stream_handler)
  logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))

  config = AmbariConfig.config
  heartbeat_stop_callback = bind_signal_handlers()
  # Controller expects the server hostname as its second argument; read it
  # from the [server] section of the agent configuration.
  server_hostname = config.get('server', 'hostname')
  collector = Controller(config, server_hostname, heartbeat_stop_callback)
  collector.start()
  collector.run()

if __name__ == '__main__':
  main()