blob: f1c5a436445027f45a403eeb61d475060418f1a5 [file] [log] [blame]
#!/usr/bin/python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import os
import sys
import string
import subprocess
import re
import grp
import pwd
import resource
import time
import platform
import httplib
from threading import *
import traceback
import Queue
from stat import *
from local_hooks import find_other_processes
# Catch the annoying problem when the current directory has been changed, e.g. by installing a new release.
# os.getcwd() raises OSError when the working directory no longer exists; only that
# failure is reported here, so unrelated errors (e.g. KeyboardInterrupt) are not masked.
try:
    os.getcwd()
except OSError:
    print("ERROR getting current directory ... may have been replaced .. try cd'ing to it again")
    sys.exit(1)
# Simple bootstrap to establish DUCC_HOME and to extend the python path so the
# project modules imported below can be found.
# NOTE(review): the path that is actually appended is DUCC_HOME/bin, although the
# original comment spoke of "common code in DUCC_HOME/admin" - confirm which is intended.
# Infer DUCC_HOME from our location - no longer use a (possibly inaccurate) environment variable
me = os.path.abspath(__file__)
# Position of the last '/' : separates this file's name from its directory
ndx = me.rindex('/')
# Position of the '/' before that : separates the directory from its parent
ndx = me.rindex('/', 0, ndx)
DUCC_HOME = me[:ndx] # the parent of the directory that contains this file
sys.path.append(DUCC_HOME + '/bin')
from ducc_base import DuccBase
from properties import Properties
from ducc_logger import DuccLogger
# Module-wide logger, used by DuccUtil.verify_head_failover
logger = DuccLogger()
import db_util as dbu
# 'global' at module level has no effect; use_threading is simply a module variable.
# It is read by ThreadPool and switched off by DuccUtil.disable_threading / DuccUtil.__init__.
global use_threading
use_threading = True
# The "ducc" userid is the user that installed DUCC and created this file.
# If the admin dir's permissions were 700 then could assume the current user is the ducc user
def find_ducc_uid():
    """Return the login name of the user that owns this source file.

    The owner of the file is taken to be the "ducc" user, i.e. the user that
    installed DUCC.
    """
    owner_uid = os.stat(os.path.abspath(__file__)).st_uid
    return pwd.getpwuid(owner_uid).pw_name
class ThreadWorker(Thread):
    """Worker thread that executes (method, args) work items taken from a queue.

    Each work item is a tuple (method, args); the method is called with args as
    its single argument.  If it returns a non-empty sequence, every element is
    expected to be a sequence of strings and is printed as one space-separated
    line while holding the shared output lock.  The item (None, 'quit')
    terminates the thread.
    """

    def __init__(self, queue, outlock):
        """queue supplies the work items; outlock serializes console output."""
        Thread.__init__(self)
        self.queue = queue
        self.outlock = outlock

    def run(self):
        """Process work items until the 'quit' marker is received."""
        while True:
            (method, args) = self.queue.get()
            if ( args == 'quit' ):
                self.queue.task_done()
                return
            try:
                response = method(args)
                if ( response is not None and len(response) > 0 ):
                    self.outlock.acquire()
                    try:
                        for l in response:
                            print(' '.join(l))
                    finally:
                        # Always give the lock back, even if a response line cannot
                        # be printed; otherwise every other worker would block forever.
                        self.outlock.release()
            except:
                # Deliberately catches everything (including SystemExit raised by the
                # work item) so task_done() below is always reached and
                # ThreadPool.quit() cannot hang in queue.join().
                print(' '.join(["Exception executing", str(method), str(args)]))
                traceback.print_exc()
            self.queue.task_done()
class ThreadPool:
    """Runs work either on a pool of ThreadWorker threads or inline.

    The module-level flag use_threading selects the mode each time a method is
    called.  At most 100 worker threads are started.
    """

    def __init__(self, size):
        """Start the workers (threaded mode only); size is the requested thread count."""
        if ( not use_threading ):
            return
        self.size = size
        self.queue = Queue.Queue()
        outlock = Lock()
        MAX_NPSIZE = 100
        self.size = min(self.size, MAX_NPSIZE)
        for _ in range(self.size):
            ThreadWorker(self.queue, outlock).start()

    def invoke(self, method, *args):
        """Schedule method(args) on a worker, or run it immediately when not threading.

        Note that the positional arguments are handed to the method as ONE tuple.
        """
        if ( use_threading ):
            self.queue.put((method, args))
            return
        response = method(args)
        if ( response != None and len(response) > 0 ):
            for words in response:
                print(' '.join(words))

    def quit(self):
        """Send one 'quit' marker per worker and wait until the queue is drained."""
        if ( not use_threading ):
            print('All Work completed')
            return
        for _ in range(self.size):
            self.queue.put((None, 'quit'))
        print("Waiting for Completion")
        self.queue.join()
        print("All threads returned")
class DuccUtil(DuccBase):
def update_properties(self):
    """Copy the settings this class needs from ducc_properties into attributes.

    Reads the properties first if that has not happened yet, derives
    broker_url from protocol, host, port and the optional url decoration, and
    defaults webserver_node to the local host.
    """
    if ( self.ducc_properties is None ):
        DuccBase.read_properties(self)
    props = self.ducc_properties

    self.ssh_enabled = props.get('ducc.ssh')
    self.duccling = props.get('ducc.agent.launcher.ducc_spawn_path')
    self.ducc_uid = find_ducc_uid()

    broker_settings = (
        ('broker_protocol', 'ducc.broker.protocol'),
        ('broker_host', 'ducc.broker.hostname'),
        ('broker_port', 'ducc.broker.port'),
        ('broker_jmx_port', 'ducc.broker.jmx.port'),
        ('broker_decoration', 'ducc.broker.url.decoration'),
    )
    for (attr, key) in broker_settings:
        setattr(self, attr, props.get(key))
    self.broker_url = self.broker_protocol + '://' + self.broker_host + ':' + self.broker_port

    jvm_settings = (
        ('agent_jvm_args', 'ducc.agent.jvm.args'),
        ('ws_jvm_args', 'ducc.ws.jvm.args'),
        ('pm_jvm_args', 'ducc.pm.jvm.args'),
        ('rm_jvm_args', 'ducc.rm.jvm.args'),
        ('sm_jvm_args', 'ducc.sm.jvm.args'),
        ('or_jvm_args', 'ducc.orchestrator.jvm.args'),
    )
    for (attr, key) in jvm_settings:
        setattr(self, attr, props.get(key))

    # An empty decoration is treated exactly like a missing one
    if ( self.broker_decoration == '' ):
        self.broker_decoration = None
    if ( self.broker_decoration is not None ):
        self.broker_url = self.broker_url + '?' + self.broker_decoration

    if ( self.webserver_node is None ):
        self.webserver_node = self.localhost
