blob: 6b48017e0dd1f91d87146aa9f2871d1580c14def [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import threading
# XXXstroucki getImages needs os?
import os
from tashi.rpycservices.rpyctypes import Errors, Network, Host, User, Instance, TashiException, LocalImages, DiskConfiguration, NetworkConfiguration
from tashi.clustermanager.data.datainterface import DataInterface
from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
class SQL(DataInterface):
def __init__(self, config):
DataInterface.__init__(self, config)
self.uri = self.config.get("SQL", "uri")
self.log = logging.getLogger(__name__)
self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
if (self.uri.startswith("sqlite://")):
import sqlite
self.dbEngine = "sqlite"
self.conn = sqlite.connect(self.uri[9:], autocommit=1, timeout=1500)
elif (self.uri.startswith("mysql://")):
import MySQLdb
self.dbEngine = "mysql"
uri = self.uri[8:]
(user, _, hostdb) = stringPartition(uri, '@')
(host, _, db) = stringPartition(hostdb, '/')
self.password = self.config.get('SQL', 'password')
self.conn = MySQLdb.connect(host=host, user=user, passwd=self.password, db=db)
else:
raise TashiException, 'Unknown SQL database engine by URI: %s' % (self.uri)
self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints']
self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version']
self.instanceLock = threading.Lock()
self.instanceIdLock = threading.Lock()
self.instanceLocks = {}
self.instanceBusy = {}
self.hostLock = threading.Lock()
self.hostLocks = {}
self.maxInstanceId = 1
self.idLock = threading.Lock()
self.sqlLock = threading.Lock()
self.verifyStructure()
def executeStatement(self, stmt):
self.sqlLock.acquire()
try:
cur = self.conn.cursor()
try:
cur.execute(stmt)
except:
self.log.exception('Exception executing SQL statement %s' % stmt)
finally:
self.sqlLock.release()
return cur
def getNewInstanceId(self):
self.instanceIdLock.acquire()
cur = self.executeStatement("SELECT MAX(id) FROM instances")
self.maxInstanceId = cur.fetchone()[0]
# XXXstroucki perhaps this can be handled nicer
if (self.maxInstanceId is None):
self.maxInstanceId = 0
self.maxInstanceId = self.maxInstanceId + 1
instanceId = self.maxInstanceId
self.instanceIdLock.release()
return instanceId
def verifyStructure(self):
self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL, state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024) NOT NULL, hints varchar(1024) NOT NULL)")
self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0, decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores int(11), version varchar(256))")
self.executeStatement("CREATE TABLE IF NOT EXISTS networks (id int(11) NOT NULL, name varchar(256) NOT NULL)")
self.executeStatement("CREATE TABLE IF NOT EXISTS users (id int(11) NOT NULL, name varchar(256) NOT NULL, passwd varchar(256))")
def sanitizeForSql(self, s):
if (s == '"True"'):
return '"1"'
if (s == '"False"'):
return '"0"'
if (s == '"None"'):
return 'NULL'
return s
def makeInstanceList(self, i):
l = []
for e in range(0, len(self.instanceOrder)):
l.append(i.__dict__[self.instanceOrder[e]])
return map(lambda x: self.sanitizeForSql('"' + str(x) + '"'), l)
def makeListInstance(self, l):
i = Instance()
for e in range(0, len(self.instanceOrder)):
i.__dict__[self.instanceOrder[e]] = l[e]
i.state = int(i.state)
i.decayed = boolean(i.decayed)
i.disks = map(lambda x: DiskConfiguration(d=x), eval(i.disks))
i.nics = map(lambda x: NetworkConfiguration(d=x), eval(i.nics))
i.hints = eval(i.hints)
return i
def makeHostList(self, h):
l = []
for e in range(0, len(self.hostOrder)):
l.append(h.__dict__[self.hostOrder[e]])
return map(lambda x: self.sanitizeForSql('"' + str(x) + '"'), l)
def makeListHost(self, l):
h = Host()
for e in range(0, len(self.hostOrder)):
h.__dict__[self.hostOrder[e]] = l[e]
h.up = boolean(h.up)
h.decayed = boolean(h.decayed)
h.state = int(h.state)
return h
def registerInstance(self, instance):
if type(instance) is not Instance:
self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
raise TypeError
self.instanceLock.acquire()
try:
if (instance.id is not None and instance.id not in self.getInstances()):
self.instanceIdLock.acquire()
if (instance.id >= self.maxInstanceId):
self.maxInstanceId = instance.id + 1
self.instanceIdLock.release()
else:
instance.id = self.getNewInstanceId()
instance._lock = threading.Lock()
self.instanceLocks[instance.id] = instance._lock
instance._lock.acquire()
self.instanceBusy[instance.id] = True
l = self.makeInstanceList(instance)
self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
finally:
self.instanceLock.release()
return instance
def acquireInstance(self, instanceId):
busyCheck = True
while busyCheck == True:
self.instanceLock.acquire()
busyCheck = self.instanceBusy.setdefault(instanceId, False)
if busyCheck:
self.instanceLock.release()
try:
cur = self.executeStatement("SELECT * from instances WHERE id = %d" % (instanceId))
l = cur.fetchone()
if (not l):
raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instanceId)})
instance = self.makeListInstance(l)
self.instanceLocks[instance.id] = self.instanceLocks.get(instance.id, threading.Lock())
instance._lock = self.instanceLocks[instance.id]
instance._lock.acquire()
self.instanceBusy[instance.id] = True
finally:
self.instanceLock.release()
return instance
def releaseInstance(self, instance):
if type(instance) is not Instance:
self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
raise TypeError
self.instanceLock.acquire()
try:
l = self.makeInstanceList(instance)
s = ""
for e in range(0, len(self.instanceOrder)):
s = s + self.instanceOrder[e] + "=" + l[e]
if (e < len(self.instanceOrder)-1):
s = s + ", "
self.executeStatement("UPDATE instances SET %s WHERE id = %d" % (s, instance.id))
self.instanceBusy[instance.id] = False
instance._lock.release()
except:
self.log.exception("Excepted while holding lock")
raise
finally:
self.instanceLock.release()
def removeInstance(self, instance):
if type(instance) is not Instance:
self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
raise TypeError
self.instanceLock.acquire()
try:
self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
#XXXstroucki extraneous instance won't have a lock
try:
instance._lock.release()
except:
pass
del self.instanceLocks[instance.id]
del self.instanceBusy[instance.id]
finally:
self.instanceLock.release()
def acquireHost(self, hostId):
if type(hostId) is not int:
self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
raise TypeError
host = self.getHost(hostId)
self.hostLock.acquire()
self.hostLocks[host.id] = self.hostLocks.get(host.id, threading.Lock())
self.hostLock.release()
host._lock = self.hostLocks[host.id]
host._lock.acquire()
return host
def releaseHost(self, host):
if type(host) is not Host:
self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
raise TypeError
l = self.makeHostList(host)
s = ""
for e in range(0, len(self.hostOrder)):
s = s + self.hostOrder[e] + "=" + l[e]
if (e < len(self.hostOrder)-1):
s = s + ", "
self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, host.id))
host._lock.release()
def getHosts(self):
cur = self.executeStatement("SELECT * FROM hosts")
res = cur.fetchall()
hosts = {}
for r in res:
host = self.makeListHost(r)
hosts[host.id] = host
return hosts
def getHost(self, in_id):
try:
id = int(in_id)
except:
self.log.exception("Argument to getHost was not integer: %s" % in_id)
cur = self.executeStatement("SELECT * FROM hosts WHERE id = %d" % id)
r = cur.fetchone()
if (r == None):
raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (id)})
host = self.makeListHost(r)
return host
def getInstances(self):
cur = self.executeStatement("SELECT * FROM instances")
res = cur.fetchall()
instances = {}
for r in res:
instance = self.makeListInstance(r)
instances[instance.id] = instance
return instances
def getInstance(self, in_id):
try:
id = int(in_id)
except:
self.log.exception("Argument to getInstance was not integer: %s" % in_id)
cur = self.executeStatement("SELECT * FROM instances WHERE id = %d" % (id))
# XXXstroucki should only return one row.
# what about migration? should it be enforced?
r = cur.fetchone()
if (not r):
raise TashiException(d={'errno':Errors.NoSuchInstanceId, 'msg':"No such instanceId - %d" % (id)})
instance = self.makeListInstance(r)
return instance
def getNetworks(self):
cur = self.executeStatement("SELECT * FROM networks")
res = cur.fetchall()
networks = {}
for r in res:
network = Network(d={'id':r[0], 'name':r[1]})
networks[network.id] = network
return networks
def getNetwork(self, id):
cur = self.executeStatement("SELECT * FROM networks WHERE id = %d" % (id))
r = cur.fetchone()
network = Network(d={'id':r[0], 'name':r[1]})
return network
def getImages(self):
count = 0
myList = []
for i in self.dfs.list("images"):
myFile = self.dfs.getLocalHandle("images/" + i)
if os.path.isfile(myFile):
image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
myList.append(image)
count += 1
return myList
def getUsers(self):
cur = self.executeStatement("SELECT * from users")
res = cur.fetchall()
users = {}
for r in res:
user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
users[user.id] = user
return users
def getUser(self, id):
cur = self.executeStatement("SELECT * FROM users WHERE id = %d" % (id))
r = cur.fetchone()
user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
return user
def registerHost(self, hostname, memory, cores, version):
self.hostLock.acquire()
cur = self.executeStatement("SELECT * from hosts")
res = cur.fetchall()
for r in res:
if r[1] == hostname:
id = r[0]
self.log.warning("Host %s already registered, update will be done" % id)
s = ""
host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
l = self.makeHostList(host)
for e in range(0, len(self.hostOrder)):
s = s + self.hostOrder[e] + "=" + l[e]
if (e < len(self.hostOrder)-1):
s = s + ", "
self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, id))
self.hostLock.release()
return r[0], True
id = self.getNewId("hosts")
host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
l = self.makeHostList(host)
self.executeStatement("INSERT INTO hosts VALUES (%s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
self.hostLock.release()
return id, False
def unregisterHost(self, hostId):
self.hostLock.acquire()
cur = self.executeStatement("SELECT * from hosts")
res = cur.fetchall()
for r in res:
if r[0] == hostId:
self.executeStatement("DELETE FROM hosts WHERE id = %d" % hostId)
self.hostLock.release()
def getNewId(self, table):
""" Generates id for a new object. For example for hosts and users.
"""
self.idLock.acquire()
cur = self.executeStatement("SELECT * from %s" % table)
res = cur.fetchall()
maxId = 0 # the first id would be 1
l = []
for r in res:
id = r[0]
l.append(id)
if id >= maxId:
maxId = id
l.sort() # sort to enable comparing with range output
# check if some id is released:
t = range(maxId + 1)
t.remove(0)
if l != t and l != []:
releasedIds = filter(lambda x : x not in l, t)
self.idLock.release()
return releasedIds[0]
else:
self.idLock.release()
return maxId + 1