# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import threading
import time
from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
from tashi import boolean, ConnectionManager, vmStates, version, scrubString
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
def __init__(self, config, data, dfs):
self.config = config
self.data = data
self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
if self.authAndEncrypt:
self.username = config.get('AccessNodeManager', 'username')
self.password = config.get('AccessNodeManager', 'password')
else:
self.username = None
self.password = None
self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')), authAndEncrypt=self.authAndEncrypt)
self.dfs = dfs
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
self.log.setLevel(logging.ERROR)
self.hostLastContactTime = {}
#self.hostLastUpdateTime = {}
self.instanceLastContactTime = {}
self.expireHostTime = float(self.config.get('ClusterManagerService', 'expireHostTime'))
self.allowDecayed = float(self.config.get('ClusterManagerService', 'allowDecayed'))
self.allowMismatchedVersions = boolean(self.config.get('ClusterManagerService', 'allowMismatchedVersions'))
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
self.accountingHost = None
self.accountingPort = None
try:
self.accountingHost = self.config.get('ClusterManagerService', 'accountingHost')
self.accountingPort = self.config.getint('ClusterManagerService', 'accountingPort')
except:
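			# accounting is optional; missing config entries are fine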
pass
self.__initAccounting()
self.__initCluster()
threading.Thread(target=self.__monitorCluster).start()
def __initAccounting(self):
self.accountBuffer = []
self.accountLines = 0
self.accountingClient = None
try:
if (self.accountingHost is not None) and \
(self.accountingPort is not None):
self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
except:
self.log.exception("Could not init accounting")
def __initCluster(self):
# initialize state of VMs if restarting
for instance in self.data.getInstances().itervalues():
instanceId = instance.id
instance = self.data.acquireInstance(instanceId)
instance.decayed = False
if instance.hostId is None:
self.__stateTransition(instance, None, InstanceState.Pending)
else:
self.__stateTransition(instance, None, InstanceState.Orphaned)
self.data.releaseInstance(instance)
# initialize state of hosts if restarting
for host in self.data.getHosts().itervalues():
hostId = host.id
host = self.data.acquireHost(hostId)
host.up = False
host.decayed = False
self.data.releaseHost(host)
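	# Flush buffered accounting records to the remote accounting
	# service. Failures are logged and swallowed; the buffer is kept
	# so the lines are retried on the next __ACCOUNT call.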
def __ACCOUNTFLUSH(self):
try:
if (self.accountingClient is not None):
self.accountingClient.record(self.accountBuffer)
self.accountLines = 0
self.accountBuffer = []
except:
self.log.exception("Failed to flush accounting data")
def __ACCOUNT(self, text, instance=None, host=None):
now = self.__now()
instanceText = None
hostText = None
if instance is not None:
try:
instanceText = 'Instance(%s)' % (instance)
except:
self.log.exception("Invalid instance data")
if host is not None:
try:
hostText = "Host(%s)" % (host)
except:
self.log.exception("Invalid host data")
secondary = ','.join(filter(None, (hostText, instanceText)))
line = "%s|%s|%s" % (now, text, secondary)
self.accountBuffer.append(line)
self.accountLines += 1
# XXXstroucki think about autoflush by time
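		# with a threshold of 0 every record is flushed immediately;
		# the buffer only accumulates lines across failed flushes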
if (self.accountLines > 0):
self.__ACCOUNTFLUSH()
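	# Enforce an optional precondition on a state change: when "old" is
	# given, the instance must currently be in that state or an
	# IncorrectVmState TashiException is raised; old=None forces the
	# transition unconditionally.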
def __stateTransition(self, instance, old, cur):
if (old and instance.state != old):
raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
if (instance.state == cur):
# don't do anything if we're already at current state
return
instance.state = cur
# pass something down to the NM?
def __now(self):
return time.time()
def __downHost(self, host):
self.log.warning('Host %s is down' % (host.name))
host.up = False
host.decayed = False
self.__orphanInstances(host)
def __upHost(self, host):
self.log.warning('Host %s is up' % (host.name))
host.up = True
host.decayed = True
def __orphanInstances(self, host):
# expects lock to be held on host
instances = [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]
for instanceId in instances:
instance = self.data.acquireInstance(instanceId)
if instance.hostId == host.id:
instance.decayed = True
self.__stateTransition(instance, None, InstanceState.Orphaned)
self.data.releaseInstance(instance)
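	# Host liveness: a host is declared down when it has not checked in
	# within expireHostTime seconds and a direct liveCheck() probe does
	# not answer "alive"; its instances are then marked Orphaned.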
def __checkHosts(self):
		# Check whether each host has been heard from recently;
		# if not, probe it directly to see whether it is still alive
for hostId in self.hostLastContactTime.keys():
if (self.hostLastContactTime[hostId] < (self.__now() - self.expireHostTime)):
host = self.data.acquireHost(hostId)
string = None
try:
string = self.proxy[host.name].liveCheck()
except:
pass
if string != "alive":
self.__downHost(host)
del self.hostLastContactTime[hostId]
else:
self.__upHost(host)
self.hostLastContactTime[hostId] = self.__now()
self.data.releaseHost(host)
def __checkInstances(self):
# Reconcile instances with nodes
# obtain a list of instances I know about
		try:
			myInstances = self.data.getInstances()
		except:
			self.log.warning('Failure communicating with my database')
			return
# iterate through all hosts I believe are up
for hostId in self.hostLastContactTime.keys():
#self.log.warning("iterate %d" % hostId)
host = self.data.acquireHost(hostId)
# XXXstroucki: timing has changed with the message
# buffering in the NM, so this wasn't being run any-
# more because the time check was passing.
# I should think a bit more about this, but
# the "if True" is probably appropriate.
#if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
if True:
host.decayed = True
self.log.debug('Fetching state from host %s because it is decayed' % (host.name))
myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
# get a list of VMs running on host
try:
hostProxy = self.proxy[host.name]
remoteInstances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
except:
self.log.warning('Failure getting instances from host %s' % (host.name))
self.data.releaseHost(host)
continue
# register instances I don't know about
for instance in remoteInstances:
if (instance.id not in myInstances):
instance.hostId = host.id
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
remoteInstanceIds = [i.id for i in remoteInstances]
# remove instances that shouldn't be running
for instance in myInstancesThisHost:
if (instance.id not in remoteInstanceIds):
# XXXstroucki before 20110902 excepted here with host lock
try:
instance = self.data.acquireInstance(instance.id)
except:
continue
# XXXstroucki destroy?
try:
del self.instanceLastContactTime[instance.id]
except:
pass
self.data.removeInstance(instance)
self.hostLastContactTime[hostId] = self.__now()
host.decayed = False
self.data.releaseHost(host)
#self.log.warning("iterate %d done" % hostId)
# iterate through all VMs I believe are active
for instanceId in self.instanceLastContactTime.keys():
# XXXstroucki should lock instance here?
try:
lastContactTime = self.instanceLastContactTime[instanceId]
except KeyError:
continue
if (lastContactTime < (self.__now() - self.allowDecayed)):
try:
instance = self.data.acquireInstance(instanceId)
					# Don't query non-running VMs: e.g. if a VM
					# is suspended and has no host, there is
					# no one to ask
if instance.state != InstanceState.Running and \
instance.state != InstanceState.Activating and \
instance.state != InstanceState.Orphaned:
self.data.releaseInstance(instance)
continue
except:
continue
instance.decayed = True
self.log.debug('Fetching state on instance %s because it is decayed' % (instance.name))
			if instance.hostId is None: raise AssertionError("decayed instance %s has no hostId" % (instance.name))
# XXXstroucki check if host is down?
host = self.data.getHost(instance.hostId)
# get updated state on VM
try:
hostProxy = self.proxy[host.name]
newInstance = hostProxy.getVmInfo(instance.vmId)
except:
self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
self.data.releaseInstance(instance)
continue
# replace existing state with new state
# XXXstroucki more?
instance.state = newInstance.state
self.instanceLastContactTime[instanceId] = self.__now()
instance.decayed = False
self.data.releaseInstance(instance)
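	# Sanitize a user-supplied instance specification before admission:
	# clear server-assigned identifiers, scrub the name, and enforce
	# the configured core/memory limits.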
def normalize(self, instance):
instance.id = None
instance.vmId = None
instance.hostId = None
instance.decayed = False
instance.name = scrubString(instance.name, allowed="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-.")
instance.state = InstanceState.Pending
# XXXstroucki At some point, check userId
if (not self.allowDuplicateNames):
for i in self.data.getInstances().itervalues():
if (i.name == instance.name):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"The name %s is already in use" % (instance.name)})
if (instance.cores < 1):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Number of cores must be >= 1"})
if (instance.cores > self.maxCores):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Number of cores must be <= %d" % (self.maxCores)})
if (instance.memory < 1):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Amount of memory must be >= 1"})
if (instance.memory > self.maxMemory):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Amount of memory must be <= %d" % (self.maxMemory)})
# Make sure disk spec is valid
# Make sure network spec is valid
# Ignore internal hints
for hint in instance.hints:
if (hint.startswith("__")):
del instance.hints[hint]
return instance
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
# XXXstroucki: check for exception here
instance = self.normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM REQUEST", instance=instance)
return instance
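	# A minimal client-side sketch (hypothetical field values; assumes
	# the Instance type from tashi.rpycservices.rpyctypes accepts a
	# d= dict the way TashiException does elsewhere in this file):
	#   inst = Instance(d={'name': 'vm1', 'cores': 1, 'memory': 512})
	#   inst = client.createVm(inst)
	#   # inst.id is now assigned; the scheduler will later place it
	#   # via activateVm()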
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.__stateTransition(instance, None, InstanceState.ShuttingDown)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].shutdownVm(instance.vmId)
except Exception:
			self.log.exception('shutdownVm failed for host %s vmId %d' % (hostname, instance.vmId))
raise
return
def destroyVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
		if (instance.state == InstanceState.Pending or instance.state == InstanceState.Held):
			self.__ACCOUNT("CM VM DESTROY UNSTARTED", instance=instance)
			self.data.removeInstance(instance)
		elif (instance.state == InstanceState.Activating):
self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
self.__stateTransition(instance, None, InstanceState.Destroying)
self.data.releaseInstance(instance)
else:
# XXXstroucki: This is a problem with keeping
# clean state.
self.__ACCOUNT("CM VM DESTROY", instance=instance)
self.__stateTransition(instance, None, InstanceState.Destroying)
if instance.hostId is None:
self.data.removeInstance(instance)
else:
hostname = self.data.getHost(instance.hostId).name
try:
if hostname is not None:
self.proxy[hostname].destroyVm(instance.vmId)
self.data.releaseInstance(instance)
except:
self.log.warning('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
self.data.removeInstance(instance)
return
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SUSPEND", instance=instance)
hostname = self.data.getHost(instance.hostId).name
destination = "suspend/%d_%s" % (instance.id, instance.name)
try:
self.proxy[hostname].suspendVm(instance.vmId, destination)
except:
self.log.exception('suspendVm failed for host %s vmId %d' % (hostname, instance.vmId))
raise TashiException(d={'errno':Errors.UnableToSuspend, 'msg':'Failed to suspend %s' % (instance.name)})
return
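	# Resume is two-phase: resumeVm() only marks the instance Pending
	# and records the suspend image path in the '__resume_source' hint;
	# the scheduler later places it through activateVm(), which passes
	# that hint to the node's resumeVm().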
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
except TashiException:
self.data.releaseInstance(instance)
raise
source = "suspend/%d_%s" % (instance.id, instance.name)
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM RESUME", instance=instance)
return instance
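	# Migration protocol between source host S and target host T:
	#   1. S.prepSourceVm()   - put the VM into a migratable state
	#   2. T.prepReceiveVm()  - prepare the target, returns a cookie
	#   3. S.migrateVm()      - transfer the VM, passing the cookie
	#   4. T.receiveVm()      - take ownership on the target
	# The instance moves Running -> MigratePrep -> MigrateTrans, and
	# hostId is switched to the target before the final notification.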
def migrateVm(self, instanceId, targetHostId):
instance = self.data.acquireInstance(instanceId)
self.__ACCOUNT("CM VM MIGRATE", instance=instance)
try:
# FIXME: should these be acquire/release host?
targetHost = self.data.getHost(targetHostId)
sourceHost = self.data.getHost(instance.hostId)
# FIXME: Are these the correct state transitions?
except:
self.data.releaseInstance(instance)
raise
try:
self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
try:
# Prepare the target
self.log.info("migrateVm: Calling prepSourceVm on source host %s" % sourceHost.name)
self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
except Exception:
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
try:
self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
try:
# Send the VM
self.proxy[sourceHost.name].migrateVm(instance.vmId, targetHost, cookie)
except Exception:
self.log.exception('migrateVm failed')
raise
try:
instance = self.data.acquireInstance(instance.id)
instance.hostId = targetHost.id
finally:
self.data.releaseInstance(instance)
try:
# Notify the target
vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
except Exception:
self.log.exception('receiveVm failed')
raise
return
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM PAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].pauseVm(instance.vmId)
except Exception:
self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
return
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].unpauseVm(instance.vmId)
except Exception:
self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
return
def getHosts(self):
return self.data.getHosts().values()
def getNetworks(self):
return self.data.getNetworks().values()
def getUsers(self):
return self.data.getUsers().values()
def getInstances(self):
return self.data.getInstances().values()
def getImages(self):
return self.data.getImages()
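	# Best-effort guard against path traversal: handles containing
	# ".." are rejected. This assumes the DFS local handle is a plain
	# path string.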
def copyImage(self, src, dst):
imageSrc = self.dfs.getLocalHandle("images/" + src)
imageDst = self.dfs.getLocalHandle("images/" + dst)
try:
# Attempt to restrict to the image directory
if ".." not in imageSrc and ".." not in imageDst:
self.dfs.copy(imageSrc, imageDst)
self.log.info('DFS image copy: %s->%s' % (imageSrc, imageDst))
else:
self.log.warning('DFS image copy bad path: %s->%s' % (imageSrc, imageDst))
except Exception, e:
self.log.exception('DFS image copy failed: %s (%s->%s)' % (e, imageSrc, imageDst))
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
self.__ACCOUNT("CM VM SPECIFIC CALL", instance=instance)
try:
res = self.proxy[hostname].vmmSpecificCall(instance.vmId, arg)
except Exception:
self.log.exception('vmmSpecificCall failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
return res
# @timed
def registerNodeManager(self, host, instances):
"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
# Handle a new registration
		if (host.id is None):
hostList = [h for h in self.data.getHosts().itervalues() if h.name == host.name]
if (len(hostList) != 1):
raise TashiException(d={'errno':Errors.NoSuchHost, 'msg':'A host with name %s is not identifiable' % (host.name)})
host.id = hostList[0].id
# Check if remote host information matches mine
oldHost = self.data.acquireHost(host.id)
if (oldHost.name != host.name):
self.data.releaseHost(oldHost)
raise TashiException(d={'errno':Errors.NoSuchHostId, 'msg':'Host id and hostname mismatch'})
		if (not oldHost.up):
self.__upHost(oldHost)
self.hostLastContactTime[host.id] = self.__now()
oldHost.version = host.version
oldHost.memory = host.memory
oldHost.cores = host.cores
# compare whether CM / NM versions are compatible
if (host.version != version and not self.allowMismatchedVersions):
oldHost.state = HostState.VersionMismatch
if (host.version == version and oldHost.state == HostState.VersionMismatch):
oldHost.state = HostState.Normal
# let the host communicate what it is running
# and note that the information is not stale
for instance in instances:
self.instanceLastContactTime.setdefault(instance.id, 0)
self.data.releaseHost(oldHost)
return host.id
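	# Apply state reported by a node manager for one VM. Reports are
	# advisory (see registerNodeManager); on an Exited report the CM
	# decides whether this was an expected shutdown, destroy or suspend,
	# or an unexpected VM death.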
def vmUpdate(self, instanceId, instance, oldState):
try:
oldInstance = self.data.acquireInstance(instanceId)
except TashiException, e:
# shouldn't have a lock to clean up after here
if (e.errno == Errors.NoSuchInstanceId):
self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
return
except:
self.log.exception("Could not acquire instance")
raise
self.instanceLastContactTime[instanceId] = self.__now()
oldInstance.decayed = False
self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
if (instance.state == InstanceState.Exited):
# determine why a VM has exited
hostname = self.data.getHost(oldInstance.hostId).name
if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
if (oldInstance.state == InstanceState.Suspending):
self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
oldInstance.hostId = None
oldInstance.vmId = None
self.data.releaseInstance(oldInstance)
else:
del self.instanceLastContactTime[oldInstance.id]
self.data.removeInstance(oldInstance)
else:
if (instance.state):
# XXXstroucki does this matter?
if (oldState and oldInstance.state != oldState):
self.log.warning('Got vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
oldInstance.state = instance.state
if (instance.vmId):
oldInstance.vmId = instance.vmId
if (instance.hostId):
oldInstance.hostId = instance.hostId
if (instance.nics):
for nic in instance.nics:
if (nic.ip):
for oldNic in oldInstance.nics:
if (oldNic.mac == nic.mac):
oldNic.ip = nic.ip
self.data.releaseInstance(oldInstance)
return "success"
def activateVm(self, instanceId, host):
dataHost = self.data.acquireHost(host.id)
if (dataHost.name != host.name):
self.data.releaseHost(dataHost)
raise TashiException(d={'errno':Errors.HostNameMismatch,'msg':"Mismatched target host"})
if (not dataHost.up):
self.data.releaseHost(dataHost)
raise TashiException(d={'errno':Errors.HostNotUp,'msg':"Target host is not up"})
if (dataHost.state != HostState.Normal):
self.data.releaseHost(dataHost)
raise TashiException(d={'errno':Errors.HostStateError,'msg':"Target host state is not normal"})
self.data.releaseHost(dataHost)
instance = self.data.acquireInstance(instanceId)
self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
if ('__resume_source' in instance.hints):
self.__stateTransition(instance, None, InstanceState.Resuming)
else:
# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
self.__stateTransition(instance, None, InstanceState.Activating)
instance.hostId = host.id
self.data.releaseInstance(instance)
try:
if ('__resume_source' in instance.hints):
vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
else:
vmId = self.proxy[host.name].instantiateVm(instance)
except Exception:
instance = self.data.acquireInstance(instanceId)
			if (instance.state == InstanceState.Destroying): # special case: destroyVm was called while the VM was still initializing and initialization failed
self.data.removeInstance(instance)
else:
# XXXstroucki what can we do about pending hosts in the scheduler?
# put them at the end of the queue and keep trying?
self.__stateTransition(instance, None, InstanceState.Held)
instance.hostId = None
self.data.releaseInstance(instance)
return "failure"
instance = self.data.acquireInstance(instanceId)
instance.vmId = vmId
		if (instance.state == InstanceState.Destroying): # special case: destroyVm was called while the VM was initializing
try:
self.proxy[host.name].destroyVm(vmId)
self.data.removeInstance(instance)
except Exception:
self.log.exception('destroyVm failed for host %s vmId %d' % (host.name, instance.vmId))
self.data.releaseInstance(instance)
return "failure"
else:
if ('__resume_source' not in instance.hints):
# XXXstroucki should we just wait for NM to update?
#self.__stateTransition(instance, InstanceState.Activating, InstanceState.Running)
pass
self.data.releaseInstance(instance)
return "success"
def registerHost(self, hostname, memory, cores, version):
hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
if alreadyRegistered:
self.log.info("Host %s is already registered, it was updated now" % hostname)
else:
self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
try:
host = self.data.getHost(hostId)
self.__ACCOUNT("CM HOST REGISTER", host=host)
except:
self.log.warning("Failed to lookup host %s" % hostId)
return hostId
def unregisterHost(self, hostId):
try:
host = self.data.getHost(hostId)
self.__ACCOUNT("CM HOST UNREGISTER", host=host)
except:
self.log.warning("Failed to lookup host %s" % hostId)
return
self.data.unregisterHost(hostId)
self.log.info("Host %s was unregistered" % hostId)
return
# service thread
def __monitorCluster(self):
while True:
sleepFor = min(self.expireHostTime, self.allowDecayed)
try:
self.__checkHosts()
self.__checkInstances()
except:
self.log.exception('monitorCluster iteration failed')
# XXXrgass too chatty. Remove
# XXXstroucki the risk is that a deadlock in obtaining
# data could prevent this loop from continuing.
#self.log.info("Sleeping for %d seconds" % sleepFor)
time.sleep(sleepFor)