| #!/usr/bin/env python |
| |
| ''' |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ''' |
| |
def fix_encoding_reimport_bug():
    """
    Work around https://bugs.python.org/issue14847 by decoding a throwaway
    byte string once with the 'utf-8' codec and once with the 'ascii' codec.
    """
    for codec_name in ('utf-8', 'ascii'):
        b'x'.decode(codec_name)
| |
def fix_subprocess_racecondition():
    """
    Monkey patch subprocess so that it can no longer leave the garbage
    collector switched off.

    Subprocess in Python has a race condition around enabling/disabling gc
    which may end with the garbage collector turned off, and that leads to a
    memory leak. The patch makes the ``isenabled`` function of the gc module
    referenced by subprocess always report True.

    !!! PLEASE NOTE: THIS SHOULD BE CALLED BEFORE ANY OTHER INITIALIZATION is
    done, to avoid already created links to subprocess, subprocess.gc or gc.
    """
    import subprocess
    import sys

    def _gc_always_enabled():
        return True

    # monkey patching subprocess
    subprocess.gc.isenabled = _gc_always_enabled

    # re-importing gc to have correct isenabled for non-subprocess contexts
    del sys.modules['gc']
    import gc
| |
| |
def fix_subprocess_popen():
    '''
    Serialize subprocess creation to work around the race condition that
    occurs when subprocesses are started concurrently from multiple threads
    via the subprocess and multiprocessing modules.
    See http://bugs.python.org/issue19809 for details and repro script.

    The patch is applied only on posix systems running a Python major
    version below 3; otherwise the function does nothing.
    '''
    import os
    import sys

    if os.name != 'posix' or sys.version_info[0] >= 3:
        return

    from multiprocessing import forking
    import subprocess
    import threading

    creation_lock = threading.RLock()  # guards subprocess creation

    def _serialized(original_init):
        # Wrap a Popen.__init__ so that it runs while holding creation_lock.
        def locked_init(self, *a, **kw):
            with creation_lock:
                original_init(self, *a, **kw)
        return locked_init

    subprocess.Popen.__init__ = _serialized(subprocess.Popen.__init__)
    forking.Popen.__init__ = _serialized(forking.Popen.__init__)
| |
| |
# Apply the interpreter workarounds before the remaining imports below run:
# fix_subprocess_racecondition() must be called before any other
# initialization creates references to subprocess, subprocess.gc or gc.
fix_subprocess_popen()
fix_subprocess_racecondition()
fix_encoding_reimport_bug()
| |
| import logging.handlers |
| import logging.config |
| import signal |
| from optparse import OptionParser |
| import sys |
| import traceback |
| import getpass |
| import os |
| import time |
| import locale |
| import platform |
| import ConfigParser |
| import ProcessHelper |
| import resource |
| from logging.handlers import SysLogHandler |
| from Controller import Controller |
| import AmbariConfig |
| from NetUtil import NetUtil |
| from PingPortListener import PingPortListener |
| import hostname |
| from DataCleaner import DataCleaner |
| from ambari_agent.ExitHelper import ExitHelper |
| import socket |
| from ambari_commons import OSConst, OSCheck |
| from ambari_commons.shell import shellRunner |
| from ambari_commons.network import reconfigure_urllib2_opener |
| from ambari_commons import shell |
| import HeartbeatHandlers |
| from HeartbeatHandlers import bind_signal_handlers |
| from ambari_commons.constants import AMBARI_SUDO_BINARY |
| from resource_management.core.logger import Logger |
| |
# Root logger used by the agent, plus a separate named logger for alerts.
# File handlers are attached to both from main() via setup_logging().
logger = logging.getLogger()
alerts_logger = logging.getLogger('ambari_alerts')

# Record format used for the rotating file handlers and logging.basicConfig.
formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
# PID of this process captured at import time; handed to bind_signal_handlers().
agentPid = os.getpid()

# Global variables to be set later.
home_dir = ""

config = AmbariConfig.AmbariConfig()