def merge_properties(self):
    """Merge the default and site property files into resources/ducc.properties.

    Runs DUCC_HOME/admin/ducc_props_manager through the shell.  This is always
    the first task, so that subsequent code can depend on the merged properties.
    """
    merger = DUCC_HOME + '/admin/ducc_props_manager'
    base_props = DUCC_HOME + '/resources/default.ducc.properties'
    site_props = DUCC_HOME + '/resources/site.ducc.properties'
    run_props = DUCC_HOME + '/resources/ducc.properties'

    CMD = ' '.join([merger, '--merge', base_props, '--with', site_props, '--to', run_props])
    print(' '.join(['Merging', base_props, 'with', site_props, 'into', run_props]))
    os.system(CMD)
def db_configure(self):
    """Decide whether the database is bypassed and load its passwords.

    The database is bypassed when ducc.database.host equals self.db_disabled,
    or when the private properties file holds no db_password.  The guest
    password defaults to 'guest'.
    """
    dbhost = self.ducc_properties.get('ducc.database.host')
    self.db_bypass = ( dbhost == self.db_disabled )
    if ( self.db_bypass ):
        return

    dbprops = Properties()
    dbprops.load(self.DUCC_HOME + '/resources.private/ducc.private.properties')

    self.db_password = dbprops.get('db_password')
    if ( self.db_password == None ):
        print("bypassing database because no password is set.")
        self.db_bypass = True

    guest_password = dbprops.get('db_password_guest')
    if ( guest_password == None ):
        guest_password = 'guest'
    self.db_password_guest = guest_password
# does the database process exist?
def db_process_alive(self):
    """Return True if the process recorded in state/cassandra.pid is a running cassandra.

    Returns False when the pid file is missing or empty, or when no line of the
    'ps' listing has that pid in its pid column and mentions 'cassandra'.
    """
    pidfile = self.DUCC_HOME + '/state/cassandra.pid'
    if ( not os.path.exists(pidfile) ):
        return False

    f = open(pidfile)
    try:
        # Strip the pid: a trailing newline would otherwise end up in the ps
        # command and could never match a (stripped) line of its output.
        pid = f.read().strip()
    finally:
        f.close()
    if ( not pid ):
        return False              # an empty pid would "match" every line

    if ( self.system == 'Darwin'):
        ps = 'ps -eo user,pid,comm,args ' + pid
    else:
        ps = 'ps -eo user:14,pid,comm,args ' + pid

    for line in self.popen(ps):
        toks = line.split()
        # Output columns are user, pid, comm, args: compare the pid column exactly
        # so that pid 42 does not match process 1423.
        if ( len(toks) > 1 and toks[1] == pid and 'cassandra' in line ):
            return True
    return False
# contact the database and see how useful it seems to be
def db_alive(self, retry=10, verbose=True):
    """Return True if the database is bypassed, otherwise the result of db_alive_check."""
    if ( self.db_bypass != True ):
        return self.db_alive_check(retry, verbose)
    return True
def db_alive_check(self, retry=10, verbose=True):
    """Run the DbAlive java class against ducc.database.host and report whether it succeeded.

    retry is handed to DbAlive as its last argument; verbose=False silences
    both our own messages and the output of the java process.
    NOTE(review): the database password is placed on the command line of the
    spawned process - confirm that this is acceptable.
    """
    dbnode = self.ducc_properties.get('ducc.database.host')
    if ( dbnode == None ):
        if ( verbose ):
            print('No database location defined.')
        return False

    pidfile = self.DUCC_HOME + '/state/cassandra.pid'
    if ( not os.path.exists(pidfile) and verbose ):
        print('Database pid file does not exist.  Checking DB connectivity.')

    # get our log4j config into the path to shut up noisy logging
    os.environ['CLASSPATH'] = ':'.join([os.environ['CLASSPATH'], self.DUCC_HOME + '/resources'])

    CMD = ' '.join([self.java(), 'org.apache.uima.ducc.database.DbAlive', dbnode, 'ducc', self.db_password, str(retry)])
    if ( not verbose ):
        CMD = CMD + " >/dev/null 2>&1"

    return os.system(CMD) == 0
def db_start(self):
    """Start the database on ducc.database.host and wait until it answers.

    Up to five attempts are made.  Each attempt runs 'ducc.py -c db --nodup'
    through self.ssh, reads its output, and then checks connectivity with
    db_alive().  Returns True as soon as the database answers (or when the
    database is bypassed), False when every attempt failed.
    """
    # bypass all of this for the initial delivery
    if ( self.db_bypass == True ):
        print(' '.join(['   (Bypass database start because ducc.database.host =', self.db_disabled + ')']))
        return True

    print('Starting database')
    dbnode = self.ducc_properties.get('ducc.database.host')
    dbu.update_cassandra_config(self.DUCC_HOME, dbnode)

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        lines = self.ssh(dbnode, True, "'", self.DUCC_HOME + '/admin/ducc.py', '-c', 'db', '--nodup', "'")
        # we'll capture anything that the python shell spews because it may be useful, and then drop the
        # pipe when we see a PID message
        while True:
            try:
                line = lines.readline().strip()
            except:
                break
            if ( not line ):          # end of output (or a blank line)
                break
            if ( line == 'Database is already running.' ):
                print('   Database process is running ... if connection fails stop the DB and try again')
                break
            if ( line == 'OK' ):
                print('GOT OK from db start')
        lines.close()

        print('waiting for database to start')
        if ( self.db_alive() ):
            return True

        if ( attempt < max_attempts ):
            print(' '.join(['Did not connect to database, retrying (', str(attempt + 1), 'of', str(max_attempts), ')']))
    return False
