#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

def fix_encoding_reimport_bug():
  """
  Workaround for https://bugs.python.org/issue14847

  Performs one throwaway decode with each of the 'utf-8' and 'ascii' codecs.
  The results are discarded; presumably the point is that both codecs get
  looked up once, early, at agent start-up.
  """
  for codec_name in ('utf-8', 'ascii'):
    b'x'.decode(codec_name)

def fix_subprocess_racecondition():
  """
  Monkey patch the gc module referenced by subprocess so that isenabled() always reports True.

  Subprocess in Python has a race condition around enabling/disabling the garbage collector,
  which may leave the Python garbage collector turned off. This leads to a memory leak.
  Presumably subprocess only re-enables gc when isenabled() said it was enabled beforehand,
  so forcing the answer to True makes it always re-enable - TODO confirm against the
  subprocess implementation in use.

  !!! PLEASE NOTE THIS SHOULD BE CALLED BEFORE ANY OTHER INITIALIZATION is done, to avoid
  already created links to subprocess or subprocess.gc or gc
  """
  # monkey patching subprocess: subprocess.gc is the gc module object subprocess imported
  import subprocess
  subprocess.gc.isenabled = lambda: True

  # re-importing gc to have correct isenabled for non-subprocess contexts:
  # dropping the cached entry makes the following import load gc again
  import sys
  del sys.modules['gc']
  import gc


def fix_subprocess_popen():
  '''
  Serialize process creation done through subprocess.Popen and
  multiprocessing.forking.Popen behind one shared re-entrant lock.

  This works around a race condition when subprocesses are started concurrently
  from several threads; see http://bugs.python.org/issue19809 for details and a
  repro script. The patch is applied only on POSIX with Python 2; in any other
  environment the function returns without touching anything.
  '''
  import os
  import sys

  # Nothing to patch outside POSIX / Python 2.
  if os.name != 'posix' or sys.version_info[0] >= 3:
    return

  from multiprocessing import forking
  import subprocess
  import threading

  # Capture both original constructors before either one is replaced.
  sp_original_init = subprocess.Popen.__init__
  mp_original_init = forking.Popen.__init__
  creation_lock = threading.RLock()  # guards subprocess creation

  def sp_locked_init(self, *a, **kw):
    with creation_lock:
      sp_original_init(self, *a, **kw)

  def mp_locked_init(self, *a, **kw):
    with creation_lock:
      mp_original_init(self, *a, **kw)

  subprocess.Popen.__init__ = sp_locked_init
  forking.Popen.__init__ = mp_locked_init


# Apply the interpreter workarounds at import time, ahead of the imports below:
# fix_subprocess_racecondition() has to run before any other initialization creates
# references to subprocess, subprocess.gc or gc.
fix_subprocess_popen()
fix_subprocess_racecondition()
fix_encoding_reimport_bug()

import logging.handlers
import logging.config
import signal
from optparse import OptionParser
import sys
import traceback
import getpass
import os
import time
import locale
import platform
import ConfigParser
import ProcessHelper
import resource
from logging.handlers import SysLogHandler
from Controller import Controller
import AmbariConfig
from NetUtil import NetUtil
from PingPortListener import PingPortListener
import hostname
from DataCleaner import DataCleaner
from ambari_agent.ExitHelper import ExitHelper
import socket
from ambari_commons import OSConst, OSCheck
from ambari_commons.shell import shellRunner
from ambari_commons.network import reconfigure_urllib2_opener
from ambari_commons import shell
import HeartbeatHandlers
from HeartbeatHandlers import bind_signal_handlers
from ambari_commons.constants import AMBARI_SUDO_BINARY
from resource_management.core.logger import Logger

# Root logger used by the agent, plus a separate named logger for alerts.
logger = logging.getLogger()
alerts_logger = logging.getLogger('ambari_alerts')

# Record format for the file logs, and the PID of this process (handed to
# bind_signal_handlers in the __main__ section).
formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
agentPid = os.getpid()

# Global variables to be set later.
# home_dir is overwritten by main() from the --home command line option.
home_dir = ""

config = AmbariConfig.AmbariConfig()

