| #!/bin/sh |
| |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| """:" |
| work_dir=$(dirname $0) |
| base_name=$(basename $0) |
| original_dir=$PWD |
| cd $work_dir |
| |
| if [ $HOD_PYTHON_HOME ]; then |
| exec $HOD_PYTHON_HOME -u -OO $base_name ${1+"$@"} |
| elif [ -e /usr/bin/python ]; then |
| exec /usr/bin/python -u -OO $base_name ${1+"$@"} |
| elif [ -e /usr/local/bin/python ]; then |
| exec /usr/local/bin/python -u -OO $base_name ${1+"$@"} |
| else |
| exec python -u -OO $base_name ${1+"$@"} |
| fi |
| ":""" |
| |
| """The executable to be used by the user""" |
| import sys, os, re, pwd, threading, sys, random, time, pprint, shutil, time, re |
| from pprint import pformat |
| from optparse import OptionParser |
| |
| myName = os.path.basename(sys.argv[0]) |
| myName = re.sub(".*/", "", myName) |
| binDirectory = os.path.realpath(sys.argv[0]) |
| rootDirectory = re.sub("/bin/.*", "", binDirectory) |
| libDirectory = rootDirectory |
| |
| sys.path.append(libDirectory) |
| |
| from hodlib.Common.threads import simpleCommand |
| from hodlib.Common.util import local_fqdn, tar, filter_warnings,\ |
| get_exception_string, get_exception_error_string |
| from hodlib.Common.logger import hodLog |
| from hodlib.Common.logger import getLogger |
| from hodlib.HodRing.hodRing import createMRSystemDirectoryManager |
| |
| filter_warnings() |
| |
| reVersion = re.compile(".*(\d+_\d+).*") |
| reHdfsURI = re.compile("(hdfs://.*?:\d+)(.*)") |
| |
| VERSION = None |
| if os.path.exists("./VERSION"): |
| vFile = open("./VERSION", 'r') |
| VERSION = vFile.readline() |
| vFile.close() |
| |
| def __archive_logs(conf, log): |
| # need log-destination-uri, __hadoopLogDirs, temp-dir |
| status = True |
| logUri = conf['log-destination-uri'] |
| hadoopLogDirs = conf['hadoop-log-dirs'] |
| if logUri: |
| try: |
| if hadoopLogDirs: |
| date = time.localtime() |
| for logDir in hadoopLogDirs: |
| (head, tail) = os.path.split(logDir) |
| (head, logType) = os.path.split(head) |
| tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % ( |
| logType, local_fqdn(), date[0], date[1], date[2], date[3], |
| date[4], date[5], random.randint(0,1000)) |
| |
| if logUri.startswith('file://'): |
| tarBallFile = os.path.join(logUri[7:], |
| tarBallFile) |
| else: |
| tarBallFile = os.path.join(conf['temp-dir'], tarBallFile) |
| |
| log.debug('archiving log files to: %s' % tarBallFile) |
| status = tar(tarBallFile, logDir, ['*',]) |
| log.info('archive %s status: %s' % (tarBallFile, status)) |
| if status and \ |
| logUri.startswith('hdfs://'): |
| __copy_archive_to_dfs(conf, tarBallFile) |
| log.info("copying archive to dfs finished") |
| dict = {} |
| except: |
| log.error(get_exception_string()) |
| status = False |
| return status |
| |
| |
| def __copy_archive_to_dfs(conf, archiveFile): |
| # need log-destination-uri, hadoopCommandstring and/or pkgs |
| hdfsURIMatch = reHdfsURI.match(conf['log-destination-uri']) |
| |
| (head, tail) = os.path.split(archiveFile) |
| destFile = os.path.join(hdfsURIMatch.group(2), conf['user-id'], 'hod-logs', conf['service-id'], tail) |
| |
| log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile)) |
| |
| hadoopCmd = conf['hadoop-command-string'] |
| if conf['pkgs']: |
| hadoopCmd = os.path.join(conf['pkgs'], 'bin', 'hadoop') |
| |
| copyCommand = "%s dfs -fs %s -copyFromLocal %s %s" % (hadoopCmd, |
| hdfsURIMatch.group(1), archiveFile, destFile) |
| |
| log.debug(copyCommand) |
| |
| copyThread = simpleCommand('hadoop', copyCommand) |
| copyThread.start() |
| copyThread.wait() |
| copyThread.join() |
| log.debug(pprint.pformat(copyThread.output())) |
| |
| os.unlink(archiveFile) |
| |
| def unpack(): |
| parser = OptionParser() |
| option_list=["--log-destination-uri", "--hadoop-log-dirs", \ |
| "--temp-dir", "--hadoop-command-string", "--pkgs", "--user-id", \ |
| "--service-id", "--hodring-debug", "--hodring-log-dir", \ |
| "--hodring-syslog-address", "--hodring-cleanup-list", \ |
| "--jt-pid", "--mr-sys-dir", "--fs-name", "--hadoop-path"] |
| regexp = re.compile("^--") |
| for opt in option_list: |
| parser.add_option(opt,dest=regexp.sub("",opt),action="store") |
| option_list.append("--hodring-stream") |
| parser.add_option("--hodring-stream",dest="hodring-stream",metavar="bool",\ |
| action="store_true") |
| (options, args) = parser.parse_args() |
| _options= {} |
| _options['hodring'] = {} |
| for opt in dir(options): |
| if "--"+opt in option_list: |
| _options[opt] = getattr(options,opt) |
| if _options.has_key('hadoop-log-dirs') and _options['hadoop-log-dirs']: |
| _options['hadoop-log-dirs'] = _options['hadoop-log-dirs'].split(",") |
| if _options.has_key('hodring-syslog-address') and _options['hodring-syslog-address']: |
| _options['hodring']['syslog-address'] = \ |
| _options['hodring-syslog-address'].split(':') |
| _options['hodring']['debug'] = int(_options['hodring-debug']) |
| _options['hodring']['log-dir'] = _options['hodring-log-dir'] |
| _options['hodring']['stream'] = _options['hodring-stream'] |
| _options['hodring']['userid'] = _options['user-id'] |
| os.putenv('PBS_JOBID', _options['service-id'] ) |
| return _options |
| |
| if __name__ == '__main__': |
| log = None |
| try: |
| conf = unpack() |
| # Use the same log as hodring |
| log = getLogger(conf['hodring'],'hodring') |
| log.debug("Logger initialised successfully") |
| mrSysDirManager = createMRSystemDirectoryManager(conf, log) |
| if mrSysDirManager is not None: |
| mrSysDirManager.removeMRSystemDirectory() |
| |
| status = __archive_logs(conf,log) |
| log.info("Archive status : %s" % status) |
| list = conf['hodring-cleanup-list'].split(',') |
| log.info("now removing %s" % list) |
| for dir in list: |
| if os.path.exists(dir): |
| log.debug('removing %s' % (dir)) |
| shutil.rmtree(dir, True) |
| log.debug("done") |
| log.info("Cleanup successfully completed") |
| except Exception, e: |
| if log: |
| log.info("Stack trace:\n%s\n%s" %(get_exception_error_string(),get_exception_string())) |