| #!/usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| # |
# This is the main DUCC system test driver.  Parameters allow various
# rates of error injection (both during analytic initialization and
# execution), selection of execution styles (AE, DD, or SE - service
# based), and initialization times.  See the usage() method below for details.
| # |
| # This script assumes a job directory has been prepared by the "prepare" |
| # script in this directory. |
| # |
| |
| import os |
| import sys |
| import getopt |
| import string |
| import time |
| import subprocess |
| import shutil |
| import signal |
| from threading import * |
| import Queue |
| import random |
| |
# DUCC_HOME is found by walking three path components up from this script
# (the script file itself, its directory, and that directory's parent).
# The admin directory is put on the module search path so that the
# project-local imports below can be resolved.
DUCC_HOME = os.path.abspath(os.path.join(__file__, '..', '..', '..'))
sys.path.append(DUCC_HOME + '/admin')
| |
| from ducc_util import DuccUtil |
| from properties import Properties |
| from ducc import Ducc |
| |
class DuccProcess(Thread):
    """One DUCC job submission built from a job file and the driver's settings.

    submit() reads the job file and assembles the DuccJobSubmit command line,
    storing it in self.cmd and the submitting user in self.user.  The command
    is then run either synchronously with execute(), or on this thread with
    start()/run(), which echoes the submitter's output and looks for the
    job id in it.
    """

    def __init__(self, runner, jobfile):
        """Remember the driver and the job file.

        runner  -- object carrying the command-line settings (style,
                   overrides, error-injection rates, broker location, ...)
        jobfile -- path of the job properties file to submit
        """
        Thread.__init__(self)
        self.runner = runner
        self.jobfile = jobfile
        self.DUCC_HOME = DUCC_HOME
        # Both are filled in by submit(); execute() and run() require them.
        self.user = None
        self.cmd = None

    def read_jobfile(self):
        """Parse the 'key=value' lines of the job file.

        Recognized keys are threads, class, memory, user, machines, services
        and type; anything else is ignored.  The runner's thread_override and
        memory_override, when set, win over the job file (even when the job
        file has no corresponding line).

        Returns a dict with the keys 'threads', 'memory', 'class', 'user',
        'machines', 'services' and 'type'.  Raises ValueError when the job
        file names no user, because the job cannot be submitted without one.
        """
        print('Reading jobfile %s' % self.jobfile)

        answer = {
            'threads': '1',
            'memory': '15',
            'class': 'normal',
            'user': None,
            'machines': None,
            'services': None,
            'type': None,
        }

        # 'with' guarantees the file is closed even if parsing fails.
        with open(self.jobfile) as f:
            for line in f:
                key, sep, value = line.strip().partition('=')
                if not sep:
                    continue  # blank line or a key without a value
                key = key.strip()
                if key in answer:
                    answer[key] = value.strip()

        if self.runner.thread_override is not None:
            answer['threads'] = self.runner.thread_override
        if self.runner.memory_override is not None:
            answer['memory'] = self.runner.memory_override

        if answer['user'] is None:
            raise ValueError("Job file %s has no 'user' entry" % self.jobfile)
        return answer

    def execute(self):
        """Run the prepared submit command synchronously via os.system()."""
        os.environ['USER'] = self.user
        print('CLASSPATH: %s' % os.environ['CLASSPATH'])
        print('Running %s as %s compression %s'
              % (self.jobfile, os.environ['USER'], self.runner.compression))
        print(self.cmd)
        os.system(self.cmd)

    def run(self):
        """Thread body: run the submit command and echo everything it prints.

        When the runner is in observe mode one marker is always removed from
        the runner's queue on the way out, whatever happened, so that the
        queue.join() in the driver's main() cannot hang on a failed
        submission.
        """
        try:
            self._run_and_watch()
        finally:
            if self.runner.observe:
                self.runner.queue.get()  # remove marker so main() can eventually exit
                self.runner.queue.task_done()

    def _run_and_watch(self):
        """Run self.cmd, echo its output and pick out the submitted job id."""
        os.environ['USER'] = self.user

        print('CLASSPATH: %s' % os.environ['CLASSPATH'])
        print('Running as %s' % os.environ['USER'])
        print(self.cmd)

        ducc = subprocess.Popen(self.cmd, bufsize=0, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, shell=True)
        out = ducc.stdout
        realid = None
        while True:
            raw = out.readline()
            if not raw:
                break  # real end of output; a blank line still ends with '\n'
            line = raw.strip()
            print(line)
            if line.endswith('submitted'):
                toks = line.split()
                if len(toks) > 1:
                    realid = toks[1]
        ducc.wait()

        if realid is None:
            print('Cannot verify job, no id')
        # NOTE: running ./verify.py against the job id is currently disabled,
        # so nothing further is done with the id.

    def jdbloat(self):
        """Return the CR_GETNEXT_BLOAT value for this job, as a string.

        With a non-zero runner.cr_getnext_bloat, a random draw in [0, 100]
        decides whether this job gets that value or '0'.
        """
        bloat = self.runner.cr_getnext_bloat
        if bloat == 0:
            return '0'

        toss = random.randint(0, 100)
        if toss < bloat:
            return str(bloat)
        return '0'

    def calculate_bloat(self, memory):
        """Return (xmx, envparms) for the job processes.

        If we're testing our ability to contain bloat, xmx is set to double
        the requested memory.  The JP will start to allocate stuff in an
        infinite loop and the agents are expected to catch this and kill the
        process before the machine dies.  init_bloat takes precedence over
        process_bloat when both are set.  Without bloat testing the heap is
        a fixed 100M and no extra environment is added.
        """
        init_bloat = self.runner.init_bloat
        process_bloat = self.runner.process_bloat

        if init_bloat is None and process_bloat is None:
            return ('-Xmx100M', '')

        xmx = '-Xmx' + str(int(memory) * 2) + 'G'
        if init_bloat is not None:
            envparms = ' INIT_BLOAT=' + init_bloat
        else:
            envparms = ' PROCESS_BLOAT=' + process_bloat
        return (xmx, envparms)

    def mkargs(self, arglist):
        """Join arglist with blanks and wrap the result in double quotes."""
        return '"' + ' '.join(arglist) + '"'

    def submit(self):
        """Build the DuccJobSubmit command line for this job.

        Nothing is executed here: the result is stored in self.cmd and the
        job's user in self.user, for execute() or run() to use.  Requires
        JAVA_HOME in the environment.
        """
        print('SUBMIT %s' % self.jobfile)

        runner = self.runner
        cr = 'org.apache.uima.ducc.test.randomsleep.FixedSleepCR'

        parms = self.read_jobfile()
        nthreads = parms['threads']
        memory = parms['memory']
        pclass = parms['class']
        user = parms['user']
        machines = parms['machines']
        services = parms['services']
        jobtype = parms['type']
        (process_xmx, bloat_parms) = self.calculate_bloat(memory)

        ae = None
        dd = None
        if runner.style == 'AE':
            ae = 'org.apache.uima.ducc.test.randomsleep.FixedSleepAE'
        elif runner.style == 'DD':
            if runner.descriptor_as_file:
                dd = self.DUCC_HOME + '/examples/simple/resources/randomsleep/DDSleepDescriptor.xml'
            else:
                dd = 'org.apache.uima.ducc.test.randomsleep.DDSleepDescriptor'
        else:
            if services is None:
                raise ValueError("Job file %s has no 'services' entry, required for style %s"
                                 % (self.jobfile, runner.style))
            ae = 'UimaAsFailAgg_' + services

        if runner.use_http:
            plain_broker_url = 'http://' + runner.broker_host + ':8081'
        else:
            plain_broker_url = runner.broker_protocol + '://' + runner.broker_host + ':' + runner.broker_port

        cr_parms = '"jobfile=' + self.jobfile + ' compression=' + runner.compression + '"'

        driver_args = []
        process_args = [process_xmx, '-DdefaultBrokerURL=' + plain_broker_url]

        if runner.system == 'Darwin':  # Keep JP / JD processes from stealing focus on Mac
            process_args.append('-Djava.awt.headless=true')
            driver_args.append('-Djava.awt.headless=true')

        driver_args.append('-Xmx500M')
        if runner.jd_uima_log is not None:
            driver_args.append('-Djava.util.logging.config.file=' + runner.jd_uima_log)

        # The JP logging configuration belongs to the process JVM, not the driver.
        if runner.jp_uima_log is not None:
            process_args.append('-Djava.util.logging.config.file=' + runner.jp_uima_log)

        jvm_driver_args = self.mkargs(driver_args)
        jvm_process_args = self.mkargs(process_args)

        print('jvm_driver_args %s' % jvm_driver_args)
        print('jvm_process_args %s' % jvm_process_args)

        cmd = [
            os.environ['JAVA_HOME'] + '/bin/java',
            runner.submit_package + '.cli.DuccJobSubmit',
            '--description "' + self.jobfile + '[' + runner.style + ']"',
            '--driver_descriptor_CR ' + cr,
            '--driver_descriptor_CR_overrides ' + cr_parms,
            '--driver_jvm_args ' + jvm_driver_args,
        ]

        if runner.style == 'DD':
            cmd.append('--process_DD ' + dd)
        else:  # ae and se
            cmd.append('--process_descriptor_AE ' + ae)

        if runner.style == 'SE':
            cmd.append('--service_dependency UIMA-AS:FixedSleepAE_' + services + ':' + plain_broker_url)

        cmd.append('--process_memory_size ' + memory)
        cmd.append('--classpath ' + runner.examples_classpath)
        cmd.append('--process_jvm_args ' + jvm_process_args)
        cmd.append('--process_pipeline_count ' + nthreads)
        cmd.append('--scheduling_class ' + pclass)
        cmd.append('--process_per_item_time_max ' + str(runner.process_timeout))  # in minutes
        cmd.append('--process_initialization_failures_cap ' + str(runner.init_fail_cap))

        if runner.init_timeout > 0:
            cmd.append('--process_initialization_time_max ' + str(runner.init_timeout))

        # Error-injection and timing knobs are handed over in the environment.
        env_settings = [
            ('AE_INIT_TIME', runner.init_time),
            ('AE_INIT_RANGE', runner.init_range),
            ('AE_INIT_EXIT', runner.ae_init_exit),
            ('AE_INIT_ERROR', runner.ae_init_error),
            ('AE_RUNTIME_EXIT', runner.ae_runtime_exit),
            ('AE_RUNTIME_ERROR', runner.ae_runtime_error),
            ('CR_INIT_EXIT', runner.cr_init_exit),
            ('CR_INIT_ERROR', runner.cr_init_error),
            ('CR_RUNTIME_EXIT', runner.cr_runtime_exit),
            ('CR_RUNTIME_ERROR', runner.cr_runtime_error),
        ]
        environment = ''.join([' %s=%s' % (k, v) for (k, v) in env_settings])
        environment += bloat_parms
        environment += ' CR_GETNEXT_BLOAT=' + self.jdbloat()
        environment += ' LD_LIBRARY_PATH=/a/bogus/path'
        cmd.append('--environment "' + environment + '"')

        # max_machines is the int 0 by default but a string when given on the
        # command line; normalize so that 0 and -1 are recognized either way.
        max_machines = int(runner.max_machines)
        if max_machines == 0 or jobtype == 'reserve':
            if machines is not None:
                cmd.append('--process_deployments_max ' + machines)
        elif max_machines != -1:
            cmd.append('--process_deployments_max ' + str(max_machines))

        if runner.observe:
            cmd.append('--wait_for_completion')

        self.user = user
        self.cmd = ' '.join(cmd)
