# blob: fdb35749b0089729f49e4ad0040c9a7bb20ca7f4 [file] [log] [blame]
#! /usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# XXXstroucki: wiki is a text based resource manager that maui can
# use. It also seems to have disappeared from the face of the web.
# This code is unmaintained.
# XXXstroucki former file mauipacket.py
#import subprocess
import time
import SocketServer
from tashi.utils import pseudoDes
from tashi.rpycservices.rpyctypes import HostState, InstanceState
class MauiPacket:
    '''One message of the text-based protocol exchanged with Maui.

    As read and written by this class, a packet is an 8-character decimal
    size, one separator character, and then ``size`` characters of the form
    ``CK=<checksum> TS=<timestamp> AUTH=<auth> DT=<data tokens...>``.

    Attributes:
    size      -- length of the payload following the separator
    char      -- the separator character between size and payload
    chksum    -- checksum as a 16-digit lowercase hex string
    timestamp -- integer carried in the TS field
    auth      -- value of the AUTH field
    data      -- list of whitespace-separated tokens following "DT="
    msg       -- raw text of the last packet read or rendered
    key       -- key passed to pseudoDes.generateKey when checksumming
    '''

    def __init__(self, key=0):
        '''Create an empty packet stamped with the current time.'''
        self.size = 0
        self.char = '\n'
        self.chksum = '0' * 16
        self.timestamp = int(time.time())
        self.auth = ''
        self.data = []
        self.msg = ''
        self.key = key

    def readPacket(self, istream):
        '''Read one packet from istream and fill in this object's fields.

        istream only needs a read(n) method. The raw text consumed is kept
        in self.msg so the checksum can be verified later. Raises
        ValueError if the size or TS field is not an integer.
        '''
        self.msg = size = istream.read(8)
        self.size = int(size)
        self.char = istream.read(1)
        self.msg += self.char
        packet = istream.read(self.size)
        self.msg += packet

        tokens = packet.split()
        for i, token in enumerate(tokens):
            item = token.split('=')
            if item[0] == 'CK':
                self.chksum = item[1]
            elif item[0] == 'TS':
                self.timestamp = int(item[1])
            elif item[0] == 'AUTH':
                self.auth = item[1]
            elif item[0] == 'DT':
                # Everything from "DT=" to the end of the packet is payload.
                # Stop scanning so that payload tokens which happen to start
                # with CK=/TS=/AUTH=/DT= cannot overwrite the header fields.
                self.data = token.split('=', 1)[1:] + tokens[i + 1:]
                break

    def checksumMessage(self, message, key=None):
        '''Return the checksum of message as 16 lowercase hex digits.

        key defaults to self.key; a string key is converted with int().
        '''
        if key is None:
            key = self.key
        if isinstance(key, str):
            key = int(key)
        return '%016x' % pseudoDes.generateKey(message, key)

    def getChecksum(self):
        '''Checksum the part of self.msg starting at the first "TS=".'''
        _, marker, rest = self.msg.partition('TS=')
        return self.checksumMessage(marker + rest)

    def verifyChecksum(self):
        '''Return True if the checksum computed over self.msg equals
        self.chksum; otherwise print both values and return False.'''
        chksum = self.getChecksum()
        if chksum != self.chksum:
            print('verifyChecksum: "%s"\t"%s"' % (chksum, self.chksum))
            print('verifyChecksum (types): %s\t%s' % (type(chksum), type(self.chksum)))
            return False
        return True

    def set(self, data, auth=None, key=None, timestamp=None):
        '''Replace the packet contents and recompute checksum and size.

        data is the list of DT tokens. auth and key are only changed when
        given; timestamp defaults to the current time.
        '''
        if timestamp is None:
            timestamp = int(time.time())
        self.data = data
        if auth is not None:
            self.auth = auth
        if key is not None:
            self.key = key
        self.timestamp = timestamp
        self.fixup()

    def _payload(self):
        '''Recompute self.chksum from the current fields and return the
        "CK=... TS=... AUTH=... DT=..." text of the packet.'''
        datastring = "TS=%i AUTH=%s DT=%s" % (self.timestamp, self.auth, ' '.join(self.data))
        self.chksum = self.checksumMessage(datastring)
        return 'CK=%s %s' % (self.chksum, datastring)

    def fixup(self):
        '''Bring self.chksum and self.size in line with the current fields.'''
        self.size = len(self._payload())

    def __str__(self):
        '''Render the packet in wire format. Also refreshes self.chksum
        and stores the rendered text in self.msg.'''
        pktstring = self._payload()
        self.msg = '%08i' % len(pktstring) + self.char + pktstring
        return self.msg

    def prettyString(self):
        '''Return a multi-line, human-readable dump of the packet fields.'''
        s = ('Maui Packet\n'
             '-----------\n'
             'size:\t\t%i\n'
             'checksum:\t%s\n'
             'timestamp:\t%s\n'
             'auth:\t\t%s\n'
             'data:\n'
             '%s\n'
             '-----------')
        return s % (self.size, self.chksum, self.timestamp, self.auth, self.data)
