#!/usr/bin/python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.    

import time
import logging.config
import sys

from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException

from tashi.util import createClient, instantiateImplementation, boolean
from tashi.utils.config import Config
import tashi

class Primitive(object):
	"""Polling scheduler that places unassigned VMs onto hosts.

	Every scheduleDelay seconds the cluster manager (CM) is asked for
	all hosts and instances.  Instances that are not on a host are then
	offered to __scheduleInstance(), which honours the "targetHost",
	"allowElsewhere" and "densePack" instance hints, host reservations
	and host memory/core capacity.

	Hooks named hook* in the [Primitive] config section are loaded at
	construction time; their preCreate() is run before a new VM is
	activated and their postDestroy() after a VM has disappeared.
	"""

	def __init__(self, config):
		"""Create the CM client, read the [Primitive] options, load hooks.

		config must provide get(section, option) and items(section).
		A hook that fails to load is logged and skipped.
		"""
		self.config = config
		self.cm = createClient(config)
		self.hooks = []
		self.log = logging.getLogger(__file__)
		self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
		self.densePack = boolean(self.config.get("Primitive", "densePack"))
		# sorted() instead of list.sort() so that this also works when
		# items() does not return a list; hooks load in option-name order
		for (name, value) in sorted(self.config.items("Primitive")):
			if (name.lower().startswith("hook")):
				try:
					self.hooks.append(instantiateImplementation(value, config, self.cm, False))
				except Exception:
					# not a bare except: let KeyboardInterrupt/SystemExit through
					self.log.exception("Failed to load hook %s" % (value))
		# position in the host list -> host object
		self.hosts = {}
		# host id (None for "not on a host") -> list of instance ids
		self.load = {}
		# instance id -> instance object
		self.instances = {}
		# names of instances whose scheduling failure was already logged
		self.muffle = {}
		# position in self.hosts from which the next host search starts
		self.lastScheduledHost = 0
		# hint name -> list of hint values that are to be ignored for now
		self.clearHints = {}

	def __getState(self):
		"""Refresh self.hosts, self.load and self.instances from the CM."""
		# Generate a list of hosts and
		# current loading of VMs per host
		hosts = {}
		# load's keys are the host id, or None if not on a host. values are instance ids
		load = {}

		for (ctr, h) in enumerate(self.cm.getHosts()):
			#XXXstroucki get all hosts here?
			#if (h.up == True and h.state == HostState.Normal):
			hosts[ctr] = h
			load[h.id] = []

		load[None] = []
		instances = {}
		for i in self.cm.getInstances():
			instances[i.id] = i

		# XXXstroucki put held machines behind pending ones
		heldInstances = []
		for i in instances.values():
			# Nonrunning VMs will have hostId of None, but
			# so will Suspended VMs.
			if (i.hostId or i.state == InstanceState.Pending):
				# setdefault: an instance may still name a host that
				# getHosts() no longer returns; that must not abort the
				# whole scheduling pass with a KeyError
				load.setdefault(i.hostId, []).append(i.id)
			elif (i.hostId is None and i.state == InstanceState.Held):
				heldInstances.append(i.id)

		load[None] = load[None] + heldInstances

		self.hosts = hosts
		self.load = load
		self.instances = instances

	def __checkCapacity(self, host, inst):
		"""Return True if host can carry inst on top of its current load.

		The memory and cores of every instance listed in
		self.load[host.id], plus those of inst, must not exceed
		host.memory and host.cores.
		"""
		resident = [self.instances[vmId] for vmId in self.load[host.id]]
		memUsage = inst.memory + sum([vm.memory for vm in resident])
		coreUsage = inst.cores + sum([vm.cores for vm in resident])
		return (memUsage <= host.memory and coreUsage <= host.cores)

	def __clearHints(self, hint, name):
		"""Remove one occurrence of name from the handled list of hint.

		Does nothing if hint has no list yet or name is not in it.
		"""
		#  remove the clearHint if the host comes back to normal mode
		handled = self.clearHints.get(hint, [])
		if name in handled:
			handled.remove(name)

	def __forgetHandledHost(self, host):
		"""Allow targetHost hints naming host to be honoured again."""
		# a targetHost hint may name the host or carry its id, so
		# both spellings have to be cleared
		self.__clearHints("targetHost", host.name)
		self.__clearHints("targetHost", str(host.id))

	def __findTargetHost(self, inst, targetHost):
		"""Return the host requested by the targetHost hint, or None.

		None is returned when no host matches, when the matching host
		is reserved for other users, or when it lacks capacity.
		"""
		chosen = None
		for h in self.hosts.values():
			if (h.state == HostState.Normal):
				self.__forgetHandledHost(h)
			# if this is not the host we are looking for, continue
			if (str(h.id) != targetHost and h.name != targetHost):
				continue
			# we found the targetHost
			#  If a host machine is reserved, only allow if userid is in reserved list
			if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
				# Machine is reserved and not available for userId.
				# XXXstroucki: Should we log something here for analysis?
				break
			if self.__checkCapacity(h, inst):
				chosen = h
		return chosen

	def __findBestHost(self, inst, densePack):
		"""Search all usable hosts and return (host, position).

		Hosts are visited in order, starting at self.lastScheduledHost
		and wrapping around.  Hosts that are down, not in Normal state
		or reserved for other users are skipped.  Among the remaining
		hosts with capacity the one with the most VMs (densePack) or
		the fewest VMs (not densePack) wins; on a tie the host visited
		first is kept.  Returns (None, None) if no host qualifies.
		"""
		best = None
		bestLoad = None
		bestCtr = None
		numHosts = len(self.hosts)

		# The cursor may be stale (hosts were removed since it was
		# stored) or None; an out-of-range cursor would index hosts
		# that do not exist.
		start = self.lastScheduledHost
		if (start is None or start < 0 or start >= numHosts):
			start = 0
			self.lastScheduledHost = start

		# cycle list
		for ctr in list(range(start, numHosts)) + list(range(0, start)):
			h = self.hosts[ctr]

			# XXXstroucki if it's down, find another machine
			if (h.up == False):
				continue

			#  If the host not in normal operating state,
			#  find another machine
			if (h.state != HostState.Normal):
				continue

			#  If the host is back to normal, get rid of the entry in clearHints
			self.__forgetHandledHost(h)

			# if it's reserved, see if we can use it
			if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
				# reserved for somebody else, so find another machine
				continue

			# implement dense packing policy:
			# consider this host if
			# no host has been picked yet  or
			# the number of vms here is greater than the best if we're dense packing or
			# the number of vms here is less than the best if we're not dense packing
			hostLoad = len(self.load[h.id])
			if (bestLoad is None or (densePack and hostLoad > bestLoad) or (not densePack and hostLoad < bestLoad)):
				if self.__checkCapacity(h, inst):
					best = h
					bestLoad = hostLoad
					bestCtr = ctr

		return (best, bestCtr)

	def __diskAlreadyMounted(self, inst):
		"""Return True if inst's persistent first disk is used elsewhere.

		The CM is asked for all instances; inst itself is among them,
		so the disk counts as taken once more than one instance has
		the same first-disk uri mounted persistent.  Instances without
		disks are ignored.
		"""
		#  Should set a status code to alert user
		#  Tried to update the state of the instance and set persistent=False but
		#  couldn't do it, should work until we find a better way to do this
		if (not inst.disks):
			return False
		if (not (inst.disks[0].persistent == True)):
			return False
		myDisk = inst.disks[0].uri
		count = 0
		for i in self.cm.getInstances():
			if (i.disks and myDisk == i.disks[0].uri and i.disks[0].persistent == True):
				count += 1
		return (count > 1)

	def __scheduleInstance(self, inst):
		"""Try to place inst on a host and activate it there.

		Never raises for ordinary errors: failures are logged, and the
		"no suitable host" / "failed to schedule" messages are logged
		only once per instance name until the instance is placed.
		"""
		try:
			minMaxHost = None
			minMaxCtr = None

			densePack = inst.hints.get("densePack", None)
			if (densePack is None):
				densePack = self.densePack
			else:
				densePack = boolean(densePack)

			#  Grab the targetHost config options if passed
			targetHost = inst.hints.get("targetHost", None)
			#  If we handled the hint already, don't look at it anymore
			if targetHost in self.clearHints.setdefault("targetHost", []):
				targetHost = None

			try:
				allowElsewhere = boolean(inst.hints.get("allowElsewhere", "False"))
			except Exception:
				allowElsewhere = False

			# has a host preference been expressed?
			if (targetHost is not None):
				minMaxHost = self.__findTargetHost(inst, targetHost)

			# If we don't have a host yet, find one here
			if ((targetHost is None or allowElsewhere) and minMaxHost is None):
				(minMaxHost, minMaxCtr) = self.__findBestHost(inst, densePack)
				#  check that VM image isn't mounted persistent already
				# (asked once per instance, not once per candidate host)
				# NOTE(review): a host found through the targetHost hint
				# is not subject to this check - confirm that is intended
				if (minMaxHost is not None and self.__diskAlreadyMounted(inst)):
					minMaxHost = None

			if (minMaxHost is None):
				# did not find a host
				if (inst.name not in self.muffle):
					self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
					self.muffle[inst.name] = True
				return

			# found a host
			if (not inst.hints.get("__resume_source", None)):
				# only run preCreate hooks if newly starting
				for hook in self.hooks:
					try:
						hook.preCreate(inst)
					except Exception:
						self.log.warning("Failed to run preCreate hook", exc_info=True)
			self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))

			try:
				rv = self.cm.activateVm(inst.id, minMaxHost)
			except TashiException:
				# sys.exc_info() instead of "except X, e" / "except X as e"
				# so that this parses on every Python 2.x and 3.x
				e = sys.exc_info()[1]
				if e.errno == Errors.HostStateError:
					#  If we try to activate the VM and get errno 10, host not in normal mode, add it to the list
					if (targetHost is not None):
						self.clearHints.setdefault("targetHost", []).append(targetHost)
				else:
					self.log.warning("Failed to activate instance %s on host %s: %s" % (inst.name, minMaxHost.name, e))
				return

			if rv == "success":
				# a host chosen through the targetHost hint has no
				# position; keep the previous cursor in that case
				if (minMaxCtr is not None):
					self.lastScheduledHost = minMaxCtr
				self.load[minMaxHost.id].append(inst.id)
				# get rid of its possible entry in muffle if VM is scheduled to a host
				if (inst.name in self.muffle):
					self.muffle.pop(inst.name)
			else:
				self.log.warning("Instance %s failed to activate on host %s" % (inst.name, minMaxHost.name))

		except Exception:
			# XXXstroucki: how can we get here?
			if (inst.name not in self.muffle):
				self.log.exception("Failed to schedule or activate %s" % (inst.name))
				self.muffle[inst.name] = True

	def start(self):
		"""Run the scheduling loop; does not return.

		Each pass refreshes the cluster state, runs the postDestroy
		hooks for VMs that have disappeared since the previous pass,
		schedules all VMs that are waiting for a host and then sleeps
		for scheduleDelay seconds.  Exceptions derived from Exception
		are logged and the loop continues.
		"""
		oldInstances = {}
		exitedStates = (InstanceState.Running, InstanceState.Destroying, InstanceState.ShuttingDown)

		# XXXstroucki: scheduling races have been observed, where
		# a vm is scheduled on a host that had not updated its
		# capacity with the clustermanager, leading to overloaded
		# hosts. I think the place to insure against this happening
		# is in the nodemanager. This scheduler will keep an
		# internal state of cluster loading, but that is best
		# effort and will be refreshed from CM once the buffer
		# of vms to be scheduled is exhausted.

		while True:
			try:
				# XXXstroucki: to get a list of vms to be
				# scheduled, it asks the CM for a full
				# cluster state, and will look at those
				# without a host.
				self.__getState()

				# Check for VMs that have exited and call
				# postDestroy hook
				for i in oldInstances:
					# XXXstroucki what about paused and saved VMs?
					# XXXstroucki: do we need to look at Held VMs here?
					gone = oldInstances[i]
					if (i not in self.instances and gone.state in exitedStates):
						self.log.info("VM exited: %s" % (gone.name))
						for hook in self.hooks:
							# a failing hook must not abort the pass:
							# oldInstances would never be refreshed, the
							# hook would be retried forever and no VM
							# would be scheduled any more
							try:
								hook.postDestroy(gone)
							except Exception:
								self.log.warning("Failed to run postDestroy hook", exc_info=True)

				oldInstances = self.instances

				if (len(self.load.get(None, [])) > 0):
					# Schedule VMs if they are waiting

					# sort by id number (FIFO?)
					# NOTE(review): this sort discards the "held behind
					# pending" order built by __getState - confirm which
					# order is intended
					self.load[None].sort()
					for i in self.load[None]:
						self.__scheduleInstance(self.instances[i])
					# end for unassigned vms

			except TashiException:
				self.log.exception("Tashi exception")

			except Exception:
				# keep the level, but record the traceback
				self.log.warning("Scheduler iteration failed", exc_info=True)

			# wait to do the next iteration
			time.sleep(self.scheduleDelay)

def main():
	"""Entry point: configure the agent, run the scheduler, then exit.

	Reads the "Agent" configuration, installs the configured publisher
	as tashi.publisher, sets up logging from the configuration files
	and runs a Primitive scheduler until it is interrupted from the
	keyboard.  Always ends the process with exit status 0.
	"""
	config = Config(["Agent"])
	logConfigFiles = config.getFiles()

	# the publisher is created before logging is configured
	tashi.publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
	logging.config.fileConfig(logConfigFiles)
	scheduler = Primitive(config)

	try:
		scheduler.start()
	except KeyboardInterrupt:
		# Ctrl-C is the regular way to stop the agent
		pass

	logging.getLogger(__file__).info("Primitive exiting")
	sys.exit(0)

# Run the scheduler agent only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
	main()
