| #Licensed to the Apache Software Foundation (ASF) under one |
| #or more contributor license agreements. See the NOTICE file |
| #distributed with this work for additional information |
| #regarding copyright ownership. The ASF licenses this file |
| #to you under the Apache License, Version 2.0 (the |
| #"License"); you may not use this file except in compliance |
| #with the License. You may obtain a copy of the License at |
| |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| |
| #Unless required by applicable law or agreed to in writing, software |
| #distributed under the License is distributed on an "AS IS" BASIS, |
| #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| #See the License for the specific language governing permissions and |
| #limitations under the License. |
| #!/usr/bin/env python |
| """manages services and nodepool""" |
| # -*- python -*- |
| |
| import os, sys, random, time, sets, shutil, threading |
| import urllib, urlparse, re, getpass, pprint, signal, shutil |
| |
| from pprint import pformat |
| from HTMLParser import HTMLParser |
| |
# sys.path[0] is the directory of the script being executed; append its parent
# directory to the import path so that the hodlib.* imports below resolve.
binfile = sys.path[0]
libdir = os.path.dirname(binfile)
sys.path.append(libdir)
| |
| import hodlib.Common.logger |
| from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus |
| |
| from hodlib.Common.threads import func |
| |
| from hodlib.Hod.nodePool import * |
| from hodlib.Common.util import * |
| from hodlib.Common.nodepoolutil import NodePoolUtil |
| from hodlib.Common.socketServers import hodXMLRPCServer |
| from hodlib.Common.socketServers import threadedHTTPServer |
| from hodlib.NodePools import * |
| from hodlib.NodePools.torque import * |
| from hodlib.GridServices import * |
| from hodlib.Common.descGenerator import * |
| from hodlib.Common.xmlrpc import hodXRClient |
| from hodlib.Common.miniHTMLParser import miniHTMLParser |
| from hodlib.Common.threads import simpleCommand |
| |
class ringMasterServer:
    """The RPC server that exposes all the master config
    changes. Also, one of these RPC servers runs as a proxy
    and all the hodring instances register with this proxy"""

    # The ringMasterServer object created by startService().
    instance = None
    # The underlying XML-RPC server object (Twisted-based when available,
    # hodXMLRPCServer otherwise); shared at class level.
    xmlrpc = None

    def __init__(self, cfg, log, logMasterSources, retry=5):
        """Create the XML-RPC server, start it and wait until it is alive.

        cfg: configuration; cfg['ringmaster']['xrs-port-range'] selects the port.
        log: logger for status messages.
        logMasterSources: object whose methods are exposed over XML-RPC.
        retry: accepted for backward compatibility; unused.
        """
        try:
            # Prefer the Twisted implementation; fall back when it is missing.
            from hodlib.Common.socketServers import twistedXMLRPCServer
            ringMasterServer.xmlrpc = twistedXMLRPCServer(
                "", cfg['ringmaster']['xrs-port-range'])
        except ImportError:
            log.info("Twisted interface not found. Using hodXMLRPCServer.")
            ringMasterServer.xmlrpc = hodXMLRPCServer(
                "", cfg['ringmaster']['xrs-port-range'])

        ringMasterServer.xmlrpc.register_instance(logMasterSources)
        self.logMasterSources = logMasterSources
        ringMasterServer.xmlrpc.serve_forever()

        # serve_forever() returns here (the loop below then polls for
        # liveness), so presumably it serves in the background -- confirm.
        while not ringMasterServer.xmlrpc.is_alive():
            time.sleep(.5)

        log.debug('Ringmaster RPC Server at %d' %
                  ringMasterServer.xmlrpc.server_address[1])

    @staticmethod
    def startService(ss, cfg, np, log, rm):
        """Build the RPC-exposed _LogMasterSources and start the server.

        ss is the service dictionary, np the node pool and rm the owning
        RingMaster; all are handed to _LogMasterSources.
        """
        logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
        ringMasterServer.instance = ringMasterServer(cfg, log, logMasterSources)

    @staticmethod
    def stopService():
        """Stop the XML-RPC server."""
        ringMasterServer.xmlrpc.stop()

    @staticmethod
    def getPort():
        """Return the port the XML-RPC server is bound to."""
        # The previous implementation read ringMasterServer.instance.port,
        # an attribute that is never assigned, and always raised.
        return ringMasterServer.xmlrpc.server_address[1]

    @staticmethod
    def getAddress():
        """Return the server's URL as 'http://<hostname>:<port>/'."""
        return 'http://%s:%d/' % (socket.gethostname(),
                                  ringMasterServer.xmlrpc.server_address[1])
