# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
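"""Cluster manager service for Tashi: tracks hosts and VM instances, relays VM operations to node managers, and reconciles cluster state."""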
import copy
import logging
import threading
import time
from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import Errors, InstanceState, Instance, HostState, TashiException
from tashi import boolean, ConnectionManager, vmStates, version, scrubString
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
def __init__(self, config, data, dfs):
self.config = config
self.data = data
self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
if self.authAndEncrypt:
self.username = config.get('AccessNodeManager', 'username')
self.password = config.get('AccessNodeManager', 'password')
else:
self.username = None
self.password = None
self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')), authAndEncrypt=self.authAndEncrypt)
self.dfs = dfs
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
self.log.setLevel(logging.ERROR)
self.hostLastContactTime = {}
#self.hostLastUpdateTime = {}
self.instanceLastContactTime = {}
self.expireHostTime = float(self.config.get('ClusterManagerService', 'expireHostTime'))
self.allowDecayed = float(self.config.get('ClusterManagerService', 'allowDecayed'))
self.allowMismatchedVersions = boolean(self.config.get('ClusterManagerService', 'allowMismatchedVersions'))
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
self.defaultNetwork = self.config.getint('ClusterManagerService', 'defaultNetwork', 0)
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
self.accountingHost = None
self.accountingPort = None
try:
self.accountingHost = self.config.get('ClusterManagerService', 'accountingHost')
self.accountingPort = self.config.getint('ClusterManagerService', 'accountingPort')
except:
pass
self.__initAccounting()
self.__initCluster()
threading.Thread(name="monitorCluster", target=self.__monitorCluster).start()
def __initAccounting(self):
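		"""Reset the accounting buffer and, if an accounting host is configured, connect to it."""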
self.accountBuffer = []
self.accountLines = 0
self.accountingClient = None
try:
if (self.accountingHost is not None) and \
(self.accountingPort is not None):
self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
except:
self.log.exception("Could not init accounting")
def __initCluster(self):
# initialize state of VMs if restarting
for instance in self.data.getInstances().itervalues():
instanceId = instance.id
instance = self.data.acquireInstance(instanceId)
instance.decayed = False
if instance.hostId is None:
self.__stateTransition(instance, None, InstanceState.Pending)
else:
self.__stateTransition(instance, None, InstanceState.Orphaned)
self.data.releaseInstance(instance)
# initialize state of hosts if restarting
for host in self.data.getHosts().itervalues():
hostId = host.id
host = self.data.acquireHost(hostId)
host.up = False
host.decayed = False
self.data.releaseHost(host)
def __ACCOUNTFLUSH(self):
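		"""Push buffered accounting records to the accounting server and clear the buffer."""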
try:
if (self.accountingClient is not None):
self.accountingClient.record(self.accountBuffer)
self.accountLines = 0
self.accountBuffer = []
except:
self.log.exception("Failed to flush accounting data")
def __ACCOUNT(self, text, instance=None, host=None):
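		"""Record a timestamped accounting event, optionally annotated with instance and host details, then flush the buffer."""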
now = self.__now()
instanceText = None
hostText = None
if instance is not None:
try:
instanceText = 'Instance(%s)' % (instance)
except:
self.log.exception("Invalid instance data")
if host is not None:
try:
hostText = "Host(%s)" % (host)
except:
self.log.exception("Invalid host data")
secondary = ','.join(filter(None, (hostText, instanceText)))
line = "%s|%s|%s" % (now, text, secondary)
self.accountBuffer.append(line)
self.accountLines += 1
# XXXstroucki think about autoflush by time
if (self.accountLines > 0):
self.__ACCOUNTFLUSH()
def __stateTransition(self, instance, old, cur):
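		"""Set the instance's state to cur, raising IncorrectVmState if old is given and does not match the current state."""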
if (old and instance.state != old):
raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
if (instance.state == cur):
# don't do anything if we're already at current state
return
instance.state = cur
# pass something down to the NM?
def __now(self):
return time.time()
def __downHost(self, host):
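		"""Mark a host as down and orphan any instances recorded as running on it."""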
self.log.warning('Host %s is down' % (host.name))
host.up = False
host.decayed = False
self.__orphanInstances(host)
def __upHost(self, host):
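		"""Mark a host as up and flag it as decayed so its state gets refreshed."""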
self.log.warning('Host %s is up' % (host.name))
host.up = True
host.decayed = True
def __orphanInstances(self, host):
# expects lock to be held on host
instances = [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]
for instanceId in instances:
instance = self.data.acquireInstance(instanceId)
if instance.hostId == host.id:
instance.decayed = True
self.__stateTransition(instance, None, InstanceState.Orphaned)
self.data.releaseInstance(instance)
def __checkHosts(self):
		# Check whether each host has been heard from recently;
		# if not, probe it to see whether it is still alive
for hostId in self.hostLastContactTime.keys():
if (self.hostLastContactTime[hostId] < (self.__now() - self.expireHostTime)):
host = self.data.acquireHost(hostId)
string = None
try:
string = self.proxy[host.name].liveCheck()
except:
pass
if string != "alive":
self.__downHost(host)
del self.hostLastContactTime[hostId]
else:
self.__upHost(host)
self.hostLastContactTime[hostId] = self.__now()
self.data.releaseHost(host)
def __checkInstances(self):
# Reconcile instances with nodes
# obtain a list of instances I know about
myInstancesError = False
try:
myInstances = self.data.getInstances()
except:
myInstancesError = True
self.log.warning('Failure communicating with my database')
		if myInstancesError:
return
# iterate through all hosts I believe are up
for hostId in self.hostLastContactTime.keys():
#self.log.warning("iterate %d" % hostId)
host = self.data.acquireHost(hostId)
# XXXstroucki: timing has changed with the message
# buffering in the NM, so this wasn't being run any-
# more because the time check was passing.
# I should think a bit more about this, but
# the "if True" is probably appropriate.
#if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
if True:
host.decayed = True
self.log.debug('Fetching state from host %s because it is decayed' % (host.name))
myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
# get a list of VMs running on host
try:
hostProxy = self.proxy[host.name]
remoteInstances = [self.__getVmInfo(host.name, vmId) for vmId in hostProxy.listVms()]
except:
self.log.warning('Failure getting instances from host %s' % (host.name))
self.data.releaseHost(host)
continue
# register instances I don't know about
for instance in remoteInstances:
if (instance.id not in myInstances):
if instance.state == InstanceState.Exited:
self.log.warning("%s telling me about exited instance %s, ignoring." % (host.name, instance.id))
continue
instance.hostId = host.id
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
remoteInstanceIds = [i.id for i in remoteInstances]
# remove instances that shouldn't be running
for instance in myInstancesThisHost:
if (instance.id not in remoteInstanceIds):
# XXXstroucki before 20110902 excepted here with host lock
try:
instance = self.data.acquireInstance(instance.id)
except:
continue
# XXXstroucki destroy?
try:
del self.instanceLastContactTime[instance.id]
except:
pass
self.data.removeInstance(instance)
self.hostLastContactTime[hostId] = self.__now()
host.decayed = False
self.data.releaseHost(host)
#self.log.warning("iterate %d done" % hostId)
# iterate through all VMs I believe are active
for instanceId in self.instanceLastContactTime.keys():
# XXXstroucki should lock instance here?
try:
lastContactTime = self.instanceLastContactTime[instanceId]
except KeyError:
continue
if (lastContactTime < (self.__now() - self.allowDecayed)):
try:
instance = self.data.acquireInstance(instanceId)
# Don't query non-running VMs. eg. if a VM
# is suspended, and has no host, then there's
# no one to ask
if instance.state not in [InstanceState.Running, InstanceState.Activating, InstanceState.Orphaned]:
self.data.releaseInstance(instance)
continue
except:
continue
instance.decayed = True
self.log.debug('Fetching state on instance %s because it is decayed' % (instance.name))
				if instance.hostId is None: raise AssertionError("Decayed instance %s has no host" % (instance.name))
# XXXstroucki check if host is down?
host = self.data.getHost(instance.hostId)
# get updated state on VM
try:
newInstance = self.__getVmInfo(host.name, instance.vmId)
except:
self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
self.data.releaseInstance(instance)
continue
# update the information we have on the vm
before = instance.state
rv = self.__vmUpdate(instance, newInstance, None)
if (rv == "release"):
self.data.releaseInstance(instance)
if (rv == "remove"):
self.data.removeInstance(instance)
def __getVmInfo(self, host, vmid):
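		"""Fetch a VM's Instance record from its node manager, re-raising any exception returned over RPC."""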
hostProxy = self.proxy[host]
rv = hostProxy.getVmInfo(vmid)
if isinstance(rv, Exception):
raise rv
if not isinstance(rv, Instance):
raise ValueError
return rv
def __normalize(self, instance):
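		"""Sanitize a requested instance: clear server-assigned fields, scrub the name, check name uniqueness and core/memory limits, and strip internal hints."""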
instance.id = None
instance.vmId = None
instance.hostId = None
instance.decayed = False
instance.name = scrubString(instance.name, allowed="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-.")
instance.state = InstanceState.Pending
# XXXstroucki At some point, check userId
if (not self.allowDuplicateNames):
for i in self.data.getInstances().itervalues():
if (i.name == instance.name):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"The name %s is already in use" % (instance.name)})
if (instance.cores < 1):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Number of cores must be >= 1"})
if (instance.cores > self.maxCores):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Number of cores must be <= %d" % (self.maxCores)})
if (instance.memory < 1):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Amount of memory must be >= 1"})
if (instance.memory > self.maxMemory):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Amount of memory must be <= %d" % (self.maxMemory)})
# Make sure disk spec is valid
# Make sure network spec is valid
# Ignore internal hints
for hint in instance.hints:
if (hint.startswith("__")):
del instance.hints[hint]
return instance
# extern
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
# XXXstroucki: check for exception here
instance = self.__normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM REQUEST", instance=instance)
return instance
# extern
def shutdownVm(self, instanceId):
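		"""Mark an instance ShuttingDown and ask its node manager to shut the VM down cleanly."""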
instance = self.data.acquireInstance(instanceId)
self.__stateTransition(instance, None, InstanceState.ShuttingDown)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].shutdownVm(instance.vmId)
except Exception:
			self.log.exception('shutdownVm failed for host %s vmId %d' % (hostname, instance.vmId))
raise
return
# extern
def destroyVm(self, instanceId):
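		"""Destroy an instance: unstarted VMs are removed from the database directly; otherwise the node manager is asked to destroy the VM."""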
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
self.__ACCOUNT("CM VM DESTROY UNSTARTED", instance=instance)
self.data.removeInstance(instance)
elif (instance.state is InstanceState.Activating):
self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
self.__stateTransition(instance, None, InstanceState.Destroying)
self.data.releaseInstance(instance)
else:
# XXXstroucki: This is a problem with keeping
# clean state.
self.__ACCOUNT("CM VM DESTROY", instance=instance)
self.__stateTransition(instance, None, InstanceState.Destroying)
if instance.hostId is None:
self.data.removeInstance(instance)
else:
hostname = self.data.getHost(instance.hostId).name
try:
if hostname is not None:
self.proxy[hostname].destroyVm(instance.vmId)
self.data.releaseInstance(instance)
except:
self.log.warning('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
self.data.removeInstance(instance)
return
# extern
def suspendVm(self, instanceId):
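		"""Transition a running instance to Suspending and ask its node manager to suspend it to a named suspend file."""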
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SUSPEND", instance=instance)
hostname = self.data.getHost(instance.hostId).name
destination = "suspend/%d_%s" % (instance.id, instance.name)
try:
self.proxy[hostname].suspendVm(instance.vmId, destination)
except:
self.log.exception('suspendVm failed for host %s vmId %d' % (hostname, instance.vmId))
raise TashiException(d={'errno':Errors.UnableToSuspend, 'msg':'Failed to suspend %s' % (instance.name)})
return
# extern
def resumeVm(self, instanceId):
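		"""Mark a suspended instance Pending and record its suspend image in its hints so it can be resumed when next activated."""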
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
except TashiException:
self.data.releaseInstance(instance)
raise
source = "suspend/%d_%s" % (instance.id, instance.name)
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM RESUME", instance=instance)
return instance
# extern
def migrateVm(self, instanceId, targetHostId):
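		"""Migrate a running instance: prepare the source and target node managers, transfer the VM, then hand it off to the target."""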
instance = self.data.acquireInstance(instanceId)
self.__ACCOUNT("CM VM MIGRATE", instance=instance)
try:
# FIXME: should these be acquire/release host?
targetHost = self.data.getHost(targetHostId)
sourceHost = self.data.getHost(instance.hostId)
# FIXME: Are these the correct state transitions?
except:
self.data.releaseInstance(instance)
raise
try:
self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
try:
# Prepare the target
self.log.info("migrateVm: Calling prepSourceVm on source host %s" % sourceHost.name)
self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
except Exception:
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
try:
self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
try:
# Send the VM
self.proxy[sourceHost.name].migrateVm(instance.vmId, targetHost, cookie)
except Exception:
self.log.exception('migrateVm failed')
raise
try:
instance = self.data.acquireInstance(instance.id)
instance.hostId = targetHost.id
finally:
self.data.releaseInstance(instance)
try:
# Notify the target
vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
except Exception:
self.log.exception('receiveVm failed')
raise
self.log.info("migrateVM finished")
return
# extern
def pauseVm(self, instanceId):
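		"""Pause a running instance on its node manager."""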
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM PAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].pauseVm(instance.vmId)
except Exception:
self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
return
# extern
def unpauseVm(self, instanceId):
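		"""Unpause a paused instance on its node manager."""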
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].unpauseVm(instance.vmId)
except Exception:
self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
try:
self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
except TashiException:
self.data.releaseInstance(instance)
raise
self.data.releaseInstance(instance)
return
# extern
def getHosts(self):
return self.data.getHosts().values()
# extern
def getNetworks(self):
networks = self.data.getNetworks()
for network in networks:
if self.defaultNetwork == networks[network].id:
setattr(networks[network], "default", True)
return networks.values()
# extern
def getUsers(self):
return self.data.getUsers().values()
# extern
def getInstances(self):
return self.data.getInstances().values()
# extern
def getImages(self):
return self.data.getImages()
# extern
def copyImage(self, src, dst):
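		"""Copy a disk image within the DFS images directory, rejecting paths containing '..'."""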
imageSrc = self.dfs.getLocalHandle("images/" + src)
imageDst = self.dfs.getLocalHandle("images/" + dst)
try:
# Attempt to restrict to the image directory
if ".." not in imageSrc and ".." not in imageDst:
self.dfs.copy(imageSrc, imageDst)
self.log.info('DFS image copy: %s->%s' % (imageSrc, imageDst))
else:
self.log.warning('DFS image copy bad path: %s->%s' % (imageSrc, imageDst))
except Exception, e:
self.log.exception('DFS image copy failed: %s (%s->%s)' % (e, imageSrc, imageDst))
# extern
def vmmSpecificCall(self, instanceId, arg):
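		"""Forward a VMM-specific command to the node manager running the instance and return the result."""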
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
self.__ACCOUNT("CM VM SPECIFIC CALL", instance=instance)
try:
res = self.proxy[hostname].vmmSpecificCall(instance.vmId, arg)
except Exception:
self.log.exception('vmmSpecificCall failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
return res
# extern
def registerNodeManager(self, host, instances):
"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
# Handle a new registration
		if (host.id is None):
hostList = [h for h in self.data.getHosts().itervalues() if h.name == host.name]
if (len(hostList) != 1):
raise TashiException(d={'errno':Errors.NoSuchHost, 'msg':'A host with name %s is not identifiable' % (host.name)})
host.id = hostList[0].id
# Check if remote host information matches mine
oldHost = self.data.acquireHost(host.id)
if (oldHost.name != host.name):
self.data.releaseHost(oldHost)
raise TashiException(d={'errno':Errors.NoSuchHostId, 'msg':'Host id and hostname mismatch'})
		if not oldHost.up:
self.__upHost(oldHost)
self.hostLastContactTime[host.id] = self.__now()
oldHost.version = host.version
oldHost.memory = host.memory
oldHost.cores = host.cores
# compare whether CM / NM versions are compatible
if (host.version != version and not self.allowMismatchedVersions):
oldHost.state = HostState.VersionMismatch
if (host.version == version and oldHost.state == HostState.VersionMismatch):
oldHost.state = HostState.Normal
# let the host communicate what it is running
# and note that the information is not stale
for instance in instances:
if instance.state == InstanceState.Exited:
self.log.warning("%s reporting exited instance %s, ignoring." % (host.name, instance.id))
continue
self.instanceLastContactTime.setdefault(instance.id, 0)
self.data.releaseHost(oldHost)
return host.id
def __vmUpdate(self, oldInstance, instance, oldState):
# this function assumes a lock is held on the instance
# already, and will be released elsewhere
self.instanceLastContactTime[oldInstance.id] = self.__now()
oldInstance.decayed = False
if (instance.state == InstanceState.Exited):
# determine why a VM has exited
hostname = self.data.getHost(oldInstance.hostId).name
if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
if (oldInstance.state == InstanceState.Suspending):
self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
oldInstance.hostId = None
oldInstance.vmId = None
return "release"
if (oldInstance.state == InstanceState.MigrateTrans):
# Just await update from target host
return "release"
else:
del self.instanceLastContactTime[oldInstance.id]
return "remove"
else:
if (instance.state):
# XXXstroucki does this matter?
if (oldState and oldInstance.state != oldState):
self.log.warning('Doing vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
oldInstance.state = instance.state
if (instance.vmId):
oldInstance.vmId = instance.vmId
if (instance.hostId):
oldInstance.hostId = instance.hostId
if (instance.nics):
for nic in instance.nics:
if (nic.ip):
for oldNic in oldInstance.nics:
if (oldNic.mac == nic.mac):
oldNic.ip = nic.ip
return "release"
return "success"
# extern
def vmUpdate(self, instanceId, instance, oldState):
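		"""Update the stored record for a VM from a state report, normally sent by its node manager."""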
try:
oldInstance = self.data.acquireInstance(instanceId)
except TashiException, e:
# shouldn't have a lock to clean up after here
if (e.errno == Errors.NoSuchInstanceId):
self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
return
except:
self.log.exception("Could not acquire instance")
raise
displayInstance = copy.copy(oldInstance)
displayInstance.state = instance.state
self.__ACCOUNT("CM VM UPDATE", instance=displayInstance)
rv = self.__vmUpdate(oldInstance, instance, oldState)
if (rv == "release"):
self.data.releaseInstance(oldInstance)
if (rv == "remove"):
self.data.removeInstance(oldInstance)
return "success"
# extern
def activateVm(self, instanceId, host):
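		"""Start a pending instance on the given host (or resume it from a suspend image); returns "success" or "failure"."""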
# XXXstroucki: check my idea of the host's capacity before
# trying.
dataHost = self.data.acquireHost(host.id)
if (dataHost.name != host.name):
self.data.releaseHost(dataHost)
raise TashiException(d={'errno':Errors.HostNameMismatch,'msg':"Mismatched target host"})
if (not dataHost.up):
self.data.releaseHost(dataHost)
raise TashiException(d={'errno':Errors.HostNotUp,'msg':"Target host is not up"})
if (dataHost.state != HostState.Normal):
self.data.releaseHost(dataHost)
raise TashiException(d={'errno':Errors.HostStateError,'msg':"Target host state is not normal"})
self.data.releaseHost(dataHost)
instance = self.data.acquireInstance(instanceId)
self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
if ('__resume_source' in instance.hints):
self.__stateTransition(instance, None, InstanceState.Resuming)
else:
# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
self.__stateTransition(instance, None, InstanceState.Activating)
instance.hostId = host.id
self.data.releaseInstance(instance)
try:
if ('__resume_source' in instance.hints):
vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
else:
vmId = self.proxy[host.name].instantiateVm(instance)
except Exception:
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization and initialization fails
self.data.removeInstance(instance)
else:
# XXXstroucki what can we do about pending hosts in the scheduler?
# put them at the end of the queue and keep trying?
self.__stateTransition(instance, None, InstanceState.Held)
instance.hostId = None
self.data.releaseInstance(instance)
return "failure"
instance = self.data.acquireInstance(instanceId)
instance.vmId = vmId
if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization
try:
self.proxy[host.name].destroyVm(vmId)
self.data.removeInstance(instance)
except Exception:
self.log.exception('destroyVm failed for host %s vmId %d' % (host.name, instance.vmId))
self.data.releaseInstance(instance)
return "failure"
else:
if ('__resume_source' not in instance.hints):
# XXXstroucki should we just wait for NM to update?
#self.__stateTransition(instance, InstanceState.Activating, InstanceState.Running)
pass
self.data.releaseInstance(instance)
return "success"
# extern
def registerHost(self, hostname, memory, cores, version):
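		"""Register a host with the cluster (or update an existing registration) and return its id."""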
hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
if alreadyRegistered:
self.log.info("Host %s is already registered, it was updated now" % hostname)
else:
self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
try:
host = self.data.getHost(hostId)
self.__ACCOUNT("CM HOST REGISTER", host=host)
except:
self.log.warning("Failed to lookup host %s" % hostId)
return hostId
# extern
def unregisterHost(self, hostId):
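		"""Remove a host from the cluster database."""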
try:
host = self.data.getHost(hostId)
self.__ACCOUNT("CM HOST UNREGISTER", host=host)
except:
self.log.warning("Failed to lookup host %s" % hostId)
return
self.data.unregisterHost(hostId)
self.log.info("Host %s was unregistered" % hostId)
return
# service thread
def __monitorCluster(self):
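		"""Service loop that periodically checks host liveness and reconciles instance state with the node managers."""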
while True:
sleepFor = min(self.expireHostTime, self.allowDecayed)
try:
self.__checkHosts()
self.__checkInstances()
except:
self.log.exception('monitorCluster iteration failed')
# XXXrgass too chatty. Remove
# XXXstroucki the risk is that a deadlock in obtaining
# data could prevent this loop from continuing.
#self.log.info("Sleeping for %d seconds" % sleepFor)
time.sleep(sleepFor)
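# A minimal usage sketch (illustrative only, not part of this module): the service is
# constructed with a config object exposing get()/getint(), a cluster data backend, and
# a DFS plugin, all assumed to be provided by the surrounding Tashi deployment. The
# factory names below are hypothetical placeholders, not real Tashi functions:
#
#   config = loadConfig()                           # hypothetical config loader
#   data = loadDataBackend(config)                  # hypothetical data-store factory
#   dfs = loadDfsPlugin(config)                     # hypothetical DFS plugin factory
#   service = ClusterManagerService(config, data, dfs)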