blob: f521ef0afcf5cccfbba21dfb49349874161a9c28 [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
features = [ 'stop_ducc from head node only',
'support --head which stops non-agent daemons only on local head',
'support --agents which stops on any stated node',
'--help explains that stop_ducc disables autostart',
'employ broadcast for --stop and --quiesce of ducc daemons',
'employ kill -15 for --stop of broker and database',
'show ssh before and after for kill -15',
]
import sys
version_min = [2, 6]
version_info = sys.version_info
version_error = False
if(version_info[0] < version_min[0]):
version_error = True
elif(version_info[0] == version_min[0]):
if(version_info[1] < version_min[1]):
version_error = True
if(version_error):
print('Python minimum requirement is version '+str(version_min[0])+'.'+str(version_min[1]))
sys.exit(1)
import argparse
import datetime
import textwrap
import time
import traceback
from threading import BoundedSemaphore
from threading import Lock
from threading import Thread
from argparse import RawDescriptionHelpFormatter
from ducc_util import DuccUtil
# lock for messages
lock_print = Lock()
# print message
def output(msg):
with lock_print:
print msg
# produce a time stamp
def get_timestamp():
tod = time.time()
timestamp = datetime.datetime.fromtimestamp(tod).strftime('%Y-%m-%d %H:%M:%S')
return timestamp
_flag_debug = False
# record debug message
def debug(mn,text):
if(_flag_debug):
type ='D'
msg = get_timestamp()+' '+type+' '+mn+' '+text
output(msg)
class StopDucc(DuccUtil):
def _fn(self):
fpath = __file__.split('/')
flen = len(fpath)
return fpath[flen-1]
# return class name
def _cn(self):
return self.__class__.__name__
# return method name
def _mn(self):
return traceback.extract_stack(None,2)[0][2]
c_agent = 'agent'
c_ag = 'ag'
c_broker = 'broker'
c_br = 'br'
c_database = 'database'
c_db = 'db'
c_orchestrator = 'orchestrator'
c_or = 'or'
c_pm = 'pm'
c_rm = 'rm'
c_sm = 'sm'
c_ws = 'ws'
n_ag = 'ag'
n_br = 'br'
n_db = 'db'
n_or = 'or'
n_pm = 'pm'
n_rm = 'rm'
n_sm = 'sm'
n_ws = 'ws'
components = [ c_agent, c_pm, c_rm, c_sm, c_or, c_ws, c_broker, c_database, ]
shortname = { c_agent:n_ag,
c_broker:n_br,
c_database:n_db,
c_or:n_or,
c_pm:n_pm,
c_rm:n_rm,
c_sm:n_sm,
c_ws:n_ws,
c_ag:n_ag,
c_br:n_br,
c_db:n_db,
c_orchestrator:n_or,
}
longname = { n_ag:c_agent,
n_br:c_broker,
n_db:c_database,
n_or:c_orchestrator,
n_pm:c_pm,
n_rm:c_rm,
n_sm:c_sm,
n_ws:c_ws,
}
option_agents = '--agents'
option_all = '--all'
option_component = '--component'
option_debug = '--debug'
option_head = '--head'
option_kill = '--kill'
option_maxthreads = '--maxthreads'
option_nodelist = '--nodelist'
option_stop = '--stop'
option_quiesce = '--quiesce-then-stop'
cmd_kill_9 = 'kill -9'
cmd_kill_15 = 'kill -15'
cmd_ssh = 'ssh'
cmd_start_ducc = 'start_ducc'
kw_DUCC = 'DUCC'
maxthreads = 10
default_stop = 60
sig15 = 15
sig9 = 9
def _exit(self):
sys.exit(1)
def _help(self):
self.parser.print_help()
self._exit
def get_epilog(self):
epilog = ''
epilog = epilog+'Notes:'
epilog = epilog+'\n'
epilog = epilog+'N1. '+self._fn()+' '+'is limited to running on a head node.'
epilog = epilog+'\n'
epilog = epilog+'N2. '+self._fn()+' '+'updates database autostart table with "stop" status.'
epilog = epilog+'\n'
epilog = epilog+'N3. '+self._fn()+' '+self.option_kill+' option employs '+self.cmd_ssh+' with '+self.cmd_kill_9+'.'
epilog = epilog+'\n'
epilog = epilog+'N4. '+self._fn()+' '+self.option_stop+' and '+self.option_quiesce+' options employ broadcast via broker.'\
+'\n'\
+' '+'The broker and database are exceptions, whereby '+self.cmd_ssh+' with '+self.cmd_kill_15+' is employed.'
epilog = epilog+'\n\n'
epilog = epilog+'Examples:'
epilog = epilog+'\n\n'
epilog = epilog+'E1. kill all daemons that were started, as recorded in the database autostart table'
epilog = epilog+'\n'
epilog = epilog+'> '+self._fn()+' '+self.option_all+' '+self.option_kill
epilog = epilog+'\n\n'
epilog = epilog+'E2. stop all head node daemons on the present node'
epilog = epilog+'\n'
epilog = epilog+'> '+self._fn()+' '+self.option_head+' '+self.option_stop
epilog = epilog+'\n\n'
epilog = epilog+'E3. stop all agents via broadcast, each will issue '+self.cmd_kill_15+' to children'\
+'\n'\
+' '+'then exit after a maximum of '+str(self.default_stop)+' seconds, by default'
epilog = epilog+'\n'
epilog = epilog+'> '+self._fn()+' '+self.option_agents+' '+self.option_stop
epilog = epilog+'\n\n'
epilog = epilog+'E4. quiesce all agents, each will issue '+self.cmd_kill_15+' to children then exit only'\
+'\n'\
+' '+'after all children have exited'
epilog = epilog+'\n'
epilog = epilog+'> '+self._fn()+' '+self.option_agents+' '+self.option_quiesce
epilog = epilog+'\n\n'
epilog = epilog+'E5. quiesce agents listed in groupA.nodes and groupB.nodes, each will issue '+self.cmd_kill_15\
+'\n'\
+' '+'to children then exit only after all children have exited'
epilog = epilog+'\n'
epilog = epilog+'> '+self._fn()+' '+self.option_nodelist+' groupA.nodes '+self.option_nodelist+' groupB.nodes '+self.option_quiesce
epilog = epilog+'\n\n'
epilog = epilog+'E6. stop agents on nodes nodeC8 and nodeD5, each will issue '+self.cmd_kill_15\
+'\n'\
+' '+'to children then exit after a maximum of '+str(90)+' seconds.'
epilog = epilog+'\n'
epilog = epilog+'> '+self._fn()+' '+self.option_component+' '+self.c_agent+'@nodeC8 '+self.option_component+' '+self.c_agent+'@nodeD5 '+self.option_stop+' '+str(90)
epilog = epilog+'\n\n'
epilog = epilog+'E7. stop orchestrator'
epilog = epilog+'\n'
epilog = epilog+'> '+self._fn()+' '+self.option_component+' '+self.c_or+' '+self.option_stop
epilog = epilog+'\n\n'
epilog = epilog+'E8. kill orchestrator on alternate head node nodeX3'
epilog = epilog+'\n'
epilog = epilog+'> '+self._fn()+' '+self.option_component+' '+self.c_or+'@nodeX3'+' '+self.option_kill
return epilog
help_all = 'Stop all DUCC management and agent processes by using database entries recorded by start_ducc.'
help_head = 'Stop the DUCC management processes on the present head node by using database entries recorded by start_ducc.'
help_agents = 'Stop the DUCC agents processes on all nodes by using database entries recorded by '+cmd_start_ducc+'.'
help_nodelist = 'Stop agents on the nodes in the nodefile. Multiple nodefiles may be specified.'
help_component = 'Stop a specific component. The component may be qualified with the node name using the @ symbol: component@node.'\
+ ' If node is not specified, then the localhost is presumed. Multiple components may be specified. components = '+str(components)+'.'\
+ ' Specification of a head node component other than on the present head node is disallowed unless '+option_kill+' option is also specified.'\
+ ' Specification of broker or database is disallowed unless that component is automanaged by '+kw_DUCC+'.'
help_kill = 'Stop the component(s) forcibly and immediately using '+cmd_ssh+' with '+cmd_kill_9+'. Use this only if a normal stop does not work (e.g. the process may be hung).'
help_stop = 'Stop the component(s) gracefully using broadcast. Agents allow children specified time (in seconds) to exit. Default is '+str(default_stop)+'.'\
+ ' Broadcast is not used for broker, database, and remote head node daemons; instead a direct kill -15 is employed.'
help_quiesce = 'Stop the component(s) gracefully using broadcast. Agents exit only when no children exist. Children are given infinite time to exit.'
help_maxthreads = 'Maximum concurrent threads. Default = '+str(maxthreads)+'.'
help_debug = 'Display debugging messages.'
# get user specified command line args
def get_args(self):
self.parser = argparse.ArgumentParser(formatter_class=RawDescriptionHelpFormatter,epilog=self.get_epilog())
group1 = self.parser.add_mutually_exclusive_group(required=True)
group1.add_argument(self.option_all, '-a', action='store_true', help=self.help_all)
group1.add_argument(self.option_head, action='store_true', help=self.help_head)
group1.add_argument(self.option_agents, action='store_true', help=self.help_agents)
group1.add_argument(self.option_nodelist, '-n', action='append', help=self.help_nodelist)
group1.add_argument(self.option_component, '-c', action='append', help=self.help_component)
group2 = self.parser.add_mutually_exclusive_group()
group2.add_argument(self.option_kill, '-k', action='store_true', help=self.help_kill)
group2.add_argument(self.option_stop, '-s', action='store', type=int, nargs='?', const=self.default_stop, help=self.help_stop)
group2.add_argument(self.option_quiesce, '-q', action='store_true', help=self.help_quiesce)
self.parser.add_argument(self.option_maxthreads, '-m', action='store', type=int, default=None, help=self.help_maxthreads)
self.parser.add_argument(self.option_debug, '-d', action='store_true', help=self.help_debug)
self.args = self.parser.parse_args()
# mutual choice
if(not self.args.kill):
if(not self.args.quiesce_then_stop):
if(self.args.stop == None):
self.args.stop = self.default_stop
# special cases
if(self.args.kill):
if(self.args.maxthreads == None):
self.args.maxthreads = self.maxthreads
elif(self.args.stop):
if(self.args.maxthreads == None):
self.args.maxthreads = 2
elif(self.args.quiesce_then_stop):
if(self.args.maxthreads == None):
self.args.maxthreads = 2
elif(self.args.maxthreads != None):
self.parser.error(self.option_maxthreads+' requires '+self.option_kill)
# debug
if(self.args.debug):
global _flag_debug
_flag_debug = True
text = str(self.args)
debug(self._mn(),text)
db_list = None
# fetch and cache list of tuples comprising
# daemon@node from database autostart table
def get_db_list(self):
if(self.db_list == None):
self.db_list = self.db_acct_query()
text = 'list='+str(self.db_list)
debug(self._mn(),text)
return self.db_list
# --all
def all(self):
text = str(self.args.all)
debug(self._mn(),text)
# get list of tuples from DB:
# [ host, component, state ]
list = self.get_db_list()
return list
# --head
def head(self):
text = str(self.args.head)
debug(self._mn(),text)
# get list of tuples from DB: [ host, component, state ]
# exclude non-automanged br & db & exclude db if not the active/master node
db_list = self.get_db_list()
list = []
this_node = self.localhost
for item in db_list:
node = item[0]
component = item[1]
if(component == self.n_ag):
continue
if(component == self.n_db):
if(not self.automanage_database):
continue
if (not self.is_active()):
continue
if(component == self.n_br):
if(not self.automanage_broker):
continue
if(node != this_node):
continue
list.append(item)
return list
# --agents
def agents(self):
text = str(self.args.agents)
debug(self._mn(),text)
# get list of tuples from DB:
# [ host, component, state ]
db_list = self.get_db_list()
list = []
for item in db_list:
node = item[0]
component = item[1]
if(component == self.n_ag):
list.append(item)
text = 'add: '+'node:'+node+' '+'component:'+component
debug(self._mn(),text)
else:
text = 'skip: '+'node:'+node+' '+'component:'+component
debug(self._mn(),text)
return list
# --nodelist
def nodelist(self):
text = str(self.args.nodelist)
debug(self._mn(),text)
component = 'ag'
state = ''
list = []
map = {}
# fetch map where key is nodefile filename
# and value is list of nodes
for nodefile in self.args.nodelist:
nodes, map = self.read_nodefile(nodefile,map)
if(nodes < 0):
self._exit()
# create list of tuples from nodelist file(s):
# [ host, component, state ]
for key in map:
nodes = map[key]
for node in nodes:
entry = [ node, component, state ]
list.append(entry)
return list
# --component
def complist(self):
text = str(self.args.component)
debug(self._mn(),text)
list = []
# validate components specified on cmdline
for c in self.args.component:
parts = c.split('@')
if len(parts) == 1:
dn = self.localhost
dc = parts[0]
elif len(parts) == 2:
dn = parts[1]
dc = parts[0]
if(dc.startswith('all')):
msg = 'node specification disallowed for: '+dc
output(msg)
self._exit()
else:
msg = 'invalid component syntax: '+c
output(msg)
self._exit()
# Convert to the internal shortname if valid
if(dc in self.shortname):
component = self.shortname[dc]
else:
msg = 'invalid component: '+c
output(msg)
self._exit()
text = dc+'.'+dn
debug(self._mn(),text)
node = dn
entry = [ node, component, '' ]
list.append(entry)
return list
# disallow br/db unless automanaged
def validate_automanage(self,component):
if(component == 'br'):
if(not self.automanage_broker):
msg = 'component='+component+' '+'not automanaged.'
output(msg)
self._exit()
elif(component == 'db'):
if(not self.automanage_database):
msg = 'component='+component+' '+'not automanaged.'
output(msg)
self._exit()
# disallow unless in db
def validate_db(self,node,component):
list = self.get_db_list()
for item in list:
db_node = item[0]
db_component = item[1]
if(db_node == node):
if(db_component == component):
return
msg = 'node='+node+' '+'component='+component+' not found in database autostart table'
output(msg)
self._exit()
# validate user specified list
def validate_list(self,list):
is_backup = not self.is_active()
for item in list:
node = item[0]
component = item[1]
self.validate_automanage(component)
self.validate_db(node, component)
if (component == 'ag' and is_backup):
msg = 'cannot stop agents from the backup node.'
output(msg)
self._exit()
if (component == 'db' and is_backup):
msg = 'cannot stop database from the backup node.'
output(msg)
self._exit()
# in: tuples of (component,pid,user) and a desired component
# out: the pid of the desired component, if found
def find_pid(self,tuples,component):
pid = None
for tuple in tuples:
t_component = tuple[0]
t_pid = tuple[1]
t_user = tuple[2]
if(t_user == self.ducc_uid):
if(self.shortname.has_key(t_component)):
t_comp = self.shortname[t_component]
if(t_comp == component):
pid = t_pid
break
return pid
# Note - If this is called after submitting the stop request the autostart cron job may see
# that the daemon is down before the DB is updated and restart it!
def acct_stop(self,node,component):
print 'db_acct: Marking '+component+'@'+node+' as Stopped'
self.db_acct_stop(node,component)
# target=kill
def kill(self,count,tid,node,component,signal):
verbosity=False
ssh = self.ssh_operational(node,verbosity)
state = 'state=pending'
pfx = 'kill'+' '+'daemon='+str(count)+' '+'thread='+str(tid)+' '+'node='+node+' '+'component='+component+' '
msg = pfx+state
output(msg)
process=''
if(ssh):
state='state=success'
status, tuples = self.find_ducc_process(node)
if(status):
pid = self.find_pid(tuples,component)
if(pid == None):
state='state=component not found'
else:
self.acct_stop(node,component)
self.ssh(node, True, 'kill', '-'+str(signal), str(pid))
process='pid='+str(pid)+' '
else:
state='state=find DUCC process failed'
else:
state = 'state=ssh failure'
msg = pfx+process+state
output(msg)
self.put_tid(tid)
self.pool.release()
# launch threads to perform kills
def kill_threads(self,list,signal):
size = len(list)
msg = 'daemons='+str(len(list))
output(msg)
count = 0
for raw_type in self.components:
type = self.shortname[raw_type]
for item in list:
node = item[0]
component = item[1]
if(component == type):
count = count+1
self.pool.acquire()
tid = self.get_tid()
t = Thread(target=self.kill, args=(count,tid,node,component,signal))
t.start()
def filter_remote_head(self,list):
list_remote_head = []
list_remainder = []
this_node = self.localhost
for item in list:
node = item[0]
com = item[1]
if(com == self.c_ag):
list_remainder.append(item)
elif(node == this_node):
list_remainder.append(item)
else:
list_remote_head.append(item)
return list_remote_head, list_remainder
# target=stop
def stop(self,list,qflag):
text = 'list='+str(list)
debug(self._mn(),text)
# get 2 lists
list_remote_head, list = self.filter_remote_head(list)
# stop remote head(s)
if(len(list_remote_head)>0):
signal = self.sig15
self.kill_threads(list_remote_head, signal)
# stop the local daemons
stop_db = False
stop_broker = False
# pass 1 - stop all but broker & database
for item in list:
node = item[0]
com = item[1]
state = item[2]
if (item[2] == 'Stop'):
print 'WARN: '+com+' already recorded as stopped'
# Use the short names for DB recording but the external long names for DuccAdmin
component = self.longname[com]
if(component == self.c_broker):
stop_broker = True
continue
if(component == self.c_database):
stop_db = True
continue
self.acct_stop(node,com)
item = component+'@'+node
if(qflag):
self.ducc_admin('--quiesce',item)
else:
self.ducc_admin('--stop',item)
# Finally stop broker & database if requested
node = self.localhost
if(stop_broker):
print 'Stopping: broker'
self.acct_stop(node,self.c_br)
self.stop_broker()
if(stop_db):
print 'Stopping: database'
self.acct_stop(node,self.c_db)
self.db_stop()
# multi-thread lock to obtain thread id
lock_tid = Lock()
# get thread id
def get_tid(self):
with self.lock_tid:
tid = self.tids.pop(0)
return tid
# return thread id
def put_tid(self,tid):
with self.lock_tid:
self.tids.append(tid)
# initialize for threading
def threads_prep(self):
maxthreads = self.args.maxthreads
self.tids = range(1,maxthreads+1)
self.pool = BoundedSemaphore(value=maxthreads)
# main
def main(self,argv):
if(not self.is_head_node()):
output('Must be run from a head node.')
self._exit()
self.get_args()
self.threads_prep()
# get list of nodes+daemons
if(self.args.all):
list = self.all()
elif(self.args.head):
list = self.head()
elif(self.args.agents):
list = self.agents()
elif(self.args.nodelist != None):
list = self.nodelist()
elif(self.args.component != None):
list = self.complist()
else:
self._help()
text = str(list)
debug(self._mn(),text)
# disallow br & db unless DUCC managed and db & agents if not the master
self.validate_list(list)
# perform action
if(self.args.kill):
signal = self.sig9
self.kill_threads(list,signal)
elif(self.args.stop != None):
self.stop(list,False)
elif(self.args.quiesce_then_stop):
self.stop(list,True)
else:
self._help()
if __name__ == '__main__':
instance = StopDucc()
instance.main(sys.argv[1:])