| |
| class _LogMasterSources: |
| """All the methods that are run by the RPC server are |
| added into this class """ |
| |
| def __init__(self, serviceDict, cfg, np, log, rm): |
| self.serviceDict = serviceDict |
| self.tarSource = [] |
| self.tarSourceLock = threading.Lock() |
| self.dict = {} |
| self.count = {} |
| self.logsourceList = [] |
| self.logsourceListLock = threading.Lock() |
| self.masterParam = [] |
| self.masterParamLock = threading.Lock() |
| self.verify = 'none' |
| self.cmdLock = threading.Lock() |
| self.cfg = cfg |
| self.log = log |
| self.np = np |
| self.rm = rm |
| self.hdfsHost = None |
| self.mapredHost = None |
| self.maxconnect = self.cfg['ringmaster']['max-connect'] |
| self.log.debug("Using max-connect value %s"%self.maxconnect) |
| |
| |
| def registerTarSource(self, hostname, url, addr=None): |
| self.log.debug("registering: " + url) |
| lock = self.tarSourceLock |
| lock.acquire() |
| self.dict[url] = url |
| self.count[url] = 0 |
| # addr is None when ringMaster himself invokes this method |
| if addr: |
| c = self.count[addr] |
| self.count[addr] = c - 1 |
| lock.release() |
| if addr: |
| str = "%s is done" % (addr) |
| self.log.debug(str) |
| return url |
| |
| def getTarList(self,hodring): # this looks useful |
| lock = self.tarSourceLock |
| lock.acquire() |
| leastkey = None |
| leastval = -1 |
| for k, v in self.count.iteritems(): |
| if (leastval == -1): |
| leastval = v |
| pass |
| if (v <= leastval and v < self.maxconnect): |
| leastkey = k |
| leastval = v |
| if (leastkey == None): |
| url = 'none' |
| else: |
| url = self.dict[leastkey] |
| self.count[leastkey] = leastval + 1 |
| self.log.debug("%s %d" % (leastkey, self.count[leastkey])) |
| lock.release() |
| self.log.debug('sending url ' + url+" to "+hodring) # this looks useful |
| return url |
| |
| def tarDone(self, uri): |
| str = "%s is done" % (uri) |
| self.log.debug(str) |
| lock = self.tarSourceLock |
| lock.acquire() |
| c = self.count[uri] |
| self.count[uri] = c - 1 |
| lock.release() |
| return uri |
| |
| def status(self): |
| return True |
| |
| # FIXME: this code is broken, it relies on a central service registry |
| # |
| # def clusterStart(self, changedClusterParams=[]): |
| # self.log.debug("clusterStart method invoked.") |
| # self.dict = {} |
| # self.count = {} |
| # try: |
| # if (len(changedClusterParams) > 0): |
| # self.log.debug("Updating config.") |
| # for param in changedClusterParams: |
| # (key, sep1, val) = param.partition('=') |
| # (i1, sep2, i2) = key.partition('.') |
| # try: |
| # prev = self.cfg[i1][i2] |
| # self.rm.cfg[i1][i2] = val |
| # self.cfg[i1][i2] = val |
| # self.log.debug("\nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val)) |
| # except KeyError, e: |
| # self.log.info("Skipping %s as no such config parameter found in ringmaster" % param) |
| # self.log.debug("Regenerating Service Description.") |
| # dGen = DescGenerator(self.rm.cfg) |
| # self.rm.cfg['servicedesc'] = dGen.createServiceDescDict() |
| # self.cfg['servicedesc'] = self.rm.cfg['servicedesc'] |
| # |
| # self.rm.tar = None |
| # if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'): |
| # self.rm.download = True |
| # self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball'] |
| # self.log.debug("self.rm.tar=%s" % self.rm.tar) |
| # |
| # self.rm.cd_to_tempdir() |
| # |
| # self.rm.tarAddress = None |
| # hostname = socket.gethostname() |
| # if (self.rm.download): |
| # self.rm.basename = os.path.basename(self.rm.tar) |
| # dest = os.path.join(os.getcwd(), self.rm.basename) |
| # src = self.rm.tar |
| # self.log.debug("cp %s -> %s" % (src, dest)) |
| # shutil.copy(src, dest) |
| # self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename) |
| # self.registerTarSource(hostname, self.rm.tarAddress) |
| # self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress) |
| # else: |
| # self.log.debug("Download not set.") |
| # |
| # if (self.rm.tar != None): |
| # self.cfg['hodring']['download-addr'] = self.rm.tarAddress |
| # self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress |
| # |
| # sdl = self.rm.cfg['servicedesc'] |
| # workDirs = self.rm.getWorkDirs(self.rm.cfg, True) |
| # hdfsDesc = sdl['hdfs'] |
| # hdfs = None |
| # if hdfsDesc.isExternal(): |
| # hdfs = HdfsExternal(hdfsDesc, workDirs) |
| # else: |
| # hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True) |
| # |
| # self.rm.serviceDict[hdfs.getName()] = hdfs |
| # mrDesc = sdl['mapred'] |
| # mr = None |
| # if mrDesc.isExternal(): |
| # mr = MapReduceExternal(mrDesc, workDirs) |
| # else: |
| # mr = MapReduce(mrDesc, workDirs, 1) |
| # self.rm.serviceDict[mr.getName()] = mr |
| # |
| # ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'], |
| # self.np.getServiceId(), 'hodring', 'hod') |
| # |
| # slaveList = ringList |
| # hdfsringXRAddress = None |
| # # Start HDFS Master - Step 1 |
| # if not hdfsDesc.isExternal(): |
| # masterFound = False |
| # for ring in ringList: |
| # ringXRAddress = ring['xrs'] |
| # if ringXRAddress == None: |
| # raise Exception("Could not get hodring XML-RPC server address.") |
| # if (ringXRAddress.find(self.hdfsHost) != -1): |
| # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0) |
| # hdfsringXRAddress = ringXRAddress |
| # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)") |
| # ringClient.clusterStart() |
| # masterFound = True |
| # slaveList.remove(ring) |
| # break |
| # if not masterFound: |
| # raise Exception("HDFS Master host not found") |
| # while hdfs.getInfoAddrs() == None: |
| # self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port") |
| # time.sleep(1) |
| # |
| # # Start MAPRED Master - Step 2 |
| # if not mrDesc.isExternal(): |
| # masterFound = False |
| # for ring in ringList: |
| # ringXRAddress = ring['xrs'] |
| # if ringXRAddress == None: |
| # raise Exception("Could not get hodring XML-RPC server address.") |
| # if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1): |
| # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0) |
| # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)") |
| # ringClient.clusterStart() |
| # masterFound = True |
| # slaveList.remove(ring) |
| # break |
| # if not masterFound: |
| # raise Excpetion("MAPRED Master host not found") |
| # while mr.getInfoAddrs() == None: |
| # self.log.debug("Waiting for MAPRED Master (Job Tracker) to register \ |
| # mapred.job.tracker.info.port") |
| # time.sleep(1) |
| # |
| # # Start Slaves - Step 3 |
| # for ring in slaveList: |
| # ringXRAddress = ring['xrs'] |
| # if ringXRAddress == None: |
| # raise Exception("Could not get hodring XML-RPC server address.") |
| # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0) |
| # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)") |
| # ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart()) |
| # ring['thread'] = ringThread |
| # ringThread.start() |
| # |
| # for ring in slaveList: |
| # ringThread = ring['thread'] |
| # if ringThread == None: |
| # raise Exception("Could not get hodring thread (Slave).") |
| # ringThread.join() |
| # self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)") |
| # |
| # # Run Admin Commands on HDFS Master - Step 4 |
| # if not hdfsDesc.isExternal(): |
| # if hdfsringXRAddress == None: |
| # raise Exception("HDFS Master host not found (to Run Admin Commands)") |
| # ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0) |
| # self.log.debug("Invoking clusterStart(False) - Admin on " |
| # + hdfsringXRAddress + " (HDFS Master)") |
| # ringClient.clusterStart(False) |
| # |
| # except: |
| # self.log.debug(get_exception_string()) |
| # return False |
| # |
| # self.log.debug("Successfully started cluster.") |
| # return True |
| # |
| # def clusterStop(self): |
| # self.log.debug("clusterStop method invoked.") |
| # try: |
| # hdfsAddr = self.getServiceAddr('hdfs') |
| # if hdfsAddr.find(':') != -1: |
| # h, p = hdfsAddr.split(':', 1) |
| # self.hdfsHost = h |
| # self.log.debug("hdfsHost: " + self.hdfsHost) |
| # mapredAddr = self.getServiceAddr('mapred') |
| # if mapredAddr.find(':') != -1: |
| # h, p = mapredAddr.split(':', 1) |
| # self.mapredHost = h |
| # self.log.debug("mapredHost: " + self.mapredHost) |
| # ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'], |
| # self.np.getServiceId(), |
| # 'hodring', 'hod') |
| # for ring in ringList: |
| # ringXRAddress = ring['xrs'] |
| # if ringXRAddress == None: |
| # raise Exception("Could not get hodring XML-RPC server address.") |
| # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False) |
| # self.log.debug("Invoking clusterStop on " + ringXRAddress) |
| # ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop()) |
| # ring['thread'] = ringThread |
| # ringThread.start() |
| # |
| # for ring in ringList: |
| # ringThread = ring['thread'] |
| # if ringThread == None: |
| # raise Exception("Could not get hodring thread.") |
| # ringThread.join() |
| # self.log.debug("Completed clusterStop on " + ring['xrs']) |
| # |
| # except: |
| # self.log.debug(get_exception_string()) |
| # return False |
| # |
| # self.log.debug("Successfully stopped cluster.") |
| # |
| # return True |
| |
| def getCommand(self, addr): |
| """This method is called by the |
| hodrings to get commands from |
| the ringmaster""" |
| lock = self.cmdLock |
| cmdList = [] |
| lock.acquire() |
| try: |
| try: |
| for v in self.serviceDict.itervalues(): |
| if (not v.isExternal()): |
| if v.isLaunchable(self.serviceDict): |
| # If a master is still not launched, or the number of |
| # retries for launching master is not reached, |
| # launch master |
| if not v.isMasterLaunched() and \ |
| (v.getMasterFailureCount() <= \ |
| self.cfg['ringmaster']['max-master-failures']): |
| cmdList = v.getMasterCommands(self.serviceDict) |
| v.setlaunchedMaster() |
| v.setMasterAddress(addr) |
| break |
| if cmdList == []: |
| for s in self.serviceDict.itervalues(): |
| if (not v.isExternal()): |
| if s.isMasterInitialized(): |
| cl = s.getWorkerCommands(self.serviceDict) |
| cmdList.extend(cl) |
| else: |
| cmdList = [] |
| break |
| except: |
| self.log.debug(get_exception_string()) |
| finally: |
| lock.release() |
| pass |
| |
| cmd = addr + pformat(cmdList) |
| self.log.debug("getCommand returning " + cmd) |
| return cmdList |
| |
| def getAdminCommand(self, addr): |
| """This method is called by the |
| hodrings to get admin commands from |
| the ringmaster""" |
| lock = self.cmdLock |
| cmdList = [] |
| lock.acquire() |
| try: |
| try: |
| for v in self.serviceDict.itervalues(): |
| cmdList = v.getAdminCommands(self.serviceDict) |
| if cmdList != []: |
| break |
| except Exception, e: |
| self.log.debug(get_exception_string()) |
| finally: |
| lock.release() |
| pass |
| cmd = addr + pformat(cmdList) |
| self.log.debug("getAdminCommand returning " + cmd) |
| return cmdList |
| |
| def addMasterParams(self, addr, vals): |
| """This method is called by |
| hodring to update any parameters |
| its changed for the commands it was |
| running""" |
| self.log.debug('Comment: adding master params from %s' % addr) |
| self.log.debug(pformat(vals)) |
| lock = self.masterParamLock |
| lock.acquire() |
| try: |
| for v in self.serviceDict.itervalues(): |
| if v.isMasterLaunched(): |
| if (v.getMasterAddress() == addr): |
| v.setMasterParams(vals) |
| v.setMasterInitialized() |
| except: |
| self.log.debug(get_exception_string()) |
| pass |
| lock.release() |
| |
| return addr |
| |
| def setHodRingErrors(self, addr, errors): |
| """This method is called by the hodrings to update errors |
| it encountered while starting up""" |
| self.log.critical("Hodring at %s failed with following errors:\n%s" \ |
| % (addr, errors)) |
| lock = self.masterParamLock |
| lock.acquire() |
| try: |
| for v in self.serviceDict.itervalues(): |
| if v.isMasterLaunched(): |
| if (v.getMasterAddress() == addr): |
| # strip the PID part. |
| idx = addr.rfind('_') |
| if idx is not -1: |
| addr = addr[:idx] |
| v.setMasterFailed("Hodring at %s failed with following" \ |
| " errors:\n%s" % (addr, errors)) |
| except: |
| self.log.debug(get_exception_string()) |
| pass |
| lock.release() |
| return True |
| |
| def getKeys(self): |
| lock= self.masterParamLock |
| lock.acquire() |
| keys = self.serviceDict.keys() |
| lock.release() |
| |
| return keys |
| |
| def getServiceAddr(self, name): |
| addr = 'not found' |
| self.log.debug("getServiceAddr name: %s" % name) |
| lock= self.masterParamLock |
| lock.acquire() |
| try: |
| service = self.serviceDict[name] |
| except KeyError: |
| pass |
| else: |
| self.log.debug("getServiceAddr service: %s" % service) |
| # Check if we should give up ! If the limit on max failures is hit, |
| # give up. |
| err = service.getMasterFailed() |
| if (err is not None) and \ |
| (service.getMasterFailureCount() > \ |
| self.cfg['ringmaster']['max-master-failures']): |
| self.log.critical("Detected errors (%s) beyond allowed number"\ |
| " of failures (%s). Flagging error to client" \ |
| % (service.getMasterFailureCount(), \ |
| self.cfg['ringmaster']['max-master-failures'])) |
| addr = "Error: " + err |
| elif (service.isMasterInitialized()): |
| addr = service.getMasterAddrs()[0] |
| else: |
| addr = 'not found' |
| lock.release() |
| self.log.debug("getServiceAddr addr %s: %s" % (name, addr)) |
| |
| return addr |
| |
| def getURLs(self, name): |
| addr = 'none' |
| lock = self.masterParamLock |
| lock.acquire() |
| |
| try: |
| service = self.serviceDict[name] |
| except KeyError: |
| pass |
| else: |
| if (service.isMasterInitialized()): |
| addr = service.getInfoAddrs()[0] |
| |
| lock.release() |
| |
| return addr |
| |
| def stopRM(self): |
| """An XMLRPC call which will spawn a thread to stop the Ringmaster program.""" |
| # We spawn a thread here because we want the XMLRPC call to return. Calling |
| # stop directly from here will also stop the XMLRPC server. |
| try: |
| self.log.debug("inside xml-rpc call to stop ringmaster") |
| rmStopperThread = func('RMStopper', self.rm.stop) |
| rmStopperThread.start() |
| self.log.debug("returning from xml-rpc call to stop ringmaster") |
| return True |
| except: |
| self.log.debug("Exception in stop: %s" % get_exception_string()) |
| return False |
| |
class RingMaster:
    """Starts the node pool and the HDFS / Map-Reduce services of one HOD
    allocation, runs the ringmaster XML-RPC server (plus an HTTP server
    that serves the hadoop tarball when one is configured) and records the
    exit code the ringmaster should report."""

    def __init__(self, cfg, log, **kwds):
        """starts nodepool and services

        cfg: configuration; reads the 'ringmaster', 'hodring', 'servicedesc',
             'nodepooldesc', 'gridservice-hdfs' and 'gridservice-mapred'
             sections and fills in hodring settings for the workers.
        log: logger.
        kwds: accepted for backward compatibility; unused.
        """
        self.download = False
        self.httpServer = None
        self.cfg = cfg
        self.log = log
        self.__hostname = local_fqdn()
        self.workDirs = None
        # Cached result of __getHadoopDir(), which may untar the tarball.
        self.__hadoopDir = None

        # ref to the idle job tracker object.
        self.__jtMonitor = None
        self.__idlenessDetected = False
        self.__stopInProgress = False
        self.__isStopped = False  # to let main exit
        self.__exitCode = 0  # exit code with which the ringmaster main method should return

        self.workers_per_ring = self.cfg['ringmaster']['workers_per_ring']

        self.__initialize_signal_handlers()

        npd = self.cfg['nodepooldesc']
        self.np = NodePoolUtil.getNodePool(npd, cfg, log)

        self.log.debug("Getting service ID.")
        self.serviceId = self.np.getServiceId()
        self.log.debug("Got service ID: %s" % self.serviceId)

        self.tarSrcLoc = None
        if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
            self.download = True
            self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']

        self.cd_to_tempdir()

        if self.download:
            self.__copy_tarball(os.getcwd())
            self.basename = self.__find_tarball_in_dir(os.getcwd())
            if self.basename is None:
                raise Exception('Did not find tarball copied from %s in %s.'
                                % (self.tarSrcLoc, os.getcwd()))

        self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])
        self.log.debug("Service registry @ %s" % self.serviceAddr)

        self.serviceClient = hodXRClient(self.serviceAddr)
        self.serviceDict = {}
        try:
            sdl = self.cfg['servicedesc']
            workDirs = self.getWorkDirs(cfg)
            hdfsDesc = sdl['hdfs']

            # Determine hadoop Version
            hadoopVers = hadoopVersion(self.__getHadoopDir(),
                                       self.cfg['hodring']['java-home'], self.log)
            if (hadoopVers['major'] is None) or (hadoopVers['minor'] is None):
                raise Exception('Could not retrive the version of Hadoop.'
                                + ' Check the Hadoop installation or the value of the hodring.java-home variable.')
            minorVersion = int(hadoopVers['minor'])

            if hdfsDesc.isExternal():
                hdfs = HdfsExternal(hdfsDesc, workDirs, version=minorVersion)
                hdfs.setMasterParams(self.cfg['gridservice-hdfs'])
            else:
                hdfs = Hdfs(hdfsDesc, workDirs, 0, version=minorVersion,
                            workers_per_ring=self.workers_per_ring)
            self.serviceDict[hdfs.getName()] = hdfs

            mrDesc = sdl['mapred']
            if mrDesc.isExternal():
                mr = MapReduceExternal(mrDesc, workDirs, version=minorVersion)
                mr.setMasterParams(self.cfg['gridservice-mapred'])
            else:
                mr = MapReduce(mrDesc, workDirs, 1, version=minorVersion,
                               workers_per_ring=self.workers_per_ring)
            self.serviceDict[mr.getName()] = mr
        except Exception:
            self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: %s."
                              % get_exception_error_string())
            self.log.debug(get_exception_string())
            raise

        # should not be starting these in a constructor
        ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)

        self.rpcserver = ringMasterServer.getAddress()

        self.httpAddress = None
        self.tarAddress = None
        hostname = socket.gethostname()
        if self.download:
            # Serve the copied tarball over HTTP and register it as the
            # first download source for the hodrings.
            self.httpServer = threadedHTTPServer(hostname,
                                                 self.cfg['ringmaster']['http-port-range'])
            self.httpServer.serve_forever()
            self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0],
                                                  self.httpServer.server_address[1])
            self.tarAddress = "%s%s" % (self.httpAddress, self.basename)

            ringMasterServer.instance.logMasterSources.registerTarSource(hostname,
                                                                         self.tarAddress)
        else:
            self.log.debug("Download not set.")

        self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'],
                                           self.serviceId, self.__hostname,
                                           'ringmaster', 'hod'))

        if self.cfg['ringmaster']['register']:
            serviceInfo = {'xrs': self.rpcserver}
            if self.httpAddress:
                serviceInfo['http'] = self.httpAddress
            self.serviceClient.registerService(self.cfg['ringmaster']['userid'],
                                               self.serviceId, self.__hostname,
                                               'ringmaster', 'hod', serviceInfo)
            self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)

        hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'],
                                      'hodring' + '_' + getpass.getuser())

        self.cfg['hodring']['hodring'] = [hodRingWorkDir, ]
        self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']
        self.cfg['hodring']['service-id'] = self.np.getServiceId()

        self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)

        if self.tarSrcLoc is not None:
            cfg['hodring']['download-addr'] = self.tarAddress

        self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)

    def __init_job_tracker_monitor(self, logMasterSources):
        """Start the idle job tracker monitor; failures are logged, not raised."""
        hadoopDir = self.__getHadoopDir()
        self.log.debug('hadoopdir=%s, java-home=%s' %
                       (hadoopDir, self.cfg['hodring']['java-home']))
        try:
            self.__jtMonitor = JobTrackerMonitor(self.log, self,
                                                 self.cfg['ringmaster']['jt-poll-interval'],
                                                 self.cfg['ringmaster']['idleness-limit'],
                                                 hadoopDir, self.cfg['hodring']['java-home'],
                                                 logMasterSources)
            self.log.debug('starting jt monitor')
            self.__jtMonitor.start()
        except Exception:
            self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle. Exception message: %s'
                              % get_exception_error_string())
            self.log.debug('Exception details: %s' % get_exception_string())

    def __getHadoopDir(self):
        """Return the Hadoop installation directory.

        With hadoop-tar-ball configured, the copied tarball (self.basename in
        the current directory) is untarred in place and its root directory
        is returned; otherwise cfg['gridservice-mapred']['pkgs'] is used.
        The result is cached so the tarball is untarred only once.
        Raises an Exception if untarring fails.
        """
        if self.__hadoopDir is not None:
            return self.__hadoopDir
        if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
            tarFile = os.path.join(os.getcwd(), self.basename)
            ret = untar(tarFile, os.getcwd())
            if not ret:
                raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.'
                                % (tarFile, os.getcwd()))
            hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile))
        else:
            hadoopDir = self.cfg['gridservice-mapred']['pkgs']
        self.log.debug('Returning Hadoop directory as: %s' % hadoopDir)
        self.__hadoopDir = hadoopDir
        return hadoopDir

    def __get_dir(self, name):
        """Return the root directory inside the tarball
        specified by name. Assumes that the tarball begins
        with a root directory."""
        import tarfile
        myTarFile = tarfile.open(name)
        try:
            firstEntry = myTarFile.getnames()[0]
        finally:
            myTarFile.close()
        # The first member may be a file inside the root directory (for
        # example 'hadoop-x/README'); keep only the leading path component,
        # ignoring empty and '.' components.
        parts = [p for p in firstEntry.split('/') if p not in ('', '.')]
        if parts:
            hadoopPackage = parts[0]
        else:
            hadoopPackage = firstEntry
        self.log.debug("tarball name : %s hadoop package name : %s" % (name, hadoopPackage))
        return hadoopPackage

    def __find_tarball_in_dir(self, dir):
        """Find the tarball among files specified in the given
        directory. We need this method because how the tarball
        source URI is given depends on the method of copy and
        we can't get the tarball name from that.
        A file matches when it is the last path component of the source
        location. Returns None when no file matches."""
        for fileName in os.listdir(dir):
            # Require a path boundary: a stray file named e.g. 'gz' must not
            # match '.../hadoop.tar.gz'.
            if self.tarSrcLoc == fileName or \
               self.tarSrcLoc.endswith('/' + fileName):
                return fileName
        return None

    def __copy_tarball(self, destDir):
        """Copy the hadoop tar ball from self.tarSrcLoc into destDir.

        Only local sources are supported: an absolute path or a file:// URL;
        the copy runs /bin/cp. Raises an Exception for any other URL or when
        the copy command exits with a non-zero code.
        """
        # for backwards compatibility, treat the default case as file://
        url = ''
        if self.tarSrcLoc.startswith('/'):
            url = 'file:/'
        src = '%s%s' % (url, self.tarSrcLoc)
        if not src.startswith('file://'):
            raise Exception('Unsupported URL for file: %s' % src)

        # Drop the scheme but keep the path's leading '/'.
        src = src[len('file://')-1:]
        cpCmd = '/bin/cp'
        # NOTE(review): paths are not shell-quoted; a path containing spaces
        # breaks this command if simpleCommand runs it through a shell.
        cmd = '%s %s %s' % (cpCmd, src, destDir)
        self.log.debug('Command to execute: %s' % cmd)
        copyProc = simpleCommand('remote copy', cmd)
        copyProc.start()
        copyProc.wait()
        copyProc.join()
        ret = copyProc.exit_code()
        self.log.debug('Completed command execution. Exit Code: %s.' % ret)

        if ret != 0:
            output = copyProc.output()
            raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s'
                            % (cmd, ret, output))

    # input: http://hostname:port/. output: [hostname,port]
    def __url_to_addr(self, url):
        addr = url.rstrip('/')
        if addr.startswith('http://'):
            addr = addr.replace('http://', '', 1)
        addr_parts = addr.split(':')
        return [addr_parts[0], int(addr_parts[1])]

    def __initialize_signal_handlers(self):
        """Route SIGTERM, SIGINT and SIGQUIT to self.stop via sig_wrapper."""
        def sigStop(sigNum, handler):
            sig_wrapper(sigNum, self.stop)

        signal.signal(signal.SIGTERM, sigStop)
        signal.signal(signal.SIGINT, sigStop)
        signal.signal(signal.SIGQUIT, sigStop)

    def __clean_up(self):
        """Remove the ringmaster temporary directory (errors are ignored)."""
        tempDir = self.__get_tempdir()
        # cd_to_tempdir() made it the current directory; step out of it first.
        os.chdir(os.path.split(tempDir)[0])
        if os.path.exists(tempDir):
            shutil.rmtree(tempDir, True)

        self.log.debug("Cleaned up temporary dir: %s" % tempDir)

    def __get_tempdir(self):
        """Return <temp-dir>/<userid>.<service id>.ringmaster."""
        dir = os.path.join(self.cfg['ringmaster']['temp-dir'],
                           "%s.%s.ringmaster" % (self.cfg['ringmaster']['userid'],
                                                 self.np.getServiceId()))
        return dir

    def getWorkDirs(self, cfg, reUse=False):
        """Return one work directory per entry of cfg['ringmaster']['work-dirs'].

        Each is <parent>/<hostname>-<pid>-<random int>. The list is cached in
        self.workDirs; a new list is generated unless reUse is true and a
        cached list already exists.
        """
        if reUse and self.workDirs is not None:
            return self.workDirs

        import math
        # Scale a random float by 100 until it has no fractional part, which
        # yields a random integer for the directory suffix.
        frand = random.random()
        while math.ceil(frand) != math.floor(frand):
            frand = frand * 100

        uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), int(frand))
        self.workDirs = [os.path.join(parent, uniq)
                         for parent in cfg['ringmaster']['work-dirs']]
        return self.workDirs

    def _fetchLink(self, link, parentDir):
        """Crawl starting at link, mirroring non-HTML resources under parentDir.

        HTML pages are fed to a miniHTMLParser, which supplies the next link
        to visit; any other content is written to
        parentDir/<host>[/<port>]/<url path>.
        """
        parser = miniHTMLParser()
        self.log.debug("Checking link %s" % link)
        while link:
            # Get the file from the site and link
            response = urllib.urlopen(link)
            out = None
            try:
                contentType = response.info().gettype()
                isHtml = contentType == 'text/html'

                if isHtml:
                    parser.setBaseUrl(response.geturl())
                else:
                    parsed = urlparse.urlparse(link)
                    hostPort = parsed[1]
                    host, port = hostPort, None
                    if hostPort.find(':') != -1:
                        host, port = hostPort.split(':', 1)
                    # Only add a port component when the URL has one; joining
                    # None used to raise for port-less URLs.
                    target = os.path.join(parentDir, host)
                    if port is not None:
                        target = os.path.join(target, port)
                    for component in parsed[2].split('/'):
                        if component:
                            target = os.path.join(target, component)

                    try:
                        self.log.debug('Creating %s' % target)
                        targetDir = os.path.dirname(target)
                        if not os.path.exists(targetDir):
                            os.makedirs(targetDir)
                    except Exception:
                        self.log.debug(get_exception_string())

                    out = open(target, 'w')

                bufSz = 8192
                buf = response.read(bufSz)
                while len(buf) > 0:
                    if isHtml:
                        # Feed the file into the HTML parser
                        parser.feed(buf)
                    if out:
                        out.write(buf)
                    buf = response.read(bufSz)
            finally:
                response.close()
                if out:
                    out.close()

            # Get the next link in level traversal order
            link = parser.getNextLink()

        parser.close()

    def _finalize(self):
        """Finalize the node pool."""
        # FIXME: a log directory name ('HOD-log-P<pid>') used to be computed
        # here and never used; collecting logs from a configured dir is TODO.
        self.np.finalize()

    def handleIdleJobTracker(self):
        """Called when the job tracker has been idle too long; makes
        shouldStop() return True."""
        self.log.critical("Detected idle job tracker for %s seconds. The allocation will be cleaned up."
                          % self.cfg['ringmaster']['idleness-limit'])
        self.__idlenessDetected = True

    def cd_to_tempdir(self):
        """Create (if needed) and chdir into the ringmaster temp dir; return it."""
        dir = self.__get_tempdir()

        if not os.path.exists(dir):
            os.makedirs(dir)
        os.chdir(dir)

        return dir

    def getWorkload(self):
        """Return self.workload if some caller has set it, else None."""
        # Nothing in this class assigns workload; avoid an AttributeError.
        return getattr(self, 'workload', None)

    def getHostName(self):
        return self.__hostname

    def start(self):
        """run the thread main loop

        Launches the hodring workers through the node pool with the
        configured hodring arguments, then finalizes the node pool.
        """
        self.log.debug("Entered start method.")
        hodring = os.path.join(self.cfg['ringmaster']['base-dir'],
                               'bin', 'hodring')
        largs = [hodring]
        targs = self.cfg.get_args(section='hodring')
        largs.extend(targs)

        # Each argument followed by a space, for logging only.
        hodringCmd = ''.join(['%s ' % item for item in largs])
        self.log.debug(hodringCmd)

        if self.np.runWorkers(largs) > 0:
            self.log.critical("Failed to start worker.")

        self.log.debug("Returned from runWorkers.")

        self._finalize()

    def __findExitCode(self):
        """Determine the exit code based on the status of the cluster or jobs run on them

        7: HDFS master missing or failed; 8: Map/Reduce master missing or
        failed; otherwise the non-zero cluster status, else the jobs' code.
        """
        xmlrpcServer = ringMasterServer.instance.logMasterSources
        hdfsAddr = xmlrpcServer.getServiceAddr('hdfs')
        mapredAddr = xmlrpcServer.getServiceAddr('mapred')
        if hdfsAddr == 'not found' or hdfsAddr.startswith("Error: "):
            self.__exitCode = 7
        elif mapredAddr == 'not found' or mapredAddr.startswith("Error: "):
            self.__exitCode = 8
        else:
            clusterStatus = get_cluster_status(hdfsAddr, mapredAddr)
            if clusterStatus != 0:
                self.__exitCode = clusterStatus
            else:
                self.__exitCode = self.__findHadoopJobsExitCode()
        self.log.debug('exit code %s' % self.__exitCode)

    def __findHadoopJobsExitCode(self):
        """Determine the consolidate exit code of hadoop jobs run on this cluster, provided
        this information is available. Return 0 otherwise

        16 when every job has the failure status (3), 17 when only some do.
        """
        ret = 0
        failureStatus = 3
        failureCount = 0
        if self.__jtMonitor:
            jobStatusList = self.__jtMonitor.getJobsStatus()
            try:
                if len(jobStatusList) > 0:
                    for jobStatus in jobStatusList:
                        self.log.debug('job status for %s: %s' % (jobStatus.getJobId(),
                                                                  jobStatus.getStatus()))
                        if jobStatus.getStatus() == failureStatus:
                            failureCount += 1
                    if failureCount > 0:
                        if failureCount == len(jobStatusList):  # all jobs failed
                            ret = 16
                        else:
                            ret = 17
            except Exception:
                # The format string used to lack '%s', so this handler itself
                # raised TypeError.
                self.log.debug('exception in finding hadoop jobs exit code %s'
                               % get_exception_string())
        return ret

    def stop(self):
        """Compute the exit code, stop the RPC/HTTP servers and the job
        tracker monitor, and remove the temp dir. Later calls are no-ops.
        The exit code is 6 if the RPC server was never started."""
        self.log.debug("RingMaster stop method invoked.")
        if self.__stopInProgress or self.__isStopped:
            return
        self.__stopInProgress = True
        if ringMasterServer.instance is not None:
            self.log.debug('finding exit code')
            self.__findExitCode()
            self.log.debug('stopping ringmaster instance')
            ringMasterServer.stopService()
        else:
            self.__exitCode = 6
        if self.__jtMonitor is not None:
            self.__jtMonitor.stop()
        if self.httpServer:
            self.httpServer.stop()

        self.__clean_up()
        self.__isStopped = True

    def shouldStop(self):
        """Indicates whether the main loop should exit, either due to idleness condition,
        or a stop signal was received"""
        return self.__idlenessDetected or self.__isStopped

    def getExitCode(self):
        """return the exit code of the program"""
        return self.__exitCode
| |
def main(cfg, log):
    """Run a ringmaster until it should stop and return its exit code.

    cfg is expanded with DescGenerator(cfg).initializeDesc(); a RingMaster is
    created and started, and shouldStop() is polled once per second before
    the ringmaster is stopped. Any exception is logged (when log is set) and
    re-raised unchanged.
    """
    try:
        dGen = DescGenerator(cfg)
        cfg = dGen.initializeDesc()
        rm = RingMaster(cfg, log)
        rm.start()
        while not rm.shouldStop():
            time.sleep(1)
        rm.stop()
        log.debug('returning from main')
        return rm.getExitCode()
    except Exception:
        if log:
            log.critical(get_exception_string())
        # A bare raise keeps the original exception type and traceback; the
        # previous `raise Exception(e)` discarded both.
        raise