# TODO AMBARI-18733, remove this global variable and calculate it based on home_dir once it is set.
configFile = config.getConfigFile()
two_way_ssl_property = config.TWO_WAY_SSL_PROPERTY

# Syslog output is only attempted on Linux (see add_syslog_handler).
IS_LINUX = platform.system() == "Linux"
SYSLOG_FORMAT_STRING = ' ambari_agent - %(filename)s - [%(process)d] - %(name)s - %(levelname)s - %(message)s'
SYSLOG_FORMATTER = logging.Formatter(SYSLOG_FORMAT_STRING)

# Rotating file handlers already created by setup_logging(), keyed by log file name.
_file_logging_handlers ={}
| |
def setup_logging(logger, filename, logging_level):
    """
    Attach a rotating file handler for ``filename`` to ``logger`` and set the
    logger's level.

    One RotatingFileHandler (append mode, 10000000 bytes per file, 25 backups)
    is created per file name and cached in ``_file_logging_handlers``, so
    repeated calls for the same file reuse the same handler object.

    :param logger: logger to configure
    :param filename: path of the log file
    :param logging_level: numeric logging level, e.g. ``logging.INFO``
    """
    if filename in _file_logging_handlers:
        rotateLog = _file_logging_handlers[filename]
    else:
        rotateLog = logging.handlers.RotatingFileHandler(filename, "a", 10000000, 25)
        rotateLog.setFormatter(logging.Formatter(formatstr))
        _file_logging_handlers[filename] = rotateLog
    logger.addHandler(rotateLog)

    logging.basicConfig(format=formatstr, level=logging_level, filename=filename)
    logger.setLevel(logging_level)
    # Use the public getLevelName() API instead of the private
    # logging._levelNames dict, which only exists on Python 2 and raises
    # KeyError for levels that have no registered name.
    logger.info("loglevel=logging.{0}".format(logging.getLevelName(logging_level)))
| |
# stop_agent() polls the agent process this many times after sending kill -15 ...
GRACEFUL_STOP_TRIES = 10
# ... sleeping this many seconds between polls, before falling back to kill -9.
GRACEFUL_STOP_TRIES_SLEEP = 3
| |
| |
def add_syslog_handler(logger):
    """
    Attach a syslog handler to ``logger`` when syslog output is enabled.

    The handler is added only when the ``syslog_enabled`` option exists in the
    ``logging`` section of the agent config, its integer value is 1, and the
    agent runs on Linux. It writes to /dev/log with facility LOG_LOCAL1 and
    uses SYSLOG_FORMATTER.

    :param logger: logger that receives the syslog handler
    """
    # The config switch is evaluated before the platform check.
    if not config.has_option("logging", "syslog_enabled"):
        return
    if int(config.get("logging", "syslog_enabled")) != 1:
        return
    if not IS_LINUX:
        return

    logger.info("Adding syslog handler to ambari agent logger")
    handler = SysLogHandler(address="/dev/log", facility=SysLogHandler.LOG_LOCAL1)
    handler.setFormatter(SYSLOG_FORMATTER)
    logger.addHandler(handler)
| |
def update_log_level(config):
    """
    Re-configure logging from the agent configuration.

    If a ``logging.conf`` file exists next to the agent config file, logging
    is configured from that file and the module-level ``logger`` is rebound to
    the logger named after this module. Otherwise the ``loglevel`` option of
    the ``agent`` section is used: 'DEBUG' selects ``logging.DEBUG`` and any
    other non-None value selects ``logging.INFO``.

    Any exception raised while reading or applying the configured level is
    caught and logged; it is never propagated to the caller.

    :param config: agent configuration object providing ``get(section, option)``
    """
    # Setting loglevel based on config file
    global logger
    global home_dir
    log_cfg_file = os.path.join(os.path.dirname(AmbariConfig.AmbariConfig.getConfigFile(home_dir)), "logging.conf")
    if os.path.exists(log_cfg_file):
        logging.config.fileConfig(log_cfg_file)
        # create logger
        logger = logging.getLogger(__name__)
        logger.info("Logging configured by " + log_cfg_file)
        return

    try:
        loglevel = config.get('agent', 'loglevel')
        if loglevel is not None:
            level = logging.DEBUG if loglevel == 'DEBUG' else logging.INFO
            logging.basicConfig(format=formatstr, level=level, filename=AmbariConfig.AmbariConfig.getLogFile())
            logger.setLevel(level)
            # Always report the new level at INFO: a DEBUG record would be
            # filtered out by the INFO level that was just applied.
            logger.info("Newloglevel=logging.{0}".format(logging.getLevelName(level)))
    except Exception as err:
        logger.info("Default loglevel=DEBUG")
        # Keep the reason available for troubleshooting instead of dropping it.
        logger.debug("Could not apply agent loglevel from config: %s", err)