# XXXstroucki original file mauiwiki.py
import threading
import logging.config
from tashi.parallel import synchronizedmethod
from tashi.services.ttypes import *
from tashi.util import getConfig, createClient, instantiateImplementation
#from tashi.agents.mauipacket import MauiPacket
import tashi.util
def jobnameToId(jobname):
    '''Return, as an int, the text after the last "." in jobname.

    A jobname without any "." is converted as a whole.
    '''
    suffix = jobname.rpartition('.')[2]
    return int(suffix)
class InstanceHooks():
    '''Loads the hook objects named in the [MauiWiki] configuration
    section and runs them around VM activation and destruction.'''

    def __init__(self, config):
        '''Connect to the cluster manager and instantiate every option in
        the "MauiWiki" section whose name starts with "hook", in sorted
        order. A hook that fails to load is logged and skipped.'''
        self.log = logging.getLogger(__file__)
        self.hooks = []
        # Connect first so that hooks receive this object's own client and
        # transport instead of relying on module-level names that exist
        # only when the file is run as a script.
        (self.client, self.transport) = createClient(config)
        for (name, value) in sorted(config.items("MauiWiki")):
            if name.lower().startswith("hook"):
                try:
                    self.hooks.append(instantiateImplementation(value, config, self.client, self.transport, False))
                except Exception:
                    self.log.exception("Failed to load hook %s" % (value))

    def preCreate(self, inst):
        '''Call preCreate(inst) on every loaded hook.'''
        for hook in self.hooks:
            hook.preCreate(inst)

    def postDestroy(self, inst):
        '''Call postDestroy(inst) on every loaded hook.'''
        for hook in self.hooks:
            hook.postDestroy(inst)

    def idToInst(self, _id):
        '''Return the single instance whose id, compared as a string,
        equals _id.

        Raises LookupError when no instance or more than one matches.
        '''
        instances = self.client.getInstances()
        print('instances  %s' % (instances,))
        insts = [i for i in instances if str(i.id) == str(_id)]
        # String exceptions are rejected by Python 2.6+, so raise a real
        # exception that callers catching Exception can report.
        if len(insts) == 0:
            raise LookupError("No instance with ID %s" % _id)
        if len(insts) > 1:
            raise LookupError("Multiple instances with ID %s" % _id)
        return insts[0]

    def destroyById(self, _id):
        '''Destroy the VM with the given id, then run the postDestroy hooks.'''
        inst = self.idToInst(_id)
        self.client.destroyVm(int(_id))
        self.postDestroy(inst)

    def activateById(self, _id, host):
        '''Run the preCreate hooks, then activate the VM on host.'''
        inst = self.idToInst(_id)
        self.preCreate(inst)
        self.client.activateVm(int(_id), host)
def cmplists(a, b):
    '''Compare a and b position by position over the length of a.

    Returns -1 or 1 at the first position where the elements differ and
    0 if no such position is found. Elements of b beyond len(a) are not
    examined; indexing b raises IndexError if it runs out first.
    '''
    for idx, left in enumerate(a):
        right = b[idx]
        if left < right:
            return -1
        if left > right:
            return 1
    return 0