def db_stop(self):
    """Send SIGTERM to the cassandra process named in state/cassandra.pid.

    Returns True when the database is bypassed; otherwise nothing is returned.
    Nothing is done when the pid file does not exist.
    """
    if ( self.db_bypass == True ):
        print(' '.join(['   (Bypass database stop because ducc.database.host =', self.db_disabled + ')']))
        return True

    pidfile = self.DUCC_HOME + '/state/cassandra.pid'
    if ( not os.path.exists(pidfile) ):
        return
    # for cassandra, just send it a terminate signal.  a pidfile is written on startup
    os.system(' '.join(['kill', '-TERM', '`cat ' + pidfile + '`']))
def find_netstat(self):
    """Return the path of the first netstat binary found, or None.

    The usual locations are probed in a fixed order.  (The original code tested
    /sbin/netstat a second time where it meant /usr/bin/netstat, so a netstat
    installed only in /usr/bin was never found.)
    """
    # don't you wish people would get together on where stuff lives?
    candidates = ('/sbin/netstat', '/usr/sbin/netstat', '/bin/netstat', '/usr/bin/netstat')
    for candidate in candidates:
        if ( os.path.exists(candidate) ):
            return candidate
    print('Cannot find netstat')
    return None
def is_amq_active(self):
    """Return True if something on the broker host listens on the broker port.

    Runs 'netstat -an' on self.broker_host through self.ssh.  Returns False
    when no netstat binary can be found or no listener is seen.
    """
    netstat = self.find_netstat()
    if ( netstat is None ):
        print("Cannot determine if ActiveMq broker is alive.")
        return False

    lines = self.ssh(self.broker_host, True, netstat, '-an')
    #
    # look for lines like this with the configured port in the 4th token, and
    # ending with LISTEN:
    #
    # tcp        0      0 :::61616                :::*                    LISTEN
    port = str(self.broker_port)
    for line in lines:
        toks = line.split()
        if ( len(toks) < 4 or toks[-1] != 'LISTEN' ):
            continue          # blank, header or non-listening line
        local_address = toks[3]
        # Require a separator before the port so that port 616 does not match
        # ':::61616'.  NOTE(review): the '.' separator is meant for BSD-style
        # netstat output such as '*.61616' - confirm on the platforms in use.
        if ( local_address.endswith(':' + port) or local_address.endswith('.' + port) ):
            return True
    return False
def stop_broker(self):
    """Stop the ActiveMQ broker by running 'activemq stop' on the broker host.

    Host, installation directory, broker name and JMX port come from the
    ducc.broker.* properties; the command runs through self.ssh with JAVA_HOME
    set, and its output is discarded.
    """
    props = self.ducc_properties
    broker_host = props.get('ducc.broker.hostname')
    broker_home = props.get('ducc.broker.home')
    broker_name = props.get('ducc.broker.name')
    broker_jmx = props.get('ducc.broker.jmx.port')

    os.getcwd()          # result unused; call kept from the original

    jmx_url = ' --jmxurl service:jmx:rmi:///jndi/rmi://' + broker_host + ':' + broker_jmx + '/jmxrmi'
    CMD = broker_home + '/bin/activemq' + ' stop --all' + jmx_url + ' ' + broker_name
    CMD = 'JAVA_HOME=' + self.java_home() + ' ' + CMD
    print(' '.join(['--------------------', CMD]))

    for _ in self.ssh(broker_host, True, CMD):
        pass  # throw away junk from ssh
def nohup(self, cmd, showpid=True):
    """Launch cmd (a list of words) through the shell, detached from our stdio.

    stdin, stdout and stderr of the child are connected to the null device and
    the child is not waited for.  When ssh is disabled and the command starts
    with 'ssh', its first two words (assumed to be 'ssh' and the host) are
    dropped so the rest runs locally.  Prints 'PID <pid>' unless showpid is false.
    """
    # Skip use of ssh?
    if ( cmd[0] == "ssh" and 'false' == self.ssh_enabled ):
        cmd = cmd[2:]
    command_line = ' '.join(cmd)

    null_out = open(os.devnull, 'w')
    null_in = open(os.devnull, 'r')
    child = subprocess.Popen(command_line, shell=True, stdin=null_in, stdout=null_out, stderr=null_out)
    # The child holds its own copies of the descriptors; ours can be closed now
    null_in.close()
    null_out.close()

    if ( showpid ):
        print(' '.join(['PID', str(child.pid)]))
def ssh_operational(self, node):
    """Return True if 'ssh node /bin/hostname' answers with exactly one line naming node.

    Only the part of the host names before the first '.' is compared.  On
    failure the command and whatever it printed are shown.
    """
    expected = node.split('.')[0]
    cmd = '/bin/hostname'
    ssh_cmd = 'ssh -q -o BatchMode=yes -o ConnectTimeout=10'+' '+node+" "+cmd
    lines = self.popen(ssh_cmd).readlines()

    is_operational = ( len(lines) == 1 and lines[0].strip().split('.')[0] == expected )
    if ( not is_operational ):
        print('ssh not operational - unexpected results')
        print(ssh_cmd)
        for line in lines:
            print(line)
    return is_operational