| |
| |
| # TODO AMBARI-18733, move inside AmbariConfig |
def resolve_ambari_config():
    """
    Read the agent configuration file into the global ``config``.

    In production, home_dir will be "". When running multiple Agents per host,
    each agent will have a unique path. If the file is missing, or reading it
    raises, the problem is logged as a warning and no exception escapes.
    """
    global config
    global home_dir
    config_path = os.path.abspath(AmbariConfig.AmbariConfig.getConfigFile(home_dir))
    try:
        if not os.path.exists(config_path):
            raise Exception("No config found at {0}, use default".format(config_path))
        config.read(config_path)
    except Exception as err:
        logger.warn(err)
| |
def check_sudo():
    """
    Verify that sudo works for the current user and warn when it is slow.

    Nothing is checked when the effective UID is 0. Otherwise
    ``/usr/bin/test /`` is run through AMBARI_SUDO_BINARY; a non-zero exit
    code raises Exception, and a run time above 2 seconds is logged as a
    warning.
    """
    # don't need to check sudo for root.
    if os.geteuid() == 0:
        return

    runner = shellRunner()
    sudo_test = [AMBARI_SUDO_BINARY, '/usr/bin/test', '/']
    sudo_test_str = ' '.join(sudo_test)

    started_at = time.time()
    outcome = runner.run(sudo_test)
    elapsed = time.time() - started_at

    if outcome['exitCode'] != 0:
        # bad sudo configurations
        raise Exception("Please check your sudo configurations.\n" + sudo_test_str + " failed with " + outcome['error'] + outcome['output'])

    if elapsed > 2:
        slow_sudo_template = ("Sudo commands on this host are running slowly ('{0}' took {1} seconds).\n"
                              "This will create a significant slow down for ambari-agent service tasks.")
        logger.warn(slow_sudo_template.format(sudo_test_str, elapsed))
| |
| # Updates the hard limit for open file handles |
def update_open_files_ulimit(config):
    """
    Apply the configured open-files limit as the hard RLIMIT_NOFILE limit.

    The limit is changed only when the configured value is greater than or
    equal to the current soft limit; the soft limit itself is left unchanged.
    A ValueError from setrlimit is logged together with the hard limit that
    was read at the start.

    :param config: object providing ``get_ulimit_open_files()``
    """
    global logger
    soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    requested_limit = config.get_ulimit_open_files()
    if not requested_limit >= soft_limit:
        return

    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (soft_limit, requested_limit))
        logger.info('open files ulimit = {0}'.format(requested_limit))
    except ValueError as err:
        logger.error('Unable to set open files ulimit to {0}: {1}'.format(requested_limit, str(err)))
        logger.info('open files ulimit = {0}'.format(hard_limit))
