# blob: a289d95d75d9b6ba4f28fd0bc88641db9ed95ccd [file] [log] [blame]
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
#!/usr/bin/env python
"""manages services and nodepool"""
# -*- python -*-
import os, sys, random, time, sets, shutil, threading
import urllib, urlparse, re, getpass, pprint, signal, shutil
from pprint import pformat
from HTMLParser import HTMLParser
binfile = sys.path[0]
libdir = os.path.dirname(binfile)
sys.path.append(libdir)
import hodlib.Common.logger
from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus
from hodlib.Common.threads import func
from hodlib.Hod.nodePool import *
from hodlib.Common.util import *
from hodlib.Common.nodepoolutil import NodePoolUtil
from hodlib.Common.socketServers import hodXMLRPCServer
from hodlib.Common.socketServers import threadedHTTPServer
from hodlib.NodePools import *
from hodlib.NodePools.torque import *
from hodlib.GridServices import *
from hodlib.Common.descGenerator import *
from hodlib.Common.xmlrpc import hodXRClient
from hodlib.Common.miniHTMLParser import miniHTMLParser
from hodlib.Common.threads import simpleCommand
class ringMasterServer:
    """The RPC server that exposes all the master config changes.

    Also, one of these RPC servers runs as a proxy and all the hodring
    instances register with this proxy.

    The running XML-RPC server and the object wrapping it are kept in the
    class attributes ``xmlrpc`` and ``instance`` so that the static helper
    methods below can reach them without holding an instance reference.
    """

    # The single ringMasterServer created by startService().
    instance = None
    # The underlying XML-RPC server object (twisted or hodXMLRPCServer).
    xmlrpc = None

    def __init__(self, cfg, log, logMasterSources, retry=5):
        """Create the XML-RPC server, register the RPC object and start it.

        cfg              -- configuration; cfg['ringmaster']['xrs-port-range']
                            is passed to the server constructor.
        log              -- logger used for info/debug messages.
        logMasterSources -- object whose methods are exposed over XML-RPC.
        retry            -- not used by this implementation; kept so that
                            existing callers remain valid.
        """
        portRange = cfg['ringmaster']['xrs-port-range']
        try:
            # Prefer the twisted based server when it can be imported.
            from hodlib.Common.socketServers import twistedXMLRPCServer
            ringMasterServer.xmlrpc = twistedXMLRPCServer("", portRange)
        except ImportError:
            log.info("Twisted interface not found. Using hodXMLRPCServer.")
            ringMasterServer.xmlrpc = hodXMLRPCServer("", portRange)

        ringMasterServer.xmlrpc.register_instance(logMasterSources)
        self.logMasterSources = logMasterSources

        # NOTE(review): serve_forever() is followed by a liveness poll, so it
        # presumably returns after starting a background thread -- confirm
        # against hodlib.Common.socketServers.
        ringMasterServer.xmlrpc.serve_forever()
        while not ringMasterServer.xmlrpc.is_alive():
            time.sleep(.5)

        # Remember the bound port; getPort() previously referred to an
        # attribute that was never assigned.
        self.port = ringMasterServer.xmlrpc.server_address[1]
        log.debug('Ringmaster RPC Server at %d' % self.port)

    @staticmethod
    def startService(ss, cfg, np, log, rm):
        """Build the RPC object from the service dictionary `ss` and start
        the server, storing it in ringMasterServer.instance."""
        logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
        ringMasterServer.instance = ringMasterServer(cfg, log,
                                                     logMasterSources)

    @staticmethod
    def stopService():
        """Stop the XML-RPC server; a no-op if it was never started."""
        if ringMasterServer.xmlrpc is not None:
            ringMasterServer.xmlrpc.stop()

    @staticmethod
    def getPort():
        """Return the port the XML-RPC server is bound to."""
        return ringMasterServer.xmlrpc.server_address[1]

    @staticmethod
    def getAddress():
        """Return the server URL as 'http://<local hostname>:<port>/'."""
        return 'http://%s:%d/' % (socket.gethostname(),
                                  ringMasterServer.xmlrpc.server_address[1])
