Copy stable branch down to oldstable branch in preparation for new stable branch.


git-svn-id: https://svn.apache.org/repos/asf/incubator/tashi/branches/oldstable@1362316 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/INSTALL b/INSTALL
index 6240c35..17f4fc1 100644
--- a/INSTALL
+++ b/INSTALL
@@ -124,6 +124,7 @@
 given by the hostname command. If you plan on eventually having several 
 hosts and networks, feel free to add them now.
 
+root@grml:/usr/local/tashi# cd bin
 root@grml:/usr/local/tashi/bin# DEBUG=1 ./clustermanager
 2012-01-26 23:12:33,972 [./clustermanager:INFO] Using configuration file(s) ['/usr/local/tashi/etc/TashiDefaults.cfg']
 2012-01-26 23:12:33,972 [./clustermanager:INFO] Starting cluster manager
@@ -158,9 +159,8 @@
 
 In [4]: data.baseDataObject.save()
 
-In [5]: import os
-
-In [6]: os.kill(os.getpid(), 9)
+In [5]: (^C)
+2012-03-07 20:00:00,456 [./bin/clustermanager:INFO] Exiting cluster manager after receiving a SIGINT signal
 
 Run the cluster manager in the background:
 root@grml:/usr/local/tashi/bin# ./clustermanager &
diff --git a/Makefile b/Makefile
index 947b14c..618050d 100644
--- a/Makefile
+++ b/Makefile
@@ -128,11 +128,11 @@
 	if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
 
 ## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks
-disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')
+disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')",F0401"
 pysrc=$(shell find . \! -path '*gen-py*' \! -path '*services*' \! -path '*messagingthrift*' \! -name '__init__.py' -name "*.py")
 tidy: $(addprefix tidyfile/,$(pysrc))
-	@echo Insuring .py files are nice and tidy!
+	@echo Ensured .py files are nice and tidy!
 
 tidyfile/%: %
 	@echo Checking tidy for $*
-	pylint --report=no --disable-msg-cat=R,C,E --disable-msg=$(disabled_warnings) --indent-string="\t" $* 2> /dev/null; 
+	pylint --report=no --disable=R,C,E --disable=$(disabled_warnings) --indent-string="\t" $* 2> /dev/null; 
diff --git a/etc/NodeManager.cfg b/etc/NodeManager.cfg
index a47bccf..48f4044 100644
--- a/etc/NodeManager.cfg
+++ b/etc/NodeManager.cfg
@@ -80,7 +80,6 @@
 statsInterval = 0.0
 ;accountingHost = clustermanager
 ;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [Security]
 authAndEncrypt = False
diff --git a/etc/TashiDefaults.cfg b/etc/TashiDefaults.cfg
index 5e1a4d2..b7d32f3 100644
--- a/etc/TashiDefaults.cfg
+++ b/etc/TashiDefaults.cfg
@@ -57,7 +57,6 @@
 allowDuplicateNames = False
 ;accountingHost = clustermanager
 ;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [GetentOverride]
 baseData = tashi.clustermanager.data.Pickled
@@ -110,7 +109,6 @@
 clusterManagerHost = localhost 
 clusterManagerPort = 9882
 statsInterval = 0.0
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [Qemu]
 qemuBin = /usr/bin/kvm
diff --git a/src/tashi/accounting/accounting.py b/src/tashi/accounting/accounting.py
index 93d2999..6856e49 100755
--- a/src/tashi/accounting/accounting.py
+++ b/src/tashi/accounting/accounting.py
@@ -17,6 +17,8 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+import os
+import time
 import sys
 import signal
 import logging.config
@@ -26,13 +28,13 @@
 #from rpyc.utils.authenticators import TlsliteVdbAuthenticator
 
 #from tashi.rpycservices.rpyctypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole, signalHandler
+from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole
 import tashi
 
 class Accounting(object):
-	def __init__(self, config, cmclient):
+	def __init__(self, config):
 		self.config = config
-		self.cm = cmclient
+		self.cm = createClient(config)
 		self.hooks = []
 		self.log = logging.getLogger(__file__)
 
@@ -62,25 +64,43 @@
 
 		debugConsole(globals())
 
-		try:
-			t.start()
-		except KeyboardInterrupt:
-			self.handleSIGTERM(signal.SIGTERM, None)
-
-	@signalHandler(signal.SIGTERM)
-	def handleSIGTERM(self, signalNumber, stackFrame):
-		self.log.info('Exiting cluster manager after receiving a SIGINT signal')
+		t.start()
+		# shouldn't exit by itself
 		sys.exit(0)
 
 def main():
 	(config, configFiles) = getConfig(["Accounting"])
 	publisher = instantiateImplementation(config.get("Accounting", "publisher"), config)
 	tashi.publisher = publisher
-	cmclient = createClient(config)
 	logging.config.fileConfig(configFiles)
-	accounting = Accounting(config, cmclient)
+	log = logging.getLogger(__name__)
+	log.info('Using configuration file(s) %s' % configFiles)
 
-	accounting.initAccountingServer()
+	accounting = Accounting(config)
+
+	# handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+	child = os.fork()
+
+	if child == 0:
+		accounting.initAccountingServer()
+		# shouldn't exit by itself
+		sys.exit(0)
+
+	else:
+		# main
+		try:
+			os.waitpid(child, 0)
+		except KeyboardInterrupt:
+			log.info("Exiting accounting service after receiving a SIGINT signal")
+			os._exit(0)
+		except Exception:
+			log.exception("Abnormal termination of accounting service")
+			os._exit(-1)
+
+		log.info("Exiting accounting service after service thread exited")
+		os._exit(-1)
+
+	return
 
 if __name__ == "__main__":
 	main()
diff --git a/src/tashi/accounting/accountingservice.py b/src/tashi/accounting/accountingservice.py
index b1c035a..56c1c90 100644
--- a/src/tashi/accounting/accountingservice.py
+++ b/src/tashi/accounting/accountingservice.py
@@ -5,15 +5,15 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
-# under the License.    
+# under the License.
 
 import logging
 import threading
@@ -22,43 +22,43 @@
 from tashi import createClient
 
 class AccountingService(object):
-        """RPC service for the Accounting service"""
-        
-        def __init__(self, config):
-            self.log = logging.getLogger(__name__)
-            self.log.setLevel(logging.INFO)
+	"""RPC service for the Accounting service"""
 
-	    self.config = config
+	def __init__(self, config):
+		self.log = logging.getLogger(__name__)
+		self.log.setLevel(logging.INFO)
 
-	    self.pollSleep = None
+		self.config = config
 
-	    # XXXstroucki new python has fallback values
-	    try:
-		    self.pollSleep = self.config.getint("AccountingService", "pollSleep")
-	    except:
-		    pass
+		self.pollSleep = None
 
-	    if self.pollSleep is None:
-		    self.pollSleep = 600
+		# XXXstroucki new python has fallback values
+		try:
+			self.pollSleep = self.config.getint("AccountingService", "pollSleep")
+		except:
+			pass
 
-            self.cm = createClient(config)
-            threading.Thread(target=self.__start).start()
+		if self.pollSleep is None:
+			self.pollSleep = 600
+
+		self.cm = createClient(config)
+		threading.Thread(target=self.__start).start()
 
 	# remote
-        def record(self, strings):
-            for string in strings:
-                self.log.info("Remote: %s" % (string))
+	def record(self, strings):
+		for string in strings:
+			self.log.info("Remote: %s" % (string))
 
-        def __start(self):
-            while True:
-                try:
-                    instances = self.cm.getInstances()
-                    for instance in instances:
-                        # XXXstroucki this currently duplicates what the CM was doing.
-                        self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
-                except:
-                    self.log.warning("Accounting iteration failed")
+	def __start(self):
+		while True:
+			try:
+				instances = self.cm.getInstances()
+				for instance in instances:
+					# XXXstroucki this currently duplicates what the CM was doing.
+					self.log.info('Accounting: id %s host %s vmId %s user %s cores %s memory %s' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
+			except:
+				self.log.warning("Accounting iteration failed")
 
-                        
-                # wait to do the next iteration
-                time.sleep(self.pollSleep)
+
+			# wait to do the next iteration
+			time.sleep(self.pollSleep)
diff --git a/src/tashi/agents/primitive.py b/src/tashi/agents/primitive.py
index 99ef702..3a1b708 100755
--- a/src/tashi/agents/primitive.py
+++ b/src/tashi/agents/primitive.py
@@ -19,6 +19,7 @@
 
 import time
 import logging.config
+import sys
 
 from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
 
@@ -26,9 +27,9 @@
 import tashi
 
 class Primitive(object):
-	def __init__(self, config, cmclient):
+	def __init__(self, config):
 		self.config = config
-		self.cm = cmclient
+		self.cm = createClient(config)
 		self.hooks = []
 		self.log = logging.getLogger(__file__)
 		self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -40,10 +41,10 @@
 			name = name.lower()
 			if (name.startswith("hook")):
 				try:
-					self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+					self.hooks.append(instantiateImplementation(value, config, self.cm, False))
 				except:
 					self.log.exception("Failed to load hook %s" % (value))
-	        self.hosts = {}
+		self.hosts = {}
 		self.load = {}
 		self.instances = {}
 		self.muffle = {}
@@ -62,9 +63,9 @@
 		for h in self.cm.getHosts():
 			#XXXstroucki get all hosts here?
 			#if (h.up == True and h.state == HostState.Normal):
-				hosts[ctr] = h
-				ctr = ctr + 1
-				load[h.id] = []
+			hosts[ctr] = h
+			ctr = ctr + 1
+			load[h.id] = []
 			
 		load[None] = []
 		_instances = self.cm.getInstances()
@@ -199,7 +200,7 @@
 							if myDisk == i.disks[0].uri and i.disks[0].persistent == True:
 								count += 1
 						if count > 1:
-								minMaxHost = None
+							minMaxHost = None
 
 			if (minMaxHost):
 				# found a host
@@ -250,7 +251,7 @@
 				for i in oldInstances:
 					# XXXstroucki what about paused and saved VMs?
 					# XXXstroucki: do we need to look at Held VMs here?
-					if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying)):
+					if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying or oldInstances[i].state == InstanceState.ShuttingDown)):
 						self.log.info("VM exited: %s" % (oldInstances[i].name))
 						for hook in self.hooks:
 							hook.postDestroy(oldInstances[i])
@@ -283,10 +284,17 @@
 	(config, configFiles) = getConfig(["Agent"])
 	publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
 	tashi.publisher = publisher
-	cmclient = createClient(config)
 	logging.config.fileConfig(configFiles)
-	agent = Primitive(config, cmclient)
-	agent.start()
+	agent = Primitive(config)
+
+	try:
+		agent.start()
+	except KeyboardInterrupt:
+		pass
+
+	log = logging.getLogger(__file__)
+	log.info("Primitive exiting")
+	sys.exit(0)
 
 if __name__ == "__main__":
 	main()
diff --git a/src/tashi/client/client.py b/src/tashi/client/client.py
deleted file mode 100755
index 71b5b20..0000000
--- a/src/tashi/client/client.py
+++ /dev/null
@@ -1,213 +0,0 @@
-#! /usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import inspect
-import os
-import sys
-import types
-from tashi.services.ttypes import *
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from thrift.transport.TSocket import TSocket
-
-from tashi.services import clustermanagerservice
-from tashi import vmStates
-
-from tashi.util import getConfig
-
-def makeHTMLTable(list):
-	(stdin_r, stdin_w) = os.pipe()
-
-# XXXpipe: find number of columns in current window
-	pipe = os.popen("tput cols")
-	columns = pipe.read().strip()
-	keys = {}
-	for k in list:
-		for k2 in k.__dict__.keys():
-			if (not k2.endswith("Obj")):
-				keys[k2] = k2
-	if ('id' in keys):
-		del keys['id']
-		keylist = ['id'] + keys.keys()
-	else:
-		keylist = keys.keys()
-	output = "<html>"
-	output = output + "<table>"
-	output = output + "<tr>"
-	for k in keylist:
-		output = output + "<td>%s</td>" % (k)
-	output = output + "</tr>"
-	for k in list:
-		output = output + "<tr>"
-		for k2 in keylist:
-			if (k2 == "state"):
-				output = output + "<td>%s</td>" % (str(vmStates[k.__dict__.get(k2, None)]))
-			else:
-				output = output + "<td>%s</td>" % (str(k.__dict__.get(k2, None)))
-		output = output + "</tr>"
-	output = output + "</table>"
-	output = output + "</html>"
-	pid = os.fork()
-	if (pid == 0):
-		os.close(stdin_w)
-		os.dup2(stdin_r, 0)
-		os.close(stdin_r)
-		os.execl("/usr/bin/lynx", "/usr/bin/lynx", "-width=%s" % (columns), "-dump", "-stdin")
-		sys.exit(-1)
-	os.close(stdin_r)
-	os.write(stdin_w, output)
-	os.close(stdin_w)
-	os.waitpid(pid, 0)
-
-def getFunction(argv):
-	"""Tries to determine the name of the function requested by the user -- may be called multiple times if the binary name is 'client'"""
-	function = "None"
-	if (len(argv) > 0):
-		function = argv[0].strip()
-		if (function.rfind("/") != -1):
-			function = function[function.rfind("/")+1:]
-		if (function.rfind(".") != -1):
-			function = function[:function.rfind(".")]
-	return function
-
-def getFunctionInfo(m):
-	"""Gets a string that describes a function from the interface"""
-	f = getattr(clustermanagerservice.Iface, m)
-	argspec = inspect.getargspec(f)[0][1:]
-	return m + inspect.formatargspec(argspec)
-
-def usage():
-	"""Print program usage"""
-	print "Available methods:"
-	for m in methods:
-		print "\t" + getFunctionInfo(m)
-	print
-	print "Examples:"
-	print "\tgetInstances"
-	print "\taddUser 'User(d={\"username\":\"foobar\"})'"
-	print "\tremoveUser 2"
-	print "\tcreateVM 1 1"
-
-def simpleType(obj):
-	"""Determines whether an object is a simple type -- used as a helper function to pprint"""
-	if (type(obj) is not type([])):
-		if (not getattr(obj, "__dict__", None)):
-			return True
-	return False
-
-def pprint(obj, depth = 0, key = None):
-	"""My own version of pprint that prints out a dict in a readable, but slightly more compact format"""
-	valueManip = lambda x: x
-	if (key):
-		keyString = key + ": "
-		if (key == "state"):
-			valueManip = lambda x: vmStates[x]
-	else:
-		keyString = ""
-	if (type(obj) is type([])):
-		if (reduce(lambda x, y: x and simpleType(y), obj, True)):
-			print (" " * (depth * INDENT)) + keyString + str(obj)
-		else:
-			print (" " * (depth * INDENT)) + keyString + "["
-			for o in obj:
-				pprint(o, depth + 1)
-			print (" " * (depth * INDENT)) + "]"
-	elif (getattr(obj, "__dict__", None)):
-		if (reduce(lambda x, y: x and simpleType(y), obj.__dict__.itervalues(), True)):
-			print (" " * (depth * INDENT)) + keyString + str(obj)
-		else:
-			print (" " * (depth * INDENT)) + keyString + "{"
-			for (k, v) in obj.__dict__.iteritems():
-				pprint(v, depth + 1, k)
-			print (" " * (depth * INDENT)) + "}"
-	else:
-		print (" " * (depth * INDENT)) + keyString + str(valueManip(obj))
-
-def main():
-	"""Main function for the client program"""
-	global INDENT, methods, exitCode
-	exitCode = 0
-	INDENT = (os.getenv("INDENT", 4))
-	methods = filter(lambda x: not x.startswith("__"), clustermanagerservice.Iface.__dict__.keys())
-	function = getFunction(sys.argv)
-	if (function == "client"):
-		function = getFunction(sys.argv[1:])
-	if (function == "--makesyms"):
-		for m in methods:
-			os.symlink(sys.argv[0], m)
-		sys.exit(0)
-	if (function == "--rmsyms"):
-		for m in methods:
-			os.unlink(m)
-		sys.exit(0)
-
-	(config,configFiles) = getConfig(["Client"])
-	cfgHost = config.get('Client', 'clusterManagerHost')
-	cfgPort = config.get('Client', 'clusterManagerPort')
-	cfgTimeout = float(config.get('Client', 'clusterManagerTimeout'))
-	host = os.getenv('TASHI_CM_HOST', cfgHost)
-	port = os.getenv('TASHI_CM_PORT', cfgPort)
-	timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
-
-	socket = TSocket(host, int(port))
-	socket.setTimeout(timeout)
-	transport = TBufferedTransport(socket)
-	protocol = TBinaryProtocol(transport)
-	client = clustermanagerservice.Client(protocol)
-	client._transport = transport
-	client._transport.open()
-	f = getattr(client, function, None)
-	if not f:
-		usage()
-		sys.exit(-1)
-	args = map(lambda x: eval(x), sys.argv[1:])
-	try:
-		res = f(*args)
-		def cmp(x, y):
-			try:
-				if (x.id < y.id):
-					return -1
-				elif (y.id < x.id):
-					return 1
-				else:
-					return 0
-			except Exception, e:
-				return 0
-		if (type(res) == types.ListType):
-			res.sort(cmp)
-		if (os.getenv("USE_HTML_TABLES")):
-			try:
-				makeHTMLTable(res)
-			except:
-				pprint(res)
-		else:
-			pprint(res)
-	except TashiException, e:
-		print e.msg
-		exitCode = e.errno
-	except TypeError, e:
-		print e
-		print "\t" + getFunctionInfo(function)
-		exitCode = -1
-	finally:
-		client._transport.close()
-	sys.exit(exitCode)
-
-if __name__ == "__main__":
-	main()
diff --git a/src/tashi/client/tashi-client.py b/src/tashi/client/tashi-client.py
index 8bbfeb0..261982b 100755
--- a/src/tashi/client/tashi-client.py
+++ b/src/tashi/client/tashi-client.py
@@ -186,12 +186,26 @@
 		instances.append(client.createVm(instance))
 	return instances
 