# like popen, only it spawns via ssh
# Skip use of ssh?
# NOTE: Current callers always have do_wait True
def ssh(self, host, do_wait, *CMD):
    """Run a command on host via ssh, or locally when ssh is disabled.

    The words in CMD are joined with blanks.  With do_wait true the command is
    run with self.popen, otherwise with self.spawn; whatever that returns is
    returned.  When the property ducc.ssh is 'false' the command is executed
    directly on this machine and host is ignored.
    """
    cmd = ' '.join(CMD)

    # Some callers quote the string which is OK for ssh but not the direct call.
    # Remove exactly the enclosing pair of quotes (the original slice also cut
    # off the last character of the command) plus the padding blanks left when
    # the quotes were passed as separate words.
    if ( len(cmd) >= 2 and cmd[0] == "'" and cmd[-1] == "'" ):
        cmd = cmd[1:-1].strip()

    if ( do_wait ):
        runner = self.popen
    else:
        runner = self.spawn

    if ( 'false' == self.ssh_enabled ):
        return runner(cmd)
    return runner('ssh -q -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
def set_classpath(self):
    """Build the java CLASSPATH for DUCC and store it in os.environ['CLASSPATH'].

    Site-local jars (property ducc.local.jars, names relative to DUCC_HOME/lib)
    come first, followed by the fixed library directories.  Every entry is
    preceded by ':', so the resulting value starts with ':'.
    """
    DH = self.DUCC_HOME + '/'
    LIB = DH + 'lib/'

    entries = []
    local_jars = self.ducc_properties.get('ducc.local.jars')   #local mods
    if ( local_jars != None ):
        for j in local_jars.split():
            entries.append(LIB + j)

    for subdir in ('apache-uima/lib/*',
                   'apache-uima/apache-activemq/lib/*',
                   'apache-uima/apache-activemq/lib/optional/*'):
        entries.append(DH + subdir)

    for subdir in ('apache-commons/*',
                   'guava/*',
                   'google-gson/*',
                   'apache-log4j/*',
                   'apache-camel/*',
                   'joda-time/*',
                   'springframework/*',
                   'jna/*',
                   'libpam4j/*',
                   'uima-ducc/*',
                   'cassandra/*'):
        entries.append(LIB + subdir)

    # DH + 'resources' is intentionally not added: UIMA-4168 Use API, not classpath to configure log4j
    # more are added to some components in ducc.py, e.g.
    #    apache-activemq/lib/optional, jetty from ws lib, jsp, http-client
    os.environ['CLASSPATH'] = ''.join([':' + entry for entry in entries])
def format_classpath(self, cp):
    """Print the ':'-separated classpath cp with one entry per line."""
    print('\n'.join(cp.split(':')))
def check_clock_skew(self, localdate):
    """Compare localdate (seconds since the epoch) with this machine's clock.

    Returns True when the clocks differ by at most 300 seconds.  A larger skew
    is reported on the console and yields False - unless the caller (LOGNAME) is
    not the ducc user, in which case the report is only a NOTE and True is
    returned.  Raises KeyError if LOGNAME is not set in the environment.
    """
    user = os.environ['LOGNAME']
    bypass = (user != self.ducc_uid)
    if ( bypass ):
        tag = 'NOTE'
    else:
        tag = 'NOTOK'

    # Check clock skew
    acceptable_skew = 300
    now = time.time()
    skew = abs(int(localdate) - int(now))
    ok = ( skew <= acceptable_skew )
    if ( not ok ):
        # The printed stamp carries a fixed "+0000" offset, so it has to be
        # rendered in UTC (the original formatted the local time instead).
        stamp = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime(now))
        print(' '.join([tag, 'Clock skew[', str(skew), '] on', os.uname()[1], ". Remote time is", stamp]))
    return ok or bypass
def set_duccling_version(self):
    """Write the output of '<ducc_ling> -v' to DUCC_HOME/state/duccling.version."""
    version_file = self.DUCC_HOME + '/state/duccling.version'
    CMD = self.duccling + ' -v >' + version_file
    os.system(CMD)
    print(' '.join(['Set ducc_ling version from', self.localhost, ':', CMD]))
def verify_limits(self):
    """Raise the soft process and file limits to their hard limits and check them.

    Returns False if a soft limit could not be raised, True otherwise.  A
    warning is printed when a resulting soft limit is below what DUCC is
    expected to need (20000 processes, 8192 files); the warning does not
    affect the return value.  On Darwin nothing is done.
    """
    if ( self.system == 'Darwin' ):
        return True               # on mac, just use what you have

    proclimit = 20000
    filelimit = 8192
    unlimited = resource.RLIM_INFINITY

    def below(value, bound):
        # 'value < bound' for limits.  RLIM_INFINITY is -1 in Python, so a plain
        # comparison would treat "unlimited" as the smallest possible limit.
        if ( value == unlimited ):
            return False
        if ( bound == unlimited ):
            return True
        return value < bound

    ret = True
    raise_plan = (
        (resource.RLIMIT_NPROC,  'NOTOK: could not set soft RLIMIT_NPROC up to the hard limit'),
        (resource.RLIMIT_NOFILE, 'NOTOK: could not set soft RLIMIT_NOFILES up to the hard limit'),
    )
    for (which, failure) in raise_plan:
        (soft, hard) = resource.getrlimit(which)
        if ( below(soft, hard) ):
            try:
                resource.setrlimit(which, (hard, hard))
            except (ValueError, resource.error):
                print(failure)
                ret = False

    # Look at the limits again: this is what the processes we start will inherit
    (softnproc , hardnproc)  = resource.getrlimit(resource.RLIMIT_NPROC)
    (softnfiles, hardnfiles) = resource.getrlimit(resource.RLIMIT_NOFILE)

    if ( below(softnproc, proclimit) ):
        print(' '.join(['WARN: Soft limit RLIMIT_NPROC is too small at', str(softnproc), '(<', str(proclimit), ' ).  DUCC may be unable to create sufficient threads.']))

    if ( below(softnfiles, filelimit) ):
        print(' '.join(['WARN: Soft limit RLIMIT_NOFILES is too small at', str(softnfiles), '(<', str(filelimit), ').  DUCC may be unable to open sufficient files or sockets.']))

    return ret