| |
def perform_prestart_checks(expected_hostname):
    """
    Run the sanity checks that must pass before the agent starts.

    Each failed check prints and/or logs a message and terminates the process
    with ``sys.exit(1)``:

    * the hostname determined for this host differs from ``expected_hostname``
      (skipped when ``expected_hostname`` is None);
    * the pid file already exists (not checked on the Windows server family);
    * the ``prefix`` option of the ``agent`` section is missing, or does not
      point to an existing directory.

    Finally ``check_sudo()`` is called.

    :param expected_hostname: hostname expected for this host (got from the
        server during bootstrap), or None to skip the hostname check
    """
    global config

    if expected_hostname is not None:
        current_hostname = hostname.hostname(config)
        if current_hostname != expected_hostname:
            print("Determined hostname does not match expected. Please check agent "
                  "log for details")
            msg = ("Ambari agent machine hostname ({0}) does not match expected ambari "
                   "server hostname ({1}). Aborting registration. Please check hostname, "
                   "hostname -f and /etc/hosts file to confirm your "
                   "hostname is setup correctly").format(current_hostname, expected_hostname)
            logger.error(msg)
            sys.exit(1)

    # Check if there is another instance running
    if os.path.isfile(ProcessHelper.pidfile) and not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
        print("%s already exists, exiting" % ProcessHelper.pidfile)
        sys.exit(1)

    # check if ambari prefix is configured
    if not config.has_option('agent', 'prefix'):
        # There is no value to substitute here, so the message must not
        # contain a "%s" placeholder (it used to be printed literally).
        msg = "Ambari prefix dir not configured, can't continue"
        logger.error(msg)
        print(msg)
        sys.exit(1)

    # check if ambari prefix exists
    if not os.path.isdir(os.path.abspath(config.get('agent', 'prefix'))):
        msg = "Ambari prefix dir %s does not exists, can't continue" \
              % config.get("agent", "prefix")
        logger.error(msg)
        print(msg)
        sys.exit(1)

    check_sudo()
| |
| |
def daemonize():
    """
    Write the PID of the current process to ``ProcessHelper.pidfile``.
    """
    pid = str(os.getpid())
    # open() inside a context manager replaces the Python-2-only file()
    # builtin and guarantees the pid file is flushed and closed right away.
    with open(ProcessHelper.pidfile, 'w') as pid_file:
        pid_file.write(pid)
| |
def stop_agent():
    """
    Stop the Ambari agent whose PID is stored in ``ProcessHelper.pidfile``.

    The agent is first sent ``kill -15`` through AMBARI_SUDO_BINARY and then
    polled with ``kill -0`` up to GRACEFUL_STOP_TRIES times, sleeping
    GRACEFUL_STOP_TRIES_SLEEP seconds between polls. If it is still alive, or
    anything fails after a valid PID was read, ``kill -9`` is used.

    The function does not return normally: it ends the current process with
    ``sys.exit(0)``, or raises Exception when ``kill -9`` itself fails.
    """
    # stop existing Ambari agent
    pid = -1
    runner = shellRunner()
    try:
        with open(ProcessHelper.pidfile, 'r') as f:
            parsed_pid = int(f.read())
        # Only accept a validated, positive PID. If the pid file is empty or
        # corrupt, pid must stay -1 so the handler below never hands
        # unvalidated content to "kill -9"; 0 and negative values are
        # rejected because kill treats them as process-group targets.
        if parsed_pid <= 0:
            raise ValueError("Invalid pid in pid file: {0}".format(parsed_pid))
        pid = parsed_pid

        runner.run([AMBARI_SUDO_BINARY, 'kill', '-15', str(pid)])
        for _ in range(GRACEFUL_STOP_TRIES):
            result = runner.run([AMBARI_SUDO_BINARY, 'kill', '-0', str(pid)])
            if result['exitCode'] != 0:
                logger.info("Agent died gracefully, exiting.")
                sys.exit(0)
            time.sleep(GRACEFUL_STOP_TRIES_SLEEP)
        logger.info("Agent not going to die gracefully, going to execute kill -9")
        # Raised on purpose to reach the kill -9 fallback below.
        raise Exception("Agent is running")
    except Exception:
        if pid == -1:
            print("Agent process is not running")
        else:
            res = runner.run([AMBARI_SUDO_BINARY, 'kill', '-9', str(pid)])
            if res['exitCode'] != 0:
                raise Exception("Error while performing agent stop. " + res['error'] + res['output'])
            logger.info("Agent stopped successfully by kill -9, exiting.")
    sys.exit(0)