+def shutdownMany(basename):
+	return __shutdownOrDestroyMany("shutdown", basename)
+
 def destroyMany(basename):
+	return __shutdownOrDestroyMany("destroy", basename)
+
+def __shutdownOrDestroyMany(method, basename):
 	instances = client.getInstances()
 	count = 0
 	for i in instances:
 		if (i.name.startswith(basename + "-") and i.name[len(basename)+1].isdigit()):
-			client.destroyVm(i.id)
+			if method == "shutdown":
+				client.shutdownVm(i.id)
+
+			elif method == "destroy":
+				client.destroyVm(i.id)
+
+			else:
+				raise ValueError("Unknown method")
+
 			count = count + 1
 	if (count == 0):
 		raise ValueError("That is an unused basename")
@@ -213,6 +227,7 @@
 'copyImage': (None, None), 
 'createVm': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
 'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
+'shutdownMany': (shutdownMany, None),
 'destroyMany': (destroyMany, None),
 'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
 'getInstances': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
@@ -225,6 +240,7 @@
 'createMany': [('userId', int, getUser, False), ('basename', str, lambda: requiredArg('basename'), True), ('cores', int, lambda: 1, False), ('memory', int, lambda: 128, False), ('disks', parseDisks, lambda: requiredArg('disks'), True), ('nics', parseNics, randomNetwork, False), ('hints', parseHints, lambda: {}, False), ('count', int, lambda: requiredArg('count'), True)],
 'shutdownVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
 'destroyVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
+'shutdownMany': [('basename', str, lambda: requiredArg('basename'), True)],
 'destroyMany': [('basename', str, lambda: requiredArg('basename'), True)],
 'suspendVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
 'resumeVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
@@ -250,6 +266,7 @@
 'createMany': '[Instance(d={"userId":userId,"name":basename,"cores":cores,"memory":memory,"disks":disks,"nics":nics,"hints":hints}), count]',
 'shutdownVm': '[instance]',
 'destroyVm': '[instance]',
+'shutdownMany': '[basename]',
 'destroyMany': '[basename]',
 'suspendVm': '[instance]',
 'resumeVm': '[instance]',
@@ -268,6 +285,7 @@
 'createMany': 'Utility function that creates many VMs with the same set of parameters',
 'shutdownVm': 'Attempts to shutdown a VM nicely',
 'destroyVm': 'Immediately destroys a VM -- it is the same as unplugging a physical machine and should be used for non-persistent VMs or when all else fails',
+'shutdownMany': 'Attempts to gracefully shut down a group of VMs created with createMany',
 'destroyMany': 'Destroys a group of VMs created with createMany',
 'suspendVm': 'Suspends a running VM to disk',
 'resumeVm': 'Resumes a suspended VM from disk',
@@ -293,6 +311,7 @@
 'createMany': ['--basename foobar --disks i386-hardy.qcow2 --count 4'],
 'shutdownVm': ['--instance 12345', '--instance foobar'],
 'destroyVm': ['--instance 12345', '--instance foobar'],
+'shutdownMany': ['--basename foobar'],
 'destroyMany': ['--basename foobar'],
 'suspendVm': ['--instance 12345', '--instance foobar'],
 'resumeVm': ['--instance 12345', '--instance foobar'],
@@ -321,7 +340,8 @@
 			print "Unknown function %s" % (func)
 			print
 		functions = argLists
-		print "%s is the client program for Tashi, a system for cloud-computing on BigData." % (os.path.basename(sys.argv[0]))
+		print "%s is the client program for Tashi" % (os.path.basename(sys.argv[0]))
+		print "Tashi, a system for cloud-computing on BigData"
 		print "Visit http://incubator.apache.org/tashi/ for more information."
 		print
 	else:
@@ -507,6 +527,7 @@
 	"""Main function for the client program"""
 	global INDENT, exitCode, client
 	exitCode = 0
+	exception = None
 	INDENT = (os.getenv("INDENT", 4))
 	if (len(sys.argv) < 2):
 		usage()
@@ -551,25 +572,47 @@
 			if (arg.startswith("--")):
 				if (arg[2:] in possibleArgs):
 					(parg, conv, default, required) = possibleArgs[arg[2:]]
-					val = conv(args.pop(0))
+					try:
+						val = None
+						lookahead = args[0]
+						if not lookahead.startswith("--"):
+							val = args.pop(0)
+					except:
+						pass
+
+					val = conv(val)
 					if (val == None):
 						val = default()
 
 					vals[parg] = val
 					continue
+			# somewhat lame, but i don't want to rewrite the fn at this time
+			exception = ValueError("Unknown argument %s" % (arg)) 
 
-			raise ValueError("Unknown argument %s" % (arg)) 
-
-		
-		f = getattr(client, function, None)
+		f = None
+		try:
+			f = extraViews[function][0]
+		except:
+			pass
 
 		if (f is None):
-			f = extraViews[function][0]
-		if (function in convertArgs):
-			fargs = eval(convertArgs[function], globals(), vals)
-		else:
-			fargs = []
-		res = f(*fargs)
+			f = getattr(client, function, None)
+
+		try:
+			if exception is not None:
+				raise exception
+
+			if (function in convertArgs):
+				fargs = eval(convertArgs[function], globals(), vals)
+			else:
+				fargs = []
+
+			res = f(*fargs)
+		except Exception, e:
+			print "Failed in calling %s: %s" % (function, e)
+			print "Please run tashi-client --examples for syntax information"
+			sys.exit(-1)
+
 		if (res != None):
 			keys = extraViews.get(function, (None, None))[1]
 			try:
diff --git a/src/tashi/client/test.py b/src/tashi/client/test.py
deleted file mode 100644
index a53eefa..0000000
--- a/src/tashi/client/test.py
+++ /dev/null
@@ -1,314 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import unittest
-import logging
-import sys
-import signal
-import os.path
-import copy
-import time
-import random
-from ConfigParser import ConfigParser
-
-from tashi.services.ttypes import *
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-
-from tashi.services import clustermanagerservice
-from tashi.messaging.threadpool import synchronized
-from tashi.messaging.tashimessaging import TestTashiSubscriber
-
-from tashi.util import getConfig
-
-import tashi.client.client
-
-class ClientConnection(object):
-	'''Creates an rpc proxy'''
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.transport = TBufferedTransport(TSocket(host, int(port)))
-		self.protocol = TBinaryProtocol(self.transport)
-		self.client = clustermanagerservice.Client(self.protocol)
-		self.client._transport = self.transport
-		self.client._transport.open()
-	def __del__(self):
-		self.client._transport.close()
-
-def incrementor(init=0):
-	while 1:
-		yield init
-		init = init + 1
-
-# FIXME: don't duplicate code from clustermanager
-# def getConfig(args):
-#	 config = ConfigParser()
-#	 configFiles = [
-#		'/usr/share/tashi/ClusterManagerDefaults.cfg',
-#		'/etc/tashi/ClusterManager.cfg',
-#		os.path.expanduser('~/.tashi/ClusterManager.cfg')
-#		] + ([args[0]] if len(args) > 0 else [])
-
-#	 configFiles = config.read(configFiles)
-#	 if len(configFiles) == 0:
-#		 print >>sys.stderr, 'Unable to find the configuration file\n'
-#		 sys.exit(3)
-
-#	 return config
-
-
-class TestClient(unittest.TestCase):
-	@synchronized()
-	def getPortNum(self):
-		return self.portnum.next()
-
-	"""macro test cases for single-host tests
-
-	Assumes cwd is 'src/tashi/client/'
-	"""
-	def setUp(self):
-		"""Create a CM and single NM on local host"""
-		logging.info('setting up test')
-		
-		(self.config, self.configfiles) = getConfig([])
-
-		self.port = 1717		# FIXME: take this (and other things) from config file
-		self.portnum = incrementor(self.port)
-
-		self.cwd = os.getcwd()
-		self.srcd = os.path.dirname(os.path.dirname(self.cwd))
-		
-		self.environ = copy.copy(os.environ)
-		self.environ['PYTHONPATH'] = self.srcd
-		logging.info('base path = %s' % self.srcd)
-
-		self.nm = os.spawnlpe(os.P_NOWAIT, 'python', 'python', 
-							  os.path.join(self.srcd, 'tashi', 'nodemanager', 'nodemanager.py'),
-							  self.environ)
-		self.cm = os.spawnlpe(os.P_WAIT, 'python', 'python',
-							 os.path.join(self.srcd,  'tashi', 'clustermanager', 'clustermanager.py'),
-							 '--drop', '--create',
-							 os.path.expanduser('~/.tashi/ClusterManager.cfg'),
-							 self.environ)
-		self.cm = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
-							 os.path.join(self.srcd,  'tashi', 'clustermanager', 'clustermanager.py'),
-							 os.path.expanduser('~/.tashi/ClusterManager.cfg'),
-							 self.environ)
-		# since we are spawning with P_NOWAIT, we need to sleep to ensure that the CM is listening
-		time.sleep(1)
-		try:
-			self.connection = ClientConnection('localhost', self.config.get('ClusterManagerService', 'port'))
-		except Exception, e:
-			logging.warning('client connection failed')
-			ex = None
-			try:
-				logging.warning("setUp killing node manager " + str(self.nm))
-				os.kill(self.nm, signal.SIGKILL)
-			except Exception, e:
-				ex = e
-				logging.warning('could not kill node manager: '+ str(e))
-			try:
-				logging.warning('setUp killing cluster manager ' + str(self.cm))
-				os.kill(self.cm, signal.SIGKILL)
-			except Exception, e:
-				ex = e
-				logging.warning('could not kill cluster manager: ' + str(e))
-			if e != None:
-				raise e
-
-		logging.info('node manager PID: %i' % self.nm)
-	def tearDown(self):
-		'''Kill the CM and NM that were created by setUP'''
-		logging.info('tearing down test')
-		ex = None
-		try:
-			logging.debug("killing cluster manager " + str(self.cm))
-			os.kill(self.cm, signal.SIGKILL)
-		except Exception, e:
-			ex = e
-			logging.error('Could not kill cluster manager: ' + str(e))
-			
-		try:
-			logging.debug("killing node manager " + str(self.nm))
-			os.kill(self.nm, signal.SIGKILL)
-		except Exception, e:
-			ex = e
-			logging.error('Could not kill node manager: ' + str(e))
-		if ex != None:
-			raise ex
-	def testSetup(self):
-		'''empty test to ensure that setUp code works'''
-		logging.info('setting up')
-	def testHostManagement(self):
-		'''test adding/removing/listing hosts
-
-		Right now this just adds a single host: localhost.  Eventually
-		it should 1) take a list of hosts from a test configuration
-		file, 2) ensure that all were added, 3) remove a random
-		subset, 4) ensure that they were correctly removed, 5) remove
-		all, 6) ensure that they were correctly removed.'''
-
-		# get empty host list
-		hosts = self.connection.client.getHosts()
-		self.assertEqual(hosts, [], 'starting host list not empty: ' + str(hosts) )
-
-		# add a host
-		host = Host()
-		host.hostname = 'localhost'
-		host.enabled=True
-		self.connection.client.addHost(host)
-		hosts = self.connection.client.getHosts()
-		self.assertEqual(len(hosts), 1, 'wrong number of hosts %i, should be %i' % (len(hosts), 1) )
-		self.assertEqual(hosts[0].hostname, 'localhost', 'wrong hostname: ' + str(hosts[0].hostname) )
-
-		# remove first host
-		hid = hosts[0].id
-		self.connection.client.removeHost(hid)
-		hosts = self.connection.client.getHosts()
-		self.assertEqual(hosts, [], 'host list not empty after remove: ' + str(hosts) )
-
-	def testMessaging(self):
-		'''test messaging system started by CM
-
-		tests messages published directly, through events in the CM,
-		and the log system'''
-		# FIXME: add tests for generating events as a side-effect of
-		# rpc commands, as well as logging in the CM
-		portnum = self.getPortNum()
-		self.sub = TestTashiSubscriber(self.config, portnum)
-		self.assertEqual(self.sub.messageQueue.qsize(), 0)
-		self.pub = tashi.messaging.thriftmessaging.PublisherThrift(self.config.get('MessageBroker', 'host'),
-																   int(self.config.get('MessageBroker', 'port')))
-		self.pub.publish({'message-type':'text', 'message':'Hello World!'})
-		time.sleep(0.5)
-		print '*** QSIZE', self.sub.messageQueue.qsize()
-		self.assertEqual(self.sub.messageQueue.qsize(), 1)
-
-		self.log = logging.getLogger(__name__)
-		messageHandler = tashi.messaging.tashimessaging.TashiLogHandler(self.config)
-		self.log.addHandler(messageHandler)
-		# FIXME: why can't we log messages with severity below 'warning'?
-		self.log.warning('test log message')
-		time.sleep(0.5)
-		self.assertEqual(self.sub.messageQueue.qsize(), 2)
-
-		# This should generate at least one log message
-#		 hosts = self.connection.client.getHosts()
-#		 time.sleep(0.5)
-#		 if (self.sub.messageQueue.qsize() <= 2):
-#			 self.fail()
-
-	def testUserManagement(self):
-		'''test adding/removing/listing users
-
-		same as testHostManagement, but with users'''
-		usernames = ['sleepy', 'sneezy', 'dopey', 'doc',
-					 'grumpy', 'bashful', 'happy']
-		# add all users
-		for un in usernames:
-			user = User()
-			user.username = un
-			self.connection.client.addUser(user)
-		# ensure that all were added
-		users = self.connection.client.getUsers()
-		self.assertEqual(len(usernames), len(users))
-		for user in users:
-			usernames.remove(user.username)
-		self.assertEqual(0, len(usernames))
-		# remove a random subset
-		rm = random.sample(users, 4)
-		for user in rm:
-			self.connection.client.removeUser(user.id)
-			users.remove(user)
-		newUsers = self.connection.client.getUsers()
-		# This ensures that the remaining ones are what we expect:
-		for user in newUsers:
-			# if there is a user remaining that we asked to be removed,
-			# this will throw an exception
-			users.remove(user)
-		# if a user was removed that we did not intend, this will
-		# throw an exception
-		self.assertEqual(0, len(users))
-
-#	 def testInstanceConfigurationManagement(self):
-#		 '''test adding/removing/listing instance configurations
-
-#		 same as testHostManagement, but with instance configurations'''
-#		 self.fail('test not implemented')
-	def testHardDiskConfigurationManagement(self):
-		'''test adding/removing/listing hard disk configurations
-
-		same as testHostManagement, but with hard disk configurations'''
-
-		user = User(d={'username':'sleepy'})
-		self.connection.client.addUser(user)
-		users = self.connection.client.getUsers()
-
-		per = PersistentImage()
-		per.userId = users[0].id
-		per.name = 'sleepy-PersistentImage'
-		self.connection.client.addPersistentImage(per)
-		pers = self.connection.client.getPersistentImages()
-
-		inst = InstanceConfiguration()
-		inst.name = 'sleepy-inst'
-		inst.memory = 512
-		inst.cores = 1
-		self.connection.client.addInstanceConfiguration(inst)
-		insts = self.connection.client.getInstanceConfigurations()
-
-		hdc = HardDiskConfiguration()
-		hdc.index = 0
-		hdc.persistentImageId = pers[0].id
-		hdc.persistent = False
-		hdc.instanceConfigurationId = insts[0].id
-
-#	 def testCreateDestroyShutdown(self):
-#		 '''test creating/destroying/shutting down VMs
-
-#		 not implemented'''
-#		 self.fail('test not implemented')
-#	 def testSuspendResume(self):
-#		 '''test suspending/resuming VMs
-
-#		 not implemented'''
-#		 self.fail('test not implemented')
-#	 def testMigrate(self):
-#		 '''test migration
-
-#		 not implemented'''
-#		 self.fail('test not implemented')
-#	 def testPauseUnpause(self):
-#		 '''test pausing/unpausing VMs
-
-#		 not implemented'''
-#		 self.fail('test not implemented')
-
-
-##############################
-# Test Code
-##############################
-if __name__ == '__main__':
-	logging.basicConfig(level=logging.NOTSET,
-						format="%(asctime)s %(levelname)s:\t %(message)s",
-						stream=sys.stdout)
-
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestClient)
-	unittest.TextTestRunner(verbosity=2).run(suite)
-
diff --git a/src/tashi/clustermanager/clustermanager.py b/src/tashi/clustermanager/clustermanager.py
index db61194..818bbb0 100755
--- a/src/tashi/clustermanager/clustermanager.py
+++ b/src/tashi/clustermanager/clustermanager.py
@@ -17,11 +17,12 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+import os
 import sys