# TODO AMBARI-18733, remove this global variable and calculate it based on home_dir once it is set.
configFile = config.getConfigFile()
two_way_ssl_property = config.TWO_WAY_SSL_PROPERTY

# The syslog handler is only added on Linux (see add_syslog_handler).
IS_LINUX = platform.system() == "Linux"
SYSLOG_FORMAT_STRING = ' ambari_agent - %(filename)s - [%(process)d] - %(name)s - %(levelname)s - %(message)s'
SYSLOG_FORMATTER = logging.Formatter(SYSLOG_FORMAT_STRING)

# Cache of rotating file handlers keyed by log file name (filled by setup_logging).
_file_logging_handlers ={}

def setup_logging(logger, filename, logging_level):
  """
  Attach a rotating file handler for `filename` to `logger` and set the logger's level.

  One RotatingFileHandler (append mode, 10000000 bytes per file, 25 backups) is created
  per distinct file name and cached in _file_logging_handlers, so a later call for the
  same file reuses the existing handler instead of opening the file again.

  :param logger: the logging.Logger to configure
  :param filename: path of the log file
  :param logging_level: numeric logging level (e.g. logging.INFO)
  """
  rotateLog = _file_logging_handlers.get(filename)
  if rotateLog is None:
    rotateLog = logging.handlers.RotatingFileHandler(filename, "a", 10000000, 25)
    rotateLog.setFormatter(logging.Formatter(formatstr))
    _file_logging_handlers[filename] = rotateLog
  logger.addHandler(rotateLog)

  # basicConfig only has an effect while the root logger has no handlers yet.
  logging.basicConfig(format=formatstr, level=logging_level, filename=filename)
  logger.setLevel(logging_level)
  # getLevelName is the public API; the private logging._levelNames dict raises
  # KeyError for unregistered levels and does not exist on Python 3.
  logger.info("loglevel=logging.{0}".format(logging.getLevelName(logging_level)))

# Used by stop_agent(): after sending signal 15 it polls the process up to
# GRACEFUL_STOP_TRIES times, sleeping GRACEFUL_STOP_TRIES_SLEEP seconds between
# polls, before falling back to kill -9.
GRACEFUL_STOP_TRIES = 10
GRACEFUL_STOP_TRIES_SLEEP = 3


def add_syslog_handler(logger):
  """
  Add a syslog handler to `logger` when syslog is enabled in the ambari config
  ([logging] syslog_enabled equal to 1) and the agent runs on Linux.

  The handler writes to /dev/log with facility LOG_LOCAL1 and uses SYSLOG_FORMATTER.
  Nothing is added when either condition does not hold.
  """
  syslog_enabled = False
  if config.has_option("logging", "syslog_enabled"):
    syslog_enabled = int(config.get("logging", "syslog_enabled")) == 1

  # only continue if syslog is enabled in the ambari config and we are on linux
  if not (syslog_enabled and IS_LINUX):
    return

  logger.info("Adding syslog handler to ambari agent logger")
  handler = SysLogHandler(address="/dev/log", facility=SysLogHandler.LOG_LOCAL1)
  handler.setFormatter(SYSLOG_FORMATTER)
  logger.addHandler(handler)
    
def update_log_level(config):
  """
  Re-configure logging from the agent's configuration.

  If a "logging.conf" file exists next to the agent config file, logging is configured
  from it with logging.config.fileConfig and the module-level `logger` is rebound to
  logging.getLogger(__name__). Otherwise the level is taken from the [agent] loglevel
  option: 'DEBUG' selects logging.DEBUG, any other value selects logging.INFO.

  :param config: configuration object providing get(section, option)
  """
  global logger
  global home_dir
  log_cfg_file = os.path.join(os.path.dirname(AmbariConfig.AmbariConfig.getConfigFile(home_dir)), "logging.conf")
  if os.path.exists(log_cfg_file):
    logging.config.fileConfig(log_cfg_file)
    # create logger
    logger = logging.getLogger(__name__)
    logger.info("Logging configured by " + log_cfg_file)
    return

  # Setting loglevel based on config file
  try:
    loglevel = config.get('agent', 'loglevel')
    if loglevel is not None:
      if loglevel == 'DEBUG':
        logging.basicConfig(format=formatstr, level=logging.DEBUG, filename=AmbariConfig.AmbariConfig.getLogFile())
        logger.setLevel(logging.DEBUG)
        logger.info("Newloglevel=logging.DEBUG")
      else:
        logging.basicConfig(format=formatstr, level=logging.INFO, filename=AmbariConfig.AmbariConfig.getLogFile())
        logger.setLevel(logging.INFO)
        logger.debug("Newloglevel=logging.INFO")
  except Exception:
    # Best effort: a missing or unreadable option leaves the current level untouched.
    # NOTE(review): the message claims DEBUG although no level is set on this path - confirm intent.
    logger.info("Default loglevel=DEBUG")


