blob: 51613eae0a1139b0fc85f53b23df2572ba8bcd7a [file] [log] [blame]
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""":"
work_dir=$(dirname $0)
base_name=$(basename $0)
original_dir=$PWD
cd $work_dir
if [ $HOD_PYTHON_HOME ]; then
exec $HOD_PYTHON_HOME -u -OO $base_name ${1+"$@"}
elif [ -e /usr/bin/python ]; then
exec /usr/bin/python -u -OO $base_name ${1+"$@"}
elif [ -e /usr/local/bin/python ]; then
exec /usr/local/bin/python -u -OO $base_name ${1+"$@"}
else
exec python -u -OO $base_name ${1+"$@"}
fi
":"""
"""The executable to be used by the user"""
import sys, os, re, pwd, threading, sys, random, time, pprint, shutil, time, re
from pprint import pformat
from optparse import OptionParser
myName = os.path.basename(sys.argv[0])
myName = re.sub(".*/", "", myName)
binDirectory = os.path.realpath(sys.argv[0])
rootDirectory = re.sub("/bin/.*", "", binDirectory)
libDirectory = rootDirectory
sys.path.append(libDirectory)
from hodlib.Common.threads import simpleCommand
from hodlib.Common.util import local_fqdn, tar, filter_warnings,\
get_exception_string, get_exception_error_string
from hodlib.Common.logger import hodLog
from hodlib.Common.logger import getLogger
from hodlib.HodRing.hodRing import createMRSystemDirectoryManager
filter_warnings()
reVersion = re.compile(".*(\d+_\d+).*")
reHdfsURI = re.compile("(hdfs://.*?:\d+)(.*)")
VERSION = None
if os.path.exists("./VERSION"):
vFile = open("./VERSION", 'r')
VERSION = vFile.readline()
vFile.close()
def __archive_logs(conf, log):
  """Tar up each hadoop log directory and ship it to the configured URI.

  Needs conf keys: 'log-destination-uri', 'hadoop-log-dirs', 'temp-dir'.
  For a file:// destination the tarball is written there directly; for an
  hdfs:// destination it is staged under 'temp-dir' and then copied to DFS.
  Returns True on success (or when there is nothing to do), False on error.
  """
  status = True
  logUri = conf['log-destination-uri']
  hadoopLogDirs = conf['hadoop-log-dirs']
  if logUri:
    try:
      if hadoopLogDirs:
        date = time.localtime()
        for logDir in hadoopLogDirs:
          (head, tail) = os.path.split(logDir)
          (head, logType) = os.path.split(head)
          # Unique tarball name: type, host, timestamp plus a random
          # suffix to avoid collisions between runs in the same second.
          tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % (
            logType, local_fqdn(), date[0], date[1], date[2], date[3],
            date[4], date[5], random.randint(0, 1000))
          if logUri.startswith('file://'):
            tarBallFile = os.path.join(logUri[7:], tarBallFile)
          else:
            # hdfs:// (or anything else): stage locally first.
            tarBallFile = os.path.join(conf['temp-dir'], tarBallFile)
          log.debug('archiving log files to: %s' % tarBallFile)
          status = tar(tarBallFile, logDir, ['*',])
          log.info('archive %s status: %s' % (tarBallFile, status))
          if status and logUri.startswith('hdfs://'):
            __copy_archive_to_dfs(conf, tarBallFile)
            log.info("copying archive to dfs finished")
    # Was a bare 'except:' (which also swallowed SystemExit and
    # KeyboardInterrupt) followed by a dead 'dict = {}' statement that
    # shadowed the builtin; both fixed.
    except Exception:
      log.error(get_exception_string())
      status = False
  return status
