#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import os
import sys
import time
import getopt
import glob
from ducc_util import DuccUtil
from properties import *
from ducc import Ducc
class StopDucc(DuccUtil):
    """Command-line driver for stopping DUCC components.

    Daemons and agents are stopped by running 'kill' over ssh against the PID
    recorded in the agents/daemons pids files; the broker and the database are
    stopped through the stop_broker() / db_stop() helpers inherited from
    DuccUtil when their 'automanage' flags are set.
    """

    def stop_component(self, component, force):
        """Stop a single component.

        component -- 'broker', 'db', an unqualified name found in
                     self.default_components, or a qualified 'name@node'.
        force     -- true to send 'kill -KILL', false to send 'kill -INT'.

        Prints a message and returns without signalling when no PID is on
        record. Exits through self.invalid() when the name carries no node.
        """
        if component == 'broker' and self.automanage_broker:
            print('Stopping broker')
            self.stop_broker()
            return

        if component == 'db' and self.automanage_database:
            print('Stopping database')
            self.db_stop()
            return

        #
        # If it's an unqualified management component, we need to get its qualified name
        #
        if component in self.default_components:
            if component == 'agent':
                name_map = self.pids_agents
            else:
                name_map = self.pids_daemons
            if not name_map.has_key(component):
                print('Skipping %s not in pids file.' % (component,))
                return
            component = name_map.get(component)

        #
        # If the name is not qualified we've got a problem, everything in the pids file is qualified
        #
        if component.find('@') >= 0:
            com, target_node = component.split('@')
        else:
            # invalid() terminates the process, so com/target_node are always bound below
            self.invalid("Must specify hostname when stopping", component)

        #
        # If despite all that we can't find the pid, we need to run check_ducc
        #
        if com == 'agent':
            pids = self.pids_agents
        else:
            pids = self.pids_daemons
        if not pids.has_key(component):
            print('Cannot find PID for component %s . Run check_ducc -p to refresh PIDS and then rerun stop_ducc.'
                  % (component,))
            return
        pid = pids.get(component)

        if force:
            print('Stopping component %s on node %s with PID %s forcibly (kill -9)' % (com, target_node, pid))
            signal_flag = '-KILL'
        else:
            print('Stopping component %s on node %s with PID %s' % (com, target_node, pid))
            signal_flag = '-INT'
        self.nohup(['ssh', target_node, 'kill', signal_flag, pid], False)

        # clear the short name if it exists, and the long name
        pids.delete(com)
        pids.delete(component)

    def quiesce_agents(self, components, nodes):
        """Ask DuccAdmin to quiesce the agents on the given nodes.

        components -- qualified 'agent@node' names; the node part is used.
        nodes      -- dict of nodefile name -> list of node names.

        Exits through self.invalid() if a component carries no '@node' part.
        """
        allnodes = []
        for node_list in nodes.values():
            allnodes.extend(node_list)

        for c in components:
            if c.find('@') >= 0:
                _, target_node = c.split('@')
                allnodes.append(target_node)
            else:
                # The original passed an undefined name here, raising NameError
                # instead of reporting the offending component.
                self.invalid("Must specify hostname when stopping", c)

        qparm = ','.join(allnodes)
        print('Quiescing %s' % (qparm,))

        DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties "
        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME
        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head')
        self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccAdmin', '--quiesceAgents', qparm)

        # NOTE: quiesce does not actually cause agents to terminate so we don't update the PIDs file
        return

    def stop_agent(self, node, force):
        """Stop the agent on 'node' (surrounding whitespace is stripped)."""
        self.stop_component('agent@' + node.strip(), force)

    def usage(self, msg):
        """Print 'msg' (when not None) followed by the help screen, then exit with status 1."""
        if msg is not None:
            print(msg)

        help_lines = (
            'stop_ducc [options]',
            ' If no options are given, this help screen is shown.',
            '',
            ' For reliable DUCC agents will not be stopped from backup head node. ',
            '',
            ' Broker will not be stopped when ducc.broker.automanage = false. ',
            ' Database will not be stopped when ducc.database.automanage = false. ',
            '',
            'Options:',
            ' -a --all',
            ' Stop all the DUCC processes, including agents and management processes.',
            '',
            ' -n --nodelist nodefile',
            ' Stop agents on the nodes in the nodefile. Multiple nodefiles may be specified:',
            '',
            ' stop_ducc -n foo.nodes -n bar.nodes -n baz.nodes',
            '',
            ' -c --component component',
            ' Stop a specific component. The component may be qualified with the node name',
            ' using the @ symbol: component@node.',
            '',
            ' stop_ducc -c rm@foonode',
            ' stop_ducc -c agent@barnode -c or',
            '',
            ' Components include:',
            ' agent - node agent',
            ' broker - AMQ broker',
            ' db - database',
            ' or - orchestrator',
            ' pm - process manager',
            ' rm - resource manager',
            ' sm - services manager',
            ' ws - web server',
            ' head = { or, pm, rm, sm, ws, db, broker }',
            '',
            ' -w --wait',
            ' Time to wait for everything to come down, in seconds. Default is 60.',
            '',
            ' -k --kill',
            ' Stop the component forcibly and immediately using kill -9. Use this only if a',
            ' normal stop does not work (e.g. the process may be hung).',
            '',
            ' --nothreading',
            ' Disable multithreaded operation if it would otherwise be used',
            '',
        )
        for line in help_lines:
            print(line)
        sys.exit(1)

    def invalid(self, *msg):
        """Print the message parts joined by spaces, point at the help options, and exit with status 1."""
        # Guard against being called with no arguments at all.
        if msg and msg[0] is not None:
            print(' '.join(msg))
        print("For usage run")
        print(" stop_ducc -h")
        print('or')
        print(' stop_ducc --help')
        sys.exit(1)

    def _stop_listed(self, components, names, force, host=None):
        """Stop every entry of 'components' whose stripped name is in 'names'.

        When 'host' is given the name is qualified as 'name@host' first.
        """
        for c in components:
            c = c.strip()
            if c in names:
                if host is not None:
                    c = c + '@' + host
                self.stop_component(c, force)

    def _save_pids(self, pids, pid_file):
        """Write 'pids' back to 'pid_file', or remove the file when no entries remain."""
        if len(pids) > 0:
            pids.write(pid_file)
        elif os.path.exists(pid_file):
            # Same existence guard as the --all path uses before removing pid files.
            os.remove(pid_file)

    def main(self, argv):
        """Parse the command line in 'argv' (program name excluded) and stop what was requested.

        Exits with status 1 on invalid arguments, unreadable nodefiles or an
        unreadable pids file.
        """
        self.verify_head()
        self.check_properties()

        if len(argv) == 0:
            self.usage(None)

        components = []
        nodefiles = []
        do_components = False
        force = False
        quiesce = False
        stop_all = False
        wait_time = 60

        try:
            # 'wait=' needs the trailing '=' so that --wait accepts its value.
            opts, args = getopt.getopt(
                argv, 'ac:n:kw:qh?v',
                ['all', 'component=', 'help', 'nodelist=', 'kill', 'quiesce', 'nothreading', 'wait='])
        except getopt.GetoptError:
            self.invalid('Invalid arguments ' + ' '.join(argv))

        if len(args) > 0:
            self.invalid('Invalid extra args: ', ' '.join(args))

        for (o, a) in opts:
            if o in ('-c', '--component'):
                if a.strip() == 'head':
                    components.extend(['or', 'pm', 'rm', 'sm', 'ws', 'db', 'broker'])
                else:
                    components.append(a)
                do_components = True
            elif o in ('-a', '--all'):
                stop_all = True
                # Copy so later appends can never alter the shared default list.
                components = list(self.default_components)
            elif o in ('-n', '--nodelist'):
                nodefiles.append(a)
            elif o in ('-k', '--kill'):
                force = True
            elif o in ('-q', '--quiesce'):
                quiesce = True
            elif o in ('-w', '--wait'):
                try:
                    wait_time = int(a)
                except ValueError:
                    self.invalid('Wait time must be an integer number of seconds: ' + a)
                if wait_time < 0:
                    self.invalid('Wait time must not be negative: ' + a)
            elif o == '--nothreading':
                self.disable_threading()
            elif o == '-v':
                print(self.version())
                sys.exit(0)
            elif o in ('-h', '--help', '-?'):
                self.usage(None)
            else:
                self.invalid('bad arg: ' + o)

        if quiesce:
            if stop_all:
                self.invalid("May not quiesce 'all'.")
            if force:
                self.invalid("May not both quiesce and force.")
            for c in components:
                if not c.startswith('agent'):
                    self.invalid("Only agents may be quiesced.")

        # avoid confusion by ensuring that if 'all', then nothing else is specified
        if stop_all and do_components:
            self.invalid("The --all option is mutually exclusive with --component")

        # 'all' means everything. we use broadcast. should use check_ducc to make sure
        # it actually worked, and find the stragglers.
        #
        # NOTE(review): the indentation of this file was reconstructed. The 'else'
        # below is paired with 'if not force', so a forced --all falls through to
        # the per-component kill path and a plain -c never picks up the default
        # nodefiles. Confirm against the upstream layout.
        if stop_all:
            if not force:
                self.clean_shutdown()
                # Agents may wait up to 60 secs for processes to quiesce
                print("Waiting " + str(wait_time) + " seconds to broadcast agent shutdown.")
                time.sleep(wait_time)

                if self.automanage_broker:
                    print("Stopping broker")
                    self.stop_broker()

                if self.automanage_database:
                    print("Stopping database")
                    self.db_stop()

                if os.path.exists(self.pid_file_agents):
                    os.remove(self.pid_file_agents)
                if os.path.exists(self.pid_file_daemons):
                    os.remove(self.pid_file_daemons)
                return
            else:
                if len(nodefiles) == 0:
                    nodefiles = self.default_nodefiles

        self.pids_agents = Properties()
        self.pids_daemons = Properties()

        # The broker and db do not set the pid file, so the pid files can be
        # skipped when nothing else is being stopped. Agents named through
        # nodefiles do need their PIDs, even when no -c component was given.
        only_broker_db = set(components).issubset(set(['broker', 'db']))
        read_pids = (len(nodefiles) > 0) or (not only_broker_db)

        if read_pids:
            try:
                if not self.is_reliable_backup():
                    self.pids_agents.load(self.pid_file_agents)
                self.pids_daemons.load(self.pid_file_daemons)
            except PropertiesException as inst:
                print(inst.msg)
                print('')
                print('Run check_ducc -p to refresh the PIDs file, or check_ducc -k to search for and '
                      'kill all DUCC processes.')
                print('')
                sys.exit(1)

        #
        # if not 'all', we use nodefiles and component names
        #
        # make sure all the nodefiles exist and are readable
        nodes = {}
        for nodefile in nodefiles:
            _n_nodes, nodes = self.read_nodefile(nodefile, nodes)

        ok = True
        for (nf, nl) in nodes.items():
            if nl is None:                # die early if the parameters are wrong
                print("Can't read nodefile %s" % (nf,))
                ok = False
        if not ok:
            sys.exit(1)

        if quiesce:
            if self.is_reliable_backup():
                print('********** "backup" head node -> not quiescing agents')
            else:
                self.quiesce_agents(components, nodes)
        else:
            if self.is_reliable_backup():
                print('********** "backup" head node -> not stopping agents')
            else:
                for node_list in nodes.values():
                    for n in node_list:
                        self.stop_agent(n, force)

            # Daemons are stopped in phases with a short pause between them:
            # pm/rm/sm/ws first, then the orchestrator, then db and broker.
            #
            # NOTE(review): only the unqualified names listed in each phase are
            # acted on; a qualified name given with -c (e.g. 'rm@node' or
            # 'agent@node') matches no phase and is skipped. Confirm whether
            # that is intended.
            host = self.localhost.split('.')[0]
            self._stop_listed(components, ('pm', 'rm', 'sm', 'ws'), force, host)
            time.sleep(2)
            self._stop_listed(components, ('or',), force, host)
            time.sleep(2)
            self._stop_listed(components, ('db', 'broker'), force)

        if read_pids:
            # NOTE(review): mirrors the load above - the agents file is only
            # touched when this is not a reliable-backup head; the daemons file always.
            if not self.is_reliable_backup():
                self._save_pids(self.pids_agents, self.pid_file_agents)
            self._save_pids(self.pids_daemons, self.pid_file_daemons)
        return
if __name__ == "__main__":
    # Script entry point: pass the command-line arguments (program name
    # excluded) to a fresh StopDucc instance.
    StopDucc().main(sys.argv[1:])