blob: 5f47de476614d6b2bfc9665943fe3023e7f321a9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import rpyc
from tashi.rpycservices.rpyctypes import Instance, Host, User
import cPickle
clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 'vmUpdate', 'activateVm', 'registerHost', 'unregisterHost', 'getImages', 'copyImage', 'cloneImage', 'rebaseImage', 'setHostState', 'setHostNotes', 'addReservation', 'delReservation', 'getReservation']
nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'prepReceiveVm', 'prepSourceVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo', 'liveCheck']
accountingRPCs = ['record']
def clean(args):
"""Cleans the object so cPickle can be used."""
if isinstance(args, list) or isinstance(args, tuple):
cleanArgs = []
for arg in args:
cleanArgs.append(clean(arg))
if isinstance(args, tuple):
cleanArgs = tuple(cleanArgs)
return cleanArgs
if isinstance(args, Instance):
return Instance(args.__dict__)
if isinstance(args, Host):
return Host(args.__dict__)
if isinstance(args, User):
user = User(args.__dict__)
user.passwd = None
return user
return args
class client:
def __init__(self, host, port, username=None, password=None):
"""Client for ManagerService. If username and password are provided, rpyc.tlslite_connect will be used to connect, else rpyc.connect will be used."""
self.host = host
self.port = int(port)
self.username = username
self.password = password
self.conn = self.createConn()
def createConn(self):
"""Creates a rpyc connection."""
if self.username != None and self.password != None:
return rpyc.tlslite_connect(host=self.host, port=self.port, username=self.username, password=self.password)
else:
return rpyc.connect(host=self.host, port=self.port)
def __getattr__(self, name):
"""Returns a function that makes the RPC call. No keyword arguments allowed when calling this function."""
if self.conn.closed == True:
self.conn = self.createConn()
if name not in clusterManagerRPCs and name not in nodeManagerRPCs and name not in accountingRPCs:
return None
def connectWrap(*args):
# XXXstroucki: why not talk directly, instead
# of using rpyc? We're already using pickle to move
# args.
args = cPickle.dumps(clean(args))
try:
res = getattr(self.conn.root, name)(args)
except Exception, e:
self.conn.close()
raise e
res = cPickle.loads(res)
if isinstance(res, Exception):
raise res
return res
return connectWrap
class ManagerService(rpyc.Service):
"""Wrapper for rpyc service"""
# Note: self.service and self._type are set before rpyc.utils.server.ThreadedServer is started.
def checkValidUser(self, functionName, clientUsername, args):
"""Checks whether the operation requested by the user is valid based on clientUsername. An exception will be thrown if not valid."""
if self._type == 'AccountingService':
return
if self._type == 'NodeManagerService':
return
if clientUsername in ['nodeManager', 'agent', 'root']:
return
if functionName in ['destroyVm', 'shutdownVm', 'pauseVm', 'vmmSpecificCall', 'suspendVm', 'unpauseVm', 'migrateVm', 'resumeVm']:
instanceId = args[0]
instance = self.service.data.getInstance(instanceId)
instanceUsername = self.service.data.getUser(instance.userId).name
if clientUsername != instanceUsername:
raise Exception('Permission Denied: %s cannot perform %s on VM owned by %s' % (clientUsername, functionName, instanceUsername))
return
def _rpyc_getattr(self, name):
"""Returns the RPC corresponding to the function call"""
def makeCall(args):
args = cPickle.loads(args)
if self._conn._config['credentials'] != None:
try:
self.checkValidUser(makeCall._name, self._conn._config['credentials'], args)
except Exception, e:
e = cPickle.dumps(clean(e))
return e
try:
res = getattr(self.service, makeCall._name)(*args)
except Exception, e:
res = e
res = cPickle.dumps(clean(res))
return res
makeCall._name = name
if self._type == 'ClusterManagerService' and name in clusterManagerRPCs:
return makeCall
if self._type == 'NodeManagerService' and name in nodeManagerRPCs:
return makeCall
if self._type == 'AccountingService' and name in accountingRPCs:
return makeCall
raise AttributeError('RPC does not exist')