| |
class ServiceThread(Thread):
    """Runs a service command in a child shell and echoes its output."""

    def __init__(self, cmd):
        """cmd -- the shell command line that starts the service."""
        Thread.__init__(self)
        self.cmd = cmd
        self.terminated = False
        self.svc = None  # the Popen object, once run() has started it

    def stop_service(self):
        """Flag the thread as terminated and send SIGINT to the child.

        Safe to call before the child has been started or after it has
        already exited; in both cases only the flag is set.
        """
        self.terminated = True
        if self.svc is not None and self.svc.poll() is None:
            self.svc.send_signal(signal.SIGINT)

    def run(self):
        """Thread body: start the command and print each output line.

        Returns when the child's output reaches end-of-file, or at the first
        blank line seen after stop_service() has been called.
        """
        print('Starting service: %s' % self.cmd)
        self.svc = subprocess.Popen(self.cmd, bufsize=0, stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT, shell=True)
        out = self.svc.stdout
        while True:
            raw = out.readline()
            if not raw:
                # End of output: the child is gone, so stop instead of
                # spinning on empty reads.
                self.svc.wait()
                return

            line = raw.strip()
            if not line and self.terminated:
                return

            print('VERIFY :%s:' % line)
| |
| |
class ServiceStarter(DuccUtil):
    """Registers, starts and unregisters the test services for an SE run.

    The broker_* attributes, DUCC_HOME and popen() used below are not defined
    in this class; presumably they are inherited from DuccUtil.
    """

    def __init__(self, runner):
        """runner -- the driver object; use_http and examples_classpath are read from it."""
        DuccUtil.__init__(self)
        self.runner = runner
        # Maps of service id -> value, filled in by start_services().
        self.registered = {}
        self.started = {}
        self.autostarted = {}

    def gen_service(self, svcid, autostart):
        """Write the registration properties for service svcid.

        svcid     -- service name suffix, used in the description, the
                     descriptor path and the generated file name
        autostart -- when true, 'autostart=true' is added to the properties

        Returns the name of the generated file, '<svcid>.gen.svc', which is
        written relative to the current directory.
        """
        os.environ['USER'] = os.environ['LOGNAME']  # make sure I'm me - after submit may not be

        if self.runner.use_http:
            plain_broker_url = 'http://' + self.broker_host + ':8081'
        else:
            plain_broker_url = self.broker_protocol + '://' + self.broker_host + ':' + self.broker_port

        props = Properties()
        props.put('description', 'Test Service ' + svcid)
        props.put('process_descriptor_DD', self.DUCC_HOME + '/examples/simple/resources/service/Service_FixedSleep_' + svcid + '.xml')
        props.put('process_memory_size', '15')
        props.put('service_linger', '60000')

        props.put('classpath', self.runner.examples_classpath)
        props.put('process_jvm_args', '-Xmx100M -DdefaultBrokerURL=' + plain_broker_url)
        props.put('environment', 'AE_INIT_TIME=5000 AE_INIT_RANGE=1000 INIT_ERROR=0 LD_LIBRARY_PATH=/yet/a/nother/dumb/path')
        props.put('scheduling_class', 'fixed')
        props.put('working_directory', os.getcwd())
        props.put('service_ping_arguments', 'broker-jmx-port=' + self.broker_jmx_port)
        if autostart:
            props.put('autostart', 'true')

        svcfile = svcid + '.gen.svc'
        props.write(svcfile)
        return svcfile

    def register_service(self, svcid, instances, start, autostart):
        """Register service svcid through ducc_services and optionally start it.

        svcid     -- service name suffix, see gen_service()
        instances -- number of instances, as a string
        start     -- when true, the service is started right after registration
        autostart -- passed on to gen_service()

        Returns a token taken from the success line printed by ducc_services.
        Exits the script with status 1 when no success line is seen.
        """
        os.environ['USER'] = os.environ['LOGNAME']  # make sure I'm me - after submit may not be

        svcfile = self.gen_service(svcid, autostart)
        CMD = self.DUCC_HOME + '/bin/ducc_services --register ' + svcfile + ' --instances ' + instances
        last_line = ''
        for line in self.popen(CMD):
            line = line.strip()
            print('REGISTER %s' % line)
            last_line = line
            toks = line.split()
            # The success line must be long enough for the tokens used below;
            # blank or short lines are simply not a success line.
            if len(toks) >= 8 and toks[0] == 'Service' and toks[2] == 'succeeded':
                # NOTE(review): token 7 is reported as the service id while
                # token 6 is what is started and returned - confirm against
                # the actual ducc_services output which one is intended.
                print('Service registered as service %s' % toks[7])
                if start:
                    print('Starting registered service instance %s service id %s' % (svcid, toks[7]))
                    os.system(self.DUCC_HOME + '/bin/ducc_services --start ' + toks[6])
                return toks[6]

        print('Cannot register service %s : %s' % (svcid, last_line))
        sys.exit(1)

    def _service_ids(self, svcprops, key, what, check_registered):
        """Return a {service id: None} map for the blank-separated list in property key.

        what is the word used in the duplicate message.  A duplicate id ends
        the script; an id that is not registered only draws a warning when
        check_registered is set.
        """
        ids = {}
        value = svcprops.get(key)
        if value is None:
            return ids

        for t in value.split():
            if t in ids:
                print('Duplicate %s service %s' % (what, t))
                sys.exit(1)
            if check_registered and t not in self.registered:
                print('Trying to start service %s but it is not registered.' % t)
            ids[t] = None
        return ids

    def start_services(self, svcfile):
        """Register (and possibly start) every service listed in svcfile.

        The properties file is read for 'register', 'start' and 'autostart',
        each a blank-separated list of service ids, plus one
        'instances_<id>' entry per registered id.  Afterwards
        self.registered maps each id to the list of values returned by
        register_service().  Exits the script with status 1 on duplicate ids
        or a missing instances entry.
        """
        os.environ['USER'] = os.environ['LOGNAME']  # make sure I'm me - after submit may not be

        svcprops = Properties()
        svcprops.load(svcfile)

        self.registered = self._service_ids(svcprops, 'register', 'registered', False)
        self.started = self._service_ids(svcprops, 'start', 'started', True)
        self.autostarted = self._service_ids(svcprops, 'autostart', 'auto-started', True)

        for k in list(self.registered.keys()):
            instances = svcprops.get("instances_" + k)
            if instances is None:
                # Registration needs the count on its command line.
                print('Missing instances for registered job %s' % k)
                sys.exit(1)
            print('Registering service %s with %s instances' % (k, instances))
            start = k in self.started
            autostart = k in self.autostarted
            service = self.register_service(k, instances, start, autostart)
            if self.registered[k] is None:
                self.registered[k] = []
            self.registered[k].append(service)

        os.system(self.DUCC_HOME + '/bin/ducc_services --query')

    def stop_services(self):
        """Unregister everything recorded by start_services()."""
        os.environ['USER'] = os.environ['LOGNAME']  # make sure I'm me - after submit may not be

        for ids in self.registered.values():
            for svc in (ids or []):
                print('Unregistering %s' % svc)
                os.system(self.DUCC_HOME + "/bin/ducc_services --unregister " + svc)
