blob: f68db04f54b1576cef3816835926fb26b87c95a4 [file] [log] [blame]
#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import logging.handlers
import signal
from optparse import OptionParser
import sys
import traceback
import os
import time
import platform
import ConfigParser
import ProcessHelper
import errno
import posixpath
from Controller import Controller
from AgentConfig import AgentConfig
from NetUtil import NetUtil
from Registry import Registry
import Constants
# Root logger of the process; setup_logging() attaches its handler and level.
logger = logging.getLogger()
# Log record layout used by this module's logging setup.
formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
IS_WINDOWS = (platform.system() == "Windows")
# Pid of the process that imported this module; signal_handler() compares
# against it so that forked children do not run the agent shutdown.
agentPid = os.getpid()
# Location of the agent config file, relative to the agent work root.
configFileRelPath = "infra/conf/agent.ini"
# Name of the log file created inside the agent log directory.
logFileName = "agent.log"
# Server status URL template, formatted with hostname, secured port and check path.
SERVER_STATUS_URL = "https://{0}:{1}{2}"
def signal_handler(signum, frame):
    """Stop the agent when a termination signal arrives.

    Only the process whose pid was recorded in ``agentPid`` at import time
    performs the shutdown via ProcessHelper.stopAgent(). Any other process
    that received the signal (e.g. a child such as a namenode) terminates
    immediately with exit status 0 without running cleanup handlers.
    """
    if os.getpid() == agentPid:
        logger.info('signal received, exiting.')
        ProcessHelper.stopAgent()
    else:
        # a child inherited this handler: leave at once, skip agent shutdown
        os._exit(0)
def debug(sig, frame):
    """Log the stack of the interrupted code when the debug signal arrives.

    No interactive prompt is opened (the log text below says "entering python
    shell" only for historical reasons). The stack of *frame* is formatted and
    written at INFO level, and then execution resumes.

    :param sig: signal number (unused; required by the ``signal`` API).
    :param frame: frame that was executing when the signal was delivered.
    """
    message = "Signal received : entering python shell.\nTraceback:\n"
    message += ''.join(traceback.format_stack(frame))
    logger.info(message)
def setup_logging(verbose, logfile, max_bytes=10000000, backup_count=25):
    """Attach a size-rotating file handler to the root logger and set its level.

    :param verbose: truthy selects DEBUG, anything falsy selects INFO.
    :param logfile: path of the log file, opened in append mode.
    :param max_bytes: size in bytes at which the file is rotated
        (default 10000000, the previously hard-coded value).
    :param backup_count: number of rotated files to keep
        (default 25, the previously hard-coded value).
    """
    formatter = logging.Formatter(formatstr)
    rotateLog = logging.handlers.RotatingFileHandler(logfile, "a", max_bytes, backup_count)
    rotateLog.setFormatter(formatter)
    logger.addHandler(rotateLog)
    # logging.basicConfig() is deliberately not called here: it does nothing
    # once the root logger has a handler, which is guaranteed by addHandler above.
    if verbose:
        logger.setLevel(logging.DEBUG)
        logger.info("loglevel=logging.DEBUG")
    else:
        logger.setLevel(logging.INFO)
        logger.info("loglevel=logging.INFO")
def update_log_level(config, logfile):
    """Apply the ``log_level`` option of the ``agent`` config section.

    The value ``DEBUG`` (case-insensitive, surrounding whitespace ignored)
    selects DEBUG. Any other non-None value selects INFO. If the option
    cannot be read, the level already configured (e.g. by setup_logging)
    is left unchanged.

    :param config: configuration object providing ``get(section, option)``.
    :param logfile: file handed to logging.basicConfig(). That call only has
        an effect when the root logger has no handlers yet.
    """
    try:
        loglevel = config.get('agent', 'log_level')
    except Exception as err:
        # Best effort: a missing or unreadable option must not stop the agent.
        logger.info("log_level not set (%s); keeping loglevel=%s",
                    err, logging.getLevelName(logger.getEffectiveLevel()))
        return
    if loglevel is None:
        return
    if str(loglevel).strip().upper() == 'DEBUG':
        level, level_name = logging.DEBUG, "DEBUG"
    else:
        level, level_name = logging.INFO, "INFO"
    logging.basicConfig(format=formatstr, level=level, filename=logfile)
    logger.setLevel(level)
    # Logged at INFO so the confirmation is emitted whichever level was chosen.
    logger.info("Newloglevel=logging." + level_name)
