| #!/usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| |
| import os |
| import sys |
| import time |
| import getopt |
| import threading |
| import traceback |
| |
| from ducc_util import DuccUtil |
| from properties import Properties |
| from local_hooks import verify_slave_node |
| from local_hooks import verify_master_node |
| from ducc import Ducc |
| from ducc_util import ThreadPool |
| from ducc_base import find_ducc_home |
| |
| class StartDucc(DuccUtil): |
| |
| def __init__(self): |
| DuccUtil.__init__(self, True) |
| |
| def start_broker(self): |
| |
| broker_host = self.ducc_properties.get('ducc.broker.hostname') |
| print 'Starting broker on', broker_host |
| lines = self.ssh(broker_host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'broker', "'") |
| while 1: |
| line = lines.readline().strip() |
| if ( not line ): |
| break |
| #print '[] ' + line |
| if ( line.startswith('PID') ): |
| toks = line.split(' ') # get the PID |
| print "Broker on", broker_host, 'PID', toks[1] |
| self.pids.put('broker@' + broker_host, toks[1]) |
| lines.close() |
| break |
| |
| for i in range(0, 9): |
| if ( self.is_amq_active() ): |
| return |
| print 'Waiting for broker .....', str(i) |
| time.sleep(1) |
| |
| def start_component(self, args): |
| |
| ducc, component, or_parms = args |
| msgs = [] |
| |
| node = self.ducc_properties.get('ducc.head') |
| com = component |
| if ( com.find('@') >= 0 ): |
| com, node = com.split('@') |
| |
| if ( ( com == 'ws' ) and ( node == 'local' ) and ( self.webserver_node != 'localhost' )): |
| if ( self.webserver_node != None ): |
| node = self.webserver_node |
| component = com + '@' + node |
| |
| if ((com in self.default_components) or ( com == 'agent')) : |
| msgs.append((node, 'Starting', com)) |
| else: |
| msgs.append(('Unrecognized component', component)) |
| return msgs |
| |
| |
| if ( or_parms == None ): |
| or_parms = '--or_parms=' |
| else: |
| or_parms = '--or_parms=' + or_parms |
| |
| if ( node == 'local' ): |
| node = self.localhost |
| |
| lines = self.ssh(node, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', or_parms, '-d', str(time.time()), '--nodup', "'") |
| |
| # we'll capture anything that the python shell spews because it may be useful, and then drop the |
| # pipe when we see a PID message |
| while 1: |
| line = lines.readline().strip() |
| if ( not line ): |
| break |
| #msgs.append(('[]', line)) |
| if ( line.startswith('PID') ): |
| toks = line.split(' ') # get the PID |
| msgs.append((' PID', toks[1])) |
| self.pids.put(com + '@' + node, toks[1]) |
| lines.close() |
| break |
| if ( line.startswith('WARN') ): |
| msgs.append((' ', line)) |
| |
| if ( com in self.default_components ): # tracks where the management processes are |
| self.pidlock.acquire() |
| self.pids.put(com, com + '@' + node) |
| self.pidlock.release() |
| |
| return msgs |
| |
| def start_one_agent(self, args): |
| |
| host = args[0] |
| msgs = [] |
| spacer = ' ' |
| msgs.append((host, "")) |
| lines = self.ssh(host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c' 'agent', '-b', '-d', str(time.time()), '--nodup', "'") |
| for line in lines: |
| line = line.strip() |
| # print '[]', host, line |
| # msgs.append(('[l]', line)) |
| if ( line.startswith('PID') ): |
| toks = line.split(' ') |
| pid = toks[1] |
| self.pidlock.acquire() |
| self.pids.put('agent@' + host, pid) |
| self.pidlock.release() |
| |
| lines.close() |
| msgs.append((spacer, 'DUCC Agent started PID', pid)) |
| break |
| |
| if ( 'tty' in line ): |
| # ssh junk if mesg is set |
| continue |
| |
| toks = line.split() |
| |
| sshmsgs = self.ssh_ok(host, line ) |
| if ( sshmsgs != None ): |
| for m in sshmsgs: |
| print '[S]', m |
| |
| if ( toks[0] == 'NOTOK' ): |
| msgs.append((spacer, 'NOTOK Not started:', ' '.join(toks[1:]))) |
| else: |
| msgs.append((spacer, line)) |
| |
| return msgs |
| |
| def verify_required_directories(self): |
| for dir in ('history', 'state', 'logs'): |
| d = self.DUCC_HOME + '/' + dir |
| if ( not os.path.exists(d) ): |
| print "Initializing", d |
| os.mkdir(d) |
| |
| def usage(self, *msg): |
| if ( msg[0] != None ): |
| print ' '.join(msg) |
| |
| print "Usage:" |
| print " start_ducc [options]" |
| print " If no options are given, all DUCC processes are started, using the default" |
| print " nodelist, DUCC_HOME/resources/ducc.nodes. " |
| print "" |
| print "Options:" |
| print " -n --nodelist nodefile" |
| print " Start agents on the nodes in the nodefile. Multiple nodefiles may be specified:" |
| print "" |
| print " start_ducc -n foo.nodes -n bar.nodes -n baz.nodes" |
| print "" |
| print " -c, --component component" |
| print " Start a specific DUCC component, optionally on a specific node. If the component name" |
| print " is qualified with a nodename, the component is started on that node. To qualify a" |
| print " component name with a destination node, use the notation component@nodename." |
| print " Multiple components may be specified:" |
| print "" |
| print " start_ducc -c sm -c pm -c rm@node1 -c or@node2 -c agent@remote1 -c agent@remote2" |
| print "" |
| print " Components include:" |
| print " rm - resource manager" |
| print " or - orchestrator" |
| print " pm - process manager" |
| print " sm - services manager" |
| print " ws - web server" |
| print " agent - node agent" |
| print "" |
| print " --nothreading" |
| print " Disable multithreaded operation if it would otherwise be used" |
| print "" |
| print "Examples:" |
| print " Start all DUCC processes, using custom nodelists:" |
| print " start_ducc -n foo.nodes -n bar.nodes" |
| print "" |
| print " Start just agents on a specific set of nodes:" |
| print " start_ducc -n foo.nodes -n bar.nodes" |
| print "" |
| print " Start the webserver on node 'bingle':" |
| print " start_ducc -c ws@bingle" |
| sys.exit(1) |
| |
| def invalid(self, *msg): |
| if ( msg[0] != None ): |
| print ' '.join(msg) |
| |
| print "For usage run" |
| print " start_ducc -h" |
| print 'or' |
| print ' start_ducc --help' |
| sys.exit(1) |
| |
| def main(self, argv): |
| |
| self.verify_head() |
| |
| if ( not self.verify_jvm() ): |
| sys.exit(1); |
| |
| self.set_duccling_version() |
| |
| nodefiles = [] |
| components = [] |
| or_parms = self.ducc_properties.get('ducc.orchestrator.start.type') |
| self.pids = Properties() |
| self.pids.load_if_exists(self.pid_file) |
| |
| try: |
| opts, args = getopt.getopt(argv, 'c:mn:sh?v', ['component=', 'components=', 'help', 'nodelist=', 'cold', 'warm', 'hot', 'nothreading']) |
| except: |
| self.invalid('Invalid arguments', ' '.join(argv)) |
| |
| for ( o, a ) in opts: |
| if o in ( '-c', '--components' ): |
| components.append(a) |
| elif o in ( '-n', '--nodelist' ): |
| nodefiles.append(a) |
| elif o in ( '--nothreading' ): |
| self.disable_threading() |
| elif o in ( '--cold', '--warm', '--hot' ): |
| or_parms = o[2:] # (strip the leading --) |
| elif ( o == '-v'): |
| print self.version() |
| sys.exit(0) |
| elif o in ( '-h', '--help' ): |
| self.usage(None) |
| elif ( o == '-?'): |
| self.usage(None) |
| else: |
| self.invalid('bad args: ', ' '.join(argv)) |
| |
| if not self.installed(): |
| print "Head node is not initialized. Have you run ducc_post_install?" |
| return |
| |
| environ = self.show_ducc_environment() |
| for e in environ: |
| print e |
| |
| # no args, or just -s - make equivalent of -management and -nodefile=DUCC.HOME/resources/ducc.nodes |
| if ( (len(components) == 0) and (len(nodefiles) == 0 ) ) : |
| nodefiles = self.default_nodefiles |
| components = self.default_components |
| |
| self.verify_required_directories() |
| |
| if ( not verify_master_node(self.ducc_properties) ): |
| print 'FAIL: Cannot run javac to run java verification' |
| return |
| |
| # make sure all the nodefiles exist and are readable |
| ok = True |
| nodes = {} |
| n_nodes = 0 |
| for n in nodefiles: |
| n_nodes, nodes = self.read_nodefile(n, nodes) |
| |
| for ( nf, nl ) in nodes.items(): |
| if ( nl == None ): |
| print "Can't read nodefile", nf |
| ok = False |
| |
| if ok and (nodefiles == self.default_nodefiles): |
| if self.verify_class_configuration(nodefiles[0], False): |
| print "OK: Class configuration checked" |
| else: |
| print "NOTOK: Bad configuration, cannot start." |
| ok = False |
| |
| if ( not ok ): |
| sys.exit(1) |
| |
| if ( not self.verify_limits() ): |
| print "Limits too low to run DUCC" |
| sys.exit(1) |
| |
| # activeMQ needs to be started externally before starting any DUCC processes |
| if ( self.automanage and ('broker' in components) ): |
| if ( self.is_amq_active() ): |
| print 'ActiveMQ broker is already running on host and port:', self.broker_host + ':' + self.broker_port, 'NOT restarting' |
| else: |
| try: |
| self.start_broker() |
| except: |
| print sys.exc_info()[0], "DUCC may not be started correctly." |
| sys.exit(1) |
| |
| if ( 'db' in components ): |
| try: |
| self.db_start() |
| except Exception (e): |
| # print e |
| print sys.exc_info()[0], "Can't start the database." |
| sys.exit(1) |
| |
| if ( self.is_amq_active() ): |
| print 'ActiveMQ broker is found on configured host and port:', self.broker_host + ':' + self.broker_port |
| else: |
| print 'ActiveMQ broker is required but cannot be found on', self.broker_host + ':' + self.broker_port |
| sys.exit(1) |
| |
| ducc = Ducc() |
| |
| print "Starting", n_nodes, "agents" |
| self.threadpool = ThreadPool(n_nodes + 5) # a few more for the head processes |
| self.pidlock = threading.Lock() |
| for (nodefile, nodelist) in nodes.items(): |
| print '********** Starting agents from file', nodefile |
| try: |
| for node in nodelist: |
| self.threadpool.invoke(self.start_one_agent, node) |
| except: |
| self.threadpool.quit() |
| print sys.exc_info()[0], "DUCC may not be started correctly." |
| sys.exit(1) |
| |
| if ( len(components) != 0 ): |
| print 'Starting', or_parms |
| |
| for com in components: |
| if ( com in ('broker', 'db') ): |
| pass # already started |
| else: |
| try: |
| self.threadpool.invoke(self.start_component, ducc, com, or_parms) |
| #self.start_component(ducc, com, or_parms) |
| except: |
| self.threadpool.quit() |
| print sys.exc_info()[0], "DUCC may not be started correctly." |
| sys.exit(1) |
| |
| self.threadpool.quit() |
| |
| if ( len(self.pids) > 0 ): |
| self.pids.write(self.pid_file) |
| return |
| |
| if __name__ == "__main__": |
| # First check if ducc_post_install has been run |
| DUCC_HOME = find_ducc_home() |
| propsfile = DUCC_HOME + '/resources/site.ducc.properties' |
| if ( not os.path.exists(propsfile) ): |
| print "\n>> ERROR >> Missing site.ducc.properties -- please run ducc_post_install\n" |
| sys.exit(99) |
| starter = StartDucc() |
| starter.main(sys.argv[1:]) |