def verify_jvm(self):
    """Return True if '<java> -version' exits with status 0, else report it and return False."""
    CMD = self.java() + ' -version > /dev/null 2>&1'
    rc = os.system(CMD)
    if ( rc == 0 ):
        return True
    print(' '.join(['NOTOK', CMD, 'returns', str(int(rc)), '. Must return rc 0. Startup cannot continue.']))
    return False
# inspect ducc.head.failover
def verify_head_failover(self, head):
    """Check the ducc.head.failover property against the head node.

    Nothing is checked when the property is absent.  Otherwise the text of the
    property must contain head (the process exits with status 1 if it does
    not), and ssh connectivity to every listed node is tested and logged.
    Nodes may be separated by commas and/or blanks.
    """
    key = "ducc.head.failover"
    failover = self.ducc_properties.get(key)

    # check for no failover
    if ( failover == None ):
        logger.debug(key+" not specified")
        return

    # insure ducc.head listed in ducc.head.failover
    if ( head not in failover ):
        logger.error(head+" not found in "+key)
        sys.exit(1)

    # test viability of failover nodes
    for node in failover.replace(',',' ').split():
        if ( self.ssh_operational(node) ):
            logger.debug("ssh is operational to "+node)
        else:
            logger.warn("ssh to specified failover node unsuccessful or otherwise problematic: "+node)
# Exit if this is not the head node. Ignore the domain as uname sometimes drops it.
# Also check that ssh to this node works
# Also restrict operations to the userid that installed ducc
def verify_head(self):
    """Exit with status 1 unless this script may manage DUCC from this node.

    Three conditions are verified in turn: the local host is the configured
    head node (domains are ignored), ssh to the head node works, and the
    current user owns resources/site.ducc.properties.  Finally the failover
    configuration is checked.
    """
    head = self.ducc_properties.get("ducc.head").split('.')[0]
    if ( self.localhost.split('.')[0] != head ):
        print(">>> ERROR - this script must be run from the head node")
        sys.exit(1)

    if ( not self.ssh_operational(head) ):
        print(">>> ERROR - this script cannot ssh to head node")
        sys.exit(1)

    # Ensure that root or another id doesn't start/stop ducc
    owner_uid = os.stat(DUCC_HOME + '/resources/site.ducc.properties').st_uid
    if ( owner_uid != os.getuid() ):
        print(">>> ERROR - this script must be run by the userid that installed DUCC")
        sys.exit(1)

    self.verify_head_failover(head)
#
# Verify the viability of ducc_ling.
# Returns a tuple (viable, elevated, safe)
# where 'viable' is a boolean indicating whether the ducc_ling is viable (exists and is correct version)
# If this is false, the other two values are meaningless
#
# 'elevated' indicates whether privileges are at least partially elevated
# 'safe' indicates whether ducc_ling is safe (elevated privileges and correct permissions)
# The caller will evaluate these and take appropriate action
#
def verify_duccling(self):
viable = True
elevated = False
safe = False
dl = self.duccling
path = os.path.dirname(os.path.abspath(dl))
if ( not (os.path.exists(dl) and os.access(dl, os.X_OK)) ):
print dl, 'does not exist or is not executable.'
viable = False
path = os.path.dirname(os.path.abspath(dl))
dl = path + '/ducc_ling'
dl_stat = os.stat(dl) # dl_stat is stat for ducc_ling
dl_mode = dl_stat.st_mode
if ( (dl_mode & S_ISUID) != S_ISUID):
if ( os.environ['LOGNAME'] == self.ducc_uid ):
print 'ducc_ling module', dl, ': setuid bit is not set. Processes will run as user '+self.ducc_uid
elevated = False
file_safe = True
dir_safe = True
own_safe = True
else:
elevated = True
file_safe = False
dir_safe = False
own_safe = False
if ( elevated ):
#
# if setuid bit is set, all this MUST be true or we won't mark ducc_ling safe:
# file permissions are 750
# dir permissions are 700
# owenership is root.ducc
#
dl_perm = oct(dl_mode & (S_IRWXU | S_IRWXG | S_IRWXO))
expected = oct(0750)
if ( dl_perm != expected ):
print dl, ': Invalid execution bits', dl_perm, 'should be', expected
else:
file_safe = True
dir_stat = os.stat(path) # dir_stat is stat for ducc_ling
dir_mode = dir_stat.st_mode
dir_perm = oct(dir_mode & (S_IRWXU | S_IRWXG | S_IRWXO))
expected = oct(0700)
if ( dir_perm != expected ):
print 'ducc_ling', path, ': Invalid directory permissions', dir_perm, 'should be', expected
else:
dir_safe = True
try:
grpinfo = grp.getgrnam(self.ducc_uid)
duccgid = grpinfo.gr_gid
if ( (dl_stat.st_uid != 0) or (dl_stat.st_gid != duccgid) ):
print 'ducc_ling module', dl, ': Invalid ownership. Should be '+self.ducc_uid
else:
own_safe = True
except:
print 'ducc_ling group "'+self.ducc_uid+'" cannot be found.'
safe = file_safe and dir_safe and own_safe
# A last viability check, do versions match? This also runs it, proving it
# can execute in this environment.
lines = self.popen(self.duccling + ' -v')
version_from_head = lines.readline().strip();
toks = version_from_head.split()
version_from_head = ' '.join(toks[0:4])
version_file = self.DUCC_HOME + '/state/duccling.version';
if ( os.path.exists(version_file) ):
verfile = open(version_file)
for line in verfile:
line = line.strip();
toks = line.split();
line = ' '.join(toks[0:4])
if ( line != version_from_head ):
print "Mismatched ducc_ling versions:"
print "ALERT: Version on Agent Node:", version_from_head
print "ALERT: Version on Ducc Head:", line
viable = False
verfile.close()
else:
print "NOTE: ducc_ling version file missing, cannot verify version."
# leave the decisions to the caller
return (viable, elevated, safe)
# Apply these rules to determine if ducc_ling is installed ok
#
# Caller Elevated Protected (safe) Action
# -------- -------- --------- ------
# ducc Y Y OK
# Y N Fail
#
# N Y (by def) OK
#
# ~ducc Y N (by def) Fail
# N Y (by def) OK, Note
def duccling_ok(self, viable, elevated, safe):
    """Apply the rules in the table above to the result of verify_duccling.

    Returns False for a ducc_ling that is not viable.  With elevated privileges
    the answer is 'safe' for the ducc user and False for anybody else.  Without
    elevated privileges the answer is True; users other than the ducc user are
    told that processes will run under their own id.
    """
    if ( not viable ):
        return False

    user = os.environ['LOGNAME']
    is_ducc_user = ( user == self.ducc_uid )

    if ( elevated ):
        if ( is_ducc_user ):
            return safe
        return False

    if ( not is_ducc_user ):
        print(' '.join(['Note: Running unprivileged ducc_ling. Process will run as user', user]))
    return True