def bind_signal_handlers():
    """Install the agent's signal handlers.

    SIGINT and SIGTERM are routed to signal_handler(). On non-Windows
    platforms SIGUSR1 is routed to debug(), which logs the current stack.
    """
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    # SIGUSR1 is not available on Windows; use the module-wide platform flag.
    if not IS_WINDOWS:
        signal.signal(signal.SIGUSR1, debug)
def update_config_from_file(agentConfig):
    """Load the config file at ``configFileRelPath`` under the work root.

    Best effort: if the file is missing, or loading it raises, a warning is
    logged and *agentConfig* keeps whatever configuration it already holds.

    :param agentConfig: AgentConfig providing getWorkRootPath() and setConfig().
    """
    try:
        configFile = posixpath.join(agentConfig.getWorkRootPath(), configFileRelPath)
        if os.path.exists(configFile):
            agentConfig.setConfig(configFile)
        else:
            logger.warning("No config found, using default")
    except Exception as err:
        # Logger.warn is a deprecated alias (removed in Python 3.13).
        logger.warning(err)
def perform_prestart_checks(config):
    """Clean up leftovers from a previous run and verify required folders.

    A stale pid file is deleted. Independently of that, the application
    package directory must exist. Otherwise the problem is logged and
    printed, and the process exits with status 1.

    :param config: AgentConfig used to resolve APP_PACKAGE_DIR.
    """
    if os.path.isfile(ProcessHelper.pidfile):
        print("%s already exists, deleting" % ProcessHelper.pidfile)
        os.remove(ProcessHelper.pidfile)
    # check if the key folders exist. This is a separate `if`, not an `elif`:
    # finding a stale pid file must not skip the package-dir validation.
    package_dir = config.getResolvedPath(AgentConfig.APP_PACKAGE_DIR)
    if not os.path.isdir(package_dir):
        msg = "Package dir %s does not exists, can't continue" % package_dir
        logger.error(msg)
        print(msg)
        sys.exit(1)
def ensure_folder_layout(config):
    """Create each application and agent working directory if it is missing.

    :param config: AgentConfig whose getResolvedPath() maps a folder key to a path.
    """
    required_folders = (
        AgentConfig.APP_INSTALL_DIR,
        AgentConfig.APP_LOG_DIR,
        AgentConfig.APP_RUN_DIR,
        AgentConfig.APP_TASK_DIR,
        AgentConfig.LOG_DIR,
        AgentConfig.RUN_DIR,
    )
    for folder_key in required_folders:
        ensure_path_exists(config.getResolvedPath(folder_key))
def ensure_path_exists(path):
    """Create the directory *path*, including missing parents.

    Symlinks in *path* are resolved first. A directory that already exists
    is accepted silently. If a non-directory already occupies the path, the
    OSError is re-raised instead of being ignored.

    :param path: directory to create.
    :raises OSError: if the directory cannot be created or the path exists
        but is not a directory.
    """
    real_path = os.path.realpath(path)
    try:
        os.makedirs(real_path)
    except OSError as exception:
        # EEXIST is only harmless when what exists is actually a directory.
        if exception.errno != errno.EEXIST or not os.path.isdir(real_path):
            raise
def write_pid():
    """Write the current process id to ProcessHelper.pidfile, replacing its content."""
    # agent only dumps self pid to file
    pid = str(os.getpid())
    # `file()` exists only in Python 2 and the handle was never closed explicitly.
    with open(ProcessHelper.pidfile, 'w') as pid_file:
        pid_file.write(pid)
def stop_agent():
    """Stop the agent whose pid is stored in ProcessHelper.pidfile.

    Sends SIGTERM and waits 5 seconds. If the pid file still exists after
    that, the process is sent SIGKILL. On any failure (unreadable or invalid
    pid file, kill error, pid file still present) the current process ends
    immediately with exit status 1 via os._exit. On success it returns normally.
    """
    pid = -1
    try:
        # `with` closes the file even when the content is not a valid integer;
        # pid stays -1 in that case instead of holding the raw string.
        with open(ProcessHelper.pidfile, 'r') as pid_file:
            pid = int(pid_file.read())
        os.kill(pid, signal.SIGTERM)
        time.sleep(5)
        if os.path.exists(ProcessHelper.pidfile):
            raise Exception("PID file still exists.")
    except Exception:
        if pid == -1:
            print("Agent process is not running")
        else:
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError as kill_err:
                # ESRCH: the process is already gone, so there is nothing to kill.
                if kill_err.errno != errno.ESRCH:
                    print("Failed to kill agent process %d: %s" % (pid, kill_err))
        os._exit(1)