-import signal
+import time
 import logging.config
 
-from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
+from tashi.util import boolean, instantiateImplementation, getConfig, debugConsole
 import tashi
 
 from tashi.rpycservices import rpycservices
@@ -54,19 +55,11 @@
 	t.service._type = 'ClusterManagerService'
 
 	debugConsole(globals())
-	
-	try:
-		t.start()
-	except KeyboardInterrupt:
-		handleSIGTERM(signal.SIGTERM, None)
 
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
-	global log
+	t.start()
+	# shouldn't exit by itself
+	return
 
-	log.info('Exiting cluster manager after receiving a SIGINT signal')
-	sys.exit(0)
-	
 def main():
 	global log
 	
@@ -80,7 +73,32 @@
 	
 	# bind the database
 	log.info('Starting cluster manager')
-	startClusterManager(config)
+
+	# handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+	child = os.fork()
+
+	if child == 0:
+		startClusterManager(config)
+		# shouldn't exit by itself
+		sys.exit(0)
+
+	else:
+		# main
+		try:
+			os.waitpid(child, 0)
+		except KeyboardInterrupt:
+			log.info("Exiting cluster manager after receiving a SIGINT signal")
+			os._exit(0)
+		except Exception:
+			log.exception("Abnormal termination of cluster manager")
+			os._exit(-1)
+
+		log.info("Exiting cluster manager after service thread exited")
+		os._exit(-1)
+
+	return
+
+
 
 if __name__ == "__main__":
 	main()
diff --git a/src/tashi/clustermanager/clustermanagerservice.py b/src/tashi/clustermanager/clustermanagerservice.py
index 32ab2f9..a2b116d 100644
--- a/src/tashi/clustermanager/clustermanagerservice.py
+++ b/src/tashi/clustermanager/clustermanagerservice.py
@@ -19,7 +19,7 @@
 import threading
 import time
 
-from tashi.rpycservices import rpycservices             
+from tashi.rpycservices import rpycservices	     
 from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
 from tashi import boolean, ConnectionManager, vmStates, version, scrubString
 
@@ -36,7 +36,7 @@
 		else:
 			self.username = None
 			self.password = None
-		self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')))
+		self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')), authAndEncrypt=self.authAndEncrypt)
 		self.dfs = dfs
 		self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
 		self.log = logging.getLogger(__name__)
@@ -71,7 +71,7 @@
 		try:
 			if (self.accountingHost is not None) and \
 				    (self.accountingPort is not None):
-				self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+				self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
 		except:
 			self.log.exception("Could not init accounting")
 
@@ -126,7 +126,7 @@
 			except:
 				self.log.exception("Invalid host data")
 
-                secondary = ','.join(filter(None, (hostText, instanceText)))
+		secondary = ','.join(filter(None, (hostText, instanceText)))
 
 		line = "%s|%s|%s" % (now, text, secondary)
 
@@ -271,7 +271,12 @@
 		for instanceId in self.instanceLastContactTime.keys():
 
 			# XXXstroucki should lock instance here?
-			if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+			try:
+				lastContactTime = self.instanceLastContactTime[instanceId]
+			except KeyError:
+				continue
+
+			if (lastContactTime < (self.__now() - self.allowDecayed)):
 				try:
 					instance = self.data.acquireInstance(instanceId)
 					# Don't query non-running VMs. eg. if a VM
@@ -348,7 +353,7 @@
 	
 	def shutdownVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+		self.__stateTransition(instance, None, InstanceState.ShuttingDown)
 		self.data.releaseInstance(instance)
 		self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
@@ -366,7 +371,7 @@
 			self.data.removeInstance(instance)
 		elif (instance.state is InstanceState.Activating):
 			self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
-			self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+			self.__stateTransition(instance, None, InstanceState.Destroying)
 			self.data.releaseInstance(instance)
 		else:
 			# XXXstroucki: This is a problem with keeping
@@ -382,7 +387,7 @@
 						self.proxy[hostname].destroyVm(instance.vmId)
 						self.data.releaseInstance(instance)
 				except:
