| #Licensed to the Apache Software Foundation (ASF) under one |
| #or more contributor license agreements. See the NOTICE file |
| #distributed with this work for additional information |
| #regarding copyright ownership. The ASF licenses this file |
| #to you under the Apache License, Version 2.0 (the |
| #"License"); you may not use this file except in compliance |
| #with the License. You may obtain a copy of the License at |
| |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| |
| #Unless required by applicable law or agreed to in writing, software |
| #distributed under the License is distributed on an "AS IS" BASIS, |
| #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| #See the License for the specific language governing permissions and |
| #limitations under the License. |
| """defines Service as abstract interface""" |
| |
| # -*- python -*- |
| import random, socket |
| |
| class Service: |
| """ the service base class that all the |
| other services inherit from. """ |
| def __init__(self, serviceDesc, workDirs): |
| self.serviceDesc = serviceDesc |
| self.workDirs = workDirs |
| |
| def getName(self): |
| return self.serviceDesc.getName() |
| |
| def getInfoAddrs(self): |
| """Return a list of addresses that provide |
| information about the servie""" |
| return [] |
| |
| def isLost(self): |
| """True if the service is down""" |
| raise NotImplementedError |
| |
| def addNodes(self, nodeList): |
| """add nodeSet""" |
| raise NotImplementedError |
| |
| def removeNodes(self, nodeList): |
| """remove a nodeset""" |
| raise NotImplementedError |
| |
| def getWorkers(self): |
| raise NotImplementedError |
| |
| def needsMore(self): |
| """return number of nodes the service wants to add""" |
| raise NotImplementedError |
| |
| def needsLess(self): |
| """return number of nodes the service wants to remove""" |
| raise NotImplementedError |
| |
| class MasterSlave(Service): |
| """ the base class for a master slave |
| service architecture. """ |
| def __init__(self, serviceDesc, workDirs,requiredNode): |
| Service.__init__(self, serviceDesc, workDirs) |
| self.launchedMaster = False |
| self.masterInitialized = False |
| self.masterAddress = 'none' |
| self.requiredNode = requiredNode |
| self.failedMsg = None |
| self.masterFailureCount = 0 |
| |
| def getRequiredNode(self): |
| return self.requiredNode |
| |
| def getMasterRequest(self): |
| """ the number of master you need |
| to run for this service. """ |
| raise NotImplementedError |
| |
| def isLaunchable(self, serviceDict): |
| """ if your service does not depend on |
| other services. is set to true by default. """ |
| return True |
| |
| def getMasterCommands(self, serviceDict): |
| """ a list of master commands you |
| want to run for this service. """ |
| raise NotImplementedError |
| |
| def getAdminCommands(self, serviceDict): |
| """ a list of admin commands you |
| want to run for this service. """ |
| raise NotImplementedError |
| |
| def getWorkerCommands(self, serviceDict): |
| """ a list of worker commands you want to |
| run for this service. """ |
| raise NotImplementedError |
| |
| def setMasterNodes(self, list): |
| """ set the status of master nodes |
| after they start running on a node cluster. """ |
| raise NotImplementedError |
| |
| def addNodes(self, list): |
| """ add nodes to a service. Not implemented |
| currently. """ |
| raise NotImplementedError |
| |
| def getMasterAddrs(self): |
| """ return the addresses of master. the |
| hostname:port to which worker nodes should |
| connect. """ |
| raise NotImplementedError |
| |
| def setMasterParams(self, list): |
| """ set the various master params |
| depending on what each hodring set |
| the master params to. """ |
| raise NotImplementedError |
| |
| def setlaunchedMaster(self): |
| """ set the status of master launched |
| to true. """ |
| self.launchedMaster = True |
| |
| def isMasterLaunched(self): |
| """ return if a master has been launched |
| for the service or not. """ |
| return self.launchedMaster |
| |
| def isMasterInitialized(self): |
| """ return if a master if launched |
| has been initialized or not. """ |
| return self.masterInitialized |
| |
| def setMasterInitialized(self): |
| """ set the master initialized to |
| true. """ |
| self.masterInitialized = True |
| # Reset failure related variables, as master is initialized successfully. |
| self.masterFailureCount = 0 |
| self.failedMsg = None |
| |
| def getMasterAddress(self): |
| """ it needs to change to reflect |
| more that one masters. Currently it |
| keeps a knowledge of where the master |
| was launched and to keep track if it was actually |
| up or not. """ |
| return self.masterAddress |
| |
| def setMasterAddress(self, addr): |
| self.masterAddress = addr |
| |
| def isExternal(self): |
| return self.serviceDesc.isExternal() |
| |
| def setMasterFailed(self, err): |
| """Sets variables related to Master failure""" |
| self.masterFailureCount += 1 |
| self.failedMsg = err |
| # When command is sent to HodRings, this would have been set to True. |
| # Reset it to reflect the correct status. |
| self.launchedMaster = False |
| |
| def getMasterFailed(self): |
| return self.failedMsg |
| |
| def getMasterFailureCount(self): |
| return self.masterFailureCount |
| |
| class NodeRequest: |
| """ A class to define |
| a node request. """ |
| def __init__(self, n, required = [], preferred = [], isPreemptee = True): |
| self.numNodes = n |
| self.preferred = preferred |
| self.isPreemptee = isPreemptee |
| self.required = required |
| |
| def setNumNodes(self, n): |
| self.numNodes = n |
| |
| def setPreferredList(self, list): |
| self.preferred = list |
| |
| def setIsPreemptee(self, flag): |
| self.isPreemptee = flag |
| |
| |
| class ServiceUtil: |
| """ this class should be moved out of |
| service.py to a util file""" |
| localPortUsed = {} |
| |
| def getUniqRandomPort(h=None, low=50000, high=60000, retry=900, log=None): |
| """This allocates a randome free port between low and high""" |
| # We use a default value of 900 retries, which takes an agreeable |
| # time limit of ~ 6.2 seconds to check 900 ports, in the worse case |
| # of no available port in those 900. |
| |
| while retry > 0: |
| n = random.randint(low, high) |
| if n in ServiceUtil.localPortUsed: |
| continue |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| if not h: |
| h = socket.gethostname() |
| avail = False |
| if log: log.debug("Trying to see if port %s is available"% n) |
| try: |
| s.bind((h, n)) |
| if log: log.debug("Yes, port %s is available" % n) |
| avail = True |
| except socket.error,e: |
| if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e)) |
| retry -= 1 |
| pass |
| # The earlier code that used to be here had syntax errors. The code path |
| # couldn't be followd anytime, so the error remained uncaught. |
| # This time I stumbled upon the error |
| s.close() |
| |
| if avail: |
| ServiceUtil.localPortUsed[n] = True |
| return n |
| raise ValueError, "Can't find unique local port between %d and %d" % (low, high) |
| |
| getUniqRandomPort = staticmethod(getUniqRandomPort) |
| |
| def getUniqPort(h=None, low=40000, high=60000, retry=900, log=None): |
| """get unique port on a host that can be used by service |
| This and its consumer code should disappear when master |
| nodes get allocatet by nodepool""" |
| |
| # We use a default value of 900 retries, which takes an agreeable |
| # time limit of ~ 6.2 seconds to check 900 ports, in the worse case |
| # of no available port in those 900. |
| |
| n = low |
| while retry > 0: |
| n = n + 1 |
| if n in ServiceUtil.localPortUsed: |
| continue |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| if not h: |
| h = socket.gethostname() |
| avail = False |
| if log: log.debug("Trying to see if port %s is available"% n) |
| try: |
| s.bind((h, n)) |
| if log: log.debug("Yes, port %s is available" % n) |
| avail = True |
| except socket.error,e: |
| if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e)) |
| retry -= 1 |
| pass |
| s.close() |
| |
| if avail: |
| ServiceUtil.localPortUsed[n] = True |
| return n |
| |
| raise ValueError, "Can't find unique local port between %d and %d" % (low, high) |
| |
| getUniqPort = staticmethod(getUniqPort) |