| |
def reset_agent(options):
    """
    Point the agent at a new server host and delete its certificates.

    ``options[2]`` is the new server hostname. When it differs from the
    ``hostname`` option of the ``server`` section, the agent config file is
    rewritten with the new value. Afterwards everything below the directory
    named by the ``keysdir`` option of the ``security`` section is removed.

    The function always terminates the process: exit code 0 on success,
    exit code 1 (after printing the problem) on any exception.

    :param options: argument list; index 2 holds the new server hostname
    """
    global home_dir
    try:
        # update agent config file
        parser = ConfigParser.ConfigParser()
        # TODO AMBARI-18733, calculate configFile based on home_dir
        parser.read(configFile)
        current_host = parser.get('server', 'hostname')
        requested_host = options[2]
        if requested_host is not None and current_host != requested_host:
            print("Updating server host from " + current_host + " to " + requested_host)
            parser.set('server', 'hostname', requested_host)
            with open(configFile, "wb") as config_out:
                parser.write(config_out)

        # clear agent certs
        keys_dir = parser.get('security', 'keysdir')
        print("Removing Agent certificates...")
        # Bottom-up walk: files first, then the (by then empty) directories.
        for root, dirs, files in os.walk(keys_dir, topdown=False):
            for remove, names in ((os.remove, files), (os.rmdir, dirs)):
                for name in names:
                    remove(os.path.join(root, name))
    except Exception as err:
        print("A problem occurred while trying to reset the agent: " + str(err))
        sys.exit(1)

    sys.exit(0)
| |
# Retry count passed to NetUtil.try_to_connect() for each server hostname in main().
MAX_RETRIES = 10
| |
def run_threads(server_hostname, heartbeat_stop_callback):
    """
    Start the Controller and supervise it until it is no longer alive.

    While the controller is alive, the status commands executor is polled
    every 0.1 seconds and relaunched when it reports that a relaunch is
    needed. Once the controller has stopped, the executor is killed with
    reason "AGENT_STOPPED" and relaunching disabled.

    :param server_hostname: hostname of the server passed to the Controller
    :param heartbeat_stop_callback: stop callback passed to the Controller
    """
    # Launch Controller communication
    controller = Controller(config, server_hostname, heartbeat_stop_callback)
    controller.start()
    time.sleep(2) # in order to get controller.statusCommandsExecutor initialized
    while controller.is_alive():
        time.sleep(0.1)

        # need_relaunch is unpacked as a (flag, reason) pair
        need_relaunch, reason = controller.get_status_commands_executor().need_relaunch
        if need_relaunch:
            controller.get_status_commands_executor().relaunch(reason)

    controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)