class _LogMasterSources:
    """All the methods that are run by the RPC server are added into this
    class.

    State is protected by three locks:
      tarSourceLock   -- self.dict / self.count (tarball download sources)
      cmdLock         -- command hand-out in getCommand / getAdminCommand
      masterParamLock -- master parameters and errors of the services
    Every lock is released in a ``finally`` clause so that an exception in
    one RPC call cannot block all later calls.
    """

    def __init__(self, serviceDict, cfg, np, log, rm):
        """Store the collaborators and create empty bookkeeping structures.

        serviceDict -- mapping of service name to service object.
        cfg         -- configuration; cfg['ringmaster']['max-connect'] bounds
                       the number of concurrent downloads per tar source.
        np          -- node pool object.
        log         -- logger.
        rm          -- the RingMaster owning this object (used by stopRM).
        """
        self.serviceDict = serviceDict
        self.tarSource = []
        self.tarSourceLock = threading.Lock()
        # url -> url of every registered tarball source
        self.dict = {}
        # url -> number of downloads currently served by that source
        self.count = {}
        self.logsourceList = []
        self.logsourceListLock = threading.Lock()
        self.masterParam = []
        self.masterParamLock = threading.Lock()
        self.verify = 'none'
        self.cmdLock = threading.Lock()
        self.cfg = cfg
        self.log = log
        self.np = np
        self.rm = rm
        self.hdfsHost = None
        self.mapredHost = None
        self.maxconnect = self.cfg['ringmaster']['max-connect']
        self.log.debug("Using max-connect value %s"%self.maxconnect)

    def registerTarSource(self, hostname, url, addr=None):
        """Register `url` as a new tarball source and return it.

        When `addr` is given, the download counter of the source `addr` is
        decremented. Raises KeyError if `addr` is not a registered source;
        the new `url` is registered and the lock is released even then.
        """
        self.log.debug("registering: " + url)
        self.tarSourceLock.acquire()
        try:
            self.dict[url] = url
            self.count[url] = 0
            # addr is None when ringMaster himself invokes this method
            if addr:
                self.count[addr] = self.count[addr] - 1
        finally:
            self.tarSourceLock.release()
        if addr:
            self.log.debug("%s is done" % (addr))
        return url

    def getTarList(self,hodring): # this looks useful
        """Return the least loaded tarball source for `hodring`.

        Only sources serving fewer than max-connect downloads are eligible;
        among equally loaded sources the last one iterated wins. The chosen
        source's counter is incremented. Returns the string 'none' when no
        source is eligible.
        """
        self.tarSourceLock.acquire()
        try:
            leastkey = None
            leastval = None
            for key, val in self.count.items():
                # None (rather than -1) marks "nothing chosen yet", so that a
                # counter that was decremented below zero cannot be mistaken
                # for the sentinel.
                if val < self.maxconnect and \
                        (leastval is None or val <= leastval):
                    leastkey = key
                    leastval = val
            if leastkey is None:
                url = 'none'
            else:
                url = self.dict[leastkey]
                self.count[leastkey] = leastval + 1
                self.log.debug("%s %d" % (leastkey, self.count[leastkey]))
        finally:
            self.tarSourceLock.release()
        self.log.debug('sending url ' + url+" to "+hodring) # this looks useful
        return url

    def tarDone(self, uri):
        """Record that one download from source `uri` finished; return `uri`.

        Raises KeyError for an unknown `uri` (the lock is still released).
        """
        self.log.debug("%s is done" % (uri))
        self.tarSourceLock.acquire()
        try:
            self.count[uri] = self.count[uri] - 1
        finally:
            self.tarSourceLock.release()
        return uri

    def status(self):
        """Liveness probe for RPC clients; always returns True."""
        return True

    # FIXME: this code is broken, it relies on a central service registry
    #
    # def clusterStart(self, changedClusterParams=[]):
    # self.log.debug("clusterStart method invoked.")
    # self.dict = {}
    # self.count = {}
    # try:
    # if (len(changedClusterParams) > 0):
    # self.log.debug("Updating config.")
    # for param in changedClusterParams:
    # (key, sep1, val) = param.partition('=')
    # (i1, sep2, i2) = key.partition('.')
    # try:
    # prev = self.cfg[i1][i2]
    # self.rm.cfg[i1][i2] = val
    # self.cfg[i1][i2] = val
    # self.log.debug("\nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val))
    # except KeyError, e:
    # self.log.info("Skipping %s as no such config parameter found in ringmaster" % param)
    # self.log.debug("Regenerating Service Description.")
    # dGen = DescGenerator(self.rm.cfg)
    # self.rm.cfg['servicedesc'] = dGen.createServiceDescDict()
    # self.cfg['servicedesc'] = self.rm.cfg['servicedesc']
    #
    # self.rm.tar = None
    # if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'):
    # self.rm.download = True
    # self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball']
    # self.log.debug("self.rm.tar=%s" % self.rm.tar)
    #
    # self.rm.cd_to_tempdir()
    #
    # self.rm.tarAddress = None
    # hostname = socket.gethostname()
    # if (self.rm.download):
    # self.rm.basename = os.path.basename(self.rm.tar)
    # dest = os.path.join(os.getcwd(), self.rm.basename)
    # src = self.rm.tar
    # self.log.debug("cp %s -> %s" % (src, dest))
    # shutil.copy(src, dest)
    # self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename)
    # self.registerTarSource(hostname, self.rm.tarAddress)
    # self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress)
    # else:
    # self.log.debug("Download not set.")
    #
    # if (self.rm.tar != None):
    # self.cfg['hodring']['download-addr'] = self.rm.tarAddress
    # self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress
    #
    # sdl = self.rm.cfg['servicedesc']
    # workDirs = self.rm.getWorkDirs(self.rm.cfg, True)
    # hdfsDesc = sdl['hdfs']
    # hdfs = None
    # if hdfsDesc.isExternal():
    # hdfs = HdfsExternal(hdfsDesc, workDirs)
    # else:
    # hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True)
    #
    # self.rm.serviceDict[hdfs.getName()] = hdfs
    # mrDesc = sdl['mapred']
    # mr = None
    # if mrDesc.isExternal():
    # mr = MapReduceExternal(mrDesc, workDirs)
    # else:
    # mr = MapReduce(mrDesc, workDirs, 1)
    # self.rm.serviceDict[mr.getName()] = mr
    #
    # ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
    # self.np.getServiceId(), 'hodring', 'hod')
    #
    # slaveList = ringList
    # hdfsringXRAddress = None
    # # Start HDFS Master - Step 1
    # if not hdfsDesc.isExternal():
    # masterFound = False
    # for ring in ringList:
    # ringXRAddress = ring['xrs']
    # if ringXRAddress == None:
    # raise Exception("Could not get hodring XML-RPC server address.")
    # if (ringXRAddress.find(self.hdfsHost) != -1):
    # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
    # hdfsringXRAddress = ringXRAddress
    # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)")
    # ringClient.clusterStart()
    # masterFound = True
    # slaveList.remove(ring)
    # break
    # if not masterFound:
    # raise Exception("HDFS Master host not found")
    # while hdfs.getInfoAddrs() == None:
    # self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port")
    # time.sleep(1)
    #
    # # Start MAPRED Master - Step 2
    # if not mrDesc.isExternal():
    # masterFound = False
    # for ring in ringList:
    # ringXRAddress = ring['xrs']
    # if ringXRAddress == None:
    # raise Exception("Could not get hodring XML-RPC server address.")
    # if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1):
    # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
    # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)")
    # ringClient.clusterStart()
    # masterFound = True
    # slaveList.remove(ring)
    # break
    # if not masterFound:
    # raise Excpetion("MAPRED Master host not found")
    # while mr.getInfoAddrs() == None:
    # self.log.debug("Waiting for MAPRED Master (Job Tracker) to register \
    # mapred.job.tracker.info.port")
    # time.sleep(1)
    #
    # # Start Slaves - Step 3
    # for ring in slaveList:
    # ringXRAddress = ring['xrs']
    # if ringXRAddress == None:
    # raise Exception("Could not get hodring XML-RPC server address.")
    # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
    # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)")
    # ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart())
    # ring['thread'] = ringThread
    # ringThread.start()
    #
    # for ring in slaveList:
    # ringThread = ring['thread']
    # if ringThread == None:
    # raise Exception("Could not get hodring thread (Slave).")
    # ringThread.join()
    # self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)")
    #
    # # Run Admin Commands on HDFS Master - Step 4
    # if not hdfsDesc.isExternal():
    # if hdfsringXRAddress == None:
    # raise Exception("HDFS Master host not found (to Run Admin Commands)")
    # ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0)
    # self.log.debug("Invoking clusterStart(False) - Admin on "
    # + hdfsringXRAddress + " (HDFS Master)")
    # ringClient.clusterStart(False)
    #
    # except:
    # self.log.debug(get_exception_string())
    # return False
    #
    # self.log.debug("Successfully started cluster.")
    # return True
    #
    # def clusterStop(self):
    # self.log.debug("clusterStop method invoked.")
    # try:
    # hdfsAddr = self.getServiceAddr('hdfs')
    # if hdfsAddr.find(':') != -1:
    # h, p = hdfsAddr.split(':', 1)
    # self.hdfsHost = h
    # self.log.debug("hdfsHost: " + self.hdfsHost)
    # mapredAddr = self.getServiceAddr('mapred')
    # if mapredAddr.find(':') != -1:
    # h, p = mapredAddr.split(':', 1)
    # self.mapredHost = h
    # self.log.debug("mapredHost: " + self.mapredHost)
    # ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
    # self.np.getServiceId(),
    # 'hodring', 'hod')
    # for ring in ringList:
    # ringXRAddress = ring['xrs']
    # if ringXRAddress == None:
    # raise Exception("Could not get hodring XML-RPC server address.")
    # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False)
    # self.log.debug("Invoking clusterStop on " + ringXRAddress)
    # ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop())
    # ring['thread'] = ringThread
    # ringThread.start()
    #
    # for ring in ringList:
    # ringThread = ring['thread']
    # if ringThread == None:
    # raise Exception("Could not get hodring thread.")
    # ringThread.join()
    # self.log.debug("Completed clusterStop on " + ring['xrs'])
    #
    # except:
    # self.log.debug(get_exception_string())
    # return False
    #
    # self.log.debug("Successfully stopped cluster.")
    #
    # return True

    def getCommand(self, addr):
        """This method is called by the hodrings to get commands from the
        ringmaster.

        The first internal, launchable service whose master is not launched
        yet (and whose failure count is within max-master-failures) gets its
        master commands, and `addr` is recorded as its master address.
        Otherwise the worker commands of all internal services are returned,
        or an empty list while any of their masters is still uninitialized.
        Exceptions are logged and whatever was collected so far is returned.
        """
        cmdList = []
        self.cmdLock.acquire()
        try:
            try:
                for v in self.serviceDict.values():
                    if v.isExternal() or \
                            not v.isLaunchable(self.serviceDict):
                        continue
                    # If a master is still not launched, or the number of
                    # retries for launching master is not reached,
                    # launch master
                    if not v.isMasterLaunched() and \
                            (v.getMasterFailureCount() <=
                             self.cfg['ringmaster']['max-master-failures']):
                        cmdList = v.getMasterCommands(self.serviceDict)
                        v.setlaunchedMaster()
                        v.setMasterAddress(addr)
                        break
                if cmdList == []:
                    for s in self.serviceDict.values():
                        # Test the service being iterated; the previous code
                        # tested the stale variable of the loop above.
                        if s.isExternal():
                            continue
                        if s.isMasterInitialized():
                            cmdList.extend(
                                s.getWorkerCommands(self.serviceDict))
                        else:
                            # A master is not ready: hand out nothing yet.
                            cmdList = []
                            break
            except Exception:
                self.log.debug(get_exception_string())
        finally:
            self.cmdLock.release()
        self.log.debug("getCommand returning " + addr + pformat(cmdList))
        return cmdList

    def getAdminCommand(self, addr):
        """This method is called by the hodrings to get admin commands from
        the ringmaster.

        Returns the admin commands of the first service that has any, else
        an empty list. Exceptions are logged, not raised.
        """
        cmdList = []
        self.cmdLock.acquire()
        try:
            try:
                for v in self.serviceDict.values():
                    cmdList = v.getAdminCommands(self.serviceDict)
                    if cmdList != []:
                        break
            except Exception:
                self.log.debug(get_exception_string())
        finally:
            self.cmdLock.release()
        self.log.debug("getAdminCommand returning " + addr + pformat(cmdList))
        return cmdList

    def addMasterParams(self, addr, vals):
        """This method is called by hodring to update any parameters its
        changed for the commands it was running.

        Every launched service whose master address equals `addr` receives
        `vals` and is marked as initialized. Returns `addr`.
        """
        self.log.debug('Comment: adding master params from %s' % addr)
        self.log.debug(pformat(vals))
        self.masterParamLock.acquire()
        try:
            try:
                for v in self.serviceDict.values():
                    if v.isMasterLaunched() and v.getMasterAddress() == addr:
                        v.setMasterParams(vals)
                        v.setMasterInitialized()
            except Exception:
                self.log.debug(get_exception_string())
        finally:
            self.masterParamLock.release()
        return addr

    def setHodRingErrors(self, addr, errors):
        """This method is called by the hodrings to update errors it
        encountered while starting up.

        Every launched service whose master address equals `addr` is marked
        as failed. Always returns True.
        """
        self.log.critical("Hodring at %s failed with following errors:\n%s" \
                          % (addr, errors))
        self.masterParamLock.acquire()
        try:
            try:
                for v in self.serviceDict.values():
                    if v.isMasterLaunched() and v.getMasterAddress() == addr:
                        # strip the PID part for the message only; `addr`
                        # itself must stay intact for the remaining services.
                        shortAddr = addr
                        idx = addr.rfind('_')
                        if idx != -1:
                            shortAddr = addr[:idx]
                        v.setMasterFailed("Hodring at %s failed with following" \
                                          " errors:\n%s" % (shortAddr, errors))
            except Exception:
                self.log.debug(get_exception_string())
        finally:
            self.masterParamLock.release()
        return True

    def getKeys(self):
        """Return the names of all known services."""
        self.masterParamLock.acquire()
        try:
            keys = list(self.serviceDict.keys())
        finally:
            self.masterParamLock.release()
        return keys

    def getServiceAddr(self, name):
        """Return the master address of service `name`.

        Returns 'not found' for an unknown service or one whose master is
        not initialized yet, and 'Error: <message>' once the service failed
        more often than max-master-failures.
        """
        addr = 'not found'
        self.log.debug("getServiceAddr name: %s" % name)
        self.masterParamLock.acquire()
        try:
            try:
                service = self.serviceDict[name]
            except KeyError:
                pass
            else:
                self.log.debug("getServiceAddr service: %s" % service)
                # Check if we should give up ! If the limit on max failures
                # is hit, give up.
                err = service.getMasterFailed()
                maxFailures = self.cfg['ringmaster']['max-master-failures']
                if (err is not None) and \
                        (service.getMasterFailureCount() > maxFailures):
                    self.log.critical("Detected errors (%s) beyond allowed number"\
                                      " of failures (%s). Flagging error to client" \
                                      % (service.getMasterFailureCount(),
                                         maxFailures))
                    addr = "Error: " + err
                elif (service.isMasterInitialized()):
                    addr = service.getMasterAddrs()[0]
                else:
                    addr = 'not found'
        finally:
            self.masterParamLock.release()
        self.log.debug("getServiceAddr addr %s: %s" % (name, addr))
        return addr

    def getURLs(self, name):
        """Return the first info address of service `name`, or 'none' when
        the service is unknown or its master is not initialized."""
        addr = 'none'
        self.masterParamLock.acquire()
        try:
            try:
                service = self.serviceDict[name]
            except KeyError:
                pass
            else:
                if (service.isMasterInitialized()):
                    addr = service.getInfoAddrs()[0]
        finally:
            self.masterParamLock.release()
        return addr

    def stopRM(self):
        """An XMLRPC call which will spawn a thread to stop the Ringmaster
        program. Returns True if the thread was started, False otherwise."""
        # We spawn a thread here because we want the XMLRPC call to return.
        # Calling stop directly from here will also stop the XMLRPC server.
        try:
            self.log.debug("inside xml-rpc call to stop ringmaster")
            rmStopperThread = func('RMStopper', self.rm.stop)
            rmStopperThread.start()
            self.log.debug("returning from xml-rpc call to stop ringmaster")
            return True
        except Exception:
            self.log.debug("Exception in stop: %s" % get_exception_string())
            return False