| |
class RunDucc(DuccUtil):
    """The test driver: parses the command line and plays back a batch file.

    DUCC_HOME and the broker_* / system attributes read through this object
    are not set here; presumably they come from DuccUtil.
    """

    def _sleep_seconds(self, spec):
        """Convert a sleep spec to seconds: '<n>S' or '<n>' is seconds, '<n>M' minutes."""
        # we're not going to try millisecond sleep - it's probably overdesigned
        # to do that anyway.
        if spec[-1:] == 'S':
            return int(spec.strip('S'))
        if spec[-1:] == 'M':
            return int(spec.strip('M')) * 60
        return int(spec)

    def run_batch(self):
        """Play back the batch file <test_dir>/<batchfile> line by line.

        Lines starting with '#' are comments.  's [-c compression] jobfile'
        submits a job: on its own thread in observe mode, synchronously
        otherwise.  Nothing else is acted upon until the first submission
        has been seen; after that 'x ' ends the script and '[sleep n]'
        pauses.  Everything else (including the qm/qj/qc lines, which aren't
        supported in "ducc" mode) is ignored.
        """
        running = False

        bfile = self.test_dir + '/' + self.batchfile
        if not os.path.exists(bfile):
            print('File %s does not exist.' % bfile)
            sys.exit(1)

        with open(bfile) as f:
            for line in f:
                print('---- %s' % line.strip())
                if line[0] == '#':
                    continue

                if line[0:2] == 's ':
                    running = True
                    toks = line.split()
                    if len(toks) >= 4 and toks[1] == '-c':
                        jobfile = toks[3]
                        self.compression = toks[2]
                    elif len(toks) >= 2 and toks[1] != '-c':
                        jobfile = toks[1]
                        self.compression = '1'
                    else:
                        print('Malformed submit line, skipping: %s' % line.strip())
                        continue

                    ducc_process = DuccProcess(self, jobfile)
                    ducc_process.submit()

                    if self.observe:
                        self.queue.put(jobfile)  # any old marker will do
                        ducc_process.start()
                    else:
                        ducc_process.execute()
                    continue

                if not running:
                    continue

                if line[0:2] == 'x ':
                    sys.exit(0)

                if line[0:6] == '[sleep':
                    toks = line.strip().strip('[]').split()
                    print(toks)
                    delay = self._sleep_seconds(toks[1].strip())
                    print('SLEEP %s' % delay)
                    time.sleep(delay)
                    continue

    def usage(self, msg):
        """Print msg (unless None) followed by the option summary, then exit with status 1.

        The defaults shown are read from this object, so main() must have
        initialized them before this is called.
        """
        if msg is not None:
            print(msg)

        lines = [
            'Usage:',
            ' runducc.py [options]',
            'Options:',
            ' --AE',
            ' Specifies to run this as a single CR and AE.',
            '',
            ' --DD',
            ' Specifies to run this as CR + CM / AE / CC pipeline.',
            '',
            ' --SE service-startup-config',
            ' Specifies to run this with the AE as a delegate service. The required parameter specifies',
            ' a service startup script.',
            '',
            ' -f, --file',
            ' Use DD descriptor in filesystem, not as resource in jar file.',
            '',
            ' --http',
            ' Use HTTP instead of tcp for services',
            '',
            ' -d, --directory dir',
            ' This is the directory with the test files and configuration. Required',
            '',
            ' -b, --batchfile file',
            ' This is the batch file describing the submissions. Required.',
            '',
            ' -i, --init_time seconds',
            ' This is the AE initialization minimum time in seconds. Default: %s' % (self.init_time // 1000),
            '',
            ' --init_timeout minutes',
            ' Max time in minutes NOTE MINUTES a process is allowed to initialize. Best used in conjunction with careful choice of',
            ' -i and -r',
            '',
            ' --init_fail_cap number-of-failures.',
            ' This is the max init failures tolerated before the system starts to cap processes. Default: %s' % self.init_fail_cap,
            '',
            ' --IB value',
            ' The JP will leak in init() until DUCC (hopefully) kills us. The value is passed to the JP as INIT_BLOAT.',
            '',
            ' --PB value',
            ' The JP will leak in process() until DUCC (hopefully) kills us. The value is passed to the JP as PROCESS_BLOAT.',
            '',
            ' --threads number-of-threads',
            ' Thread-override: force this number of threads regardless of what is in job spec.',
            '',
            ' -r, --range seconds',
            ' This is the AE initialization time range over base in seconds. Default: %s' % (self.init_range // 1000),
            ' Init time is -i value + random[0, -rvalue]',
            '',
            ' -m, --memory_override mem-in-GB',
            ' Use this instead of what is in the props file. Default: None',
            '',
            ' -n, --nmachines process_deployments_max',
            '',
            ' -o, --observe',
            ' Specifies that we submit in keepalive mode and observe(watch) the jobs, creating a dir with outputs. Default: %s' % self.observe,
            ' If specified, we run verification against the results.',
            '',
            ' -p, --process_timeout minutes',
            ' Process timeout, in minutes. Default: %s' % self.process_timeout,
            '',
            ' --jd_uima_log log-properties',
            ' If specified, use the indicated properties file for JD UIMA/UIMA-AS logging. Default: %s' % self.jd_uima_log,
            '',
            ' --jp_uima_log log-properties',
            ' If specified, use the indicated properties file for JP UIMA/UIMA-AS logging. Default: %s' % self.jp_uima_log,
            '',
            ' -q value',
            ' CR Probability it will leak on each getNext. Default %s' % self.cr_getnext_bloat,
            '',
            ' -s value',
            ' AE Probability that a JP will spontaneously exit during initialization. Default: %s' % self.ae_init_exit,
            '',
            ' -t value',
            ' AE Probability that a JP will throw an exception during initialization. Default: %s' % self.ae_init_error,
            '',
            ' -u value',
            ' AE Probability that a JP will spontaneously exit in the process method. Default: %s' % self.ae_runtime_exit,
            '',
            ' -v value',
            ' AE Probability that a JP will throw an exception in the process method. Default: %s' % self.ae_runtime_error,
            '',
            ' -w value',
            ' CR Probability that a JD will spontaneously exit during initialization. Default: %s' % self.cr_init_exit,
            '',
            ' -x value',
            ' CR Probability that a JD will throw an exception during initialization. Default: %s' % self.cr_init_error,
            '',
            ' -y value',
            ' CR Probability that a JD will spontaneously exit in the process method. Default: %s' % self.cr_runtime_exit,
            '',
            ' -z value',
            ' CR Probability that a JD will throw an exception in the process method. Default: %s' % self.cr_runtime_error,
            '',
            ' -h, -?',
            ' Print this message.',
            '',
            'We run with DUCC_HOME set to %s' % self.DUCC_HOME,
        ]
        print('\n'.join(lines))
        sys.exit(1)

    def main(self, argv):
        """Parse argv, start the services for an SE run, and play back the batch file.

        argv -- the command-line arguments without the program name

        In observe mode this waits until every submission thread has removed
        its marker from the queue.  Services started for an SE run are
        unregistered at the end.
        """
        self.test_dir = None
        self.batchfile = None
        self.observe = False
        self.ae_init_exit = 0        # -s int 0-100
        self.ae_init_error = 0       # -t int 0-100
        self.ae_runtime_exit = 0.0   # -u float
        self.ae_runtime_error = 0.0  # -v float
        self.cr_init_exit = 0        # -w int 0-100
        self.cr_init_error = 0       # -x int 0-100
        self.cr_runtime_exit = 0.0   # -y float
        self.cr_runtime_error = 0.0  # -z float
        self.cr_getnext_bloat = 0    # -q jd leakage
        self.init_fail_cap = '99'
        self.memory_override = None
        self.init_time = 10000
        self.init_range = 1000
        self.init_timeout = 0
        self.process_timeout = str(60*24)  # 24 hour default - nothing in current megas will fail on this
        self.style = 'AE'
        self.service_pid = None
        self.init_bloat = None
        self.process_bloat = None
        self.service_startup = None
        self.jd_uima_log = None
        self.jp_uima_log = None
        self.submit_package = 'org.apache.uima.ducc'
        self.max_machines = 0
        self.use_http = False
        self.descriptor_as_file = False
        self.thread_override = None

        short_opts = 'b:d:fi:m:n:op:q:r:s:t:u:v:w:x:y:z:?h'
        # Every entry needs its own comma: adjacent string literals are
        # silently concatenated into one (bogus) option name.
        long_opts = [
            'AE', 'DD', 'file', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=',
            'init_fail_cap=', 'range=', 'memory_override=', 'nmachines=', 'process_timeout=',
            'init_timeout=', 'observe',
            'jd_uima_log=', 'jp_uima_log=',
            'http', 'threads=',
        ]
        try:
            opts, args = getopt.getopt(argv, short_opts, long_opts)
        except getopt.GetoptError as e:
            self.usage('Unknown option: %s' % e)

        # Options are compared with == or against real tuples; "o in ('--x')"
        # would be a substring test against a plain string.
        for (o, a) in opts:
            print('%s %s' % (o, a))
            if o in ('-d', '--directory'):
                self.test_dir = a
            elif o in ('-b', '--batchfile'):
                self.batchfile = a
            elif o in ('-i', '--init_time'):
                self.init_time = int(a) * 1000
            elif o == '--init_fail_cap':
                self.init_fail_cap = a
            elif o in ('-r', '--range'):
                self.init_range = int(a) * 1000
            elif o in ('-m', '--memory_override'):
                self.memory_override = a
            elif o in ('-n', '--nmachines'):
                int(a)  # force ugly failure if not a number
                self.max_machines = a
            elif o in ('-p', '--process_timeout'):
                self.process_timeout = a
            elif o in ('-o', '--observe'):
                self.observe = True
            elif o == '--init_timeout':
                self.init_timeout = int(a)
            elif o == '--jd_uima_log':
                self.jd_uima_log = a
            elif o == '--jp_uima_log':
                self.jp_uima_log = a
            elif o == '--AE':
                self.style = 'AE'
            elif o == '--DD':
                self.style = 'DD'
            elif o == '--SE':
                self.style = 'SE'
                self.service_startup = a
                self.observe = True
            elif o in ('-f', '--file'):
                self.descriptor_as_file = True
            elif o == '--http':
                self.use_http = True
            elif o == '--IB':
                self.init_bloat = a
            elif o == '--PB':
                self.process_bloat = a
            elif o == '--threads':
                self.thread_override = a
            elif o == '-q':
                self.cr_getnext_bloat = int(a)
            elif o == '-s':
                self.ae_init_exit = int(a)
            elif o == '-t':
                self.ae_init_error = int(a)
            elif o == '-u':
                self.ae_runtime_exit = float(a)
            elif o == '-v':
                self.ae_runtime_error = float(a)
            elif o == '-w':
                self.cr_init_exit = int(a)
            elif o == '-x':
                self.cr_init_error = int(a)
            elif o == '-y':
                self.cr_runtime_exit = float(a)
            elif o == '-z':
                self.cr_runtime_error = float(a)
            elif o in ('-?', '-h'):
                self.usage(None)
            else:
                print('Invalid argument: %s' % o)
                self.usage(None)

        if self.test_dir is None:
            self.usage("Missing test_dir")
        if self.batchfile is None:
            self.usage("Missing batchfile")

        settings = [
            ('test_dir', self.test_dir),
            ('batchfile', self.batchfile),
            ('style', self.style),
            ('descriptor as file', self.descriptor_as_file),
            ('http', self.use_http),
            ('init-time', self.init_time // 1000),
            ('init-range', self.init_range // 1000),
            ('init-timeout', self.init_timeout),
            ('init-bloat', self.init_bloat),
            ('process-bloat', self.process_bloat),
            ('observe', self.observe),
            ('ae_init_exit', self.ae_init_exit),
            ('ae_init_error', self.ae_init_error),
            ('ae_runtime_exit', self.ae_runtime_exit),
            ('ae_runtime_error', self.ae_runtime_error),
            ('cr_init_exit', self.cr_init_exit),
            ('cr_init_error', self.cr_init_error),
            ('cr_runtime_exit', self.cr_runtime_exit),
            ('cr_runtime_error', self.cr_runtime_error),
            ('cr_getnext_bloat', self.cr_getnext_bloat),
            ('process_timeout', self.process_timeout),
            ('memory_override', self.memory_override),
            ('max_machines', self.max_machines),
            ('jd_uima_log', self.jd_uima_log),
            ('jp_uima_log', self.jp_uima_log),
            ('DUCC_HOME', self.DUCC_HOME),
            ('Thread override', self.thread_override),
        ]
        print('Running with')
        for (label, value) in settings:
            print(' %s : %s' % (label, value))

        self.submit_package = 'org.apache.uima.ducc'

        cp = [
            self.DUCC_HOME + '/lib/uima-ducc/examples/*',
            self.DUCC_HOME + '/apache-uima/lib/*',
            self.DUCC_HOME + '/apache-uima/apache-activemq/lib/*',
            self.DUCC_HOME + '/apache-uima/apache-activemq/lib/optional/*',
            self.DUCC_HOME + '/examples/simple/resources/service',
        ]
        self.examples_classpath = ':'.join(cp)

        service_starter = None
        if self.style == 'SE':
            if self.service_startup is None:
                self.usage("Missing service startup file")

            svcfile = self.test_dir + '/' + self.service_startup
            service_starter = ServiceStarter(self)
            service_starter.start_services(svcfile)

            print('Pausing a bit')
            time.sleep(15)
            os.system(self.DUCC_HOME + '/bin/ducc_services --query')

        os.environ['CLASSPATH'] = self.DUCC_HOME + "/lib/uima-ducc-cli.jar"
        if self.observe:
            self.queue = Queue.Queue()

        self.run_batch()

        if self.observe:
            # Each observed submission put one marker on the queue; join()
            # returns once all of them have been taken off again.
            self.queue.join()
            print('All threads returned')
            if service_starter is not None:
                service_starter.stop_services()
| |
| |
| # -------------------------------------------------------------------------------- |
| |
| # -------------------------------------------------------------------------------- |
# Script entry point: hand everything after the program name to the driver.
if __name__ == "__main__":
    RunDucc().main(sys.argv[1:])