Copy the stable branch down to the oldstable branch in preparation for a new stable branch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/tashi/branches/oldstable@1362316 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/INSTALL b/INSTALL
index 6240c35..17f4fc1 100644
--- a/INSTALL
+++ b/INSTALL
@@ -124,6 +124,7 @@
given by the hostname command. If you plan on eventually having several
hosts and networks, feel free to add them now.
+root@grml:/usr/local/tashi# cd bin
root@grml:/usr/local/tashi/bin# DEBUG=1 ./clustermanager
2012-01-26 23:12:33,972 [./clustermanager:INFO] Using configuration file(s) ['/usr/local/tashi/etc/TashiDefaults.cfg']
2012-01-26 23:12:33,972 [./clustermanager:INFO] Starting cluster manager
@@ -158,9 +159,8 @@
In [4]: data.baseDataObject.save()
-In [5]: import os
-
-In [6]: os.kill(os.getpid(), 9)
+In [5]: (^C)
+2012-03-07 20:00:00,456 [./clustermanager:INFO] Exiting cluster manager after receiving a SIGINT signal
Run the cluster manager in the background:
root@grml:/usr/local/tashi/bin# ./clustermanager &
diff --git a/Makefile b/Makefile
index 947b14c..618050d 100644
--- a/Makefile
+++ b/Makefile
@@ -128,11 +128,11 @@
if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks
-disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')
+disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')",F0401"
pysrc=$(shell find . \! -path '*gen-py*' \! -path '*services*' \! -path '*messagingthrift*' \! -name '__init__.py' -name "*.py")
tidy: $(addprefix tidyfile/,$(pysrc))
- @echo Insuring .py files are nice and tidy!
+ @echo Ensured .py files are nice and tidy!
tidyfile/%: %
@echo Checking tidy for $*
- pylint --report=no --disable-msg-cat=R,C,E --disable-msg=$(disabled_warnings) --indent-string="\t" $* 2> /dev/null;
+ pylint --report=no --disable=R,C,E --disable=$(disabled_warnings) --indent-string="\t" $* 2> /dev/null;
diff --git a/etc/NodeManager.cfg b/etc/NodeManager.cfg
index a47bccf..48f4044 100644
--- a/etc/NodeManager.cfg
+++ b/etc/NodeManager.cfg
@@ -80,7 +80,6 @@
statsInterval = 0.0
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Security]
authAndEncrypt = False
diff --git a/etc/TashiDefaults.cfg b/etc/TashiDefaults.cfg
index 5e1a4d2..b7d32f3 100644
--- a/etc/TashiDefaults.cfg
+++ b/etc/TashiDefaults.cfg
@@ -57,7 +57,6 @@
allowDuplicateNames = False
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[GetentOverride]
baseData = tashi.clustermanager.data.Pickled
@@ -110,7 +109,6 @@
clusterManagerHost = localhost
clusterManagerPort = 9882
statsInterval = 0.0
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Qemu]
qemuBin = /usr/bin/kvm
diff --git a/src/tashi/accounting/accounting.py b/src/tashi/accounting/accounting.py
index 93d2999..6856e49 100755
--- a/src/tashi/accounting/accounting.py
+++ b/src/tashi/accounting/accounting.py
@@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
+import os
+import time
import sys
import signal
import logging.config
@@ -26,13 +28,13 @@
#from rpyc.utils.authenticators import TlsliteVdbAuthenticator
#from tashi.rpycservices.rpyctypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole, signalHandler
+from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole
import tashi
class Accounting(object):
- def __init__(self, config, cmclient):
+ def __init__(self, config):
self.config = config
- self.cm = cmclient
+ self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
@@ -62,25 +64,43 @@
debugConsole(globals())
- try:
- t.start()
- except KeyboardInterrupt:
- self.handleSIGTERM(signal.SIGTERM, None)
-
- @signalHandler(signal.SIGTERM)
- def handleSIGTERM(self, signalNumber, stackFrame):
- self.log.info('Exiting cluster manager after receiving a SIGINT signal')
+ t.start()
+ # shouldn't exit by itself
sys.exit(0)
def main():
(config, configFiles) = getConfig(["Accounting"])
publisher = instantiateImplementation(config.get("Accounting", "publisher"), config)
tashi.publisher = publisher
- cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- accounting = Accounting(config, cmclient)
+ log = logging.getLogger(__name__)
+ log.info('Using configuration file(s) %s' % configFiles)
- accounting.initAccountingServer()
+ accounting = Accounting(config)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ accounting.initAccountingServer()
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting accounting service after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of accounting service")
+ os._exit(-1)
+
+ log.info("Exiting accounting service after service thread exited")
+ os._exit(-1)
+
+ return
if __name__ == "__main__":
main()
diff --git a/src/tashi/accounting/accountingservice.py b/src/tashi/accounting/accountingservice.py
index b1c035a..56c1c90 100644
--- a/src/tashi/accounting/accountingservice.py
+++ b/src/tashi/accounting/accountingservice.py
@@ -5,15 +5,15 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
-# under the License.
+# under the License.
import logging
import threading
@@ -22,43 +22,43 @@
from tashi import createClient
class AccountingService(object):
- """RPC service for the Accounting service"""
-
- def __init__(self, config):
- self.log = logging.getLogger(__name__)
- self.log.setLevel(logging.INFO)
+ """RPC service for the Accounting service"""
- self.config = config
+ def __init__(self, config):
+ self.log = logging.getLogger(__name__)
+ self.log.setLevel(logging.INFO)
- self.pollSleep = None
+ self.config = config
- # XXXstroucki new python has fallback values
- try:
- self.pollSleep = self.config.getint("AccountingService", "pollSleep")
- except:
- pass
+ self.pollSleep = None
- if self.pollSleep is None:
- self.pollSleep = 600
+ # XXXstroucki new python has fallback values
+ try:
+ self.pollSleep = self.config.getint("AccountingService", "pollSleep")
+ except:
+ pass
- self.cm = createClient(config)
- threading.Thread(target=self.__start).start()
+ if self.pollSleep is None:
+ self.pollSleep = 600
+
+ self.cm = createClient(config)
+ threading.Thread(target=self.__start).start()
# remote
- def record(self, strings):
- for string in strings:
- self.log.info("Remote: %s" % (string))
+ def record(self, strings):
+ for string in strings:
+ self.log.info("Remote: %s" % (string))
- def __start(self):
- while True:
- try:
- instances = self.cm.getInstances()
- for instance in instances:
- # XXXstroucki this currently duplicates what the CM was doing.
- self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
- except:
- self.log.warning("Accounting iteration failed")
+ def __start(self):
+ while True:
+ try:
+ instances = self.cm.getInstances()
+ for instance in instances:
+ # XXXstroucki this currently duplicates what the CM was doing.
+ self.log.info('Accounting: id %s host %s vmId %s user %s cores %s memory %s' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
+ except:
+ self.log.warning("Accounting iteration failed")
-
- # wait to do the next iteration
- time.sleep(self.pollSleep)
+
+ # wait to do the next iteration
+ time.sleep(self.pollSleep)
diff --git a/src/tashi/agents/primitive.py b/src/tashi/agents/primitive.py
index 99ef702..3a1b708 100755
--- a/src/tashi/agents/primitive.py
+++ b/src/tashi/agents/primitive.py
@@ -19,6 +19,7 @@
import time
import logging.config
+import sys
from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
@@ -26,9 +27,9 @@
import tashi
class Primitive(object):
- def __init__(self, config, cmclient):
+ def __init__(self, config):
self.config = config
- self.cm = cmclient
+ self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -40,10 +41,10 @@
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+ self.hooks.append(instantiateImplementation(value, config, self.cm, False))
except:
self.log.exception("Failed to load hook %s" % (value))
- self.hosts = {}
+ self.hosts = {}
self.load = {}
self.instances = {}
self.muffle = {}
@@ -62,9 +63,9 @@
for h in self.cm.getHosts():
#XXXstroucki get all hosts here?
#if (h.up == True and h.state == HostState.Normal):
- hosts[ctr] = h
- ctr = ctr + 1
- load[h.id] = []
+ hosts[ctr] = h
+ ctr = ctr + 1
+ load[h.id] = []
load[None] = []
_instances = self.cm.getInstances()
@@ -199,7 +200,7 @@
if myDisk == i.disks[0].uri and i.disks[0].persistent == True:
count += 1
if count > 1:
- minMaxHost = None
+ minMaxHost = None
if (minMaxHost):
# found a host
@@ -250,7 +251,7 @@
for i in oldInstances:
# XXXstroucki what about paused and saved VMs?
# XXXstroucki: do we need to look at Held VMs here?
- if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying)):
+ if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying or oldInstances[i].state == InstanceState.ShuttingDown)):
self.log.info("VM exited: %s" % (oldInstances[i].name))
for hook in self.hooks:
hook.postDestroy(oldInstances[i])
@@ -283,10 +284,17 @@
(config, configFiles) = getConfig(["Agent"])
publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
- cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- agent = Primitive(config, cmclient)
- agent.start()
+ agent = Primitive(config)
+
+ try:
+ agent.start()
+ except KeyboardInterrupt:
+ pass
+
+ log = logging.getLogger(__file__)
+ log.info("Primitive exiting")
+ sys.exit(0)
if __name__ == "__main__":
main()
diff --git a/src/tashi/client/client.py b/src/tashi/client/client.py
deleted file mode 100755
index 71b5b20..0000000
--- a/src/tashi/client/client.py
+++ /dev/null
@@ -1,213 +0,0 @@
-#! /usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import inspect
-import os
-import sys
-import types
-from tashi.services.ttypes import *
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from thrift.transport.TSocket import TSocket
-
-from tashi.services import clustermanagerservice
-from tashi import vmStates
-
-from tashi.util import getConfig
-
-def makeHTMLTable(list):
- (stdin_r, stdin_w) = os.pipe()
-
-# XXXpipe: find number of columns in current window
- pipe = os.popen("tput cols")
- columns = pipe.read().strip()
- keys = {}
- for k in list:
- for k2 in k.__dict__.keys():
- if (not k2.endswith("Obj")):
- keys[k2] = k2
- if ('id' in keys):
- del keys['id']
- keylist = ['id'] + keys.keys()
- else:
- keylist = keys.keys()
- output = "<html>"
- output = output + "<table>"
- output = output + "<tr>"
- for k in keylist:
- output = output + "<td>%s</td>" % (k)
- output = output + "</tr>"
- for k in list:
- output = output + "<tr>"
- for k2 in keylist:
- if (k2 == "state"):
- output = output + "<td>%s</td>" % (str(vmStates[k.__dict__.get(k2, None)]))
- else:
- output = output + "<td>%s</td>" % (str(k.__dict__.get(k2, None)))
- output = output + "</tr>"
- output = output + "</table>"
- output = output + "</html>"
- pid = os.fork()
- if (pid == 0):
- os.close(stdin_w)
- os.dup2(stdin_r, 0)
- os.close(stdin_r)
- os.execl("/usr/bin/lynx", "/usr/bin/lynx", "-width=%s" % (columns), "-dump", "-stdin")
- sys.exit(-1)
- os.close(stdin_r)
- os.write(stdin_w, output)
- os.close(stdin_w)
- os.waitpid(pid, 0)
-
-def getFunction(argv):
- """Tries to determine the name of the function requested by the user -- may be called multiple times if the binary name is 'client'"""
- function = "None"
- if (len(argv) > 0):
- function = argv[0].strip()
- if (function.rfind("/") != -1):
- function = function[function.rfind("/")+1:]
- if (function.rfind(".") != -1):
- function = function[:function.rfind(".")]
- return function
-
-def getFunctionInfo(m):
- """Gets a string that describes a function from the interface"""
- f = getattr(clustermanagerservice.Iface, m)
- argspec = inspect.getargspec(f)[0][1:]
- return m + inspect.formatargspec(argspec)
-
-def usage():
- """Print program usage"""
- print "Available methods:"
- for m in methods:
- print "\t" + getFunctionInfo(m)
- print
- print "Examples:"
- print "\tgetInstances"
- print "\taddUser 'User(d={\"username\":\"foobar\"})'"
- print "\tremoveUser 2"
- print "\tcreateVM 1 1"
-
-def simpleType(obj):
- """Determines whether an object is a simple type -- used as a helper function to pprint"""
- if (type(obj) is not type([])):
- if (not getattr(obj, "__dict__", None)):
- return True
- return False
-
-def pprint(obj, depth = 0, key = None):
- """My own version of pprint that prints out a dict in a readable, but slightly more compact format"""
- valueManip = lambda x: x
- if (key):
- keyString = key + ": "
- if (key == "state"):
- valueManip = lambda x: vmStates[x]
- else:
- keyString = ""
- if (type(obj) is type([])):
- if (reduce(lambda x, y: x and simpleType(y), obj, True)):
- print (" " * (depth * INDENT)) + keyString + str(obj)
- else:
- print (" " * (depth * INDENT)) + keyString + "["
- for o in obj:
- pprint(o, depth + 1)
- print (" " * (depth * INDENT)) + "]"
- elif (getattr(obj, "__dict__", None)):
- if (reduce(lambda x, y: x and simpleType(y), obj.__dict__.itervalues(), True)):
- print (" " * (depth * INDENT)) + keyString + str(obj)
- else:
- print (" " * (depth * INDENT)) + keyString + "{"
- for (k, v) in obj.__dict__.iteritems():
- pprint(v, depth + 1, k)
- print (" " * (depth * INDENT)) + "}"
- else:
- print (" " * (depth * INDENT)) + keyString + str(valueManip(obj))
-
-def main():
- """Main function for the client program"""
- global INDENT, methods, exitCode
- exitCode = 0
- INDENT = (os.getenv("INDENT", 4))
- methods = filter(lambda x: not x.startswith("__"), clustermanagerservice.Iface.__dict__.keys())
- function = getFunction(sys.argv)
- if (function == "client"):
- function = getFunction(sys.argv[1:])
- if (function == "--makesyms"):
- for m in methods:
- os.symlink(sys.argv[0], m)
- sys.exit(0)
- if (function == "--rmsyms"):
- for m in methods:
- os.unlink(m)
- sys.exit(0)
-
- (config,configFiles) = getConfig(["Client"])
- cfgHost = config.get('Client', 'clusterManagerHost')
- cfgPort = config.get('Client', 'clusterManagerPort')
- cfgTimeout = float(config.get('Client', 'clusterManagerTimeout'))
- host = os.getenv('TASHI_CM_HOST', cfgHost)
- port = os.getenv('TASHI_CM_PORT', cfgPort)
- timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
-
- socket = TSocket(host, int(port))
- socket.setTimeout(timeout)
- transport = TBufferedTransport(socket)
- protocol = TBinaryProtocol(transport)
- client = clustermanagerservice.Client(protocol)
- client._transport = transport
- client._transport.open()
- f = getattr(client, function, None)
- if not f:
- usage()
- sys.exit(-1)
- args = map(lambda x: eval(x), sys.argv[1:])
- try:
- res = f(*args)
- def cmp(x, y):
- try:
- if (x.id < y.id):
- return -1
- elif (y.id < x.id):
- return 1
- else:
- return 0
- except Exception, e:
- return 0
- if (type(res) == types.ListType):
- res.sort(cmp)
- if (os.getenv("USE_HTML_TABLES")):
- try:
- makeHTMLTable(res)
- except:
- pprint(res)
- else:
- pprint(res)
- except TashiException, e:
- print e.msg
- exitCode = e.errno
- except TypeError, e:
- print e
- print "\t" + getFunctionInfo(function)
- exitCode = -1
- finally:
- client._transport.close()
- sys.exit(exitCode)
-
-if __name__ == "__main__":
- main()
diff --git a/src/tashi/client/tashi-client.py b/src/tashi/client/tashi-client.py
index 8bbfeb0..261982b 100755
--- a/src/tashi/client/tashi-client.py
+++ b/src/tashi/client/tashi-client.py
@@ -186,12 +186,26 @@
instances.append(client.createVm(instance))
return instances
+def shutdownMany(basename):
+ return __shutdownOrDestroyMany("shutdown", basename)
+
def destroyMany(basename):
+ return __shutdownOrDestroyMany("destroy", basename)
+
+def __shutdownOrDestroyMany(method, basename):
instances = client.getInstances()
count = 0
for i in instances:
if (i.name.startswith(basename + "-") and i.name[len(basename)+1].isdigit()):
- client.destroyVm(i.id)
+ if method == "shutdown":
+ client.shutdownVm(i.id)
+
+ elif method == "destroy":
+ client.destroyVm(i.id)
+
+ else:
+ raise ValueError("Unknown method")
+
count = count + 1
if (count == 0):
raise ValueError("That is an unused basename")
@@ -213,6 +227,7 @@
'copyImage': (None, None),
'createVm': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
+'shutdownMany': (shutdownMany, None),
'destroyMany': (destroyMany, None),
'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
'getInstances': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
@@ -225,6 +240,7 @@
'createMany': [('userId', int, getUser, False), ('basename', str, lambda: requiredArg('basename'), True), ('cores', int, lambda: 1, False), ('memory', int, lambda: 128, False), ('disks', parseDisks, lambda: requiredArg('disks'), True), ('nics', parseNics, randomNetwork, False), ('hints', parseHints, lambda: {}, False), ('count', int, lambda: requiredArg('count'), True)],
'shutdownVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'destroyVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
+'shutdownMany': [('basename', str, lambda: requiredArg('basename'), True)],
'destroyMany': [('basename', str, lambda: requiredArg('basename'), True)],
'suspendVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'resumeVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
@@ -250,6 +266,7 @@
'createMany': '[Instance(d={"userId":userId,"name":basename,"cores":cores,"memory":memory,"disks":disks,"nics":nics,"hints":hints}), count]',
'shutdownVm': '[instance]',
'destroyVm': '[instance]',
+'shutdownMany': '[basename]',
'destroyMany': '[basename]',
'suspendVm': '[instance]',
'resumeVm': '[instance]',
@@ -268,6 +285,7 @@
'createMany': 'Utility function that creates many VMs with the same set of parameters',
'shutdownVm': 'Attempts to shutdown a VM nicely',
'destroyVm': 'Immediately destroys a VM -- it is the same as unplugging a physical machine and should be used for non-persistent VMs or when all else fails',
+'shutdownMany': 'Attempts to gracefully shut down a group of VMs created with createMany',
'destroyMany': 'Destroys a group of VMs created with createMany',
'suspendVm': 'Suspends a running VM to disk',
'resumeVm': 'Resumes a suspended VM from disk',
@@ -293,6 +311,7 @@
'createMany': ['--basename foobar --disks i386-hardy.qcow2 --count 4'],
'shutdownVm': ['--instance 12345', '--instance foobar'],
'destroyVm': ['--instance 12345', '--instance foobar'],
+'shutdownMany': ['--basename foobar'],
'destroyMany': ['--basename foobar'],
'suspendVm': ['--instance 12345', '--instance foobar'],
'resumeVm': ['--instance 12345', '--instance foobar'],
@@ -321,7 +340,8 @@
print "Unknown function %s" % (func)
print
functions = argLists
- print "%s is the client program for Tashi, a system for cloud-computing on BigData." % (os.path.basename(sys.argv[0]))
+ print "%s is the client program for Tashi" % (os.path.basename(sys.argv[0]))
+ print "Tashi, a system for cloud-computing on BigData"
print "Visit http://incubator.apache.org/tashi/ for more information."
print
else:
@@ -507,6 +527,7 @@
"""Main function for the client program"""
global INDENT, exitCode, client
exitCode = 0
+ exception = None
INDENT = (os.getenv("INDENT", 4))
if (len(sys.argv) < 2):
usage()
@@ -551,25 +572,47 @@
if (arg.startswith("--")):
if (arg[2:] in possibleArgs):
(parg, conv, default, required) = possibleArgs[arg[2:]]
- val = conv(args.pop(0))
+ try:
+ val = None
+ lookahead = args[0]
+ if not lookahead.startswith("--"):
+ val = args.pop(0)
+ except:
+ pass
+
+ val = conv(val)
if (val == None):
val = default()
vals[parg] = val
continue
+ # somewhat lame, but i don't want to rewrite the fn at this time
+ exception = ValueError("Unknown argument %s" % (arg))
- raise ValueError("Unknown argument %s" % (arg))
-
-
- f = getattr(client, function, None)
+ f = None
+ try:
+ f = extraViews[function][0]
+ except:
+ pass
if (f is None):
- f = extraViews[function][0]
- if (function in convertArgs):
- fargs = eval(convertArgs[function], globals(), vals)
- else:
- fargs = []
- res = f(*fargs)
+ f = getattr(client, function, None)
+
+ try:
+ if exception is not None:
+ raise exception
+
+ if (function in convertArgs):
+ fargs = eval(convertArgs[function], globals(), vals)
+ else:
+ fargs = []
+
+ res = f(*fargs)
+ except Exception, e:
+ print "Failed in calling %s: %s" % (function, e)
+ print "Please run tashi-client --examples for syntax information"
+ sys.exit(-1)
+
if (res != None):
keys = extraViews.get(function, (None, None))[1]
try:
diff --git a/src/tashi/client/test.py b/src/tashi/client/test.py
deleted file mode 100644
index a53eefa..0000000
--- a/src/tashi/client/test.py
+++ /dev/null
@@ -1,314 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-import logging
-import sys
-import signal
-import os.path
-import copy
-import time
-import random
-from ConfigParser import ConfigParser
-
-from tashi.services.ttypes import *
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-
-from tashi.services import clustermanagerservice
-from tashi.messaging.threadpool import synchronized
-from tashi.messaging.tashimessaging import TestTashiSubscriber
-
-from tashi.util import getConfig
-
-import tashi.client.client
-
-class ClientConnection(object):
- '''Creates an rpc proxy'''
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.transport = TBufferedTransport(TSocket(host, int(port)))
- self.protocol = TBinaryProtocol(self.transport)
- self.client = clustermanagerservice.Client(self.protocol)
- self.client._transport = self.transport
- self.client._transport.open()
- def __del__(self):
- self.client._transport.close()
-
-def incrementor(init=0):
- while 1:
- yield init
- init = init + 1
-
-# FIXME: don't duplicate code from clustermanager
-# def getConfig(args):
-# config = ConfigParser()
-# configFiles = [
-# '/usr/share/tashi/ClusterManagerDefaults.cfg',
-# '/etc/tashi/ClusterManager.cfg',
-# os.path.expanduser('~/.tashi/ClusterManager.cfg')
-# ] + ([args[0]] if len(args) > 0 else [])
-
-# configFiles = config.read(configFiles)
-# if len(configFiles) == 0:
-# print >>sys.stderr, 'Unable to find the configuration file\n'
-# sys.exit(3)
-
-# return config
-
-
-class TestClient(unittest.TestCase):
- @synchronized()
- def getPortNum(self):
- return self.portnum.next()
-
- """macro test cases for single-host tests
-
- Assumes cwd is 'src/tashi/client/'
- """
- def setUp(self):
- """Create a CM and single NM on local host"""
- logging.info('setting up test')
-
- (self.config, self.configfiles) = getConfig([])
-
- self.port = 1717 # FIXME: take this (and other things) from config file
- self.portnum = incrementor(self.port)
-
- self.cwd = os.getcwd()
- self.srcd = os.path.dirname(os.path.dirname(self.cwd))
-
- self.environ = copy.copy(os.environ)
- self.environ['PYTHONPATH'] = self.srcd
- logging.info('base path = %s' % self.srcd)
-
- self.nm = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
- os.path.join(self.srcd, 'tashi', 'nodemanager', 'nodemanager.py'),
- self.environ)
- self.cm = os.spawnlpe(os.P_WAIT, 'python', 'python',
- os.path.join(self.srcd, 'tashi', 'clustermanager', 'clustermanager.py'),
- '--drop', '--create',
- os.path.expanduser('~/.tashi/ClusterManager.cfg'),
- self.environ)
- self.cm = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
- os.path.join(self.srcd, 'tashi', 'clustermanager', 'clustermanager.py'),
- os.path.expanduser('~/.tashi/ClusterManager.cfg'),
- self.environ)
- # since we are spawning with P_NOWAIT, we need to sleep to ensure that the CM is listening
- time.sleep(1)
- try:
- self.connection = ClientConnection('localhost', self.config.get('ClusterManagerService', 'port'))
- except Exception, e:
- logging.warning('client connection failed')
- ex = None
- try:
- logging.warning("setUp killing node manager " + str(self.nm))
- os.kill(self.nm, signal.SIGKILL)
- except Exception, e:
- ex = e
- logging.warning('could not kill node manager: '+ str(e))
- try:
- logging.warning('setUp killing cluster manager ' + str(self.cm))
- os.kill(self.cm, signal.SIGKILL)
- except Exception, e:
- ex = e
- logging.warning('could not kill cluster manager: ' + str(e))
- if e != None:
- raise e
-
- logging.info('node manager PID: %i' % self.nm)
- def tearDown(self):
- '''Kill the CM and NM that were created by setUP'''
- logging.info('tearing down test')
- ex = None
- try:
- logging.debug("killing cluster manager " + str(self.cm))
- os.kill(self.cm, signal.SIGKILL)
- except Exception, e:
- ex = e
- logging.error('Could not kill cluster manager: ' + str(e))
-
- try:
- logging.debug("killing node manager " + str(self.nm))
- os.kill(self.nm, signal.SIGKILL)
- except Exception, e:
- ex = e
- logging.error('Could not kill node manager: ' + str(e))
- if ex != None:
- raise ex
- def testSetup(self):
- '''empty test to ensure that setUp code works'''
- logging.info('setting up')
- def testHostManagement(self):
- '''test adding/removing/listing hosts
-
- Right now this just adds a single host: localhost. Eventually
- it should 1) take a list of hosts from a test configuration
- file, 2) ensure that all were added, 3) remove a random
- subset, 4) ensure that they were correctly removed, 5) remove
- all, 6) ensure that they were correctly removed.'''
-
- # get empty host list
- hosts = self.connection.client.getHosts()
- self.assertEqual(hosts, [], 'starting host list not empty: ' + str(hosts) )
-
- # add a host
- host = Host()
- host.hostname = 'localhost'
- host.enabled=True
- self.connection.client.addHost(host)
- hosts = self.connection.client.getHosts()
- self.assertEqual(len(hosts), 1, 'wrong number of hosts %i, should be %i' % (len(hosts), 1) )
- self.assertEqual(hosts[0].hostname, 'localhost', 'wrong hostname: ' + str(hosts[0].hostname) )
-
- # remove first host
- hid = hosts[0].id
- self.connection.client.removeHost(hid)
- hosts = self.connection.client.getHosts()
- self.assertEqual(hosts, [], 'host list not empty after remove: ' + str(hosts) )
-
- def testMessaging(self):
- '''test messaging system started by CM
-
- tests messages published directly, through events in the CM,
- and the log system'''
- # FIXME: add tests for generating events as a side-effect of
- # rpc commands, as well as logging in the CM
- portnum = self.getPortNum()
- self.sub = TestTashiSubscriber(self.config, portnum)
- self.assertEqual(self.sub.messageQueue.qsize(), 0)
- self.pub = tashi.messaging.thriftmessaging.PublisherThrift(self.config.get('MessageBroker', 'host'),
- int(self.config.get('MessageBroker', 'port')))
- self.pub.publish({'message-type':'text', 'message':'Hello World!'})
- time.sleep(0.5)
- print '*** QSIZE', self.sub.messageQueue.qsize()
- self.assertEqual(self.sub.messageQueue.qsize(), 1)
-
- self.log = logging.getLogger(__name__)
- messageHandler = tashi.messaging.tashimessaging.TashiLogHandler(self.config)
- self.log.addHandler(messageHandler)
- # FIXME: why can't we log messages with severity below 'warning'?
- self.log.warning('test log message')
- time.sleep(0.5)
- self.assertEqual(self.sub.messageQueue.qsize(), 2)
-
- # This should generate at least one log message
-# hosts = self.connection.client.getHosts()
-# time.sleep(0.5)
-# if (self.sub.messageQueue.qsize() <= 2):
-# self.fail()
-
- def testUserManagement(self):
- '''test adding/removing/listing users
-
- same as testHostManagement, but with users'''
- usernames = ['sleepy', 'sneezy', 'dopey', 'doc',
- 'grumpy', 'bashful', 'happy']
- # add all users
- for un in usernames:
- user = User()
- user.username = un
- self.connection.client.addUser(user)
- # ensure that all were added
- users = self.connection.client.getUsers()
- self.assertEqual(len(usernames), len(users))
- for user in users:
- usernames.remove(user.username)
- self.assertEqual(0, len(usernames))
- # remove a random subset
- rm = random.sample(users, 4)
- for user in rm:
- self.connection.client.removeUser(user.id)
- users.remove(user)
- newUsers = self.connection.client.getUsers()
- # This ensures that the remaining ones are what we expect:
- for user in newUsers:
- # if there is a user remaining that we asked to be removed,
- # this will throw an exception
- users.remove(user)
- # if a user was removed that we did not intend, this will
- # throw an exception
- self.assertEqual(0, len(users))
-
-# def testInstanceConfigurationManagement(self):
-# '''test adding/removing/listing instance configurations
-
-# same as testHostManagement, but with instance configurations'''
-# self.fail('test not implemented')
- def testHardDiskConfigurationManagement(self):
- '''test adding/removing/listing hard disk configurations
-
- same as testHostManagement, but with hard disk configurations'''
-
- user = User(d={'username':'sleepy'})
- self.connection.client.addUser(user)
- users = self.connection.client.getUsers()
-
- per = PersistentImage()
- per.userId = users[0].id
- per.name = 'sleepy-PersistentImage'
- self.connection.client.addPersistentImage(per)
- pers = self.connection.client.getPersistentImages()
-
- inst = InstanceConfiguration()
- inst.name = 'sleepy-inst'
- inst.memory = 512
- inst.cores = 1
- self.connection.client.addInstanceConfiguration(inst)
- insts = self.connection.client.getInstanceConfigurations()
-
- hdc = HardDiskConfiguration()
- hdc.index = 0
- hdc.persistentImageId = pers[0].id
- hdc.persistent = False
- hdc.instanceConfigurationId = insts[0].id
-
-# def testCreateDestroyShutdown(self):
-# '''test creating/destroying/shutting down VMs
-
-# not implemented'''
-# self.fail('test not implemented')
-# def testSuspendResume(self):
-# '''test suspending/resuming VMs
-
-# not implemented'''
-# self.fail('test not implemented')
-# def testMigrate(self):
-# '''test migration
-
-# not implemented'''
-# self.fail('test not implemented')
-# def testPauseUnpause(self):
-# '''test pausing/unpausing VMs
-
-# not implemented'''
-# self.fail('test not implemented')
-
-
-##############################
-# Test Code
-##############################
-if __name__ == '__main__':
- logging.basicConfig(level=logging.NOTSET,
- format="%(asctime)s %(levelname)s:\t %(message)s",
- stream=sys.stdout)
-
- suite = unittest.TestLoader().loadTestsFromTestCase(TestClient)
- unittest.TextTestRunner(verbosity=2).run(suite)
-
diff --git a/src/tashi/clustermanager/clustermanager.py b/src/tashi/clustermanager/clustermanager.py
index db61194..818bbb0 100755
--- a/src/tashi/clustermanager/clustermanager.py
+++ b/src/tashi/clustermanager/clustermanager.py
@@ -17,11 +17,12 @@
# specific language governing permissions and limitations
# under the License.
+import os
import sys
-import signal
+import time
import logging.config
-from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
+from tashi.util import boolean, instantiateImplementation, getConfig, debugConsole
import tashi
from tashi.rpycservices import rpycservices
@@ -54,19 +55,11 @@
t.service._type = 'ClusterManagerService'
debugConsole(globals())
-
- try:
- t.start()
- except KeyboardInterrupt:
- handleSIGTERM(signal.SIGTERM, None)
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
- global log
+ t.start()
+ # shouldn't exit by itself
+ return
- log.info('Exiting cluster manager after receiving a SIGINT signal')
- sys.exit(0)
-
def main():
global log
@@ -80,7 +73,32 @@
# bind the database
log.info('Starting cluster manager')
- startClusterManager(config)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ startClusterManager(config)
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting cluster manager after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of cluster manager")
+ os._exit(-1)
+
+ log.info("Exiting cluster manager after service thread exited")
+ os._exit(-1)
+
+ return
+
+
if __name__ == "__main__":
main()
diff --git a/src/tashi/clustermanager/clustermanagerservice.py b/src/tashi/clustermanager/clustermanagerservice.py
index 32ab2f9..a2b116d 100644
--- a/src/tashi/clustermanager/clustermanagerservice.py
+++ b/src/tashi/clustermanager/clustermanagerservice.py
@@ -19,7 +19,7 @@
import threading
import time
-from tashi.rpycservices import rpycservices
+from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
from tashi import boolean, ConnectionManager, vmStates, version, scrubString
@@ -36,7 +36,7 @@
else:
self.username = None
self.password = None
- self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')))
+ self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')), authAndEncrypt=self.authAndEncrypt)
self.dfs = dfs
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
@@ -71,7 +71,7 @@
try:
if (self.accountingHost is not None) and \
(self.accountingPort is not None):
- self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+ self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
except:
self.log.exception("Could not init accounting")
@@ -126,7 +126,7 @@
except:
self.log.exception("Invalid host data")
- secondary = ','.join(filter(None, (hostText, instanceText)))
+ secondary = ','.join(filter(None, (hostText, instanceText)))
line = "%s|%s|%s" % (now, text, secondary)
@@ -271,7 +271,12 @@
for instanceId in self.instanceLastContactTime.keys():
# XXXstroucki should lock instance here?
- if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+ try:
+ lastContactTime = self.instanceLastContactTime[instanceId]
+ except KeyError:
+ continue
+
+ if (lastContactTime < (self.__now() - self.allowDecayed)):
try:
instance = self.data.acquireInstance(instanceId)
# Don't query non-running VMs. eg. if a VM
@@ -348,7 +353,7 @@
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+ self.__stateTransition(instance, None, InstanceState.ShuttingDown)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -366,7 +371,7 @@
self.data.removeInstance(instance)
elif (instance.state is InstanceState.Activating):
self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
- self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+ self.__stateTransition(instance, None, InstanceState.Destroying)
self.data.releaseInstance(instance)
else:
# XXXstroucki: This is a problem with keeping
@@ -382,7 +387,7 @@
self.proxy[hostname].destroyVm(instance.vmId)
self.data.releaseInstance(instance)
except:
- self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
+ self.log.warning('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
self.data.removeInstance(instance)
@@ -390,7 +395,12 @@
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SUSPEND", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -404,7 +414,12 @@
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ try:
+ self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
source = "suspend/%d_%s" % (instance.id, instance.name)
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
@@ -422,7 +437,13 @@
except:
self.data.releaseInstance(instance)
raise
- self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
try:
# Prepare the target
@@ -434,7 +455,12 @@
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
- self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ try:
+ self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
try:
# Send the VM
@@ -458,7 +484,12 @@
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM PAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -468,13 +499,23 @@
self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ try:
+ self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
return
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ try:
+ self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -484,7 +525,12 @@
self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ try:
+ self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
return
@@ -633,7 +679,7 @@
self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
if ('__resume_source' in instance.hints):
- self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+ self.__stateTransition(instance, None, InstanceState.Resuming)
else:
# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
@@ -679,12 +725,12 @@
self.data.releaseInstance(instance)
return "success"
- def registerHost(self, hostname, memory, cores, version):
- hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
- if alreadyRegistered:
- self.log.info("Host %s is already registered, it was updated now" % hostname)
- else:
- self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+ def registerHost(self, hostname, memory, cores, version):
+ hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
+ if alreadyRegistered:
+ self.log.info("Host %s is already registered, it was updated now" % hostname)
+ else:
+ self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
try:
host = self.data.getHost(hostId)
@@ -692,9 +738,9 @@
except:
self.log.warning("Failed to lookup host %s" % hostId)
- return hostId
+ return hostId
- def unregisterHost(self, hostId):
+ def unregisterHost(self, hostId):
try:
host = self.data.getHost(hostId)
self.__ACCOUNT("CM HOST UNREGISTER", host=host)
@@ -702,9 +748,9 @@
self.log.warning("Failed to lookup host %s" % hostId)
return
- self.data.unregisterHost(hostId)
- self.log.info("Host %s was unregistered" % hostId)
- return
+ self.data.unregisterHost(hostId)
+ self.log.info("Host %s was unregistered" % hostId)
+ return
# service thread
def __monitorCluster(self):
diff --git a/src/tashi/clustermanager/data/fromconfig.py b/src/tashi/clustermanager/data/fromconfig.py
index 8511a07..d4cdaff 100644
--- a/src/tashi/clustermanager/data/fromconfig.py
+++ b/src/tashi/clustermanager/data/fromconfig.py
@@ -16,16 +16,18 @@
# under the License.
from __future__ import with_statement
+import logging
import threading
import os
import ConfigParser
-from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState
+from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState, Instance
from tashi.clustermanager.data import DataInterface
class FromConfig(DataInterface):
def __init__(self, config):
DataInterface.__init__(self, config)
+ self.log = logging.getLogger(__name__)
self.hosts = {}
self.instances = {}
self.networks = {}
@@ -78,6 +80,10 @@
return instanceId
def registerInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.acquireLock(self.instanceLock)
try:
if (instance.id is not None and instance.id not in self.instances):
@@ -107,6 +113,10 @@
return instance
def releaseInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
try:
if (instance.id not in self.instances): # MPR: should never be true, but good to check
raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
@@ -114,6 +124,10 @@
self.releaseLock(instance._lock)
def removeInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.acquireLock(self.instanceLock)
try:
del self.instances[instance.id]
@@ -122,6 +136,10 @@
self.releaseLock(self.instanceLock)
def acquireHost(self, hostId):
+ if type(hostId) is not int:
+ self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+ raise TypeError
+
self.hostLock.acquire()
host = self.hosts.get(hostId, None)
if (host is None):
@@ -134,6 +152,10 @@
def releaseHost(self, host):
+ if type(host) is not Host:
+ self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+ raise TypeError
+
try:
if (host.id not in self.hosts): # MPR: should never be true, but good to check
raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (host.id)})
diff --git a/src/tashi/clustermanager/data/getentoverride.py b/src/tashi/clustermanager/data/getentoverride.py
index 21b2f8f..75703ea 100644
--- a/src/tashi/clustermanager/data/getentoverride.py
+++ b/src/tashi/clustermanager/data/getentoverride.py
@@ -15,16 +15,18 @@
# specific language governing permissions and limitations
# under the License.
+import logging
import subprocess
import time
import os
-from tashi.rpycservices.rpyctypes import User, LocalImages
+from tashi.rpycservices.rpyctypes import User, LocalImages, Instance, Host
from tashi.clustermanager.data import DataInterface
from tashi.util import instantiateImplementation, humanReadable
class GetentOverride(DataInterface):
def __init__(self, config):
DataInterface.__init__(self, config)
+ self.log = logging.getLogger(__name__)
self.baseDataObject = instantiateImplementation(config.get("GetentOverride", "baseData"), config)
self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
@@ -33,21 +35,41 @@
self.fetchThreshold = float(config.get("GetentOverride", "fetchThreshold"))
def registerInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
return self.baseDataObject.registerInstance(instance)
def acquireInstance(self, instanceId):
return self.baseDataObject.acquireInstance(instanceId)
def releaseInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
return self.baseDataObject.releaseInstance(instance)
def removeInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
return self.baseDataObject.removeInstance(instance)
def acquireHost(self, hostId):
+ if type(hostId) is not int:
+ self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+ raise TypeError
+
return self.baseDataObject.acquireHost(hostId)
def releaseHost(self, host):
+ if type(host) is not Host:
+ self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+ raise TypeError
+
return self.baseDataObject.releaseHost(host)
def getHosts(self):
diff --git a/src/tashi/clustermanager/data/ldapoverride.py b/src/tashi/clustermanager/data/ldapoverride.py
index 17904ab..66a60d6 100644
--- a/src/tashi/clustermanager/data/ldapoverride.py
+++ b/src/tashi/clustermanager/data/ldapoverride.py
@@ -72,16 +72,16 @@
def getNetwork(self, id):
return self.baseDataObject.getNetwork(id)
- def getImages(self):
- count = 0
- myList = []
- for i in self.dfs.list("images"):
- myFile = self.dfs.getLocalHandle("images/" + i)
- if os.path.isfile(myFile):
- image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
- myList.append(image)
- count += 1
- return myList
+ def getImages(self):
+ count = 0
+ myList = []
+ for i in self.dfs.list("images"):
+ myFile = self.dfs.getLocalHandle("images/" + i)
+ if os.path.isfile(myFile):
+ image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+ myList.append(image)
+ count += 1
+ return myList
def fetchFromLdap(self):
now = time.time()
diff --git a/src/tashi/clustermanager/data/pickled.py b/src/tashi/clustermanager/data/pickled.py
index b6724c8..d5348a4 100644
--- a/src/tashi/clustermanager/data/pickled.py
+++ b/src/tashi/clustermanager/data/pickled.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+import logging
import cPickle
import os
import threading
@@ -24,6 +25,7 @@
class Pickled(FromConfig):
def __init__(self, config):
DataInterface.__init__(self, config)
+ self.log = logging.getLogger(__name__)
self.file = self.config.get("Pickled", "file")
self.locks = {}
self.lockNames = {}
diff --git a/src/tashi/clustermanager/data/sql.py b/src/tashi/clustermanager/data/sql.py
index 64e5681..6b48017 100644
--- a/src/tashi/clustermanager/data/sql.py
+++ b/src/tashi/clustermanager/data/sql.py
@@ -130,6 +130,10 @@
return h
def registerInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.instanceLock.acquire()
try:
if (instance.id is not None and instance.id not in self.getInstances()):
@@ -173,6 +177,10 @@
return instance
def releaseInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.instanceLock.acquire()
try:
l = self.makeInstanceList(instance)
@@ -191,6 +199,10 @@
self.instanceLock.release()
def removeInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.instanceLock.acquire()
try:
self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
@@ -205,6 +217,10 @@
self.instanceLock.release()
def acquireHost(self, hostId):
+ if type(hostId) is not int:
+ self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+ raise TypeError
+
host = self.getHost(hostId)
self.hostLock.acquire()
self.hostLocks[host.id] = self.hostLocks.get(host.id, threading.Lock())
@@ -214,6 +230,10 @@
return host
def releaseHost(self, host):
+ if type(host) is not Host:
+ self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+ raise TypeError
+
l = self.makeHostList(host)
s = ""
for e in range(0, len(self.hostOrder)):
@@ -284,16 +304,17 @@
network = Network(d={'id':r[0], 'name':r[1]})
return network
- def getImages(self):
- count = 0
- myList = []
- for i in self.dfs.list("images"):
- myFile = self.dfs.getLocalHandle("images/" + i)
- if os.path.isfile(myFile):
- image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
- myList.append(image)
- count += 1
- return myList
+ def getImages(self):
+ count = 0
+ myList = []
+ for i in self.dfs.list("images"):
+ myFile = self.dfs.getLocalHandle("images/" + i)
+ if os.path.isfile(myFile):
+ image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+ myList.append(image)
+ count += 1
+
+ return myList
def getUsers(self):
cur = self.executeStatement("SELECT * from users")
diff --git a/src/tashi/connectionmanager.py b/src/tashi/connectionmanager.py
index 5eeae6c..d4093b9 100644
--- a/src/tashi/connectionmanager.py
+++ b/src/tashi/connectionmanager.py
@@ -16,14 +16,16 @@
# under the License.
from tashi.rpycservices import rpycservices
+from tashi import Connection
#from tashi.rpycservices.rpyctypes import *
class ConnectionManager(object):
- def __init__(self, username, password, port, timeout=10000.0):
+ def __init__(self, username, password, port, timeout=10000.0, authAndEncrypt=False):
self.username = username
self.password = password
self.timeout = timeout
self.port = port
+ self.authAndEncrypt = authAndEncrypt
def __getitem__(self, hostname):
port = self.port
@@ -31,4 +33,4 @@
port = hostname[1]
hostname = hostname[0]
- return rpycservices.client(hostname, port, username=self.username, password=self.password)
+ return Connection(hostname, port, credentials=(self.username, self.password), authAndEncrypt=self.authAndEncrypt)
diff --git a/src/tashi/messaging/messageBroker.py b/src/tashi/messaging/messageBroker.py
deleted file mode 100644
index c21b57a..0000000
--- a/src/tashi/messaging/messageBroker.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import ConfigParser
-import getopt
-
-import os
-import sys
-import time
-
-import thriftmessaging
-
-options = []
-long_options = ['port=']
-
-# FIXME: should initialize from config file
-params = {"port":1717}
-
-try:
- optlist, args = getopt.getopt(sys.argv[1:], options, long_options)
-except getopt.GetoptError, err:
- print str(err)
- sys.exit(2)
-
-for opt in optlist:
- if opt[0] == "--port":
- try:
- params["port"] = int(opt[1])
- except:
- print "--port expects an integer, got %s" % opt[1]
- sys.exit(0)
-
-print "Starting message broker on port %i" % params["port"]
-broker = thriftmessaging.MessageBrokerThrift(params["port"], daemon=False)
-
diff --git a/src/tashi/messaging/messaging.py b/src/tashi/messaging/messaging.py
deleted file mode 100644
index c421d5c..0000000
--- a/src/tashi/messaging/messaging.py
+++ /dev/null
@@ -1,337 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import threading
-import thread
-import sys
-import os
-import socket
-import Queue
-import copy
-import random
-import traceback
-
-from threadpool import ThreadPoolClass, threadpool, ThreadPool
-from threadpool import threadpoolmethod, threaded, synchronized, synchronizedmethod
-
-class RWLock(object):
- """RWLock: Simple reader/writer lock implementation
- FIXME: this implementation will starve writers!
- Methods:
- acquire() : take lock for read access
- release() : release lock from read access
- acquireWrite() : take lock for write access
- releaseWrite() : release lock from write access"""
- def __init__(self):
- self.lock = threading.Condition()
- self.readers = 0
- def acquire(self):
- self.lock.acquire()
- self.readers = self.readers + 1
- self.lock.release()
- def release(self):
- self.lock.acquire()
- self.readers = self.readers - 1
- self.lock.notify()
- self.lock.release()
- def acquireWrite(self):
- self.lock.acquire()
- while self.readers > 0:
- self.lock.wait()
- def releaseWrite(self):
- self.lock.notify()
- self.lock.release()
-
-
-
-class MessageBroker(object):
- def __init__(self):
- self.sublock = RWLock()
- self.subscribers = []
- self.random = random.Random()
- def log(self, msg):
- print "MessageBroker: Got log: '%s'" % str(msg)
- return msg
- def addSubscriber(self, subscriber):
- self.sublock.acquireWrite()
- self.subscribers.append(subscriber)
- l = len(self.subscribers)
- self.sublock.releaseWrite()
- return l
- def publish(self, message):
- removesubs = []
- i = self.random.randint(0,100)
-
-# subscribers = self.getSubscribers()
-# random.shuffle(subscribers)
-
- self.sublock.acquire()
-
- sys.stdout.flush()
-
- for subscriber in self.subscribers:
- try:
- sys.stdout.flush()
- assert(subscriber != self)
- subscriber.publish(message)
- sys.stdout.flush()
- except Exception, e:
- print e
- removesubs.append(subscriber)
-
- self.sublock.release()
-
- if len(removesubs) > 0:
- print "detected %i failed subscribers" % len(removesubs)
- sys.stdout.flush()
- self.sublock.acquireWrite()
- for subscriber in removesubs:
- try:
- self.subscribers.remove(subscriber)
- except:
- pass
- self.sublock.releaseWrite()
- def getSubscribers(self):
- self.sublock.acquire()
- subs = copy.copy(self.subscribers)
- self.sublock.release()
- return subs
- def removeSubscriber(self, subscriber):
- self.sublock.acquireWrite()
- try:
- self.subscribers.remove(subscriber)
- except:
- pass
- self.sublock.releaseWrite()
- def publishList(self, messages):
- for message in messages:
- self.publish(message)
-
-class Subscriber(object):
- def __init__(self, broker, pmatch={}, nmatch={}, synchronized=False):
- self.broker = broker
- self.lock = threading.Lock()
- self.synchronized = synchronized
- self.pmatch={}
- self.nmatch={}
- broker.addSubscriber(self)
- def publish(self, message):
- sys.stdout.flush()
- msg = message
- try:
- if self.synchronized:
- self.lock.acquire()
- msg = self.filter(msg)
- if (msg != None):
- self.handle(msg)
- if self.synchronized:
- self.lock.release()
- except Exception, x:
- if self.synchronized:
- self.lock.release()
- print '%s, %s, %s' % (type(x), x, traceback.format_exc())
- def publishList(self, messages):
- for message in messages:
- self.publish(message)
- def handle(self, message):
- print "Subscriber Default Handler: '%s'" % message
- def setMatch(self, pmatch={}, nmatch={}):
- self.lock.acquire()
- self.pmatch=pmatch
- self.nmatch=nmatch
- self.lock.release()
- def filter(self, message):
- """filter(self, message) : the filter function returns
- the message, modified to be passed to the handler.
- Returning (None) indicates that this is not a message
- we are interested in, and it will not be passed to the
- handler."""
- send = True
- for key in self.pmatch.keys():
- if (not message.has_key(key)):
- send = False
- break
- if self.pmatch[key] != None:
- if message[key] != self.pmatch[key]:
- send = False
- break
- if send == False:
- return None
- for key in message.keys():
- if self.nmatch.has_key(key):
- if self.nmatch[key] == None:
- send = False
- break
- if self.nmatch[key] == message[key]:
- send = False
- break
- if send == False:
- return None
- return message
-
-
-
-class Publisher(object):
- '''Superclass for pub/sub publishers
-
- FIXME: use finer-grained locking'''
- def __init__(self, broker, aggregate=100):
- self.pending = []
- self.pendingLock = threading.Lock()
- self.aggregateSize = aggregate
- self.broker = broker
- @synchronizedmethod
- def publish(self, message):
- if message.has_key('aggregate') and message['aggregate'] == 'True':
- self.aggregate(message)
- return
- else:
- self.broker.publish(message)
- @synchronizedmethod
- def publishList(self, messages):
- self.broker.publishList(messages)
- @synchronizedmethod
- def aggregate(self, message):
- # we can make this lock-less by using a queue for pending
- # messages
- self.pendingLock.acquire()
- self.pending.append(message)
- if len(self.pending) >= self.aggregateSize:
- self.broker.publishList(self.pending)
- self.pending = []
- self.pendingLock.release()
- @synchronizedmethod
- def setBroker(self, broker):
- self.broker = broker
-
-##############################
-# Testing Code
-##############################
-import time
-import unittest
-import sys
-import logging
-
-
-class TestSubscriber(Subscriber):
- def __init__(self, *args, **kwargs):
- self.queue = Queue.Queue()
- Subscriber.__init__(self, *args, **kwargs)
- def handle(self, message):
- self.queue.put(message)
-
-class TestMessaging(unittest.TestCase):
- def setUp(self):
- self.broker = MessageBroker()
- self.publisher = Publisher(self.broker)
- self.subscriber = TestSubscriber(self.broker)
- def testPublish(self):
- self.publisher.publish( {'message':'hello world'} )
- self.assertEqual(self.subscriber.queue.qsize(), 1)
- def testPublishList(self):
- nrmsgs = 10
- msgs = []
- for i in range(nrmsgs):
- msgs.append( {'msgnum':str(i)} )
- self.publisher.publishList( msgs )
- self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
- def testAggregate(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.aggregate( {'msgnum':str(i)} )
- self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
- def testAggregateKeyword(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
- self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
-
-if __name__ == '__main__':
-
- logging.basicConfig(level=logging.INFO,
- format="%(asctime)s %(levelname)s:\t %(message)s",
- stream=sys.stdout)
-
- suite = unittest.TestLoader().loadTestsFromTestCase(TestMessaging)
- unittest.TextTestRunner(verbosity=2).run(suite)
-
- sys.exit(0)
-
-
-##############################
-# Old/Unused testing code
-##############################
-
-
-
- print 'testing removeSubscriber'
- broker.removeSubscriber(subscriber)
- publisher.publish( {'message':"you shouldn't see this"} )
-
- nsub = NullSubscriber(broker)
- print 'timing publish'
- nrmsg = 100000
- tt = time.time()
- for i in range(nrmsg):
-# publisher.publish( {"message":"hello world!"} )
- publisher.publish( {} )
- tt = time.time() - tt
- print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
- tt,
- nrmsg/tt)
- broker.removeSubscriber(nsub)
-
- class SlowSubscriber(Subscriber):
- def handle(self, message):
- print 'called slow subscriber with message', message
- time.sleep(1)
- print 'returning from slow subscriber with message', message
- class ThreadedSubscriber(Subscriber):
- @threaded
- def handle(self, message):
- print 'called threaded subscriber with message', message
- time.sleep(1)
- print 'returning from threaded subscriber with message', message
- class ThreadPoolSubscriber(Subscriber, ThreadPoolClass):
- @threadpoolmethod
- def handle(self, message):
- print 'called threadpool subscriber with message', message
- time.sleep(1)
- print 'returning from threadpool subscriber with message', message
-
-
-
- tsub = ThreadedSubscriber(broker)
- for i in range(8):
- publisher.publish( {"msg":str(i)} )
- broker.removeSubscriber(tsub)
- time.sleep(3)
-
- tpsub = ThreadPoolSubscriber(broker)
- for i in range(8):
- publisher.publish( {"msg":str(i)} )
- broker.removeSubscriber(tpsub)
- time.sleep(3)
-
- ssub = SlowSubscriber(broker)
- for i in range(4):
- publisher.publish( {"msg":str(i)} )
- broker.removeSubscriber(ssub)
diff --git a/src/tashi/messaging/soapmessaging.py b/src/tashi/messaging/soapmessaging.py
deleted file mode 100755
index be35fc9..0000000
--- a/src/tashi/messaging/soapmessaging.py
+++ /dev/null
@@ -1,229 +0,0 @@
-#! /usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from messaging import *
-
-import cPickle
-import soaplib.wsgi_soap
-import cherrypy.wsgiserver
-from soaplib.service import soapmethod
-from soaplib.serializers.primitive import *
-import SOAPpy.WSDL
-import time
-
-class MessageBrokerSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, MessageBroker):
- def __init__(self, port):
- soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
- MessageBroker.__init__(self)
- self.port = port
- def trdfn():
- service = self
- server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
- server.start()
- threading.Thread(target=trdfn).start()
-
-
- @soapmethod(Array(String), Array(String), _returns=Null)
- def log(self, keys, values):
- message = {}
- if len(keys) != len(values):
- raise Exception, "Different lengths for keys and values"
- for i in range(len(keys)):
- message[keys[i]] = values[i]
- MessageBroker.log(self, message)
-
- @soapmethod(String, Integer, _returns=Null)
- def addSubscriber(self, host, port):
- subscriber = SubscriberSoapProxy(host, port)
- MessageBroker.addSubscriber(self, subscriber)
-
- @soapmethod(String, Integer, _returns=Null)
- def removeSubscriber(self, host, port):
- # should this method really be able to peek into subscriber.host/port
- subscriber = None
- subscribers = self.getSubscribers()
- for subscriber in subscribers:
- if subscriber.host == host and subscriber.port == port:
- subscriber = subscriber
- if subscriber != None:
- MessageBroker.removeSubscriber(self, subscriber)
-
-
- @soapmethod(Array(String), Array(String), _returns=Null)
- def publish(self, keys, values):
- message = {}
- if len(keys) != len(values):
- raise Exception, "Different lengths for keys and values"
- for i in range(len(keys)):
- message[keys[i]] = values[i]
- MessageBroker.publish(self, message)
-
-
-
-class MessageBrokerSoapProxy(object):
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
- def log(self, message):
- keys = []
- values = []
- for k,v in message.items():
- keys.append(k)
- values.append(v)
- self.connection.log(keys=keys, values=values)
- def addSubscriber(self, subscriber):
- self.connection.addSubscriber(host=subscriber.host, port=subscriber.port)
- def publish(self, message):
- keys = []
- values = []
- for k,v in message.items():
- keys.append(k)
- values.append(v)
- self.connection.publish(keys=keys, values=values)
- def removeSubscriber(self, subscriber):
- self.connection.removeSubscriber(host=subscriber.host, port=subscriber.port)
-
-
-
-
-class SubscriberSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, Subscriber):
- def __init__(self, broker, port, synchronized=False):
- soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
- Subscriber.__init__(self, synchronized=synchronized)
- self.host = socket.gethostname()
- self.port = port
- self.broker = broker
- self.server = None
- def trdfn():
- service = self
- self.server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
- self.server.start()
- threading.Thread(target=trdfn).start()
-# broker.log("Subscriber started")
- broker.addSubscriber(self)
- @soapmethod(Array(String), Array(String), _returns=Integer)
- def publish(self, keys, values):
- message = {}
- if len(keys) != len(values):
- raise Exception, "Different lengths for keys and values"
- for i in range(len(keys)):
- message[keys[i]] = values[i]
- Subscriber.publish(self, message)
- return 0
- def stop(self):
- self.server.stop()
-
-class SubscriberSoapProxy(object):
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
- def publish(self, message):
- keys = []
- values = []
- for k,v in message.items():
- keys.append(k)
- values.append(v)
- self.connection.publish(keys=keys, values=values)
-
-
-####################
-# Testing Code
-####################
-
-class CustomSubscriber(SubscriberSoap):
- def handle(self, message):
- print "Custom Subscriber: '%s'" % str(message)
-
-class NullSubscriber(SubscriberSoap):
- def handle(self, message):
- pass
-
-
-if __name__ == '__main__':
- try:
- portnum = 1717
-
- print "\ntesting message broker"
- broker = MessageBrokerSoap(portnum)
- proxy = MessageBrokerSoapProxy("localhost", portnum)
- portnum = portnum + 1
-
- print "\ntesting log function"
- proxy.log( {"message":"Hello World!"} )
-# proxy.log("It looks like log works")
-
- print "\ntesting subscriber proxy"
- subscriber = SubscriberSoap(proxy, portnum)
- portnum = portnum + 1
-
- print "\ntesting custom subscriber"
- csub = CustomSubscriber(proxy, portnum)
- portnum = portnum + 1
-
- print "\ntesting publish"
- proxy.publish( {"message":"Hello World!"} )
-
- print "\ntesting stop"
- subscriber.stop()
- proxy.publish( {"message":"Everybody here?"} )
-
- print "\ntesting removeSubscriber"
- proxy.removeSubscriber(csub)
- proxy.publish( {"message":"Nobody home"} )
- proxy.addSubscriber(csub)
- proxy.publish( {"message":"You're back!"} )
-
- print "\ntesting filter"
- csub.setMatch( {"print":"yes"} )
- proxy.publish( {"print":"yes", "message":"this should be printed"} )
- proxy.publish( {"print":"no", "message":"this should NOT be printed"} )
- csub.setMatch()
-
- print "\ntesting publish performance"
- proxy.removeSubscriber(csub)
- nrmsg = 10000
- tt = time.time()
- for i in range(nrmsg):
- proxy.publish( {"message":"msg %i"%i} )
- tt = time.time() - tt
- print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
- tt,
- nrmsg/tt)
-
- print "\ntesting publish/subscribe performance"
- nsub = NullSubscriber(proxy, portnum)
- portnum = portnum + 1
- nrmsg = 10000
- tt = time.time()
- for i in range(nrmsg):
- proxy.publish( {"message":"msg %i"%i} )
- tt = time.time() - tt
- print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
- tt,
- nrmsg/tt)
-
-
-
- except Exception, e:
-# raise e
- print e
- sys.exit(0)
- sys.exit(0)
diff --git a/src/tashi/messaging/tashimessaging.py b/src/tashi/messaging/tashimessaging.py
deleted file mode 100644
index 006400f..0000000
--- a/src/tashi/messaging/tashimessaging.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from thriftmessaging import *
-import logging
-import Queue
-from ConfigParser import ConfigParser
-import time
-import socket
-import signal
-
-class TashiLogHandler(logging.Handler, PublisherThrift):
- def __init__(self, config, *args, **kwargs):
- self.messages = Queue.Queue()
- self.config = config
- logging.Handler.__init__(self, *args, **kwargs)
- PublisherThrift.__init__(self,
- config.get('MessageBroker', 'host'),
- int(config.get('MessageBroker', 'port')))
- def emit(self, record):
- # 'args', 'created', 'exc_info', 'exc_text', 'filename',
- # 'funcName', 'getMessage', 'levelname', 'levelno', 'lineno',
- # 'module', 'msecs', 'msg', 'name', 'pathname', 'process',
- # 'relativeCreated', 'thread', 'threadName']
- msg = {}
- # args
- # created
- # exc_info
- # exc_text
- msg['log-filename'] = str(record.filename)
- msg['log-funcname'] = str(record.funcName)
- msg['log-levelname'] = str(record.levelname)
- msg['log-level'] = str(record.levelno)
- msg['log-lineno'] = str(record.lineno)
- msg['log-module'] = str(record.module)
- msg['log-msecs'] = str(record.msecs)
- msg['log-message'] = str(record.msg)
- msg['log-name'] = str(record.name)
- msg['log-pathname'] = str(record.pathname)
- msg['log-process'] = str(record.process)
- # relativeCreated
- msg['log-thread'] = str(record.thread)
- msg['log-threadname'] = str(record.threadName)
-
- # standard message fields
- msg['timestamp'] = str(time.time())
- msg['hostname'] = socket.gethostname()
- msg['message-type'] = 'log'
-
- self.messages.put(msg)
- self.publish(msg)
-
-class TashiSubscriber(SubscriberThrift):
- def __init__(self, config, port, **kwargs):
- sys.stdout.flush()
- brokerPort = int(config.get('MessageBroker', 'port'))
- self.broker = MessageBrokerThriftProxy(config.get('MessageBroker', 'host'), brokerPort)
- SubscriberThrift.__init__(self, self.broker, port, **kwargs)
-
-
-
-##############################
-# Test Code
-##############################
-import unittest
-import sys
-
-class TestTashiSubscriber(TashiSubscriber):
- def __init__(self, *args, **kwargs):
- self.messageQueue = Queue.Queue()
- TashiSubscriber.__init__(self, *args, **kwargs)
- def handle(self, message):
- self.messageQueue.put(message)
-
-
-def incrementor(start = 0):
- while True:
- a = start
- start = start + 1
- yield a
-increment = incrementor()
-
-class TestTashiMessaging(unittest.TestCase):
- def setUp(self):
- self.configFiles = [ '../../../etc/TestConfig.cfg']
- self.config = ConfigParser()
- self.configFiles = self.config.read(self.configFiles)
- self.port = int(self.config.get('MessageBroker', 'port'))
-
- try:
- self.brokerPid = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
- './messageBroker.py',
- '--port', str(self.port),
- os.environ)
- self.port = self.port + 1
- # FIXME: what's the best way to wait for the broker to be ready?
- time.sleep(1)
- except Exception, e:
- sys.exit(0)
- self.initialized = True
- self.log = logging.getLogger('TestTashiMessaging')
- self.handler = TashiLogHandler(self.config)
- self.log.addHandler(self.handler)
- self.sub = TestTashiSubscriber(self.config, int(self.port) + increment.next())
- def tearDown(self):
- os.kill(self.brokerPid, signal.SIGKILL)
- # FIXME: wait for the port to be ready again
- time.sleep(2)
- self.log.removeHandler(self.handler)
-# self.sub.broker.removeSubscriber(self.sub)
- pass
- def testLog(self):
- self.log.log(50, "Hello World!")
- self.handler.messages.get(timeout=5)
- self.sub.messageQueue.get(timeout=5)
- self.assertEqual(self.handler.messages.qsize(), 0)
- self.assertEqual(self.sub.messageQueue.qsize(), 0)
- def testPublish(self):
- sys.stdout.flush()
- self.port = self.port + 1
- self.handler.publish({'message':'hello world'})
- self.sub.messageQueue.get(timeout=5)
- self.assertEqual(self.sub.messageQueue.qsize(), 0)
-
-
-if __name__=='__main__':
-
-
-# logging.basicConfig(level=logging.INFO,
-# format="%(asctime)s %(levelname)s:\t %(message)s",
-# stream=sys.stdout)
-
- suite = unittest.TestLoader().loadTestsFromTestCase(TestTashiMessaging)
- unittest.TextTestRunner(verbosity=2).run(suite)
diff --git a/src/tashi/messaging/threadpool.py b/src/tashi/messaging/threadpool.py
deleted file mode 100644
index 5684ef2..0000000
--- a/src/tashi/messaging/threadpool.py
+++ /dev/null
@@ -1,305 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import threading
-import time
-import Queue
-import logging
-
-_log = logging.getLogger('tashi.messaging.threadpool')
-
-def threaded(func):
- def fn(*args, **kwargs):
- thread = threading.Thread(target=func, args=args, kwargs=kwargs)
- thread.start()
- return thread
- return fn
-
-
-class ThreadPool(Queue.Queue):
- def __init__(self, size=8, maxsize=0):
- Queue.Queue.__init__(self, maxsize)
- for i in range(size):
- thread = threading.Thread(target=self._worker)
- thread.setDaemon(True)
- thread.start()
- def _worker(self):
- while True:
- try:
- func, args, kwargs = self.get()
- func(*args, **kwargs)
- except Exception, e:
- _log.error(e)
- # FIXME: do something smarter here, backtrace, log,
- # allow user-defined error handling...
-
- def submit(self, func, *args, **kwargs):
- self.put((func, args, kwargs))
- def submitlist(self, func, args, kwargs):
- self.put((func, args, kwargs))
-
-class ThreadPoolClass:
- def __init__(self, size=8, maxsize=0):
- self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
-
-
-def threadpool(pool):
- def dec(func):
- def fn(*args, **kwargs):
- pool.submit(func, *args, **kwargs)
- return fn
- return dec
-
-def threadpoolmethod(meth):
- def fn(*args, **kwargs):
- try:
- pool = args[0]._threadpool_pool
- except AttributeError:
- pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
- # FIXME: how do we check parent class?
-# assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
- pool.submit(meth, *args, **kwargs)
- return fn
-
-def synchronized(lock=None):
- _log.debug('synchronized decorator factory called')
- if lock==None:
- lock = threading.RLock()
- def dec(func):
- _log.debug('synchronized decorator called')
- def fn(*args, **kwargs):
- _log.debug('getting sync lock')
- lock.acquire()
- _log.debug('got sync lock')
- ex = None
- try:
- r = func(*args, **kwargs)
- except Exception, e:
- ex = e
- _log.debug('releasing sync lock')
- lock.release()
- _log.debug('released sync lock')
- if ex != None:
- raise e
- return r
- return fn
- return dec
-
-def synchronizedmethod(func):
- def fn(*args, **kwargs):
- try:
- lock = args[0]._synchronized_lock
- except AttributeError:
- lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
- lock.acquire()
- ex = None
- try:
- func(*args, **kwargs)
- except Exception, e:
- ex = e
- lock.release()
- if ex != None:
- raise e
- return fn
-
-
-##############################
-# Test Code
-##############################
-import unittest
-import sys
-import time
-
-class TestThreadPool(unittest.TestCase):
- def setUp(self):
- self.errmargin = 0.5
-
- def testUnthreaded(self):
- queue = Queue.Queue()
- def slowfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(4):
- slowfunc()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testThreaded(self):
- queue = Queue.Queue()
- @threaded
- def slowthreadfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(8):
- slowthreadfunc()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
-
- def testThreadPool(self):
- pool = ThreadPool(size=4)
- queue = Queue.Queue()
- @threadpool(pool)
- def slowpoolfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(8):
- slowpoolfunc()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 2, 1)
-
- def testUnthreadedMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(4):
- sc.beslow()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testThreadedMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- @threaded
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(4):
- sc.beslow()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
-
- def testThreadPoolMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- @threadpoolmethod
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(16):
- sc.beslow()
- for i in range(16):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 2, 1)
-
- def testSynchronized(self):
- queue = Queue.Queue()
- @synchronized()
- def addtoqueue():
- time.sleep(1)
- queue.put(None)
- @threaded
- def slowthreadfunc():
- addtoqueue()
- tt = time.time()
- for i in range(4):
- slowthreadfunc()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testSynchronizedMethod(self):
- queue = Queue.Queue()
- class addtoqueue:
- @synchronizedmethod
- def addtoqueue1(self):
- time.sleep(1)
- queue.put(None)
- @synchronizedmethod
- def addtoqueue2(self):
- time.sleep(1)
- queue.put(None)
- atc = addtoqueue()
- @threaded
- def slowthreadfunc1():
- atc.addtoqueue1()
- @threaded
- def slowthreadfunc2():
- atc.addtoqueue2()
- tt = time.time()
- for i in range(4):
- slowthreadfunc1()
- slowthreadfunc2()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 8, 1)
-
- def testUnsynchronizedMethod(self):
- queue = Queue.Queue()
- class addtoqueue:
- def addtoqueue1(self):
- time.sleep(1)
- queue.put(None)
- def addtoqueue2(self):
- time.sleep(1)
- queue.put(None)
- atc = addtoqueue()
- @threaded
- def slowthreadfunc1():
- atc.addtoqueue1()
- @threaded
- def slowthreadfunc2():
- atc.addtoqueue2()
- tt = time.time()
- for i in range(4):
- slowthreadfunc1()
- slowthreadfunc2()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
-
-
-
-if __name__=='__main__':
- import sys
-
- logging.basicConfig(level=logging.INFO,
- format="%(asctime)s %(levelname)s:\t %(message)s",
- stream=sys.stdout)
-
- suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
- unittest.TextTestRunner(verbosity=2).run(suite)
diff --git a/src/tashi/messaging/thriftmessaging.py b/src/tashi/messaging/thriftmessaging.py
deleted file mode 100755
index 0c73ff0..0000000
--- a/src/tashi/messaging/thriftmessaging.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import sys
-import time
-import socket
-import traceback
-import threading
-
-sys.path.append('./gen-py')
-import tashi.messaging.messagingthrift
-import tashi.messaging.messagingthrift.MessageBrokerThrift
-import tashi.messaging.messagingthrift.SubscriberThrift
-from tashi.messaging.messagingthrift.ttypes import *
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-from thrift.server import TServer
-
-from tashi import ConnectionManager
-
-from tashi.messaging.messaging import *
-from tashi.messaging.threadpool import ThreadPoolClass, threadpool, ThreadPool, threadpoolmethod, threaded
-
-class MessageBrokerThrift(MessageBroker):
- def __init__(self, port, daemon=True):
- MessageBroker.__init__(self)
- self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
- self.transport = TSocket.TServerSocket(port)
- self.tfactory = TTransport.TBufferedTransportFactory()
- self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
- self.ready = threading.Event()
-# self.server = TServer.TSimpleServer(self.processor,
-# self.transport,
-# self.tfactory,
-# self.pfactory)
-# self.server = TServer.TThreadPoolServer(self.processor,
-# self.transport,
-# self.tfactory,
-# self.pfactory)
- self.server = TServer.TThreadedServer(self.processor,
- self.transport,
- self.tfactory,
- self.pfactory)
- self.publishCalls = 0
-
- def ssvrthrd():
- try:
- # FIXME: Race condition, the ready event should be set after
- # starting the server. However, server.serve()
- # doesn't return under normal circumstances. This
- # seems to work in practice, even though it's clearly
- # wrong.
- self.ready.set()
- self.server.serve()
- except Exception, e:
- print e
- sys.stdout.flush()
- pass
- svt = threading.Thread(target=ssvrthrd)
- svt.setDaemon(daemon)
- svt.start()
- self.ready.wait()
- def log(self, message):
- MessageBroker.log(self, message)
- @synchronizedmethod
- def addSubscriber(self, host, port):
- subscribers = self.getSubscribers()
- for sub in subscribers:
- if sub.host == host and sub.port == port:
- return
- subscriber = SubscriberThriftProxy(host, port, self.proxy)
- MessageBroker.addSubscriber(self, subscriber)
- def removeSubscriber(self, host, port):
- subscriber = None
- subscribers = self.getSubscribers()
- for sub in subscribers:
- if sub.host == host and sub.port == port:
- subscriber = sub
- if subscriber != None:
- MessageBroker.removeSubscriber(self, subscriber)
- @synchronizedmethod
- def publish(self, message):
- self.publishCalls = self.publishCalls + 1
- sys.stdout.flush()
- MessageBroker.publish(self, message)
-
-class MessageBrokerThriftProxy:
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
- @synchronizedmethod
- def log(self, message):
- self.proxy[self.host, self.port].log(message)
- @synchronizedmethod
- def publish(self, message):
- self.proxy[self.host, self.port].publish(message)
- @synchronizedmethod
- def publishList(self, messages):
- self.proxy[self.host, self.port].publishList(messages)
- @synchronizedmethod
- def addSubscriber(self, subscriber):
- self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
- @synchronizedmethod
- def removeSubscriber(self, subscriber):
- self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
-
-
-
-class SubscriberThrift(Subscriber, threading.Thread):
- def __init__(self, broker, port, synchronized=False):
- self.host = socket.gethostname()
- self.port = port
- self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
- self.transport = TSocket.TServerSocket(port)
- self.tfactory = TTransport.TBufferedTransportFactory()
- self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- self.server = TServer.TThreadedServer(self.processor,
- self.transport,
- self.tfactory,
- self.pfactory)
- def ssvrthrd():
- try:
- self.server.serve()
- except Exception, e:
- pass
-
-
- self.thread = threading.Thread(target=ssvrthrd)
- self.thread.setDaemon(True)
- self.thread.start()
-
- # We have to call this AFTER initializing our server, so that
- # the broker can contact us
- # Wrap this in a try/catch because the broker may not be online yet
- try:
- Subscriber.__init__(self, broker, synchronized=synchronized)
- except:
- pass
- threading.Thread.__init__(self)
- self.setDaemon(True)
- self.start()
-
- def stop(self):
-# # FIXME: this is broken, there is no clear way to stop a
-# # Thrift server
- self.broker.removeSubscriber(self)
- self.transport.close()
- def run(self):
- while(True):
- # renew subscription every 5 min
- try:
- self.broker.addSubscriber(self)
- except:
- pass
- time.sleep(5*60)
-
-class SubscriberThriftProxy:
- def __init__(self, host, port, proxy, aggregate = 100):
- self.host = host
- self.port = port
- self.proxy = proxy
- # for some reason, thrift clients are not thread-safe, lock during send
- self.lock = threading.Lock()
- self.pending = []
- self.aggregateSize = aggregate
- def publish(self, message):
- self.lock.acquire()
- sys.stdout.flush()
- if message.has_key('aggregate') and message['aggregate'] == 'True':
- self.pending.append(message)
- if len(self.pending) >= self.aggregateSize:
- try:
- self.proxy[self.host, self.port].publishList(self.pending)
- except Exception, e:
- print e
- self.lock.release()
- raise e
- self.pending = []
- else:
- try:
- self.proxy[self.host, self.port].publish(message)
- except Exception, e:
- sys.stdout.flush()
- print e
- self.lock.release()
- raise e
- self.lock.release()
-
-class PublisherThrift(Publisher):
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.broker = MessageBrokerThriftProxy(host, port)
- Publisher.__init__(self, self.broker)
-
-####################
-# Testing Code
-####################
-
-class TestSubscriberThrift(SubscriberThrift):
- def __init__(self, *args, **kwargs):
- self.queue = Queue.Queue()
- SubscriberThrift.__init__(self, *args, **kwargs)
- def handle(self, message):
- self.queue.put(message)
-
-portnum = 1718
-class TestThriftMessaging(unittest.TestCase):
- def setUp(self):
- global portnum
- self.broker = MessageBrokerThrift(portnum)
- self.brokerPort = portnum
- portnum = portnum + 1
- self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
- self.publisher = PublisherThrift('localhost', self.brokerPort)
- self.subscriber = TestSubscriberThrift(self.proxy, portnum)
- portnum = portnum + 1
- def tearDown(self):
- pass
- def testSetUp(self):
- pass
- def testPublish(self):
- self.publisher.publish( {'message':'hello world'} )
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testPublishList(self):
- nrmsgs = 10
- msgs = []
- for i in range(nrmsgs):
- msgs.append( {'msgnum':str(i)} )
- self.publisher.publishList( msgs )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testAggregate(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.aggregate( {'msgnum':str(i)} )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testAggregateKeyword(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
-
-
-if __name__=='__main__':
- suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
- unittest.TextTestRunner(verbosity=2).run(suite)
-
-
diff --git a/src/tashi/nodemanager/nodemanager.py b/src/tashi/nodemanager/nodemanager.py
index 66d2d5b..2c39903 100755
--- a/src/tashi/nodemanager/nodemanager.py
+++ b/src/tashi/nodemanager/nodemanager.py
@@ -20,8 +20,10 @@
import logging.config
import signal
import sys
+import os
+import time
-from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
+from tashi.util import instantiateImplementation, getConfig, debugConsole
import tashi
from tashi import boolean
@@ -29,12 +31,8 @@
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import TlsliteVdbAuthenticator
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
- sys.exit(0)
-
def main():
- global config, dfs, vmm, service, server, log, notifier
+ global config, log
(config, configFiles) = getConfig(["NodeManager"])
publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
@@ -42,6 +40,35 @@
logging.config.fileConfig(configFiles)
log = logging.getLogger(__name__)
log.info('Using configuration file(s) %s' % configFiles)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ startNodeManager()
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting node manager after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of node manager")
+ os._exit(-1)
+
+ log.info("Exiting node manager after service thread exited")
+ os._exit(-1)
+
+ return
+
+def startNodeManager():
+ global config, dfs, vmm, service, server, log, notifier
+ publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
+ tashi.publisher = publisher
dfs = instantiateImplementation(config.get("NodeManager", "dfs"), config)
vmm = instantiateImplementation(config.get("NodeManager", "vmm"), config, dfs, None)
service = instantiateImplementation(config.get("NodeManager", "service"), config, vmm)
@@ -59,14 +86,11 @@
t.service._type = 'NodeManagerService'
debugConsole(globals())
-
- try:
- t.start()
- except KeyboardInterrupt:
- handleSIGTERM(signal.SIGTERM, None)
- except Exception, e:
- sys.stderr.write(str(e) + "\n")
- sys.exit(-1)
+
+ t.start()
+ # shouldn't exit by itself
+ sys.exit(0)
+
if __name__ == "__main__":
main()
diff --git a/src/tashi/nodemanager/nodemanagerservice.py b/src/tashi/nodemanager/nodemanagerservice.py
index c493ac9..a1037e7 100755
--- a/src/tashi/nodemanager/nodemanagerservice.py
+++ b/src/tashi/nodemanager/nodemanagerservice.py
@@ -5,15 +5,15 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
-# under the License.
+# under the License.
import logging
import socket
@@ -28,10 +28,10 @@
class NodeManagerService(object):
"""RPC handler for the NodeManager
-
- Perhaps in the future I can hide the dfs from the
+
+ Perhaps in the future I can hide the dfs from the
VmControlInterface and do all dfs operations here?"""
-
+
def __init__(self, config, vmm):
self.config = config
self.vmm = vmm
@@ -76,6 +76,8 @@
self.__registerHost()
+ # XXXstroucki: should make an effort to retry
+ # otherwise vmm will wait forever
self.id = self.cm.registerNodeManager(self.host, self.instances.values())
# XXXstroucki cut cross check for NM/VMM state
@@ -83,18 +85,18 @@
# start service threads
threading.Thread(target=self.__registerWithClusterManager).start()
threading.Thread(target=self.__statsThread).start()
-
+
def __initAccounting(self):
- self.accountBuffer = []
- self.accountLines = 0
- self.accountingClient = None
- try:
- if (self.accountingHost is not None) and \
- (self.accountingPort is not None):
- self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
- except:
- self.log.exception("Could not init accounting")
-
+ self.accountBuffer = []
+ self.accountLines = 0
+ self.accountingClient = None
+ try:
+ if (self.accountingHost is not None) and \
+ (self.accountingPort is not None):
+ self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
+ except:
+ self.log.exception("Could not init accounting")
+
def __loadVmInfo(self):
try:
self.instances = self.vmm.getInstances()
@@ -112,6 +114,8 @@
notifyCM = []
try:
while (len(self.notifyCM) > 0):
+ # XXXstroucki ValueError: need more than 1 value to unpack
+ # observed here. How?
value = self.notifyCM.pop(0)
(instanceId, newInst, old, success) = value
try:
@@ -135,7 +139,7 @@
#if (toSleep > 0):
#time.sleep(toSleep)
- def __ACCOUNTFLUSH(self):
+ def __ACCOUNTFLUSH(self):
try:
if (self.accountingClient is not None):
self.accountingClient.record(self.accountBuffer)
@@ -145,33 +149,33 @@
self.log.exception("Failed to flush accounting data")
- def __ACCOUNT(self, text, instance=None, host=None):
- now = time.time()
- instanceText = None
- hostText = None
+ def __ACCOUNT(self, text, instance=None, host=None):
+ now = time.time()
+ instanceText = None
+ hostText = None
- if instance is not None:
+ if instance is not None:
try:
- instanceText = 'Instance(%s)' % (instance)
+ instanceText = 'Instance(%s)' % (instance)
except:
self.log.exception("Invalid instance data")
- if host is not None:
+ if host is not None:
try:
- hostText = "Host(%s)" % (host)
+ hostText = "Host(%s)" % (host)
except:
self.log.exception("Invalid host data")
- secondary = ','.join(filter(None, (hostText, instanceText)))
+ secondary = ','.join(filter(None, (hostText, instanceText)))
- line = "%s|%s|%s" % (now, text, secondary)
+ line = "%s|%s|%s" % (now, text, secondary)
- self.accountBuffer.append(line)
- self.accountLines += 1
+ self.accountBuffer.append(line)
+ self.accountLines += 1
# XXXstroucki think about force flush every so often
- if (self.accountLines > 0):
- self.__ACCOUNTFLUSH()
+ if (self.accountLines > 0):
+ self.__ACCOUNTFLUSH()
# service thread function
@@ -213,14 +217,14 @@
self.log.exception('statsThread threw an exception')
time.sleep(self.statsInterval)
- def __registerHost(self):
- hostname = socket.gethostname()
+ def __registerHost(self):
+ hostname = socket.gethostname()
# populate some defaults
# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
memory = 0
cores = 0
version = "empty"
- #self.cm.registerHost(hostname, memory, cores, version)
+ #self.cm.registerHost(hostname, memory, cores, version)
def __getInstance(self, vmId):
instance = self.instances.get(vmId, None)
@@ -235,7 +239,7 @@
raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
-
+
# remote
# Called from VMM to update self.instances
# but only changes are Exited, MigrateTrans and Running
@@ -252,11 +256,11 @@
# make a note of mismatch, but go on.
# the VMM should know best
self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
-
+
instance.state = cur
self.__ACCOUNT("NM VM STATE CHANGE", instance=instance)
-
+
newInst = Instance(d={'state':cur})
success = lambda: None
# send the state change up to the CM
@@ -278,8 +282,8 @@
def createInstance(self, instance):
vmId = instance.vmId
self.instances[vmId] = instance
-
-
+
+
# remote
def instantiateVm(self, instance):
self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
@@ -291,7 +295,7 @@
return vmId
except:
self.log.exception("Failed to start instance")
-
+
# remote
def suspendVm(self, vmId, destination):
instance = self.__getInstance(vmId)
@@ -300,10 +304,12 @@
instance.state = InstanceState.Suspending
self.instances[vmId] = instance
threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
-
+
# called by resumeVm as thread
def __resumeVmHelper(self, instance, name):
self.vmm.resumeVmHelper(instance, name)
+ # XXXstroucki should the VMM be responsible for setting
+ # state? It should know better.
instance.state = InstanceState.Running
newInstance = Instance(d={'id':instance.id,'state':instance.state})
success = lambda: None
@@ -323,7 +329,7 @@
self.log.exception('resumeVm failed')
raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
return instance.vmId
-
+
# remote
def prepReceiveVm(self, instance, source):
self.__ACCOUNT("NM VM MIGRATE RECEIVE PREP")
@@ -353,7 +359,7 @@
self.instances[vmId] = instance
threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
return
-
+
# called by receiveVm as thread
# XXXstroucki migrate in?
def __receiveVmHelper(self, instance, transportCookie):
@@ -429,4 +435,3 @@
# remote
def liveCheck(self):
return "alive"
-
diff --git a/src/tashi/nodemanager/vmcontrol/qemu.py b/src/tashi/nodemanager/vmcontrol/qemu.py
index 708d70d..d294fc7 100644
--- a/src/tashi/nodemanager/vmcontrol/qemu.py
+++ b/src/tashi/nodemanager/vmcontrol/qemu.py
@@ -132,9 +132,7 @@
def __getHostPids(self):
"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
pids = []
- real_bin = self.QEMU_BIN
- while os.path.islink(real_bin):
- real_bin = os.readlink(self.QEMU_BIN)
+ real_bin = os.path.realpath(self.QEMU_BIN)
for f in os.listdir("/proc"):
try:
@@ -210,7 +208,7 @@
if self.scratchVg is not None:
log.info("Removing any scratch for %s" % (name))
cmd = "/sbin/lvremove --quiet -f %s" % self.scratchVg
- result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
+ result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
except:
log.warning("Problem cleaning scratch volumes")
pass
@@ -323,12 +321,12 @@
#print "[NEE]: %s" % (needle)
(rlist, wlist, xlist) = select.select([monitorFd], [], [], timeout)
if (len(rlist) == 0):
- log.error("Timeout getting results from monitor for vmId %d" % (child.pid))
+ log.error("Timeout getting results from monitor on FD %s for vmId %d" % (monitorFd, child.pid))
child.errorBit = True
raise RuntimeError
c = os.read(monitorFd, 1)
if (c == ""):
- log.error("Early termination on monitor for vmId %d" % (child.pid))
+ log.error("Early termination on monitor FD %s for vmId %d" % (monitorFd, child.pid))
child.errorBit = True
raise RuntimeError
buf = buf + c
@@ -504,8 +502,14 @@
nicModel = self.__stripSpace(nicModel)
nicString = ""
+ nicNetworks = {}
for i in range(0, len(instance.nics)):
+ # Don't allow more than one interface per vlan
nic = instance.nics[i]
+ if nicNetworks.has_key(nic.network):
+ continue
+ nicNetworks[nic.network] = True
+
nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=%s%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, self.ifPrefix, instance.id, i, nic.network, nic.network)
# ACPI
@@ -645,7 +649,8 @@
# extern
def resumeVmHelper(self, instance, source):
- child = self.__getChildFromPid(instance.vmId)
+ vmId = instance.vmId
+ child = self.__getChildFromPid(vmId)
try:
self.__getPtyInfo(child, True)
except RuntimeError:
@@ -654,8 +659,13 @@
raise
status = "paused"
while ("running" not in status):
- status = self.__enterCommand(child, "info status")
- time.sleep(1)
+ try:
+ status = self.__enterCommand(child, "info status")
+ except RuntimeError:
+ pass
+ time.sleep(60)
+
+ self.nm.vmStateChange(vmId, None, InstanceState.Running)
child.instance.state = InstanceState.Running
self.__saveChildInfo(child)
@@ -846,11 +856,63 @@
def listVms(self):
return self.controlledVMs.keys()
+ def __processVmStats(self, vmId):
+ try:
+ f = open("/proc/%d/stat" % (vmId))
+ procData = f.read()
+ f.close()
+ except:
+ log.warning("Unable to get data for instance %d" % vmId)
+ return
+
+ ws = procData.strip().split()
+ userTicks = float(ws[13])
+ sysTicks = float(ws[14])
+ myTicks = userTicks + sysTicks
+ vsize = (int(ws[22]))/1024.0/1024.0
+ rss = (int(ws[23])*4096)/1024.0/1024.0
+ cpuSeconds = myTicks/self.ticksPerSecond
+ # XXXstroucki be more exact here?
+ last = time.time() - self.statsInterval
+ lastCpuSeconds = self.cpuStats.get(vmId, cpuSeconds)
+ if lastCpuSeconds is None:
+ lastCpuSeconds = cpuSeconds
+ cpuLoad = (cpuSeconds - lastCpuSeconds)/(time.time() - last)
+ self.cpuStats[vmId] = cpuSeconds
+ try:
+ child = self.controlledVMs[vmId]
+ except:
+ log.warning("Unable to obtain information on instance %d" % vmId)
+ return
+
+ (recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
+ for i in range(0, len(child.instance.nics)):
+ netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
+ (tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = self.netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
+ (recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
+ self.stats[vmId] = self.stats.get(vmId, {})
+ child = self.controlledVMs.get(vmId, None)
+ if (child):
+ res = self.__enterCommand(child, "info blockstats")
+ for l in res.split("\n"):
+ (device, sep, data) = stringPartition(l, ": ")
+ if (data != ""):
+ for field in data.split(" "):
+ (label, sep, val) = stringPartition(field, "=")
+ if (val != ""):
+ self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s' % (device, label), 0)))/self.statsInterval
+ self.stats[vmId]['%s_%s' % (device, label)] = int(val)
+ self.stats[vmId]['cpuLoad'] = cpuLoad
+ self.stats[vmId]['rss'] = rss
+ self.stats[vmId]['vsize'] = vsize
+ self.stats[vmId]['recvMBs'] = sendMBs
+ self.stats[vmId]['sendMBs'] = recvMBs
+
# thread
def statsThread(self):
- ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
- netStats = {}
- cpuStats = {}
+ self.ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
+ self.netStats = {}
+ self.cpuStats = {}
# XXXstroucki be more exact here?
last = time.time() - self.statsInterval
while True:
@@ -866,7 +928,7 @@
ws = ld.split()
recvBytes = float(ws[0])
sendBytes = float(ws[8])
- (recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
+ (recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = self.netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
if (recvBytes < lastRecvBytes):
# We seem to have overflowed
# XXXstroucki How likely is this to happen?
@@ -882,44 +944,12 @@
lastSendBytes = lastSendBytes - 2**32
recvMBs = (recvBytes-lastRecvBytes)/(now-last)/1024.0/1024.0
sendMBs = (sendBytes-lastSendBytes)/(now-last)/1024.0/1024.0
- netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
+ self.netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
+
+
for vmId in self.controlledVMs:
- f = open("/proc/%d/stat" % (vmId))
- procData = f.read()
- f.close()
- ws = procData.strip().split()
- userTicks = float(ws[13])
- sysTicks = float(ws[14])
- myTicks = userTicks + sysTicks
- vsize = (int(ws[22]))/1024.0/1024.0
- rss = (int(ws[23])*4096)/1024.0/1024.0
- cpuSeconds = myTicks/ticksPerSecond
- lastCpuSeconds = cpuStats.get(vmId, cpuSeconds)
- cpuLoad = (cpuSeconds - lastCpuSeconds)/(now - last)
- cpuStats[vmId] = cpuSeconds
- child = self.controlledVMs[vmId]
- (recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
- for i in range(0, len(child.instance.nics)):
- netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
- (tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
- (recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
- self.stats[vmId] = self.stats.get(vmId, {})
- child = self.controlledVMs.get(vmId, None)
- if (child):
- res = self.__enterCommand(child, "info blockstats")
- for l in res.split("\n"):
- (device, sep, data) = stringPartition(l, ": ")
- if (data != ""):
- for field in data.split(" "):
- (label, sep, val) = stringPartition(field, "=")
- if (val != ""):
- self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s' % (device, label), 0)))/self.statsInterval
- self.stats[vmId]['%s_%s' % (device, label)] = int(val)
- self.stats[vmId]['cpuLoad'] = cpuLoad
- self.stats[vmId]['rss'] = rss
- self.stats[vmId]['vsize'] = vsize
- self.stats[vmId]['recvMBs'] = sendMBs
- self.stats[vmId]['sendMBs'] = recvMBs
+ self.__processVmStats(vmId)
+
except:
log.exception("statsThread threw an exception")
last = now
diff --git a/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py b/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
index cd4fde8..19447f4 100644
--- a/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
+++ b/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
@@ -28,8 +28,8 @@
self.dfs = dfs
self.nm = nm
- def getInstances(self):
- """Will return a dict of instances by vmId to the caller"""
+ def getInstances(self):
+ """Will return a dict of instances by vmId to the caller"""
raise NotImplementedError
def instantiateVm(self, instance):
diff --git a/src/tashi/nodemanager/vmcontrol/xenpv.py b/src/tashi/nodemanager/vmcontrol/xenpv.py
index 8bf4a29..9401df3 100644
--- a/src/tashi/nodemanager/vmcontrol/xenpv.py
+++ b/src/tashi/nodemanager/vmcontrol/xenpv.py
@@ -28,7 +28,7 @@
from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
from tashi.rpycservices.rpyctypes import Instance, Host
from tashi import boolean, convertExceptions, ConnectionManager, version
-from tashi.util import isolatedRPC, broken
+from tashi.util import broken
import tashi.parallel
from tashi.parallel import synchronized, synchronizedmethod
@@ -168,7 +168,7 @@
vmType = hints.get('vmtype', self.defaultVmType)
print 'starting vm with type: ', vmType
- disk0 = 'tap:%s' % self.disktype
+ disk0 = 'tap:%s' % self.disktype
diskU = 'xvda1'
try:
@@ -313,10 +313,10 @@
@synchronizedmethod
def instantiateVm(self, instance):
- try:
- disktype = self.config.get('XenPV', 'defaultDiskType')
- except:
- disktype = 'vhd'
+ try:
+ disktype = self.config.get('XenPV', 'defaultDiskType')
+ except:
+ disktype = 'vhd'
# FIXME: this is NOT the right way to get out hostId
self.hostId = instance.hostId
@@ -346,6 +346,8 @@
instance.disks[i].local = newdisk
+ # XXXstroucki if ever supporting multiple nics,
+ # ensure more than one isn't put on the same network.
fn = self.createXenConfig(name,
instance.disks[0].local,
instance.nics[0].mac,
diff --git a/src/tashi/thrift/build.py b/src/tashi/thrift/build.py
deleted file mode 100755
index 42b22fa..0000000
--- a/src/tashi/thrift/build.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import shutil
-import os
-from os import path
-import re
-
-if __name__ == '__main__':
- if (path.exists('gen-py')):
- print 'Removing \'gen-py\' directory...'
- shutil.rmtree('gen-py')
-
- if (path.exists('../services')):
- print 'Removing \'../services\' directory...'
- shutil.rmtree('../services')
-
- if (path.exists('../messaging/messagingthrift')):
- print 'Removing \'../messaging/messagingthrift\' directory...'
- shutil.rmtree('../messaging/messagingthrift')
-
- print 'Generating Python code for \'services.thrift\'...'
- os.system('thrift --gen py:new_style services.thrift')
-
- print 'Copying generated code to \'tashi.services\' package...'
- shutil.copytree('gen-py/services', '../services')
-
- print 'Generatign Python code for \'messagingthrift\'...'
- os.system('rm -rf gen-py')
- os.system('thrift --gen py messagingthrift.thrift')
-
- print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
- shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
- os.path.join('..', 'messaging', 'messagingthrift'))
-
- print 'Generating Python code for \'layoutlocality.thrift\'...'
- os.system('thrift --gen py:new_style layoutlocality.thrift')
-
- print 'Copying generated code to \'tashi.services\' package...'
- shutil.copytree('gen-py/layoutlocality', '../services/layoutlocality')
diff --git a/src/tashi/thrift/layoutlocality.thrift b/src/tashi/thrift/layoutlocality.thrift
deleted file mode 100644
index e14910c..0000000
--- a/src/tashi/thrift/layoutlocality.thrift
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-struct BlockLocation {
- list<string> hosts, // hostnames of data nodes
- list<i32> ports, // ports for data nodes
- list<string> names, // hostname:port of data nodes
- i64 blocknum,
- i64 offset,
- i64 length
-}
-
-struct Pathname {
- string pathname
-}
-
-exception FileNotFoundException {
- string message
-}
-
-service layoutservice {
- list <BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 offset, 3:i64 length)
- throws (1:FileNotFoundException ouch),
-}
-
-service localityservice {
- list <list<double>> getHopCountMatrix(1:list<string> sourceHosts, 2:list<string> destHosts),
-}
diff --git a/src/tashi/thrift/messagingthrift.thrift b/src/tashi/thrift/messagingthrift.thrift
deleted file mode 100644
index 401e9a1..0000000
--- a/src/tashi/thrift/messagingthrift.thrift
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-typedef map<string, string> strstrmap
-
-service SubscriberThrift{
- # the async keyword seems to slow things down in the simple
- # tests. However, with non-trivial subscribers it will be
- # necessary to use async here.
- async void publish(strstrmap message)
- async void publishList(list<strstrmap> messages)
-}
-
-service MessageBrokerThrift{
- void log(strstrmap message),
- void addSubscriber(string host, i16 port)
- void removeSubscriber(string host, i16 port)
- async void publish(strstrmap message)
- async void publishList(list<strstrmap> messages)
-
-}
-
diff --git a/src/tashi/thrift/services.thrift b/src/tashi/thrift/services.thrift
deleted file mode 100644
index fa29c30..0000000
--- a/src/tashi/thrift/services.thrift
+++ /dev/null
@@ -1,166 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-enum Errors {
- ConvertedException = 1,
- NoSuchInstanceId = 2,
- NoSuchVmId = 3,
- IncorrectVmState = 4,
- NoSuchHost = 5,
- NoSuchHostId = 6,
- InstanceIdAlreadyExists = 7,
- HostNameMismatch = 8,
- HostNotUp = 9,
- HostStateError = 10,
- InvalidInstance = 11,
- UnableToResume = 12,
- UnableToSuspend = 13,
-}
-
-enum InstanceState {
- Pending = 1, // Job submitted
- Activating = 2, // activateVm has been called, but instantiateVm hasn't finished yet
- Running = 3, // Normal state
- Pausing = 4, // Beginning pause sequence
- Paused = 5 // Paused
- Unpausing = 6, // Beginning unpause sequence
- Suspending = 7, // Beginning suspend sequence
- Resuming = 8, // Beginning resume sequence
- MigratePrep = 9, // Migrate state #1
- MigrateTrans = 10, // Migrate state #2
- ShuttingDown = 11, // Beginning exit sequence
- Destroying = 12, // Beginning exit sequence
- Orphaned = 13, // Host is missing
- Held = 14, // Activation failed
- Exited = 15, // VM has exited
- Suspended = 16, // VM is suspended
-}
-
-enum HostState {
- Normal = 1,
- Drained = 2,
- VersionMismatch = 3
-}
-
-exception TashiException {
- 1: Errors errno
- 2: string msg
-}
-
-struct Host {
- 1:i32 id,
- 2:string name,
- 3:bool up,
- 4:bool decayed,
- 5:HostState state,
- 6:i32 memory,
- 7:i32 cores,
- 8:string version
- // Other properties (disk?)
-}
-
-struct Network {
- 1:i32 id
- 2:string name
-}
-
-struct User {
- 1:i32 id,
- 2:string name
-}
-
-struct DiskConfiguration {
- 1:string uri,
- 2:bool persistent
-}
-
-struct NetworkConfiguration {
- 1:i32 network,
- 2:string mac,
- 3:string ip
-}
-
-struct Instance {
- 1:i32 id,
- 2:i32 vmId,
- 3:i32 hostId,
- 4:bool decayed,
- 5:InstanceState state,
- 6:i32 userId,
- 7:string name, // User specified
- 8:i32 cores, // User specified
- 9:i32 memory, // User specified
- 10:list<DiskConfiguration> disks, // User specified
- 11:list<NetworkConfiguration> nics // User specified
- 12:map<string, string> hints // User specified
-}
-
-service clustermanagerservice {
- // Client-facing RPCs
- Instance createVm(1:Instance instance) throws (1:TashiException e)
-
- void shutdownVm(1:i32 instanceId) throws (1:TashiException e)
- void destroyVm(1:i32 instanceId) throws (1:TashiException e)
-
- void suspendVm(1:i32 instanceId) throws (1:TashiException e)
- Instance resumeVm(1:i32 instanceId) throws (1:TashiException e)
-
- void migrateVm(1:i32 instanceId, 2:i32 targetHostId) throws (1:TashiException e)
-
- void pauseVm(1:i32 instanceId) throws (1:TashiException e)
- void unpauseVm(1:i32 instanceId) throws (1:TashiException e)
-
- list<Host> getHosts() throws (1:TashiException e)
- list<Network> getNetworks() throws (1:TashiException e)
- list<User> getUsers() throws (1:TashiException e)
-
- list<Instance> getInstances() throws (1:TashiException e)
-
- string vmmSpecificCall(1:i32 instanceId, 2:string arg) throws (1:TashiException e)
-
- // NodeManager-facing RPCs
- i32 registerNodeManager(1:Host host, 2:list<Instance> instances) throws (1:TashiException e)
- void vmUpdate(1:i32 instanceId, 2:Instance instance, 3:InstanceState old) throws (1:TashiException e)
-
- // Agent-facing RPCs
- void activateVm(1:i32 instanceId, 2:Host host) throws (1:TashiException e)
-}
-
-service nodemanagerservice {
- // ClusterManager-facing RPCs
- i32 instantiateVm(1:Instance instance) throws (1:TashiException e)
-
- void shutdownVm(1:i32 vmId) throws (1:TashiException e)
- void destroyVm(1:i32 vmId) throws (1:TashiException e)
-
- void suspendVm(1:i32 vmId, 2:string destination) throws (1:TashiException e)
- i32 resumeVm(1:Instance instance, 2:string source) throws (1:TashiException e)
-
- string prepReceiveVm(1:Instance instance, 2:Host source) throws (1:TashiException e)
- void migrateVm(1:i32 vmId, 2:Host target, 3:string transportCookie) throws (1:TashiException e)
- void receiveVm(1:Instance instance, 2:string transportCookie) throws (1:TashiException e)
-
- void pauseVm(1:i32 vmId) throws (1:TashiException e)
- void unpauseVm(1:i32 vmId) throws (1:TashiException e)
-
- Instance getVmInfo(1:i32 vmId) throws (1:TashiException e)
- list<i32> listVms() throws (1:TashiException e)
-
- string vmmSpecificCall(1:i32 vmId, 2:string arg) throws (1:TashiException e)
-
- // Host getHostInfo() throws (1:TashiException e)
-}
diff --git a/src/tashi/util.py b/src/tashi/util.py
index 4eb0981..61cca4b 100644
--- a/src/tashi/util.py
+++ b/src/tashi/util.py
@@ -27,6 +27,7 @@
import traceback
import types
import getpass
+import functools
from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
@@ -148,14 +149,6 @@
def __delattr__(self, name):
return delattr(self.__dict__['__real_obj__'], name)
-def isolatedRPC(client, method, *args, **kw):
- """Opens and closes a thrift transport for a single RPC call"""
- if (not client._iprot.trans.isOpen()):
- client._iprot.trans.open()
- res = getattr(client, method)(*args, **kw)
- client._iprot.trans.close()
- return res
-
def signalHandler(signalNumber):
"""Used to denote a particular function as the signal handler for a
specific signal"""
@@ -192,7 +185,7 @@
def convertExceptions(oldFunc):
"""This converts any exception type into a TashiException so that
- it can be passed over a Thrift RPC"""
+ it can be passed over an RPC"""
def newFunc(*args, **kw):
try:
return oldFunc(*args, **kw)
@@ -218,20 +211,33 @@
raise Exception("No config file could be found: %s" % (str(allLocations)))
return (config, configFiles)
+def __getShellFn():
+ try:
+ from IPython.Shell import IPShellEmbed
+ return (1, IPShellEmbed)
+ except ImportError:
+ import IPython
+ return (2, IPython.embed)
+
def debugConsole(globalDict):
"""A debugging console that optionally uses pysh"""
def realDebugConsole(globalDict):
+ import os
try :
import atexit
- from IPython.Shell import IPShellEmbed
+ (calltype, shellfn) = __getShellFn()
def resetConsole():
# XXXpipe: make input window sane
(stdin, stdout) = os.popen2("reset")
stdout.read()
- dbgshell = IPShellEmbed()
atexit.register(resetConsole)
- dbgshell(local_ns=globalDict, global_ns=globalDict)
- except Exception:
+ if calltype == 1:
+ dbgshell=shellfn(user_ns=globalDict)
+ dbgshell()
+ elif calltype == 2:
+ dbgshell=shellfn
+ dbgshell(user_ns=globalDict)
+ except Exception, e:
CONSOLE_TEXT=">>> "
input = " "
while (input != ""):
@@ -241,6 +247,9 @@
exec(input) in globalDict
except Exception, e:
sys.stdout.write(str(e) + "\n")
+
+ os._exit(0)
+
if (os.getenv("DEBUG", "0") == "1"):
threading.Thread(target=lambda: realDebugConsole(globalDict)).start()
@@ -260,6 +269,68 @@
ns = ns + c
return ns
+class Connection:
+	def __init__(self, host, port, authAndEncrypt=False, credentials=None):
+		self.host = host
+		self.port = port
+		self.credentials = credentials
+		self.authAndEncrypt = authAndEncrypt
+		self.connection = None
+		# XXXstroucki some thing may still depend on this (client)
+		self.username = None
+		if credentials is not None:
+			self.username = credentials[0]
+
+	def __connect(self):
+		# create new connection; prompt for missing or empty ('') credentials
+
+		username = None
+		password = None
+
+		if self.credentials is not None:
+			username = self.credentials[0]
+			password = self.credentials[1]
+
+		if self.authAndEncrypt:
+			if not username:
+				username = raw_input("Enter Username:")
+
+			if not password:
+				password = getpass.getpass("Enter Password:")
+
+			# remember what was used so reconnects do not prompt again
+			self.credentials = (username, password)
+			self.username = username
+
+			client = rpycservices.client(self.host, self.port, username=username, password=password)
+		else:
+			client = rpycservices.client(self.host, self.port)
+
+		self.connection = client
+
+	def __do(self, name, *args, **kwargs):
+		if self.connection is None:
+			self.__connect()
+
+		remotefn = getattr(self.connection, name, None)
+
+		try:
+			if callable(remotefn):
+				returns = remotefn(*args, **kwargs)
+
+			else:
+				raise TashiException({'msg':'%s not callable' % name})
+
+		except:
+			self.connection = None
+			raise
+
+		return returns
+
+	def __getattr__(self, name):
+		# old-style class: keep special-method lookups local, not remote
+		if name.startswith("__"):
+			raise AttributeError(name)
+		return functools.partial(self.__do, name)
+
def createClient(config):
cfgHost = config.get('Client', 'clusterManagerHost')
cfgPort = config.get('Client', 'clusterManagerPort')
@@ -273,14 +344,12 @@
authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
if authAndEncrypt:
username = config.get('AccessClusterManager', 'username')
- if username == '':
- username = raw_input('Enter Username:')
password = config.get('AccessClusterManager', 'password')
- if password == '':
- password = getpass.getpass('Enter Password:')
- client = rpycservices.client(host, port, username=username, password=password)
+ client = Connection(host, port, authAndEncrypt, (username, password))
+
else:
- client = rpycservices.client(host, port)
+ client = Connection(host, port)
+
return client
def enumToStringDict(cls):
diff --git a/src/tashi/version.py b/src/tashi/version.py
index 1fd7997..2380bed 100644
--- a/src/tashi/version.py
+++ b/src/tashi/version.py
@@ -15,4 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-version = "201202"
+version = "201203"
diff --git a/src/utils/Makefile b/src/utils/Makefile
deleted file mode 100644
index aea56ee..0000000
--- a/src/utils/Makefile
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-all: nmd
-
-clean:
- rm -f ./nmd
-
-nmd: nmd.c
- ${CC} $< -o $@
diff --git a/src/utils/getLocality.py b/src/utils/getLocality.py
deleted file mode 100755
index 49ecb11..0000000
--- a/src/utils/getLocality.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import sys
-import os
-from os import system
-
-import tashi.services.layoutlocality.localityservice as localityservice
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-
-from tashi.util import getConfig
-
-(config, configFiles) = getConfig(["Client"])
-host = config.get('LocalityService', 'host')
-port = int(config.get('LocalityService', 'port'))
-
-socket = TSocket.TSocket(host, port)
-transport = TTransport.TBufferedTransport(socket)
-protocol = TBinaryProtocol.TBinaryProtocol(transport)
-client = localityservice.Client(protocol)
-transport.open()
-
-while True:
- line1 = "\n"
- line2 = "\n"
- while line1 != "":
- line1 = sys.stdin.readline()
- if line1 == "":
- sys.exit(0)
- if line1 != "\n":
- break
- line1 = line1.strip()
- while line2 != "":
- line2 = sys.stdin.readline()
- if line2 == "":
- sys.exit(0)
- if line2 != "\n":
- break
- line2 = line2.strip()
-
- sources = line1.split(" ")
- destinations = line2.split(" ")
-
- mat = client.getHopCountMatrix(sources, destinations)
- for r in mat:
- for c in r:
- print '%f\t'%c,
- print '\n',
- print '\n',
diff --git a/src/utils/nmd.c b/src/utils/nmd.c
deleted file mode 100644
index effa1d2..0000000
--- a/src/utils/nmd.c
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <assert.h>
-
-#define SLEEP_INTERVAL 10
-#define TASHI_PATH "/usr/local/tashi/"
-#define LOG_FILE "/var/log/nodemanager.log"
-
-/* This function changes (on Linux!) its oom scoring, to make it
- * unattractive to kill
- */
-
-void make_invincible()
-{
- int oom_adj_fd;
- int r;
-
- oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
- assert(oom_adj_fd != -1);
- r = write(oom_adj_fd, "-17\n", 4);
- assert(r == 4);
- close(oom_adj_fd);
-
-}
-
-/* This function resets (on Linux!) its oom scoring to default
- */
-void make_vulnerable()
-{
- int oom_adj_fd;
- int r;
-
- oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
- assert(oom_adj_fd != -1);
- r = write(oom_adj_fd, "0\n", 2);
- assert(r == 2);
- close(oom_adj_fd);
-}
-
-int main(int argc, char **argv)
-{
- char* env[2];
- int status;
- DIR* d;
- int pid;
- int lfd;
- int foreground=0;
-
-/* If first argument is "-f", run in foreground */
- if ((argc > 1) && (strncmp(argv[1], "-f", 3)==0)) {
- foreground=1;
- }
-/* If not running in foreground, fork off and exit the parent.
- * The child closes its default file descriptors.
- */
- if (!foreground) {
- pid = fork();
- if (pid != 0) {
- exit(0);
- }
- close(0);
- close(1);
- close(2);
- }
-/* Adjust OOM preference */
- make_invincible();
-/* Configure environment of children */
- env[0] = "PYTHONPATH="TASHI_PATH"/src/";
- env[1] = NULL;
- while (1) {
- pid = fork();
- if (pid == 0) {
- /* child */
- /* nodemanagers are vulnerable. Not the supervisor. */
- make_vulnerable();
- if (!foreground) {
- /* If not running fg, open log file */
- lfd = open(LOG_FILE, O_WRONLY|O_APPEND|O_CREAT);
- if (lfd < 0) {
- /* If this failed, open something? */
- lfd = open("/dev/null", O_WRONLY);
- }
- /* Make this fd stdout and stderr */
- dup2(lfd, 2);
- dup2(lfd, 1);
- /* close stdin */
- close(0);
- }
- chdir(TASHI_PATH);
- /* start node manager with python environment */
- execle("./bin/nodemanager.py", "./bin/nodemanager.py", NULL, env);
- exit(-1);
- }
- /* sleep before checking for child's status */
- sleep(SLEEP_INTERVAL);
- /* catch child exiting and go through loop again */
- waitpid(pid, &status, 0);
- } /* while (1) */
-}
diff --git a/src/utils/nmd.py b/src/utils/nmd.py
index e74a82f..118aee8 100755
--- a/src/utils/nmd.py
+++ b/src/utils/nmd.py
@@ -16,9 +16,10 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
"""
+# XXXstroucki: why not use something like supervise instead?
import os
import sys
@@ -36,81 +37,81 @@
*/
"""
def make_invincible():
- # dependent on linux
- try:
- oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
- except IOError:
- pass
- else:
- os.write(oom_adj_fd, "-17\n")
- os.close(oom_adj_fd)
+ # dependent on linux
+ try:
+ oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
+ except IOError:
+ pass
+ else:
+ os.write(oom_adj_fd, "-17\n")
+ os.close(oom_adj_fd)
"""
/* This function resets (on Linux!) its oom scoring to default
*/
"""
def make_vulnerable():
- # dependent on linux
- try:
- oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
- except IOError:
- pass
- else:
- os.write(oom_adj_fd, "0\n")
- os.close(oom_adj_fd)
+ # dependent on linux
+ try:
+ oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
+ except IOError:
+ pass
+ else:
+ os.write(oom_adj_fd, "0\n")
+ os.close(oom_adj_fd)
def main(argv=None):
- if argv is None:
- argv = sys.argv
- try:
- opts, args = getopt.getopt(argv[1:], "f", ["foreground"])
- except getopt.GetoptError, err:
- # print help information and exit:
- print str(err) # will print something like "option -a not recognized"
- # usage()
- return 2
- foreground = False
- for o, a in opts:
- if o in ("-f", "--foreground"):
- foreground = True
- else:
- assert False, "unhandled option"
- if foreground == False:
- pid = os.fork();
- if pid != 0:
- os._exit(0)
- os.close(0)
- os.close(1)
- os.close(2)
+ if argv is None:
+ argv = sys.argv
+ try:
+ opts, args = getopt.getopt(argv[1:], "f", ["foreground"])
+ except getopt.GetoptError, err:
+ # print help information and exit:
+ print str(err) # will print something like "option -a not recognized"
+ # usage()
+ return 2
+ foreground = False
+ for o, a in opts:
+ if o in ("-f", "--foreground"):
+ foreground = True
+ else:
+ assert False, "unhandled option"
+ if foreground == False:
+ pid = os.fork();
+ if pid != 0:
+ os._exit(0)
+ os.close(0)
+ os.close(1)
+ os.close(2)
- # adjust oom preference
- make_invincible()
+ # adjust oom preference
+ make_invincible()
- # configure environment of children
- env = {"PYTHONPATH":TASHI_PATH+"/src"}
- while True:
- pid = os.fork();
- if pid == 0:
- # child
- # nodemanagers are vulnerable, not the supervisor
- make_vulnerable()
- if foreground == False:
- try:
- lfd = os.open(LOG_FILE, os.O_APPEND|os.O_CREAT|os.O_WRONLY)
- except IOError:
- lfd = os.open("/dev/null", os.O_WRONLY)
- # make this fd stdout and stderr
- os.dup2(lfd, 1)
- os.dup2(lfd, 2)
- # close stdin
- os.close(0)
- os.chdir(TASHI_PATH)
- os.execle("./bin/nodemanager.py", "./bin/nodemanager.py", env)
- os._exit(-1)
- # sleep before checking child status
- time.sleep(SLEEP_INTERVAL)
- os.waitpid(pid, 0)
- return 0
+ # configure environment of children
+ env = {"PYTHONPATH":TASHI_PATH+"/src"}
+ while True:
+ pid = os.fork();
+ if pid == 0:
+ # child
+ # nodemanagers are vulnerable, not the supervisor
+ make_vulnerable()
+ if foreground == False:
+ try:
+ lfd = os.open(LOG_FILE, os.O_APPEND|os.O_CREAT|os.O_WRONLY)
+ except IOError:
+ lfd = os.open("/dev/null", os.O_WRONLY)
+ # make this fd stdout and stderr
+ os.dup2(lfd, 1)
+ os.dup2(lfd, 2)
+ # close stdin
+ os.close(0)
+ os.chdir(TASHI_PATH)
+ os.execle("./bin/nodemanager.py", "./bin/nodemanager.py", env)
+ os._exit(-1)
+ # sleep before checking child status
+ time.sleep(SLEEP_INTERVAL)
+ os.waitpid(pid, 0)
+ return 0
if __name__ == "__main__":
- sys.exit(main())
+ sys.exit(main())
diff --git a/src/zoni/client/zoni-cli.py b/src/zoni/client/zoni-cli.py
index c5918ab..f22cc4e 100755
--- a/src/zoni/client/zoni-cli.py
+++ b/src/zoni/client/zoni-cli.py
@@ -327,11 +327,11 @@
if (options.nodeName):
cmdargs["sys_id"] = options.nodeName
- if (options.numCores or options.clockSpeed or options.numMemory or options.numProcs or options.cpuFlags) and not options.showResources:
- usage = "MISSING OPTION: When specifying hardware parameters, you need the -s or --showResources switch"
- print usage
- parser.print_help()
- exit()
+ if (options.numCores or options.clockSpeed or options.numMemory or options.numProcs or options.cpuFlags) and not options.showResources:
+ usage = "MISSING OPTION: When specifying hardware parameters, you need the -s or --showResources switch"
+ print usage
+ parser.print_help()
+ exit()
if options.getResources:
print "ALL resources"
diff --git a/src/zoni/extra/util.py b/src/zoni/extra/util.py
index 38ce9df..6a12b6a 100644
--- a/src/zoni/extra/util.py
+++ b/src/zoni/extra/util.py
@@ -19,6 +19,7 @@
#
import os
+import sys
import string
import ConfigParser
import time
@@ -218,19 +219,25 @@
return val
-
+def __getShellFn():
+ if sys.version_info < (2, 6, 1):
+ from IPython.Shell import IPShellEmbed
+ return IPShellEmbed()
+ else:
+ import IPython
+ return IPython.embed()
def debugConsole(globalDict):
"""A debugging console that optionally uses pysh"""
def realDebugConsole(globalDict):
try :
import atexit
- from IPython.Shell import IPShellEmbed
+ shellfn = __getShellFn()
def resetConsole():
# XXXpipe: make input window sane
(stdin, stdout) = os.popen2("reset")
stdout.read()
- dbgshell = IPShellEmbed()
+ dbgshell = shellfn()
atexit.register(resetConsole)
dbgshell(local_ns=globalDict, global_ns=globalDict)
except Exception:
diff --git a/src/zoni/hardware/delldrac.py b/src/zoni/hardware/delldrac.py
index 7cb189f..cbdd493 100644
--- a/src/zoni/hardware/delldrac.py
+++ b/src/zoni/hardware/delldrac.py
@@ -147,7 +147,7 @@
for val in fout.readlines():
if "OK" in val:
code = 1
- if "CURRENTLY POWER-OFF" in val:
+ if "CURRENTLY POWER-OFF" in val:
self.log.info("Hardware already power off : %s", self.hostname)
code = 1
if code < 1:
@@ -171,7 +171,7 @@
for val in fout.readlines():
if "OK" in val:
code = 1
- if "CURRENTLY POWER-OFF" in val:
+ if "CURRENTLY POWER-OFF" in val:
self.log.info("Hardware already power off : %s", self.hostname)
code = 1
if code < 1:
diff --git a/src/zoni/hardware/dellswitch.py b/src/zoni/hardware/dellswitch.py
index 0ddf8aa..d8296c3 100644
--- a/src/zoni/hardware/dellswitch.py
+++ b/src/zoni/hardware/dellswitch.py
@@ -54,7 +54,7 @@
pass
- def setVerbose(self, verbose):
+ def setVerbose(self, verbose):
self.verbose = verbose
def __login(self):
diff --git a/src/zoni/hardware/hpswitch.py b/src/zoni/hardware/hpswitch.py
index ada83b9..fe1f604 100644
--- a/src/zoni/hardware/hpswitch.py
+++ b/src/zoni/hardware/hpswitch.py
@@ -74,10 +74,10 @@
child.sendline(cmd)
opt = child.expect(["Confirm(.*)", "No save(.*)", pexpect.EOF, pexpect.TIMEOUT])
if opt == 0:
- print "saving to flash"
- child.sendline("y\n")
+ print "saving to flash"
+ child.sendline("y\n")
if opt == 1:
- print "no save needed"
+ print "no save needed"
child.sendline('exit')
child.terminate()
diff --git a/src/zoni/install/db/zoniDbSetup.py b/src/zoni/install/db/zoniDbSetup.py
index 88998b4..309a3a9 100644
--- a/src/zoni/install/db/zoniDbSetup.py
+++ b/src/zoni/install/db/zoniDbSetup.py
@@ -27,8 +27,8 @@
import optparse
import getpass
except ImportError, e:
- print "Module not installed : %s" % e
- exit()
+ print "Module not installed : %s" % e
+ exit()
a = os.path.join("../")
@@ -406,7 +406,7 @@
def entryExists(conn, table, col, checkVal):
query = "select * from " + table + " where " + col + " = '" + checkVal + "'"
- r = execQuery(conn, query)
+ r = execQuery(conn, query)
res = r.fetchall()
if len(res) > 0:
return (1, res)
@@ -415,5 +415,5 @@
if __name__ == "__main__":
- main()
+ main()
diff --git a/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py b/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
index a68eb83..f55fb71 100755
--- a/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
+++ b/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
@@ -79,5 +79,5 @@
if __name__ == "__main__":
- main()
+ main()