def main():
    """Run the agent: parse options, prepare folders and logging, start the Controller.

    Exits through OptionParser.error() if AGENT_WORK_ROOT or AGENT_LOG_ROOT
    is missing from the environment, or if no --label is given. Returns once
    the Controller thread is no longer alive or the wait is interrupted by
    KeyboardInterrupt/SystemExit.
    """
    parser = OptionParser()
    parser.add_option("-v", "--verbose", dest="verbose", help="verbose log output", default=False)
    parser.add_option("-l", "--label", dest="label", help="label of the agent", default=None)
    parser.add_option("--zk-quorum", dest=Constants.ZK_QUORUM, help="Zookeeper Quorum", default=None)
    parser.add_option("--zk-reg-path", dest=Constants.ZK_REG_PATH, help="Zookeeper Registry Path", default=None)
    parser.add_option("--debug", dest="debug", help="Agent debug hint", default="")
    options, _ = parser.parse_args()

    if 'AGENT_WORK_ROOT' not in os.environ:
        parser.error("AGENT_WORK_ROOT environment variable must be set.")
    options.root_folder = os.environ['AGENT_WORK_ROOT']
    if 'AGENT_LOG_ROOT' not in os.environ:
        parser.error("AGENT_LOG_ROOT environment variable must be set.")
    options.log_folder = os.environ['AGENT_LOG_ROOT']
    # AGENT_LOG_ROOT may list several comma-separated folders; use the first one.
    all_log_folders = [x.strip() for x in options.log_folder.split(',')]
    if len(all_log_folders) > 1:
        options.log_folder = all_log_folders[0]
    if not options.label:
        parser.error("label is required.")

    if not IS_WINDOWS:
        bind_signal_handlers()

    # Check for configuration file.
    agentConfig = AgentConfig(options.root_folder, options.log_folder, options.label)
    update_config_from_file(agentConfig)

    # Command-line values, when given, override what the config file provided.
    if options.zk_quorum:
        agentConfig.set(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM, options.zk_quorum)
    if options.zk_reg_path:
        agentConfig.set(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH, options.zk_reg_path)
    if options.debug:
        agentConfig.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, options.debug)

    # Extract the AM hostname and secured port from ZK registry
    registry = Registry(options.zk_quorum, options.zk_reg_path)
    amHost, amSecuredPort = registry.readAMHostPort()
    if amHost:
        agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", amHost)
    if amSecuredPort:
        agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", amSecuredPort)

    # set the security directory to a subdirectory of the run dir
    secDir = posixpath.join(agentConfig.getResolvedPath(AgentConfig.RUN_DIR), "security")
    agentConfig.set(AgentConfig.SECURITY_SECTION, "keysdir", secDir)

    logFile = posixpath.join(agentConfig.getResolvedPath(AgentConfig.LOG_DIR), logFileName)

    perform_prestart_checks(agentConfig)
    ensure_folder_layout(agentConfig)
    # create security dir if necessary
    ensure_path_exists(secDir)

    setup_logging(options.verbose, logFile)
    update_log_level(agentConfig, logFile)
    write_pid()

    # These records are emitted only now: before setup_logging() attaches the
    # file handler, nothing would carry them to the agent log file.
    logger.info("Using AGENT_WORK_ROOT = " + options.root_folder)
    logger.info("Using AGENT_LOG_ROOT = " + options.log_folder)
    if len(all_log_folders) > 1:
        logger.info("Selected log folder from available: " + ",".join(all_log_folders))
    logger.info("Security/Keys directory: " + secDir)

    server_url = SERVER_STATUS_URL.format(
        agentConfig.get(AgentConfig.SERVER_SECTION, 'hostname'),
        agentConfig.get(AgentConfig.SERVER_SECTION, 'secured_port'),
        agentConfig.get(AgentConfig.SERVER_SECTION, 'check_path'))
    print("Connecting to the server at " + server_url + "...")
    logger.info('Connecting to the server at: ' + server_url)

    # Wait until server is reachable
    netutil = NetUtil()
    netutil.try_to_connect(server_url, -1, logger)

    # Launch Controller communication
    controller = Controller(agentConfig)
    controller.start()
    try:
        # presumably a timed join so the main thread can still handle signals
        while controller.is_alive():
            controller.join(timeout=1.0)
    except (KeyboardInterrupt, SystemExit):
        logger.info("... agent interrupted")
if __name__ == '__main__':
    # Start the agent only when executed as a script, not when imported.
    main()