-					self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
+					self.log.warning('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
 					self.data.removeInstance(instance)
 
 
@@ -390,7 +395,12 @@
 	
 	def suspendVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+		try:
+			self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+		except TashiException:
+			self.data.releaseInstance(instance)
+			raise
+
 		self.data.releaseInstance(instance)
 		self.__ACCOUNT("CM VM SUSPEND", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
@@ -404,7 +414,12 @@
 	
 	def resumeVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+		try:
+			self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+		except TashiException:
+			self.data.releaseInstance(instance)
+			raise
+
 		source = "suspend/%d_%s" % (instance.id, instance.name)
 		instance.hints['__resume_source'] = source
 		self.data.releaseInstance(instance)
@@ -422,7 +437,13 @@
 		except:
 			self.data.releaseInstance(instance)
 			raise
-		self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+
+		try:
+			self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+		except TashiException:
+			self.data.releaseInstance(instance)
+			raise
+
 		self.data.releaseInstance(instance)
 		try:
 			# Prepare the target
@@ -434,7 +455,12 @@
 			self.log.exception('prepReceiveVm failed')
 			raise
 		instance = self.data.acquireInstance(instance.id)
-		self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+		try:
+			self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+		except TashiException:
+			self.data.releaseInstance(instance)
+			raise
+
 		self.data.releaseInstance(instance)
 		try:
 			# Send the VM
@@ -458,7 +484,12 @@
 	
 	def pauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+		try:
+			self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+		except TashiException:
+			self.data.releaseInstance(instance)
+			raise
+
 		self.data.releaseInstance(instance)
 		self.__ACCOUNT("CM VM PAUSE", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
@@ -468,13 +499,23 @@
 			self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
 			raise
 		instance = self.data.acquireInstance(instanceId)
-		self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+		try:
+			self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+		except TashiException:
+			self.data.releaseInstance(instance)
+			raise
+
 		self.data.releaseInstance(instance)
 		return
 
 	def unpauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
-		self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+		try:
+			self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+		except TashiException:
+			self.data.releaseInstance(instance)
+			raise
+
 		self.data.releaseInstance(instance)
 		self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
 		hostname = self.data.getHost(instance.hostId).name
@@ -484,7 +525,12 @@
 			self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
 			raise
 		instance = self.data.acquireInstance(instanceId)
-		self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+		try:
+			self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+		except TashiException:
+			self.data.releaseInstance(instance)
+			raise
+
 		self.data.releaseInstance(instance)
 		return
 	
@@ -633,7 +679,7 @@
 		self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
 
 		if ('__resume_source' in instance.hints):
-			self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+			self.__stateTransition(instance, None, InstanceState.Resuming)
 		else:
 			# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
 			#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
@@ -679,12 +725,12 @@
 		self.data.releaseInstance(instance)
 		return "success"
 
-        def registerHost(self, hostname, memory, cores, version):
-                hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
-                if alreadyRegistered:
-                        self.log.info("Host %s is already registered, it was updated now" % hostname)
-                else:
-                        self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+	def registerHost(self, hostname, memory, cores, version):
+		hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
+		if alreadyRegistered:
+			self.log.info("Host %s is already registered, it was updated now" % hostname)
+		else:
+			self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
 
 		try:
 			host = self.data.getHost(hostId)
@@ -692,9 +738,9 @@
 		except:
 			self.log.warning("Failed to lookup host %s" % hostId)
 
-                return hostId
+		return hostId
 
-        def unregisterHost(self, hostId):
+	def unregisterHost(self, hostId):
 		try:
 			host = self.data.getHost(hostId)
 			self.__ACCOUNT("CM HOST UNREGISTER", host=host)
@@ -702,9 +748,9 @@
 			self.log.warning("Failed to lookup host %s" % hostId)
 			return
 
-                self.data.unregisterHost(hostId)
-                self.log.info("Host %s was unregistered" % hostId)
-                return
+		self.data.unregisterHost(hostId)
+		self.log.info("Host %s was unregistered" % hostId)
+		return
 
 	# service thread
 	def __monitorCluster(self):
diff --git a/src/tashi/clustermanager/data/fromconfig.py b/src/tashi/clustermanager/data/fromconfig.py
index 8511a07..d4cdaff 100644
--- a/src/tashi/clustermanager/data/fromconfig.py
+++ b/src/tashi/clustermanager/data/fromconfig.py
@@ -16,16 +16,18 @@
 # under the License.    
 
 from __future__ import with_statement
+import logging
 import threading
 import os
 import ConfigParser
 
-from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState
+from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState, Instance
 from tashi.clustermanager.data import DataInterface
 
 class FromConfig(DataInterface):
 	def __init__(self, config):
 		DataInterface.__init__(self, config)
+		self.log = logging.getLogger(__name__)
 		self.hosts = {}
 		self.instances = {}
 		self.networks = {}
@@ -78,6 +80,10 @@
 		return instanceId
 	
 	def registerInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		self.acquireLock(self.instanceLock)
 		try:
 			if (instance.id is not None and instance.id not in self.instances):
@@ -107,6 +113,10 @@
 		return instance
 	
 	def releaseInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		try:
 			if (instance.id not in self.instances): # MPR: should never be true, but good to check
 				raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
@@ -114,6 +124,10 @@
 			self.releaseLock(instance._lock)
 	
 	def removeInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		self.acquireLock(self.instanceLock)
 		try:
 			del self.instances[instance.id]
@@ -122,6 +136,10 @@
 			self.releaseLock(self.instanceLock)
 	
 	def acquireHost(self, hostId):
+		if type(hostId) is not int:
+			self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+			raise TypeError
+
 		self.hostLock.acquire()
 		host = self.hosts.get(hostId, None)
 		if (host is None):
@@ -134,6 +152,10 @@
 
 	
 	def releaseHost(self, host):
+		if type(host) is not Host:
+			self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+			raise TypeError
+
 		try:
 			if (host.id not in self.hosts): # MPR: should never be true, but good to check
 				raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (host.id)})
diff --git a/src/tashi/clustermanager/data/getentoverride.py b/src/tashi/clustermanager/data/getentoverride.py
index 21b2f8f..75703ea 100644
--- a/src/tashi/clustermanager/data/getentoverride.py
+++ b/src/tashi/clustermanager/data/getentoverride.py
@@ -15,16 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+import logging
 import subprocess
 import time
 import os
-from tashi.rpycservices.rpyctypes import User, LocalImages
+from tashi.rpycservices.rpyctypes import User, LocalImages, Instance, Host
 from tashi.clustermanager.data import DataInterface
 from tashi.util import instantiateImplementation, humanReadable
 
 class GetentOverride(DataInterface):
 	def __init__(self, config):
 		DataInterface.__init__(self, config)
+		self.log = logging.getLogger(__name__)
 		self.baseDataObject = instantiateImplementation(config.get("GetentOverride", "baseData"), config)
 		self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
 
@@ -33,21 +35,41 @@
 		self.fetchThreshold = float(config.get("GetentOverride", "fetchThreshold"))
 	
 	def registerInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		return self.baseDataObject.registerInstance(instance)
 	
 	def acquireInstance(self, instanceId):
 		return self.baseDataObject.acquireInstance(instanceId)
 	
 	def releaseInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		return self.baseDataObject.releaseInstance(instance)
 	
 	def removeInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		return self.baseDataObject.removeInstance(instance)
 	
 	def acquireHost(self, hostId):
+		if type(hostId) is not int:
+			self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+			raise TypeError
+
 		return self.baseDataObject.acquireHost(hostId)
 	
 	def releaseHost(self, host):
+		if type(host) is not Host:
+			self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+			raise TypeError
+
 		return self.baseDataObject.releaseHost(host)
 	
 	def getHosts(self):
diff --git a/src/tashi/clustermanager/data/ldapoverride.py b/src/tashi/clustermanager/data/ldapoverride.py
index 17904ab..66a60d6 100644
--- a/src/tashi/clustermanager/data/ldapoverride.py
+++ b/src/tashi/clustermanager/data/ldapoverride.py
@@ -72,16 +72,16 @@
 	def getNetwork(self, id):
 		return self.baseDataObject.getNetwork(id)
 
-        def getImages(self):
-                count = 0
-                myList = []
-                for i in self.dfs.list("images"):
-                        myFile = self.dfs.getLocalHandle("images/" + i)
-                        if os.path.isfile(myFile):
-                                image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
-                                myList.append(image)
-                                count += 1
-                return myList
+	def getImages(self):
+		count = 0
+		myList = []
+		for i in self.dfs.list("images"):
+			myFile = self.dfs.getLocalHandle("images/" + i)
+			if os.path.isfile(myFile):
+				image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+				myList.append(image)
+				count += 1
+		return myList
 
 	def fetchFromLdap(self):
 		now = time.time()
diff --git a/src/tashi/clustermanager/data/pickled.py b/src/tashi/clustermanager/data/pickled.py
index b6724c8..d5348a4 100644
--- a/src/tashi/clustermanager/data/pickled.py
+++ b/src/tashi/clustermanager/data/pickled.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+import logging
 import cPickle
 import os
 import threading
@@ -24,6 +25,7 @@
 class Pickled(FromConfig):
 	def __init__(self, config):
 		DataInterface.__init__(self, config)
+		self.log = logging.getLogger(__name__)
 		self.file = self.config.get("Pickled", "file")
 		self.locks = {}
 		self.lockNames = {}
diff --git a/src/tashi/clustermanager/data/sql.py b/src/tashi/clustermanager/data/sql.py
index 64e5681..6b48017 100644
--- a/src/tashi/clustermanager/data/sql.py
+++ b/src/tashi/clustermanager/data/sql.py
@@ -130,6 +130,10 @@
 		return h
 	
 	def registerInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		self.instanceLock.acquire()
 		try:
 			if (instance.id is not None and instance.id not in self.getInstances()):
@@ -173,6 +177,10 @@
 		return instance
 	
 	def releaseInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		self.instanceLock.acquire()
 		try:
 			l = self.makeInstanceList(instance)
@@ -191,6 +199,10 @@
 			self.instanceLock.release()
 	
 	def removeInstance(self, instance):
+		if type(instance) is not Instance:
+			self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+			raise TypeError
+
 		self.instanceLock.acquire()
 		try:
 			self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
@@ -205,6 +217,10 @@
 			self.instanceLock.release()
 	
 	def acquireHost(self, hostId):
+		if type(hostId) is not int:
+			self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+			raise TypeError
+
 		host = self.getHost(hostId)
 		self.hostLock.acquire()
 		self.hostLocks[host.id] = self.hostLocks.get(host.id, threading.Lock())
@@ -214,6 +230,10 @@
 		return host
 	
 	def releaseHost(self, host):
+		if type(host) is not Host:
+			self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+			raise TypeError
+
 		l = self.makeHostList(host)
 		s = ""
 		for e in range(0, len(self.hostOrder)):
@@ -284,16 +304,17 @@
 		network = Network(d={'id':r[0], 'name':r[1]})
 		return network
 
-        def getImages(self):
-                count = 0
-                myList = []
-                for i in self.dfs.list("images"):
-                        myFile = self.dfs.getLocalHandle("images/" + i)
-                        if os.path.isfile(myFile):
-                                image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
-                                myList.append(image)
-                                count += 1
-                return myList
+	def getImages(self):
+		count = 0
+		myList = []
+		for i in self.dfs.list("images"):
+			myFile = self.dfs.getLocalHandle("images/" + i)
+			if os.path.isfile(myFile):
+				image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+				myList.append(image)
+				count += 1
+
+		return myList
 	
 	def getUsers(self):
 		cur = self.executeStatement("SELECT * from users")
diff --git a/src/tashi/connectionmanager.py b/src/tashi/connectionmanager.py
index 5eeae6c..d4093b9 100644
--- a/src/tashi/connectionmanager.py
+++ b/src/tashi/connectionmanager.py
@@ -16,14 +16,16 @@
 # under the License.    
 
 from tashi.rpycservices import rpycservices
+from tashi import Connection
 #from tashi.rpycservices.rpyctypes import *
 
 class ConnectionManager(object):
-	def __init__(self, username, password, port, timeout=10000.0):
+	def __init__(self, username, password, port, timeout=10000.0, authAndEncrypt=False):
 		self.username = username
 		self.password = password
 		self.timeout = timeout
 		self.port = port
+		self.authAndEncrypt = authAndEncrypt
 	
 	def __getitem__(self, hostname):
 		port = self.port
@@ -31,4 +33,4 @@
 			port = hostname[1]
 			hostname = hostname[0]
 
-		return rpycservices.client(hostname, port, username=self.username, password=self.password)
+		return Connection(hostname, port, credentials=(self.username, self.password), authAndEncrypt=self.authAndEncrypt)
diff --git a/src/tashi/messaging/messageBroker.py b/src/tashi/messaging/messageBroker.py
deleted file mode 100644
index c21b57a..0000000
--- a/src/tashi/messaging/messageBroker.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import ConfigParser
-import getopt
-
-import os
-import sys
-import time
-
-import thriftmessaging
-
-options = []
-long_options = ['port=']
-
-# FIXME: should initialize from config file
-params = {"port":1717}
-
-try:
-	optlist, args = getopt.getopt(sys.argv[1:], options, long_options)
-except getopt.GetoptError, err:
-	print str(err)
-	sys.exit(2)
-
-for opt in optlist:
-	if opt[0] == "--port":
-		try:
-			params["port"] = int(opt[1])
-		except:
-			print "--port expects an integer, got %s" % opt[1]
-			sys.exit(0)
-
-print "Starting message broker on port %i" % params["port"]
-broker = thriftmessaging.MessageBrokerThrift(params["port"], daemon=False)
-
diff --git a/src/tashi/messaging/messaging.py b/src/tashi/messaging/messaging.py
deleted file mode 100644
index c421d5c..0000000
--- a/src/tashi/messaging/messaging.py
+++ /dev/null
@@ -1,337 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import threading
-import thread
-import sys
-import os
-import socket
-import Queue
-import copy
-import random
-import traceback
-
-from threadpool import ThreadPoolClass, threadpool, ThreadPool
-from threadpool import threadpoolmethod, threaded, synchronized, synchronizedmethod
-
-class RWLock(object):
-	"""RWLock: Simple reader/writer lock implementation
-	FIXME: this implementation will starve writers!
-	Methods:
-		acquire() : take lock for read access
-		release() : release lock from read access
-		acquireWrite() : take lock for write access
-		releaseWrite() : release lock from write access"""
-	def __init__(self):
-		self.lock = threading.Condition()
-		self.readers = 0
-	def acquire(self):
-		self.lock.acquire()
-		self.readers = self.readers + 1
-		self.lock.release()
-	def release(self):
-		self.lock.acquire()
-		self.readers = self.readers - 1
-		self.lock.notify()
-		self.lock.release()
-	def acquireWrite(self):
-		self.lock.acquire()
-		while self.readers > 0:
-			self.lock.wait()
-	def releaseWrite(self):
-		self.lock.notify()
-		self.lock.release()
-
-
-
-class MessageBroker(object):
-	def __init__(self):
-		self.sublock = RWLock()
-		self.subscribers = []
-		self.random = random.Random()
-	def log(self, msg):
-		print "MessageBroker: Got log: '%s'" % str(msg)
-		return msg
-	def addSubscriber(self, subscriber):
-		self.sublock.acquireWrite()
-		self.subscribers.append(subscriber)
-		l = len(self.subscribers)
-		self.sublock.releaseWrite()
-		return l
-	def publish(self, message):
-		removesubs = []
-		i = self.random.randint(0,100)
-
-#		 subscribers = self.getSubscribers()
-#		 random.shuffle(subscribers)
-
-		self.sublock.acquire()
-
-		sys.stdout.flush()
-
-		for subscriber in self.subscribers:
-			try:
-				sys.stdout.flush()
-				assert(subscriber != self)
-				subscriber.publish(message)
-				sys.stdout.flush()
-			except Exception, e:
-				print e
-				removesubs.append(subscriber)
-
-		self.sublock.release()
-
-		if len(removesubs) > 0:
-			print "detected %i failed subscribers" % len(removesubs)
-			sys.stdout.flush()
-			self.sublock.acquireWrite()
-			for subscriber in removesubs:
-				try:
-					self.subscribers.remove(subscriber)
-				except:
-					pass
-			self.sublock.releaseWrite()
-	def getSubscribers(self):
-		self.sublock.acquire()
-		subs = copy.copy(self.subscribers)
-		self.sublock.release()
-		return subs
-	def removeSubscriber(self, subscriber):
-		self.sublock.acquireWrite()
-		try:
-			self.subscribers.remove(subscriber)
-		except:
-			pass
-		self.sublock.releaseWrite()
-	def publishList(self, messages):
-		for message in messages:
-			self.publish(message)
-
-class Subscriber(object):
-	def __init__(self, broker, pmatch={}, nmatch={}, synchronized=False):
-		self.broker = broker
-		self.lock = threading.Lock()
-		self.synchronized = synchronized
-		self.pmatch={}
-		self.nmatch={}
-		broker.addSubscriber(self)
-	def publish(self, message):
-		sys.stdout.flush()
-		msg = message
-		try:
-			if self.synchronized:
-				self.lock.acquire()
-			msg = self.filter(msg)
-			if (msg != None):
-				self.handle(msg)
-			if self.synchronized:
-				self.lock.release()
-		except Exception, x:
-			if self.synchronized:
-				self.lock.release()
-			print '%s, %s, %s' % (type(x), x, traceback.format_exc())
-	def publishList(self, messages):
-		for message in messages:
-			self.publish(message)
-	def handle(self, message):
-		print "Subscriber Default Handler: '%s'" % message
-	def setMatch(self, pmatch={}, nmatch={}):
-		self.lock.acquire()
-		self.pmatch=pmatch
-		self.nmatch=nmatch
-		self.lock.release()
-	def filter(self, message):
-		"""filter(self, message) : the filter function returns
-		the message, modified to be passed to the handler.
-		Returning (None) indicates that this is not a message
-		we are interested in, and it will not be passed to the
-		handler."""
-		send = True
-		for key in self.pmatch.keys():
-			if (not message.has_key(key)):
-				send = False
-				break
-			if self.pmatch[key] != None:
-				if message[key] != self.pmatch[key]:
-					send = False
-					break
-		if send == False:
-			return None
-		for key in message.keys():
-			if self.nmatch.has_key(key):
-				if self.nmatch[key] == None:
-					send = False
-					break
-				if self.nmatch[key] == message[key]:
-					send = False
-					break
-		if send == False:
-			return None
-		return message
-
-
-	
-class Publisher(object):
-	'''Superclass for pub/sub publishers
-
-	FIXME: use finer-grained locking'''
-	def __init__(self, broker, aggregate=100):
-		self.pending = []
-		self.pendingLock = threading.Lock()
-		self.aggregateSize = aggregate
-		self.broker = broker
-	@synchronizedmethod
-	def publish(self, message):
-		if message.has_key('aggregate') and message['aggregate'] == 'True':
-			self.aggregate(message)
-			return
-		else:
-			self.broker.publish(message)
-	@synchronizedmethod
-	def publishList(self, messages):
-		self.broker.publishList(messages)
-	@synchronizedmethod
-	def aggregate(self, message):
-		# we can make this lock-less by using a queue for pending
-		# messages
-		self.pendingLock.acquire()
-		self.pending.append(message)
-		if len(self.pending) >= self.aggregateSize:
-			self.broker.publishList(self.pending)
-			self.pending = []
-		self.pendingLock.release()
-	@synchronizedmethod
-	def setBroker(self, broker):
-		self.broker = broker
-
-##############################
-# Testing Code
-##############################
-import time
-import unittest
-import sys
-import logging
-
-		
-class TestSubscriber(Subscriber):
-	def __init__(self, *args, **kwargs):
-		self.queue = Queue.Queue()
-		Subscriber.__init__(self, *args, **kwargs)
-	def handle(self, message):
-		self.queue.put(message)
-
-class TestMessaging(unittest.TestCase):
-	def setUp(self):
-		self.broker = MessageBroker()
-		self.publisher = Publisher(self.broker)
-		self.subscriber = TestSubscriber(self.broker)
-	def testPublish(self):
-		self.publisher.publish( {'message':'hello world'} )
-		self.assertEqual(self.subscriber.queue.qsize(), 1)
-	def testPublishList(self):
-		nrmsgs = 10
-		msgs = []
-		for i in range(nrmsgs):
-			msgs.append( {'msgnum':str(i)} )
-		self.publisher.publishList( msgs )
-		self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
-	def testAggregate(self):
-		nrmsgs = self.publisher.aggregateSize
-		for i in range(nrmsgs):
-			self.assertEqual(self.subscriber.queue.qsize(), 0)
-			self.publisher.aggregate( {'msgnum':str(i)} )
-		self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
-	def testAggregateKeyword(self):
-		nrmsgs = self.publisher.aggregateSize
-		for i in range(nrmsgs):
-			self.assertEqual(self.subscriber.queue.qsize(), 0)
-			self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
-		self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
-
-if __name__ == '__main__':
-
-	logging.basicConfig(level=logging.INFO,
-						format="%(asctime)s %(levelname)s:\t %(message)s",
-						stream=sys.stdout)
-
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestMessaging)
-	unittest.TextTestRunner(verbosity=2).run(suite) 
-
-	sys.exit(0)
-
-
-##############################
-# Old/Unused testing code
-##############################
-
-
-
-	print 'testing removeSubscriber'
-	broker.removeSubscriber(subscriber)
-	publisher.publish( {'message':"you shouldn't see this"} )
-
-	nsub = NullSubscriber(broker)
-	print 'timing publish'
-	nrmsg = 100000
-	tt = time.time()
-	for i in range(nrmsg):
-#		publisher.publish( {"message":"hello world!"} )
-		publisher.publish( {} )
-	tt = time.time() - tt
-	print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
-														   tt,
-														   nrmsg/tt)
-	broker.removeSubscriber(nsub)
-
-	class SlowSubscriber(Subscriber):
-		def handle(self, message):
-			print 'called slow subscriber with message', message
-			time.sleep(1)
-			print 'returning from slow subscriber with message', message
-	class ThreadedSubscriber(Subscriber):
-		@threaded
-		def handle(self, message):
-			print 'called threaded subscriber with message', message
-			time.sleep(1)
-			print 'returning from threaded subscriber with message', message
-	class ThreadPoolSubscriber(Subscriber, ThreadPoolClass):
-		@threadpoolmethod
-		def handle(self, message):
-			print 'called threadpool subscriber with message', message
-			time.sleep(1)
-			print 'returning from threadpool subscriber with message', message
-
-
-
-	tsub = ThreadedSubscriber(broker)
-	for i in range(8):
-		publisher.publish( {"msg":str(i)} )
-	broker.removeSubscriber(tsub)
-	time.sleep(3)
-
-	tpsub = ThreadPoolSubscriber(broker)
-	for i in range(8):
-		publisher.publish( {"msg":str(i)} )
-	broker.removeSubscriber(tpsub)
-	time.sleep(3)
-
-	ssub = SlowSubscriber(broker)
-	for i in range(4):
-		publisher.publish( {"msg":str(i)} )
-	broker.removeSubscriber(ssub)
diff --git a/src/tashi/messaging/soapmessaging.py b/src/tashi/messaging/soapmessaging.py
deleted file mode 100755
index be35fc9..0000000
--- a/src/tashi/messaging/soapmessaging.py
+++ /dev/null
@@ -1,229 +0,0 @@
-#! /usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-from messaging import *
-
-import cPickle
-import soaplib.wsgi_soap
-import cherrypy.wsgiserver
-from soaplib.service import soapmethod
-from soaplib.serializers.primitive import *
-import SOAPpy.WSDL
-import time
-
-class MessageBrokerSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, MessageBroker):
-	def __init__(self, port):
-		soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
-		MessageBroker.__init__(self)
-		self.port = port
-		def trdfn():
-			service = self
-			server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
-			server.start()
-		threading.Thread(target=trdfn).start()
-
-
-	@soapmethod(Array(String), Array(String), _returns=Null)
-	def log(self, keys, values):
-		message = {}
-		if len(keys) != len(values):
-			raise Exception, "Different lengths for keys and values"
-		for i in range(len(keys)):
-			message[keys[i]] = values[i]
-		MessageBroker.log(self, message)
-
-	@soapmethod(String, Integer, _returns=Null)
-	def addSubscriber(self, host, port):
-		subscriber = SubscriberSoapProxy(host, port)
-		MessageBroker.addSubscriber(self, subscriber)
-	
-	@soapmethod(String, Integer, _returns=Null)
-	def removeSubscriber(self, host, port):
-		# should this method really be able to peek into subscriber.host/port 
-		subscriber = None
-		subscribers = self.getSubscribers()
-		for subscriber in subscribers:
-			if subscriber.host == host and subscriber.port == port:
-				subscriber = subscriber
-		if subscriber != None:
-			MessageBroker.removeSubscriber(self, subscriber)
-		
-
-	@soapmethod(Array(String), Array(String), _returns=Null)
-	def publish(self, keys, values):
-		message = {}
-		if len(keys) != len(values):
-			raise Exception, "Different lengths for keys and values"
-		for i in range(len(keys)):
-			message[keys[i]] = values[i]
-		MessageBroker.publish(self, message)
-
-
-
-class MessageBrokerSoapProxy(object):
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
-	def log(self, message):
-		keys = []
-		values = []
-		for k,v in message.items():
-			keys.append(k)
-			values.append(v)
-		self.connection.log(keys=keys, values=values)
-	def addSubscriber(self, subscriber):
-		self.connection.addSubscriber(host=subscriber.host, port=subscriber.port)
-	def publish(self, message):
-		keys = []
-		values = []
-		for k,v in message.items():
-			keys.append(k)
-			values.append(v)
-		self.connection.publish(keys=keys, values=values)
-	def removeSubscriber(self, subscriber):
-		self.connection.removeSubscriber(host=subscriber.host, port=subscriber.port)
-
-
-
-
-class SubscriberSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, Subscriber):
-	def __init__(self, broker, port, synchronized=False):
-		soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
-		Subscriber.__init__(self, synchronized=synchronized)
-		self.host = socket.gethostname()
-		self.port = port
-		self.broker = broker
-		self.server = None
-		def trdfn():
-			service = self
-			self.server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
-			self.server.start()
-		threading.Thread(target=trdfn).start()
-#		broker.log("Subscriber started")
-		broker.addSubscriber(self)
-	@soapmethod(Array(String), Array(String), _returns=Integer)
-	def publish(self, keys, values):
-		message = {}
-		if len(keys) != len(values):
-			raise Exception, "Different lengths for keys and values"
-		for i in range(len(keys)):
-			message[keys[i]] = values[i]
-		Subscriber.publish(self, message)
-		return 0
-	def stop(self):
-		self.server.stop()
-
-class SubscriberSoapProxy(object):
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
-	def publish(self, message):
-		keys = []
-		values = []
-		for k,v in message.items():
-			keys.append(k)
-			values.append(v)
-		self.connection.publish(keys=keys, values=values)
-
-
-####################
-# Testing Code 
-####################
-
-class CustomSubscriber(SubscriberSoap):
-	def handle(self, message):
-		print "Custom Subscriber: '%s'" % str(message)
-
-class NullSubscriber(SubscriberSoap):
-	def handle(self, message):
-		pass
-
-
-if __name__ == '__main__':
-	try:
-		portnum = 1717
-
-		print "\ntesting message broker"
-		broker = MessageBrokerSoap(portnum)
-		proxy = MessageBrokerSoapProxy("localhost", portnum)
-		portnum = portnum + 1 
-
-		print "\ntesting log function"
-		proxy.log( {"message":"Hello World!"} )
-#		proxy.log("It looks like log works")
-
-		print "\ntesting subscriber proxy"
-		subscriber = SubscriberSoap(proxy, portnum)
-		portnum = portnum + 1
-
-		print "\ntesting custom subscriber"
-		csub = CustomSubscriber(proxy, portnum)
-		portnum = portnum + 1
-
-		print "\ntesting publish"
-		proxy.publish( {"message":"Hello World!"} )
-
-		print "\ntesting stop"
-		subscriber.stop()
-		proxy.publish( {"message":"Everybody here?"} )
-
-		print "\ntesting removeSubscriber"
-		proxy.removeSubscriber(csub)
-		proxy.publish( {"message":"Nobody home"} )
-		proxy.addSubscriber(csub)
-		proxy.publish( {"message":"You're back!"} )
-
-		print "\ntesting filter"
-		csub.setMatch( {"print":"yes"} )
-		proxy.publish( {"print":"yes", "message":"this should be printed"} )
-		proxy.publish( {"print":"no", "message":"this should NOT be printed"} )
-		csub.setMatch()
-
-		print "\ntesting publish performance"
-		proxy.removeSubscriber(csub)
-		nrmsg = 10000
-		tt = time.time()
-		for i in range(nrmsg):
-			proxy.publish( {"message":"msg %i"%i} )
-		tt = time.time() - tt
-		print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
-															   tt,
-															   nrmsg/tt)
-
-		print "\ntesting publish/subscribe performance"
-		nsub = NullSubscriber(proxy, portnum)
-		portnum = portnum + 1
-		nrmsg = 10000
-		tt = time.time()
-		for i in range(nrmsg):
-			proxy.publish( {"message":"msg %i"%i} )
-		tt = time.time() - tt
-		print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
-															   tt,
-															   nrmsg/tt)
-
-																   
-
-	except Exception, e:
-#		raise e
-		print  e
-		sys.exit(0)
-	sys.exit(0)
diff --git a/src/tashi/messaging/tashimessaging.py b/src/tashi/messaging/tashimessaging.py
deleted file mode 100644
index 006400f..0000000
--- a/src/tashi/messaging/tashimessaging.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-from thriftmessaging import *
-import logging
-import Queue
-from ConfigParser import ConfigParser
-import time
-import socket
-import signal
-
-class TashiLogHandler(logging.Handler, PublisherThrift):
-	def __init__(self, config, *args, **kwargs):
-		self.messages = Queue.Queue()
-		self.config = config
-		logging.Handler.__init__(self, *args, **kwargs)
-		PublisherThrift.__init__(self, 
-								 config.get('MessageBroker', 'host'),
-								 int(config.get('MessageBroker', 'port')))
-	def emit(self, record):
-		# 'args', 'created', 'exc_info', 'exc_text', 'filename',
-		# 'funcName', 'getMessage', 'levelname', 'levelno', 'lineno',
-		# 'module', 'msecs', 'msg', 'name', 'pathname', 'process',
-		# 'relativeCreated', 'thread', 'threadName']
-		msg = {}
-		# args
-		# created
-		# exc_info
-		# exc_text
-		msg['log-filename'] = str(record.filename)
-		msg['log-funcname'] = str(record.funcName)
-		msg['log-levelname'] = str(record.levelname)
-		msg['log-level'] = str(record.levelno)
-		msg['log-lineno'] = str(record.lineno)
-		msg['log-module'] = str(record.module)
-		msg['log-msecs'] = str(record.msecs)
-		msg['log-message'] = str(record.msg)
-		msg['log-name'] = str(record.name)
-		msg['log-pathname'] = str(record.pathname)
-		msg['log-process'] = str(record.process)
-		# relativeCreated
-		msg['log-thread'] = str(record.thread)
-		msg['log-threadname'] = str(record.threadName)
-
-		# standard message fields
-		msg['timestamp'] = str(time.time())
-		msg['hostname'] = socket.gethostname()
-		msg['message-type'] = 'log'
-
-		self.messages.put(msg)
-		self.publish(msg)
-
-class TashiSubscriber(SubscriberThrift):
-	def __init__(self, config, port, **kwargs):
-		sys.stdout.flush()
-		brokerPort = int(config.get('MessageBroker', 'port'))
-		self.broker = MessageBrokerThriftProxy(config.get('MessageBroker', 'host'), brokerPort)
-		SubscriberThrift.__init__(self, self.broker, port, **kwargs)
-
-		
-
-##############################
-# Test Code
-##############################
-import unittest
-import sys
-
-class TestTashiSubscriber(TashiSubscriber):
-	def __init__(self, *args, **kwargs):
-		self.messageQueue = Queue.Queue()
-		TashiSubscriber.__init__(self, *args, **kwargs)
-	def handle(self, message):
-		self.messageQueue.put(message)
-
-
-def incrementor(start = 0):
-	while True:
-		a = start
-		start = start + 1
-		yield a
-increment = incrementor()
-
-class TestTashiMessaging(unittest.TestCase):
-	def setUp(self):
-		self.configFiles = [ '../../../etc/TestConfig.cfg']
-		self.config = ConfigParser()
-		self.configFiles = self.config.read(self.configFiles)
-		self.port = int(self.config.get('MessageBroker', 'port'))
-
-		try:
-			self.brokerPid = os.spawnlpe(os.P_NOWAIT, 'python', 'python', 
-										 './messageBroker.py', 
-										 '--port', str(self.port),
-										 os.environ)
-			self.port = self.port + 1
-			# FIXME: what's the best way to wait for the broker to be ready?
-			time.sleep(1)
-		except Exception, e:
-			sys.exit(0)
-		self.initialized = True
-		self.log = logging.getLogger('TestTashiMessaging')
-		self.handler = TashiLogHandler(self.config)
-		self.log.addHandler(self.handler)
-		self.sub = TestTashiSubscriber(self.config, int(self.port) + increment.next())
-	def tearDown(self):
-		os.kill(self.brokerPid, signal.SIGKILL)
-		# FIXME: wait for the port to be ready again
-		time.sleep(2)
-		self.log.removeHandler(self.handler)
-#		 self.sub.broker.removeSubscriber(self.sub)
-		pass
-	def testLog(self):
-		self.log.log(50, "Hello World!")
-		self.handler.messages.get(timeout=5)
-		self.sub.messageQueue.get(timeout=5)
-		self.assertEqual(self.handler.messages.qsize(), 0)
-		self.assertEqual(self.sub.messageQueue.qsize(), 0)
-	def testPublish(self):
-		sys.stdout.flush()
-		self.port = self.port + 1
-		self.handler.publish({'message':'hello world'})
-		self.sub.messageQueue.get(timeout=5)
-		self.assertEqual(self.sub.messageQueue.qsize(), 0)
-		
-
-if __name__=='__main__':
-
-
-#	 logging.basicConfig(level=logging.INFO,
-#						 format="%(asctime)s %(levelname)s:\t %(message)s",
-#						 stream=sys.stdout)
-
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestTashiMessaging)
-	unittest.TextTestRunner(verbosity=2).run(suite)
diff --git a/src/tashi/messaging/threadpool.py b/src/tashi/messaging/threadpool.py
deleted file mode 100644
index 5684ef2..0000000
--- a/src/tashi/messaging/threadpool.py
+++ /dev/null
@@ -1,305 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import threading
-import time
-import Queue
-import logging
-
-_log = logging.getLogger('tashi.messaging.threadpool')
-
-def threaded(func):
-	def fn(*args, **kwargs):
-		thread = threading.Thread(target=func, args=args, kwargs=kwargs)
-		thread.start()
-		return thread
-	return fn
-
-
-class ThreadPool(Queue.Queue):
-	def __init__(self, size=8, maxsize=0):
-		Queue.Queue.__init__(self, maxsize)
-		for i in range(size):
-			thread = threading.Thread(target=self._worker)
-			thread.setDaemon(True)
-			thread.start()
-	def _worker(self):
-		while True:
-			try:
-				func, args, kwargs = self.get()
-				func(*args, **kwargs)
-			except Exception, e:
-				_log.error(e)
-				# FIXME: do something smarter here, backtrace, log,
-				# allow user-defined error handling...
-				
-	def submit(self, func, *args, **kwargs):
-		self.put((func, args, kwargs))
-	def submitlist(self, func, args, kwargs):
-		self.put((func, args, kwargs))
-
-class ThreadPoolClass:
-	def __init__(self, size=8, maxsize=0):
-		self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
-
-
-def threadpool(pool):
-	def dec(func):
-		def fn(*args, **kwargs):
-			pool.submit(func, *args, **kwargs)
-		return fn
-	return dec
-
-def threadpoolmethod(meth):
-	def fn(*args, **kwargs):
-		try:
-			pool = args[0]._threadpool_pool
-		except AttributeError:
-			pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
-		# FIXME: how do we check parent class?
-#		assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
-		pool.submit(meth, *args, **kwargs)
-	return fn
-
-def synchronized(lock=None):
-	_log.debug('synchronized decorator factory called')
-	if lock==None:
-		lock = threading.RLock()
-	def dec(func):
-		_log.debug('synchronized decorator called')
-		def fn(*args, **kwargs):
-			_log.debug('getting sync lock')
-			lock.acquire()
-			_log.debug('got sync lock')
-			ex = None
-			try:
-				r = func(*args, **kwargs)
-			except Exception, e:
-				ex = e
-			_log.debug('releasing sync lock')
-			lock.release()
-			_log.debug('released sync lock')
-			if ex != None:
-				raise e
-			return r
-		return fn
-	return dec
-			
-def synchronizedmethod(func):
-	def fn(*args, **kwargs):
-		try:
-			lock = args[0]._synchronized_lock
-		except AttributeError:
-			lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
-		lock.acquire()
-		ex = None
-		try:
-			func(*args, **kwargs)
-		except Exception, e:
-			ex = e
-		lock.release()
-		if ex != None:
-			raise e
-	return fn
-		
-
-##############################
-# Test Code
-##############################
-import unittest
-import sys
-import time
-
-class TestThreadPool(unittest.TestCase):
-	def setUp(self):
-		self.errmargin = 0.5
-
-	def testUnthreaded(self):
-		queue = Queue.Queue()
-		def slowfunc(sleep=1):
-			time.sleep(sleep)
-			queue.put(None)
-		tt = time.time()
-		for i in range(4):
-			slowfunc()
-		for i in range(4):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 4, 1) 
-
-	def testThreaded(self):
-		queue = Queue.Queue()
-		@threaded
-		def slowthreadfunc(sleep=1):
-			time.sleep(sleep)
-			queue.put(None)
-		tt = time.time()
-		for i in range(8):
-			slowthreadfunc()
-		for i in range(8):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 1, 1) 
-
-	def testThreadPool(self):
-		pool = ThreadPool(size=4)
-		queue = Queue.Queue()
-		@threadpool(pool)
-		def slowpoolfunc(sleep=1):
-			time.sleep(sleep)
-			queue.put(None)
-		tt = time.time()
-		for i in range(8):
-			slowpoolfunc()
-		for i in range(8):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 2, 1) 
-
-	def testUnthreadedMethod(self):
-		queue = Queue.Queue()
-		class slowclass:
-			def __init__(self, sleep=1):
-				self.sleep=sleep
-			def beslow(self):
-				time.sleep(self.sleep)
-				queue.put(None)
-		sc = slowclass()
-		tt = time.time()
-		for i in range(4):
-			sc.beslow()
-		for i in range(4):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 4, 1)
-	
-	def testThreadedMethod(self):
-		queue = Queue.Queue()
-		class slowclass:
-			def __init__(self, sleep=1):
-				self.sleep=sleep
-			@threaded
-			def beslow(self):
-				time.sleep(self.sleep)
-				queue.put(None)
-		sc = slowclass()
-		tt = time.time()
-		for i in range(4):
-			sc.beslow()
-		for i in range(4):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 1, 1)
-	
-	def testThreadPoolMethod(self):
-		queue = Queue.Queue()
-		class slowclass:
-			def __init__(self, sleep=1):
-				self.sleep=sleep
-			@threadpoolmethod
-			def beslow(self):
-				time.sleep(self.sleep)
-				queue.put(None)
-		sc = slowclass()
-		tt = time.time()
-		for i in range(16):
-			sc.beslow()
-		for i in range(16):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 2, 1)
-	
-	def testSynchronized(self):
-		queue = Queue.Queue()
-		@synchronized()
-		def addtoqueue():
-			time.sleep(1)
-			queue.put(None)
-		@threaded
-		def slowthreadfunc():
-			addtoqueue()
-		tt = time.time()
-		for i in range(4):
-			slowthreadfunc()
-		for i in range(4):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 4, 1) 
-
-	def testSynchronizedMethod(self):
-		queue = Queue.Queue()
-		class addtoqueue:
-			@synchronizedmethod
-			def addtoqueue1(self):
-				time.sleep(1)
-				queue.put(None)
-			@synchronizedmethod
-			def addtoqueue2(self):
-				time.sleep(1)
-				queue.put(None)
-		atc = addtoqueue()
-		@threaded
-		def slowthreadfunc1():
-			atc.addtoqueue1()
-		@threaded
-		def slowthreadfunc2():
-			atc.addtoqueue2()
-		tt = time.time()
-		for i in range(4):
-			slowthreadfunc1()
-			slowthreadfunc2()
-		for i in range(8):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 8, 1) 
-
-	def testUnsynchronizedMethod(self):
-		queue = Queue.Queue()
-		class addtoqueue:
-			def addtoqueue1(self):
-				time.sleep(1)
-				queue.put(None)
-			def addtoqueue2(self):
-				time.sleep(1)
-				queue.put(None)
-		atc = addtoqueue()
-		@threaded
-		def slowthreadfunc1():
-			atc.addtoqueue1()
-		@threaded
-		def slowthreadfunc2():
-			atc.addtoqueue2()
-		tt = time.time()
-		for i in range(4):
-			slowthreadfunc1()
-			slowthreadfunc2()
-		for i in range(8):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 1, 1) 
-
-
-
-if __name__=='__main__':
-	import sys
-
-	logging.basicConfig(level=logging.INFO,
-						format="%(asctime)s %(levelname)s:\t %(message)s",
-						stream=sys.stdout)
-
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
-	unittest.TextTestRunner(verbosity=2).run(suite)
diff --git a/src/tashi/messaging/thriftmessaging.py b/src/tashi/messaging/thriftmessaging.py
deleted file mode 100755
index 0c73ff0..0000000
--- a/src/tashi/messaging/thriftmessaging.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import sys
-import time
-import socket
-import traceback
-import threading
-
-sys.path.append('./gen-py')
-import tashi.messaging.messagingthrift
-import tashi.messaging.messagingthrift.MessageBrokerThrift
-import tashi.messaging.messagingthrift.SubscriberThrift
-from tashi.messaging.messagingthrift.ttypes import *
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-from thrift.server import TServer
-
-from tashi import ConnectionManager
-
-from tashi.messaging.messaging import *
-from tashi.messaging.threadpool import ThreadPoolClass, threadpool, ThreadPool, threadpoolmethod, threaded
-
-class MessageBrokerThrift(MessageBroker):
-	def __init__(self, port, daemon=True):
-		MessageBroker.__init__(self)
-		self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
-		self.transport = TSocket.TServerSocket(port)
-		self.tfactory = TTransport.TBufferedTransportFactory()
-		self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-		self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
-		self.ready = threading.Event()
-#		 self.server = TServer.TSimpleServer(self.processor,
-#											 self.transport,
-#											 self.tfactory,
-#											 self.pfactory)
-#		 self.server = TServer.TThreadPoolServer(self.processor,
-#												 self.transport,
-#												 self.tfactory,
-#												 self.pfactory)
-		self.server = TServer.TThreadedServer(self.processor,
-												self.transport,
-												self.tfactory,
-												self.pfactory)
-		self.publishCalls = 0
-
-		def ssvrthrd():
-			try:
-				# FIXME: Race condition, the ready event should be set after
-				# starting the server.  However, server.serve()
-				# doesn't return under normal circumstances.  This
-				# seems to work in practice, even though it's clearly
-				# wrong.
-				self.ready.set()
-				self.server.serve()
-			except Exception, e:
-				print e
-				sys.stdout.flush()
-				pass
-		svt = threading.Thread(target=ssvrthrd)
-		svt.setDaemon(daemon)
-		svt.start()
-		self.ready.wait()
-	def log(self, message):
-		MessageBroker.log(self, message)
-	@synchronizedmethod
-	def addSubscriber(self, host, port):
-		subscribers = self.getSubscribers()
-		for sub in subscribers:
-			if sub.host == host and sub.port == port:
-				return
-		subscriber = SubscriberThriftProxy(host, port, self.proxy)
-		MessageBroker.addSubscriber(self, subscriber)
-	def removeSubscriber(self, host, port):
-		subscriber = None
-		subscribers = self.getSubscribers()
-		for sub in subscribers:
-			if sub.host == host and sub.port == port:
-				subscriber = sub
-		if subscriber != None:
-			MessageBroker.removeSubscriber(self, subscriber)
-	@synchronizedmethod
-	def publish(self, message):
-		self.publishCalls  = self.publishCalls + 1
-		sys.stdout.flush()
-		MessageBroker.publish(self, message)
-
-class MessageBrokerThriftProxy:
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
-	@synchronizedmethod
-	def log(self, message):
-		self.proxy[self.host, self.port].log(message)
-	@synchronizedmethod
-	def publish(self, message):
-		self.proxy[self.host, self.port].publish(message)
-	@synchronizedmethod
-	def publishList(self, messages):
-		self.proxy[self.host, self.port].publishList(messages)
-	@synchronizedmethod
-	def addSubscriber(self, subscriber):
-		self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
-	@synchronizedmethod
-	def removeSubscriber(self, subscriber):
-		self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
-
-
-
-class SubscriberThrift(Subscriber, threading.Thread):
-	def __init__(self, broker, port, synchronized=False):
-		self.host = socket.gethostname()
-		self.port = port
-		self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
-		self.transport = TSocket.TServerSocket(port)
-		self.tfactory = TTransport.TBufferedTransportFactory()
-		self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-		self.server = TServer.TThreadedServer(self.processor,
-											  self.transport,
-											  self.tfactory,
-											  self.pfactory)
-		def ssvrthrd():
-			try:
-				self.server.serve()
-			except Exception, e:
-				pass
-
-
-		self.thread = threading.Thread(target=ssvrthrd)
-		self.thread.setDaemon(True)
-		self.thread.start()
-
-		# We have to call this AFTER initializing our server, so that
-		# the broker can contact us
-		# Wrap this in a try/catch because the broker may not be online yet
-		try:
-			Subscriber.__init__(self, broker,  synchronized=synchronized)		
-		except:
-			pass
-		threading.Thread.__init__(self)
-		self.setDaemon(True)
-		self.start()
-
-	def stop(self):
-#		 # FIXME: this is broken, there is no clear way to stop a
-#		 # Thrift server
-		self.broker.removeSubscriber(self)
-		self.transport.close()
-	def run(self):
-		while(True):
-			# renew subscription every 5 min
-			try:
-				self.broker.addSubscriber(self)
-			except:
-				pass
-			time.sleep(5*60)
-
-class SubscriberThriftProxy:
-	def __init__(self, host, port, proxy, aggregate = 100):
-		self.host = host
-		self.port = port
-		self.proxy = proxy
-		# for some reason, thrift clients are not thread-safe, lock during send
-		self.lock = threading.Lock()
-		self.pending = []
-		self.aggregateSize = aggregate
-	def publish(self, message):
-		self.lock.acquire()
-		sys.stdout.flush()
-		if message.has_key('aggregate') and message['aggregate'] == 'True':
-			self.pending.append(message)
-			if len(self.pending) >= self.aggregateSize:
-				try:
-					self.proxy[self.host, self.port].publishList(self.pending)
-				except Exception, e:
-					print e
-					self.lock.release()
-					raise e
-				self.pending = []
-		else:
-			try:
-				self.proxy[self.host, self.port].publish(message)
-			except Exception, e:
-				sys.stdout.flush()
-				print e
-				self.lock.release()
-				raise e
-		self.lock.release()
-
-class PublisherThrift(Publisher):
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.broker = MessageBrokerThriftProxy(host, port)
-		Publisher.__init__(self, self.broker)
-		
-####################
-# Testing Code 
-####################
-
-class TestSubscriberThrift(SubscriberThrift):
-	def __init__(self, *args, **kwargs):
-		self.queue = Queue.Queue()
-		SubscriberThrift.__init__(self, *args, **kwargs)
-	def handle(self, message):
-		self.queue.put(message)
-
-portnum = 1718
-class TestThriftMessaging(unittest.TestCase):
-	def setUp(self):
-		global portnum
-		self.broker = MessageBrokerThrift(portnum)
-		self.brokerPort = portnum
-		portnum = portnum + 1 
-		self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
-		self.publisher = PublisherThrift('localhost', self.brokerPort)
-		self.subscriber = TestSubscriberThrift(self.proxy, portnum)
-		portnum = portnum + 1
-	def tearDown(self):
-		pass
-	def testSetUp(self):
-		pass
-	def testPublish(self):
-		self.publisher.publish( {'message':'hello world'} )
-		self.subscriber.queue.get(True, timeout=5)
-		self.assertEqual(self.subscriber.queue.qsize(), 0)
-	def testPublishList(self):
-		nrmsgs = 10
-		msgs = []
-		for i in range(nrmsgs):
-			msgs.append( {'msgnum':str(i)} )
-		self.publisher.publishList( msgs )
-		for i in range(nrmsgs):
-			self.subscriber.queue.get(True, timeout=5)
-		self.assertEqual(self.subscriber.queue.qsize(), 0)
-	def testAggregate(self):
-		nrmsgs = self.publisher.aggregateSize
-		for i in range(nrmsgs):
-			self.assertEqual(self.subscriber.queue.qsize(), 0)
-			self.publisher.aggregate( {'msgnum':str(i)} )
-		for i in range(nrmsgs):
-			self.subscriber.queue.get(True, timeout=5)
-		self.assertEqual(self.subscriber.queue.qsize(), 0)
-	def testAggregateKeyword(self):
-		nrmsgs = self.publisher.aggregateSize
-		for i in range(nrmsgs):
-			self.assertEqual(self.subscriber.queue.qsize(), 0)
-			self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
-		for i in range(nrmsgs):
-			self.subscriber.queue.get(True, timeout=5)
-		self.assertEqual(self.subscriber.queue.qsize(), 0)
-
-
-if __name__=='__main__':
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
-	unittest.TextTestRunner(verbosity=2).run(suite)
-
-
diff --git a/src/tashi/nodemanager/nodemanager.py b/src/tashi/nodemanager/nodemanager.py
index 66d2d5b..2c39903 100755
--- a/src/tashi/nodemanager/nodemanager.py
+++ b/src/tashi/nodemanager/nodemanager.py
@@ -20,8 +20,10 @@
 import logging.config
 import signal
 import sys