class TashiConnection(threading.Thread):
    '''Background thread that periodically copies the hosts, instances
    and users reported by the cluster manager into local dictionaries,
    and renders them as {id: {field: value}} for the Maui replies.'''

    def __init__(self, config, client, transport):
        '''Connect and read the MauiWiki settings.

        The client and transport arguments are accepted but not used;
        this object creates its own connection from config.
        '''
        (self.client, self.transport) = createClient(config)
        self.hosts = {}
        self.instances = {}
        self.users = {}
        self.config = config
        self.ihooks = InstanceHooks(config)
        self.log = logging.getLogger(__file__)
        self.refreshTime = float(self.config.get('MauiWiki', 'refreshTime'))
        self.defaultJobTime = str(self.config.get('MauiWiki', 'defaultJobTime'))
        threading.Thread.__init__(self)
        self.daemon = True

    def run(self):
        '''Refresh hosts, instances and users forever, sleeping
        refreshTime seconds between rounds.'''
        while True:
            print('TashiConnection:run updating hosts ...')
            self.updateHosts()
            print('TashiConnection:run updating instances ...')
            self.updateInstances()
            print('TashiConnection:run updating users ...')
            self.updateUsers()
            time.sleep(self.refreshTime)

    def _ensureTransportOpen(self):
        '''Open self.transport if it reports that it is not open.'''
        isOpen = self.transport.isOpen
        # isOpen may be a method (as on Thrift transports) or a plain
        # attribute. A bound method is always truthy, so testing it
        # without calling it would never reopen the transport.
        if callable(isOpen):
            isOpen = isOpen()
        if not isOpen:
            self.transport.open()

    def wikiHostState(self, host):
        '''Returns a string representing the host state in a form compatible
        with the maui-wiki protocol. This code only ever reports "Down",
        "Draining" or "Unknown"; the states listed for reference are:
        - Busy: Node is running some jobs and will not accept additional jobs
        - Down: Resource Manager problems have been detected. Node is
          incapable of running jobs.
        - Draining: Node is responding but will not accept new jobs
        - Idle: Node is ready to run jobs but currently is not running any.
        - Running: Node is running some jobs and will accept additional jobs
        - Unknown: Node is capable of running jobs but the scheduler
          will need to determine if the node state is actually Idle,
          Running, or Busy.'''
        if host.up == False or host.state == HostState.VersionMismatch:
            return "Down"
        if host.state == HostState.Drained:
            return "Draining"
        return "Unknown"

    def wikiInstanceState(self, instance):
        '''Returns a string representing the instance state in a form
        compatible with the maui-wiki protocol. Pending, Held and Exited
        map to Idle, Hold and Removed; every other state is reported as
        Running. The states listed for reference are:
        Completed: Job has completed
        Hold: Job is in the queue but is not allowed to run
        Idle: Job is ready to run
        Removed: Job has been canceled or otherwise terminated externally
        Running: Job is currently executing
        Suspended: job has started but execution has temporarily been suspended'''
        tashiToWiki = {InstanceState.Pending: 'Idle',
                       InstanceState.Held: 'Hold',
                       InstanceState.Exited: 'Removed'}
        return tashiToWiki.get(instance.state, 'Running')

    # Host handling
    def compareHosts(self, host1, host2):
        '''Return -1, 0 or 1 from comparing id, name, up, state name,
        memory and cores of the two hosts, in that order.'''
        def ii(host):
            try:
                state = tashi.util.hostStates[host.state]
            except Exception:
                # A state that cannot be translated compares as 'Unknown'.
                state = 'Unknown'
            return [host.id, host.name, host.up, state, host.memory, host.cores]
        return cmplists(ii(host1), ii(host2))

    # @synchronizedmethod
    def updateHost(self, host):
        '''Replace the stored record for host and stamp its update time.'''
        self.hosts[host.id] = host
        self.hosts[host.id].updateTime = time.time()

    # @synchronizedmethod
    def addHost(self, host):
        '''Store a newly seen host and stamp its update time.'''
        self.hosts[host.id] = host
        self.hosts[host.id].updateTime = time.time()

    # @synchronizedmethod
    def removeHost(self, host):
        '''Forget a host.'''
        self.hosts.pop(host.id)

    # @synchronizedmethod
    def updateHosts(self):
        '''Fetch the current hosts; add new ones, refresh changed ones and
        drop those that are no longer reported.'''
        self._ensureTransportOpen()
        hosts = self.client.getHosts()
        for host in hosts:
            if host.id not in self.hosts:
                self.addHost(host)
            elif self.compareHosts(self.hosts[host.id], host) != 0:
                self.updateHost(host)
        reported = set(host.id for host in hosts)
        # Iterate over a copy because removeHost mutates self.hosts.
        for host in list(self.hosts.values()):
            if host.id not in reported:
                self.removeHost(host)

    # Instance handling
    def compareInstances(self, instance1, instance2):
        '''Return -1, 0 or 1 from comparing the identifying fields of two
        instances; disks, nics and hints are compared by count only.'''
        def ii(inst):
            return [inst.id,
                    inst.vmId,
                    inst.hostId,
                    tashi.util.vmStates[inst.state],
                    inst.userId,
                    inst.name,
                    inst.cores,
                    inst.memory,
                    len(inst.disks),  # FIXME: this isn't a good way to compare
                    len(inst.nics),  # FIXME: this isn't a good way to compare
                    len(inst.hints)]
        return cmplists(ii(instance1), ii(instance2))

    @synchronizedmethod
    def updateInstance(self, instance):
        '''Replace the stored record for instance, keeping its original
        queue time and stamping a new update time.'''
        qt = self.instances[instance.id].queueTime
        self.instances[instance.id] = instance
        self.instances[instance.id].updateTime = time.time()
        self.instances[instance.id].queueTime = qt

    @synchronizedmethod
    def addInstance(self, instance):
        '''Store a newly seen instance; update and queue time start now.'''
        self.instances[instance.id] = instance
        self.instances[instance.id].updateTime = time.time()
        self.instances[instance.id].queueTime = time.time()

    @synchronizedmethod
    def removeInstance(self, instance):
        '''Mark the stored instance as Exited (it is kept in the table)
        and run the postDestroy hooks.'''
        self.instances[instance.id].state = InstanceState.Exited
        self.ihooks.postDestroy(instance)

    @synchronizedmethod
    def updateInstances(self):
        '''Fetch the current instances; add new ones, refresh changed ones
        and mark as removed those that are no longer reported.'''
        self._ensureTransportOpen()
        instances = self.client.getInstances()
        for instance in instances:
            print('found instance %s' % (instance.id,))
            if instance.id not in self.instances:
                print("it's a new instance")
                self.addInstance(instance)
            elif self.compareInstances(self.instances[instance.id], instance) != 0:
                self.updateInstance(instance)
        reported = set(instance.id for instance in instances)
        for instance in list(self.instances.values()):
            # Already marked as gone on an earlier pass.
            if instance.state == InstanceState.Exited:
                continue
            if instance.id not in reported:
                print('removing instance  %s' % (instance.id,))
                self.removeInstance(instance)

    # User handling
    def compareUsers(self, user1, user2):
        '''Return -1, 0 or 1 from comparing two users by id, then name.'''
        if user1.id < user2.id:
            return -1
        elif user1.id > user2.id:
            return 1
        if user1.name < user2.name:
            return -1
        if user1.name > user2.name:
            return 1
        return 0

    # NOTE(review): users are stamped with "updatetime" (lower-case t)
    # while hosts and instances use "updateTime"; kept as in the original.
    @synchronizedmethod
    def updateUser(self, user):
        '''Replace the stored record for user and stamp its update time.'''
        self.users[user.id] = user
        self.users[user.id].updatetime = time.time()

    @synchronizedmethod
    def addUser(self, user):
        '''Store a newly seen user and stamp its update time.'''
        self.users[user.id] = user
        self.users[user.id].updatetime = time.time()

    @synchronizedmethod
    def removeUser(self, user):
        '''Forget a user.'''
        self.users.pop(user.id)

    @synchronizedmethod
    def updateUsers(self):
        '''Fetch the current users; add new ones, refresh changed ones and
        drop those that are no longer reported.'''
        self._ensureTransportOpen()
        users = self.client.getUsers()
        for user in users:
            if user.id not in self.users:
                self.addUser(user)
            elif self.compareUsers(self.users[user.id], user) != 0:
                self.updateUser(user)
        reported = set(user.id for user in users)
        # Iterate over a copy because removeUser mutates self.users.
        for user in list(self.users.values()):
            if user.id not in reported:
                self.removeUser(user)

    # Get data structures for maui
    # Format is {id:{field:value}}
    @synchronizedmethod
    def getNodes(self, updatetime=0, nodelist=None):
        '''Return {host name: {field: value}} for hosts updated at or
        after updatetime.

        nodelist selects hosts by name; a first element of 'ALL' (the
        default) selects every host and an empty list selects none.
        '''
        if nodelist is None:
            nodelist = ['ALL']
        if len(nodelist) == 0:
            return {}
        wantAll = nodelist[0] == 'ALL'
        nodes = [n for n in self.hosts.values()
                 if n.updateTime >= updatetime and (wantAll or n.name in nodelist)]
        nl = {}
        for node in nodes:
            nl[node.name] = {'STATE': self.wikiHostState(node),
                             'UPDATETIME': str(int(node.updateTime)),
                             'CPROC': str(node.cores),
                             'CMEMORY': str(node.memory)}
        return nl

    @staticmethod
    def _jobRequested(job, joblist):
        '''True if joblist names job by its id, by the id as a string, or
        by the "<name>.<id>" form that getJobs uses as the job key.'''
        return (job.id in joblist
                or str(job.id) in joblist
                or ("%s.%i" % (job.name, job.id)) in joblist)

    @synchronizedmethod
    def getJobs(self, updatetime=0, joblist=None):
        '''Return {"<name>.<id>": {field: value}} for instances updated at
        or after updatetime.

        joblist selects instances; a first element of 'ALL' (the default)
        selects every instance and an empty list selects none. Raises
        KeyError if an instance refers to a user or host that is not in
        the local tables.
        '''
        if joblist is None:
            joblist = ['ALL']
        if len(joblist) == 0:
            return {}
        wantAll = joblist[0] == 'ALL'
        # Requests parsed from a packet carry strings, so matching only
        # the integer id would never select anything.
        jobs = [j for j in self.instances.values()
                if j.updateTime >= updatetime
                and (wantAll or self._jobRequested(j, joblist))]
        jl = {}
        for job in jobs:
            _id = "%s.%i" % (job.name, job.id)
            jl[_id] = {'STATE': self.wikiInstanceState(job),
                       'UNAME': self.users[job.userId].name,
                       'GNAME': self.users[job.userId].name,
                       'UPDATETIME': int(job.updateTime),
                       # NOTE(review): reported as a float, unlike UPDATETIME.
                       'QUEUETIME': job.queueTime,
                       'TASKS': '1',
                       'DPROCS': str(job.cores),
                       'DMEM': str(job.memory),
                       'RMEM': str(job.memory),
                       'WCLIMIT': str(self.defaultJobTime)}
            if job.hostId != None:
                jl[_id]['TASKLIST'] = self.hosts[job.hostId].name
        return jl

    @synchronizedmethod
    def activateById(self, _id, host):
        '''Activate the known instance _id on host and record it locally
        as Activating.

        Raises LookupError if _id is not in the local instance table.
        '''
        if _id not in self.instances:
            # String exceptions are rejected by Python 2.6+.
            raise LookupError("no such instance")
        self.ihooks.activateById(_id, host)
        self.instances[_id].state = InstanceState.Activating
