| #!/usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| |
| import os |
| import sys |
| import time |
| import getopt |
| import glob |
| |
| from ducc_util import DuccUtil |
| from properties import * |
| from ducc import Ducc |
| |
| class StopDucc(DuccUtil): |
| |
| def stop_component(self, component, force): |
| |
| if ( (component == 'broker') and self.automanage_broker ): |
| print 'Stopping broker' |
| self.stop_broker() |
| return |
| if ( (component == 'db') and self.automanage_database ): |
| print 'Stopping database' |
| self.db_stop() |
| return |
| |
| # |
| # If it's an unqualified management component, we need to get it's qualified name |
| # |
| if ( component in self.default_components ): |
| if( component == 'agent' ): |
| if ( self.pids_agents.has_key(component) ): |
| component = self.pids_agents.get(component) |
| else: |
| print 'Skipping', component, 'not in pids file.' |
| return |
| else: |
| if ( self.pids_daemons.has_key(component) ): |
| component = self.pids_daemons.get(component) |
| else: |
| print 'Skipping', component, 'not in pids file.' |
| return |
| |
| # |
| # If the name is not qualified we've got a problem, everything in the pids file is qualified |
| # |
| if ( component.find('@') >= 0 ): |
| com, target_node = component.split('@') |
| else: |
| self.invalid("Must specify hostname when stopping", component) |
| |
| # |
| # If despite all that we can't find the pid, we need to run check_ducc |
| # |
| if( com == 'agent' ): |
| if ( not self.pids_agents.has_key(component) ): |
| print "Cannot find PID for component", component, ". Run check_ducc -p to refresh PIDS and then rerun stop_ducc." |
| return |
| else: |
| pid = self.pids_agents.get(component) |
| else: |
| if ( not self.pids_daemons.has_key(component) ): |
| print "Cannot find PID for component", component, ". Run check_ducc -p to refresh PIDS and then rerun stop_ducc." |
| return |
| else: |
| pid = self.pids_daemons.get(component) |
| |
| |
| if ( force ): |
| print 'Stopping component', com, 'on node', target_node, 'with PID', pid, 'forcibly (kill -9)' |
| self.nohup(['ssh', target_node, 'kill', '-KILL', pid], False) |
| |
| pass |
| else: |
| print 'Stopping component', com, 'on node', target_node, 'with PID', pid |
| self.nohup(['ssh', target_node, 'kill', '-INT', pid], False) |
| |
| # clear the short name if it exists, and the long name |
| if( com == 'agent' ): |
| self.pids_agents.delete(com) |
| self.pids_agents.delete(component) |
| else: |
| self.pids_daemons.delete(com) |
| self.pids_daemons.delete(component) |
| |
| def quiesce_agents(self, components, nodes): |
| allnodes = [] |
| for ( nf, nl ) in nodes.items(): |
| allnodes = allnodes + nl |
| |
| for c in components: |
| if ( c.find('@') >= 0 ): |
| com, target_node = c.split('@') |
| allnodes.append(target_node) |
| else: |
| self.invalid("Must specify hostname when stopping", component) |
| |
| qparm = ','.join(allnodes) |
| print 'Quiescing', qparm |
| DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties " |
| DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME |
| DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head') |
| self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccAdmin', '--quiesceAgents', qparm) |
| |
| # NOTE: quiesce does not actually cause agents to terminate so we don't update the PIDs file |
| return |
| |
| def stop_agent(self, node, force): |
| self.stop_component('agent@' + node.strip(), force) |
| |
| def usage(self, msg): |
| if ( msg != None ): |
| print msg |
| |
| print 'stop_ducc [options]' |
| print ' If no options are given, this help screen is shown.' |
| print '' |
| print ' For reliable DUCC agents will not be stopped from backup head node. ' |
| print '' |
| print ' Broker will not be stopped when ducc.broker.automanage = false. ' |
| print ' Database will not be stopped when ducc.database.automanage = false. ' |
| print '' |
| print 'Options:' |
| print ' -a --all' |
| print ' Stop all the DUCC processes, including agents and management processes.' |
| print '' |
| print ' -n --nodelist nodefile' |
| print ' Stop agents on the nodes in the nodefile. Multiple nodefiles may be specified:' |
| print '' |
| print ' stop_ducc -n foo.nodes -n bar.nodes -n baz.nodes' |
| print '' |
| print ' -c --component component' |
| print ' Stop a specific component. The component may be qualified with the node name' |
| print ' using the @ symbol: component@node.' |
| print '' |
| print ' stop_ducc -c rm@foonode' |
| print ' stop_ducc -c agent@barnode -c or' |
| print '' |
| print ' Components include:' |
| print ' agent - node agent' |
| print ' broker - AMQ broker' |
| print ' db - database' |
| print ' or - orchestrator' |
| print ' pm - process manager' |
| print ' rm - resource manager' |
| print ' sm - services manager' |
| print ' ws - web server' |
| print ' head = { or, pm, rm, sm, ws, db, broker }' |
| print '' |
| print ' -w --wait' |
| print ' Time to wait for everything to come down, in seconds. Default is 60.' |
| print '' |
| print ' -k --kill' |
| print ' Stop the component forcibly and immediately using kill -9. Use this only if a' |
| print ' normal stop does not work (e.g. the process may be hung).' |
| print '' |
| print ' --nothreading' |
| print ' Disable multithreaded operation if it would otherwise be used' |
| print '' |
| |
| sys.exit(1) |
| |
| def invalid(self, *msg): |
| if ( msg[0] != None ): |
| print ' '.join(msg) |
| |
| print "For usage run" |
| print " stop_ducc -h" |
| print 'or' |
| print ' stop_ducc --help' |
| sys.exit(1) |
| |
| |
| def main(self, argv): |
| |
| self.verify_head() |
| |
| self.check_properties() |
| |
| if ( len(argv) == 0 ): |
| self.usage(None) |
| |
| components = [] |
| nodefiles = [] |
| do_agents = False |
| do_components = False |
| force = False |
| quiesce = False |
| all = False |
| wait_time = 60 |
| |
| try: |
| opts, args = getopt.getopt(argv, 'ac:n:kn:w:qh?v', ['all', 'component=', 'help', 'nodelist=', 'kill', 'quiesce', 'nothreading', 'wait']) |
| except: |
| self.invalid('Invalid arguments ' + ' '.join(argv)) |
| |
| if (len(args) > 0): |
| self.invalid('Invalid extra args: ', ' '.join(args)) |
| |
| for ( o, a ) in opts: |
| if o in ('-c', '--component' ): |
| if (a.strip() == 'head'): |
| components.append('or') |
| components.append('pm') |
| components.append('rm') |
| components.append('sm') |
| components.append('ws') |
| components.append('db') |
| components.append('broker') |
| else: |
| components.append(a) |
| do_components = True |
| elif o in ( '-a', '--all' ): |
| all = True |
| components = self.default_components |
| elif o in ( '-n', '--nodelist' ): |
| nodefiles.append(a) |
| do_agents = True |
| elif o in ( '-k', '--kill' ): |
| force = True |
| elif o in ( '-q', '--quiesce' ): |
| quiesce = True |
| elif o in ( '-w', '--wait' ): |
| wait_time = int(a) |
| elif o in ( '--nothreading' ): |
| self.disable_threading() |
| elif ( o == '-v' ) : |
| print self.version() |
| sys.exit(0) |
| elif o in ( '-h', '--help' ): |
| self.usage(None) |
| elif ( o == '-?'): |
| self.usage(None) |
| else: |
| self.invalid('bad arg: ' + o) |
| |
| if ( quiesce ): |
| if ( all ): |
| self.invalid("May not quiesce 'all'."); |
| if ( force ): |
| self.invalid("May not both quiesce and force."); |
| for c in components: |
| if ( not c.startswith('agent') ): |
| self.invalid("Only agents may be quiesced.") |
| |
| |
| |
| # avoid confusion by insuring that if 'all', then nothing else is specified |
| if ( all and ( do_components ) ): |
| self.invalid("The --all option is mutually exclusive with --component") |
| |
| # 'all' means everything. we use broadcast. should use check_ducc to make sure |
| # it actually worked, and find the stragglers. |
| if ( all ): |
| if ( not force ) : |
| self.clean_shutdown() |
| |
| # Agents may wait up to 60 secs for processes to quiesce |
| print "Waiting " + str(wait_time) + " seconds to broadcast agent shutdown." |
| time.sleep(wait_time) |
| |
| if ( self.automanage_broker ): |
| print "Stopping broker" |
| self.stop_broker() |
| |
| if ( self.automanage_database ): |
| print "Stopping database" |
| self.db_stop() |
| |
| if ( os.path.exists(self.pid_file_agents) ): |
| os.remove(self.pid_file_agents) |
| if ( os.path.exists(self.pid_file_daemons) ): |
| os.remove(self.pid_file_daemons) |
| return |
| else: |
| if ( len(nodefiles) == 0 ): |
| nodefiles = self.default_nodefiles |
| |
| |
| self.pids_agents = Properties() |
| self.pids_daemons = Properties() |
| sc = set(components) |
| sb = set(['broker', 'db']) |
| read_pids = True |
| if ( sc.issubset(sb) ): |
| read_pids = False |
| |
| # The broker and db do not set the pid file |
| if ( read_pids ): |
| try: |
| if(not self.is_reliable_backup()): |
| self.pids_agents.load(self.pid_file_agents) |
| self.pids_daemons.load(self.pid_file_daemons) |
| except PropertiesException, (inst): |
| print inst.msg |
| print '' |
| print 'Run check_ducc -p to refresh the PIDs file, or check_ducc -k to search for and', |
| print 'kill all DUCC processes.' |
| print '' |
| sys.exit(1) |
| |
| # |
| # if not 'all', we use nodefiles and component names |
| # |
| |
| # make sure all the nodefiles exist and are readable |
| ok = True |
| nodes = {} |
| n_nodes = 0 |
| for n in nodefiles: |
| n_nodes, nodes = self.read_nodefile(n, nodes) |
| |
| for ( nf, nl ) in nodes.items(): |
| if ( nl == None ): # die early if the parameters are wrong |
| print "Can't read nodefile", nf |
| ok = False |
| |
| if ( not ok ): |
| sys.exit(1) |
| |
| if ( quiesce ): |
| if(self.is_reliable_backup()): |
| print '********** "backup" head node -> not quiescing agents' |
| else: |
| self.quiesce_agents(components, nodes) |
| else: |
| if(self.is_reliable_backup()): |
| print '********** "backup" head node -> not stopping agents' |
| else: |
| for (nf, nl) in nodes.items(): |
| for n in nl: |
| self.stop_agent(n, force) |
| host = self.localhost.split('.')[0] |
| for c in components: |
| c = c.strip() |
| if(c in ('pm','rm','sm','ws')): |
| c = c+'@'+host |
| self.stop_component(c, force) |
| time.sleep(2) |
| for c in components: |
| c = c.strip() |
| if(c in ('or')): |
| c = c+'@'+host |
| self.stop_component(c, force) |
| time.sleep(2) |
| for c in components: |
| c = c.strip() |
| if(c in ('db','broker')): |
| self.stop_component(c, force) |
| |
| if ( read_pids ): |
| if(not self.is_reliable_backup()): |
| if ( len(self.pids_agents) > 0 ): |
| self.pids_agents.write(self.pid_file_agents) |
| else: |
| os.remove(self.pid_file_agents) |
| if ( len(self.pids_daemons) > 0 ): |
| self.pids_daemons.write(self.pid_file_daemons) |
| else: |
| os.remove(self.pid_file_daemons) |
| |
| return |
| |
if __name__ == "__main__":
    # Script entry point: pass the command-line arguments (without the
    # program name) to a fresh StopDucc instance.
    StopDucc().main(sys.argv[1:])
| |
| |