| |
# heartbeat_stop_callback - event that is passed to Controller and NetUtil so that
# their loops can be interrupted from outside the process.
# We need this on Windows, where SIGTERM is not available.
def main(heartbeat_stop_callback=None):
    """
    Entry point of the Ambari agent.

    Parses the command line (-v/--verbose, -e/--expected-hostname, --home),
    sets up file logging, handles the ``stop`` and ``reset`` sub-commands,
    loads the agent configuration, starts the optional data cleaner and the
    ping port listener, runs the prestart checks and then keeps trying the
    configured server hostnames until a connection is made or the agent is
    stopped.

    :param heartbeat_stop_callback: stop callback handed to NetUtil and to
        run_threads()
    :return: hostname of the server the agent connected to, or None if no
        connection was made
    """
    global config
    global home_dir

    parser = OptionParser()
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="verbose log output", default=False)
    parser.add_option("-e", "--expected-hostname", dest="expected_hostname", action="store",
                      help="expected hostname of current host. If hostname differs, agent will fail", default=None)
    parser.add_option("--home", dest="home_dir", action="store", help="Home directory", default="")
    (options, args) = parser.parse_args()

    expected_hostname = options.expected_hostname
    home_dir = options.home_dir

    logging_level = logging.DEBUG if options.verbose else logging.INFO

    setup_logging(logger, AmbariConfig.AmbariConfig.getLogFile(), logging_level)
    # Tells the __main__ guard that logger.exception() can now be used.
    global is_logger_setup
    is_logger_setup = True
    setup_logging(alerts_logger, AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level)
    Logger.initialize_logger('resource_management', logging_level=logging_level)

    if home_dir != "":
        # When running multiple Ambari Agents on this host for simulation, each one will use a unique home directory.
        Logger.info("Agent is using Home Dir: %s" % str(home_dir))

    # use the host's locale for numeric formatting
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error as ex:
        logger.warning("Cannot set locale for ambari-agent. Please check your systemwide locale settings. Failed due to: {0}.".format(str(ex)))

    default_cfg = {'agent': {'prefix': '/home/ambari'}}
    config.load(default_cfg)

    # The stop and reset sub-commands are read straight from sys.argv; both
    # helpers end the process themselves.
    if (len(sys.argv) > 1) and sys.argv[1] == 'stop':
        stop_agent()

    if (len(sys.argv) > 2) and sys.argv[1] == 'reset':
        reset_agent(sys.argv)

    # Check for ambari configuration file.
    resolve_ambari_config()

    # Add syslog handler based on ambari config file
    add_syslog_handler(logger)

    # Starting data cleanup daemon
    data_cleaner = None
    if config.has_option('agent', 'data_cleanup_interval') and int(config.get('agent','data_cleanup_interval')) > 0:
        data_cleaner = DataCleaner(config)
        data_cleaner.start()

    perform_prestart_checks(expected_hostname)

    # Starting ping port listener
    try:
        #This acts as a single process machine-wide lock (albeit incomplete, since
        # we still need an extra file to track the Agent PID)
        ping_port_listener = PingPortListener(config)
    except Exception as ex:
        err_message = "Failed to start ping port listener of: " + str(ex)
        logger.error(err_message)
        sys.stderr.write(err_message)
        sys.exit(1)
    ping_port_listener.start()

    update_log_level(config)

    update_open_files_ulimit(config)

    if not config.use_system_proxy_setting():
        logger.info('Agent is configured to ignore system proxy settings')
        reconfigure_urllib2_opener(ignore_system_proxy=True)

    # The pid file is only written when not running on the Windows server family.
    if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
        daemonize()

    #
    # Iterate through the list of server hostnames and connect to the first active server
    #

    active_server = None
    server_hostnames = hostname.server_hostnames(config)

    connected = False
    stopped = False

    # Keep trying to connect to a server or bail out if ambari-agent was stopped
    while not connected and not stopped:
        for server_hostname in server_hostnames:
            server_url = config.get_api_url(server_hostname)
            try:
                server_ip = socket.gethostbyname(server_hostname)
                logger.info('Connecting to Ambari server at %s (%s)', server_url, server_ip)
            except socket.error:
                # A failed lookup is only logged; the connection attempt below still runs.
                logger.warn("Unable to determine the IP address of the Ambari server '%s'", server_hostname)

            # Wait until MAX_RETRIES to see if server is reachable
            netutil = NetUtil(config, heartbeat_stop_callback)
            (retries, connected, stopped) = netutil.try_to_connect(server_url, MAX_RETRIES, logger)

            # if connected, launch controller
            if connected:
                logger.info('Connected to Ambari server %s', server_hostname)
                # Set the active server
                active_server = server_hostname
                # Launch Controller communication
                run_threads(server_hostname, heartbeat_stop_callback)

            #
            # If Ambari Agent connected to the server or
            # Ambari Agent was stopped using stop event
            # Clean up if not Windows OS
            #
            if connected or stopped:
                ExitHelper().exit(0)
                logger.info("finished")
                break
            pass # for server_hostname in server_hostnames
        pass # while not (connected or stopped)

    return active_server
| |
# Script entry point: bind the signal handlers, then run the agent.
if __name__ == "__main__":
    # Set to True by main() once file logging has been configured.
    is_logger_setup = False
    try:
        heartbeat_stop_callback = bind_signal_handlers(agentPid)

        main(heartbeat_stop_callback)
    except SystemExit:
        # Deliberate exits (sys.exit) pass through without being logged.
        raise
    except BaseException:
        # Log the traceback only if logging has been set up, then re-raise.
        if is_logger_setup:
            logger.exception("Exiting with exception:")
        raise