| #!/usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| # |
| # This script starts a simulated DUCC cluster. All processes are real, so all |
| # codepaths are executed. The simulation comes from running multiple DUCC agents |
| # on a single node, allowing large-scale testing on small-scale clusters. All |
| # the system-test jobs sleep rather than compute and as such are very small and |
| # inexpensive, which allows large-scale testing to be performed on minimal |
| # hardware. |
| # |
| |
| import os |
| import sys |
| import time |
| import getopt |
| |
| from threading import * |
| import Queue |
| |
# This script is designed to run only from its installed location: DUCC_HOME
# is resolved as three path components above this file's own path, and its
# 'admin' directory is added to the module search path.
DUCC_HOME = os.path.abspath(os.path.join(__file__, '..', '..', '..'))
sys.path.append(DUCC_HOME + '/admin')
| |
| from ducc_util import DuccUtil |
| from properties import Properties |
| from ducc import Ducc |
| from ducc_util import ThreadPool |
| |
| |
class StartSim(DuccUtil):
    """Command-line driver that starts a simulated DUCC cluster.

    Depending on the options given to main() it starts the broker, the
    administrative components, and/or a set of simulated agents described by
    a nodelist properties file.  The pids reported by the started processes
    are collected in self.pids and written to 'sim.pids' in the current
    directory.

    The code is kept free of 'with' statements and 'except ... as' because
    the script must also run on old Python 2 interpreters.
    """

    def __init__(self):
        DuccUtil.__init__(self, True)

    def _sim_read_pid(self, lines):
        """Return the pid announced on the first 'PID <pid>' line of *lines*.

        lines -- file-like object returned by self.ssh().

        Reading stops at the first empty line or at end of stream, whichever
        comes first.  The stream is closed only when a pid has been found,
        exactly as the callers did before this helper existed.  Returns None
        when no pid line was seen.
        """
        while True:
            line = lines.readline().strip()
            if not line:
                return None
            if line.startswith('PID'):
                toks = line.split()
                # A 'PID' line without a value is ignored rather than
                # crashing the caller with an IndexError.
                if len(toks) > 1:
                    lines.close()
                    return toks[1]

    def _sim_record_pid(self, key, value):
        """Store key=value in self.pids while holding self.pidlock."""
        self.pidlock.acquire()
        try:
            self.pids.put(key, value)
        finally:
            # Always release, so a failing put() cannot wedge other workers.
            self.pidlock.release()

    def start_broker(self):
        """Start the broker on the configured broker host and wait for it.

        Does nothing when the broker is not automanaged.  Otherwise runs
        'ducc.py -c broker' over ssh, records the reported pid under
        'broker@<host>', and polls is_amq_active() up to 9 times, one second
        apart.
        """
        if not self.automanage:
            print("Broker is not automanaged, returning.")
            # The original printed this message but fell through and started
            # the broker anyway.
            return

        broker_host = self.ducc_properties.get('ducc.broker.hostname')
        print('broker host %s' % (broker_host,))
        lines = self.ssh(broker_host, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'broker', "'")
        pid = self._sim_read_pid(lines)
        if pid is not None:
            print('Broker on %s PID %s' % (broker_host, pid))
            self.pids.put('broker@' + broker_host, pid)

        for i in range(0, 9):
            if self.is_amq_active():
                return
            print('Waiting for broker %s' % (i,))
            time.sleep(1)

        print('Broker did not become active after 9 checks, continuing.')

    def run_local_agent(self, pnode, ip, memory):
        """Start one simulated agent on this host.

        pnode  -- pseudo node name, exported as environment variable NodeName
        ip     -- fake IP address, exported as environment variable IP
        memory -- simulated memory in GB (string or int)

        Returns without starting anything when verify_jvm() fails.
        Raises ValueError when memory is not an integer.
        """
        if not self.verify_jvm():
            return
        self.verify_duccling()
        self.verify_limits()

        memory = int(memory) * 1024 * 1024    # from GB to KB

        CMDPARMS = [
            self.java(),
            '-Dducc.deploy.components=agent',
            '-Dos.page.size=' + self.os_pagesize,
            '-Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties",
            '-Djava.library.path=' + self.DUCC_HOME,
            '-Xmx100M',
            '-Dducc.agent.node.metrics.fake.memory.size=' + str(memory),
            '-Dducc.agent.virtual',
            'org.apache.uima.ducc.common.main.DuccService',
        ]

        print('Start agent with pnode %s IP %s memory %s' % (pnode, ip, memory))
        os.environ['NodeName'] = pnode
        os.environ['IP'] = ip

        self.nohup(CMDPARMS)

    #
    # Start one administrative component through ducc.py.  'ws' and 'viz' are
    # started on the webserver node, everything else on the local node.
    #
    def startComponent(self, args):
        """Thread-pool worker: start one component and record its pid.

        args -- tuple (com, or_parms): the component name and the
                orchestrator start type, which is used only when com == 'or'.

        Returns a list of message tuples describing what was done.
        """
        msgs = []
        com, or_parms = args

        if com in ('ws', 'viz'):
            node = self.webserver_node
        else:
            node = self.localhost

        if com == 'or':
            lines = self.ssh(node, True, "'",
                             self.DUCC_HOME + '/admin/ducc.py', '-c', 'or', '-b',
                             '--or_parms', or_parms, "'")
        else:
            lines = self.ssh(node, True, "'",
                             self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', "'")

        msgs.append(('Start', com, 'on', node))
        pid = self._sim_read_pid(lines)
        if pid is not None:
            # NOTE(review): the record uses self.localhost even when the
            # component was started on the webserver node; kept as-is because
            # the consumer of sim.pids is not visible here -- confirm.
            self._sim_record_pid(com, self.localhost + ' ' + pid + ' ' + self.localhost)
            msgs.append((' PID', pid))

        return msgs

    #
    # Start one simulated agent by re-invoking this script on the target node
    # with --agent.  The agent reports the given fake memory, IP address and
    # pseudo node name instead of the real machine's values.
    #
    def startOneAgent(self, args):
        """Thread-pool worker: start one simulated agent and record its pid.

        args -- tuple (node, cmd, mem, ip, pnode, index): real node name,
                absolute path of this script, simulated memory in GB, fake
                IP, pseudo node name, and the instance index used as the key
                in self.pids.

        Returns a list of message tuples describing what was done.
        """
        response = []
        node, cmd, mem, ip, pnode, index = args
        response.append(('Starting agent on', node, 'instance', index, 'as pseudo-node', pnode, 'IP', ip, 'memory', mem))
        lines = self.ssh(node, True, "'", cmd, '--agent', '--memory', mem, '--addr', ip, '--pseudoname', pnode, "'")
        pid = self._sim_read_pid(lines)
        if pid is not None:
            response.append((' ... Started, PID', pid))
            self._sim_record_pid(index, node + ' ' + pid + ' ' + pnode)
        return response

    def _sim_expand_nodelist(self, props):
        """Turn the nodelist properties into a list of (index, node, mem).

        Returns None when the mandatory 'nodes' or 'memory' property is
        missing.
        """
        node_names = props.get('nodes')
        mem_sizes = props.get('memory')
        if node_names is None or mem_sizes is None:
            return None

        mems = mem_sizes.split()
        allnodes = []
        ndx = 1
        for node in node_names.split():
            # Allow the index to be reset by-node so you can vary the number
            # of pseudo nodes on a real node without changing the indexes of
            # others if desired, e.g. to manage a custom ducc.nodes.
            key = 'index.' + node
            if props.has_key(key):
                ndx = int(props.get(key))

            # '<node>.<mem>' holds the number of pseudo nodes of that memory
            # size to simulate on this real node.
            for mem in mems:
                count = props.get(node + '.' + mem)
                if count is not None:
                    for _ in range(0, int(count)):
                        allnodes.append((str(ndx), node, mem))
                        ndx = ndx + 1
        return allnodes

    def startAgents(self, nodelist, instances):
        """Start the simulated agents described by a nodelist file.

        nodelist  -- path of a properties file with these entries:
                       nodes        whitespace-separated real node names
                       memory       whitespace-separated memory sizes (GB)
                       <node>.<mem> number of pseudo nodes of that size
                       index.<node> optional first index for that node
        instances -- dict keyed by instance index (string); when non-empty
                     only those instances are started.

        Every pseudo node gets the fake IP '192.168.4.<index>' and the pseudo
        name '<node>-<index>', and is started through the thread pool.
        """
        do_all = True
        if len(instances) > 0:
            do_all = False
            print('Starting instances: %s' % (instances,))

        print('Starting from nodes in %s' % (nodelist,))
        props = Properties()
        props.load(nodelist)

        allnodes = self._sim_expand_nodelist(props)
        if allnodes is None:
            print("Nodelist %s must define both 'nodes' and 'memory', no agents started." % (nodelist,))
            return

        cmd = os.path.abspath(sys.argv[0])
        for (index, node, mem) in allnodes:
            if (not do_all) and (index not in instances):
                continue

            ip = '192.168.4.' + index
            pnode = node + '-' + index
            self.threadpool.invoke(self.startOneAgent, node, cmd, mem, ip, pnode, index)
            # Stagger the launches slightly.
            time.sleep(.1)

    def usage(self, msg):
        """Print an optional message followed by the help screen, then exit(1).

        msg -- None, a string, or a list/tuple of items to be joined with
               blanks.
        """
        if msg is not None:
            if isinstance(msg, (list, tuple)):
                msg = ' '.join([str(m) for m in msg])
            print(msg)

        # NOTE(review): the nodelist description below ("index nodename
        # mem-in-GB" lines) does not match what startAgents() reads
        # (properties 'nodes', 'memory', '<node>.<mem>', 'index.<node>');
        # left unchanged pending confirmation of the intended format.
        help_lines = (
            "Usage:",
            " start_sim [options]",
            " If no options are given this help screen is shown.",
            "",
            "Options:",
            " -n --node-config nodelist",
            " The nodelist describing agents that is to be used. Not valid with --agent, --memory, --addr, or --pseudoname.",
            " A nodelist provides the parameters for starting agents. Lines are of the form:",
            " index nodename mem-in-GB",
            " 'index' is any unique number.",
            " 'nodename' is the physical node you want the agent placed on",
            " 'mem-in-GB' is the amount of simulated memory the agent will report",
            "",
            " Example:",
            " 1 node290 31",
            " 2 node290 31",
            " 9 node291 31",
            " 10 node291 31",
            " 17 node292 47",
            " 18 node292 47",
            " 25 node293 47",
            " 26 node293 47",
            "",
            " -c --component component",
            " Start the indicated component, must be one of %s or 'all'" % ' '.join(self.default_components),
            "",
            " -i --instance instanceid",
            " Start only this instance of an agent from the nodelist.",
            "",
            " --agent",
            " Start an agent only on localhost. All of --memory, --addr, and --pseudoname are required, -n is disallowed.",
            "",
            " --memory mem",
            " Use this memory override. Valid only with --agent.",
            "",
            " --addr",
            " Use this IP override. Valid only with --agent.",
            "",
            " --pseudoname pseudoname",
            " Use this as the hostname for the agent. Valid only with --agent.",
            "",
            " --hot, --warm, --cold",
            " Orchestrator start type to use instead of ducc.orchestrator.start.type.",
            "",
            " --nothreading",
            " Disable multithreaded operation if it would otherwise be used",
            "",
            " -v, --version",
            " Print the current DUCC version",
        )
        for text in help_lines:
            print(text)
        sys.exit(1)

    def invalid(self, *msg):
        """Print the given message parts joined by blanks, a usage hint, then exit(1)."""
        # *msg is always a tuple, so test for emptiness instead of None.
        if msg and msg[0] is not None:
            print(' '.join([str(m) for m in msg]))

        print("For usage run")
        print(" start_sim -h")
        print('or')
        print(' start_sim --help')
        sys.exit(1)

    def main(self, argv):
        """Parse the command line and start what was requested.

        argv -- the command-line arguments without the program name.

        With --agent a single local agent is started and the process exits.
        Otherwise the broker (if requested), the agents of the nodelist (if
        given) and the remaining requested components are started, and the
        collected pids are written to 'sim.pids'.
        """
        print('Running as %s' % (os.getpid(),))

        if len(argv) == 0:
            self.usage(None)

        node_config = None
        components = {}
        instances = {}
        run_agent = False
        memory = None
        pseudoname = None
        IP = None

        or_parms = self.ducc_properties.get('ducc.orchestrator.start.type')

        try:
            opts, args = getopt.getopt(argv, 'c:i:n:vh?', ['component=', 'help', 'agent', 'memory=',
                                                           'instance=', 'addr=', 'pseudoname=', 'node-config=',
                                                           'version', 'hot', 'warm', 'cold',
                                                           'nothreading'])
        except getopt.GetoptError:
            self.invalid('Invalid arguments', ' '.join(argv))

        for (o, a) in opts:
            if o in ('-n', '--node-config'):
                node_config = a
            elif o == '--agent':
                run_agent = True
            elif o in ('-c', '--component'):
                if a == 'all':
                    for cmp in self.default_components:
                        components[cmp] = cmp
                else:
                    components[a] = a
            elif o == '--memory':
                memory = a
            elif o == '--addr':
                IP = a
            elif o in ('-i', '--instance'):
                instances[a] = a
            elif o == '--pseudoname':
                pseudoname = a
            elif o == '--nothreading':
                self.disable_threading()
            elif o in ('-v', '--version'):
                print(self.version())
                sys.exit(0)    # was os.exit(0), which does not exist
            elif o in ('--hot', '--warm', '--cold'):
                or_parms = o[2:]
            elif o in ('-h', '--help', '-?'):
                self.usage(None)
            else:
                self.invalid('bad args: ', ' '.join(argv))

        self.pids = Properties()
        self.ducc = Ducc()

        if not self.verify_jvm():
            return

        self.set_duccling_version()
        self.verify_duccling()

        if os.path.exists('sim.pids'):
            self.pids.load('sim.pids')

        if run_agent:
            #
            # checks that aren't valid if we want run_agent
            #
            if node_config is not None:
                self.invalid("Nodelist is not compatible with agent")

            if (IP is None) or (memory is None) or (pseudoname is None):
                self.invalid("Missing IP, memory, or pseudoname")

            self.run_local_agent(pseudoname, IP, memory)
            sys.exit(0)

        # Validate before the thread pool exists so that a rejected command
        # line exits without any worker threads having been created.
        if (IP is not None) or (memory is not None) or (pseudoname is not None):
            self.invalid("Running with a nodelist is not compatible with running a single agent.")

        self.pidlock = Lock()
        self.threadpool = ThreadPool(50)

        try:
            try:
                if 'broker' in components:
                    self.start_broker()

                if node_config is not None:
                    self.startAgents(node_config, instances)

                for com in components:
                    if com != 'broker':    # started separately
                        self.threadpool.invoke(self.startComponent, com, or_parms)
            except Exception:
                # Startup stays best-effort, but the failure is now reported
                # instead of being swallowed silently.
                print('Error during startup: %s' % (sys.exc_info()[1],))
        finally:
            # Shut the pool down and save whatever pids were collected, even
            # when startup was interrupted.
            self.threadpool.quit()
            self.pids.write('sim.pids')
| |
# Script entry point: hand everything after the program name to StartSim.
if __name__ == "__main__":
    starter = StartSim()
    cli_args = sys.argv[1:]
    starter.main(cli_args)
| |