# TODO AMBARI-18733, move inside AmbariConfig
def resolve_ambari_config():
  """
  Load the configurations.
  In production, home_dir will be "". When running multiple Agents per host, each agent will have a unique path.

  The config file path is derived from home_dir. If the file does not exist, or reading
  it fails, a warning is logged and the configuration already held in `config` is kept.
  """
  global config
  global home_dir
  configPath = os.path.abspath(AmbariConfig.AmbariConfig.getConfigFile(home_dir))

  if not os.path.exists(configPath):
    # Missing file is not fatal: warn and keep the defaults.
    logger.warning("No config found at {0}, use default".format(configPath))
    return

  try:
    config.read(configPath)
  except Exception as err:
    logger.warning(err)

def check_sudo():
  """
  Verify that the agent user can run commands through sudo.

  Runs "<sudo binary> /usr/bin/test /" and raises an Exception when it exits non-zero.
  A warning is logged when the command takes longer than 2 seconds. Nothing is checked
  when the effective user id is 0.
  """
  # don't need to check sudo for root.
  if os.geteuid() == 0:
    return

  runner = shellRunner()
  sudo_probe = [AMBARI_SUDO_BINARY, '/usr/bin/test', '/']
  sudo_probe_str = ' '.join(sudo_probe)

  started_at = time.time()
  outcome = runner.run(sudo_probe)
  elapsed = time.time() - started_at

  if outcome['exitCode'] != 0:
    # bad sudo configurations
    raise Exception("Please check your sudo configurations.\n" + sudo_probe_str + " failed with " + outcome['error'] + outcome['output'])

  if elapsed > 2:
    slow_sudo_template = ("Sudo commands on this host are running slowly ('{0}' took {1} seconds).\n"
                          "This will create a significant slow down for ambari-agent service tasks.")
    logger.warn(slow_sudo_template.format(sudo_probe_str, elapsed))

def update_open_files_ulimit(config):
  """
  Updates the hard limit for open file handles.

  Reads the current soft and hard RLIMIT_NOFILE limits. If the value returned by
  config.get_ulimit_open_files() is greater than or equal to the soft limit, the hard
  limit is set to that value and the soft limit is left as it is. A ValueError from
  setrlimit is logged and the previous hard limit is reported; it is not re-raised.

  :param config: configuration object providing get_ulimit_open_files()
  """
  global logger
  # get the current soft and hard limits
  (soft_limit, hard_limit) = resource.getrlimit(resource.RLIMIT_NOFILE)
  open_files_ulimit = config.get_ulimit_open_files()

  # values below the soft limit are ignored
  if open_files_ulimit < soft_limit:
    return

  try:
    resource.setrlimit(resource.RLIMIT_NOFILE, (soft_limit, open_files_ulimit))
    logger.info('open files ulimit = {0}'.format(open_files_ulimit))
  except ValueError as err:
    logger.error('Unable to set open files ulimit to {0}: {1}'.format(open_files_ulimit, str(err)))
    logger.info('open files ulimit = {0}'.format(hard_limit))

