#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
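# The "primitive" scheduler agent: a simple centralized scheduler that
# places pending VMs onto hosts round-robin or by dense packing, honoring
# per-instance hints and per-host reservations.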
import time
import logging.config
import sys
from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
from tashi.util import createClient, instantiateImplementation, boolean
from tashi.utils.config import Config
import tashi
class Primitive(object):
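	# Polls the cluster manager for unscheduled VMs, picks a host for
	# each one, and runs any configured hooks before creation and after
	# destruction.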
def __init__(self, config):
self.config = config
self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
self.densePack = boolean(self.config.get("Primitive", "densePack"))
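		# Example [Primitive] stanza (illustrative values only; the hook
		# class path is hypothetical):
		#   [Primitive]
		#   scheduleDelay = 2.0
		#   densePack = False
		#   hook1 = tashi.agents.examples.ExampleHook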
items = self.config.items("Primitive")
items.sort()
for item in items:
(name, value) = item
name = name.lower()
if (name.startswith("hook")):
try:
self.hooks.append(instantiateImplementation(value, config, self.cm, False))
except:
self.log.exception("Failed to load hook %s" % (value))
		# cached cluster state, refreshed from the CM each scheduling pass
		self.hosts = {}
		self.load = {}
		self.instances = {}
		# muffle suppresses repeated log messages about the same instance
		self.muffle = {}
		self.lastScheduledHost = 0
		# clearHints records scheduling hints that have already been handled
		self.clearHints = {}
def __getState(self):
# Generate a list of hosts and
# current loading of VMs per host
hosts = {}
# load's keys are the host id, or None if not on a host. values are instance ids
load = {}
ctr = 0
for h in self.cm.getHosts():
#XXXstroucki get all hosts here?
			#if (self.__isReady(h)):
hosts[ctr] = h
ctr = ctr + 1
load[h.id] = []
load[None] = []
_instances = self.cm.getInstances()
instances = {}
for i in _instances:
instances[i.id] = i
# XXXstroucki put held machines behind pending ones
heldInstances = []
for i in instances.itervalues():
# Nonrunning VMs will have hostId of None, but
# so will Suspended VMs.
if (i.hostId or i.state == InstanceState.Pending):
load[i.hostId] = load[i.hostId] + [i.id]
elif (i.hostId is None and i.state == InstanceState.Held):
heldInstances = heldInstances + [i.id]
load[None] = load[None] + heldInstances
self.hosts = hosts
self.load = load
self.instances = instances
def __checkCapacity(self, host, inst):
# ensure host can carry new load
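		# each reduce sums the resources of the instances already placed on
		# this host, seeded with the candidate instance's own requirement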
memUsage = reduce(lambda x, y: x + self.instances[y].memory, self.load[host.id], inst.memory)
coreUsage = reduce(lambda x, y: x + self.instances[y].cores, self.load[host.id], inst.cores)
if (memUsage <= host.memory and coreUsage <= host.cores):
return True
return False
def __clearHints(self, hint, name):
# remove the clearHint if the host comes back to normal mode
		if name in self.clearHints.get(hint, []):
			self.clearHints[hint].remove(name)
	def __isReady(self, host):
		# a host can accept new VMs only if it is up and in Normal state
		if not host.up or host.state != HostState.Normal:
			return False
		return True
def __scheduleInstance(self, inst):
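		# Pick a host for one pending instance and activate it there.
		# Honors a targetHost hint when present; otherwise rotates through
		# the hosts starting after the last scheduled one, preferring
		# fuller hosts when densePack is set and emptier ones when not.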
try:
minMax = None
minMaxHost = None
minMaxCtr = None
densePack = inst.hints.get("densePack", None)
if (densePack is None):
densePack = self.densePack
else:
densePack = boolean(densePack)
# Grab the targetHost config options if passed
targetHost = inst.hints.get("targetHost", None)
# Check to see if we have already handled this hint
clearHints = self.clearHints
clearHints["targetHost"] = clearHints.get("targetHost", [])
# If we handled the hint, don't look at it anymore
if targetHost in clearHints["targetHost"]:
targetHost = None
try:
allowElsewhere = boolean(inst.hints.get("allowElsewhere", "False"))
except Exception, e:
allowElsewhere = False
# has a host preference been expressed?
if (targetHost != None):
for h in self.hosts.values():
					if (self.__isReady(h)):
self.__clearHints("targetHost", h.name)
# if this is not the host we are looking for, continue
if ((str(h.id) != targetHost and h.name != targetHost)):
continue
# we found the targetHost
# If a host machine is reserved, only allow if userid is in reserved list
if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
# Machine is reserved and not available for userId.
# XXXstroucki: Should we log something here for analysis?
break
if self.__checkCapacity(h, inst):
minMax = len(self.load[h.id])
minMaxHost = h
# end targethost != none
# If we don't have a host yet, find one here
if ((targetHost == None or allowElsewhere) and minMaxHost == None):
				# cycle through the host list round-robin, starting just
				# after the host that received the previous VM
				# guard against lastScheduledHost having been set to None
				if self.lastScheduledHost is None:
					self.lastScheduledHost = 0
				for ctr in range(self.lastScheduledHost, len(self.hosts)) + range(0, self.lastScheduledHost):
h = self.hosts[ctr]
# XXXstroucki if it's unavailable, find another machine
					if (not self.__isReady(h)):
						continue
					# the host is back to normal, so get rid of its entry in clearHints
					self.__clearHints("targetHost", h.name)
# if it's reserved, see if we can use it
if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
# reserved for somebody else, so find another machine
continue
# implement dense packing policy:
# consider this host if
# minMax has not been modified or
# the number of vms here is greater than minmax if we're dense packing or
# the number of vms here is less than minmax if we're not dense packing
if (minMax is None or (densePack and len(self.load[h.id]) > minMax) or (not densePack and len(self.load[h.id]) < minMax)):
if self.__checkCapacity(h, inst):
minMax = len(self.load[h.id])
minMaxHost = h
minMaxCtr = ctr
			# Check that the VM image isn't already mounted persistent by
			# another instance: a persistent image may only be in use once.
			# XXX: should set a status code to alert the user. Attempts to
			# update the instance state and set persistent=False here did
			# not work, so refusing placement will have to do until there
			# is a better way.
			if inst.disks[0].persistent:
				count = 0
				myDisk = inst.disks[0].uri
				for i in self.cm.getInstances():
					if myDisk == i.disks[0].uri and i.disks[0].persistent:
						count += 1
				if count > 1:
					minMaxHost = None
if (minMaxHost):
# found a host
if (not inst.hints.get("__resume_source", None)):
# only run preCreate hooks if newly starting
for hook in self.hooks:
try:
hook.preCreate(inst)
except:
self.log.warning("Failed to run preCreate hook")
self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))
rv = "fail"
try:
rv = self.cm.activateVm(inst.id, minMaxHost)
if rv == "success":
self.lastScheduledHost = minMaxCtr
self.load[minMaxHost.id] = self.load[minMaxHost.id] + [inst.id]
# get rid of its possible entry in muffle if VM is scheduled to a host
if (inst.name in self.muffle):
self.muffle.pop(inst.name)
else:
self.log.warning("Instance %s failed to activate on host %s" % (inst.name, minMaxHost.name))
				except TashiException, e:
					# If activation fails because the host is not in Normal
					# mode (HostStateError), remember the targetHost hint so
					# it is ignored from now on.
					# XXXstroucki: check for other errors later
					if e.errno == Errors.HostStateError:
self.clearHints["targetHost"] = self.clearHints.get("targetHost", [])
self.clearHints["targetHost"].append(targetHost)
else:
# did not find a host
if (inst.name not in self.muffle):
self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
self.muffle[inst.name] = True
except Exception, e:
# XXXstroucki: how can we get here?
if (inst.name not in self.muffle):
self.log.exception("Failed to schedule or activate %s" % (inst.name))
self.muffle[inst.name] = True
def start(self):
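		# Main loop: poll the CM for cluster state, run postDestroy hooks
		# for VMs that have exited, then schedule any waiting VMs.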
oldInstances = {}
# XXXstroucki: scheduling races have been observed, where
# a vm is scheduled on a host that had not updated its
# capacity with the clustermanager, leading to overloaded
		# hosts. I think the place to guard against this happening
# is in the nodemanager. This scheduler will keep an
# internal state of cluster loading, but that is best
# effort and will be refreshed from CM once the buffer
# of vms to be scheduled is exhausted.
while True:
try:
# XXXstroucki: to get a list of vms to be
# scheduled, it asks the CM for a full
# cluster state, and will look at those
# without a host.
self.__getState()
# Check for VMs that have exited and call
# postDestroy hook
for i in oldInstances:
# XXXstroucki what about paused and saved VMs?
# XXXstroucki: do we need to look at Held VMs here?
					if (i not in self.instances and oldInstances[i].state in (InstanceState.Running, InstanceState.Destroying, InstanceState.ShuttingDown)):
self.log.info("VM exited: %s" % (oldInstances[i].name))
						for hook in self.hooks:
							try:
								hook.postDestroy(oldInstances[i])
							except:
								self.log.warning("Failed to run postDestroy hook")
oldInstances = self.instances
if (len(self.load.get(None, [])) > 0):
# Schedule VMs if they are waiting
# sort by id number (FIFO?)
self.load[None].sort()
for i in self.load[None]:
inst = self.instances[i]
self.__scheduleInstance(inst)
# end for unassigned vms
except TashiException:
self.log.exception("Tashi exception")
except Exception:
self.log.warning("Scheduler iteration failed")
# wait to do the next iteration
time.sleep(self.scheduleDelay)
def main():
config = Config(["Agent"])
configFiles = config.getFiles()
publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
logging.config.fileConfig(configFiles)
agent = Primitive(config)
try:
agent.start()
except KeyboardInterrupt:
pass
log = logging.getLogger(__file__)
log.info("Primitive exiting")
sys.exit(0)
if __name__ == "__main__":
main()