#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
#
# This script starts a simulated DUCC cluster. All processes are real, so all
# codepaths are executed. The simulation comes from running multiple DUCC agents
# on a single node, allowing large-scale testing on small-scale clusters. All
# the system-test jobs sleep rather than compute and as such are very small and
# inexpensive, which allows large-scale testing to be performed on minimal
# hardware.
#
import os
import sys
import time
import getopt
from threading import *
import Queue
#designed to run only here, relative to ducc_runtime
DUCC_HOME = os.path.abspath(__file__ + '/../../..')
sys.path.append(DUCC_HOME + '/admin')
from ducc_util import DuccUtil
from properties import Properties
from ducc import Ducc
from ducc_util import ThreadPool
# Multi-threaded start can overwhelm ssh when many starts go to the same target host, so
# we inject a short sleep between starts. This is how long to sleep, in seconds.
# Note: at 0.1 the ducc.properties merge step run by each agent caused problems/hangs.
# (A 'global' statement is a no-op at module level, so the name is simply assigned here.)
SLEEP_TIME = 0.5
class StartSim(DuccUtil):
def __init__(self):
    """Initialize the simulator driver by delegating to the DuccUtil base initializer."""
    # NOTE(review): the meaning of the positional True is defined by DuccUtil.__init__,
    # which lives in ducc_util and is not visible here -- confirm before changing it.
    DuccUtil.__init__(self, True)
def start_broker(self):
    """
    Start the message broker on the local host and wait for it to become active.

    Does nothing (beyond a message) when the broker is not automanaged. Otherwise
    ducc.py is run over ssh with '-c broker --simtest', and is_amq_active() is polled
    once a second, for at most 9 tries. If the broker still is not active after the
    last try a warning is printed and the method returns; the caller is not stopped.
    """
    if not self.automanage_broker:
        print("Broker is not automanaged, returning.")
        return

    broker_host = self.localhost
    print('broker host %s' % (broker_host,))

    # The stream returned by ssh is not read here; readiness is detected by polling
    # is_amq_active() below. It stays referenced until this method returns.
    lines = self.ssh(broker_host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'broker', '--simtest', "'")

    max_tries = 9
    for i in range(max_tries):
        if self.is_amq_active():
            return
        print('Waiting for broker %s' % i)
        time.sleep(1)

    # Previously the loop simply fell through, leaving no trace that the wait timed out.
    print('Broker did not become active after %d tries, continuing.' % max_tries)
def start_database(self):
    """
    Start the database if it is automanaged.

    Does nothing (beyond a message) when the database is not automanaged. If
    db_start() raises, the exception type is printed together with an error
    message and the process exits with status 1.
    """
    if not self.automanage_database:
        print("Database is not automanaged, returning.")
        return

    try:
        self.db_start()
    except Exception:
        # The original clause read 'except Exception (e):', which calls Exception(e) with an
        # undefined name while handling the error and so raised NameError instead of
        # reporting the failure. The plain form works on every Python 2 release.
        print("%s Can't start the database." % (sys.exc_info()[0],))
        sys.exit(1)
def run_local_agent(self, pnode, ip, memory ):
if ( not self.verify_jvm() ):
return
self.verify_duccling()
self.verify_limits()
memory = int(memory) * 1024 * 1024 # to GB from KB
CMDPARMS = []
CMDPARMS.append(self.java())
CMDPARMS.append('-Dducc.deploy.components=agent')
CMDPARMS.append('-Dos.page.size=' + self.os_pagesize)
CMDPARMS.append('-Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties")
CMDPARMS.append('-Djava.library.path=' + self.DUCC_HOME)
CMDPARMS.append('-Xmx100M')
CMDPARMS.append('-XX:+HeapDumpOnOutOfMemoryError')
ducc_heap_dump_path = self.ducc_properties.get('ducc.heap.dump.path')
if ducc_heap_dump_path != None:
CMDPARMS.append('-XX:HeapDumpPath='+ducc_heap_dump_path)
CMDPARMS.append('-Dducc.agent.node.metrics.fake.memory.size=' + str(memory))
# Put virtual IP on command line for duplicate daemon detector
CMDPARMS.append('-Dducc.agent.virtual=' + ip)
CMDPARMS.append('org.apache.uima.ducc.common.main.DuccService')
print "Start agent with pnode", pnode, "IP", ip, "memory", memory
os.environ['DUCC_NODENAME'] = pnode
os.environ['DUCC_IP'] = ip
self.nohup(CMDPARMS)
#self.spawn(' '.join(CMDPARMS))
#
# Start one admin component (rm, pm, sm, ws, db, or, ...) using ducc.py. The ws and viz
# components are started on the webserver node, everything else on the local node.
#
def startComponent(self, args):
    """
    Start one admin component through ducc.py over ssh and record its PID.

    args is a (component, or_parms) pair. 'ws' and 'viz' are started on
    self.webserver_node, every other component on self.localhost. or_parms is
    only passed on (as --or_parms) when the component is 'or'.

    Output is read line by line until an empty line / end of output, or until a
    line starting with 'PID' is seen. On a PID line the pid is stored in
    self.pids under the component name.

    Returns a list of message tuples describing what was done.
    """
    com, or_parms = args
    if com in ('ws', 'viz'):
        node = self.webserver_node
    else:
        node = self.localhost

    ducc_py = self.DUCC_HOME + '/admin/ducc.py'
    if com == 'or':
        lines = self.ssh(node, True, "'", ducc_py, '-c', 'or', '-b', '--or_parms', or_parms, "'")
    else:
        lines = self.ssh(node, True, "'", ducc_py, '-c', com, '-b', "'")

    msgs = [('Start', com, 'on', node)]
    try:
        while 1:
            line = lines.readline().strip()
            if not line:
                break
            if line.startswith('PID'):
                pid = line.split(' ')[1]
                # Release the lock even if recording the PID fails, so the other
                # starter threads cannot be blocked forever.
                self.pidlock.acquire()
                try:
                    # NOTE(review): the entry records self.localhost even when the component
                    # was started on the webserver node -- kept as is, confirm with stop_sim.
                    self.pids.put(com, self.localhost + ' ' + pid + ' ' + self.localhost)
                finally:
                    self.pidlock.release()
                msgs.append((' PID', pid))
                break
    finally:
        # Close the ssh output on every exit path, not only when a PID was found.
        lines.close()
    return msgs
#
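# Read the special nodelist and start "special" agents
#
# The nodelist is a file compatible with java properties, using these keys:
#
#    nodes             blank-delimited names of the real nodes that will host agents
#    memory            blank-delimited simulated memory sizes, in GB
#    <node>.<memory>   number of agents to start on <node> reporting <memory>
#    index.<node>      optional: restart the instance index at this value for <node>
#
# Every simulated agent gets a unique numeric index, counted up from 1 unless reset.
# We generate fake IP addresses from 192.168.4.X where x is the index.
# We generate fake node names from nodename-X where x is the index.
# We pass the memory-in-gb to the agent as the fake memory it reports instead of real memory.
#
# We use Jerry's memory override for each agent to get it to falsely report the memory
# instead of reading from the real machine.
#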
def startOneAgent(self, args):
    """
    Start one simulated agent on a real node over ssh and record its PID.

    args is a (node, cmd, mem, ip, pnode, index) tuple: the real node to ssh to,
    the command to run there, the simulated memory, the pseudo IP, the pseudo
    node name and the instance index. The command is run with
    --agent --memory --addr --pseudoname.

    Output is read line by line until an empty line / end of output, or until a
    line starting with 'PID' is seen. On a PID line "node pid pnode" is stored in
    self.pids under the instance index.

    Returns a list of message tuples describing what was done.
    """
    node, cmd, mem, ip, pnode, index = args
    response = [('Starting agent on', node, 'instance', index, 'as pseudo-node', pnode, 'IP', ip, 'memory', mem)]

    lines = self.ssh(node, True, "'", cmd, '--agent', '--memory', mem, '--addr', ip, '--pseudoname', pnode, "'")
    try:
        while 1:
            line = lines.readline().strip()
            if not line:
                break
            if line.startswith('PID'):
                pid = line.split(' ')[1]
                response.append((' ... Started, PID', pid))
                # Must run on old python that doesn't know 'with', so use try/finally to
                # guarantee the lock is released even if recording the PID fails.
                self.pidlock.acquire()
                try:
                    self.pids.put(index, node + ' ' + pid + ' ' + pnode)
                finally:
                    self.pidlock.release()
                break
    finally:
        # Close the ssh output on every exit path, not only when a PID was found.
        lines.close()
    return response
def startAgents(self, nodelist, instances):
    """
    Start the simulated agents described by a nodelist file.

    nodelist  - path of a properties file with the keys 'nodes', 'memory',
                '<node>.<mem>' (agent count) and optionally 'index.<node>'
    instances - dictionary keyed by instance index (as a string); when it is not
                empty only those instances are started, otherwise all of them are

    Each selected agent is handed to the thread pool via startOneAgent, with a
    pause of SLEEP_TIME seconds after each submission. If the nodelist lacks
    'nodes' or 'memory' a message is printed and nothing is started.
    """
    do_all = len(instances) == 0
    if not do_all:
        print('Starting instances: %s' % (instances,))

    print("Starting from nodes in %s" % (nodelist,))
    props = Properties()
    props.load(nodelist)

    # Without this check a missing key ends in an AttributeError on None.split().
    node_names = props.get('nodes')
    mem_sizes = props.get('memory')
    if node_names is None or mem_sizes is None:
        print("Nodelist %s must define both 'nodes' and 'memory', no agents started." % (nodelist,))
        return
    nodes = node_names.split()
    mems = mem_sizes.split()

    # first parse the node config into a list of tuples describing the simulated nodes
    # each tuple is like this: (index, nodename, mem)
    #    where 'index' is a unique number (must be a number)
    #          'nodename' is the name of a real node, which will contain one or more agents
    #                     to simulate multiple nodes
    #          'mem' is the simulated memory the agent will report
    allnodes = []
    ndx = 1
    for node in nodes:
        # allow the index to be reset by-node so you can vary the number of
        # pseudo nodes on a real node without changing the indexes of others
        # if desired, e.g. to manage a custom ducc.nodes
        key = 'index.' + node
        if props.has_key(key):
            ndx = int(props.get(key))

        # generate node names and associate simulated memory
        for mem in mems:
            count = props.get(node + '.' + mem)
            if count is not None:
                for _ in range(int(count)):
                    allnodes.append((str(ndx), node, mem))
                    ndx = ndx + 1

    # The remote side runs this same script, so pass its absolute path.
    cmd = os.path.abspath(sys.argv[0])
    for (index, node, mem) in allnodes:
        if not do_all and index not in instances:
            continue

        ip = '192.168.4.' + index
        pnode = node + '-' + index
        self.threadpool.invoke(self.startOneAgent, node, cmd, mem, ip, pnode, index)
        time.sleep(SLEEP_TIME)
def usage(self, msg):
    """
    Print an optional message followed by the help screen, then exit with status 1.

    msg may be None (nothing extra is printed), a single string, or a list/tuple
    of strings which are joined with blanks. The option names shown are the ones
    main() hands to getopt, and the nodelist description matches the keys that
    startAgents() reads.
    """
    if msg:
        if isinstance(msg, (list, tuple)):
            # Joining is only right for a sequence of words; a plain string used to be
            # "joined" character by character.
            msg = ' '.join(['%s' % (m,) for m in msg if m is not None])
        if msg:
            print(msg)

    print("Usage:")
    print(" start_sim [options]")
    print(" If no options are given this help screen is shown.")
    print("")
    print("Options:")
    print(" -n --node-config nodelist")
    print(" The nodelist describing agents that is to be used. Not valid with --agent, --memory, --addr, or --pseudoname.")
    print(" A nodelist provides the parameters for starting agents. It is a properties file with these keys:")
    print(" nodes = blank-delimited names of the physical nodes you want agents placed on")
    print(" memory = blank-delimited list of simulated memory sizes, in GB")
    print(" <nodename>.<mem> = number of agents to start on <nodename> that will report <mem>")
    print(" index.<nodename> = optional, restart the instance index at this number for <nodename>")
    print("")
    print(" Example:")
    print(" nodes = node290 node292")
    print(" memory = 31 47")
    print(" node290.31 = 2")
    print(" node292.47 = 2")
    print(" index.node292 = 17")
    print("")
    print(" -c --component component")
    print(" Start the indicated component, must be one of %s or 'all'" % ' '.join(self.default_components))
    print("")
    print(" -i --instance instanceid")
    print(" Start only this instance of an agent from the nodelist.")
    print("")
    print(" --agent")
    print(" Start an agent only on localhost. All of --memory, --addr, and --pseudoname are required, -n is disallowed.")
    print("")
    print(" --memory mem")
    print(" Use this memory override. Valid only with --agent.")
    print("")
    print(" --addr ip")
    print(" Use this IP override. Valid only with --agent.")
    print("")
    print(" --pseudoname pseudoname")
    print(" Use this as the hostname for the agent. Valid only with --agent.")
    print("")
    print(" --warm, --cold")
    print(" Start type passed to the orchestrator, overriding ducc.orchestrator.start.type.")
    print("")
    print(" --nothreading")
    print(" Disable multithreaded operation if it would otherwise be used")
    print("")
    print(" -v, --version")
    print(" Print the current DUCC version")
    print("")
    print(" -h, -?, --help")
    print(" Show this help screen")
    sys.exit(1)
def invalid(self, *msg):
    """
    Print an error message and a pointer to the help screen, then exit with status 1.

    All positional arguments are joined with blanks to form the message; None
    entries are skipped. Calling it without any argument prints only the pointer
    to the help screen (this used to fail with an IndexError).
    """
    words = ['%s' % (m,) for m in msg if m is not None]
    if words:
        print(' '.join(words))

    print("For usage run")
    print(" start_sim -h")
    print('or')
    print(' start_sim --help')
    sys.exit(1)
def main(self, argv):
    """
    Command line entry point: parse argv and start the requested parts of the simulation.

    With --agent a single simulated agent is started on this host and the process
    exits. Otherwise the broker and database are started when requested, then the
    agents from the nodelist (-n), then every other requested component, using the
    thread pool. The PIDs collected along the way are written to 'sim.pids'.

    argv is the argument list without the program name. Invalid arguments end the
    process through invalid(); -h/-?/--help and an empty argv end it through usage().
    """
    import traceback

    print("Running as %s" % os.getpid())

    if len(argv) == 0:
        self.usage(None)

    os.environ['DUCC_SIM'] = 'True'
    node_config = None
    components = {}
    instances = {}
    run_agent = False
    memory = None
    pseudoname = None
    IP = None
    or_parms = self.ducc_properties.get('ducc.orchestrator.start.type')

    try:
        opts, args = getopt.getopt(argv, 'c:i:n:vh?', ['component=', 'help', 'agent', 'memory=',
                                                       'instance=', 'addr=', 'pseudoname=', 'node-config=',
                                                       'version', 'warm', 'cold',
                                                       'nothreading'])
    except getopt.GetoptError:
        self.invalid('Invalid arguments', ' '.join(argv))

    # Single options are compared with ==; "o in ('--agent')" is a substring test on a
    # string, not a membership test on a tuple.
    for (o, a) in opts:
        if o in ('-n', '--node-config'):
            node_config = a
        elif o == '--agent':
            run_agent = True
        elif o in ('-c', '--component'):
            if a == 'all':
                for cmp in self.default_components:
                    components[cmp] = cmp
            else:
                components[a] = a
        elif o == '--memory':
            memory = a
        elif o == '--addr':
            IP = a
        elif o in ('-i', '--instance'):
            instances[a] = a
        elif o == '--pseudoname':
            pseudoname = a
        elif o == '--nothreading':
            self.disable_threading()
        elif o in ('-v', '--version'):
            print(self.version())
            # os has no exit(); the original os.exit(0) raised AttributeError here.
            sys.exit(0)
        elif o in ('--warm', '--cold'):
            or_parms = o[2:]
        elif o in ('-h', '--help', '-?'):
            self.usage(None)
        else:
            self.invalid('bad args: ', ' '.join(argv))

    self.pids = Properties()
    self.ducc = Ducc()

    if not self.verify_jvm():
        return
    self.set_duccling_version()
    self.verify_duccling()

    if os.path.exists('sim.pids'):
        self.pids.load('sim.pids')

    if run_agent:
        #
        # checks that aren't valid if we want run_agent
        #
        if node_config is not None:
            self.invalid("Nodelist is not compatible with agent")

        if IP is None or memory is None or pseudoname is None:
            self.invalid("Missing IP, memory, or pseudoname")

        self.run_local_agent(pseudoname, IP, memory)
        sys.exit(0)

    # Validate before the thread pool exists, so a bad command line exits cleanly.
    if IP is not None or memory is not None or pseudoname is not None:
        self.invalid("Running with a nodelist is not compatible with running a single agent.")

    self.pidlock = Lock()
    self.threadpool = ThreadPool(50)

    try:
        try:
            specials = ['broker', 'db']

            # NOTE(review): the original compared the requested components with the
            # specials but stored the result in an unused local, so specials_only has
            # always been False and agents/components are always attempted. Setting it
            # from the subset test would also skip the agents when only -n is given,
            # so the existing behavior is kept -- confirm the intent before changing.
            specials_only = False

            print('-------- start broker')
            if components.get('broker') is not None:
                self.start_broker()

            print('-------- start database')
            if components.get('db') is not None:
                self.start_database()

            print('-------- specials_only %s' % specials_only)

            print('-------- start agents')
            if node_config is not None:
                self.startAgents(node_config, instances)

            for com in components.keys():
                if com not in specials:    # specials start with different rules
                    self.threadpool.invoke(self.startComponent, com, or_parms)
                    time.sleep(SLEEP_TIME)
        except Exception:
            # Startup stays best-effort, but the failure is now reported instead of being
            # silently dropped. SystemExit and KeyboardInterrupt pass through after cleanup.
            traceback.print_exc()
    finally:
        self.threadpool.quit()
        self.pids.write('sim.pids')
if __name__ == "__main__":
    # Script entry point: everything after the program name goes to StartSim.main().
    cli_args = sys.argv[1:]
    starter = StartSim()
    starter.main(cli_args)