def perform_prestart_checks(expected_hostname):
  """
  Run the checks that must pass before the agent starts; exit with status 1 on failure.

  Checks, in order:
    * if expected_hostname is given, the determined hostname must be equal to it
      (the expected one is got from the server during bootstrap);
    * the pid file must not exist already (skipped on the Windows server OS family);
    * the [agent] prefix option must be configured and must point to an existing directory;
    * sudo must be usable (see check_sudo).

  :param expected_hostname: hostname the agent is expected to have, or None to skip that check
  """
  global config

  if expected_hostname is not None:
    current_hostname = hostname.hostname(config)
    if current_hostname != expected_hostname:
      print("Determined hostname does not match expected. Please check agent "
            "log for details")
      msg = "Ambari agent machine hostname ({0}) does not match expected ambari " \
            "server hostname ({1}). Aborting registration. Please check hostname, " \
            "hostname -f and /etc/hosts file to confirm your " \
            "hostname is setup correctly".format(current_hostname, expected_hostname)
      logger.error(msg)
      sys.exit(1)
  # Check if there is another instance running
  if os.path.isfile(ProcessHelper.pidfile) and not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
    print("%s already exists, exiting" % ProcessHelper.pidfile)
    sys.exit(1)
  # check if ambari prefix exists
  elif config.has_option('agent', 'prefix') and not os.path.isdir(os.path.abspath(config.get('agent', 'prefix'))):
    msg = "Ambari prefix dir %s does not exists, can't continue" \
          % config.get("agent", "prefix")
    logger.error(msg)
    print(msg)
    sys.exit(1)
  elif not config.has_option('agent', 'prefix'):
    # There is no value to report here, so the message carries no placeholder
    # (a bare "%s" used to be logged and printed literally).
    msg = "Ambari prefix dir not configured, can't continue"
    logger.error(msg)
    print(msg)
    sys.exit(1)

  check_sudo()


def daemonize():
  """
  Write the PID of the current process to ProcessHelper.pidfile.

  The file is opened with open() inside a `with` block so that the handle is closed
  deterministically (the previous file(...) call is Python 2 only and never closed
  the handle explicitly).
  """
  pid = str(os.getpid())
  with open(ProcessHelper.pidfile, 'w') as pid_file:
    pid_file.write(pid)

def stop_agent():
  """
  Stop an existing Ambari agent and exit the current process.

  Reads the PID from ProcessHelper.pidfile, sends signal 15 and then polls with
  "kill -0" up to GRACEFUL_STOP_TRIES times, sleeping GRACEFUL_STOP_TRIES_SLEEP seconds
  between polls. If the process is still there (or anything in that sequence fails),
  "kill -9" is used. If the PID cannot be read or parsed, the agent is reported as
  not running.

  Always terminates via sys.exit(0), unless "kill -9" itself fails, in which case an
  Exception is raised.
  """
  pid = -1
  runner = shellRunner()
  try:
    with open(ProcessHelper.pidfile, 'r') as f:
      # Convert before assigning: a corrupt or empty pid file must leave pid at -1,
      # otherwise the fallback below would run "kill -9" with garbage as the PID.
      pid = int(f.read())

    runner.run([AMBARI_SUDO_BINARY, 'kill', '-15', str(pid)])
    for _ in range(GRACEFUL_STOP_TRIES):
      result = runner.run([AMBARI_SUDO_BINARY, 'kill', '-0', str(pid)])
      if result['exitCode'] != 0:
        logger.info("Agent died gracefully, exiting.")
        sys.exit(0)
      time.sleep(GRACEFUL_STOP_TRIES_SLEEP)
    logger.info("Agent not going to die gracefully, going to execute kill -9")
    # jump to the forced-kill path below
    raise Exception("Agent is running")
  except Exception:
    # SystemExit is not an Exception subclass, so the graceful exit above passes through.
    if pid == -1:
      print ("Agent process is not running")
    else:
      res = runner.run([AMBARI_SUDO_BINARY, 'kill', '-9', str(pid)])
      if res['exitCode'] != 0:
        raise Exception("Error while performing agent stop. " + res['error'] + res['output'])
      else:
        logger.info("Agent stopped successfully by kill -9, exiting.")
    sys.exit(0)

