blob: c2f3f0b987c5084280dd48d7da02e40458c7fb52 [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import os
import sys
import time
import getopt
import threading
import traceback
from ducc_util import DuccUtil
from properties import Properties
from local_hooks import verify_slave_node
from local_hooks import verify_master_node
from ducc import Ducc
from ducc_util import ThreadPool
from ducc_base import find_ducc_home
class StartDucc(DuccUtil):
def __init__(self):
DuccUtil.__init__(self, True)
def start_broker(self):
broker_host = self.ducc_properties.get('ducc.broker.hostname')
print 'Starting broker on', broker_host
lines = self.ssh(broker_host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'broker', "'")
while 1:
line = lines.readline().strip()
if ( not line ):
break
#print '[] ' + line
if ( line.startswith('PID') ):
toks = line.split(' ') # get the PID
print "Broker on", broker_host, 'PID', toks[1]
self.pids.put('broker@' + broker_host, toks[1])
lines.close()
break
for i in range(0, 9):
if ( self.is_amq_active() ):
return
print 'Waiting for broker .....', str(i)
time.sleep(1)
def start_component(self, args):
ducc, component, or_parms = args
msgs = []
node = self.ducc_properties.get('ducc.head')
com = component
if ( com.find('@') >= 0 ):
com, node = com.split('@')
if ( ( com == 'ws' ) and ( node == 'local' ) and ( self.webserver_node != 'localhost' )):
if ( self.webserver_node != None ):
node = self.webserver_node
component = com + '@' + node
if ((com in self.default_components) or ( com == 'agent')) :
msgs.append((node, 'Starting', com))
else:
msgs.append(('Unrecognized component', component))
return msgs
if ( or_parms == None ):
or_parms = '--or_parms='
else:
or_parms = '--or_parms=' + or_parms
if ( node == 'local' ):
node = self.localhost
lines = self.ssh(node, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', or_parms, '-d', str(time.time()), '--nodup', "'")
# we'll capture anything that the python shell spews because it may be useful, and then drop the
# pipe when we see a PID message
while 1:
line = lines.readline().strip()
if ( not line ):
break
#msgs.append(('[]', line))
if ( line.startswith('PID') ):
toks = line.split(' ') # get the PID
msgs.append((' PID', toks[1]))
self.pids.put(com + '@' + node, toks[1])
lines.close()
break
if ( line.startswith('WARN') ):
msgs.append((' ', line))
if ( com in self.default_components ): # tracks where the management processes are
self.pidlock.acquire()
self.pids.put(com, com + '@' + node)
self.pidlock.release()
return msgs
def start_one_agent(self, args):
host = args[0]
msgs = []
spacer = ' '
msgs.append((host, ""))
lines = self.ssh(host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c' 'agent', '-b', '-d', str(time.time()), '--nodup', "'")
for line in lines:
line = line.strip()
# print '[]', host, line
# msgs.append(('[l]', line))
if ( line.startswith('PID') ):
toks = line.split(' ')
pid = toks[1]
self.pidlock.acquire()
self.pids.put('agent@' + host, pid)
self.pidlock.release()
lines.close()
msgs.append((spacer, 'DUCC Agent started PID', pid))
break
if ( 'tty' in line ):
# ssh junk if mesg is set
continue
toks = line.split()
sshmsgs = self.ssh_ok(host, line )
if ( sshmsgs != None ):
for m in sshmsgs:
print '[S]', m
if ( toks[0] == 'NOTOK' ):
msgs.append((spacer, 'NOTOK Not started:', ' '.join(toks[1:])))
else:
msgs.append((spacer, line))
return msgs
def verify_required_directories(self):
for dir in ('history', 'state', 'logs'):
d = self.DUCC_HOME + '/' + dir
if ( not os.path.exists(d) ):
print "Initializing", d
os.mkdir(d)
def usage(self, *msg):
if ( msg[0] != None ):
print ' '.join(msg)
print "Usage:"
print " start_ducc [options]"
print " If no options are given, all DUCC processes are started, using the default"
print " nodelist, DUCC_HOME/resources/ducc.nodes. "
print ""
print "Options:"
print " -n --nodelist nodefile"
print " Start agents on the nodes in the nodefile. Multiple nodefiles may be specified:"
print ""
print " start_ducc -n foo.nodes -n bar.nodes -n baz.nodes"
print ""
print " -c, --component component"
print " Start a specific DUCC component, optionally on a specific node. If the component name"
print " is qualified with a nodename, the component is started on that node. To qualify a"
print " component name with a destination node, use the notation component@nodename."
print " Multiple components may be specified:"
print ""
print " start_ducc -c sm -c pm -c rm@node1 -c or@node2 -c agent@remote1 -c agent@remote2"
print ""
print " Components include:"
print " rm - resource manager"
print " or - orchestrator"
print " pm - process manager"
print " sm - services manager"
print " ws - web server"
print " agent - node agent"
print ""
print " --nothreading"
print " Disable multithreaded operation if it would otherwise be used"
print ""
print "Examples:"
print " Start all DUCC processes, using custom nodelists:"
print " start_ducc -n foo.nodes -n bar.nodes"
print ""
print " Start just agents on a specific set of nodes:"
print " start_ducc -n foo.nodes -n bar.nodes"
print ""
print " Start the webserver on node 'bingle':"
print " start_ducc -c ws@bingle"
sys.exit(1)
def invalid(self, *msg):
if ( msg[0] != None ):
print ' '.join(msg)
print "For usage run"
print " start_ducc -h"
print 'or'
print ' start_ducc --help'
sys.exit(1)
def main(self, argv):
self.verify_head()
if ( not self.verify_jvm() ):
sys.exit(1);
self.set_duccling_version()
nodefiles = []
components = []
or_parms = self.ducc_properties.get('ducc.orchestrator.start.type')
self.pids = Properties()
self.pids.load_if_exists(self.pid_file)
try:
opts, args = getopt.getopt(argv, 'c:mn:sh?v', ['component=', 'components=', 'help', 'nodelist=', 'cold', 'warm', 'hot', 'nothreading'])
except:
self.invalid('Invalid arguments', ' '.join(argv))
for ( o, a ) in opts:
if o in ( '-c', '--components' ):
components.append(a)
elif o in ( '-n', '--nodelist' ):
nodefiles.append(a)
elif o in ( '--nothreading' ):
self.disable_threading()
elif o in ( '--cold', '--warm', '--hot' ):
or_parms = o[2:] # (strip the leading --)
elif ( o == '-v'):
print self.version()
sys.exit(0)
elif o in ( '-h', '--help' ):
self.usage(None)
elif ( o == '-?'):
self.usage(None)
else:
self.invalid('bad args: ', ' '.join(argv))
if not self.installed():
print "Head node is not initialized. Have you run ducc_post_install?"
return
environ = self.show_ducc_environment()
for e in environ:
print e
# no args, or just -s - make equivalent of -management and -nodefile=DUCC.HOME/resources/ducc.nodes
if ( (len(components) == 0) and (len(nodefiles) == 0 ) ) :
nodefiles = self.default_nodefiles
components = self.default_components
self.verify_required_directories()
if ( not verify_master_node(self.ducc_properties) ):
print 'FAIL: Cannot run javac to run java verification'
return
# make sure all the nodefiles exist and are readable
ok = True
nodes = {}
n_nodes = 0
for n in nodefiles:
n_nodes, nodes = self.read_nodefile(n, nodes)
for ( nf, nl ) in nodes.items():
if ( nl == None ):
print "Can't read nodefile", nf
ok = False
if ok and (nodefiles == self.default_nodefiles):
if self.verify_class_configuration(nodefiles[0], False):
print "OK: Class configuration checked"
else:
print "NOTOK: Bad configuration, cannot start."
ok = False
if ( not ok ):
sys.exit(1)
if ( not self.verify_limits() ):
print "Limits too low to run DUCC"
sys.exit(1)
# activeMQ needs to be started externally before starting any DUCC processes
if ( self.automanage and ('broker' in components) ):
if ( self.is_amq_active() ):
print 'ActiveMQ broker is already running on host and port:', self.broker_host + ':' + self.broker_port, 'NOT restarting'
else:
try:
self.start_broker()
except:
print sys.exc_info()[0], "DUCC may not be started correctly."
sys.exit(1)
if ( 'db' in components ):
try:
self.db_start()
except Exception (e):
# print e
print sys.exc_info()[0], "Can't start the database."
sys.exit(1)
if ( self.is_amq_active() ):
print 'ActiveMQ broker is found on configured host and port:', self.broker_host + ':' + self.broker_port
else:
print 'ActiveMQ broker is required but cannot be found on', self.broker_host + ':' + self.broker_port
sys.exit(1)
ducc = Ducc()
print "Starting", n_nodes, "agents"
self.threadpool = ThreadPool(n_nodes + 5) # a few more for the head processes
self.pidlock = threading.Lock()
for (nodefile, nodelist) in nodes.items():
print '********** Starting agents from file', nodefile
try:
for node in nodelist:
self.threadpool.invoke(self.start_one_agent, node)
except:
self.threadpool.quit()
print sys.exc_info()[0], "DUCC may not be started correctly."
sys.exit(1)
if ( len(components) != 0 ):
print 'Starting', or_parms
for com in components:
if ( com in ('broker', 'db') ):
pass # already started
else:
try:
self.threadpool.invoke(self.start_component, ducc, com, or_parms)
#self.start_component(ducc, com, or_parms)
except:
self.threadpool.quit()
print sys.exc_info()[0], "DUCC may not be started correctly."
sys.exit(1)
self.threadpool.quit()
if ( len(self.pids) > 0 ):
self.pids.write(self.pid_file)
return
if __name__ == "__main__":
# First check if ducc_post_install has been run
DUCC_HOME = find_ducc_home()
propsfile = DUCC_HOME + '/resources/site.ducc.properties'
if ( not os.path.exists(propsfile) ):
print "\n>> ERROR >> Missing site.ducc.properties -- please run ducc_post_install\n"
sys.exit(99)
starter = StartDucc()
starter.main(sys.argv[1:])