class RingMaster:
    """Manages the node pool and the HDFS / Map-Reduce services of one
    allocation, and runs the XML-RPC (and optional HTTP) servers used by
    the hodrings."""

    def __init__(self, cfg, log, **kwds):
        """starts nodepool and services

        cfg  -- configuration object, indexed by section and option name.
        log  -- logger.
        kwds -- accepted for compatibility; not used.

        Raises an Exception if the hadoop tarball cannot be copied or found,
        or if the Hadoop version cannot be determined.
        """
        self.download = False
        self.httpServer = None
        self.cfg = cfg
        self.log = log
        self.__hostname = local_fqdn()
        self.workDirs = None
        # ref to the idle job tracker object.
        self.__jtMonitor = None
        self.__idlenessDetected = False
        self.__stopInProgress = False
        self.__isStopped = False # to let main exit
        # exit code with which the ringmaster main method should return
        self.__exitCode = 0
        self.workers_per_ring = self.cfg['ringmaster']['workers_per_ring']

        self.__initialize_signal_handlers()

        npd = self.cfg['nodepooldesc']
        self.np = NodePoolUtil.getNodePool(npd, cfg, log)

        self.log.debug("Getting service ID.")
        self.serviceId = self.np.getServiceId()
        self.log.debug("Got service ID: %s" % self.serviceId)

        self.tarSrcLoc = None
        if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
            self.download = True
            self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']

        self.cd_to_tempdir()

        if (self.download):
            self.__copy_tarball(os.getcwd())
            self.basename = self.__find_tarball_in_dir(os.getcwd())
            if self.basename is None:
                raise Exception('Did not find tarball copied from %s in %s.'
                                % (self.tarSrcLoc, os.getcwd()))

        self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])
        self.log.debug("Service registry @ %s" % self.serviceAddr)

        self.serviceClient = hodXRClient(self.serviceAddr)
        self.serviceDict = {}
        try:
            sdl = self.cfg['servicedesc']
            workDirs = self.getWorkDirs(cfg)

            hdfsDesc = sdl['hdfs']
            hdfs = None

            # Determine hadoop Version
            hadoopVers = hadoopVersion(self.__getHadoopDir(),
                                       self.cfg['hodring']['java-home'],
                                       self.log)
            if (hadoopVers['major'] is None) or (hadoopVers['minor'] is None):
                raise Exception('Could not retrive the version of Hadoop.'
                                + ' Check the Hadoop installation or the value of the hodring.java-home variable.')
            minorVersion = int(hadoopVers['minor'])

            if hdfsDesc.isExternal():
                hdfs = HdfsExternal(hdfsDesc, workDirs, version=minorVersion)
                hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
            else:
                hdfs = Hdfs(hdfsDesc, workDirs, 0, version=minorVersion,
                            workers_per_ring = self.workers_per_ring)
            self.serviceDict[hdfs.getName()] = hdfs

            mrDesc = sdl['mapred']
            mr = None
            if mrDesc.isExternal():
                mr = MapReduceExternal(mrDesc, workDirs, version=minorVersion)
                mr.setMasterParams( self.cfg['gridservice-mapred'] )
            else:
                mr = MapReduce(mrDesc, workDirs,1, version=minorVersion,
                               workers_per_ring = self.workers_per_ring)
            self.serviceDict[mr.getName()] = mr
        except:
            # Log and re-raise: the ringmaster cannot run without services.
            self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: %s."
                              % get_exception_error_string())
            self.log.debug(get_exception_string())
            raise

        # should not be starting these in a constructor
        ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)

        self.rpcserver = ringMasterServer.getAddress()

        self.httpAddress = None
        self.tarAddress = None
        hostname = socket.gethostname()
        if (self.download):
            # Serve the tarball to the hodrings over HTTP.
            self.httpServer = threadedHTTPServer(hostname,
                self.cfg['ringmaster']['http-port-range'])

            self.httpServer.serve_forever()
            self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0],
                                                  self.httpServer.server_address[1])
            self.tarAddress = "%s%s" % (self.httpAddress, self.basename)

            ringMasterServer.instance.logMasterSources.registerTarSource(hostname,
                                                                         self.tarAddress)
        else:
            self.log.debug("Download not set.")

        self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'],
            self.serviceId, self.__hostname, 'ringmaster', 'hod'))

        if self.cfg['ringmaster']['register']:
            if self.httpAddress:
                self.serviceClient.registerService(self.cfg['ringmaster']['userid'],
                    self.serviceId, self.__hostname, 'ringmaster', 'hod', {
                    'xrs' : self.rpcserver, 'http' : self.httpAddress })
            else:
                self.serviceClient.registerService(self.cfg['ringmaster']['userid'],
                    self.serviceId, self.__hostname, 'ringmaster', 'hod', {
                    'xrs' : self.rpcserver, })

        self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)

        # Fill in the configuration handed to the hodrings.
        hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_'
                                      + getpass.getuser())

        self.cfg['hodring']['hodring'] = [hodRingWorkDir,]
        self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']
        self.cfg['hodring']['service-id'] = self.np.getServiceId()

        self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)

        if (self.tarSrcLoc is not None):
            cfg['hodring']['download-addr'] = self.tarAddress

        self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)

    def __init_job_tracker_monitor(self, logMasterSources):
        """Create and start the idle job tracker monitor. A failure is
        logged but not raised; the cluster then cannot be deallocated when
        idle."""
        hadoopDir = self.__getHadoopDir()
        self.log.debug('hadoopdir=%s, java-home=%s' % \
                       (hadoopDir, self.cfg['hodring']['java-home']))
        try:
            self.__jtMonitor = JobTrackerMonitor(self.log, self,
                                    self.cfg['ringmaster']['jt-poll-interval'],
                                    self.cfg['ringmaster']['idleness-limit'],
                                    hadoopDir, self.cfg['hodring']['java-home'],
                                    logMasterSources)
            self.log.debug('starting jt monitor')
            self.__jtMonitor.start()
        except Exception:
            self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle. '
                              'Exception message: %s' % get_exception_error_string())
            self.log.debug('Exception details: %s' % get_exception_string())

    def __getHadoopDir(self):
        """Return the hadoop installation directory: the root directory of
        the untarred tarball when one is configured, otherwise the configured
        gridservice-mapred 'pkgs' value. Raises an Exception if untarring
        fails."""
        hadoopDir = None
        if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
            tarFile = os.path.join(os.getcwd(), self.basename)
            ret = untar(tarFile, os.getcwd())
            if not ret:
                raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.' \
                                % (tarFile, os.getcwd()))
            hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile))
        else:
            hadoopDir = self.cfg['gridservice-mapred']['pkgs']
        self.log.debug('Returning Hadoop directory as: %s' % hadoopDir)
        return hadoopDir

    def __get_dir(self, name):
        """Return the root directory inside the tarball
        specified by name. Assumes that the tarball begins
        with a root directory."""
        import tarfile
        myTarFile = tarfile.open(name)
        try:
            hadoopPackage = myTarFile.getnames()[0]
        finally:
            # The archive handle was previously never closed.
            myTarFile.close()
        self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
        return hadoopPackage

    def __find_tarball_in_dir(self, dir):
        """Find the tarball among files specified in the given
        directory. We need this method because how the tarball
        source URI is given depends on the method of copy and
        we can't get the tarball name from that.
        This method will fail if there are multiple tarballs
        in the directory with the same suffix."""
        for fileName in os.listdir(dir):
            if self.tarSrcLoc.endswith(fileName):
                return fileName
        return None

    def __copy_tarball(self, destDir):
        """Copy the hadoop tar ball from a remote location to the
        specified destination directory. Based on the URL it executes
        an appropriate copy command. Throws an exception if the command
        returns a non-zero exit code."""
        # for backwards compatibility, treat the default case as file://
        url = ''
        if self.tarSrcLoc.startswith('/'):
            url = 'file:/'
        src = '%s%s' % (url, self.tarSrcLoc)
        if not src.startswith('file://'):
            raise Exception('Unsupported URL for file: %s' % src)

        # Drop the scheme but keep one leading '/' of the path.
        src = src[len('file://')-1:]
        cpCmd = '/bin/cp'
        cmd = '%s %s %s' % (cpCmd, src, destDir)
        self.log.debug('Command to execute: %s' % cmd)
        copyProc = simpleCommand('remote copy', cmd)
        copyProc.start()
        copyProc.wait()
        copyProc.join()
        ret = copyProc.exit_code()
        self.log.debug('Completed command execution. Exit Code: %s.' % ret)

        if ret != 0:
            output = copyProc.output()
            raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s'
                            % (cmd, ret, output))

    # input: http://hostname:port/. output: [hostname,port]
    def __url_to_addr(self, url):
        """Convert 'http://hostname:port/' into [hostname, port(int)]."""
        addr = url.rstrip('/')
        if addr.startswith('http://'):
            addr = addr.replace('http://', '', 1)
        addr_parts = addr.split(':')
        return [addr_parts[0], int(addr_parts[1])]

    def __initialize_signal_handlers(self):
        """Route SIGTERM, SIGINT and SIGQUIT to self.stop via sig_wrapper."""
        def sigStop(sigNum, handler):
            sig_wrapper(sigNum, self.stop)

        for sigNum in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
            signal.signal(sigNum, sigStop)

    def __clean_up(self):
        """Leave and remove the ringmaster temporary directory."""
        tempDir = self.__get_tempdir()
        # Step out of the directory before deleting it.
        os.chdir(os.path.split(tempDir)[0])
        if os.path.exists(tempDir):
            shutil.rmtree(tempDir, True)
        self.log.debug("Cleaned up temporary dir: %s" % tempDir)

    def __get_tempdir(self):
        """Return <temp-dir>/<userid>.<service id>.ringmaster."""
        return os.path.join(self.cfg['ringmaster']['temp-dir'],
                            "%s.%s.ringmaster" % (self.cfg['ringmaster']['userid'],
                                                  self.np.getServiceId()))

    def getWorkDirs(self, cfg, reUse=False):
        """Return one work directory path per configured parent directory.

        Each path is <parent>/<hostname>-<pid>-<random integer>. The list is
        cached in self.workDirs; it is regenerated unless `reUse` is true
        and a cached list exists.
        """
        if (not reUse) or (self.workDirs is None):
            import math
            # Scale the random fraction up until it is a whole number.
            frand = random.random()
            while math.ceil(frand) != math.floor(frand):
                frand = frand * 100

            irand = int(frand)
            uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), irand)
            parentDirs = cfg['ringmaster']['work-dirs']
            self.workDirs = [os.path.join(p, uniq) for p in parentDirs]

        return self.workDirs

    def __local_path_for(self, link, parentDir):
        """Return the local file path under parentDir that mirrors `link`
        (<parentDir>/<host>[/<port>]/<url path>), creating its directory."""
        parsed = urlparse.urlparse(link)
        hp = parsed[1]
        h = hp
        p = None
        if hp.find(':') != -1:
            h, p = hp.split(':', 1)
        localPath = os.path.join(parentDir, h)
        # The port is optional; joining None used to raise.
        if p is not None:
            localPath = os.path.join(localPath, p)
        for c in parsed[2].split('/'):
            if c == '':
                continue
            localPath = os.path.join(localPath, c)

        try:
            self.log.debug('Creating %s' % localPath)
            parent, tail = os.path.split(localPath)
            if not os.path.exists(parent):
                os.makedirs(parent)
        except Exception:
            self.log.debug(get_exception_string())
        return localPath

    def _fetchLink(self, link, parentDir):
        """Download `link` and the links found in HTML pages reached from
        it. HTML responses are fed to the link parser; any other response is
        saved below `parentDir`."""
        parser = miniHTMLParser()
        self.log.debug("Checking link %s" %link)
        while link:
            # Get the file from the site and link
            response = urllib.urlopen(link)
            out = None
            try:
                contentType = response.info().gettype()
                isHtml = contentType == 'text/html'

                if isHtml:
                    parser.setBaseUrl(response.geturl())
                else:
                    out = open(self.__local_path_for(link, parentDir), 'w')

                bufSz = 8192
                buf = response.read(bufSz)
                while len(buf) > 0:
                    if isHtml:
                        # Feed the file into the HTML parser
                        parser.feed(buf)
                    if out:
                        out.write(buf)
                    buf = response.read(bufSz)
            finally:
                # Release the connection and the file even on errors.
                response.close()
                if out:
                    out.close()

            # Get the next link in level traversal order
            link = parser.getNextLink()

        parser.close()

    def _finalize(self):
        """Finalize the node pool."""
        self.np.finalize()

    def handleIdleJobTracker(self):
        """Record that the job tracker was idle for the configured limit, so
        that shouldStop() returns True."""
        self.log.critical("Detected idle job tracker for %s seconds. The allocation will be cleaned up." \
                          % self.cfg['ringmaster']['idleness-limit'])
        self.__idlenessDetected = True

    def cd_to_tempdir(self):
        """Create the ringmaster temporary directory if needed, make it the
        current directory and return its path."""
        tempDir = self.__get_tempdir()
        if not os.path.exists(tempDir):
            os.makedirs(tempDir)
        os.chdir(tempDir)
        return tempDir

    def getWorkload(self):
        """Return the `workload` attribute, or None when it was never set
        (nothing in this class assigns it)."""
        return getattr(self, 'workload', None)

    def getHostName(self):
        """Return the fully qualified name of the local host."""
        return self.__hostname

    def start(self):
        """run the thread main loop"""
        self.log.debug("Entered start method.")
        hodring = os.path.join(self.cfg['ringmaster']['base-dir'],
                               'bin', 'hodring')
        largs = [hodring]
        largs.extend(self.cfg.get_args(section='hodring'))

        # Log the command line; every item is followed by one space.
        hodringCmd = "".join(["%s " % item for item in largs])
        self.log.debug(hodringCmd)

        if self.np.runWorkers(largs) > 0:
            self.log.critical("Failed to start worker.")

        self.log.debug("Returned from runWorkers.")

        self._finalize()

    def __findExitCode(self):
        """Determine the exit code based on the status of the cluster or jobs run on them"""
        xmlrpcServer = ringMasterServer.instance.logMasterSources
        hdfsAddr = xmlrpcServer.getServiceAddr('hdfs')
        if hdfsAddr == 'not found' or hdfsAddr.startswith("Error: "):
            self.__exitCode = 7
        else:
            mapredAddr = xmlrpcServer.getServiceAddr('mapred')
            if mapredAddr == 'not found' or mapredAddr.startswith("Error: "):
                self.__exitCode = 8
            else:
                clusterStatus = get_cluster_status(hdfsAddr, mapredAddr)
                if clusterStatus != 0:
                    self.__exitCode = clusterStatus
                else:
                    self.__exitCode = self.__findHadoopJobsExitCode()
        self.log.debug('exit code %s' % self.__exitCode)

    def __findHadoopJobsExitCode(self):
        """Determine the consolidate exit code of hadoop jobs run on this cluster, provided
        this information is available. Return 0 otherwise

        Returns 16 when all known jobs failed, 17 when only some failed.
        """
        ret = 0
        failureStatus = 3
        failureCount = 0
        if self.__jtMonitor:
            jobStatusList = self.__jtMonitor.getJobsStatus()
            try:
                if len(jobStatusList) > 0:
                    for jobStatus in jobStatusList:
                        self.log.debug('job status for %s: %s' % (jobStatus.getJobId(),
                                                                  jobStatus.getStatus()))
                        if jobStatus.getStatus() == failureStatus:
                            failureCount = failureCount+1
                if failureCount > 0:
                    if failureCount == len(jobStatusList): # all jobs failed
                        ret = 16
                    else:
                        ret = 17
            except Exception:
                # The message needs a placeholder; without one the '%'
                # operator itself raised inside this handler.
                self.log.debug('exception in finding hadoop jobs exit code: %s'
                               % get_exception_string())
        return ret

    def stop(self):
        """Stop the servers and the job tracker monitor, clean up the
        temporary directory and record the exit code. Does nothing when a
        stop is already in progress or done."""
        self.log.debug("RingMaster stop method invoked.")
        if self.__stopInProgress or self.__isStopped:
            return
        self.__stopInProgress = True
        if ringMasterServer.instance is not None:
            self.log.debug('finding exit code')
            self.__findExitCode()
            self.log.debug('stopping ringmaster instance')
            ringMasterServer.stopService()
        else:
            self.__exitCode = 6
        if self.__jtMonitor is not None:
            self.__jtMonitor.stop()
        if self.httpServer:
            self.httpServer.stop()

        self.__clean_up()
        self.__isStopped = True

    def shouldStop(self):
        """Indicates whether the main loop should exit, either due to idleness condition,
        or a stop signal was received"""
        return self.__idlenessDetected or self.__isStopped

    def getExitCode(self):
        """return the exit code of the program"""
        return self.__exitCode
def main(cfg,log):
    """Run a ringmaster until it should stop and return its exit code.

    The configuration is first expanded by DescGenerator.initializeDesc().
    Any exception is logged (when a logger is given) and re-raised wrapped
    in a plain Exception.
    """
    try:
        expandedCfg = DescGenerator(cfg).initializeDesc()
        ringMaster = RingMaster(expandedCfg, log)
        ringMaster.start()
        # Poll once per second for idleness or a stop request.
        while True:
            if ringMaster.shouldStop():
                break
            time.sleep(1)
        ringMaster.stop()
        log.debug('returning from main')
        return ringMaster.getExitCode()
    except Exception:
        error = sys.exc_info()[1]
        if log:
            log.critical(get_exception_string())
        raise Exception(error)