+import os
+import time
 
-from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
+from tashi.util import instantiateImplementation, getConfig, debugConsole
 import tashi
 from tashi import boolean
 
@@ -29,12 +31,8 @@
 from rpyc.utils.server import ThreadedServer
 from rpyc.utils.authenticators import TlsliteVdbAuthenticator
 
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
-	sys.exit(0)
-
 def main():
-	global config, dfs, vmm, service, server, log, notifier
+	global config, log
 	
 	(config, configFiles) = getConfig(["NodeManager"])
 	publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
@@ -42,6 +40,35 @@
 	logging.config.fileConfig(configFiles)
 	log = logging.getLogger(__name__)
 	log.info('Using configuration file(s) %s' % configFiles)
+
+	# handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+	child = os.fork()
+	
+	if child == 0:
+		startNodeManager()
+		# shouldn't exit by itself
+		sys.exit(0)
+
+	else:
+		# main
+		try:
+			os.waitpid(child, 0)
+		except KeyboardInterrupt:
+			log.info("Exiting node manager after receiving a SIGINT signal")
+			os._exit(0)
+		except Exception:
+			log.exception("Abnormal termination of node manager")
+			os._exit(-1)
+
+		log.info("Exiting node manager after service thread exited")
+		os._exit(-1)
+
+	return
+
+def startNodeManager():
+	global config, dfs, vmm, service, server, log, notifier
+	publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
+	tashi.publisher = publisher
 	dfs = instantiateImplementation(config.get("NodeManager", "dfs"), config)
 	vmm = instantiateImplementation(config.get("NodeManager", "vmm"), config, dfs, None)
 	service = instantiateImplementation(config.get("NodeManager", "service"), config, vmm)
