blob: 3d5cb6fade874088fba1883a212ce48659833041 [file] [log] [blame]
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import errno, sys, os, traceback, stat, socket, re, warnings, signal
from hodlib.Common.tcp import tcpSocket, tcpError
from hodlib.Common.threads import simpleCommand
# Maps the name of each special mode flag on the stat module to the power of
# two it contributes to the leading digit of a permission string.
setUGV = {'S_ISUID': 2, 'S_ISGID': 1, 'S_ISVTX': 0}

# Matches a backslash optionally followed by one captured character.
reEscapeSeq = re.compile(r"\\(.)?")

HOD_INTERRUPTED_CODE = 127
HOD_INTERRUPTED_MESG = "Hod interrupted. Cleaning up and exiting"

# Regular-expression text with three numeric capture groups.
TORQUE_USER_LIMITS_COMMENT_FIELD = (
    "User-limits exceeded. "
    "Requested:([0-9]*) Used:([0-9]*) MaxLimit:([0-9]*)")
TORQUE_USER_LIMITS_EXCEEDED_MSG = (
    "Requested number of nodes exceeded "
    "maximum user limits. ")
class AlarmException(Exception):
    """Exception whose repr() is the bare message it was created with."""

    def __init__(self, msg=''):
        Exception.__init__(self, msg)
        # Kept on the instance; __repr__ returns it verbatim.
        self.message = msg

    def __repr__(self):
        return self.message
def isProcessRunning(pid):
    '''Tell whether a process with the given pid is running.

    Signal 0 is sent to the pid with os.kill and the outcome is inspected:
    no error means the process is running; an OSError with errno EPERM is
    also reported as running; any other OSError is reported as not running.
    '''
    # This method is documented in some email threads on the python mailing list.
    # For e.g.: http://mail.python.org/pipermail/python-list/2002-May/144522.html
    try:
        os.kill(pid, 0)
    except OSError:
        # The exception is fetched through sys.exc_info() so the same source
        # parses on both old and new Python versions.
        return sys.exc_info()[1].errno == errno.EPERM
    return True
def untar(file, targetDir):
    """Extract the archive `file` into `targetDir` with ``tar -C ... -zxf ...``.

    The command is run through simpleCommand and waited for.
    Returns True when its exit code is 0, False otherwise.
    """
    extractCmd = simpleCommand('untar',
                               'tar -C %s -zxf %s' % (targetDir, file))
    extractCmd.start()
    extractCmd.wait()
    extractCmd.join()
    return extractCmd.exit_code() == 0
def tar(tarFile, tarDirectory, tarList):
    """Create the gzipped archive `tarFile` from entries inside `tarDirectory`.

    The process changes into `tarDirectory`, runs ``tar -czf <tarFile>``
    followed by every name in `tarList`, and then changes back.

    Returns True when the command's exit code is 0; otherwise returns the
    command's exit_status_string().
    """
    currentDir = os.getcwd()
    os.chdir(tarDirectory)
    try:
        command = 'tar -czf %s ' % (tarFile)
        command = command + ''.join(["%s " % (name,) for name in tarList])
        commandObj = simpleCommand('tar', command)
        commandObj.start()
        commandObj.wait()
        commandObj.join()
        if commandObj.exit_code() == 0:
            return True
        return commandObj.exit_status_string()
    finally:
        # Always restore the caller's working directory, even when building
        # or running the command raises.
        os.chdir(currentDir)
def to_http_url(list):
    """Build an http URL string from a [hostname, port] pair."""
    host, port = list[0], list[1]
    return "http://%s:%s" % (host, port)
def get_exception_string():
    """Return the traceback text for sys.exc_info() as one string.

    The lines produced by traceback.format_exception are concatenated
    without any separator.
    """
    return ''.join(traceback.format_exception(*sys.exc_info()))
def get_exception_error_string():
    """Return "<type> <value>" for the exception reported by sys.exc_info().

    When the exception value is falsy, the exception type object itself is
    returned unformatted instead of a string.
    """
    excType, excValue = sys.exc_info()[:2]
    if not excValue:
        return excType
    return "%s %s" % (excType, excValue)