def ssh_ok(self, node, line):
    """Recognize well-known ssh failure messages in a line of command output.

    Returns None if line is not one of the known ssh errors.  Otherwise returns
    three message lines: a blank separator, an ALERT naming the problem, and an
    ALERT quoting what ssh said.
    """
    spacer = '   '

    if ( line.startswith("Permission denied") or line.startswith("Host key verification failed") ):
        problem = "ALERT: Passwordless SSH is not configured correctly for node " + node
    elif ( "Connection refused" in line ):
        problem = "ALERT: SSH is not not enabled on node " + node
    elif ( "Connection timed" in line ):
        problem = "\nALERT: SSH did not respond with timeout of 10 secnds " + node
    elif ( "No route" in line ):
        problem = 'ALERT: SSH cannot connect to host.'
    else:
        return None

    return [' ', spacer + problem, spacer + "ALERT: SSH returns '" + line + "'"]
#
# Runs the 'ps' command on the given node (via ssh) and looks for ducc processes in its output.
# Output is a tuple (ok, processes): 'ok' is False if ssh reported an error, and 'processes'
# is a list of tuples (component, pid, user), one for each ducc process found.
#
# If no ducc processes are found here the list is empty.
#
def find_ducc_process(self, node):
    """List the DUCC processes running on node.

    Runs 'ps' on the node through self.ssh and returns (ok, processes).  ok is
    False if a line of the output was recognized as an ssh error (the error is
    printed).  processes holds one tuple (component, pid, user) for every java
    process that is a DUCC component, the broker or the database, plus whatever
    the site hook find_other_processes returns for the remaining java processes.
    """
    answer = []
    if ( self.system == 'Darwin'):
        ps = 'ps -eo user,pid,comm,args'
    else:
        ps = 'ps -eo user:14,pid,comm,args'
    resp = self.ssh(node, True, ps)
    ok = True

    while True:
        line = resp.readline().strip()
        if ( line.startswith('PID')):
            continue

        # ssh_ok takes (node, line); the original passed them the other way
        # round, so ssh failures were never detected.
        ssh_errors = self.ssh_ok(node, line)
        if ( ssh_errors != None ):
            for m in ssh_errors:
                print(m)
            ok = False
            continue

        # from here on, assume no error
        if ( not line ):
            break               # end of output

        toks = line.split()
        if ( len(toks) < 4 ):
            continue

        user = toks[0]
        pid = toks[1]
        procname = toks[2]
        fullargs = toks[3:]

        if ( not ('java' in procname) and not ('JIT' in procname) ):
            continue

        # A DUCC component or the broker is identified by a -D option
        found = None
        for tok in fullargs:
            if ( tok.startswith('-Dducc.deploy.components=') ):
                found = (tok.split('=')[1], pid, user)
                break
            if ( tok.startswith('-DDUCC_BROKER_CREDENTIALS_FILE=') ):
                found = ('broker', pid, user)
                break
        if ( found is not None ):
            answer.append(found)
            continue

        # The database is identified by its main class, the last word of the command
        if ( fullargs[-1] == 'org.apache.cassandra.service.CassandraDaemon' ):
            answer.append(('database', pid, user))
            continue

        # Look for site-specific processes
        other_processes = find_other_processes(pid, user, line)
        if ( type(other_processes) is list ):
            if ( len(other_processes) > 0 ):
                answer = answer + other_processes
        else:
            print(' '.join(['Invalid response from \'find_other_processes\':', str(other_processes)]))

    return (ok, answer)
#
# Given the name of a file containing ducc nodes, a ducc user (usually the 'ducc' user unless you're running
# as yourself for test), find all ducc processes owned by this user and print them to the console.
#
def find_ducc(self, nodefile, user):
    """Collect the DUCC processes on every node listed in nodefile.

    nodefile defaults to DUCC_HOME/resources/ducc.nodes; blank lines and lines
    starting with '#' are ignored.  The webserver node is included as well,
    unless it is 'localhost'.  Returns a dictionary that maps each node to the
    result of find_ducc_process(node).  Exits with status 1 if the nodefile
    does not exist.

    NOTE(review): 'user' is accepted for compatibility but is not used;
    find_ducc_process takes only the node (the original passed 'user' as an
    extra argument, which raised a TypeError).
    """
    if ( nodefile is None ):
        nodefile = self.DUCC_HOME + '/resources/ducc.nodes'

    if ( not os.path.exists(nodefile) ):
        print(' '.join(['Nodefile', nodefile, 'does not exist or cannot be read.']))
        sys.exit(1)

    nodes = []
    f = open(nodefile)
    try:
        for node in f:
            node = node.strip()
            if ( not node or node.startswith('#') ):
                continue
            nodes.append(node)
    finally:
        f.close()

    if ( self.webserver_node != 'localhost' ):           # might be configured somewhere else
        nodes.append(self.webserver_node)

    answer = {}
    for node in nodes:
        answer[node] = self.find_ducc_process(node)
    return answer
