# blob: f0c7f5cbbf8571dcb2ba9eab70d2d633a8fabb2b [file] [log] [blame]
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""defines Service as abstract interface"""
# -*- python -*-
import random, socket
class Service:
    """Abstract base class from which every other service derives.

    It stores the service descriptor and the working directories handed to
    the constructor. Apart from getName() and getInfoAddrs(), every method
    raises NotImplementedError and has to be supplied by a subclass.
    """

    def __init__(self, serviceDesc, workDirs):
        """Remember the descriptor and working directories of the service."""
        self.workDirs = workDirs
        self.serviceDesc = serviceDesc

    def getName(self):
        """Return the name reported by the service descriptor."""
        descriptor = self.serviceDesc
        return descriptor.getName()

    def getInfoAddrs(self):
        """Return a list of addresses that provide information about the
        service. The base implementation returns an empty list."""
        return list()

    def isLost(self):
        """Return True if the service is down."""
        raise NotImplementedError()

    def addNodes(self, nodeList):
        """Add a set of nodes to the service."""
        raise NotImplementedError()

    def removeNodes(self, nodeList):
        """Remove a set of nodes from the service."""
        raise NotImplementedError()

    def getWorkers(self):
        """Return the workers of the service; left to subclasses."""
        raise NotImplementedError()

    def needsMore(self):
        """Return the number of nodes the service wants to add."""
        raise NotImplementedError()

    def needsLess(self):
        """Return the number of nodes the service wants to remove."""
        raise NotImplementedError()
class MasterSlave(Service):
    """Base class for services built on a master/slave architecture.

    On top of what Service stores, it tracks whether the master has been
    launched and initialized, the address recorded for the master, and
    how often (and with what message) the master has failed.
    """

    def __init__(self, serviceDesc, workDirs,requiredNode):
        """Initialise the base class and reset all master bookkeeping."""
        Service.__init__(self, serviceDesc, workDirs)
        self.requiredNode = requiredNode
        # 'none' is the placeholder until setMasterAddress() is called.
        self.masterAddress = 'none'
        self.launchedMaster = False
        self.masterInitialized = False
        self.masterFailureCount = 0
        self.failedMsg = None

    def getRequiredNode(self):
        """Return the required node given to the constructor."""
        return self.requiredNode

    def getMasterRequest(self):
        """Return the number of masters that need to run for this
        service. Must be provided by subclasses."""
        raise NotImplementedError()

    def isLaunchable(self, serviceDict):
        """Tell whether the service can be launched.

        The default is True, which fits a service that does not depend
        on other services.
        """
        return True

    def getMasterCommands(self, serviceDict):
        """Return the list of master commands to run for this service.
        Must be provided by subclasses."""
        raise NotImplementedError()

    def getAdminCommands(self, serviceDict):
        """Return the list of admin commands to run for this service.
        Must be provided by subclasses."""
        raise NotImplementedError()

    def getWorkerCommands(self, serviceDict):
        """Return the list of worker commands to run for this service.
        Must be provided by subclasses."""
        raise NotImplementedError()

    def setMasterNodes(self, list):
        """Record the status of the master nodes once they are running
        on a node cluster. Must be provided by subclasses."""
        raise NotImplementedError()

    def addNodes(self, list):
        """Add nodes to the service. Not implemented currently."""
        raise NotImplementedError()

    def getMasterAddrs(self):
        """Return the master addresses, i.e. the hostname:port to which
        worker nodes should connect. Must be provided by subclasses."""
        raise NotImplementedError()

    def setMasterParams(self, list):
        """Set the master parameters according to what each hodring set
        them to. Must be provided by subclasses."""
        raise NotImplementedError()

    def setlaunchedMaster(self):
        """Mark the master as launched."""
        self.launchedMaster = True

    def isMasterLaunched(self):
        """Return whether a master has been launched for the service."""
        return self.launchedMaster

    def isMasterInitialized(self):
        """Return whether the master has been marked as initialized."""
        return self.masterInitialized

    def setMasterInitialized(self):
        """Mark the master as initialized."""
        self.masterInitialized = True
        # A successful initialization wipes the failure bookkeeping.
        self.masterFailureCount, self.failedMsg = 0, None

    def getMasterAddress(self):
        """Return the address recorded for the master.

        Only a single address is kept; this would need to change to
        support more than one master. The value records where the master
        was launched.
        """
        return self.masterAddress

    def setMasterAddress(self, addr):
        """Record addr as the address of the master."""
        self.masterAddress = addr

    def isExternal(self):
        """Return what the service descriptor reports for isExternal()."""
        descriptor = self.serviceDesc
        return descriptor.isExternal()

    def setMasterFailed(self, err):
        """Record a master failure together with its error message."""
        self.failedMsg = err
        self.masterFailureCount = self.masterFailureCount + 1
        # The flag was switched on when the command went out to the
        # HodRings; switch it back so it reflects the real status.
        self.launchedMaster = False

    def getMasterFailed(self):
        """Return the message of the last recorded master failure, or
        None when no failure is recorded."""
        return self.failedMsg

    def getMasterFailureCount(self):
        """Return how many master failures are currently recorded."""
        return self.masterFailureCount
class NodeRequest:
    """A class to define a node request.

    Attributes set by the constructor:
      numNodes    -- number of nodes requested
      required    -- list passed as `required` (a new empty list if omitted)
      preferred   -- list passed as `preferred` (a new empty list if omitted)
      isPreemptee -- flag passed as `isPreemptee` (True if omitted)
    """

    def __init__(self, n, required=None, preferred=None, isPreemptee=True):
        """Create a request for n nodes.

        `required` and `preferred` default to None rather than to a list
        literal: a list literal in the signature is created only once and
        would be shared by every request built without these arguments, so
        changing one request's list would silently change all the others.
        """
        if required is None:
            required = []
        if preferred is None:
            preferred = []
        self.numNodes = n
        self.preferred = preferred
        self.isPreemptee = isPreemptee
        self.required = required

    def setNumNodes(self, n):
        """Replace the number of requested nodes."""
        self.numNodes = n

    def setPreferredList(self, list):
        """Replace the preferred list."""
        self.preferred = list

    def setIsPreemptee(self, flag):
        """Replace the isPreemptee flag."""
        self.isPreemptee = flag
class ServiceUtil:
    """Helpers for choosing local TCP ports that are not in use.

    This class should be moved out of service.py to a util file.
    """

    # Ports already handed out by this class, keyed by port number. It is a
    # class attribute, so every caller shares the same record.
    localPortUsed = {}

    @staticmethod
    def _isPortFree(h, n, log=None):
        """Return True if a TCP socket can be bound to (h, n).

        The probe socket is always closed again, even when bind() fails
        with something other than socket.error, so the port is not held
        open by this check.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if log: log.debug("Trying to see if port %s is available"% n)
            try:
                s.bind((h, n))
            except socket.error as e:
                if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e))
                return False
            if log: log.debug("Yes, port %s is available" % n)
            return True
        finally:
            s.close()

    @staticmethod
    def getUniqRandomPort(h=None, low=50000, high=60000, retry=900, log=None):
        """Allocate a random free port between low and high (inclusive).

        h     -- host to bind on; socket.gethostname() when not given
        low   -- lowest candidate port
        high  -- highest candidate port
        retry -- number of unsuccessful candidates tolerated
        log   -- optional logger; only its debug() method is used

        Returns the port number and records it in localPortUsed.
        Raises ValueError when no port could be found within `retry`
        attempts.
        """
        # We use a default value of 900 retries, which takes an agreeable
        # time limit of ~ 6.2 seconds to check 900 ports, in the worst case
        # of no available port in those 900.
        if not h:
            h = socket.gethostname()
        while retry > 0:
            n = random.randint(low, high)
            if n in ServiceUtil.localPortUsed:
                # Count this as an attempt too; otherwise the loop never
                # ends once every port in the range has been handed out.
                retry -= 1
                continue
            if ServiceUtil._isPortFree(h, n, log):
                ServiceUtil.localPortUsed[n] = True
                return n
            retry -= 1
        raise ValueError("Can't find unique local port between %d and %d" % (low, high))

    @staticmethod
    def getUniqPort(h=None, low=40000, high=60000, retry=900, log=None):
        """Get a unique port on a host that can be used by a service.

        Candidates are tried in increasing order, starting at low + 1 and
        never going beyond high. This and its consumer code should
        disappear when master nodes get allocated by nodepool.

        h     -- host to bind on; socket.gethostname() when not given
        low   -- scanning starts at the port right after this one
        high  -- highest candidate port
        retry -- number of failed bind attempts tolerated
        log   -- optional logger; only its debug() method is used

        Returns the port number and records it in localPortUsed.
        Raises ValueError when the range or the retries are exhausted.
        """
        # We use a default value of 900 retries, which takes an agreeable
        # time limit of ~ 6.2 seconds to check 900 ports, in the worst case
        # of no available port in those 900.
        if not h:
            h = socket.gethostname()
        n = low
        # Stop at high so the scan cannot run past the requested range.
        while retry > 0 and n < high:
            n = n + 1
            if n in ServiceUtil.localPortUsed:
                continue
            if ServiceUtil._isPortFree(h, n, log):
                ServiceUtil.localPortUsed[n] = True
                return n
            retry -= 1
        raise ValueError("Can't find unique local port between %d and %d" % (low, high))