blob: aab9ad95468c41ff428c835dfe46e4ddd36f0b10 [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
# This is the main DUCC system test driver. Parameters allow various
# rates of error injection (both during analytic initialization and
# execution), selection of execution styles (AE, DD, SE - service
# based), and initialization times. See the usage() method below for details.
# This script assumes a job directory has been prepared by the "prepare"
# script in this directory.
import os
import sys
import getopt
import string
import time
import subprocess
import shutil
import signal
from threading import *
import Queue
import random
# Absolute path three components above this file's own path, i.e. two
# directories above the directory that holds this script.
DUCC_HOME = os.path.abspath(__file__ + '/../../..')
# Make DUCC_HOME/admin importable; presumably that is where the ducc_util,
# properties and ducc modules imported just below live -- TODO confirm.
sys.path.append(DUCC_HOME + '/admin')
from ducc_util import DuccUtil
from properties import Properties
from ducc import Ducc
class DuccProcess(Thread):
    def __init__(self, runner, jobfile):
        """Create the worker thread that handles one job file.

        runner  -- object carrying the test-run settings read by the other
                   methods (memory_override, observe, queue, ...).
        jobfile -- path of the job description file to read and submit.
        """
        # A Thread subclass that overrides __init__ must run the base
        # constructor first; without it start() refuses to launch the thread.
        Thread.__init__(self)
        self.runner = runner
        self.jobfile = jobfile
# read file and get nthreads, memory, class
def read_jobfile(self):
    """Parse self.jobfile ("key = value" lines) into a dict of job settings.

    Returns a dict with the keys 'threads', 'memory', 'class', 'user',
    'machines', 'services' and 'type'.  Defaults are '1', '15' and 'normal'
    for threads, memory and class; machines, services and type default to
    None.  A 'memory' line is replaced by self.runner.memory_override when
    that is set; if the file has no 'memory' line the default is kept.

    Raises ValueError when the file names no 'user', because the submission
    code needs it to set the USER environment variable.
    """
    # Single-argument parenthesised form prints the same text on Python 2 and 3.
    print('Reading jobfile %s' % self.jobfile)

    threads = '1'
    memory = '15'
    clz = 'normal'
    user = None
    machines = None
    services = None
    jobtype = None

    f = open(self.jobfile)
    try:
        for line in f:
            toks = line.strip().split('=', 1)
            if len(toks) < 2:
                # blank line, comment, or a keyword without a value
                continue
            key = toks[0].strip()
            # Strip every value: "user = bob" must yield 'bob', not ' bob'.
            value = toks[1].strip()

            if key == 'threads':
                threads = value
            elif key == 'class':
                clz = value
            elif key == 'memory':
                if self.runner.memory_override is None:
                    memory = value
                else:
                    memory = self.runner.memory_override
            elif key == 'user':
                user = value
            elif key == 'machines':
                machines = value
            elif key == 'services':
                services = value
            elif key == 'type':
                jobtype = value
    finally:
        f.close()

    if user is None:
        raise ValueError("Jobfile %s does not specify a 'user'" % self.jobfile)

    answer = {}
    answer['threads'] = threads
    answer['memory'] = memory
    answer['class'] = clz
    answer['user'] = user
    answer['machines'] = machines
    answer['services'] = services
    answer['type'] = jobtype
    return answer
def execute(self):
os.environ['USER'] = self.user
print 'CLASSPATH:', os.environ['CLASSPATH']
print 'Running', self.jobfile, 'as', os.environ['USER'], 'compression', self.runner.compression
print self.cmd
def run(self):
os.environ['USER'] = self.user
print 'CLASSPATH:', os.environ['CLASSPATH']
print "Running as", os.environ['USER']
print self.cmd
ducc = subprocess.Popen(self.cmd, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
p = ducc.stdout
realid = None
while 1:
line = p.readline().strip()
print line
if ( line.endswith('submitted') ) :
toks = line.split()
realid = toks[1]
if ( not line ):
if ( realid == None ):
print 'Cannot verify job, no id'
#CMD = "./ -j " + realid
#ducc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
#p = ducc.stdout
#while 1:
# line = p.readline().strip()
# if ( not line ):
# ducc.wait()
# return
# print 'VERIFY ' + realid + ':', line
if ( self.runner.observe ):
self.runner.queue.get() # remove marker so main() can eventually exit
def jdbloat(self):
    """Decide the CR_GETNEXT_BLOAT value to pass to this job, as a string.

    self.runner.cr_getnext_bloat is treated as a percentage: with that
    chance the setting itself is returned, otherwise '0'.  A setting of 0
    always yields '0'.
    """
    bloat = self.runner.cr_getnext_bloat
    if bloat == 0:
        return '0'
    # randrange(100) yields 0..99, so a setting of N matches exactly N of the
    # 100 outcomes; the inclusive randint(0, 100) made 100 miss occasionally.
    if random.randrange(100) < bloat:
        return str(bloat)
    return '0'
def calculate_bloat(self, memory):
    """Return (xmx, envparms) for the job processes.

    memory -- requested process memory in GB (string or int).

    Without bloat testing the result is ('-Xmx100M', '').  When the runner
    has init_bloat or process_bloat set, the heap limit is twice the
    requested memory and envparms carries ' INIT_BLOAT=<v>' or, if only
    process bloat is set, ' PROCESS_BLOAT=<v>'.
    """
    # if we're testing our ability to contain bloat, we'll set xmx to double the
    # requested memory. the JP will start to allocate stuff in an infinite loop.
    # the agents are expected to catch this and kill the process before the
    # machine dies.
    runner = self.runner
    if runner.init_bloat is None and runner.process_bloat is None:
        return ('-Xmx100M', '')

    xmx = '-Xmx' + str(int(memory) * 2) + 'G'
    if runner.init_bloat is not None:
        # init bloat takes precedence when both are given
        envparms = ' INIT_BLOAT=' + runner.init_bloat
    else:
        envparms = ' PROCESS_BLOAT=' + runner.process_bloat
    return (xmx, envparms)
def mkargs(self, arglist):
    """Join arglist with single spaces and wrap the result in double quotes."""
    joined = ' '.join(arglist)
    return '"%s"' % joined
def submit(self):
# Build the shell command line that submits self.jobfile through the
# DuccJobSubmit CLI class and store it on this object as self.cmd, together
# with the job's user as self.user.  Nothing is executed here.
# NOTE(review): this block looks damaged -- several conditions have an empty
# left operand (e.g. "if ( == 'AE' )"), at least one "if" has no body, and
# paired assignments appear without an "else" between them.  Restore it from
# version control before relying on it.
print 'SUBMIT', self.jobfile
args = None
HERE = os.getcwd();
cr = 'org.apache.uima.ducc.test.randomsleep.FixedSleepCR'
# Per-job settings parsed from the job file.
parms = self.read_jobfile()
nthreads = parms['threads']
memory = parms['memory']
pclass = parms['class']
user = parms['user']
machines = parms['machines']
services = parms['services']
jobtype = parms['type']
# Heap limit and bloat environment entries for the job processes.
# NOTE(review): process_xmx is not referenced again in the visible code --
# confirm whether it should be appended to process_args.
(process_xmx, bloat_parms) = self.calculate_bloat(memory)
driver_args = []
process_args = []
# Pick the process descriptor for the run style; the value being compared
# against 'AE' / 'DD' is missing from these two conditions.
if ( == 'AE' ):
ae = 'org.apache.uima.ducc.test.randomsleep.FixedSleepAE'
elif ( == 'DD' ):
if ( self.runner.descriptor_as_file ):
dd = self.DUCC_HOME + '/examples/simple/resources/randomsleep/DDSleepDescriptor.xml'
# NOTE(review): dd is assigned twice in a row; presumably this second
# assignment (descriptor as a classpath resource) was an "else" branch -- TODO confirm.
dd = 'org.apache.uima.ducc.test.randomsleep.DDSleepDescriptor'
ae = 'UimaAsFailAgg_' + services
if ( self.runner.use_http ):
plain_broker_url = 'http://' + self.runner.broker_host + ':8081'
# NOTE(review): same pattern -- presumably the non-HTTP "else" branch.
plain_broker_url = self.runner.broker_protocol + '://' + self.runner.broker_host + ':' + self.runner.broker_port
# Overrides handed to the collection reader: the job file and the compression setting.
cr_parms = '"jobfile=' + self.jobfile + ' compression=' + self.runner.compression + '"'
process_args.append('-DdefaultBrokerURL=' + plain_broker_url )
# NOTE(review): no statement follows this condition in the visible source.
if ( self.runner.system == 'Darwin' ): # Keep JP / JD processes from stealing focus on Mac
if ( self.runner.jd_uima_log != None ):
driver_args.append(' -Djava.util.logging.config.file=' + self.runner.jd_uima_log)
# NOTE(review): the JP log setting is appended to driver_args, not
# process_args -- confirm which list is intended.
if ( self.runner.jp_uima_log != None ):
driver_args.append(' -Djava.util.logging.config.file=' + self.runner.jp_uima_log)
# Each argument list becomes one double-quoted shell word.
jvm_driver_args = self.mkargs(driver_args)
jvm_process_args = self.mkargs(process_args)
print 'jvm_driver_args', jvm_driver_args
print 'jvm_process_args', jvm_process_args
# Assemble the java invocation of the submit CLI option by option.
CMD = os.environ['JAVA_HOME'] + '/bin/java'
CMD = CMD + ' ' + self.runner.submit_package + '.cli.DuccJobSubmit'
# NOTE(review): an operand is missing between the two '+' signs below.
CMD = CMD + ' --description ' + '"' + self.jobfile + '[' + + ']"'
CMD = CMD + ' --driver_descriptor_CR ' + cr
CMD = CMD + ' --driver_descriptor_CR_overrides ' + cr_parms
CMD = CMD + ' --driver_jvm_args ' + jvm_driver_args
if ( == 'DD' ):
CMD = CMD + ' --process_DD ' + dd
else: # ae and se
CMD = CMD + ' --process_descriptor_AE ' + ae
if ( == 'SE' ):
CMD = CMD + ' --service_dependency UIMA-AS:FixedSleepAE_'+ services + ':' + plain_broker_url
#CMD = CMD + ' --working_directory ' + working_dir
CMD = CMD + ' --process_memory_size ' + memory
CMD = CMD + ' --classpath ' + self.runner.examples_classpath
CMD = CMD + ' --process_jvm_args ' + jvm_process_args
CMD = CMD + ' --process_thread_count ' + nthreads
CMD = CMD + ' --scheduling_class ' + pclass
CMD = CMD + ' --process_per_item_time_max ' + self.runner.process_timeout # in minutes
CMD = CMD + ' --process_initialization_failures_cap ' + self.runner.init_fail_cap
if ( self.runner.init_timeout > 0 ):
CMD = CMD + ' --process_initialization_time_max ' + str(self.runner.init_timeout)
# Error-injection and timing settings travel to the job as one quoted
# --environment string of NAME=value pairs.
CMD = CMD + ' --environment ' \
+ '"' \
+ ' AE_INIT_TIME=' + str(self.runner.init_time) \
+ ' AE_INIT_RANGE=' + str(self.runner.init_range) \
+ ' AE_INIT_EXIT=' + str(self.runner.ae_init_exit) \
+ ' AE_INIT_ERROR=' + str(self.runner.ae_init_error) \
+ ' AE_RUNTIME_EXIT=' + str(self.runner.ae_runtime_exit) \
+ ' AE_RUNTIME_ERROR=' + str(self.runner.ae_runtime_error) \
+ ' CR_INIT_EXIT=' + str(self.runner.cr_init_exit) \
+ ' CR_INIT_ERROR=' + str(self.runner.cr_init_error) \
+ ' CR_RUNTIME_EXIT=' + str(self.runner.cr_runtime_exit) \
+ ' CR_RUNTIME_ERROR=' + str(self.runner.cr_runtime_error) \
+ bloat_parms \
+ ' CR_GETNEXT_BLOAT=' + self.jdbloat() \
+ ' LD_LIBRARY_PATH=/a/bogus/path' \
+ '"'
# With no override (max_machines == 0) or a 'reserve' job the job file's own
# 'machines' value is used; any override other than -1 replaces it.
if ( self.runner.max_machines == 0 or jobtype == 'reserve' ):
if ( machines != None ):
CMD = CMD + ' --process_deployments_max ' + machines
elif (self.runner.max_machines != -1 ):
# NOTE(review): string concatenation -- max_machines must be a string here.
CMD = CMD + ' --process_deployments_max ' + self.runner.max_machines
if ( self.runner.observe ):
CMD = CMD + ' --wait_for_completion'
# Keep the results for execute() / run().
self.user = user
self.cmd = CMD
class ServiceThread(Thread):
    def __init__(self, cmd):
        """Create a thread that will run and monitor one service command.

        cmd -- shell command line that starts the service.
        """
        # A Thread subclass that overrides __init__ must run the base
        # constructor first; without it start() refuses to launch the thread.
        Thread.__init__(self)
        self.cmd = cmd
        # Set by stop_service() to tell run() that shutdown was requested.
        self.terminated = False
def stop_service(self):
    """Flag this service as terminated; the child process is not signalled."""
    # Only the flag is raised here; an earlier 'kill -2' of the child
    # process is disabled in the source.
    setattr(self, 'terminated', True)
def run(self):
# Thread body: launch self.cmd through the shell with stderr merged into
# stdout, keep the Popen handle on self.svc, and read the child's output
# line by line, echoing lines with a 'VERIFY' prefix.
print 'Starting service:', self.cmd
self.svc = subprocess.Popen(self.cmd, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
p = self.svc.stdout
# NOTE(review): no statement in the visible loop leaves it, and the
# "if ( self.terminated )" test has no body -- the exit path taken after
# stop_service() appears to be missing from this copy; confirm against the
# original before changing anything.
while 1:
# strip() turns a blank line into '', so a blank output line and
# end-of-stream both satisfy the test below.
line = p.readline().strip()
if ( (not line) or ( line == '') ):
if ( self.terminated ):
print 'VERIFY :' + line + ':'
class ServiceStarter(DuccUtil):
    def __init__(self, runner):
        """Keep a reference to the test driver whose settings are read later.

        runner -- object providing use_http and examples_classpath to the
                  service helper methods.
        """
        # NOTE(review): the DuccUtil constructor is not invoked here although
        # other methods read self.DUCC_HOME, self.broker_host, etc. -- confirm
        # where those attributes come from.
        self.runner = runner
def gen_service(self, svcid, autostart):
# Assemble the registration properties for test service <svcid> and return
# the properties file name, svcid + '.gen.svc'.
#   svcid     -- service identifier; used in the description, the descriptor
#                path and the returned file name
#   autostart -- when true, 'autostart' = 'true' is added to the properties
# NOTE(review): nothing visible here writes props to svcfile -- confirm
# whether a store/write call is missing from this copy.
os.environ['USER'] = os.environ['LOGNAME'] # make sure I'm me - after submit may not be
if ( self.runner.use_http ):
plain_broker_url = 'http://' + self.broker_host + ':8081'
# NOTE(review): this assignment directly follows the HTTP one with no visible
# "else"; presumably it is the non-HTTP branch -- TODO confirm.
plain_broker_url = self.broker_protocol + '://' + self.broker_host + ':' + self.broker_port
props = Properties()
props.put('description', 'Test Service ' + svcid)
props.put('process_descriptor_DD', self.DUCC_HOME + '/examples/simple/resources/service/Service_FixedSleep_' + svcid + '.xml')
props.put('process_memory_size', '15')
props.put('service_linger', '60000')
props.put('classpath', self.runner.examples_classpath);
props.put('process_jvm_args', '-Xmx100M -DdefaultBrokerURL=' + plain_broker_url)
props.put('environment', 'AE_INIT_TIME=5000 AE_INIT_RANGE=1000 INIT_ERROR=0 LD_LIBRARY_PATH=/yet/a/nother/dumb/path')
props.put('scheduling_class', 'fixed')
props.put('working_directory', os.getcwd())
props.put('service_ping_arguments', 'broker-jmx-port=' + self.broker_jmx_port)
if ( autostart ):
props.put('autostart', 'true')
svcfile = svcid + '.gen.svc'
return svcfile
def register_service(self, svcid, instances, start, autostart):
# Register test service <svcid> through bin/ducc_services and return the
# seventh token (toks[6]) of the 'Service ... succeeded' output line.
#   svcid     -- service identifier passed on to gen_service()
#   instances -- instance count, concatenated into the command (a string)
#   start     -- when true, the service is also started after registering
#   autostart -- passed on to gen_service()
# NOTE(review): indentation is lost in this copy, so it is not clear whether
# the final 'Cannot register service' message belongs inside the loop or after
# it -- confirm against the original.
os.environ['USER'] = os.environ['LOGNAME'] # make sure I'm me - after submit may not be
svcfile = self.gen_service(svcid, autostart)
CMD = self.DUCC_HOME + '/bin/ducc_services --register ' + svcfile + ' --instances ' + instances
lines = self.popen(CMD)
for line in lines:
line = line.strip()
print 'REGISTER', line
toks = line.split()
# A success line has 'Service' as its first and 'succeeded' as its third token.
# NOTE(review): an empty or short line would make these index lookups fail.
if ( (toks[0] == 'Service') and (toks[2] == 'succeeded') ):
# NOTE(review): the messages print toks[7] as the service id while the
# --start command and the return value use toks[6] -- confirm which is right.
print 'Service registered as service', toks[7]
if ( start ):
print 'Starting registered service instance', svcid, 'service id', toks[7]
os.system(self.DUCC_HOME + '/bin/ducc_services --start ' + toks[6])
return toks[6]
print 'Cannot register service', svcid, ':', line
def start_services(self, svcfile):
# Read the 'register', 'start' and 'autostart' lists (whitespace-separated
# service ids) from a service startup configuration, register every listed
# service, and finish with a ducc_services --query.
#   svcfile -- service startup configuration file
# NOTE(review): svcfile is never loaded into svcprops in the visible code, and
# the value returned by register_service() is never stored in the per-service
# list -- lines appear to be missing from this copy; confirm against the original.
os.environ['USER'] = os.environ['LOGNAME'] # make sure I'm me - after submit may not be
svcprops = Properties()
all_services = {}
# Services to register: every id becomes a key in self.registered, initially None.
register = svcprops.get('register')
self.registered = {}
if ( register != None ):
register = register.strip()
# make a map with the service id as key and the number of instances as val
toks = register.split()
for t in toks:
t = t.strip()
if ( self.registered.has_key(t) or all_services.has_key(t) ):
print "Duplicate registered service", t
self.registered[t] = None
all_services[t] = None
# Services to start right after registration; each must also be registered.
start = svcprops.get('start')
self.started = {}
if ( start != None ):
start = start.strip()
# make a map with the service id as key and the number of instances as val
toks = start.split()
for t in toks:
t = t.strip()
if ( self.started.has_key(t) ):
print "Duplicate started service", t
if ( not self.registered.has_key(t) ):
print "Trying to start service", t, "but it is not registered."
self.started[t] = None
# Services to register with the autostart property set.
auto = svcprops.get('autostart')
self.autostarted = {}
if ( auto != None ):
auto = auto.strip()
# make a map with the service id as key and the number of instances as val
toks = auto.split()
for t in toks:
t = t.strip()
if ( self.autostarted.has_key(t) ):
print "Duplicate auto-started service", t
if ( not self.registered.has_key(t) ):
print "Trying to start service", t, "but it is not registered."
self.autostarted[t] = None
# Register each service; its instance count comes from property 'instances_<id>'.
for (k, v) in self.registered.items():
instances = svcprops.get("instances_" + k)
if ( instances == None ):
print "Missing instances for registered job", k
print "Registering service", k, "with", instances, "instances"
start = self.started.has_key(k)
autostart = self.autostarted.has_key(k)
service = self.register_service(k, instances, start, autostart)
# Replace the None placeholder with a list on first use.
svcs = self.registered[k]
if ( svcs == None ):
svcs = []
self.registered[k] = svcs
os.system(self.DUCC_HOME + '/bin/ducc_services --query')
def stop_services(self):
os.environ['USER'] = os.environ['LOGNAME'] # make sure I'm me - after submit may not be
for (k, v) in self.registered.items():
for id in v:
print 'Unregistering', id
os.system(self.DUCC_HOME + "/bin/ducc_services --unregister " + id)
class RunDucc(DuccUtil):
def run_batch(self):
# Walk the batch file <test_dir>/<batchfile> line by line and act on each
# directive: 's ' lines describe a job to submit, '[sleep' lines a pause.
# NOTE(review): this block looks damaged -- many conditions below have no
# body and paired assignments have lost their "else"; the statements that
# start the DuccProcess and perform the sleep are not visible.  Confirm
# against the original before changing anything.
counter = 0
running = False
bfile = self.test_dir + '/' + self.batchfile
if ( not os.path.exists(bfile) ):
print 'File', bfile, 'does not exist.'
f = open(bfile)
for line in f:
print '----', line.strip()
# presumably '#' lines are comments to be skipped -- no body is visible
if ( line[0] == '#' ):
# Submission directive: "s [-c <compression>] <jobfile>".
if ( line[0:2] == 's ' ):
running = True
toks = line.split()
if ( toks[1] == '-c' ):
jobfile = toks[3]
self.compression = toks[2]
# NOTE(review): these two assignments follow the '-c' branch with no visible
# "else"; presumably they are the no-compression default -- TODO confirm.
jobfile = toks[1]
self.compression = '1'
ducc_process = DuccProcess(self, jobfile)
if ( self.observe ) :
self.queue.put(jobfile) # any old marker will do
if ( not running ):
if ( line[0:2] == 'x ' ):
# Sleep directive: "[sleep <n>[S|M]]"; the brackets are stripped before splitting.
if ( line[0:6] == '[sleep' ):
#toks = string.translate(line, None, '[]').split()
toks = line.strip().strip('[]').split()
print toks
t = toks[1].strip()
# we're not going to try millisecond sleep - it's probably overdesigned
# to do that anyway.
# A trailing 'S' means seconds, a trailing 'M' minutes (converted to seconds).
if ( t[-1:] == 'S' ):
delay = int(t.strip('S'))
elif ( t[-1:] == 'M' ):
delay = int(t.strip('M')) * 60
# presumably the suffix-less "else" case -- TODO confirm
delay = int(t);
print 'SLEEP', str(delay)
# these next aren't supported in "ducc" mode
if ( line[0:3] == 'qm '):
if ( line[0:3] == 'qj '):
if ( line[0:3] == 'qc '):
def usage(self, msg):
if ( msg != None ):
print msg
print 'Usage:'
print ' [optons]'
print 'Options:'
print ' --AE'
print ' Specifies to run this as a single CR and AE.'
print ''
print ' --DD'
print ' Specifies to run this as CR + CM / AE / CC pipeline.'
print ''
print ' --SE service-startup-config'
print ' Specifies to run this with the AE as a delegate service. The required parameter specifies'
print ' a service startup script.'
print ''
print ' --FILE'
print ' Use DD descriptor in filesystem, not as resource in jar file.'
print ''
print ' --http'
print ' Use HTTP instead of tcp for services'
print ''
print ' -d, --directory dir'
print ' This is the directory with the test files and configuration. Required'
print ''
print ' -b, --batchfile file'
print ' This is the batch file describing the submissions. Required.'
print ''
print ' -i, --init_time milliseconds'
print ' This is the AE initialization minimum time in seconds. Default:', self.init_time
print ''
print ' --init_timeout minutes'
print ' Max time in minutes NOTE MINUTES a process is allowed to initialize. Best used n conjunction with careful choice of'
print ' -i and -r'
print ''
print ' --init_fail_cap number-of-failures.'
print ' This is the max init failures tolerated before the system starts to cap processes. Default:', self.init_fail_cap
print ''
print ' -IB'
print ' The JP will leak in init() until DUCC (hopefully) kills us'
print ''
print ' -PB'
print ' The JP will leak in process() until DUCC (hopefully) kills us'
print ''
print ' -r, --range seconds'
print ' This is the AE initializion time range over base in milliseconds. Default:', self.init_range
print ' Init time is -i value + random[0, -rvalue]'
print ''
print ' -m, --memory_override mem-in-GB'
print ' Use this instead of what is in the props file. Default: None'
print ''
print ' -n, --nmachines_override process_deployments_max'
print ''
print ' -o, --observe'
print ' Specifies that we submit in keepalive mode and observe(watch) the jobs, creating a dir with outputs. Default:', self.observe
print ' If specified, we run verification against the results.'
print ''
print ' -p, --process_timeout sec'
print ' Process timeout, in minutes. Default:', self.process_timeout
print ''
print ' --jd_uima_log log-properties'
print ' If specified, use the indicated properties file for JD UIMA/UIMA-AS logging. Default:', self.jd_uima_log
print ''
print ' --jp_uima_log log-properties'
print ' If specified, use the indicated properties file for JP UIMA/UIMA-AS logging. Default:', self.jp_uima_log
print ''
print ' -q CR Probality it will leak on each getNext. Default', self.cr_getnext_bloat
print ''
print ' -s'
print ' AE Probability that a JP will spontaneously exit during initialization. Default:', self.ae_init_exit
print ''
print ' -t'
print ' AE Probability that a JP will throw an exception during initialization. Default:', self.ae_init_error
print ''
print ' -u'
print ' AE Probability that a JP will spontaneously exit in the process method. Default:', self.ae_runtime_exit
print ''
print ' -v'
print ' AE Probability that a JP will throw an exception in the process method. Default:', self.ae_runtime_error
print ''
print ' -w'
print ' CR Probability that a JD will spontaneously exit during initialization. Default:', self.cr_init_exit
print ''
print ' -x'
print ' CR Probability that a JD will throw an exception during initialization. Default:', self.cr_init_error
print ''
print ' -y'
print ' CR Probability that a JD will spontaneously exit in the process method. Default:', self.cr_runtime_exit
print ''
print ' -z'
print ' CR Probability that a JD will throw an exception in the process method. Default:', self.cr_runtime_error
print ''
print 'We run with DUCC_HOME set to', self.DUCC_HOME
def main(self, argv):
# Entry point of the driver: set option defaults, parse argv with getopt,
# echo the effective settings, build the examples classpath and run the test.
#   argv -- command-line arguments without the program name (getopt input)
# NOTE(review): this block looks damaged -- the getopt call is not closed,
# several statements have lost their left-hand side (e.g. "= 'AE'"), some
# conditions have no body, and the calls that start services, run the batch
# and wait for completion are not visible.  Restore it from version control
# before relying on it.
self.test_dir = None
self.batchfile = None
self.observe = False
# Error-injection settings; the trailing comments give the flag that sets each.
self.ae_init_exit = 0 # -s int 0-100
self.ae_init_error = 0 # -t int 0-100
self.ae_runtime_exit = 0.0 # -u float
self.ae_runtime_error = 0.0 # -v float
self.cr_init_exit = 0 # -w int 0-100
self.cr_init_error = 0 # -x int 0-100
self.cr_runtime_exit = 0.0 # -y float
self.cr_runtime_error = 0.0 # -z float
self.cr_getnext_bloat = 0 # -q jd leakage
self.init_fail_cap = '99'
self.memory_override = None
# init_time and init_range are held in milliseconds (-i / -r multiply by 1000).
self.init_time = 10000
self.init_range = 1000
self.init_timeout = 0
# NOTE(review): the trailing "= 'AE'" inside the comment on the next line
# looks like the remains of a separate default assignment -- confirm.
self.process_timeout = str(60*24) # 24 hour default - nothing in current megas will fail on this = 'AE'
self.service_pid = None
self.init_bloat = None
self.process_bloat = None
self.service_startup = None
self.jd_uima_log = None
self.jp_uima_log = None
self.submit_package = 'org.apache.uima.ducc'
self.max_machines = 0
self.use_http = False
self.descriptor_as_file = False
# NOTE(review): 'observe' is followed directly by 'jd_uima_log=' with no
# comma, so Python joins the two literals into the single option name
# 'observejd_uima_log='; the option list is also not closed in this copy.
opts, args = getopt.getopt(argv, 'b:d:fi:m:n:op:q:r:s:t:u:v:w:x:y:z:?h', ['AE', 'DD', 'file', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=',
'init_fail_cap=', 'range=', 'memory_override=', 'nmachines=', 'process_timeout=',
'init_timeout=', 'observe'
'jd_uima_log=', 'jp_uima_log=',
print "Unknown option"
# Apply each parsed option to the matching attribute.
for ( o, a ) in opts:
print o, a
if o in ('-d', '--directory'):
self.test_dir = a
elif o in ('-b', '--batchfile'):
self.batchfile = a
elif o in ('-i', '--init_time'):
self.init_time = int(a) * 1000
# NOTE(review): '-i' is already claimed by the --init_time branch above, so
# only the long form can reach this branch.
elif o in ('-i', '--init_fail_cap'):
self.init_fail_cap = a
elif o in ('-r', '--range'):
self.init_range = int(a) * 1000
elif o in ('-m', '--memory_override'):
self.memory_override = a
elif o in ('-n', '--nmachines'):
# int(a) only validates; the following line stores the original string.
self.max_machines = int(a) # force ugly failure if not a number
self.max_machines = a
elif o in ('-p', '--process_timeout'):
self.process_timeout = a
elif o in ('-o', '--observe' ):
self.observe = True
# NOTE(review): parentheses without a comma do not make a tuple, so this and
# the similar single-name tests below are substring tests against a string.
elif o in ('--init_timeout' ):
self.init_timeout = int(a)
elif o in ('--jd_uima_log' ):
self.jd_uima_log = a
elif o in ('--jp_uima_log' ):
self.jp_uima_log = a
# NOTE(review): the assignment targets on the next three lines are missing.
elif o in ('--AE'): = 'AE'
elif o in ('--DD'): = 'DD'
elif o in ('--SE'): = 'SE'
self.service_startup = a
self.observe = True
elif o in ( '-f', '--file'):
self.descriptor_as_file = True
elif o in ('--http'):
self.use_http = True
elif o in ('--IB'):
self.init_bloat = a
elif o in ('--PB'):
self.process_bloat = a
elif ( o == '-q'):
self.cr_getnext_bloat = int(a)
elif ( o == '-s'):
self.ae_init_exit = int(a)
elif ( o == '-t'):
self.ae_init_error = int(a)
elif ( o == '-u'):
self.ae_runtime_exit = float(a)
elif ( o == '-v'):
self.ae_runtime_error = float(a)
elif ( o == '-w'):
self.cr_init_exit = int(a)
elif ( o == '-x'):
self.cr_init_error = int(a)
elif ( o == '-y'):
self.cr_runtime_exit = float(a)
elif ( o == '-z'):
self.cr_runtime_error = float(a)
elif ( o == '-?'):
print 'Invalud argument:', o
if ( self.test_dir == None ):
self.usage("Missing test_dir")
# Echo the effective configuration.
print 'Running with'
print ' test_dir :', self.test_dir
print ' batchfile :', self.batchfile
print ' style :',
print ' descriptor as file :', self.descriptor_as_file
print ' http :', self.use_http
# / 1000 converts the stored milliseconds back to seconds for display.
print ' init-time :', self.init_time / 1000
print ' init-range :', self.init_range / 1000
print ' init-timeout :', self.init_timeout
print ' init-bloat :', self.init_bloat
print ' process-bloat :', self.process_bloat
print ' observe :', self.observe
print ' ae_init_exit :', self.ae_init_exit
print ' ae_init_error :', self.ae_init_error
print ' ae_runtime_exit :', self.ae_runtime_exit
print ' ae_runtime_error :', self.ae_runtime_error
print ' cr_init_exit :', self.cr_init_exit
print ' cr_init_error :', self.cr_init_error
print ' cr_runtime_exit :', self.cr_runtime_exit
print ' cr_runtime_error :', self.cr_runtime_error
print ' cr_getnext_bloat :', self.cr_getnext_bloat
print ' process_timeout :', self.process_timeout
print ' memory_override :', self.memory_override
print ' max_machines :', self.max_machines
print ' jd_uima_log :', self.jd_uima_log
print ' jp_uima_log :', self.jp_uima_log
print ' DUCC_HOME :', self.DUCC_HOME
self.submit_package = 'org.apache.uima.ducc'
# Classpath handed to submitted jobs and services, joined with ':'.
cp = []
cp.append(self.DUCC_HOME + '/lib/uima-ducc/examples/*')
cp.append(self.DUCC_HOME + '/apache-uima/lib/*')
cp.append(self.DUCC_HOME + '/apache-uima/apache-activemq/lib/*')
cp.append(self.DUCC_HOME + '/apache-uima/apache-activemq/lib/optional/*')
cp.append(self.DUCC_HOME + '/examples/simple/resources/service')
self.examples_classpath = ':'.join(cp)
# Service-based runs need the startup configuration given with --SE.
if ( == 'SE' ):
if ( self.service_startup == None ):
# NOTE(review): called as a bare name here, not as self.usage.
usage("Missing service startup file")
svcfile = self.test_dir + '/' + self.service_startup
service_starter = ServiceStarter(self);
print 'Pausing a bit'
os.system(self.DUCC_HOME + '/bin/ducc_services --query')
# The submit command built by DuccProcess runs java with this CLASSPATH.
os.environ['CLASSPATH'] = self.DUCC_HOME + "/lib/uima-ducc-cli.jar"
# In observe mode each submitted job puts a marker on this queue.
if ( self.observe ):
self.queue = Queue.Queue()
if ( self.observe ):
print 'All threads returned'
if ( == 'SE' ):
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
if __name__ == "__main__":
    # Build the driver and hand it the command-line arguments without the
    # program name, which is the form getopt.getopt() in main() expects.
    runducc = RunDucc()
    runducc.main(sys.argv[1:])