def reset_agent(options):
  """
  Point the agent at a (possibly new) server host and remove the agent certificates.

  Reads the agent config file, and if options[2] is not None and differs from the
  configured [server] hostname, stores it and rewrites the config file. Afterwards every
  file and sub-directory below the [security] keysdir directory is removed (the
  directory itself is kept).

  Exits the process with status 0 on success and with status 1 if anything fails.

  :param options: argument list; index 2 holds the new server host (main() passes sys.argv)
  """
  try:
    # update agent config file
    agent_config = ConfigParser.ConfigParser()
    # TODO AMBARI-18733, calculate configFile based on home_dir
    agent_config.read(configFile)
    server_host = agent_config.get('server', 'hostname')
    new_host = options[2]
    if new_host is not None and server_host != new_host:
      print("Updating server host from " + server_host + " to " + new_host)
      agent_config.set('server', 'hostname', new_host)
      with open(configFile, "wb") as new_agent_config:
        agent_config.write(new_agent_config)

    # clear agent certs
    agent_keysdir = agent_config.get('security', 'keysdir')
    print("Removing Agent certificates...")
    # bottom-up walk, so directories are already empty when they get removed
    for root, dirs, files in os.walk(agent_keysdir, topdown=False):
      for name in files:
        os.remove(os.path.join(root, name))
      for name in dirs:
        os.rmdir(os.path.join(root, name))
  except Exception as err:
    print("A problem occurred while trying to reset the agent: " + str(err))
    sys.exit(1)

  sys.exit(0)

# Passed by main() to NetUtil.try_to_connect() for each candidate server
# (presumably the number of connection attempts - confirm in NetUtil).
MAX_RETRIES = 10

def run_threads(server_hostname, heartbeat_stop_callback):
  """
  Start the Controller for `server_hostname` and supervise it until it stops.

  While the controller is alive, its status commands executor is polled every 0.1
  seconds and relaunched whenever it reports that a relaunch is needed. Once the
  controller is no longer alive, the executor is killed with reason "AGENT_STOPPED"
  and relaunching is disabled.

  :param server_hostname: host name of the server the controller talks to
  :param heartbeat_stop_callback: stop callback handed over to the Controller
  """
  # Launch Controller communication
  controller = Controller(config, server_hostname, heartbeat_stop_callback)
  controller.start()
  # in order to get controller.statusCommandsExecutor initialized
  time.sleep(2)

  while True:
    if not controller.is_alive():
      break
    time.sleep(0.1)

    # the executor is fetched again for every use, exactly like the checks need it
    relaunch_needed, relaunch_reason = controller.get_status_commands_executor().need_relaunch
    if not relaunch_needed:
      continue
    controller.get_status_commands_executor().relaunch(relaunch_reason)

  controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)