def kill_process(self, node, proc, signal):
    """Run 'kill <signal> <pid>' on node, where the pid is proc[1]; the output is discarded."""
    pid = proc[1]
    for _ in self.ssh(node, True, 'kill', signal, pid):
        pass  # throw away the noise
def clean_shutdown(self):
    """Spawn the DuccAdmin java class with --killAll."""
    jvm_opts = [
        ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties ",
        ' -DDUCC_HOME=' + self.DUCC_HOME,
        ' -Dducc.head=' + self.ducc_properties.get('ducc.head'),
    ]
    DUCC_JVM_OPTS = ''.join(jvm_opts)
    self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccAdmin', '--killAll')
def get_os_pagesize(self):
    """Return the first line printed by '/usr/bin/getconf PAGESIZE', stripped, as a string."""
    output = self.popen('/usr/bin/getconf', 'PAGESIZE')
    first_line = output.readline()
    return first_line.strip()
def show_ducc_environment(self):
    """Return a list of lines describing the java, threading, memory and OS environment.

    If the configured jvm cannot be found, that is printed and the java version
    is left out of the result.  The memory line is read from /proc/meminfo and
    is omitted on Darwin or when no MemTotal value is available.
    """
    global use_threading

    response = []

    #
    # Print the java version
    #
    jvm = self.ducc_properties.get('ducc.jvm')
    check_java = True
    if ( jvm is None ):
        # Test for None BEFORE touching the value: the original called
        # jvm.replace() first and failed when no jvm was configured.
        response.append('WARNING: No jvm configured.  Default is used.')
        jvm = 'java'
    else:
        jvm = jvm.replace('//', '/')
        response.append('ENV: Java is configured as: ' + jvm)
        if ( not os.path.exists(jvm) ):
            print(' '.join(['NOTOK: configured jvm cannot be found:', jvm]))
            check_java = False

    if ( check_java ):
        print(' '.join(['JVM:', jvm]))
        for line in self.popen(jvm + ' -fullversion'):
            response.append('ENV ' + line.strip())

    response.append('ENV: Threading enabled: ' + str(use_threading))

    #
    # Get the total memory for the node
    #
    if ( self.system != 'Darwin' ):
        meminfo = Properties()
        meminfo.load('/proc/meminfo')
        mem = meminfo.get('MemTotal')
        if ( mem is not None ):
            if ( mem.endswith('kB') ):
                # kB -> gB; '//' keeps this an integer division on every Python version
                toks = mem.split()
                mem = str(int(toks[0]) // (1024*1024)) + ' gB'
            response.append('MEM: memory is ' + mem)

    #
    # Get the operating system information
    #
    response.append('ENV: system is ' + self.system)

    return response
#
# Resolve the 'path' relative to the path 'relative_to'
#
def resolve(self, path, relative_to):
    """Return path unchanged if it starts with '/', else placed in the directory of relative_to."""
    if ( path.startswith('/') ):
        return path
    directory = os.path.dirname(os.path.abspath(relative_to))
    return directory + '/' + path
#
# Read the nodefile, recursing into 'imports' if needed, returning a
# map. The map is keyed on filename, with each entry a list of the nodes.
#
def read_nodefile(self, nodefile, ret, _active=None):
    """Read nodefile, following 'import <file>' lines, and record its nodes in ret.

    ret maps each file name to the list of nodes found directly in that file,
    or to None for a file that does not exist.  Blank lines and lines starting
    with '#' are ignored; imported names are resolved relative to the importing
    file.  Returns (n_nodes, ret), where n_nodes counts the nodes of this file
    and of everything it imports.

    _active is for internal use: it holds the files currently being read so
    that files importing each other (which made the original recurse without
    end) are detected and skipped.
    """
    if ( _active is None ):
        _active = []

    n_nodes = 0
    if ( not os.path.exists(nodefile) ):
        print(' '.join(['Cannot read nodefile', nodefile]))
        ret[nodefile] = None
        return (n_nodes, ret)

    nodes = []
    _active.append(nodefile)
    f = open(nodefile)
    try:
        for node in f:
            node = node.strip()
            if ( not node or node.startswith('#') ):
                continue
            if ( node.startswith('import ') ):
                # split on any run of whitespace so 'import   x' still names x
                toks = node.split()
                newfile = self.resolve(toks[1], nodefile)  # resolve newfile relative to nodefile
                if ( newfile in _active ):
                    print(' '.join(['Ignoring circular import of', newfile, 'in', nodefile]))
                    continue
                (count, ret) = self.read_nodefile(newfile, ret, _active)
                n_nodes = n_nodes + count
                continue
            nodes.append(node)
            n_nodes = n_nodes + 1
    finally:
        f.close()
        _active.remove(nodefile)

    ret[nodefile] = nodes
    return (n_nodes, ret)
def compare_nodes(self, n1, n2):
    """Return True if n1 and n2 name the same node.

    Two names match if they are identical, or if one of them equals the other
    one's short form (the part before the first '.').
    """
    short1 = n1.split('.')[0]
    short2 = n2.split('.')[0]
    # exact match - covers both short and both long; then shortened n1 == n2; then n1 == shortened n2
    return ( n1 == n2 ) or ( short1 == n2 ) or ( n1 == short2 )
def verify_class_configuration(self, allnodes, verbose):
    """Validate the class and node definitions by running the NodeConfiguration java class.

    allnodes is handed over with -n and the file named by
    ducc.rm.class.definitions with -c.  With verbose the class file is also
    passed with -p and the command is printed.  Returns True if the command
    exits with status 0.
    """
    print(' '.join(['allnodes', allnodes]))

    # first, find the class definition
    classfile = self.ducc_properties.get('ducc.rm.class.definitions')
    print(' '.join(['Class definition file is', classfile]))

    parts = [self.jvm,
             " -DDUCC_HOME=" + self.DUCC_HOME,
             " org.apache.uima.ducc.common.NodeConfiguration ",
             " -n " + allnodes,
             " -c " + classfile]
    if ( verbose ):
        parts.append(" -p " + classfile)
    else:
        parts.append(" " + classfile)
    CMD = ''.join(parts)
    if ( verbose ):
        print(CMD)

    rc = os.system(CMD)
    if ( rc != 0 ):
        print("NOTOK: Cannot validate class and/or node definitions.")
    else:
        print("OK: Class and node definitions validated.")
    return (rc == 0)
def get_nodepool(self, node, default=''):
    """Return what the NodeConfiguration java class prints for '-m <node>'.

    The class file named by ducc.rm.class.definitions is passed with -c.  The
    stripped standard output of the command is returned, or default when that
    output is empty.
    """
    classpath = '"'+self.DUCC_HOME+'/lib/uima-ducc/*'+'"'
    classfile = self.ducc_properties.get('ducc.rm.class.definitions')

    words = [self.jvm,
             '-cp '+classpath,
             '-DDUCC_HOME='+self.DUCC_HOME,
             'org.apache.uima.ducc.common.NodeConfiguration',
             '-c'+' '+classfile,
             '-m'+' '+node]
    cmd = ' '.join(words)

    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    (out, err) = p.communicate()
    p.wait()

    result = out.strip()
    if ( result == '' ):
        return default
    return result
def get_nodepool_file(self, node, default=''):
    """Return what the NodeConfiguration java class prints for '-m <node> -f <node>'.

    The class file named by ducc.rm.class.definitions is passed with -c.  The
    stripped standard output of the command is returned, or default when that
    output is empty.
    """
    classpath = '"'+self.DUCC_HOME+'/lib/uima-ducc/*'+'"'
    classfile = self.ducc_properties.get('ducc.rm.class.definitions')

    words = [self.jvm,
             '-cp '+classpath,
             '-DDUCC_HOME='+self.DUCC_HOME,
             'org.apache.uima.ducc.common.NodeConfiguration',
             '-c'+' '+classfile,
             '-m'+' '+node,
             '-f'+' '+node]
    cmd = ' '.join(words)

    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    (out, err) = p.communicate()
    p.wait()

    result = out.strip()
    if ( result == '' ):
        return default
    return result
def verify_head_failover_configuration(self):
    """Check that every head failover node is in the same node pool as the head node.

    The nodes come from the property ducc.head.failover and may be separated by
    commas and/or blanks (the same format verify_head_failover accepts).
    Prints the findings and returns True if the configuration is consistent or
    no failover nodes are configured.
    """
    rc = 0
    message = "OK: Head node failover not configured."

    failover_nodes = self.ducc_properties.get('ducc.head.failover')
    nodes = []
    if ( failover_nodes is not None ):
        nodes = failover_nodes.replace(',', ' ').split()

    # Only an actual list of nodes counts as "configured"; the original test
    # 'len(...) >= 0' was always true, even for an empty property.
    if ( len(nodes) > 0 ):
        head_node = self.ducc_properties.get('ducc.head')
        head_pool = self.get_nodepool(head_node,'<None>')
        for node in nodes:
            node_pool = self.get_nodepool(node,'<None>')
            if ( head_pool != node_pool ):
                if ( rc == 0 ):
                    # show the head node's pool once, before the first mismatch
                    print('OK: Head failover node '+head_node+' in node pool '+head_pool)
                print('NOTOK: Head failover node '+node+' in node pool '+node_pool)
                rc = 1
        if ( rc > 0 ):
            message = "NOTOK: Head failover nodepools incorrectly configured."
        else:
            message = "OK: Head failover nodepools correctly configured."

    print(message)
    return (rc == 0)
def disable_threading(self):
    """Switch the module-wide use_threading flag off."""
    globals()['use_threading'] = False
def installed(self):
    """Return False while ducc.head still holds the placeholder '<head-node>', True otherwise."""
    return self.ducc_properties.get('ducc.head') != '<head-node>'
def __init__(self, merge=False):
    """Initialize the utility object: defaults, classpath, properties and database settings.

    merge is passed on to DuccBase.__init__.  Exits with status 1 on a Python
    older than 2.4 and switches threading off on a Python older than 2.6.
    """
    global use_threading
    DuccBase.__init__(self, merge)

    # Defaults; update_properties() below replaces the broker values with the configured ones
    self.db_disabled = '--disabled--'
    self.duccling = None
    self.broker_url = 'tcp://localhost:61616'
    self.broker_protocol = 'tcp'
    self.broker_host = 'localhost'
    self.broker_port = '61616'
    self.default_components = ['rm', 'pm', 'sm', 'or', 'ws', 'db', 'broker']
    self.default_nodefiles = [self.DUCC_HOME + '/resources/ducc.nodes']

    # NOTE(review): is_ducc_head is only ever set to True here - presumably
    # DuccBase provides the default; confirm.
    if ( self.localhost == self.ducc_properties.get("ducc.head")):
        self.is_ducc_head = True

    os.environ['DUCC_NODENAME'] = self.localhost    # to match java code's implicit property so script and java match

    self.pid_file = self.DUCC_HOME + '/state/ducc.pids'
    self.set_classpath()
    self.os_pagesize = self.get_os_pagesize()
    self.update_properties()

    self.db_configure()

    manage_broker = self.ducc_properties.get('ducc.broker.automanage')
    self.automanage = False
    if ( manage_broker in ('t', 'true', 'T', 'True') ):
        self.automanage = True

    # Check the interpreter version.  The minor number is only meaningful for
    # Python 2; the original applied the 2.x limits to the minor number of any
    # major version.
    py_version = platform.python_version().split('.')
    major = int(py_version[0])
    minor = int(py_version[1])
    if ( major > 2 ):
        print("Warning, only Python Version 2 is supported.")
    elif ( minor < 4 ):
        print("Python must be at least at version 2.4.")
        sys.exit(1)
    elif ( minor < 6 ):
        use_threading = False
# When this module is executed as a script it only constructs a DuccUtil object;
# no further action is taken with it.
if __name__ == "__main__":
    util = DuccUtil()