blob: c01a54b7534a6eab2f352e3c73a739efecdf0b6e [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import os
import sys
from time import time
import getopt
import signal
from ducc_util import DuccUtil
from properties import Properties
from local_hooks import verify_slave_node
from local_hooks import verify_master_node
#from ducc_util import ThreadWorker
from ducc_util import ThreadPool
class CheckDucc(DuccUtil):
def __init__(self):
DuccUtil.__init__(self)
self.badnodes = []
def validate(self, checkdate):
verify_slave_node(checkdate, self.ducc_properties)
self.check_clock_skew(checkdate)
self.verify_jvm()
self.verify_limits()
(viable, elevated, safe) = self.verify_duccling()
self.duccling_ok(viable, elevated, safe)
if ( not safe or not viable ):
print 'NOTOK ducc_ling is not installed correctly.'
return
def verify_database(self):
if ( self.db_bypass == True ):
return True
ret = self.db_alive(1)
if ( ret ):
print 'The database is running'
else:
print 'The database is not running'
def verify_activemq(self):
if ( self.is_amq_active() ):
print 'ActiveMQ is found listening at', self.broker_protocol + "://" + self.broker_host + ':' + self.broker_port
return True
return False
def check_node(self, args):
messages = []
spacer = ' '
node = args[0]
messages.append((' '))
messages.append(('Checking', node, '...'))
response = self.find_ducc_process(node) # a tuple, (True|False, proclist)
if ( not response[0] ):
messages.append((spacer, "No response."))
return messages
proclist = response[1] # a list of tuples, tuple is (component, pid, user)
if ( len(proclist) > 0 ):
for proc in proclist:
component = proc[0]
pid = proc[1]
found_user = proc[2]
if ( component == 'orchestrator' ):
component = 'or'
process_id = found_user + ' ' + component + '@' + node + ' PID ' + pid
if ( self.kill_signal != None ) :
if ( self.user != found_user ):
messages.append((spacer, "Not killing someone else's process.", process_id))
elif ( component == 'unknown-java' ):
messages.append((spacer, 'Not killing non-ducc process', process_id))
else:
messages.append((spacer, 'Killing (' + self.kill_signal + ')', process_id))
self.kill_process(node, proc, self.kill_signal)
self.pids.delete(pid)
process_changes = True
else:
messages.append((spacer, 'Found', process_id))
full_name = component + '@' + node
if ( component == 'agent' ):
self.pids.put(full_name, pid)
if ( component in self.default_components ):
self.pids.put(full_name, pid)
self.pids.put(component, full_name)
else:
messages.append((spacer, 'no processes found.'))
if ( self.kill_signal == None ):
response = "Node health checks return."
lines = self.ssh(node, True, self.DUCC_HOME + "/admin/check_ducc", "-x", str(int(time())))
while 1:
line = lines.readline()
if ( 'signal' in line ):
response = "Node health did not complete: " + line
self.badnodes.append(node)
# these next two filter junk if 'mesg' is running in a shell rc
if ( 'stdin: is not a tty' in line ):
continue
if ( 'mesg' in line ):
continue
if ( not line ):
break
line = line.strip()
messages.append((spacer, line))
#messages.append((spacer, '[]', line))
messages.append((spacer, response))
return messages
def signalHandler(self, signum, frame):
print "-------- Caught signal", signum, "--------"
if ( len(self.badnodes) != 0 ):
print "Health checks on these nodes did not return:"
for n in self.badnodes:
print n,
print ''
sys.exit(1)
def usage(self, msg):
if ( msg != None ):
print msg
print "Usage:"
print " check_ducc [options]"
print " If no options are given this is the equivalent of:"
print ""
print " check_ducc -n ../resources/ducc.nodes"
print ""
print "Options:"
print " -n --nodelist nodefile"
print " Check for agents on the nodes in nodefile. This option may be specified multiple time"
print " for multiple nodefiles. The 'local' node is always checked"
print ""
print " -c --configuration"
print " Do basic sanity checking on the configuration only. Note that configuration checking is always"
print " performed with most options. The [-c, --configuration] option does ONLY configuration checking."
print ""
print " -k --kill"
print " Force-kill any DUCC process you find on a node (if normal stop_ducc isn't working. This"
print " uses kill -KILL (-9) and only kills processes owned by the invoking user."
print ""
print " -i --int"
print " Force-kill any DUCC process you find on a node (if normal stop_ducc isn't working. This"
print " uses kill -INT (-2) and only kills processes owned by the invoking user."
print ""
print " -q --quit"
print " Force-kill any DUCC process you find on a node (if normal stop_ducc isn't working. This"
print " uses kill -QUIT (-3) and only kills processes owned by the invoking user."
print ""
print " -p --pids"
print " Rewrite the PID file. The PID file is always rewritten if any changes to processes are made. Sometimes"
print " the PID file needs rebuilding. This option causes the file to be rebuilt regardless of"
print " changes."
print ""
print " -x localdate"
print " Validate the local installation, called via ssh usually. The date is the dat on the calling machine."
print ""
print " --nothreading"
print " Disable multithreaded operation if it would otherwise be used"
print ""
print " -v --verbose"
print " If specified, print the validated configuration to the console."
print ""
print " -? prints this message."
sys.exit(1)
def main(self, argv):
try:
opts, args = getopt.getopt(argv, 'cikn:opqx:h?v', ['configuration', 'nodelist=', 'int', 'quit', 'kill', 'pids', 'verbose', 'nothreading', ])
except:
self.usage("Invalid arguments " + ' '.join(argv))
nodefiles = []
self.user = os.environ['LOGNAME']
self.kill_signal = None
redo_pids = False
process_changes = False
do_validate = False
checkdate = 0
config_only = False
verbose = False
for ( o, a ) in opts:
if o in ('-c', '--configuration'):
config_only = True
elif o in ('-n', '--nodelist'):
nodefiles.append(a)
elif o in ('-i', '--int'):
if ( self.kill_signal != None ):
print 'Conflicting kill signals: -INT and', self.kill_signal
return
self.kill_signal = '-INT'
elif o in ('-q', '--quit'):
if ( self.kill_signal != None ):
print 'Conflicting kill signals: -QUIT and', self.kill_signal
return
self.kill_signal = '-QUIT'
elif o in ('-k', '--kill'):
if ( self.kill_signal != None ):
print 'Conflicting kill signals: -KILL and', self.kill_signal
return
self.kill_signal = '-KILL'
elif o in ( '--nothreading' ):
self.disable_threading()
elif o in ('-p', '--pids'):
redo_pids = True
elif o in ('-x'):
# intended to be called recursively from check_ducc, NOT from the command line
do_validate = True
checkdate = float(a)
elif o in ('-v', '--verbose'):
verbose = True
elif o in ('-h', '-?', '--help'):
self.usage(None)
else:
print 'badarg', a
usage('bad arg: ' + a)
if not self.installed():
print "Head node is not initialized. Have you run ducc_post_install?"
return
if ( do_validate ):
# if validating, ONLY validate, called via ssh usually
self.validate(checkdate)
return
# When called directly must be from the head node
self.verify_head()
self.set_duccling_version()
os.system('cat ' + self.DUCC_HOME + '/state/duccling.version')
# not -x option, do this only on local node
env = self.show_ducc_environment()
for e in env:
print e
jvm = self.ducc_properties.get('ducc.jvm')
if ( jvm == None ):
print 'WARN: ducc.jvm is not specified in ducc.properties. Default is simply "java" which may not work on all nodes.'
if ( not verify_master_node(self.ducc_properties) ):
print 'FAIL: Cannot verify master mode'
return
if ( not self.verify_activemq() ):
print 'ActiveMQ broker is not running on', self.broker_protocol + "://" + self.broker_host + ':' + self.broker_port
self.verify_database()
# init the PID file
self.pids = Properties()
self.pids.load_if_exists(self.pid_file)
# read the nodelists
if ( len(nodefiles) == 0 ):
nodefiles = self.default_nodefiles
check_nodepools = True
else:
# if using other than the fully configured set of nodes we can't reliably check nodepools
# because anything other than the full set of nodes may be missing something
check_nodepools = False
nodes = {}
n_nodes = 0
for nf in nodefiles:
n_nodes, nodes = self.read_nodefile(nf, nodes)
#
# add in the local host if needed, and the webserver node
#
localnodes = []
if ( not self.localhost in nodes ):
localnodes.append(self.localhost)
if ( not (self.webserver_node in ['localhost', self.localhost, None]) ):
localnodes.append(self.webserver_node)
if ( len(localnodes) > 0 ):
nodes['local'] = localnodes
self.verify_jvm()
if ( config_only ):
if ( nodefiles != self.default_nodefiles):
print "NOTOK: Config check only works with full, default nodefile:", self.default_nodefiles
return
if self.verify_class_configuration(nodefiles[0], verbose):
print "OK: Class configuration checked"
else:
print "NOTOK: Errors in class or node configuration."
return
# checking starts here
print "Checking", n_nodes, "nodes"
self.threadpool = ThreadPool(n_nodes + 5) # more for the head processes
checked = {}
signal.signal(signal.SIGINT, self.signalHandler)
try:
for (nodefile, nodelist) in nodes.items():
if ( nodelist == None ):
# loading the nodes prints the necessary message
continue
for node in nodelist:
if ( checked.has_key(node) ):
continue
checked[node] = node
self.threadpool.invoke(self.check_node, node)
except:
self.threadpool.quit()
print sys.exc_info()[0], "Exiting."
sys.exit(1)
self.threadpool.quit()
if ( self.kill_signal != None ):
print 'Stopping broker'
self.stop_broker()
print 'Stopping database'
self.db_stop()
if ( len(self.pids) == 0):
if ( os.path.exists(self.pid_file) ):
os.remove(self.pid_file)
elif (process_changes or redo_pids):
self.pids.write(self.pid_file)
if __name__ == "__main__":
checker = CheckDucc()
checker.main(sys.argv[1:])