# heartbeat_stop_callback is passed on to Controller (via run_threads) and to NetUtil so that
# their loops can be interrupted from outside the process.
# We need this for Windows OS, where no SIGTERM is available.
def main(heartbeat_stop_callback=None):
  """
  Entry point of the agent: parse options, initialise the agent and connect to a server.

  Steps, in order:
    * parse -v/--verbose, -e/--expected-hostname and --home;
    * set up file logging for the root logger and the alerts logger;
    * handle the 'stop' and 'reset' commands given as first command line argument;
    * read the agent config, add the syslog handler, start the data cleaner if
      [agent] data_cleanup_interval is greater than 0;
    * run the pre-start checks and start the ping port listener;
    * update the log level, the open files ulimit and the proxy handling;
    * write the pid file (not on the Windows server OS family);
    * loop over the configured server host names until a connection is established or
      the agent is stopped, running the controller threads for the connected server.

  :param heartbeat_stop_callback: stop callback handed to NetUtil and the Controller
  :return: host name of the server the agent connected to, or None
  """
  global config
  global home_dir

  parser = OptionParser()
  parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="verbose log output", default=False)
  parser.add_option("-e", "--expected-hostname", dest="expected_hostname", action="store",
                    help="expected hostname of current host. If hostname differs, agent will fail", default=None)
  parser.add_option("--home", dest="home_dir", action="store", help="Home directory", default="")
  (options, args) = parser.parse_args()

  expected_hostname = options.expected_hostname
  home_dir = options.home_dir

  logging_level = logging.DEBUG if options.verbose else logging.INFO

  setup_logging(logger, AmbariConfig.AmbariConfig.getLogFile(), logging_level)
  # tells the __main__ section that exceptions can be written to the log from now on
  global is_logger_setup
  is_logger_setup = True
  setup_logging(alerts_logger, AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level)
  Logger.initialize_logger('resource_management', logging_level=logging_level)

  if home_dir != "":
    # When running multiple Ambari Agents on this host for simulation, each one will use a unique home directory.
    Logger.info("Agent is using Home Dir: %s" % str(home_dir))

  # use the host's locale for numeric formatting
  try:
    locale.setlocale(locale.LC_ALL, '')
  except locale.Error as ex:
    logger.warning("Cannot set locale for ambari-agent. Please check your systemwide locale settings. Failed due to: {0}.".format(str(ex)))

  # defaults, loaded before the config file is read in resolve_ambari_config()
  default_cfg = {'agent': {'prefix': '/home/ambari'}}
  config.load(default_cfg)

  # stop_agent() and reset_agent() end the process themselves via sys.exit
  if (len(sys.argv) > 1) and sys.argv[1] == 'stop':
    stop_agent()

  if (len(sys.argv) > 2) and sys.argv[1] == 'reset':
    reset_agent(sys.argv)

  # Check for ambari configuration file.
  resolve_ambari_config()

  # Add syslog handler based on ambari config file
  add_syslog_handler(logger)

  # Starting data cleanup daemon
  data_cleaner = None
  if config.has_option('agent', 'data_cleanup_interval') and int(config.get('agent','data_cleanup_interval')) > 0:
    data_cleaner = DataCleaner(config)
    data_cleaner.start()

  perform_prestart_checks(expected_hostname)

  # Starting ping port listener
  try:
    # This acts as a single process machine-wide lock (albeit incomplete, since
    # we still need an extra file to track the Agent PID)
    ping_port_listener = PingPortListener(config)
  except Exception as ex:
    err_message = "Failed to start ping port listener of: " + str(ex)
    logger.error(err_message)
    sys.stderr.write(err_message)
    sys.exit(1)
  ping_port_listener.start()

  update_log_level(config)

  update_open_files_ulimit(config)

  if not config.use_system_proxy_setting():
    logger.info('Agent is configured to ignore system proxy settings')
    reconfigure_urllib2_opener(ignore_system_proxy=True)

  # daemonize() writes the pid file
  if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
    daemonize()

  #
  # Iterate through the list of server hostnames and connect to the first active server
  #

  active_server = None
  server_hostnames = hostname.server_hostnames(config)

  connected = False
  stopped = False

  # Keep trying to connect to a server or bail out if ambari-agent was stopped
  while not connected and not stopped:
    for server_hostname in server_hostnames:
      server_url = config.get_api_url(server_hostname)
      try:
        # the resolved address is only used for the log message
        server_ip = socket.gethostbyname(server_hostname)
        logger.info('Connecting to Ambari server at %s (%s)', server_url, server_ip)
      except socket.error:
        logger.warn("Unable to determine the IP address of the Ambari server '%s'", server_hostname)

      # Wait until MAX_RETRIES to see if server is reachable
      netutil = NetUtil(config, heartbeat_stop_callback)
      (retries, connected, stopped) = netutil.try_to_connect(server_url, MAX_RETRIES, logger)

      # if connected, launch controller
      if connected:
        logger.info('Connected to Ambari server %s', server_hostname)
        # Set the active server
        active_server = server_hostname
        # Launch Controller communication (returns once the controller is no longer alive)
        run_threads(server_hostname, heartbeat_stop_callback)

      #
      # If Ambari Agent connected to the server or
      # Ambari Agent was stopped using stop event
      # Clean up if not Windows OS
      #
      if connected or stopped:
        # NOTE(review): if ExitHelper.exit() terminates the process, the two lines
        # below are never reached - confirm against ExitHelper.
        ExitHelper().exit(0)
        logger.info("finished")
        break
    pass # for server_hostname in server_hostnames
  pass # while not (connected or stopped)

  return active_server

if __name__ == "__main__":
  # Set to True by main() once logging is configured; until then a failure
  # cannot be written to the agent log.
  is_logger_setup = False
  try:
    heartbeat_stop_callback = bind_signal_handlers(agentPid)

    main(heartbeat_stop_callback)
  except SystemExit:
    # regular exit path, nothing to log
    raise
  except BaseException:
    if not is_logger_setup:
      raise
    logger.exception("Exiting with exception:")
    raise
