# (source-viewer residue, not part of the module) blob: 33f145baab34ece28de27a8663ee01f8afae4410 [file] [log] [blame]
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os, re, time
from hodlib.Common.threads import loop, func
from hodlib.Common.threads import simpleCommand
from hodlib.Common.util import get_exception_string, hadoopVersion
class HadoopJobStatus:
    """Holds the id of one Hadoop job together with its status value."""

    def __init__(self, jobId, status):
        """Store the given job id and status unchanged.

        jobId  -- identifier of the job.
        status -- status value of the job.
        """
        self.__jobId, self.__status = jobId, status

    def getJobId(self):
        """Return the job id this object was created with."""
        return self.__jobId

    def getStatus(self):
        """Return the status this object was created with."""
        return self.__status
class HadoopClientException(Exception):
    """Exception raised when running the Hadoop job client fails.

    The failure code is available to handlers as the 'errorCode' attribute.
    """

    def __init__(self, errorCode):
        """Record the failure code.

        errorCode -- code describing the failure; kept on the instance as
                     'errorCode' for whoever catches the exception.
        """
        self.errorCode = errorCode
class JobTrackerMonitor:
    """This class monitors the JobTracker of an allocated cluster
    periodically to detect whether it is idle. If it is found
    to be idle for more than a configured limit, it calls back
    registered handlers who can act upon the idle cluster."""

    # Matches the line of 'hadoop job -list' output that reports how many
    # jobs are running; group 1 is the count.
    __jobCountRegExp = re.compile(r"([0-9]+) jobs currently running.*")
    # Matches one job row of 'hadoop job -list all' output; group 1 is the
    # job id and group 2 the single-digit status code.
    __jobStatusRegExp = re.compile(r"(\S+)\s+(\d)\s+\d+\s+\S+$")

    def __init__(self, log, idleJTHandler, interval, limit,
                 hadoopDir, javaHome, servInfoProvider):
        """Set up the monitor. No thread is started until start() is called.

        log              -- logger object used for debug/info messages.
        idleJTHandler    -- object whose handleIdleJobTracker() is called once
                            the cluster has been idle beyond 'limit'; may be
                            None, in which case no callback is made.
        interval         -- passed as the 'sleep' argument of the monitoring
                            loop thread (presumably seconds between polls --
                            confirm against hodlib.Common.threads.loop).
        limit            -- idleness limit in seconds (compared against
                            time.time() differences).
        hadoopDir        -- hadoop installation directory; bin/hadoop must
                            exist below it.
        javaHome         -- value exported as JAVA_HOME to the hadoop client.
        servInfoProvider -- object whose getServiceAddr('mapred') yields the
                            JobTracker address.

        Raises Exception if bin/hadoop does not exist under hadoopDir or if
        the hadoop version is older than 0.15.
        """
        self.__log = log
        self.__idlenessLimit = limit
        self.__idleJobTrackerHandler = idleJTHandler
        self.__hadoopDir = hadoopDir
        hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop")
        # hadoop directory can be from pkgs or a temp location like tarball.
        # Verify once.
        if not os.path.exists(hadoopPath):
            raise Exception('Invalid Hadoop path specified: %s' % hadoopPath)
        self.__javaHome = javaHome
        # Note that when this object is created, we don't yet know the JT URL.
        # The service info provider will be polled until we get the URL.
        self.__serviceInfoProvider = servInfoProvider
        self.__jobTrackerURL = None
        # 0 means "not currently idle"; otherwise the time.time() value at
        # which idleness was first observed.
        self.__firstIdleTime = 0
        self.__stopFlag = False
        self.__hadoop15Version = {'major': '0', 'minor': '15'}
        # Assumption: we are not going to support versions older than 0.15
        # for Idle Job tracker.
        if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
            raise Exception('Incompatible Hadoop Version: Cannot check status')
        self.__jtURLFinderThread = func(name='JTURLFinderThread',
                                        functionRef=self.getJobTrackerURL)
        self.__jtMonitorThread = loop(name='JTMonitorThread',
                                      functionRef=self.monitorJobTracker,
                                      sleep=interval)

    def start(self):
        """This method starts a thread that will determine the JobTracker URL"""
        self.__jtURLFinderThread.start()

    def stop(self):
        """This method asks the URL finder to give up and joins the monitoring
        thread if it is running."""
        self.__log.debug('Joining the monitoring thread.')
        self.__stopFlag = True
        if self.__jtMonitorThread.isAlive():
            self.__jtMonitorThread.join()
        self.__log.debug('Joined the monitoring thread.')

    def getJobTrackerURL(self):
        """This method periodically checks the service info provider for the
        JT URL and starts the monitoring thread once a valid URL is known."""
        self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
        while not self.__stopFlag and not self.__isValidJobTrackerURL():
            time.sleep(10)
            if self.__stopFlag:
                break
            self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')

        # Do not start monitoring once stop() has been requested: stop() only
        # joins a monitor thread that is already alive, so a thread started
        # afterwards would never be joined.
        if not self.__stopFlag and self.__isValidJobTrackerURL():
            self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)
            self.__jtMonitorThread.start()

    def monitorJobTracker(self):
        """This method is periodically called to monitor the JobTracker of the
        cluster. Errors are logged and never propagated to the caller."""
        try:
            if self.__isIdle():
                if self.__idleJobTrackerHandler:
                    self.__log.info('Detected cluster as idle. Calling registered callback handler.')
                    self.__idleJobTrackerHandler.handleIdleJobTracker()
        except Exception:
            # Best effort: a failed poll must not end the monitoring loop.
            self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())

    def getJobsStatus(self):
        """This method should return the status of all jobs that are run on the
        HOD allocated hadoop cluster.

        Returns a list of HadoopJobStatus objects. The list is empty when the
        hadoop version is older than 0.16, when the status command exits with
        a non-zero code, or when an error occurs (the error is logged)."""
        jobStatusList = []
        try:
            hadoop16Version = {'major': '0', 'minor': '16'}
            if self.__isCompatibleHadoopVersion(hadoop16Version):
                jtStatusCommand = self.__initStatusCommand(option='-list all')
                jtStatusCommand.start()
                jtStatusCommand.wait()
                jtStatusCommand.join()
                if jtStatusCommand.exit_code() == 0:
                    for line in jtStatusCommand.output():
                        jobStatus = self.__extractJobStatus(line)
                        if jobStatus is not None:
                            jobStatusList.append(jobStatus)
        except Exception:
            self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
        return jobStatusList

    def __isValidJobTrackerURL(self):
        """This method checks that the stored JT URL is not one of the special
        case strings returned by the getServiceAddr API"""
        url = self.__jobTrackerURL
        return ((url is not None) and (url != 'not found')
                and (not url.startswith('Error')))

    def __extractJobStatus(self, line):
        """This method parses an output line from the job status command and
        creates the JobStatus object if there is a match. Returns None for
        lines that do not look like a job row."""
        jsMatch = self.__jobStatusRegExp.match(line.strip())
        if jsMatch:
            return HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
        return None

    def __isIdle(self):
        """This method checks if the JobTracker is idle beyond a certain limit.

        A poll counts as idle when no job is running or when the job client
        failed with a HadoopClientException. Returns True only once the idle
        period has lasted at least the configured limit."""
        jobCount = 0
        err = False

        try:
            jobCount = self.__getJobCount()
        except HadoopClientException as hce:
            self.__log.debug('HadoopClientException handled in getting job count. '
                             'Error code: %s' % hce.errorCode)
            err = True

        if (jobCount == 0) or err:
            if self.__firstIdleTime == 0:
                # detecting idleness for the first time
                self.__firstIdleTime = time.time()
            elif (time.time() - self.__firstIdleTime) >= self.__idlenessLimit:
                self.__log.info('Idleness limit crossed for cluster')
                return True
        else:
            # reset idleness time
            self.__firstIdleTime = 0

        return False

    def __getJobCount(self):
        """This method executes the hadoop job -list command and parses the
        output to detect the number of running jobs.

        Returns the parsed count, or -1 when no count line was found or the
        command exited with a code other than 0 or 1. Raises
        HadoopClientException when the command exits with code 1."""
        # We assume here that the poll interval is small enough to detect running jobs.
        # If jobs start and stop within the poll interval, the cluster would be incorrectly
        # treated as idle. Hadoop 2266 will provide a better mechanism than this.
        jobs = -1
        jtStatusCommand = self.__initStatusCommand()
        jtStatusCommand.start()
        jtStatusCommand.wait()
        jtStatusCommand.join()
        exitCode = jtStatusCommand.exit_code()
        if exitCode == 0:
            for line in jtStatusCommand.output():
                match = self.__jobCountRegExp.match(line)
                if match:
                    jobs = int(match.group(1))
        elif exitCode == 1:
            # for now, exit code 1 comes for any exception raised by JobClient. If hadoop gets
            # to differentiate and give more granular exit codes, we can check for those errors
            # corresponding to network errors etc.
            raise HadoopClientException(exitCode)
        return jobs

    def __isCompatibleHadoopVersion(self, expectedVersion):
        """This method determines whether the version of hadoop being used is
        at least the expectedVersion.

        expectedVersion is a dictionary with 'major' and 'minor' keys.
        This can be used for checking if a particular feature is available or not"""
        ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
        return self.__versionAtLeast(ver, expectedVersion)

    @staticmethod
    def __versionAtLeast(ver, expectedVersion):
        """This method returns True when version dictionary 'ver' is the same
        as or newer than 'expectedVersion'; an unknown (None) major or minor
        in 'ver' is treated as not compatible."""
        if ver['major'] is None or ver['minor'] is None:
            return False
        # Compare (major, minor) as a pair: checking the two numbers
        # independently would reject e.g. 1.0 against 0.15 because 0 < 15.
        actual = (int(ver['major']), int(ver['minor']))
        expected = (int(expectedVersion['major']), int(expectedVersion['minor']))
        return actual >= expected

    def __initStatusCommand(self, option="-list"):
        """This method initializes the command to run to check the JT status.

        option is appended to 'hadoop job -jt <JT URL>'. Returns the
        simpleCommand object, not yet started."""
        hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
        cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
        cmdStr = "%s %s" % (cmdStr, option)
        self.__log.debug('cmd str %s' % cmdStr)
        # Work on a copy so that setting JAVA_HOME for the child command does
        # not rewrite the environment of this whole process.
        env = os.environ.copy()
        env['JAVA_HOME'] = self.__javaHome
        return simpleCommand('HadoopStatus', cmdStr, env)