def __copy_archive_to_dfs(conf, archiveFile):
  """Copy a local log tarball into DFS and delete the local copy.

  Needs conf keys: 'log-destination-uri', 'user-id', 'service-id', and
  'hadoop-command-string' and/or 'pkgs' (a 'pkgs' path wins and supplies
  its own bin/hadoop).
  """
  uriMatch = reHdfsURI.match(conf['log-destination-uri'])
  archiveName = os.path.split(archiveFile)[1]
  destFile = os.path.join(uriMatch.group(2), conf['user-id'], 'hod-logs',
                          conf['service-id'], archiveName)
  log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile))
  # Prefer the hadoop binary from the packages dir when one is configured.
  if conf['pkgs']:
    hadoopCmd = os.path.join(conf['pkgs'], 'bin', 'hadoop')
  else:
    hadoopCmd = conf['hadoop-command-string']
  copyCommand = "%s dfs -fs %s -copyFromLocal %s %s" % (hadoopCmd,
    uriMatch.group(1), archiveFile, destFile)
  log.debug(copyCommand)
  copier = simpleCommand('hadoop', copyCommand)
  copier.start()
  copier.wait()
  copier.join()
  log.debug(pprint.pformat(copier.output()))
  # The archive now lives in DFS; drop the local staging copy.
  os.unlink(archiveFile)
def unpack():
  """Parse command-line options into the cleanup configuration dict.

  Each "--some-option value" becomes _options['some-option'] = value; the
  hodring-specific settings are additionally grouped (with coerced types)
  under _options['hodring'].  Also exports PBS_JOBID from --service-id.
  Returns the configuration dict.
  """
  parser = OptionParser()
  option_list = ["--log-destination-uri", "--hadoop-log-dirs", \
       "--temp-dir", "--hadoop-command-string", "--pkgs", "--user-id", \
       "--service-id", "--hodring-debug", "--hodring-log-dir", \
       "--hodring-syslog-address", "--hodring-cleanup-list", \
       "--jt-pid", "--mr-sys-dir", "--fs-name", "--hadoop-path"]
  regexp = re.compile("^--")
  for opt in option_list:
    # dest is the option name minus the leading "--"; hyphens are kept, so
    # the values land on the options object via setattr with those names.
    parser.add_option(opt, dest=regexp.sub("", opt), action="store")
  option_list.append("--hodring-stream")
  parser.add_option("--hodring-stream", dest="hodring-stream", metavar="bool",
                    action="store_true")
  (options, args) = parser.parse_args()

  _options = {}
  _options['hodring'] = {}
  for opt in dir(options):
    if "--" + opt in option_list:
      _options[opt] = getattr(options, opt)
  # 'key in dict' replaces the deprecated dict.has_key() (removed in Py3).
  if 'hadoop-log-dirs' in _options and _options['hadoop-log-dirs']:
    _options['hadoop-log-dirs'] = _options['hadoop-log-dirs'].split(",")
  if 'hodring-syslog-address' in _options and _options['hodring-syslog-address']:
    _options['hodring']['syslog-address'] = \
        _options['hodring-syslog-address'].split(':')
  _options['hodring']['debug'] = int(_options['hodring-debug'])
  _options['hodring']['log-dir'] = _options['hodring-log-dir']
  _options['hodring']['stream'] = _options['hodring-stream']
  _options['hodring']['userid'] = _options['user-id']
  os.putenv('PBS_JOBID', _options['service-id'])
  return _options
if __name__ == '__main__':
  log = None
  try:
    conf = unpack()
    # Use the same log as hodring
    log = getLogger(conf['hodring'], 'hodring')
    log.debug("Logger initialised successfully")
    mrSysDirManager = createMRSystemDirectoryManager(conf, log)
    if mrSysDirManager is not None:
      mrSysDirManager.removeMRSystemDirectory()

    status = __archive_logs(conf, log)
    log.info("Archive status : %s" % status)
    # Locals renamed from 'list'/'dir' so the builtins aren't shadowed.
    cleanupList = conf['hodring-cleanup-list'].split(',')
    log.info("now removing %s" % cleanupList)
    for cleanupDir in cleanupList:
      if os.path.exists(cleanupDir):
        log.debug('removing %s' % (cleanupDir))
        # ignore_errors=True: cleanup is best-effort.
        shutil.rmtree(cleanupDir, True)
        log.debug("done")
    log.info("Cleanup successfully completed")
  # Was 'except Exception, e' — Python-2-only syntax and 'e' was unused.
  except Exception:
    if log:
      log.info("Stack trace:\n%s\n%s" %(get_exception_error_string(),get_exception_string()))