blob: 54bb663e62229aa9e190db02fb5bcad74067ba94 [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import os
import sys
import time
import getopt
import threading
import traceback
from ducc_util import DuccUtil
from properties import Properties
from local_hooks import verify_master_node
from ducc import Ducc
from ducc_util import ThreadPool
from ducc_base import find_ducc_home
import db_util as dbu
class StartDucc(DuccUtil):
def __init__(self):
DuccUtil.__init__(self, True)
def start_broker(self):
broker_host = self.localhost
print 'Starting broker on', broker_host
node = broker_host
com = 'broker'
self.db_acct_start(node,com)
lines = self.ssh(broker_host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'broker', "'")
while 1:
line = lines.readline().strip()
if ( not line ):
break
#print '[] ' + line
if ( line.startswith('PID') ):
toks = line.split(' ') # get the PID
print "Broker on", broker_host, 'PID', toks[1]
#self.pids_daemons.put('broker@' + broker_host, toks[1])
lines.close()
break
for i in range(0, 9):
if ( self.is_amq_active() ):
return
print 'Waiting for broker .....', str(i)
time.sleep(1)
def start_component(self, args):
component, or_parms = args
msgs = []
com = component
if ( com.find('@') >= 0 ):
com, node = com.split('@')
else:
node = self.ducc_properties.get('ducc.head')
if (com in self.local_components):
node = self.localhost
if(self.automanage_database):
pass
else:
if (com in ['db', 'database']):
msgs.append(('Unmanaged component', component))
return msgs
if com == 'ag':
com = 'agent'
if ((com in self.default_components) or ( com == 'agent')) :
msgs.append(('Starting', com, 'on', node))
else:
msgs.append(('Unrecognized component', component))
return msgs
if ( or_parms == None ):
or_parms = '--or_parms='
else:
or_parms = '--or_parms=' + or_parms
if ( node == 'local' ):
node = self.localhost
else:
if not self.ssh_operational(node):
msgs.append(('>>>>> ERROR - cannot ssh to', node))
return msgs
lines = self.ssh(node, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', or_parms, '-d', str(time.time()), '--nodup', "'")
# we'll capture anything that the python shell spews because it may be useful, and then drop the
# pipe when we see a PID message
pid = None
while 1:
line = lines.readline().strip()
if ( not line ):
break
#msgs.append(('[]', line))
if ( line.startswith('PID') ):
toks = line.split(' ') # get the PID
pid = toks[1]
lines.close()
break
if ( line.startswith('WARN') ):
msgs.append((' ', line))
pid = '?'
if ( line.startswith('NOTOK') ):
msgs.append((' ', line))
pid = '?'
sshmsgs = self.ssh_ok(node, line )
if ( sshmsgs != None ):
for m in sshmsgs:
print '[S]', m
if not pid:
msgs.append(('ERROR - failed to start', com, 'on', node))
return msgs;
if pid != '?':
msgs.append(('Started', com, 'on', node, 'with PID', pid))
#if ( com in self.default_components ): # tracks where the management processes are
# self.pidlock.acquire()
# self.pids_daemons.put(com, com + '@' + node)
# self.pidlock.release()
self.db_acct_start(node,com)
return msgs
def verify_required_directories(self):
for dir in ('history', 'state', 'logs'):
d = self.DUCC_HOME + '/' + dir
if ( not os.path.exists(d) ):
print "Initializing", d
os.mkdir(d)
def usage(self, *msg):
if ( msg[0] != None ):
print ' '.join(msg)
print "Usage:"
print " start_ducc [options]"
print " If no options are given, all DUCC processes are started, using the default"
print " nodelist, DUCC_HOME/resources/ducc.nodes. "
print ""
print " For reliable DUCC agents will not be started from backup head node. "
print ""
print " Broker will not be started when ducc.broker.automanage = false. "
print " Database will not be started when ducc.database.automanage = false. "
print ""
print "Options:"
print " -n --nodelist nodefile"
print " Start agents on the nodes in the nodefile. Multiple nodefiles may be specified:"
print ""
print " start_ducc -n foo.nodes -n bar.nodes -n baz.nodes"
print ""
print " -c, --component component"
print " Start a specific DUCC component, optionally on a specific node. If the component name"
print " is qualified with a nodename, the component is started on that node. To qualify a"
print " component name with a destination node, use the notation component@nodename."
print " Multiple components may be specified:"
print ""
print " start_ducc -c sm -c pm -c rm@node1 -c or@node2 -c agent@remote1 -c ag@remote2"
print ""
print " Components include:"
print " rm - resource manager"
print " or - orchestrator"
print " pm - process manager"
print " sm - services manager"
print " ws - web server"
print " ag or agent - node agent"
print ' head = { or, pm, rm, sm, ws, db, broker }'
print ""
print " --nothreading"
print " Disable multithreaded operation if it would otherwise be used"
print ""
print " Choose none or one of the following two options, which is only effective when the orchestrator (or) component is started."
print " When specified here it supersedes that specified for ducc.orchestrator.start.type in ducc.properties."
print " When not specified here or in ducc.properties, the default is --warm."
print ""
print " --warm"
print " Do NOT force active Jobs, Services, and Reservations to Completed state."
print ""
print " --cold"
print " Force active Jobs, Services, and Reservations to Completed state."
print ""
print "Examples:"
print " Start all DUCC processes, using custom nodelists:"
print " start_ducc -n foo.nodes -n bar.nodes"
print ""
print " Start just agents on a specific set of nodes:"
print " start_ducc -n foo.nodes -n bar.nodes"
print ""
print " Start the webserver on node 'bingle':"
print " start_ducc -c ws@bingle"
sys.exit(1)
def invalid(self, *msg):
if ( msg[0] != None ):
print
print ' '.join(msg)
print
print "For usage run"
print " start_ducc -h"
print 'or'
print ' start_ducc --help'
sys.exit(1)
def main(self, argv):
self.verify_head()
self.check_properties()
if ( not self.verify_jvm() ):
sys.exit(1);
self.set_duccling_version()
nodefiles = []
components = []
or_parms = self.ducc_properties.get('ducc.orchestrator.start.type')
try:
opts, args = getopt.getopt(argv, 'c:mn:sh?v', ['component=', 'help', 'nodelist=', 'cold', 'warm', 'nothreading'])
except:
self.invalid('Invalid arguments', ' '.join(argv))
if (len(args) > 0):
self.invalid('Invalid extra args: ', ' '.join(args))
for ( o, a ) in opts:
if o in ( '-c', '--component' ):
if (a.strip() == 'head'):
components.append('or')
components.append('pm')
components.append('rm')
components.append('sm')
components.append('ws')
components.append('broker')
if(self.automanage_database):
components.append('db')
else:
if(self.automanage_database):
pass
else:
if('db' in a):
print "Database not automanaged."
return
if('database' in a):
print "Database not automanaged."
return
components.append(a)
elif o in ( '-n', '--nodelist' ):
nodefiles.append(a)
elif o in ( '--nothreading' ):
self.disable_threading()
elif o in ( '--cold', '--warm' ):
or_parms = o[2:] # (strip the leading --)
elif ( o == '-v'):
print self.version()
sys.exit(0)
elif o in ( '-h', '--help' ):
self.usage(None)
elif ( o == '-?'):
self.usage(None)
else:
self.invalid('bad arg: ', o, 'in:', ' '.join(argv))
if not self.installed():
print "Head node is not initialized. Have you run ducc_post_install?"
return
environ = self.show_ducc_environment()
for e in environ:
print e
# no args, or just -s - make equivalent of -management and -nodefile=DUCC.HOME/resources/ducc.nodes
if ( (len(components) == 0) and (len(nodefiles) == 0 ) ) :
nodefiles = self.default_nodefiles
components = self.default_components
self.verify_required_directories()
# If this fails local_hooks.py should report the reason
if(self.is_head_node()):
if ( not verify_master_node(self.ducc_properties) ):
return
# make sure all the nodefiles exist and are readable
ok = True
nodes = {}
n_nodes = 0
for n in nodefiles:
n_nodes, nodes = self.read_nodefile(n, nodes)
for ( nf, nl ) in nodes.items():
if ( nl == None ):
print "Can't read nodefile", nf
ok = False
if ok and (nodefiles == self.default_nodefiles):
if self.verify_class_configuration(nodefiles[0], False):
print "OK: Class configuration checked"
else:
print "NOTOK: Bad configuration, cannot start."
ok = False
if ( not ok ):
sys.exit(1)
if ( not self.verify_limits() ):
print "Limits too low to run DUCC"
sys.exit(1)
is_active_head_node = self.is_active();
if ( self.automanage_database and 'db' in components ):
if (not is_active_head_node):
print 'DB can only be automanaged from the active head node'
else:
try:
if ( not self.db_start() ):
print "Failed to start or connect to the database."
sys.exit(1)
node = self.get_db_host()
com = 'database'
self.db_acct_start(node,com)
dbu.update_database(self.DUCC_HOME, self.jvm)
except Exception (e):
# print e
print sys.exc_info()[0], "Can't start the database."
sys.exit(1)
if ( len(components) == 1 ):
if ( self.automanage_database and ('db' in components) ):
return
# activeMQ needs to be started externally before starting any DUCC processes
if ( self.automanage_broker and ('broker' in components) ):
if ( self.is_amq_active() ):
print 'ActiveMQ broker is already running on host and port:', self.broker_host + ':' + self.broker_port, 'NOT restarting'
else:
try:
self.start_broker()
except:
print sys.exc_info()[0], "DUCC may not be started correctly."
sys.exit(1)
if ( self.is_amq_active() ):
print 'ActiveMQ broker is found on configured host and port:', self.broker_host + ':' + self.broker_port
else:
print 'ActiveMQ broker is required but cannot be found on', self.broker_host + ':' + self.broker_port
sys.exit(1)
#ducc = Ducc()
self.threadpool = ThreadPool(n_nodes + 5) # a few more for the head processes
#self.pidlock = threading.Lock()
#start 'or' first to field system log requests
if ( len(components) != 0 ):
for com in components:
if ( com in ('or') ):
try:
self.threadpool.invoke(self.start_component, com, or_parms)
except:
self.threadpool.quit()
print sys.exc_info()[0], "DUCC may not be started correctly."
sys.exit(1)
# give 'or' a small head start
time.sleep(2)
if (not is_active_head_node):
print '********** "backup" head node -> not starting agents'
else:
if n_nodes > 0:
print "Starting", n_nodes, "agents"
for (nodefile, nodelist) in nodes.items():
print '********** Starting agents from file', nodefile
try:
for node in nodelist:
self.threadpool.invoke(self.start_component, 'agent@'+node, None)
except:
self.threadpool.quit()
print sys.exc_info()[0], "DUCC may not be started correctly."
sys.exit(1)
if ( len(components) != 0 ):
print 'Starting', or_parms
for com in components:
if ( com in ('broker', 'db', 'or') ):
pass # already started
else:
try:
self.threadpool.invoke(self.start_component, com, or_parms)
except:
self.threadpool.quit()
print sys.exc_info()[0], "DUCC may not be started correctly."
sys.exit(1)
self.threadpool.quit()
return
if __name__ == "__main__":
# First check if ducc_post_install has been run
DUCC_HOME = find_ducc_home()
propsfile = DUCC_HOME + '/resources/site.ducc.properties'
if ( not os.path.exists(propsfile) ):
print "\n>> ERROR >> Missing site.ducc.properties -- please run ducc_post_install\n"
sys.exit(99)
starter = StartDucc()
starter.main(sys.argv[1:])