| #!/usr/bin/python |
| |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| import os |
| import sys |
| import string |
| import subprocess |
| import re |
| import grp |
| import resource |
| import time |
| import platform |
| import httplib |
| |
| from threading import * |
| import traceback |
| import Queue |
| |
| from stat import * |
| from local_hooks import find_other_processes |
| |
| # Catch the annoying problem when the current directory has been changed, e.g. by installing a new release |
| try: |
| os.getcwd() |
| except: |
| print "ERROR getting current directory ... may have been replaced .. try cd'ing to it again" |
| sys.exit(1) |
| |
| # simple bootstrap to establish DUCC_HOME and to set the python path so it can |
| # find the common code in DUCC_HOME/admin |
| # Infer DUCC_HOME from our location - no longer use a (possibly inaccurate) environment variable |
| me = os.path.abspath(__file__) |
| ndx = me.rindex('/') |
| ndx = me.rindex('/', 0, ndx) |
| DUCC_HOME = me[:ndx] # split from 0 to ndx |
| |
| sys.path.append(DUCC_HOME + '/bin') |
| from ducc_base import DuccBase |
| from properties import Properties |
| |
| import db_util as dbu |
| |
| global use_threading |
| use_threading = True |
| |
| class ThreadWorker(Thread): |
| def __init__(self, queue, outlock): |
| Thread.__init__(self) |
| self.queue = queue |
| self.outlock = outlock |
| |
| def run(self): |
| while True: |
| (method, args) = self.queue.get() |
| |
| if ( args == 'quit' ): |
| self.queue.task_done() |
| return; |
| |
| try: |
| response = method(args) |
| if ( response != None and len(response) > 0): |
| self.outlock.acquire() |
| for l in response: |
| print ' '.join(l) |
| self.outlock.release() |
| except: |
| print "Exception executing", str(method), str(args) |
| traceback.print_exc() |
| |
| self.queue.task_done() |
| |
| class ThreadPool: |
| def __init__(self, size): |
| if ( use_threading ): |
| self.size = size |
| self.queue = Queue.Queue() |
| outlock = Lock() |
| |
| MAX_NPSIZE = 100 |
| if ( self.size > MAX_NPSIZE ): |
| self.size = MAX_NPSIZE |
| |
| for i in range(self.size): |
| worker = ThreadWorker(self.queue, outlock) |
| worker.start() |
| |
| def invoke(self, method, *args): |
| if ( use_threading ): |
| self.queue.put((method, args)) |
| else: |
| response = method(args) |
| if ( response != None and len(response) > 0): |
| for l in response: |
| print ' '.join(l) |
| |
| def quit(self): |
| if ( use_threading ): |
| for i in range(self.size): |
| self.queue.put((None, 'quit')) |
| |
| print "Waiting for Completion" |
| self.queue.join() |
| print "All threads returned" |
| else: |
| print 'All Work completed' |
| |
| class DuccUtil(DuccBase): |
| |
| def update_properties(self): |
| |
| if ( self.ducc_properties == None ): |
| DuccBase.read_properties(self) |
| |
| self.ssh_enabled = self.ducc_properties.get('ducc.ssh') |
| self.duccling = self.ducc_properties.get('ducc.agent.launcher.ducc_spawn_path') |
| |
| # self.broker_url = self.ducc_properties.get('ducc.broker.url') |
| self.broker_protocol = self.ducc_properties.get('ducc.broker.protocol') |
| self.broker_host = self.ducc_properties.get('ducc.broker.hostname') |
| self.broker_port = self.ducc_properties.get('ducc.broker.port') |
| self.broker_jmx_port = self.ducc_properties.get('ducc.broker.jmx.port') |
| self.broker_decoration = self.ducc_properties.get('ducc.broker.url.decoration') |
| self.broker_url = self.broker_protocol + '://' + self.broker_host + ':' + self.broker_port |
| self.agent_jvm_args = self.ducc_properties.get('ducc.agent.jvm.args') |
| self.ws_jvm_args = self.ducc_properties.get('ducc.ws.jvm.args') |
| self.pm_jvm_args = self.ducc_properties.get('ducc.pm.jvm.args') |
| self.rm_jvm_args = self.ducc_properties.get('ducc.rm.jvm.args') |
| self.sm_jvm_args = self.ducc_properties.get('ducc.sm.jvm.args') |
| self.or_jvm_args = self.ducc_properties.get('ducc.orchestrator.jvm.args') |
| |
| |
| if ( self.broker_decoration == '' ): |
| self.broker_decoration = None |
| |
| if ( self.broker_decoration != None ): |
| self.broker_url = self.broker_url + '?' + self.broker_decoration |
| |
| if ( self.webserver_node == None ): |
| self.webserver_node = self.localhost |
| |
| def merge_properties(self): |
| # first task, always, merge the properties so subsequent code can depend on their validity. |
| base_props = DUCC_HOME + '/resources/default.ducc.properties' |
| site_props = DUCC_HOME + '/resources/site.ducc.properties' |
| run_props = DUCC_HOME + '/resources/ducc.properties' |
| merger = DUCC_HOME + '/admin/ducc_props_manager' |
| CMD = [merger, '--merge', base_props, '--with', site_props, '--to', run_props] |
| CMD = ' '.join(CMD) |
| print 'Merging', base_props, 'with', site_props, 'into', run_props |
| os.system(CMD) |
| |
| |
| def db_configure(self): |
| dbhost = self.ducc_properties.get('ducc.database.host') |
| if ( dbhost == self.db_disabled ): |
| self.db_bypass = True |
| return; |
| else: |
| self.db_bypass = False |
| |
| dbprops = Properties() |
| dbprops.load(self.DUCC_HOME + '/resources.private/ducc.private.properties') |
| self.db_password = dbprops.get('db_password') |
| if ( self.db_password == None ): |
| print "bypassing database because no password is set." |
| self.db_bypass = True |
| |
| # does the database process exist? |
| def db_process_alive(self): |
| pidfile = self.DUCC_HOME + '/state/cassandra.pid' |
| |
| if ( not os.path.exists(pidfile) ): |
| return False |
| |
| f = open(self.DUCC_HOME + '/state/cassandra.pid') |
| pid = f.read(); |
| f.close() |
| answer = [] |
| if ( self.system == 'Darwin'): |
| ps = 'ps -eo user,pid,comm,args ' + pid |
| else: |
| ps = 'ps -eo user:14,pid,comm,args ' + pid |
| lines = self.popen(ps) |
| |
| for line in lines: |
| line = line.strip() |
| if (pid in line and 'cassandra' in line): |
| return True |
| return False |
| |
| # contact the database and see how useful it seems to be |
| def db_alive(self, retry=10): |
| if ( self.db_bypass == True ): |
| return True |
| |
| dbnode = self.ducc_properties.get('ducc.database.host') |
| if ( dbnode == None ): |
| print 'No database location defined.' |
| return False |
| |
| pidfile = self.DUCC_HOME + '/state/cassandra.pid' |
| if ( not os.path.exists(pidfile) ): |
| print 'Database pid file does not exist. Checking DB connectivity.' |
| |
| # get our log4j config into the path to shut up noisy logging |
| os.environ['CLASSPATH'] = os.environ['CLASSPATH'] + ':' + self.DUCC_HOME + '/resources' |
| |
| CMD = [self.java(), 'org.apache.uima.ducc.database.DbAlive', dbnode, 'ducc', self.db_password, str(retry)] |
| |
| CMD = ' '.join(CMD) |
| rc = os.system(CMD) |
| if ( rc == 0 ): |
| return True |
| else: |
| return False |
| |
| |
| def db_start(self): |
| |
| # bypass all of this for the initial delivery |
| if ( self.db_bypass == True) : |
| print ' (Bypass database start because ducc.database.host =', self.db_disabled + ')' |
| return True |
| |
| print 'Starting database' |
| dbnode = self.ducc_properties.get('ducc.database.host') |
| dbu.update_cassandra_config(self.DUCC_HOME, dbnode) |
| |
| max_attempts = 5 |
| attempt = 0 |
| while attempt < max_attempts: |
| lines = self.ssh(dbnode, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'db', '--nodup', "'") |
| # we'll capture anything that the python shell spews because it may be useful, and then drop the |
| # pipe when we see a PID message |
| |
| while True: |
| try: |
| line = lines.readline().strip() |
| except: |
| break |
| #print '[]', line |
| |
| if ( not line ): |
| break |
| if ( line == '' ): |
| break |
| if ( line == 'OK' ): |
| print 'GOT OK from db start' |
| lines.close(); |
| |
| print 'waiting for database to start' |
| if ( self.db_alive() ): |
| return True |
| |
| attempt = attempt + 1 |
| print 'Did not connect to database, retrying (', attempt, 'of', max_attempts, ')' |
| |
| return False |
| |
| def db_stop(self): |
| |
| if ( self.db_bypass == True) : |
| print ' (Bypass database stop because ducc.database.host =', self.db_disabled + ')' |
| return True |
| |
| pidfile = self.DUCC_HOME + '/state/cassandra.pid' |
| if ( os.path.exists(pidfile) ): |
| # for cassandra, just send it a terminate signal. a pidfile is written on startup |
| CMD = ['kill', '-TERM', '`cat ' + pidfile + '`'] |
| CMD = ' '.join(CMD) |
| os.system(CMD) |
| |
| def find_netstat(self): |
| # don't you wish people would get together on where stuff lives? |
| if ( os.path.exists('/sbin/netstat') ): |
| return '/sbin/netstat' |
| if ( os.path.exists('/usr/sbin/netstat') ): |
| return '/usr/sbin/netstat' |
| if ( os.path.exists('/bin/netstat') ): |
| return '/bin/netstat' |
| if ( os.path.exists('/sbin/netstat') ): |
| return '/usr/bin/netstat' |
| print 'Cannot find netstat' |
| return None |
| |
| def is_amq_active(self): |
| netstat = self.find_netstat() |
| if ( netstat == None ): |
| print "Cannot determine if ActiveMq broker is alive." |
| return false |
| |
| lines = self.ssh(self.broker_host, True, netstat, '-an') |
| # |
| # look for lines like this with the configured port in the 4th token, and |
| # ending with LISTEN: |
| # |
| # tcp 0 0 :::61616 :::* LISTEN |
| for line in lines: |
| toks = line.split() |
| #print '[]', line |
| if ( toks[-1] == 'LISTEN' ): |
| port = toks[3] |
| if (port.endswith(self.broker_port)): |
| return True |
| return False |
| |
| def stop_broker(self): |
| |
| broker_host = self.ducc_properties.get('ducc.broker.hostname') |
| broker_home = self.ducc_properties.get('ducc.broker.home') |
| broker_name = self.ducc_properties.get('ducc.broker.name') |
| broker_jmx = self.ducc_properties.get('ducc.broker.jmx.port') |
| here = os.getcwd() |
| CMD = broker_home + '/bin/activemq' |
| CMD = CMD + ' stop --all' |
| CMD = CMD + ' --jmxurl service:jmx:rmi:///jndi/rmi://' + broker_host + ':' + broker_jmx + '/jmxrmi' |
| CMD = CMD + ' ' + broker_name |
| CMD = 'JAVA_HOME=' + self.java_home() + ' ' + CMD |
| print '--------------------', CMD |
| lines = self.ssh(broker_host, True, CMD) |
| for l in lines: |
| pass # throw away junk from ssh |
| |
| |
| def nohup(self, cmd, showpid=True): |
| # Skip use of ssh? |
| if cmd[0] == "ssh" and 'false' == self.ssh_enabled: |
| cmd = cmd[2:] |
| cmd = ' '.join(cmd) |
| # print '**** nohup', cmd, '****' |
| devnw = open(os.devnull, 'w') |
| devnr = open(os.devnull, 'r') |
| ducc = subprocess.Popen(cmd, shell=True, stdin=devnr, stdout=devnw, stderr=devnw) |
| devnr.close() |
| devnw.close() |
| if ( showpid ) : |
| print 'PID', ducc.pid |
| |
| # like popen, only it spawns via ssh |
| # Skip use of ssh? |
| # NOTE: Current callers always have do_wait True |
| def ssh(self, host, do_wait, *CMD): |
| cmd = ' '.join(CMD) |
| # Some callers quote the string which is OK for ssh but not the direct call |
| if cmd[0] == "'" and cmd[-1] == "'": |
| cmd = cmd[1:len(cmd)-2] |
| if ( do_wait ): |
| if 'false' == self.ssh_enabled: |
| return self.popen(cmd) |
| return self.popen('ssh -q -o BatchMode=yes -o ConnectTimeout=10', host, cmd) |
| else: |
| if 'false' == self.ssh_enabled: |
| return self.spawn(cmd) |
| return self.spawn('ssh -q -o BatchMode=yes -o ConnectTimeout=10', host, cmd) |
| |
| |
| def set_classpath(self): |
| DH = self.DUCC_HOME + '/' |
| LIB = DH + 'lib/' |
| |
| local_jars = self.ducc_properties.get('ducc.local.jars') #local mods |
| |
| CLASSPATH = '' |
| |
| if ( local_jars != None ): |
| extra_jars = local_jars.split() |
| for j in extra_jars: |
| CLASSPATH = CLASSPATH + ':' + LIB + j |
| |
| CLASSPATH = CLASSPATH + ':' + DH + 'apache-uima/lib/*' |
| CLASSPATH = CLASSPATH + ':' + DH + 'apache-uima/apache-activemq/lib/*' |
| CLASSPATH = CLASSPATH + ':' + DH + 'apache-uima/apache-activemq/lib/optional/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'apache-commons/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'guava/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'google-gson/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'apache-log4j/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'apache-camel/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'joda-time/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'springframework/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'jna/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'libpam4j/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'uima-ducc/*' |
| CLASSPATH = CLASSPATH + ':' + LIB + 'cassandra/*' |
| |
| # CLASSPATH = CLASSPATH + ':' + DH + 'resources' UIMA-4168 Use API, not classpath to configure log4j |
| |
| # more are added to some components in ducc.py, e.g. |
| # apache-activemq/lib/optional, jetty from ws lib, jsp, http- client |
| # |
| os.environ['CLASSPATH'] = CLASSPATH |
| |
| def format_classpath(self, cp): |
| strings = cp.split(':') |
| for s in strings: |
| print s |
| |
| def check_clock_skew(self, localdate): |
| user = os.environ['LOGNAME'] |
| bypass = (user != 'ducc') |
| |
| if bypass: |
| tag = 'NOTE' |
| else: |
| tag = 'NOTOK' |
| |
| # Check clock skew |
| ok = True |
| acceptable_skew = 300 |
| skew = abs(long(localdate) - long(time.time())) |
| if ( skew > (acceptable_skew) ): |
| ok = False |
| print tag, 'Clock skew[', skew, '] on', os.uname()[1], ". Remote time is", time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.localtime()) |
| return ok or bypass |
| |
| def set_duccling_version(self): |
| CMD = self.duccling + ' -v >' + self.DUCC_HOME + '/state/duccling.version' |
| os.system(CMD) |
| print 'Set ducc_ling version from', self.localhost, ':', CMD |
| |
| def verify_limits(self): |
| ret = True |
| if ( self.system == 'Darwin' ): |
| return ret # on mac, just use what you have |
| |
| proclimit = 20000 |
| filelimit = 8192 |
| |
| (softnproc , hardnproc) = resource.getrlimit(resource.RLIMIT_NPROC) |
| (softnfiles, hardnfiles) = resource.getrlimit(resource.RLIMIT_NOFILE) |
| if ( softnproc < hardnproc ): |
| try: |
| resource.setrlimit(resource.RLIMIT_NPROC, (hardnproc, hardnproc)) |
| except: |
| print 'NOTOK: could not set soft RLIMIT_NPROC up to the hard limit' |
| ret = False |
| |
| if ( softnfiles < hardnfiles ): |
| try: |
| resource.setrlimit(resource.RLIMIT_NOFILE, (hardnfiles, hardnfiles)) |
| except: |
| print 'NOTOK: could not set soft RLIMIT_NOFILES up to the hard limit' |
| ret = False |
| |
| (softnproc , hardnproc) = resource.getrlimit(resource.RLIMIT_NPROC) |
| (softnfiles, hardnfiles) = resource.getrlimit(resource.RLIMIT_NOFILE) |
| |
| if ( softnproc < proclimit ): |
| print 'WARN: Soft limit RLIMIT_NPROC is too small at', softnproc, '(<', proclimit, ' ). DUCC may be unable to create sufficient threads.' |
| |
| if ( softnfiles < filelimit ): |
| print 'WARN: Soft limit RLIMIT_NOFILES is too small at', softnfiles, '(<', filelimit, '). DUCC may be unable to open sufficient files or sockets.' |
| |
| return ret |
| |
| def verify_jvm(self): |
| jvm = self.java() |
| CMD = jvm + ' -version > /dev/null 2>&1' |
| rc = os.system(CMD) |
| if ( rc != 0 ): |
| print 'NOTOK', CMD, 'returns', int(rc), '. Must return rc 0. Startup cannot continue.' |
| return False |
| return True |
| |
| # Exit if this is not the head node. Ignore the domain as uname sometimes drops it. |
| def verify_head(self): |
| head = self.ducc_properties.get("ducc.head").split('.')[0] |
| local = self.localhost.split('.')[0] |
| if local != head: |
| print ">>> ERROR - this script must be run from the head node" |
| sys.exit(1); |
| |
| # |
| # Verify the viability of ducc_ling. |
| # Returns a tuple (viable, elevated, safe) |
| # where 'viable' is a boolean indicating whether the ducc_ling is viable (exists and is correct version) |
| # If this is false, the other two values are meaningless |
| # |
| # 'elevated' indicates whether priveleges are at least partially elevated |
| # 'safe' indicates whether ducc_ling is safe (elevated privileges and correct permissions) |
| # The caller will evaluate these and take appriopriate action |
| # |
| def verify_duccling(self): |
| viable = True |
| elevated = False |
| safe = False |
| |
| dl = self.duccling |
| path = os.path.dirname(os.path.abspath(dl)) |
| |
| |
| if ( not (os.path.exists(dl) and os.access(dl, os.X_OK)) ): |
| print dl, 'does not exist or is not executable.' |
| viable = False |
| |
| path = os.path.dirname(os.path.abspath(dl)) |
| dl = path + '/ducc_ling' |
| |
| dl_stat = os.stat(dl) # dl_stat is stat for ducc_ling |
| dl_mode = dl_stat.st_mode |
| |
| if ( (dl_mode & S_ISUID) != S_ISUID): |
| if ( os.environ['LOGNAME'] == 'ducc' ): |
| print 'ducc_ling module', dl, ': setuid bit is not set. Processes will run as user ducc' |
| elevated = False |
| file_safe = True |
| dir_safe = True |
| own_safe = True |
| else: |
| elevated = True |
| file_safe = False |
| dir_safe = False |
| own_safe = False |
| |
| |
| if ( elevated ): |
| |
| # |
| # if setuid bit is set, all this MUST be true or we won't mark ducc_ling safe: |
| # file permissions are 750 |
| # dir permissions are 700 |
| # owenership is root.ducc |
| # |
| dl_perm = oct(dl_mode & (S_IRWXU | S_IRWXG | S_IRWXO)) |
| expected = oct(0750) |
| if ( dl_perm != expected ): |
| print dl, ': Invalid execution bits', dl_perm, 'should be', expected |
| else: |
| file_safe = True |
| |
| dir_stat = os.stat(path) # dir_stat is stat for ducc_ling |
| dir_mode = dir_stat.st_mode |
| |
| dir_perm = oct(dir_mode & (S_IRWXU | S_IRWXG | S_IRWXO)) |
| expected = oct(0700) |
| if ( dir_perm != expected ): |
| print 'ducc_ling', path, ': Invalid directory permissions', dir_perm, 'should be', expected |
| else: |
| dir_safe = True |
| |
| try: |
| grpinfo = grp.getgrnam('ducc') |
| duccgid = grpinfo.gr_gid |
| |
| if ( (dl_stat.st_uid != 0) or (dl_stat.st_gid != duccgid) ): |
| print 'ducc_ling module', dl, ': Invalid ownership. Should be root.ducc' |
| else: |
| own_safe = True |
| except: |
| print 'ducc_ling group "ducc" cannot be found.' |
| |
| safe = file_safe and dir_safe and own_safe |
| |
| # A last viability check, do versions match? This also runs it, proving it |
| # can execute in this environment. |
| lines = self.popen(self.duccling + ' -v') |
| version_from_head = lines.readline().strip(); |
| toks = version_from_head.split() |
| version_from_head = ' '.join(toks[0:4]) |
| |
| version_file = self.DUCC_HOME + '/state/duccling.version'; |
| if ( os.path.exists(version_file) ): |
| verfile = open(version_file) |
| for line in verfile: |
| line = line.strip(); |
| toks = line.split(); |
| line = ' '.join(toks[0:4]) |
| if ( line != version_from_head ): |
| print "Mismatched ducc_ling versions:" |
| print "ALERT: Version on Agent Node:", version_from_head |
| print "ALERT: Version on Ducc Head:", line |
| viable = False |
| verfile.close() |
| else: |
| print "NOTE: ducc_ling version file missing, cannot verify version." |
| |
| # leave the decisions to the caller |
| return (viable, elevated, safe) |
| |
| # Apply these rules to determine if ducc_ling is installed ok |
| # |
| # Caller Elevated Protected (safe) Action |
| # -------- -------- --------- ------ |
| # ducc Y Y OK |
| # Y N Fail |
| # |
| # N Y (by def) OK |
| # |
| # ~ducc Y N (by def) Fail |
| # N Y (by def) OK, Note |
| def duccling_ok(self, viable, elevated, safe): |
| |
| if ( not viable ): |
| return False |
| |
| user = os.environ['LOGNAME'] |
| |
| if ( user == 'ducc' ): |
| if ( elevated ): |
| return safe |
| else: |
| if ( elevated ): |
| return False |
| print 'Note: Running unprivileged ducc_ling. Process will run as user', user |
| |
| return True |
| |
| def ssh_ok(self, node, line): |
| spacer = ' ' |
| messages = [] |
| if ( line.startswith("Permission denied") ): |
| messages.append(' ') |
| messages.append(spacer + "ALERT: Passwordless SSH is not configured correctly for node " + node) |
| messages.append(spacer + "ALERT: SSH returns '" + line + "'") |
| return messages |
| |
| if ( line.startswith("Host key verification failed") ): |
| messages.append(' ') |
| messages.append(spacer + "ALERT: Passwordless SSH is not configured correctly for node " + node) |
| messages.append(spacer + "ALERT: SSH returns '" + line + "'") |
| return messages |
| |
| if ( line.find("Connection refused") >= 0 ): |
| messages.append(' ') |
| messages.append(spacer + "ALERT: SSH is not not enabled on node " + node) |
| messages.append(spacer + "ALERT: SSH returns '" + line + "'") |
| return messages |
| |
| if ( line.find("Connection timed") >= 0 ): |
| messages.append(' ') |
| messages.append(spacer + "\nALERT: SSH did not respond with timeout of 10 secnds " + node) |
| messages.append(spacer + "ALERT: SSH returns '" + line + "'") |
| return messages |
| |
| if ( line.find("No route") >= 0 ): |
| messages.append(' ') |
| messages.append(spacer + 'ALERT: SSH cannot connect to host.') |
| messages.append(spacer + "ALERT: SSH returns '" + line + "'") |
| return messages |
| |
| return None |
| |
| # |
| # Input is array lines from ps command looking for ducc processes owned this user. |
| # Output is list of dictionaries, where each dictionary describes a ducc process. |
| # |
| # If no ducc processes are found here the list is empty. |
| # |
| # The caller executes the 'ps' command and knows the node this is for. |
| # |
| def find_ducc_process(self, node): |
| |
| answer = [] |
| if ( self.system == 'Darwin'): |
| ps = 'ps -eo user,pid,comm,args' |
| else: |
| ps = 'ps -eo user:14,pid,comm,args' |
| resp = self.ssh(node, True, ps) |
| ok = True |
| |
| while True: |
| line = resp.readline().strip() |
| if ( line.startswith('PID')): |
| continue |
| |
| ssh_errors = self.ssh_ok(line, node) |
| if ( ssh_errors != None ): |
| for m in ssh_errors: |
| print m |
| ok = False |
| continue |
| |
| # from here on, assume no error |
| if ( not line ): |
| break |
| |
| toks = line.split() |
| if ( len(toks) < 4): |
| continue |
| |
| user = toks[0] |
| pid = toks[1] |
| procname = toks[2] |
| fullargs = toks[3:] |
| |
| if ( not ('java' in procname) ): |
| continue |
| |
| cont = False |
| for tok in fullargs: |
| if ( tok.startswith('-Dducc.deploy.components=') ): |
| cmp = tok.split('=') |
| dp = (cmp[1], pid, user) |
| answer.append(dp) |
| cont = True |
| break |
| if ( tok.startswith('-DDUCC_BROKER_CREDENTIALS_FILE=') ): |
| dp = ('broker', pid, user) |
| answer.append(dp) |
| cont = True |
| break |
| if ( cont ): # stupid python only continues out of inner loop |
| continue |
| if fullargs[-1] == 'org.apache.cassandra.service.CassandraDaemon': |
| dp = ('database', pid, user) |
| answer.append(dp) |
| continue |
| |
| # Look for site-specific processes |
| other_processes = find_other_processes(pid, user, line) |
| if ( type(other_processes) is list ): |
| if ( len(other_processes) > 0 ): |
| answer = answer + other_processes |
| else: |
| print 'Invalid response from \'find_other_processes\':', other_processes |
| |
| return (ok, answer) |
| |
| # |
| # Given the name of a file containing ducc nodes, a ducc user (usually 'ducc' unless you're running |
| # as yourself for test), find all ducc processes owned by this user and print them to the console. |
| # |
| def find_ducc(self, nodefile, user): |
| if ( nodefile == None ): |
| nodefile = self.DUCC_HOME + '/resources/ducc.nodes' |
| |
| if ( not os.path.exists(nodefile) ): |
| print 'Nodefile', nodefile, 'does not exist or cannot be read.' |
| sys.exit(1) |
| |
| answer = {} |
| nodes = [] |
| f = open(nodefile) |
| for node in f: |
| node = node.strip() |
| if ( not node ): |
| continue |
| if ( node.startswith('#') ): |
| continue |
| nodes.append(node) |
| |
| if ( self.webserver_node != 'localhost' ): # might be configured somewhere else |
| nodes.append(self.webserver_node) |
| |
| for node in nodes: |
| data = self.find_ducc_process(node, user) |
| answer[node] = data |
| |
| return answer |
| |
| def kill_process(self, node, proc, signal): |
| lines = self.ssh(node, True, 'kill', signal, proc[1]) |
| for l in lines: |
| pass # throw away the noise |
| |
| def clean_shutdown(self): |
| DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties " |
| DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME |
| DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head') |
| self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccAdmin', '--killAll') |
| |
| def get_os_pagesize(self): |
| lines = self.popen('/usr/bin/getconf', 'PAGESIZE') |
| return lines.readline().strip() |
| |
| def show_ducc_environment(self): |
| global use_threading |
| # |
| # Print the java version |
| # |
| response = [] |
| jvm = self.ducc_properties.get('ducc.jvm') |
| check_java = True |
| if ( jvm == None ): |
| response.append('WARNING: No jvm configured. Default is used.') |
| jvm = 'java' |
| else: |
| response.append('ENV: Java is configured as: ' + jvm) |
| if ( not os.path.exists(jvm) ): |
| print 'NOTOK: configured jvm cannot be found:', jvm |
| check_java = False |
| |
| if ( check_java ): |
| print 'JVM:', jvm |
| lines = self.popen(jvm + ' -fullversion') |
| for line in lines: |
| response.append('ENV ' + line.strip()) |
| |
| |
| response.append('ENV: Threading enabled: ' + str(use_threading)) |
| # |
| # Get the total memory for the node |
| # |
| if ( self.system != 'Darwin' ): |
| meminfo = Properties() |
| meminfo.load('/proc/meminfo') |
| mem = meminfo.get('MemTotal') |
| if ( mem.endswith('kB') ): |
| toks = mem.split(' ') |
| mem = str(int(toks[0]) / (1024*1024)) + ' gB' |
| response.append('MEM: memory is ' + mem) |
| |
| # |
| # Get the operating system information |
| # |
| response.append('ENV: system is ' + self.system) |
| |
| return response |
| |
| # |
| # Resolve the 'path' relative to the path 'relative_to' |
| # |
| def resolve(self, path, relative_to): |
| if ( not path.startswith('/') ): |
| (head, tail) = os.path.split(os.path.abspath(relative_to)) |
| path = head + '/' + path |
| return path |
| |
| # |
| # Read the nodefile, recursing into 'imports' if needed, returning a |
| # map. The map is keyed on filename, with each entry a list of the nodes. |
| # |
| def read_nodefile(self, nodefile, ret): |
| #print 'READ_NODEFILE:', nodefile, ret |
| n_nodes = 0 |
| if ( os.path.exists(nodefile) ): |
| nodes = [] |
| f = open(nodefile) |
| for node in f: |
| node = node.strip() |
| if ( not node ): |
| continue |
| if ( node.startswith('#') ): |
| continue |
| if ( node.startswith('import ') ): |
| toks = node.split(' ') |
| newfile = toks[1] |
| newfile = self.resolve(newfile, nodefile) # resolve newfile relative to nodefile |
| (count, ret) = self.read_nodefile(newfile, ret) |
| n_nodes = n_nodes + count |
| continue |
| nodes.append(node) |
| n_nodes = n_nodes + 1 |
| ret[nodefile] = nodes |
| else: |
| print 'Cannot read nodefile', nodefile |
| ret[nodefile] = None |
| |
| #print 'RETURN', n_nodes, nodefile, ret |
| return (n_nodes, ret) |
| |
| def compare_nodes(self, n1, n2): |
| |
| if ( n1 == n2 ): # exact match - covers both short and both long |
| return True |
| |
| if ( n1.find('.') >= 0 ): # shortened n1 == n2? |
| t1 = n1.split('.') |
| n1A = t1[0] |
| if ( n1A == n2 ): |
| return True |
| |
| if ( n2.find('.') >= 0 ): # n1 == shortened n2? |
| t2 = n2.split('.') |
| n2A = t2[0] |
| if ( n1 == n2A ): |
| return True |
| return False |
| |
| def verify_class_configuration(self, allnodes, verbose): |
| |
| print 'allnodes', allnodes |
| answer = True |
| # first, find the class definition |
| classfile = self.ducc_properties.get('ducc.rm.class.definitions') |
| |
| print 'Class definition file is', classfile |
| CMD = self.jvm |
| CMD = CMD + " -DDUCC_HOME=" + self.DUCC_HOME |
| CMD = CMD + " org.apache.uima.ducc.common.NodeConfiguration " |
| CMD = CMD + " -n " + allnodes |
| CMD = CMD + " -c " + classfile |
| if ( verbose ): |
| CMD = CMD + " -p " + classfile |
| print CMD |
| else: |
| CMD = CMD + " " + classfile |
| rc = os.system(CMD) |
| if ( rc == 0 ): |
| print "OK: Class and node definitions validated." |
| else: |
| print "NOTOK: Cannot validate class and/or node definitions." |
| |
| return (rc == 0) |
| |
| def disable_threading(self): |
| global use_threading |
| use_threading = False |
| |
| def installed(self): |
| head = self.ducc_properties.get('ducc.head') |
| if ( head == '<head-node>' ): |
| return False |
| return True |
| |
| def __init__(self, merge=False): |
| global use_threading |
| DuccBase.__init__(self, merge) |
| |
| self.db_disabled = '--disabled--' |
| self.duccling = None |
| self.broker_url = 'tcp://localhost:61616' |
| self.broker_protocol = 'tcp' |
| self.broker_host = 'localhost' |
| self.broker_port = '61616' |
| self.default_components = ['rm', 'pm', 'sm', 'or', 'ws', 'db', 'broker'] |
| self.default_nodefiles = [self.DUCC_HOME + '/resources/ducc.nodes'] |
| |
| if ( self.localhost == self.ducc_properties.get("ducc.head")): |
| self.is_ducc_head = True |
| |
| os.environ['DUCC_NODENAME'] = self.localhost # to match java code's implicit propery so script and java match |
| |
| self.pid_file = self.DUCC_HOME + '/state/ducc.pids' |
| self.set_classpath() |
| self.os_pagesize = self.get_os_pagesize() |
| self.update_properties() |
| |
| self.db_configure() |
| |
| |
| manage_broker = self.ducc_properties.get('ducc.broker.automanage') |
| self.automanage = False |
| if (manage_broker in ('t', 'true', 'T', 'True')) : |
| self.automanage = True |
| |
| py_version = platform.python_version().split('.') |
| if ( int(py_version[0]) > 2 ): |
| print "Warning, only Python Version 2 is supported." |
| if ( int(py_version[1]) < 4 ): |
| print "Python must be at least at version 2.4." |
| sys.exit(1) |
| if ( int(py_version[1]) < 6 ): |
| use_threading = False |
| |
| |
| if __name__ == "__main__": |
| util = DuccUtil() |
| |