@@ -59,14 +86,11 @@
 	t.service._type = 'NodeManagerService'
 
 	debugConsole(globals())
-	
-	try:
-		t.start()
-	except KeyboardInterrupt:
-		handleSIGTERM(signal.SIGTERM, None)
-	except Exception, e:
-		sys.stderr.write(str(e) + "\n")
-		sys.exit(-1)
+
+	t.start()
+	# shouldn't exit by itself
+	sys.exit(0)
+
 
 if __name__ == "__main__":
 	main()
diff --git a/src/tashi/nodemanager/nodemanagerservice.py b/src/tashi/nodemanager/nodemanagerservice.py
index c493ac9..a1037e7 100755
--- a/src/tashi/nodemanager/nodemanagerservice.py
+++ b/src/tashi/nodemanager/nodemanagerservice.py
@@ -5,15 +5,15 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
-# under the License.    
+# under the License.
 
 import logging
 import socket
@@ -28,10 +28,10 @@
 
 class NodeManagerService(object):
 	"""RPC handler for the NodeManager
-	   
-	   Perhaps in the future I can hide the dfs from the 
+
+	   Perhaps in the future I can hide the dfs from the
 	   VmControlInterface and do all dfs operations here?"""
-	
+
 	def __init__(self, config, vmm):
 		self.config = config
 		self.vmm = vmm
@@ -76,6 +76,8 @@
 
 		self.__registerHost()
 
+		# XXXstroucki: should make an effort to retry
+		# otherwise vmm will wait forever
 		self.id = self.cm.registerNodeManager(self.host, self.instances.values())
 
 		# XXXstroucki cut cross check for NM/VMM state
@@ -83,18 +85,18 @@
 		# start service threads
 		threading.Thread(target=self.__registerWithClusterManager).start()
 		threading.Thread(target=self.__statsThread).start()
-	
+
 	def __initAccounting(self):
-                self.accountBuffer = []
-                self.accountLines = 0
-                self.accountingClient = None
-                try:
-                        if (self.accountingHost is not None) and \
-                                    (self.accountingPort is not None):
-                                self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
-                except:
-                        self.log.exception("Could not init accounting")
-			
+		self.accountBuffer = []
+		self.accountLines = 0
+		self.accountingClient = None
+		try:
+			if (self.accountingHost is not None) and \
+						(self.accountingPort is not None):
+				self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
+		except:
+			self.log.exception("Could not init accounting")
+
 	def __loadVmInfo(self):
 		try:
 			self.instances = self.vmm.getInstances()
@@ -112,6 +114,8 @@
 			notifyCM = []
 			try:
 				while (len(self.notifyCM) > 0):
+					# XXXstroucki ValueError: need more than 1 value to unpack
+					# observed here. How?
 					value = self.notifyCM.pop(0)
 					(instanceId, newInst, old, success) = value
 					try:
@@ -135,7 +139,7 @@
 		#if (toSleep > 0):
 			#time.sleep(toSleep)
 
-        def __ACCOUNTFLUSH(self):
+	def __ACCOUNTFLUSH(self):
 		try:
 			if (self.accountingClient is not None):
 				self.accountingClient.record(self.accountBuffer)
@@ -145,33 +149,33 @@
 			self.log.exception("Failed to flush accounting data")
 
 
-        def __ACCOUNT(self, text, instance=None, host=None):
-                now = time.time()
-                instanceText = None
-                hostText = None
+	def __ACCOUNT(self, text, instance=None, host=None):
+		now = time.time()
+		instanceText = None
+		hostText = None
 
-                if instance is not None:
+		if instance is not None:
 			try:
-                        	instanceText = 'Instance(%s)' % (instance)
+				instanceText = 'Instance(%s)' % (instance)
 			except:
 				self.log.exception("Invalid instance data")
 
-                if host is not None:
+		if host is not None:
 			try:
-                        	hostText = "Host(%s)" % (host)
+				hostText = "Host(%s)" % (host)
 			except:
 				self.log.exception("Invalid host data")
 
-                secondary = ','.join(filter(None, (hostText, instanceText)))
+		secondary = ','.join(filter(None, (hostText, instanceText)))
 
-                line = "%s|%s|%s" % (now, text, secondary)
+		line = "%s|%s|%s" % (now, text, secondary)
 
-                self.accountBuffer.append(line)
-                self.accountLines += 1
+		self.accountBuffer.append(line)
+		self.accountLines += 1
 
 		# XXXstroucki think about force flush every so often
-                if (self.accountLines > 0):
-                        self.__ACCOUNTFLUSH()
+		if (self.accountLines > 0):
+			self.__ACCOUNTFLUSH()
 
 
 	# service thread function
@@ -213,14 +217,14 @@
 				self.log.exception('statsThread threw an exception')
 			time.sleep(self.statsInterval)
 
-        def __registerHost(self):
-                hostname = socket.gethostname()
+	def __registerHost(self):
+		hostname = socket.gethostname()
 		# populate some defaults
 		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
 		memory = 0
 		cores = 0
 		version = "empty"
-                #self.cm.registerHost(hostname, memory, cores, version)
+		#self.cm.registerHost(hostname, memory, cores, version)
 
 	def __getInstance(self, vmId):
 		instance = self.instances.get(vmId, None)
@@ -235,7 +239,7 @@
 
 
 		raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
-	
+
 	# remote
 	# Called from VMM to update self.instances
 	# but only changes are Exited, MigrateTrans and Running
@@ -252,11 +256,11 @@
 			# make a note of mismatch, but go on.
 			# the VMM should know best
 			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
-                        
+
 		instance.state = cur
 
 		self.__ACCOUNT("NM VM STATE CHANGE", instance=instance)
-			      
+
 		newInst = Instance(d={'state':cur})
 		success = lambda: None
 		# send the state change up to the CM
@@ -278,8 +282,8 @@
 	def createInstance(self, instance):
 		vmId = instance.vmId
 		self.instances[vmId] = instance
-		
-	
+
+
 	# remote
 	def instantiateVm(self, instance):
 		self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
@@ -291,7 +295,7 @@
 			return vmId
 		except:
 			self.log.exception("Failed to start instance")
-	
+
 	# remote
 	def suspendVm(self, vmId, destination):
 		instance = self.__getInstance(vmId)
@@ -300,10 +304,12 @@
 		instance.state = InstanceState.Suspending
 		self.instances[vmId] = instance
 		threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
-	
+
 	# called by resumeVm as thread
 	def __resumeVmHelper(self, instance, name):
 		self.vmm.resumeVmHelper(instance, name)
+		# XXXstroucki should the VMM be responsible for setting
+		# state? It should know better.
 		instance.state = InstanceState.Running
 		newInstance = Instance(d={'id':instance.id,'state':instance.state})
 		success = lambda: None
@@ -323,7 +329,7 @@
 			self.log.exception('resumeVm failed')
 			raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
 		return instance.vmId
-	
+
 	# remote
 	def prepReceiveVm(self, instance, source):
 		self.__ACCOUNT("NM VM MIGRATE RECEIVE PREP")
@@ -353,7 +359,7 @@
 		self.instances[vmId] = instance
 		threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
 		return
-	
+
 	# called by receiveVm as thread
 	# XXXstroucki migrate in?
 	def __receiveVmHelper(self, instance, transportCookie):
@@ -429,4 +435,3 @@
 	# remote
 	def liveCheck(self):
 		return "alive"
-	
diff --git a/src/tashi/nodemanager/vmcontrol/qemu.py b/src/tashi/nodemanager/vmcontrol/qemu.py
index 708d70d..d294fc7 100644
--- a/src/tashi/nodemanager/vmcontrol/qemu.py
+++ b/src/tashi/nodemanager/vmcontrol/qemu.py
@@ -132,9 +132,7 @@
 	def __getHostPids(self):
 		"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
 		pids = []
-		real_bin = self.QEMU_BIN
-		while os.path.islink(real_bin):
-			real_bin = os.readlink(self.QEMU_BIN)
+		real_bin = os.path.realpath(self.QEMU_BIN)
 
 		for f in os.listdir("/proc"):
 			try:
@@ -210,7 +208,7 @@
 					if self.scratchVg is not None:
 						log.info("Removing any scratch for %s" % (name))
 						cmd = "/sbin/lvremove --quiet -f %s" % self.scratchVg
-    						result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
+						result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
 				except:
 					log.warning("Problem cleaning scratch volumes")
 					pass
@@ -323,12 +321,12 @@
 				#print "[NEE]: %s" % (needle)
 				(rlist, wlist, xlist) = select.select([monitorFd], [], [], timeout)
 				if (len(rlist) == 0):
-					log.error("Timeout getting results from monitor for vmId %d" % (child.pid))
+					log.error("Timeout getting results from monitor on FD %s for vmId %d" % (monitorFd, child.pid))
 					child.errorBit = True
 					raise RuntimeError
 				c = os.read(monitorFd, 1)
 				if (c == ""):
-					log.error("Early termination on monitor for vmId %d" % (child.pid))
+					log.error("Early termination on monitor FD %s for vmId %d" % (monitorFd, child.pid))
 					child.errorBit = True
 					raise RuntimeError
 				buf = buf + c
@@ -504,8 +502,14 @@
 		nicModel = self.__stripSpace(nicModel)
 
 		nicString = ""
+		nicNetworks = {}
 		for i in range(0, len(instance.nics)):
+			# Don't allow more than one interface per vlan
 			nic = instance.nics[i]
+			if nicNetworks.has_key(nic.network):
+				continue
+			nicNetworks[nic.network] = True
+
 			nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=%s%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, self.ifPrefix, instance.id, i, nic.network, nic.network)
 
 		#  ACPI
@@ -645,7 +649,8 @@
 	
 	# extern
 	def resumeVmHelper(self, instance, source):
-		child = self.__getChildFromPid(instance.vmId)
+		vmId = instance.vmId
+		child = self.__getChildFromPid(vmId)
 		try:
 			self.__getPtyInfo(child, True)
 		except RuntimeError:
@@ -654,8 +659,13 @@
 			raise
 		status = "paused"
 		while ("running" not in status):
-			status = self.__enterCommand(child, "info status")
-			time.sleep(1)
+			try:
+				status = self.__enterCommand(child, "info status")
+			except RuntimeError:
+				pass
+			time.sleep(60)
+
+		self.nm.vmStateChange(vmId, None, InstanceState.Running)
 		child.instance.state = InstanceState.Running
 		self.__saveChildInfo(child)
 	
@@ -846,11 +856,63 @@
 	def listVms(self):
 		return self.controlledVMs.keys()
 
