#!/bin/sh
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
""":"
work_dir=$(dirname $0)
base_name=$(basename $0)
cd $work_dir

if [ $HOD_PYTHON_HOME ]; then
  exec $HOD_PYTHON_HOME -OO -u $base_name ${1+"$@"}
elif [ -e /usr/bin/python ]; then
  exec /usr/bin/python -OO -u $base_name ${1+"$@"}
elif [ -e /usr/local/bin/python ]; then
  exec /usr/local/bin/python -OO -u $base_name ${1+"$@"}
else
  exec python -OO -u $base_name ${1+"$@"}
fi
":"""
from os import popen3
import os, sys
import re
import time
from datetime import datetime
from optparse import OptionParser
myName = os.path.basename(sys.argv[0])
myName = re.sub(".*/", "", myName)
reVersion = re.compile(".*(\d+_\d+).*")
VERSION = '$HeadURL$'
reMatch = reVersion.match(VERSION)
if reMatch:
  VERSION = reMatch.group(1)
  VERSION = re.sub("_", ".", VERSION)
else:
  VERSION = 'DEV'
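# For illustration only: if the expanded HeadURL happened to contain a tag such
# as 'release-0_18' (hypothetical), the regex above would capture '0_18' and
# VERSION would become '0.18'; with no '<digits>_<digits>' part in the URL,
# VERSION falls back to 'DEV'.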
options = ( {'short'   : "-p",
             'long'    : "--package",
             'type'    : "string",
             'action'  : "store",
             'dest'    : "package",
             'metavar' : " ",
             'default' : 'hadoop',
             'help'    : "Path to the hadoop binary"},
            {'short'   : "-d",
             'long'    : "--days",
             'type'    : "int",
             'action'  : "store",
             'dest'    : "days",
             'metavar' : " ",
             'default' : 7,
             'help'    : "Number of days before logs are deleted"},
            {'short'   : "-c",
             'long'    : "--config",
             'type'    : "string",
             'action'  : "store",
             'dest'    : "config",
             'metavar' : " ",
             'default' : None,
             'help'    : "config directory for hadoop"},
            {'short'   : "-l",
             'long'    : "--logs",
             'type'    : "string",
             'action'  : "store",
             'dest'    : "log",
             'metavar' : " ",
             'default' : "/user",
             'help'    : "directory prefix under which logs are stored per user"},
            {'short'   : "-n",
             'long'    : "--dynamicdfs",
             'type'    : "string",
             'action'  : "store",
             'dest'    : "dynamicdfs",
             'metavar' : " ",
             'default' : "false",
             'help'    : "'true' if the cluster is used to bring up dynamic dfs clusters, 'false' otherwise"},
            {'short'   : "-r",
             'long'    : "--retain-master-logs",
             'type'    : "string",
             'action'  : "store",
             'dest'    : "retain_masters_logs",
             'metavar' : " ",
             'default' : "false",
             'help'    : "'true' if the logs of the masters (jobtracker, and namenode if '--dynamicdfs' is set) have to be retained, 'false' if everything has to be removed"}
          )
def getDfsCommand(options, args):
  if (options.config == None):
    cmd = options.package + " " + "dfs " + args
  else:
    cmd = options.package + " " + "--config " + options.config + " dfs " + args
  return cmd
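# For illustration: with the default package 'hadoop' and a hypothetical config
# directory /path/to/conf, getDfsCommand(options, "-lsr /user/\*/hod-logs/")
# returns "hadoop --config /path/to/conf dfs -lsr /user/\*/hod-logs/"; without
# a config directory the "--config ..." part is omitted.
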
def runcondense():
  import shutil

  options = process_args()
  # If the retain-master-logs option is true, we do not delete the jobtracker
  # logs (and, in case of dynamic dfs, the namenode logs). Otherwise we delete
  # the entire job directory, as nothing other than master and slave log files
  # should be under the hod-logs directory.
  filteredNames = [] # logs to skip while deleting
  deletedNamePrefixes = [] # log prefixes to delete
  if options.retain_masters_logs == 'true':
    filteredNames = ['jobtracker']
    deletedNamePrefixes = ['*-tasktracker-*']
  if options.dynamicdfs == 'true' and options.retain_masters_logs == 'true':
    filteredNames.append('namenode')
    deletedNamePrefixes.append('*-datanode-*')
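  # For example (derived from the assignments above): with --retain-master-logs
  # true and --dynamicdfs true, filteredNames ends up as
  # ['jobtracker', 'namenode'] and deletedNamePrefixes as
  # ['*-tasktracker-*', '*-datanode-*'].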
  filepath = '%s/\*/hod-logs/' % (options.log)
  cmd = getDfsCommand(options, "-lsr " + filepath)
  (stdin, stdout, stderr) = popen3(cmd)
  lastjobid = 'none'
  toPurge = { }
  for line in stdout:
    try:
      m = re.match("^.*\s(.*)\n$", line)
      filename = m.group(1)
      # file name format: <prefix>/<user>/hod-logs/<jobid>/[0-9]*-[jobtracker|tasktracker|datanode|namenode|]-hostname-YYYYMMDDtime-random.tar.gz
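      # Illustrative example of such a path (user, job id, host and date are
      # made up): /user/someuser/hod-logs/14/1-tasktracker-node001-200801151205-0001.tar.gz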
      # first strip prefix:
      if filename.startswith(options.log):
        filename = filename[len(options.log):]
        if not filename.startswith('/'):
          filename = '/' + filename
      else:
        continue
      # Now get other details from filename.
      k = re.match("/(.*)/hod-logs/(.*)/.*-.*-([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9]).*$", filename)
      if k:
        username = k.group(1)
        jobid = k.group(2)
        datetimefile = datetime(int(k.group(3)), int(k.group(4)), int(k.group(5)))
        datetimenow = datetime.utcnow()
        diff = datetimenow - datetimefile
        filedate = k.group(3) + k.group(4) + k.group(5)
        newdate = datetimenow.strftime("%Y%m%d")
        print "%s %s %s %d" % (filename, filedate, newdate, diff.days)
        # if the cluster is used to bring up dynamic dfs, we must also leave NameNode logs.
        foundFilteredName = False
        for name in filteredNames:
          if filename.find(name) >= 0:
            foundFilteredName = True
            break
        if foundFilteredName:
          continue
        if (diff.days > options.days):
          desttodel = filename
          if not toPurge.has_key(jobid):
            toPurge[jobid] = options.log.rstrip("/") + "/" + username + "/hod-logs/" + jobid
    except Exception, e:
      print >> sys.stderr, e
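  # For illustration, with a made-up user and job id, the loop below issues
  # either a full delete such as
  #   hadoop --config <confdir> dfs -rmr /user/someuser/hod-logs/14
  # or, when --retain-master-logs is 'true', per-prefix deletes such as
  #   hadoop --config <confdir> dfs -rm /user/someuser/hod-logs/14/*-tasktracker-*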
  for job in toPurge.keys():
    try:
      if options.retain_masters_logs == 'false':
        # delete the entire job-id directory
        cmd = getDfsCommand(options, "-rmr " + toPurge[job])
        print cmd
        ret = os.system(cmd)
        if (ret != 0):
          print >> sys.stderr, "Command failed to delete job directory: " + cmd
      else:
        # delete only the prefixes we're interested in
        for prefix in deletedNamePrefixes:
          cmd = getDfsCommand(options, "-rm " + toPurge[job] + '/' + prefix)
          print cmd
          ret = os.system(cmd)
          if (ret != 0):
            print >> sys.stderr, "Command failed to delete file: " + cmd
    except Exception, e:
      print >> sys.stderr, e
def process_args():
  global options, myName, VERSION

  usage = "usage: %s <ARGS>" % (myName)
  version = "%s %s" % (myName, VERSION)
  argParser = OptionParser(usage=usage, version=version)

  for option_element in options:
    argParser.add_option(option_element['short'], option_element['long'],
                         type=option_element['type'], action=option_element['action'],
                         dest=option_element['dest'], default=option_element['default'],
                         metavar=option_element['metavar'], help=option_element['help'])
  (parsedOptions, args) = argParser.parse_args()

  if not os.path.exists(parsedOptions.package):
    argParser.error("Could not find path to hadoop binary: %s" % parsedOptions.package)
  if parsedOptions.config != None and not os.path.exists(parsedOptions.config):
    argParser.error("Could not find config: %s" % parsedOptions.config)
  if parsedOptions.days <= 0:
    argParser.error("Invalid number of days specified, must be > 0: %s" % parsedOptions.days)
  if parsedOptions.dynamicdfs != 'true' and parsedOptions.dynamicdfs != 'false':
    argParser.error("Invalid option for dynamicdfs, must be 'true' or 'false': %s" % parsedOptions.dynamicdfs)

  return parsedOptions
if __name__ == '__main__':
  runcondense()