def check_timestamp(timeStamp):
    """ Checks the validity of a timeStamp.

    timeStamp - (YYYY-MM-DD HH:MM:SS in UTC)

    returns True when timeStamp parses with the format
    "%Y-%m-%d %H:%M:%S", False otherwise (including non-string input).
    """
    # Imported here because the name `time` is not otherwise in scope in
    # this function; without it every call failed and returned False.
    import time
    try:
        time.strptime(timeStamp, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        # ValueError: text does not match the format.
        # TypeError: timeStamp is not a string at all.
        return False
    return True
def sig_wrapper(sigNum, handler, *args):
    """Call `handler`, forwarding any extra arguments as ONE tuple.

    sigNum is accepted but not used. With no extra arguments the handler is
    called with no arguments; otherwise it receives the whole `args` tuple
    as a single positional argument (the tuple is not unpacked).
    """
    if not args:
        handler()
        return
    handler(args)
def get_perms(filename):
    """Return the permission bits of `filename` as a four-digit octal string.

    The first digit encodes the set-uid (4), set-gid (2) and sticky (1)
    bits; the remaining three digits are the user, group and other
    read/write/execute bits, e.g. '0644' or '1777'.
    """
    permBits = stat.S_IMODE(os.stat(filename)[stat.ST_MODE])
    # S_IMODE keeps only the twelve permission bits, which are exactly four
    # octal digits: special, user, group, other.
    return "%04o" % permBits
def local_fqdn():
    """Return a dotted host name for this machine, if one can be found.

    The node name from os.uname() is resolved with socket.gethostbyname_ex.
    Among the returned aliases followed by the primary name, the last entry
    that contains a dot and starts with the node name is returned. When no
    entry qualifies, the node name itself is returned.
    """
    me = os.uname()[1]
    primaryName, aliases = socket.gethostbyname_ex(me)[:2]

    fqdn = None
    # Aliases are examined first and the primary name last; a later match
    # overrides an earlier one.
    for candidate in aliases + [primaryName]:
        if candidate.count(".") and candidate.startswith(me):
            fqdn = candidate

    if fqdn is None:
        return me
    return fqdn
def need_to_allocate(allocated, config, command):
    """Return True unless one of the "no allocation needed" conditions holds.

    False is returned when, checked in this order:
      * allocated.isSet() is true, or
      * command matches the dfs pattern and
        config['gridservice-hdfs']['external'] is true, or
      * config['gridservice-mapred']['external'] is true.
    """
    if allocated.isSet():
        return False

    if re.search(r"\s*dfs.*$", command) and \
            config['gridservice-hdfs']['external']:
        return False

    if config['gridservice-mapred']['external']:
        return False

    return True
def filter_warnings():
    """Install an 'ignore' filter for the "'with' will become a reserved
    keyword" warning message."""
    withKeywordMessage = ".*?'with' will become a reserved keyword.*"
    warnings.filterwarnings('ignore', message=withKeywordMessage)
def args_to_string(list):
    """Return the items of `list` as one space-separated string.

    Each item is formatted with %s; an empty input gives an empty string.
    """
    return " ".join(["%s" % (item,) for item in list])
def replace_escapes(object):
    r""" Strip escaping backslashes from 'keyval' option values, in place.

    e.g. \, becomes , and \= becomes = and so on. Only options whose
    definition has type 'keyval' and that are present in the section are
    rewritten; each such option is replaced by a new dict holding the
    unescaped values.
    """
    # here object is either a config object or a options object
    for section in object._mySections:
        for option in object._configDef[section].keys():
            if not object[section].has_key(option):
                continue
            if not (object._configDef[section][option]['type'] == 'keyval'):
                continue

            escapedPairs = object[section][option]
            object[section][option] = {}
            for (key, value) in escapedPairs.iteritems():
                if reEscapeSeq.search(value):
                    # Keep only the character that followed the backslash.
                    value = reEscapeSeq.sub(r"\1", value)
                object[section][option][key] = value
def hadoopVersion(hadoopDir, java_home, log):
    """Determine the hadoop version by running ``<hadoopDir>/bin/hadoop version``.

    hadoopDir - directory that contains bin/hadoop
    java_home - value placed in JAVA_HOME for the command's environment
    log       - logger; its debug() method is called

    Returns a dict with keys 'major' and 'minor'. Both stay None when the
    command exits non-zero, produces no output, or its first output line
    does not look like "Hadoop <major>.<minor>...". On a match the values
    are the matched digit strings.
    """
    # Code earlier in idleTracker.py
    version = {'major': None, 'minor': None}
    hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
    cmd = "%s version" % hadoopPath
    log.debug('Executing command %s to find hadoop version' % cmd)

    # Work on a copy: assigning into os.environ itself would change
    # JAVA_HOME for the whole process as a side effect of this query.
    env = os.environ.copy()
    env['JAVA_HOME'] = java_home

    hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
    hadoopVerCmd.start()
    hadoopVerCmd.wait()
    hadoopVerCmd.join()
    if hadoopVerCmd.exit_code() != 0:
        return version

    output = hadoopVerCmd.output()
    if not output:
        # Successful exit but nothing printed: nothing to parse.
        return version

    verLine = output[0]
    log.debug('Version from hadoop command: %s' % verLine)
    verMatch = re.match(r"Hadoop ([0-9]+)\.([0-9]+).*", verLine)
    if verMatch is not None:
        version['major'] = verMatch.group(1)
        version['minor'] = verMatch.group(2)
    return version
def get_cluster_status(hdfsAddress, mapredAddress):
    """Determine the status of the cluster based on socket availability
    of HDFS and Map/Reduce.

    Each address is probed by opening and closing a tcpSocket; a tcpError
    counts as a failure. Map/Reduce is probed first, then HDFS.

    Returns 0 when both probes succeed, 14 when only Map/Reduce fails,
    13 when only HDFS fails and 10 when both fail.
    """
    mapredFailed = False
    mapredSocket = tcpSocket(mapredAddress)
    try:
        mapredSocket.open()
        mapredSocket.close()
    except tcpError:
        mapredFailed = True

    hdfsFailed = False
    hdfsSocket = tcpSocket(hdfsAddress)
    try:
        hdfsSocket.open()
        hdfsSocket.close()
    except tcpError:
        hdfsFailed = True

    if hdfsFailed:
        if mapredFailed:
            return 10
        return 13
    if mapredFailed:
        return 14
    return 0
def parseEquals(list):
    """Convert a list of 'key=value' strings into a dict.

    e.g. ['a=b','c=d'] gives {'a': 'b', 'c': 'd'}. Each element is split at
    its FIRST '=' only, so a value may itself contain '=' ('k=x=y' gives
    {'k': 'x=y'}). When a key repeats, the last element wins.

    Raises IndexError for an element that contains no '='.
    """
    # Used in GridService/{mapred.py/hdfs.py} and HodRing/hodring.py.
    # No need for specially treating escaped =. as in \=, since all keys are
    # generated by hod and don't contain such anomalies
    parsed = {}
    for elem in list:
        # maxsplit=1 keeps everything after the first '=' as the value.
        splits = elem.split('=', 1)
        parsed[splits[0]] = splits[1]
    return parsed
def getMapredSystemDirectory(mrSysDirRoot, userid, jobid):
    """Return the path <mrSysDirRoot>/<userid>/mapredsystem/<jobid>."""
    pathParts = (mrSysDirRoot, userid, 'mapredsystem', jobid)
    return os.path.join(*pathParts)
class HodInterrupt:
    """Boolean flag that is raised when a termination signal is caught.

    After init_signals() has been called, SIGTERM, SIGQUIT and SIGINT set
    the flag instead of performing their previous action; isSet() reports
    the flag's current value.
    """

    def __init__(self):
        self.HodInterruptFlag = False
        # Logger used by the signal handler; None until set_log() is called.
        self.log = None

    def set_log(self, log):
        """Set the logger whose critical() method reports caught signals."""
        self.log = log

    def init_signals(self):
        """Install handlers for SIGTERM, SIGQUIT and SIGINT that set the flag."""

        # Defined before any handler is registered so that a signal arriving
        # during registration never finds this helper still unbound.
        def sig_wrapper(sigNum, handler, *args):
            # A signal may arrive before set_log() was called; skip logging
            # then rather than failing inside the signal handler.
            if self.log is not None:
                self.log.critical("Caught signal %s." % sigNum )

            if args:
                handler(args)
            else:
                handler()

        # Signature required by signal.signal: (signal number, second arg).
        def sigStop(sigNum, handler):
            sig_wrapper(sigNum, self.setFlag)

        signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
        signal.signal(signal.SIGQUIT, sigStop) # 3 : Quit program
        signal.signal(signal.SIGINT, sigStop) # 2 ^C : Interrupt program

    def setFlag(self, val = True):
        """Set the interrupt flag to `val` (True by default)."""
        self.HodInterruptFlag = val

    def isSet(self):
        """Return the current value of the interrupt flag."""
        return self.HodInterruptFlag
class HodInterruptException(Exception):
    """Exception carrying a `value`; str() of it is repr(value)."""

    def __init__(self, value = ""):
        # Initialise the base class as well so that `args` carries the value,
        # in line with how AlarmException initialises its base class.
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        return repr(self.value)
# Module-level HodInterrupt instance, created when this module is imported.
hodInterrupt = HodInterrupt()