+	def __processVmStats(self, vmId):
+		try:
+			f = open("/proc/%d/stat" % (vmId))
+			procData = f.read()
+			f.close()
+		except:
+			log.warning("Unable to get data for instance %d" % vmId)
+			return
+
+		ws = procData.strip().split()
+		userTicks = float(ws[13])
+		sysTicks = float(ws[14])
+		myTicks = userTicks + sysTicks
+		vsize = (int(ws[22]))/1024.0/1024.0
+		rss = (int(ws[23])*4096)/1024.0/1024.0
+		cpuSeconds = myTicks/self.ticksPerSecond
+		# XXXstroucki be more exact here?
+		last = time.time() - self.statsInterval
+		lastCpuSeconds = self.cpuStats.get(vmId, cpuSeconds)
+		if lastCpuSeconds is None:
+			lastCpuSeconds = cpuSeconds
+		cpuLoad = (cpuSeconds - lastCpuSeconds)/(time.time() - last)
+		self.cpuStats[vmId] = cpuSeconds
+		try:
+			child = self.controlledVMs[vmId]
+		except:
+			log.warning("Unable to obtain information on instance %d" % vmId)
+			return
+
+		(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
+		for i in range(0, len(child.instance.nics)):
+			netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
+			(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = self.netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
+			(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
+		self.stats[vmId] = self.stats.get(vmId, {})
+		child = self.controlledVMs.get(vmId, None)
+		if (child):
+			res = self.__enterCommand(child, "info blockstats")
+			for l in res.split("\n"):
+				(device, sep, data) = stringPartition(l, ": ")
+				if (data != ""):
+					for field in data.split(" "):
+						(label, sep, val) = stringPartition(field, "=")
+						if (val != ""):
+							self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s' % (device, label), 0)))/self.statsInterval
+							self.stats[vmId]['%s_%s' % (device, label)] = int(val)
+		self.stats[vmId]['cpuLoad'] = cpuLoad
+		self.stats[vmId]['rss'] = rss
+		self.stats[vmId]['vsize'] = vsize
+		self.stats[vmId]['recvMBs'] = sendMBs
+		self.stats[vmId]['sendMBs'] = recvMBs
+
 	# thread
 	def statsThread(self):
-		ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
-		netStats = {}
-		cpuStats = {}
+		self.ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
+		self.netStats = {}
+		self.cpuStats = {}
 		# XXXstroucki be more exact here?
 		last = time.time() - self.statsInterval
 		while True:
@@ -866,7 +928,7 @@
 						ws = ld.split()
 						recvBytes = float(ws[0])
 						sendBytes = float(ws[8])
-						(recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
+						(recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = self.netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
 						if (recvBytes < lastRecvBytes):
 							# We seem to have overflowed
 							# XXXstroucki How likely is this to happen?
@@ -882,44 +944,12 @@
 								lastSendBytes = lastSendBytes - 2**32
 						recvMBs = (recvBytes-lastRecvBytes)/(now-last)/1024.0/1024.0
 						sendMBs = (sendBytes-lastSendBytes)/(now-last)/1024.0/1024.0
-						netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
+						self.netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
+
+
 				for vmId in self.controlledVMs:
-					f = open("/proc/%d/stat" % (vmId))
-					procData = f.read()
-					f.close()
-					ws = procData.strip().split()
-					userTicks = float(ws[13])
-					sysTicks = float(ws[14])
-					myTicks = userTicks + sysTicks
-					vsize = (int(ws[22]))/1024.0/1024.0
-					rss = (int(ws[23])*4096)/1024.0/1024.0
-					cpuSeconds = myTicks/ticksPerSecond
-					lastCpuSeconds = cpuStats.get(vmId, cpuSeconds)
-					cpuLoad = (cpuSeconds - lastCpuSeconds)/(now - last)
-					cpuStats[vmId] = cpuSeconds
-					child = self.controlledVMs[vmId]
-					(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
-					for i in range(0, len(child.instance.nics)):
-						netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
-						(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
-						(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
-					self.stats[vmId] = self.stats.get(vmId, {})
-					child = self.controlledVMs.get(vmId, None)
-					if (child):
-						res = self.__enterCommand(child, "info blockstats")
-						for l in res.split("\n"):
-							(device, sep, data) = stringPartition(l, ": ")
-							if (data != ""):
-								for field in data.split(" "):
-									(label, sep, val) = stringPartition(field, "=")
-									if (val != ""):
-										self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s' % (device, label), 0)))/self.statsInterval
-										self.stats[vmId]['%s_%s' % (device, label)] = int(val)
-					self.stats[vmId]['cpuLoad'] = cpuLoad
-					self.stats[vmId]['rss'] = rss
-					self.stats[vmId]['vsize'] = vsize
-					self.stats[vmId]['recvMBs'] = sendMBs
-					self.stats[vmId]['sendMBs'] = recvMBs
+					self.__processVmStats(vmId)
+
 			except:
 				log.exception("statsThread threw an exception")
 			last = now
diff --git a/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py b/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
index cd4fde8..19447f4 100644
--- a/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
+++ b/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
@@ -28,8 +28,8 @@
 		self.dfs = dfs
 		self.nm = nm
 
-        def getInstances(self):
-                """Will return a dict of instances by vmId to the caller"""
+	def getInstances(self):
+		"""Will return a dict of instances by vmId to the caller"""
 		raise NotImplementedError
 	
 	def instantiateVm(self, instance):
diff --git a/src/tashi/nodemanager/vmcontrol/xenpv.py b/src/tashi/nodemanager/vmcontrol/xenpv.py
index 8bf4a29..9401df3 100644
--- a/src/tashi/nodemanager/vmcontrol/xenpv.py
+++ b/src/tashi/nodemanager/vmcontrol/xenpv.py
@@ -28,7 +28,7 @@
 from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
 from tashi.rpycservices.rpyctypes import Instance, Host
 from tashi import boolean, convertExceptions, ConnectionManager, version
-from tashi.util import isolatedRPC, broken
+from tashi.util import broken
 
 import tashi.parallel
 from tashi.parallel import synchronized, synchronizedmethod
@@ -168,7 +168,7 @@
 		vmType = hints.get('vmtype', self.defaultVmType)
 		print 'starting vm with type: ', vmType
 
-                disk0 = 'tap:%s' % self.disktype
+		disk0 = 'tap:%s' % self.disktype
 		diskU = 'xvda1'
 
 		try:
@@ -313,10 +313,10 @@
 	@synchronizedmethod
 	def instantiateVm(self, instance):
 
-                try:
-                   disktype = self.config.get('XenPV', 'defaultDiskType')
-                except:
-                   disktype = 'vhd'
+		try:
+			disktype = self.config.get('XenPV', 'defaultDiskType')
+		except:
+			disktype = 'vhd'
 
 		# FIXME: this is NOT the right way to get out hostId
 		self.hostId = instance.hostId
@@ -346,6 +346,8 @@
 				instance.disks[i].local = newdisk
 
 
+		# XXXstroucki if ever supporting multiple nics,
+		# ensure more than one isn't put on the same network.
 		fn = self.createXenConfig(name, 
 					  instance.disks[0].local, 
 					  instance.nics[0].mac, 
diff --git a/src/tashi/thrift/build.py b/src/tashi/thrift/build.py
deleted file mode 100755
index 42b22fa..0000000
--- a/src/tashi/thrift/build.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import shutil
-import os
-from os import path
-import re
-
-if __name__ == '__main__':
-	if (path.exists('gen-py')):
-		print 'Removing \'gen-py\' directory...'
-		shutil.rmtree('gen-py')
-		
-	if (path.exists('../services')):
-		print 'Removing \'../services\' directory...'
-		shutil.rmtree('../services')
-	
-	if (path.exists('../messaging/messagingthrift')):
-		print 'Removing \'../messaging/messagingthrift\' directory...'
-		shutil.rmtree('../messaging/messagingthrift')
-	
-	print 'Generating Python code for \'services.thrift\'...'
-	os.system('thrift --gen py:new_style services.thrift')
-	
-	print 'Copying generated code to \'tashi.services\' package...'
-	shutil.copytree('gen-py/services', '../services')
-	
-	print 'Generatign Python code for \'messagingthrift\'...'
-	os.system('rm -rf gen-py')
-	os.system('thrift --gen py messagingthrift.thrift')
-	
-	print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
-	shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
-			os.path.join('..', 'messaging', 'messagingthrift'))
-
-	print 'Generating Python code for \'layoutlocality.thrift\'...'
-	os.system('thrift --gen py:new_style layoutlocality.thrift')
- 
-	print 'Copying generated code to \'tashi.services\' package...'
-	shutil.copytree('gen-py/layoutlocality', '../services/layoutlocality')
diff --git a/src/tashi/thrift/layoutlocality.thrift b/src/tashi/thrift/layoutlocality.thrift
deleted file mode 100644
index e14910c..0000000
--- a/src/tashi/thrift/layoutlocality.thrift
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-struct BlockLocation {
-       list<string> hosts,           // hostnames of data nodes
-       list<i32> ports,              // ports for data nodes
-       list<string> names,           // hostname:port of data nodes
-       i64 blocknum,
-       i64 offset,
-       i64 length
-}
-
-struct Pathname {
-       string pathname
-}
-
-exception FileNotFoundException {
-       string message
-}
-
-service layoutservice {
-       list <BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 offset, 3:i64 length)
-                            throws (1:FileNotFoundException ouch),
-}
-
-service localityservice {
-       list <list<double>> getHopCountMatrix(1:list<string> sourceHosts, 2:list<string> destHosts),
-}
diff --git a/src/tashi/thrift/messagingthrift.thrift b/src/tashi/thrift/messagingthrift.thrift
deleted file mode 100644
index 401e9a1..0000000
--- a/src/tashi/thrift/messagingthrift.thrift
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-typedef map<string, string> strstrmap
-
-service SubscriberThrift{
-  # the async keyword seems to slow things down in the simple
-  # tests.  However, with non-trivial subscribers it will be 
-  # necessary to use async here.
-  async void publish(strstrmap message)
-  async void publishList(list<strstrmap> messages)
-}
-
-service MessageBrokerThrift{
-  void log(strstrmap message),
-  void addSubscriber(string host, i16 port)
-  void removeSubscriber(string host, i16 port)
-  async void publish(strstrmap message)
-  async void publishList(list<strstrmap> messages)
-
-}
-
diff --git a/src/tashi/thrift/services.thrift b/src/tashi/thrift/services.thrift
deleted file mode 100644
index fa29c30..0000000
--- a/src/tashi/thrift/services.thrift
+++ /dev/null
@@ -1,166 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-enum Errors {
-	ConvertedException = 1,
-	NoSuchInstanceId = 2,
-	NoSuchVmId = 3,
-	IncorrectVmState = 4,
-	NoSuchHost = 5,
-	NoSuchHostId = 6,
-	InstanceIdAlreadyExists = 7,
-	HostNameMismatch = 8,
-	HostNotUp = 9,
-	HostStateError = 10,
-	InvalidInstance = 11,
-	UnableToResume = 12,
-	UnableToSuspend = 13,
-}
-
-enum InstanceState {
-	Pending = 1,		// Job submitted
-	Activating = 2,		// activateVm has been called, but instantiateVm hasn't finished yet
-	Running = 3,		// Normal state
-	Pausing = 4,		// Beginning pause sequence
-	Paused = 5		// Paused
-	Unpausing = 6,		// Beginning unpause sequence
-	Suspending = 7,		// Beginning suspend sequence
-	Resuming = 8,		// Beginning resume sequence
-	MigratePrep = 9,	// Migrate state #1
-	MigrateTrans = 10,	// Migrate state #2
-	ShuttingDown = 11,	// Beginning exit sequence
-	Destroying = 12,	// Beginning exit sequence
-	Orphaned = 13,		// Host is missing
-	Held = 14,		// Activation failed
-	Exited = 15,		// VM has exited
-	Suspended = 16,		// VM is suspended
-}
-
-enum HostState {
-	Normal = 1,
-	Drained = 2,
-	VersionMismatch = 3
-}
-
-exception TashiException {
-	1: Errors errno
-	2: string msg
-}
-
-struct Host {
-	1:i32 id,
-	2:string name,
-	3:bool up,
-	4:bool decayed,
-	5:HostState state,
-	6:i32 memory,
-	7:i32 cores,
-	8:string version
-	// Other properties (disk?)
-}
-
-struct Network {
-	1:i32 id
-	2:string name
-}
-
-struct User {
-	1:i32 id,
-	2:string name
-}
-
-struct DiskConfiguration {
-	1:string uri,
-	2:bool persistent
-}
-
-struct NetworkConfiguration {
-	1:i32 network,
-	2:string mac,
-	3:string ip
-}
-
-struct Instance {
-	1:i32 id,
-	2:i32 vmId,
-	3:i32 hostId,
-	4:bool decayed,
-	5:InstanceState state,
-	6:i32 userId,
-	7:string name, // User specified
-	8:i32 cores, // User specified
-	9:i32 memory, // User specified
-	10:list<DiskConfiguration> disks, // User specified
-	11:list<NetworkConfiguration> nics // User specified
-	12:map<string, string> hints // User specified
-}
-
-service clustermanagerservice {
-	// Client-facing RPCs
-	Instance createVm(1:Instance instance) throws (1:TashiException e)
-	
-	void shutdownVm(1:i32 instanceId) throws (1:TashiException e)
-	void destroyVm(1:i32 instanceId) throws (1:TashiException e)
-	
-	void suspendVm(1:i32 instanceId) throws (1:TashiException e)
-	Instance resumeVm(1:i32 instanceId) throws (1:TashiException e)
-	
-	void migrateVm(1:i32 instanceId, 2:i32 targetHostId) throws (1:TashiException e)
-	
-	void pauseVm(1:i32 instanceId) throws (1:TashiException e)
-	void unpauseVm(1:i32 instanceId) throws (1:TashiException e)
-	
-	list<Host> getHosts() throws (1:TashiException e)
-	list<Network> getNetworks() throws (1:TashiException e)
-	list<User> getUsers() throws (1:TashiException e)
-
-	list<Instance> getInstances() throws (1:TashiException e)
-	
-	string vmmSpecificCall(1:i32 instanceId, 2:string arg) throws (1:TashiException e)
-	
-	// NodeManager-facing RPCs
-	i32 registerNodeManager(1:Host host, 2:list<Instance> instances) throws (1:TashiException e)
-	void vmUpdate(1:i32 instanceId, 2:Instance instance, 3:InstanceState old) throws (1:TashiException e)
-
-	// Agent-facing RPCs
-	void activateVm(1:i32 instanceId, 2:Host host) throws (1:TashiException e)
-}
-
-service nodemanagerservice {
-	// ClusterManager-facing RPCs
-	i32 instantiateVm(1:Instance instance) throws (1:TashiException e)
-	
-	void shutdownVm(1:i32 vmId) throws (1:TashiException e)
-	void destroyVm(1:i32 vmId) throws (1:TashiException e)
-	
-	void suspendVm(1:i32 vmId, 2:string destination) throws (1:TashiException e)
-	i32 resumeVm(1:Instance instance, 2:string source) throws (1:TashiException e)
-	
-	string prepReceiveVm(1:Instance instance, 2:Host source) throws (1:TashiException e)
-	void migrateVm(1:i32 vmId, 2:Host target, 3:string transportCookie) throws (1:TashiException e)
-	void receiveVm(1:Instance instance, 2:string transportCookie) throws (1:TashiException e)
-	
-	void pauseVm(1:i32 vmId) throws (1:TashiException e)
-	void unpauseVm(1:i32 vmId) throws (1:TashiException e)
-
-	Instance getVmInfo(1:i32 vmId) throws (1:TashiException e)
-	list<i32> listVms() throws (1:TashiException e)
-	
-	string vmmSpecificCall(1:i32 vmId, 2:string arg) throws (1:TashiException e)
-	
-	// Host getHostInfo() throws (1:TashiException e)
-}
diff --git a/src/tashi/util.py b/src/tashi/util.py
index 4eb0981..61cca4b 100644
--- a/src/tashi/util.py
+++ b/src/tashi/util.py
@@ -27,6 +27,7 @@
 import traceback
 import types
 import getpass
+import functools
 
 from tashi.rpycservices import rpycservices
 from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
@@ -148,14 +149,6 @@
 	def __delattr__(self, name):
 		return delattr(self.__dict__['__real_obj__'], name)
 
-def isolatedRPC(client, method, *args, **kw):
-	"""Opens and closes a thrift transport for a single RPC call"""
-	if (not client._iprot.trans.isOpen()):
-		client._iprot.trans.open()
-	res = getattr(client, method)(*args, **kw)
-	client._iprot.trans.close()
-	return res
-
 def signalHandler(signalNumber):
 	"""Used to denote a particular function as the signal handler for a 
 	   specific signal"""
@@ -192,7 +185,7 @@
 
 def convertExceptions(oldFunc):
 	"""This converts any exception type into a TashiException so that 
-	   it can be passed over a Thrift RPC"""
+	   it can be passed over an RPC"""
 	def newFunc(*args, **kw):
 		try:
 			return oldFunc(*args, **kw)
@@ -218,20 +211,33 @@
 		raise Exception("No config file could be found: %s" % (str(allLocations)))
 	return (config, configFiles)
 
+def __getShellFn():
+	try:
+		from IPython.Shell import IPShellEmbed
+		return (1, IPShellEmbed)
+	except ImportError:
+		import IPython
+		return (2, IPython.embed)
+
 def debugConsole(globalDict):
 	"""A debugging console that optionally uses pysh"""
 	def realDebugConsole(globalDict):
+		import os
 		try :
 			import atexit
-			from IPython.Shell import IPShellEmbed
+			(calltype, shellfn) = __getShellFn()
 			def resetConsole():
 # XXXpipe: make input window sane
 				(stdin, stdout) = os.popen2("reset")
 				stdout.read()
-			dbgshell = IPShellEmbed()
 			atexit.register(resetConsole)
-			dbgshell(local_ns=globalDict, global_ns=globalDict)
-		except Exception:
+			if calltype == 1:
+				dbgshell=shellfn(user_ns=globalDict)
+				dbgshell()
+			elif calltype == 2:
+				dbgshell=shellfn
+				dbgshell(user_ns=globalDict)
+		except Exception, e:
 			CONSOLE_TEXT=">>> "
 			input = " " 
 			while (input != ""):
@@ -241,6 +247,9 @@
 					exec(input) in globalDict
 				except Exception, e:
 					sys.stdout.write(str(e) + "\n")
+
+		os._exit(0)
+
 	if (os.getenv("DEBUG", "0") == "1"):
 		threading.Thread(target=lambda: realDebugConsole(globalDict)).start()
 
@@ -260,6 +269,68 @@
 			ns = ns + c
 	return ns
 
+class Connection:
+	def __init__(self, host, port, authAndEncrypt=False, credentials=None):
+		self.host = host
+		self.port = port
+		self.credentials = credentials
+		self.authAndEncrypt = authAndEncrypt
+		self.connection = None
+		# XXXstroucki some thing may still depend on this (client)
+		self.username = None
+		if credentials is not None:
+			self.username = credentials[0]
+
+	def __connect(self):
+		# create new connection
+
+		username = None
+		password = None
+
+		if self.credentials is not None:
+			username = self.credentials[0]
+			password = self.credentials[1]
+
+		if self.authAndEncrypt:
+			if username is None:
+				username = raw_input("Enter Username:")
+
+			if password is None:
+				password = raw_input("Enter Password:")
+
+			if self.credentials != (username, password):
+				self.credentials = (username, password)
+
+			client = rpycservices.client(self.host, self.port, username=username, password=password)
+		else:
+			client = rpycservices.client(self.host, self.port)
+
+		self.connection = client
+
+
+	def __do(self, name, *args, **kwargs):
+		if self.connection is None:
+			self.__connect()
+
+		remotefn = getattr(self.connection, name, None)
+
+		try:
+			if callable(remotefn):
+				returns = remotefn(*args, **kwargs)
+
+			else:
+				raise TashiException({'msg':'%s not callable' % name})
+
+		except:
+			self.connection = None
+			raise
+
+		return returns
+
+	def __getattr__(self, name):
+		return functools.partial(self.__do, name)
+
+
 def createClient(config):
 	cfgHost = config.get('Client', 'clusterManagerHost')
 	cfgPort = config.get('Client', 'clusterManagerPort')
@@ -273,14 +344,12 @@
 	authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
 	if authAndEncrypt:
 		username = config.get('AccessClusterManager', 'username')
-		if username == '':
-			username = raw_input('Enter Username:')
 		password = config.get('AccessClusterManager', 'password')
-		if password == '':
-			password = getpass.getpass('Enter Password:')
-		client = rpycservices.client(host, port, username=username, password=password)
+		client = Connection(host, port, authAndEncrypt, (username, password))
+
 	else:
-		client = rpycservices.client(host, port)
+		client = Connection(host, port)
+
 	return client
 
 def enumToStringDict(cls):
diff --git a/src/tashi/version.py b/src/tashi/version.py
index 1fd7997..2380bed 100644
--- a/src/tashi/version.py
+++ b/src/tashi/version.py
@@ -15,4 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-version = "201202"
+version = "201203"
diff --git a/src/utils/Makefile b/src/utils/Makefile
deleted file mode 100644
index aea56ee..0000000
--- a/src/utils/Makefile
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-all: nmd
-
-clean:
-	rm -f ./nmd
-
-nmd: nmd.c
-	${CC} $< -o $@
diff --git a/src/utils/getLocality.py b/src/utils/getLocality.py
deleted file mode 100755
index 49ecb11..0000000
--- a/src/utils/getLocality.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import sys
-import os
-from os import system
-
-import tashi.services.layoutlocality.localityservice as localityservice
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-
-from tashi.util import getConfig
-
-(config, configFiles) = getConfig(["Client"])
-host = config.get('LocalityService', 'host')
-port = int(config.get('LocalityService', 'port'))
-
-socket = TSocket.TSocket(host, port)
-transport = TTransport.TBufferedTransport(socket)
-protocol = TBinaryProtocol.TBinaryProtocol(transport)
-client = localityservice.Client(protocol)
-transport.open()
-
-while True:
-	line1 = "\n"
-	line2 = "\n"
-	while line1 != "":
-		line1 = sys.stdin.readline()
-		if line1 == "":
-			sys.exit(0)
-		if line1 != "\n":
-			break
-	line1 = line1.strip()
-	while line2 != "":
-		line2 = sys.stdin.readline()
-		if line2 == "":
-			sys.exit(0)
-		if line2 != "\n":
-			break
-	line2 = line2.strip()
-
-	sources = line1.split(" ")
-	destinations = line2.split(" ")
-
-	mat = client.getHopCountMatrix(sources, destinations)
-	for r in mat:
-		for c in r:
-			print '%f\t'%c,
-		print '\n',
-	print '\n',
diff --git a/src/utils/nmd.c b/src/utils/nmd.c
deleted file mode 100644
index effa1d2..0000000
--- a/src/utils/nmd.c
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.    
- */
-
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <assert.h>
-
-#define SLEEP_INTERVAL 10
-#define TASHI_PATH "/usr/local/tashi/"
-#define LOG_FILE "/var/log/nodemanager.log"
-
-/* This function changes (on Linux!) its oom scoring, to make it
- * unattractive to kill
- */
-
-void make_invincible()
-{
-	int oom_adj_fd;
-	int r;
-
-	oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
-	assert(oom_adj_fd != -1);
-	r = write(oom_adj_fd, "-17\n", 4);
-	assert(r == 4);
-	close(oom_adj_fd);
-
-}
-
-/* This function resets (on Linux!) its oom scoring to default
- */
-void make_vulnerable()
-{
-	int oom_adj_fd;
-	int r;
-
-	oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
-	assert(oom_adj_fd != -1);
-	r = write(oom_adj_fd, "0\n", 2);
-	assert(r == 2);
-	close(oom_adj_fd);
-}
-
-int main(int argc, char **argv)
-{
-	char* env[2];
-	int status;
-	DIR* d;
-	int pid;
-	int lfd;
-	int foreground=0;
-
-/* If first argument is "-f", run in foreground */
-	if ((argc > 1) && (strncmp(argv[1], "-f", 3)==0)) {
-		foreground=1;
-	}
-/* If not running in foreground, fork off and exit the parent.
- * The child closes its default file descriptors.
- */
-	if (!foreground) {
-		pid = fork();
-		if (pid != 0) {
-			exit(0);
-		}
-		close(0);
-		close(1);
-		close(2);
-	}
-/* Adjust OOM preference */
-	make_invincible();
-/* Configure environment of children */
-	env[0] = "PYTHONPATH="TASHI_PATH"/src/";
-	env[1] = NULL;
-	while (1) {
-		pid = fork();
-		if (pid == 0) {
-			/* child */
-			/* nodemanagers are vulnerable. Not the supervisor. */
-			make_vulnerable();
-			if (!foreground) {
-				/* If not running fg, open log file */
-				lfd = open(LOG_FILE, O_WRONLY|O_APPEND|O_CREAT);
-				if (lfd < 0) {
-					/* If this failed, open something? */
-					lfd = open("/dev/null", O_WRONLY);
-				}
-				/* Make this fd stdout and stderr */
-				dup2(lfd, 2);
-				dup2(lfd, 1);
-				/* close stdin */
-				close(0);
-			}
-			chdir(TASHI_PATH);
-			/* start node manager with python environment */
-			execle("./bin/nodemanager.py", "./bin/nodemanager.py", NULL, env);
-			exit(-1);
-		}
-		/* sleep before checking for child's status */
-		sleep(SLEEP_INTERVAL);
-		/* catch child exiting and go through loop again */
-		waitpid(pid, &status, 0);
-	}	 /* while (1) */
-}
diff --git a/src/utils/nmd.py b/src/utils/nmd.py
index e74a82f..118aee8 100755
--- a/src/utils/nmd.py
+++ b/src/utils/nmd.py
@@ -16,9 +16,10 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.	 
  */
 """
+# XXXstroucki: why not use something like supervise instead?
 
 import os
 import sys
@@ -36,81 +37,81 @@
  */
 """
 def make_invincible():
-   # dependent on linux
-   try:
-      oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
-   except IOError:
-      pass
-   else:
-      os.write(oom_adj_fd, "-17\n")
-      os.close(oom_adj_fd)
+	# dependent on linux
+	try:
+		oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
+	except IOError:
+		pass
+	else:
+		os.write(oom_adj_fd, "-17\n")
+		os.close(oom_adj_fd)
 
 """
 /* This function resets (on Linux!) its oom scoring to default
  */
 """
 def make_vulnerable():
-   # dependent on linux
-   try:
-      oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
-   except IOError:
-      pass
-   else:
-      os.write(oom_adj_fd, "0\n")
-      os.close(oom_adj_fd)
+	# dependent on linux
+	try:
+		oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
+	except IOError:
+		pass
+	else:
+		os.write(oom_adj_fd, "0\n")
+		os.close(oom_adj_fd)
 
 def main(argv=None):
-   if argv is None:
-      argv = sys.argv
-   try:
-      opts, args = getopt.getopt(argv[1:], "f", ["foreground"])
-   except getopt.GetoptError, err:
-      # print help information and exit:
-      print str(err) # will print something like "option -a not recognized"
-      # usage()
-      return 2
-   foreground = False
-   for o, a in opts:
-      if o in ("-f", "--foreground"):
-         foreground = True
-      else:
-         assert False, "unhandled option"
-   if foreground == False:
-      pid = os.fork();
-      if pid != 0:
-         os._exit(0)
-      os.close(0)
-      os.close(1)
-      os.close(2)
+	if argv is None:
+		argv = sys.argv
+	try:
+		opts, args = getopt.getopt(argv[1:], "f", ["foreground"])
+	except getopt.GetoptError, err:
+		# print help information and exit:
+		print str(err) # will print something like "option -a not recognized"
+		# usage()
+		return 2
+	foreground = False
+	for o, a in opts:
+		if o in ("-f", "--foreground"):
+			foreground = True
+		else:
+			assert False, "unhandled option"
+	if foreground == False:
+		pid = os.fork();
+		if pid != 0:
+			os._exit(0)
+		os.close(0)
+		os.close(1)
+		os.close(2)
 
-   # adjust oom preference
-   make_invincible()
+	# adjust oom preference
+	make_invincible()
 
-   # configure environment of children
-   env = {"PYTHONPATH":TASHI_PATH+"/src"}
-   while True:
-      pid = os.fork();
-      if pid == 0:
-         # child
-         # nodemanagers are vulnerable, not the supervisor
-         make_vulnerable()
-         if foreground == False:
-            try:
-               lfd = os.open(LOG_FILE, os.O_APPEND|os.O_CREAT|os.O_WRONLY)
-            except IOError:
-               lfd = os.open("/dev/null", os.O_WRONLY)
-            # make this fd stdout and stderr
-            os.dup2(lfd, 1)
-            os.dup2(lfd, 2)
-            # close stdin
-            os.close(0)
-         os.chdir(TASHI_PATH)
-         os.execle("./bin/nodemanager.py", "./bin/nodemanager.py", env)
-         os._exit(-1)
-      # sleep before checking child status
-      time.sleep(SLEEP_INTERVAL)
-      os.waitpid(pid, 0)
-   return 0
+	# configure environment of children
+	env = {"PYTHONPATH":TASHI_PATH+"/src"}
+	while True:
+		pid = os.fork();
+		if pid == 0:
+			# child
+			# nodemanagers are vulnerable, not the supervisor
+			make_vulnerable()
+			if foreground == False:
+				try:
+					lfd = os.open(LOG_FILE, os.O_APPEND|os.O_CREAT|os.O_WRONLY)
+				except IOError:
+					lfd = os.open("/dev/null", os.O_WRONLY)
+				# make this fd stdout and stderr
+				os.dup2(lfd, 1)
+				os.dup2(lfd, 2)
+				# close stdin
+				os.close(0)
+			os.chdir(TASHI_PATH)
+			os.execle("./bin/nodemanager.py", "./bin/nodemanager.py", env)
+			os._exit(-1)
+		# sleep before checking child status
+		time.sleep(SLEEP_INTERVAL)
+		os.waitpid(pid, 0)
+	return 0
 
 if __name__ == "__main__":
-   sys.exit(main())
+	sys.exit(main())
diff --git a/src/zoni/client/zoni-cli.py b/src/zoni/client/zoni-cli.py
index c5918ab..f22cc4e 100755
--- a/src/zoni/client/zoni-cli.py
+++ b/src/zoni/client/zoni-cli.py
@@ -327,11 +327,11 @@
 	if (options.nodeName):
 		cmdargs["sys_id"] = options.nodeName
 
-	if 	(options.numCores or options.clockSpeed or options.numMemory or options.numProcs or options.cpuFlags) and not options.showResources:
-			usage = "MISSING OPTION: When specifying hardware parameters, you need the -s or --showResources switch"
-			print usage
-			parser.print_help()	
-			exit()
+	if (options.numCores or options.clockSpeed or options.numMemory or options.numProcs or options.cpuFlags) and not options.showResources:
+		usage = "MISSING OPTION: When specifying hardware parameters, you need the -s or --showResources switch"
+		print usage
+		parser.print_help()	
+		exit()
 
 	if options.getResources:
 		print "ALL resources"
diff --git a/src/zoni/extra/util.py b/src/zoni/extra/util.py
index 38ce9df..6a12b6a 100644
--- a/src/zoni/extra/util.py
+++ b/src/zoni/extra/util.py
@@ -19,6 +19,7 @@
 #
 
 import os
+import sys
 import string
 import ConfigParser
 import time
@@ -218,19 +219,25 @@
 	return val
 	
 
-
+def __getShellFn():
+	if sys.version_info < (2, 6, 1):
+		from IPython.Shell import IPShellEmbed
+		return IPShellEmbed
+	else:
+		import IPython
+		return lambda: lambda local_ns, global_ns: IPython.embed(user_ns=global_ns)
 
 def debugConsole(globalDict):
 	"""A debugging console that optionally uses pysh"""
 	def realDebugConsole(globalDict):
 		try :
 			import atexit
-			from IPython.Shell import IPShellEmbed
+			shellfn = __getShellFn()
 			def resetConsole():
 # XXXpipe: make input window sane
 				(stdin, stdout) = os.popen2("reset")
 				stdout.read()
-			dbgshell = IPShellEmbed()
+			dbgshell = shellfn()
 			atexit.register(resetConsole)
 			dbgshell(local_ns=globalDict, global_ns=globalDict)
 		except Exception:
diff --git a/src/zoni/hardware/delldrac.py b/src/zoni/hardware/delldrac.py
index 7cb189f..cbdd493 100644
--- a/src/zoni/hardware/delldrac.py
+++ b/src/zoni/hardware/delldrac.py
@@ -147,7 +147,7 @@
 		for val in fout.readlines():
 			if "OK" in val:
 				code = 1
- 			if "CURRENTLY POWER-OFF" in val:
+			if "CURRENTLY POWER-OFF" in val:
 				self.log.info("Hardware already power off : %s", self.hostname)
 				code = 1
 		if code < 1:
@@ -171,7 +171,7 @@
 		for val in fout.readlines():
 			if "OK" in val:
 				code = 1
- 			if "CURRENTLY POWER-OFF" in val:
+			if "CURRENTLY POWER-OFF" in val:
 				self.log.info("Hardware already power off : %s", self.hostname)
 				code = 1
 		if code < 1:
diff --git a/src/zoni/hardware/dellswitch.py b/src/zoni/hardware/dellswitch.py
index 0ddf8aa..d8296c3 100644
--- a/src/zoni/hardware/dellswitch.py
+++ b/src/zoni/hardware/dellswitch.py
@@ -54,7 +54,7 @@
 			pass
 
 
- 	def setVerbose(self, verbose):
+	def setVerbose(self, verbose):
 		self.verbose = verbose
 
 	def __login(self):
diff --git a/src/zoni/hardware/hpswitch.py b/src/zoni/hardware/hpswitch.py
index ada83b9..fe1f604 100644
--- a/src/zoni/hardware/hpswitch.py
+++ b/src/zoni/hardware/hpswitch.py
@@ -74,10 +74,10 @@
 		child.sendline(cmd)
 		opt = child.expect(["Confirm(.*)", "No save(.*)", pexpect.EOF, pexpect.TIMEOUT])
 		if opt == 0:
-				print "saving to flash"
-				child.sendline("y\n")
+			print "saving to flash"
+			child.sendline("y\n")
 		if opt == 1:
-				print "no save needed"
+			print "no save needed"
 		child.sendline('exit')
 		child.terminate()
 
diff --git a/src/zoni/install/db/zoniDbSetup.py b/src/zoni/install/db/zoniDbSetup.py
index 88998b4..309a3a9 100644
--- a/src/zoni/install/db/zoniDbSetup.py
+++ b/src/zoni/install/db/zoniDbSetup.py
@@ -27,8 +27,8 @@
 	import optparse
 	import getpass
 except ImportError, e:
-        print "Module not installed : %s" % e
-        exit()
+	print "Module not installed : %s" % e
+	exit()
 
 
 a = os.path.join("../")
@@ -406,7 +406,7 @@
 
 def entryExists(conn, table, col, checkVal):
 	query = "select * from " + table + " where " + col + " = '" + checkVal + "'"
- 	r = execQuery(conn, query)
+	r = execQuery(conn, query)
 	res = r.fetchall()
 	if len(res) > 0:
 		return (1, res)
@@ -415,5 +415,5 @@
 
 
 if __name__ == "__main__":
-    main()
+	main()
 
diff --git a/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py b/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
index a68eb83..f55fb71 100755
--- a/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
+++ b/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
@@ -79,5 +79,5 @@
 
 
 if __name__ == "__main__":
-    main()
+	main()