blob: 8db4f618c6f31fc65ea72e636811523bea3a24dc [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import sys
# Refuse to run on interpreters older than the minimum supported version.
# This check sits before the remaining imports so that an old interpreter
# gets a readable message instead of an ImportError (e.g. for argparse).
version_min = [2, 7]
version_info = sys.version_info
# Tuples compare element by element: major first, then minor.
version_error = tuple(version_info[:2]) < tuple(version_min)
if version_error:
    print('Python minimum requirement is version '+'.'.join(str(part) for part in version_min))
    sys.exit(1)
import argparse
import datetime
import logging
import os
import shlex
import socket
import subprocess
import time
import traceback
from ducc_util import DuccUtil
# produce a time stamp
def get_timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    tod = time.time()
    return datetime.datetime.fromtimestamp(tod).strftime('%Y-%m-%d %H:%M:%S')
# embedded class to log to file
# default level is info level,
# controlled by environment variable LOGLEVEL = { info, debug, trace }
class Logger(object):
    """Minimal append-to-file logger with info/debug/trace verbosity.

    Info, warn and error messages are always written.  Debug messages are
    written for level 'debug' or 'trace'; trace messages only for 'trace'.
    Each message is one line: '<timestamp> <type letter> <method> <text>'.
    """

    # class-level defaults; __init__ overrides them per instance
    flag_info = True
    flag_debug = False
    flag_trace = False
    flag_error = True
    flag_warn = True

    def __init__(self,filename,level='info'):
        """Remember the log file name and enable flags for the given level.

        filename -- path of the file that output() appends to
        level    -- 'info' (default), 'debug' or 'trace'; compared
                    case-insensitively, any other value behaves as 'info'
        """
        self.filename = filename
        # normalise so that e.g. LOGLEVEL=DEBUG is honoured as well
        level = str(level).strip().lower()
        if(level == 'debug'):
            self.flag_debug = True
        elif(level == 'trace'):
            # trace implies debug
            self.flag_debug = True
            self.flag_trace = True

    # write to file
    def output(self,type,mn,text):
        """Append one time-stamped line to the log file.

        type -- single letter message category ('I', 'D', 'T', 'E', 'W')
        mn   -- name of the method reporting the message
        text -- message text (must be a string)
        """
        message = get_timestamp()+' '+type+' '+mn+' '+text
        # the file is opened and closed per message, so nothing stays open
        with open(self.filename, 'a') as logfile:
            logfile.write(message+'\n')

    # record info message
    def info(self,mn,text):
        """Write an 'I' line when info output is enabled."""
        if(self.flag_info):
            self.output('I',mn,text)

    # record debug message
    def debug(self,mn,text):
        """Write a 'D' line when debug output is enabled."""
        if(self.flag_debug):
            self.output('D',mn,text)

    # record trace message
    def trace(self,mn,text):
        """Write a 'T' line when trace output is enabled."""
        if(self.flag_trace):
            self.output('T',mn,text)

    # record error message
    def error(self,mn,text):
        """Write an 'E' line when error output is enabled."""
        if(self.flag_error):
            self.output('E',mn,text)

    # record warn message
    def warn(self,mn,text):
        """Write a 'W' line when warn output is enabled."""
        if(self.flag_warn):
            self.output('W',mn,text)
# class to automagically start DUCC daemons
class AutoStart(DuccUtil):
    """Start DUCC daemons on the present node as directed by the autostart table.

    The autostart database table is queried for entries whose host matches
    this node.  Every matching daemon whose state is 'Start' and which is
    not found among the running DUCC processes is launched through the
    scripts under DUCC_HOME/admin.  Activity is written to
    DUCC_HOME/logs/autostart/<short host name>.autostart.log.
    """

    # component names accepted from the find_ducc_process() results
    components = [ 'agent', 'broker', 'orchestrator', 'pm', 'rm', 'sm', 'ws' ]

    # two-letter daemon id -> value passed as '-c' to the admin start scripts
    map = { 'ag':'agent',
            'br':'broker',
            'or':'or',
            'pm':'pm',
            'rm':'rm',
            'sm':'sm',
            'ws':'ws',
          }

    description = 'Start daemon(s) on the present node when listed in the autostart database table but not already running.'

    def __init__(self):
        DuccUtil.__init__(self)
        # NOTE(review): presumably read by DuccUtil helpers - confirm
        self.ssh_enabled = False

    # return file name
    def _fn(self):
        """Return the last '/'-separated component of this file's path."""
        return __file__.split('/')[-1]

    # return class name
    def _cn(self):
        """Return the name of this instance's class."""
        return self.__class__.__name__

    # return method name
    def _mn(self):
        """Return the name of the function that called _mn().

        The stack is limited to its two innermost frames (the caller and
        _mn itself); entry 0 is the caller and field 2 is its function name.
        Call this directly from the method whose name should be logged.
        """
        return traceback.extract_stack(None,2)[0][2]

    def get_args(self):
        """Parse the command line (no options besides -h) into self.args."""
        parser = argparse.ArgumentParser(description=self.description)
        self.args = parser.parse_args()

    # setup logging to file for autostart script
    def setup_logging(self,NAME):
        """Create the log directory and the file logger for this host.

        NAME -- log sub-directory under DUCC_HOME/logs and part of the
                log file name
        Sets self.LOCAL_HOST (fully qualified host name) and self.logger.
        """
        LOGDIR = self.DUCC_HOME+'/logs/'+NAME
        self.makedirs(LOGDIR)
        self.LOCAL_HOST = socket.getfqdn()
        # the log file is named after the host name without its domain
        FN = self.LOCAL_HOST.split('.')[0]+'.'+NAME+'.log'
        LOGFILE = LOGDIR+'/'+FN
        LOGLEVEL = os.environ.get('LOGLEVEL','info')
        self.logger = Logger(LOGFILE,LOGLEVEL)

    # check if host names with domain match
    def is_host_match_with_domain(self,h1,h2):
        """Return True when the two host names are exactly equal."""
        retVal = (h1 == h2)
        text = str(h1)+' '+str(h2)+' '+str(retVal)
        self.logger.debug(self._mn(),text)
        return retVal

    # check if host names without domain match
    def is_host_match_without_domain(self,h1,h2):
        """Return True when the parts before the first '.' are equal."""
        h10 = h1.split('.')[0]
        h20 = h2.split('.')[0]
        retVal = (h10 == h20)
        text = str(h10)+' '+str(h20)+' '+str(retVal)
        self.logger.debug(self._mn(),text)
        return retVal

    #check if host names match with/without domain
    def is_host_match(self,h1,h2):
        """Return True when h1 and h2 name the same host.

        The names match when they are equal, or when they are equal after
        dropping everything from the first '.'.  A None name never matches.
        """
        retVal = False
        if(h1 is not None and h2 is not None):
            if(self.is_host_match_with_domain(h1,h2)):
                retVal = True
            elif(self.is_host_match_without_domain(h1,h2)):
                retVal = True
        text = str(h1)+' '+str(h2)+' '+str(retVal)
        self.logger.debug(self._mn(),text)
        return retVal

    def parse_line(self,line):
        """Split one query output line '<host>.<name>=<state>' into its parts.

        Returns the tuple (host, name, state).  ('', '', '') is returned
        for None, for a line that does not consist of exactly one
        whitespace-delimited token, and for a token lacking '.' or '='.
        The host may itself contain dots: the name is taken after the
        last '.' that precedes the '='.
        """
        retVal = '', '', ''
        if(line is None):
            return retVal
        try:
            self.logger.debug(self._mn(),line)
            tokens = line.split()
            if(len(tokens) == 1):
                entry, equals, state = tokens[0].partition('=')
                host, dot, name = entry.rpartition('.')
                if(equals and dot):
                    retVal = host, name, state
        except (AttributeError, TypeError):
            # best effort: something that is not a string is treated
            # like an unparseable line
            pass
        return retVal

    # get daemons started/all in DB for host
    def get_daemons_host(self):
        """Query the autostart table for the daemons of this host.

        Runs the DbDaemonLifetimeUI class with '--query' and parses each
        output line with parse_line().  Returns (db, started): db lists
        every daemon whose host matches self.LOCAL_HOST, started the
        subset whose state is 'Start'.
        """
        db = []
        started = []
        jclass = 'org.apache.uima.ducc.database.lifetime.DbDaemonLifetimeUI'
        option = '--query'
        cmd = [self.jvm, '-DDUCC_HOME='+self.DUCC_HOME, jclass, option]
        # text mode so that the output is a string on Python 2 and 3 alike
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        out, err = p.communicate()
        if(err):
            self.logger.debug(self._mn(),str(err))
        for line in out.splitlines():
            host, daemon, state = self.parse_line(line)
            if(self.is_host_match(self.LOCAL_HOST, host)):
                db.append(daemon)
                if(state == 'Start'):
                    started.append(daemon)
                    text = 'add'+' '+host+' '+daemon
                else:
                    text = 'skip'+' '+host+' '+daemon
            else:
                text = 'skip'+' '+host+' '+daemon
            self.logger.debug(self._mn(),text)
        text = 'started'+' '+str(started)+' '+'db'+' '+str(db)
        self.logger.debug(self._mn(),text)
        return db, started

    def normalize_component(self,component):
        """Return the two-letter daemon id: the first two characters."""
        return component[:2]

    # get daemons running (from system)
    def get_components_running(self):
        """Return the two-letter ids of the DUCC daemons running on this host.

        Uses find_ducc_process() for self.LOCAL_HOST; element 1 of its
        result is iterated as (component, pid, user) entries.  Only
        entries owned by self.ducc_uid whose component is listed in
        self.components are kept.
        """
        daemons = []
        result = self.find_ducc_process(self.LOCAL_HOST)
        find_status = result[0]
        text = 'find_status:'+str(find_status)
        self.logger.debug(self._mn(),text)
        processes = result[1]
        text = 'tuples:'+str(processes)
        self.logger.debug(self._mn(),text)
        for process in processes:
            component = process[0]
            user = process[2]
            if(user != self.ducc_uid):
                text = 'skip:'+str(process)+' '+str(self.ducc_uid)
            elif(component in self.components):
                text = 'keep:'+str(process)
                daemons.append(self.normalize_component(component))
            else:
                text = 'skip:'+str(process)
            self.logger.debug(self._mn(),text)
        text = 'daemons:'+str(daemons)
        self.logger.debug(self._mn(),text)
        return daemons

    def start(self,daemon):
        """Launch one daemon through the admin scripts and log the outcome.

        daemon -- two-letter daemon id, a key of self.map (KeyError
                  otherwise)
        The agent ('ag') is started with ducc.py, every other daemon with
        start_ducc.  The command and its standard output are logged at
        info level, any standard error output at warn level.
        """
        component = self.map[daemon]
        text = str(component)
        self.logger.warn(self._mn(),text)
        if(daemon == 'ag'):
            python_script = os.path.join(self.DUCC_HOME,'admin','ducc.py')
            cmd = [ python_script, '-c', component, '-b', '-d', str(time.time()), '--nodup' ]
        else:
            python_script = os.path.join(self.DUCC_HOME,'admin','start_ducc')
            cmd = [ python_script, '-c', component ]
        text = str(cmd)
        self.logger.info(self._mn(),text)
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        out, err = p.communicate()
        text = str(out)
        self.logger.info(self._mn(),text)
        if(err):
            # keep the failure reason instead of dropping stderr
            self.logger.warn(self._mn(),str(err))

    def insert(self):
        """Add this node's agent ('ag') to the autostart table.

        Runs db_autostart_insert.py with self.localhost as the host.  The
        command and its standard output are logged at info level, any
        standard error output at warn level.
        """
        python_script = os.path.join(self.DUCC_HOME,'admin','db_autostart_insert.py')
        # NOTE(review): self.localhost is not set in this class (the other
        # methods use self.LOCAL_HOST) - presumably inherited; confirm
        node = self.localhost
        component = 'ag'
        cmd = [ python_script, '--host', node, '--name', component]
        text = str(cmd)
        self.logger.info(self._mn(),text)
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        out, err = p.communicate()
        text = str(out)
        self.logger.info(self._mn(),text)
        if(err):
            self.logger.warn(self._mn(),str(err))

    # autostart: start head or agent daemons, as required
    def main(self, argv):
        """Start every daemon that should be running here but is not.

        argv -- accepted for the caller's convenience; the command line
                is read by get_args() itself
        Any exception raised while querying or starting is written to the
        log at error level and not propagated.
        """
        NAME = 'autostart'
        self.setup_logging(NAME)
        self.get_args()
        try:
            daemons_db, daemons_started = self.get_daemons_host()
            text = 'daemons_db '+str(len(daemons_db))
            self.logger.debug(self._mn(),text)
            text = 'daemons_started '+str(len(daemons_started))
            self.logger.debug(self._mn(),text)
            # if agent node is not in db, then insert it!
            if(len(daemons_db) == 0 and not self.is_head_node()):
                self.insert()
                # read again so that the inserted entry is seen
                daemons_db, daemons_started = self.get_daemons_host()
            components_running = self.get_components_running()
            text = 'components_running '+str(components_running)
            self.logger.debug(self._mn(),text)
            for daemon in daemons_started:
                component = self.normalize_component(daemon)
                text = 'component '+str(component)
                self.logger.debug(self._mn(),text)
                if(component not in components_running):
                    text = 'start daemon '+str(daemon)
                    self.logger.debug(self._mn(),text)
                    self.start(daemon)
        except Exception:
            # error level: a failure must be visible at the default log level
            for line in traceback.format_exc().splitlines():
                self.logger.error(self._mn(),line)
# script entry point: run the autostart pass once for this node
if __name__ == '__main__':
    instance = AutoStart()
    instance.main(sys.argv[1:])