class MauiListener(SocketServer.StreamRequestHandler):
    '''Handles one connection from Maui: reads a single packet, runs the
    command it carries and writes back one reply packet.

    Relies on the module-level names config and tashiconnection, which
    are set when this file is run as a script.
    '''

    def setup(self):
        '''Prepare the streams, a cluster-manager client and the
        authentication settings for this request.'''
        global config
        self.log = logging.getLogger(__file__)
        SocketServer.StreamRequestHandler.setup(self)
        self.ihooks = InstanceHooks(config)
        (self.client, self.transport) = createClient(config)
        self.tashiconnection = tashiconnection
        self.auth = config.get('MauiWiki', 'authuser')
        self.key = config.get('MauiWiki', 'authkey')

    def handle(self):
        '''Read one packet from the connection and process it.'''
        p = MauiPacket(key=self.key)
        # StreamRequestHandler provides rfile for reading and wfile for
        # writing; replies go to wfile rather than back into rfile.
        self.istream = self.rfile
        self.ostream = self.wfile
        p.readPacket(self.istream)
        self.processPacket(p)

    def _signedReply(self, data):
        '''Build a reply packet carrying data, stamped with this
        listener's auth name and checksummed with its key.'''
        r = MauiPacket()
        r.set(data, auth=self.auth, key=self.key)
        return r

    def _parseQuery(self, p):
        '''Split the second data token, "<name>=<updatetime>:<id>:...",
        into (int updatetime, list of ids).'''
        fields = p.data[1].split('=')[1].split(':')
        return int(fields[0]), fields[1:]

    def _recordsReply(self, records):
        '''Render {name: {field: value}} as
        "ARG=<count>#<name>:<f>=<v>;...;#<name>:..." in an SC=0 reply.'''
        entries = []
        for name, attributes in records.items():
            attrs = ['%s=%s' % (a, v) for a, v in attributes.items()]
            entries.append('%s:%s;' % (name, ';'.join(attrs)))
        dat = 'ARG=%i#' % len(records) + '#'.join(entries)
        return self._signedReply(['SC=0', dat])

    def processGetNodes(self, p):
        '''Answer a GETNODES request with the matching host records.'''
        updatetime, nodelist = self._parseQuery(p)
        print('got GETNODES packet "%s" "%s"' % (updatetime, nodelist))
        nodes = self.tashiconnection.getNodes(updatetime, nodelist)
        return self._recordsReply(nodes)

    def processGetJobs(self, p):
        '''Answer a GETJOBS request with the matching instance records.'''
        updatetime, joblist = self._parseQuery(p)
        print('got GETJOBS packet "%s" "%s"' % (updatetime, joblist))
        jobs = self.tashiconnection.getJobs(updatetime, joblist)
        for attributes in jobs.values():
            # FIXME: support limits
            attributes['WCLIMIT'] = str(10000)
        return self._recordsReply(jobs)

    def processStartJob(self, p):
        '''Activate the requested VM on the first host of the task list.

        Returns an SC=0 reply on success, or an SC=-1 reply carrying the
        error text on any failure.
        '''
        job = p.data[1].split('=')[1].strip()
        job = job.split('.')[-1]
        tasklist = p.data[2].split('=')[1].split(':')
        print('STARTJOB  %s %s' % (job, tasklist))
        try:
            hosts = self.client.getHosts()
            print('hosts  %s' % (hosts,))
            host = [h for h in hosts if h.name == tasklist[0]][0]
            self.tashiconnection.activateById(jobnameToId(job), host)
            print('\tactivated VM!')
            return self._signedReply(['SC=0', 'RESPONSE=VM %s started on host %s' % (job, tasklist[0])])
        except Exception as e:
            # FIXME: make this a real failure response
            print('Oh noes!  %s' % (e,))
            return self._signedReply(['SC=-1', 'RESPONSE=%s' % str(e)])

    def processCancelJob(self, p):
        '''Destroy the requested VM.

        Returns an SC=0 reply on success, or an SC=-1 reply carrying the
        error text on any failure.
        '''
        job = p.data[1].split('=')[1].strip()
        print('CANCELJOB  %s' % (job,))
        try:
            self.client.destroyVm(jobnameToId(job))
            print('\tdestroyed VM!')
            return self._signedReply(['SC=0', 'RESPONSE=VM %s destroyed' % job])
        except Exception as e:
            # FIXME: make this a real failure response
            print('Oh noes!  %s' % (e,))
            return self._signedReply(['SC=-1', 'RESPONSE=%s' % str(e)])

    def processPacket(self, p):
        '''Verify the checksum of p, dispatch on its first data token and
        write the reply.

        A packet with a bad checksum is printed and dropped without a
        reply. A missing or unrecognised command gets an SC=-810 reply.
        '''
        dat = p.data
        if not p.verifyChecksum():
            print(p)
            print('bad checksum')
            return
        handlers = {'CMD=GETNODES': self.processGetNodes,
                    'CMD=GETJOBS': self.processGetJobs,
                    'CMD=STARTJOB': self.processStartJob,
                    'CMD=CANCELJOB': self.processCancelJob}
        # A packet without any data tokens is treated as an unknown command.
        handler = handlers.get(dat[0]) if dat else None
        if handler is not None:
            r = handler(p)
        else:
            print('got unknown packet')
            print(p.prettyString())
            r = self._signedReply(['SC=-810', 'RESPONSE=command not supported'])
        print(r.prettyString())
        self.ostream.write(str(r))
        self.ostream.flush()
        self.ostream.close()
        self.istream.close()
if __name__ == '__main__':
    # Read the "Agent" configuration and install the configured publisher.
    config, configFiles = getConfig(["Agent"])
    publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
    tashi.publisher = publisher

    # config, client, transport and tashiconnection are module-level
    # names that the classes above read.
    client, transport = createClient(config)
    logging.config.fileConfig(configFiles)

    # Start the background refresh thread.
    tashiconnection = TashiConnection(config, client, transport)
    tashiconnection.start()

    # Serve requests on every interface, port 1717, until interrupted.
    HOST = ''
    PORT = 1717
    address = (HOST, PORT)
    server = SocketServer.TCPServer(address, MauiListener)
    server.serve_forever()