merge stroucki-dropthrift from trunk
NodeManager.cfg, TashiDefaults.cfg: remove thrift config items


git-svn-id: https://svn.apache.org/repos/asf/incubator/tashi/branches/stroucki-dropthrift@1297655 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/TODO b/TODO
new file mode 100644
index 0000000..08150be
--- /dev/null
+++ b/TODO
@@ -0,0 +1 @@
+remove thrift cruft
diff --git a/etc/NodeManager.cfg b/etc/NodeManager.cfg
index a47bccf..48f4044 100644
--- a/etc/NodeManager.cfg
+++ b/etc/NodeManager.cfg
@@ -80,7 +80,6 @@
 statsInterval = 0.0
 ;accountingHost = clustermanager
 ;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [Security]
 authAndEncrypt = False
diff --git a/etc/TashiDefaults.cfg b/etc/TashiDefaults.cfg
index fd034eb..101da69 100644
--- a/etc/TashiDefaults.cfg
+++ b/etc/TashiDefaults.cfg
@@ -57,7 +57,6 @@
 allowDuplicateNames = False
 ;accountingHost = clustermanager
 ;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [GetentOverride]
 baseData = tashi.clustermanager.data.Pickled
@@ -110,7 +109,6 @@
 clusterManagerHost = localhost 
 clusterManagerPort = 9882
 statsInterval = 0.0
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
 
 [Qemu]
 qemuBin = /usr/bin/kvm
diff --git a/src/tashi/agents/locality-server.py b/src/tashi/agents/locality-server.py
deleted file mode 100755
index a2b64a5..0000000
--- a/src/tashi/agents/locality-server.py
+++ /dev/null
@@ -1,228 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# this module provides a service to locate servers that are close
-# to a VM. Uses all-pairs shortest path algorithm. Need to provide
-# a topology for the underlying physical infrastructure.
-
-from socket import gethostname
-import os
-import threading
-import time
-import socket
-
-from tashi.services.ttypes import *
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-from thrift.server import TServer
-
-from tashi.services import clustermanagerservice
-from tashi.util import getConfig, createClient
-from tashi.parallel import *
-
-import tashi.services.layoutlocality.localityservice as localityservice
-
-from numpy import *
-from scipy import *
-
-cnames = {}
-def cannonicalName(hn):
-	try:
-		if cnames.has_key(hn):
-			return cnames[hn]
-		r = socket.gethostbyname_ex(hn)[0]
-		cnames[hn] = r
-		return r
-	except:
-		return hn
-
-# define matrix multiplication that can be used to calculate a min-plus
-# distance product
-def genMul(A, B, add, mult):
-	'''generalized matrix multiplication'''
-	C = zeros((shape(A)[0], shape(B)[1]))
-	for i in range(shape(C)[0]):
-		for j in range(shape(C)[1]):
-			C[i,j] = add(mult(A[i,:], B[:,j]))
-	return C
-
-def addHost(graph, hostVals, host):
-	if not graph.has_key(host):
-		graph[host] = []
-	if not hostVals.has_key(host):
-		hostVals[host] = len(hostVals)
-
-def graphConnect(graph, h1, h2):
-	if not h1 in graph[h2]:
-		graph[h2].append(h1)
-	if not h2 in graph[h1]:
-		graph[h1].append(h2)
-
-def graphFromFile(fn = 'serverLayout', graph = {}, hostVals = {}):
-	f = open(fn)
-	for line in f.readlines():
-		line = line.split()
-		if len(line) < 1:
-			continue
-		server = cannonicalName(line[0].strip())
-
-		addHost(graph, hostVals, server)
-		for peer in line[1:]:
-			peer = cannonicalName(peer.strip())
-			addHost(graph, hostVals, peer)
-			graphConnect(graph, server, peer)
-	return graph, hostVals
-
-def graphFromTashi(client, transport, graph={}, hostVals={}):
-	print 'getting graph'
-	if not transport.isOpen():
-		transport.open()
-	hosts = client.getHosts()
-	instances = client.getInstances()
-	for instance in instances:
-		host = [cannonicalName(h.name) for h in hosts if h.id == instance.hostId]
-		if len(host) <1 :
-			print 'cant find vm host'
-			continue
-		host = host[0]
-		print 'host is ', host
-		addHost(graph, hostVals, host)
-		print 'added host'
-		vmhost = cannonicalName(instance.name)
-		addHost(graph, hostVals, vmhost)
-		print 'added vm'
-		graphConnect(graph, host, vmhost)
-		print 'connected'
-	print 'returning from graphFromTashi'
-	return graph, hostVals
-
-
-
-def graphToArray(graph, hostVals):
-	a = zeros((len(hostVals), len(hostVals)))
-	for host in graph.keys():
-		if not hostVals.has_key(host):
-			continue
-		a[hostVals[host], hostVals[host]] = 1
-		for peer in graph[host]:
-			if not hostVals.has_key(peer):
-				continue
-			a[hostVals[host], hostVals[peer]] = 1
-	a[a==0] = inf
-	for i in range(shape(a)[0]):
-		a[i,i]=0
-	return a
-
-def shortestPaths(graphArray):
-	a = graphArray
-	for i in range(math.ceil(math.log(shape(a)[0],2))):
-		a = genMul(a,a,min,plus)
-	return a
-
-def plus(A, B):
-	return A + B
-
-
-def getHopCountMatrix(sourceHosts, destHosts, array, hostVals):
-	a = zeros((len(sourceHosts), len(destHosts)))
-	a[a==0] = inf
-	for i in range(len(sourceHosts)):
-		sh = cannonicalName(sourceHosts[i])
-		shv = None
-		if hostVals.has_key(sh):
-			shv = hostVals[sh]
-		else:
-			print 'host not found', sh
-			continue
-		for j in range(len(destHosts)):
-			dh = cannonicalName(destHosts[j])
-			dhv = None
-			if hostVals.has_key(dh):
-				dhv = hostVals[dh]
-			else:
-				print 'dest not found', dh
-				continue
-			print sh, dh, i,j, shv, dhv, array[shv, dhv]
-			a[i,j] = array[shv, dhv]
-	return a
-
-
-class LocalityService:
-	def __init__(self):
-		(config, configFiles) = getConfig(["Agent"])
-		self.port = int(config.get('LocalityService', 'port'))
-		print 'Locality service on port %i' % self.port
-		self.processor = localityservice.Processor(self)
-		self.transport = TSocket.TServerSocket(self.port)
-		self.tfactory = TTransport.TBufferedTransportFactory()
-		self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-		self.server = TServer.TThreadedServer(self.processor,
-							self.transport,
-							self.tfactory,
-							self.pfactory)
-                
-		self.hostVals =[]
-		self.array = array([[]])
-		self.rtime = 0
-
-
-		self.fileName = os.path.expanduser(config.get("LocalityService", "staticLayout"))
-		(self.client, self.transport) = createClient(config)
-
-		self.server.serve()
-
-	@synchronizedmethod
-	def refresh(self):
-		if time.time() - self.rtime < 10:
-			return
-		g, self.hostVals = graphFromFile(self.fileName)
-		try:
-			g, self.hostVals = graphFromTashi(self.client, self.transport, g, self.hostVals)
-		except e:
-			print e
-			print 'could not get instance list from cluster manager'
-		print 'graph to array'
-		a = graphToArray(g, self.hostVals)
-		print 'calling shortest paths ', a.shape
-		self.array = shortestPaths(a)
-		print 'computed shortest paths'
-		print self.array
-		print self.hostVals
-	@synchronizedmethod
-	def getHopCountMatrix(self, sourceHosts, destHosts):
-		self.refresh()
-		print 'getting hop count matrix for', sourceHosts, destHosts
-		hcm =  getHopCountMatrix(sourceHosts, destHosts, self.array, self.hostVals)
-		print hcm
-		return hcm
-
-
-def main():
-
-	#XXXstroucki This code has not been updated for several years.
-	# It may still be useful as an example.
-	import sys
-	sys.exit(0);
-
-	ls = LocalityService()
-
-if __name__ == "__main__":
-	main()
diff --git a/src/tashi/client/client.py b/src/tashi/client/client.py
deleted file mode 100755
index 71b5b20..0000000
--- a/src/tashi/client/client.py
+++ /dev/null
@@ -1,213 +0,0 @@
-#! /usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import inspect
-import os
-import sys
-import types
-from tashi.services.ttypes import *
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from thrift.transport.TSocket import TSocket
-
-from tashi.services import clustermanagerservice
-from tashi import vmStates
-
-from tashi.util import getConfig
-
-def makeHTMLTable(list):
-	(stdin_r, stdin_w) = os.pipe()
-
-# XXXpipe: find number of columns in current window
-	pipe = os.popen("tput cols")
-	columns = pipe.read().strip()
-	keys = {}
-	for k in list:
-		for k2 in k.__dict__.keys():
-			if (not k2.endswith("Obj")):
-				keys[k2] = k2
-	if ('id' in keys):
-		del keys['id']
-		keylist = ['id'] + keys.keys()
-	else:
-		keylist = keys.keys()
-	output = "<html>"
-	output = output + "<table>"
-	output = output + "<tr>"
-	for k in keylist:
-		output = output + "<td>%s</td>" % (k)
-	output = output + "</tr>"
-	for k in list:
-		output = output + "<tr>"
-		for k2 in keylist:
-			if (k2 == "state"):
-				output = output + "<td>%s</td>" % (str(vmStates[k.__dict__.get(k2, None)]))
-			else:
-				output = output + "<td>%s</td>" % (str(k.__dict__.get(k2, None)))
-		output = output + "</tr>"
-	output = output + "</table>"
-	output = output + "</html>"
-	pid = os.fork()
-	if (pid == 0):
-		os.close(stdin_w)
-		os.dup2(stdin_r, 0)
-		os.close(stdin_r)
-		os.execl("/usr/bin/lynx", "/usr/bin/lynx", "-width=%s" % (columns), "-dump", "-stdin")
-		sys.exit(-1)
-	os.close(stdin_r)
-	os.write(stdin_w, output)
-	os.close(stdin_w)
-	os.waitpid(pid, 0)
-
-def getFunction(argv):
-	"""Tries to determine the name of the function requested by the user -- may be called multiple times if the binary name is 'client'"""
-	function = "None"
-	if (len(argv) > 0):
-		function = argv[0].strip()
-		if (function.rfind("/") != -1):
-			function = function[function.rfind("/")+1:]
-		if (function.rfind(".") != -1):
-			function = function[:function.rfind(".")]
-	return function
-
-def getFunctionInfo(m):
-	"""Gets a string that describes a function from the interface"""
-	f = getattr(clustermanagerservice.Iface, m)
-	argspec = inspect.getargspec(f)[0][1:]
-	return m + inspect.formatargspec(argspec)
-
-def usage():
-	"""Print program usage"""
-	print "Available methods:"
-	for m in methods:
-		print "\t" + getFunctionInfo(m)
-	print
-	print "Examples:"
-	print "\tgetInstances"
-	print "\taddUser 'User(d={\"username\":\"foobar\"})'"
-	print "\tremoveUser 2"
-	print "\tcreateVM 1 1"
-
-def simpleType(obj):
-	"""Determines whether an object is a simple type -- used as a helper function to pprint"""
-	if (type(obj) is not type([])):
-		if (not getattr(obj, "__dict__", None)):
-			return True
-	return False
-
-def pprint(obj, depth = 0, key = None):
-	"""My own version of pprint that prints out a dict in a readable, but slightly more compact format"""
-	valueManip = lambda x: x
-	if (key):
-		keyString = key + ": "
-		if (key == "state"):
-			valueManip = lambda x: vmStates[x]
-	else:
-		keyString = ""
-	if (type(obj) is type([])):
-		if (reduce(lambda x, y: x and simpleType(y), obj, True)):
-			print (" " * (depth * INDENT)) + keyString + str(obj)
-		else:
-			print (" " * (depth * INDENT)) + keyString + "["
-			for o in obj:
-				pprint(o, depth + 1)
-			print (" " * (depth * INDENT)) + "]"
-	elif (getattr(obj, "__dict__", None)):
-		if (reduce(lambda x, y: x and simpleType(y), obj.__dict__.itervalues(), True)):
-			print (" " * (depth * INDENT)) + keyString + str(obj)
-		else:
-			print (" " * (depth * INDENT)) + keyString + "{"
-			for (k, v) in obj.__dict__.iteritems():
-				pprint(v, depth + 1, k)
-			print (" " * (depth * INDENT)) + "}"
-	else:
-		print (" " * (depth * INDENT)) + keyString + str(valueManip(obj))
-
-def main():
-	"""Main function for the client program"""
-	global INDENT, methods, exitCode
-	exitCode = 0
-	INDENT = (os.getenv("INDENT", 4))
-	methods = filter(lambda x: not x.startswith("__"), clustermanagerservice.Iface.__dict__.keys())
-	function = getFunction(sys.argv)
-	if (function == "client"):
-		function = getFunction(sys.argv[1:])
-	if (function == "--makesyms"):
-		for m in methods:
-			os.symlink(sys.argv[0], m)
-		sys.exit(0)
-	if (function == "--rmsyms"):
-		for m in methods:
-			os.unlink(m)
-		sys.exit(0)
-
-	(config,configFiles) = getConfig(["Client"])
-	cfgHost = config.get('Client', 'clusterManagerHost')
-	cfgPort = config.get('Client', 'clusterManagerPort')
-	cfgTimeout = float(config.get('Client', 'clusterManagerTimeout'))
-	host = os.getenv('TASHI_CM_HOST', cfgHost)
-	port = os.getenv('TASHI_CM_PORT', cfgPort)
-	timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
-
-	socket = TSocket(host, int(port))
-	socket.setTimeout(timeout)
-	transport = TBufferedTransport(socket)
-	protocol = TBinaryProtocol(transport)
-	client = clustermanagerservice.Client(protocol)
-	client._transport = transport
-	client._transport.open()
-	f = getattr(client, function, None)
-	if not f:
-		usage()
-		sys.exit(-1)
-	args = map(lambda x: eval(x), sys.argv[1:])
-	try:
-		res = f(*args)
-		def cmp(x, y):
-			try:
-				if (x.id < y.id):
-					return -1
-				elif (y.id < x.id):
-					return 1
-				else:
-					return 0
-			except Exception, e:
-				return 0
-		if (type(res) == types.ListType):
-			res.sort(cmp)
-		if (os.getenv("USE_HTML_TABLES")):
-			try:
-				makeHTMLTable(res)
-			except:
-				pprint(res)
-		else:
-			pprint(res)
-	except TashiException, e:
-		print e.msg
-		exitCode = e.errno
-	except TypeError, e:
-		print e
-		print "\t" + getFunctionInfo(function)
-		exitCode = -1
-	finally:
-		client._transport.close()
-	sys.exit(exitCode)
-
-if __name__ == "__main__":
-	main()
diff --git a/src/tashi/client/test.py b/src/tashi/client/test.py
deleted file mode 100644
index a53eefa..0000000
--- a/src/tashi/client/test.py
+++ /dev/null
@@ -1,314 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import unittest
-import logging
-import sys
-import signal
-import os.path
-import copy
-import time
-import random
-from ConfigParser import ConfigParser
-
-from tashi.services.ttypes import *
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-
-from tashi.services import clustermanagerservice
-from tashi.messaging.threadpool import synchronized
-from tashi.messaging.tashimessaging import TestTashiSubscriber
-
-from tashi.util import getConfig
-
-import tashi.client.client
-
-class ClientConnection(object):
-	'''Creates an rpc proxy'''
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.transport = TBufferedTransport(TSocket(host, int(port)))
-		self.protocol = TBinaryProtocol(self.transport)
-		self.client = clustermanagerservice.Client(self.protocol)
-		self.client._transport = self.transport
-		self.client._transport.open()
-	def __del__(self):
-		self.client._transport.close()
-
-def incrementor(init=0):
-	while 1:
-		yield init
-		init = init + 1
-
-# FIXME: don't duplicate code from clustermanager
-# def getConfig(args):
-#	 config = ConfigParser()
-#	 configFiles = [
-#		'/usr/share/tashi/ClusterManagerDefaults.cfg',
-#		'/etc/tashi/ClusterManager.cfg',
-#		os.path.expanduser('~/.tashi/ClusterManager.cfg')
-#		] + ([args[0]] if len(args) > 0 else [])
-
-#	 configFiles = config.read(configFiles)
-#	 if len(configFiles) == 0:
-#		 print >>sys.stderr, 'Unable to find the configuration file\n'
-#		 sys.exit(3)
-
-#	 return config
-
-
-class TestClient(unittest.TestCase):
-	@synchronized()
-	def getPortNum(self):
-		return self.portnum.next()
-
-	"""macro test cases for single-host tests
-
-	Assumes cwd is 'src/tashi/client/'
-	"""
-	def setUp(self):
-		"""Create a CM and single NM on local host"""
-		logging.info('setting up test')
-		
-		(self.config, self.configfiles) = getConfig([])
-
-		self.port = 1717		# FIXME: take this (and other things) from config file
-		self.portnum = incrementor(self.port)
-
-		self.cwd = os.getcwd()
-		self.srcd = os.path.dirname(os.path.dirname(self.cwd))
-		
-		self.environ = copy.copy(os.environ)
-		self.environ['PYTHONPATH'] = self.srcd
-		logging.info('base path = %s' % self.srcd)
-
-		self.nm = os.spawnlpe(os.P_NOWAIT, 'python', 'python', 
-							  os.path.join(self.srcd, 'tashi', 'nodemanager', 'nodemanager.py'),
-							  self.environ)
-		self.cm = os.spawnlpe(os.P_WAIT, 'python', 'python',
-							 os.path.join(self.srcd,  'tashi', 'clustermanager', 'clustermanager.py'),
-							 '--drop', '--create',
-							 os.path.expanduser('~/.tashi/ClusterManager.cfg'),
-							 self.environ)
-		self.cm = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
-							 os.path.join(self.srcd,  'tashi', 'clustermanager', 'clustermanager.py'),
-							 os.path.expanduser('~/.tashi/ClusterManager.cfg'),
-							 self.environ)
-		# since we are spawning with P_NOWAIT, we need to sleep to ensure that the CM is listening
-		time.sleep(1)
-		try:
-			self.connection = ClientConnection('localhost', self.config.get('ClusterManagerService', 'port'))
-		except Exception, e:
-			logging.warning('client connection failed')
-			ex = None
-			try:
-				logging.warning("setUp killing node manager " + str(self.nm))
-				os.kill(self.nm, signal.SIGKILL)
-			except Exception, e:
-				ex = e
-				logging.warning('could not kill node manager: '+ str(e))
-			try:
-				logging.warning('setUp killing cluster manager ' + str(self.cm))
-				os.kill(self.cm, signal.SIGKILL)
-			except Exception, e:
-				ex = e
-				logging.warning('could not kill cluster manager: ' + str(e))
-			if e != None:
-				raise e
-
-		logging.info('node manager PID: %i' % self.nm)
-	def tearDown(self):
-		'''Kill the CM and NM that were created by setUP'''
-		logging.info('tearing down test')
-		ex = None
-		try:
-			logging.debug("killing cluster manager " + str(self.cm))
-			os.kill(self.cm, signal.SIGKILL)
-		except Exception, e:
-			ex = e
-			logging.error('Could not kill cluster manager: ' + str(e))
-			
-		try:
-			logging.debug("killing node manager " + str(self.nm))
-			os.kill(self.nm, signal.SIGKILL)
-		except Exception, e:
-			ex = e
-			logging.error('Could not kill node manager: ' + str(e))
-		if ex != None:
-			raise ex
-	def testSetup(self):
-		'''empty test to ensure that setUp code works'''
-		logging.info('setting up')
-	def testHostManagement(self):
-		'''test adding/removing/listing hosts
-
-		Right now this just adds a single host: localhost.  Eventually
-		it should 1) take a list of hosts from a test configuration
-		file, 2) ensure that all were added, 3) remove a random
-		subset, 4) ensure that they were correctly removed, 5) remove
-		all, 6) ensure that they were correctly removed.'''
-
-		# get empty host list
-		hosts = self.connection.client.getHosts()
-		self.assertEqual(hosts, [], 'starting host list not empty: ' + str(hosts) )
-
-		# add a host
-		host = Host()
-		host.hostname = 'localhost'
-		host.enabled=True
-		self.connection.client.addHost(host)
-		hosts = self.connection.client.getHosts()
-		self.assertEqual(len(hosts), 1, 'wrong number of hosts %i, should be %i' % (len(hosts), 1) )
-		self.assertEqual(hosts[0].hostname, 'localhost', 'wrong hostname: ' + str(hosts[0].hostname) )
-
-		# remove first host
-		hid = hosts[0].id
-		self.connection.client.removeHost(hid)
-		hosts = self.connection.client.getHosts()
-		self.assertEqual(hosts, [], 'host list not empty after remove: ' + str(hosts) )
-
-	def testMessaging(self):
-		'''test messaging system started by CM
-
-		tests messages published directly, through events in the CM,
-		and the log system'''
-		# FIXME: add tests for generating events as a side-effect of
-		# rpc commands, as well as logging in the CM
-		portnum = self.getPortNum()
-		self.sub = TestTashiSubscriber(self.config, portnum)
-		self.assertEqual(self.sub.messageQueue.qsize(), 0)
-		self.pub = tashi.messaging.thriftmessaging.PublisherThrift(self.config.get('MessageBroker', 'host'),
-																   int(self.config.get('MessageBroker', 'port')))
-		self.pub.publish({'message-type':'text', 'message':'Hello World!'})
-		time.sleep(0.5)
-		print '*** QSIZE', self.sub.messageQueue.qsize()
-		self.assertEqual(self.sub.messageQueue.qsize(), 1)
-
-		self.log = logging.getLogger(__name__)
-		messageHandler = tashi.messaging.tashimessaging.TashiLogHandler(self.config)
-		self.log.addHandler(messageHandler)
-		# FIXME: why can't we log messages with severity below 'warning'?
-		self.log.warning('test log message')
-		time.sleep(0.5)
-		self.assertEqual(self.sub.messageQueue.qsize(), 2)
-
-		# This should generate at least one log message
-#		 hosts = self.connection.client.getHosts()
-#		 time.sleep(0.5)
-#		 if (self.sub.messageQueue.qsize() <= 2):
-#			 self.fail()
-
-	def testUserManagement(self):
-		'''test adding/removing/listing users
-
-		same as testHostManagement, but with users'''
-		usernames = ['sleepy', 'sneezy', 'dopey', 'doc',
-					 'grumpy', 'bashful', 'happy']
-		# add all users
-		for un in usernames:
-			user = User()
-			user.username = un
-			self.connection.client.addUser(user)
-		# ensure that all were added
-		users = self.connection.client.getUsers()
-		self.assertEqual(len(usernames), len(users))
-		for user in users:
-			usernames.remove(user.username)
-		self.assertEqual(0, len(usernames))
-		# remove a random subset
-		rm = random.sample(users, 4)
-		for user in rm:
-			self.connection.client.removeUser(user.id)
-			users.remove(user)
-		newUsers = self.connection.client.getUsers()
-		# This ensures that the remaining ones are what we expect:
-		for user in newUsers:
-			# if there is a user remaining that we asked to be removed,
-			# this will throw an exception
-			users.remove(user)
-		# if a user was removed that we did not intend, this will
-		# throw an exception
-		self.assertEqual(0, len(users))
-
-#	 def testInstanceConfigurationManagement(self):
-#		 '''test adding/removing/listing instance configurations
-
-#		 same as testHostManagement, but with instance configurations'''
-#		 self.fail('test not implemented')
-	def testHardDiskConfigurationManagement(self):
-		'''test adding/removing/listing hard disk configurations
-
-		same as testHostManagement, but with hard disk configurations'''
-
-		user = User(d={'username':'sleepy'})
-		self.connection.client.addUser(user)
-		users = self.connection.client.getUsers()
-
-		per = PersistentImage()
-		per.userId = users[0].id
-		per.name = 'sleepy-PersistentImage'
-		self.connection.client.addPersistentImage(per)
-		pers = self.connection.client.getPersistentImages()
-
-		inst = InstanceConfiguration()
-		inst.name = 'sleepy-inst'
-		inst.memory = 512
-		inst.cores = 1
-		self.connection.client.addInstanceConfiguration(inst)
-		insts = self.connection.client.getInstanceConfigurations()
-
-		hdc = HardDiskConfiguration()
-		hdc.index = 0
-		hdc.persistentImageId = pers[0].id
-		hdc.persistent = False
-		hdc.instanceConfigurationId = insts[0].id
-
-#	 def testCreateDestroyShutdown(self):
-#		 '''test creating/destroying/shutting down VMs
-
-#		 not implemented'''
-#		 self.fail('test not implemented')
-#	 def testSuspendResume(self):
-#		 '''test suspending/resuming VMs
-
-#		 not implemented'''
-#		 self.fail('test not implemented')
-#	 def testMigrate(self):
-#		 '''test migration
-
-#		 not implemented'''
-#		 self.fail('test not implemented')
-#	 def testPauseUnpause(self):
-#		 '''test pausing/unpausing VMs
-
-#		 not implemented'''
-#		 self.fail('test not implemented')
-
-
-##############################
-# Test Code
-##############################
-if __name__ == '__main__':
-	logging.basicConfig(level=logging.NOTSET,
-						format="%(asctime)s %(levelname)s:\t %(message)s",
-						stream=sys.stdout)
-
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestClient)
-	unittest.TextTestRunner(verbosity=2).run(suite)
-
diff --git a/src/tashi/messaging/messageBroker.py b/src/tashi/messaging/messageBroker.py
deleted file mode 100644
index c21b57a..0000000
--- a/src/tashi/messaging/messageBroker.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import ConfigParser
-import getopt
-
-import os
-import sys
-import time
-
-import thriftmessaging
-
-options = []
-long_options = ['port=']
-
-# FIXME: should initialize from config file
-params = {"port":1717}
-
-try:
-	optlist, args = getopt.getopt(sys.argv[1:], options, long_options)
-except getopt.GetoptError, err:
-	print str(err)
-	sys.exit(2)
-
-for opt in optlist:
-	if opt[0] == "--port":
-		try:
-			params["port"] = int(opt[1])
-		except:
-			print "--port expects an integer, got %s" % opt[1]
-			sys.exit(0)
-
-print "Starting message broker on port %i" % params["port"]
-broker = thriftmessaging.MessageBrokerThrift(params["port"], daemon=False)
-
diff --git a/src/tashi/messaging/messaging.py b/src/tashi/messaging/messaging.py
deleted file mode 100644
index c421d5c..0000000
--- a/src/tashi/messaging/messaging.py
+++ /dev/null
@@ -1,337 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import threading
-import thread
-import sys
-import os
-import socket
-import Queue
-import copy
-import random
-import traceback
-
-from threadpool import ThreadPoolClass, threadpool, ThreadPool
-from threadpool import threadpoolmethod, threaded, synchronized, synchronizedmethod
-
-class RWLock(object):
-	"""RWLock: Simple reader/writer lock implementation
-	FIXME: this implementation will starve writers!
-	Methods:
-		acquire() : take lock for read access
-		release() : release lock from read access
-		acquireWrite() : take lock for write access
-		releaseWrite() : release lock from write access"""
-	def __init__(self):
-		self.lock = threading.Condition()
-		self.readers = 0
-	def acquire(self):
-		self.lock.acquire()
-		self.readers = self.readers + 1
-		self.lock.release()
-	def release(self):
-		self.lock.acquire()
-		self.readers = self.readers - 1
-		self.lock.notify()
-		self.lock.release()
-	def acquireWrite(self):
-		self.lock.acquire()
-		while self.readers > 0:
-			self.lock.wait()
-	def releaseWrite(self):
-		self.lock.notify()
-		self.lock.release()
-
-
-
-class MessageBroker(object):
-	def __init__(self):
-		self.sublock = RWLock()
-		self.subscribers = []
-		self.random = random.Random()
-	def log(self, msg):
-		print "MessageBroker: Got log: '%s'" % str(msg)
-		return msg
-	def addSubscriber(self, subscriber):
-		self.sublock.acquireWrite()
-		self.subscribers.append(subscriber)
-		l = len(self.subscribers)
-		self.sublock.releaseWrite()
-		return l
-	def publish(self, message):
-		removesubs = []
-		i = self.random.randint(0,100)
-
-#		 subscribers = self.getSubscribers()
-#		 random.shuffle(subscribers)
-
-		self.sublock.acquire()
-
-		sys.stdout.flush()
-
-		for subscriber in self.subscribers:
-			try:
-				sys.stdout.flush()
-				assert(subscriber != self)
-				subscriber.publish(message)
-				sys.stdout.flush()
-			except Exception, e:
-				print e
-				removesubs.append(subscriber)
-
-		self.sublock.release()
-
-		if len(removesubs) > 0:
-			print "detected %i failed subscribers" % len(removesubs)
-			sys.stdout.flush()
-			self.sublock.acquireWrite()
-			for subscriber in removesubs:
-				try:
-					self.subscribers.remove(subscriber)
-				except:
-					pass
-			self.sublock.releaseWrite()
-	def getSubscribers(self):
-		self.sublock.acquire()
-		subs = copy.copy(self.subscribers)
-		self.sublock.release()
-		return subs
-	def removeSubscriber(self, subscriber):
-		self.sublock.acquireWrite()
-		try:
-			self.subscribers.remove(subscriber)
-		except:
-			pass
-		self.sublock.releaseWrite()
-	def publishList(self, messages):
-		for message in messages:
-			self.publish(message)
-
-class Subscriber(object):
-	def __init__(self, broker, pmatch={}, nmatch={}, synchronized=False):
-		self.broker = broker
-		self.lock = threading.Lock()
-		self.synchronized = synchronized
-		self.pmatch={}
-		self.nmatch={}
-		broker.addSubscriber(self)
-	def publish(self, message):
-		sys.stdout.flush()
-		msg = message
-		try:
-			if self.synchronized:
-				self.lock.acquire()
-			msg = self.filter(msg)
-			if (msg != None):
-				self.handle(msg)
-			if self.synchronized:
-				self.lock.release()
-		except Exception, x:
-			if self.synchronized:
-				self.lock.release()
-			print '%s, %s, %s' % (type(x), x, traceback.format_exc())
-	def publishList(self, messages):
-		for message in messages:
-			self.publish(message)
-	def handle(self, message):
-		print "Subscriber Default Handler: '%s'" % message
-	def setMatch(self, pmatch={}, nmatch={}):
-		self.lock.acquire()
-		self.pmatch=pmatch
-		self.nmatch=nmatch
-		self.lock.release()
-	def filter(self, message):
-		"""filter(self, message) : the filter function returns
-		the message, modified to be passed to the handler.
-		Returning (None) indicates that this is not a message
-		we are interested in, and it will not be passed to the
-		handler."""
-		send = True
-		for key in self.pmatch.keys():
-			if (not message.has_key(key)):
-				send = False
-				break
-			if self.pmatch[key] != None:
-				if message[key] != self.pmatch[key]:
-					send = False
-					break
-		if send == False:
-			return None
-		for key in message.keys():
-			if self.nmatch.has_key(key):
-				if self.nmatch[key] == None:
-					send = False
-					break
-				if self.nmatch[key] == message[key]:
-					send = False
-					break
-		if send == False:
-			return None
-		return message
-
-
-	
-class Publisher(object):
-	'''Superclass for pub/sub publishers
-
-	FIXME: use finer-grained locking'''
-	def __init__(self, broker, aggregate=100):
-		self.pending = []
-		self.pendingLock = threading.Lock()
-		self.aggregateSize = aggregate
-		self.broker = broker
-	@synchronizedmethod
-	def publish(self, message):
-		if message.has_key('aggregate') and message['aggregate'] == 'True':
-			self.aggregate(message)
-			return
-		else:
-			self.broker.publish(message)
-	@synchronizedmethod
-	def publishList(self, messages):
-		self.broker.publishList(messages)
-	@synchronizedmethod
-	def aggregate(self, message):
-		# we can make this lock-less by using a queue for pending
-		# messages
-		self.pendingLock.acquire()
-		self.pending.append(message)
-		if len(self.pending) >= self.aggregateSize:
-			self.broker.publishList(self.pending)
-			self.pending = []
-		self.pendingLock.release()
-	@synchronizedmethod
-	def setBroker(self, broker):
-		self.broker = broker
-
-##############################
-# Testing Code
-##############################
-import time
-import unittest
-import sys
-import logging
-
-		
-class TestSubscriber(Subscriber):
-	def __init__(self, *args, **kwargs):
-		self.queue = Queue.Queue()
-		Subscriber.__init__(self, *args, **kwargs)
-	def handle(self, message):
-		self.queue.put(message)
-
-class TestMessaging(unittest.TestCase):
-	def setUp(self):
-		self.broker = MessageBroker()
-		self.publisher = Publisher(self.broker)
-		self.subscriber = TestSubscriber(self.broker)
-	def testPublish(self):
-		self.publisher.publish( {'message':'hello world'} )
-		self.assertEqual(self.subscriber.queue.qsize(), 1)
-	def testPublishList(self):
-		nrmsgs = 10
-		msgs = []
-		for i in range(nrmsgs):
-			msgs.append( {'msgnum':str(i)} )
-		self.publisher.publishList( msgs )
-		self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
-	def testAggregate(self):
-		nrmsgs = self.publisher.aggregateSize
-		for i in range(nrmsgs):
-			self.assertEqual(self.subscriber.queue.qsize(), 0)
-			self.publisher.aggregate( {'msgnum':str(i)} )
-		self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
-	def testAggregateKeyword(self):
-		nrmsgs = self.publisher.aggregateSize
-		for i in range(nrmsgs):
-			self.assertEqual(self.subscriber.queue.qsize(), 0)
-			self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
-		self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
-
-if __name__ == '__main__':
-
-	logging.basicConfig(level=logging.INFO,
-						format="%(asctime)s %(levelname)s:\t %(message)s",
-						stream=sys.stdout)
-
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestMessaging)
-	unittest.TextTestRunner(verbosity=2).run(suite) 
-
-	sys.exit(0)
-
-
-##############################
-# Old/Unused testing code
-##############################
-
-
-
-	print 'testing removeSubscriber'
-	broker.removeSubscriber(subscriber)
-	publisher.publish( {'message':"you shouldn't see this"} )
-
-	nsub = NullSubscriber(broker)
-	print 'timing publish'
-	nrmsg = 100000
-	tt = time.time()
-	for i in range(nrmsg):
-#		publisher.publish( {"message":"hello world!"} )
-		publisher.publish( {} )
-	tt = time.time() - tt
-	print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
-														   tt,
-														   nrmsg/tt)
-	broker.removeSubscriber(nsub)
-
-	class SlowSubscriber(Subscriber):
-		def handle(self, message):
-			print 'called slow subscriber with message', message
-			time.sleep(1)
-			print 'returning from slow subscriber with message', message
-	class ThreadedSubscriber(Subscriber):
-		@threaded
-		def handle(self, message):
-			print 'called threaded subscriber with message', message
-			time.sleep(1)
-			print 'returning from threaded subscriber with message', message
-	class ThreadPoolSubscriber(Subscriber, ThreadPoolClass):
-		@threadpoolmethod
-		def handle(self, message):
-			print 'called threadpool subscriber with message', message
-			time.sleep(1)
-			print 'returning from threadpool subscriber with message', message
-
-
-
-	tsub = ThreadedSubscriber(broker)
-	for i in range(8):
-		publisher.publish( {"msg":str(i)} )
-	broker.removeSubscriber(tsub)
-	time.sleep(3)
-
-	tpsub = ThreadPoolSubscriber(broker)
-	for i in range(8):
-		publisher.publish( {"msg":str(i)} )
-	broker.removeSubscriber(tpsub)
-	time.sleep(3)
-
-	ssub = SlowSubscriber(broker)
-	for i in range(4):
-		publisher.publish( {"msg":str(i)} )
-	broker.removeSubscriber(ssub)
diff --git a/src/tashi/messaging/soapmessaging.py b/src/tashi/messaging/soapmessaging.py
deleted file mode 100755
index be35fc9..0000000
--- a/src/tashi/messaging/soapmessaging.py
+++ /dev/null
@@ -1,229 +0,0 @@
-#! /usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-from messaging import *
-
-import cPickle
-import soaplib.wsgi_soap
-import cherrypy.wsgiserver
-from soaplib.service import soapmethod
-from soaplib.serializers.primitive import *
-import SOAPpy.WSDL
-import time
-
-class MessageBrokerSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, MessageBroker):
-	def __init__(self, port):
-		soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
-		MessageBroker.__init__(self)
-		self.port = port
-		def trdfn():
-			service = self
-			server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
-			server.start()
-		threading.Thread(target=trdfn).start()
-
-
-	@soapmethod(Array(String), Array(String), _returns=Null)
-	def log(self, keys, values):
-		message = {}
-		if len(keys) != len(values):
-			raise Exception, "Different lengths for keys and values"
-		for i in range(len(keys)):
-			message[keys[i]] = values[i]
-		MessageBroker.log(self, message)
-
-	@soapmethod(String, Integer, _returns=Null)
-	def addSubscriber(self, host, port):
-		subscriber = SubscriberSoapProxy(host, port)
-		MessageBroker.addSubscriber(self, subscriber)
-	
-	@soapmethod(String, Integer, _returns=Null)
-	def removeSubscriber(self, host, port):
-		# should this method really be able to peek into subscriber.host/port 
-		subscriber = None
-		subscribers = self.getSubscribers()
-		for subscriber in subscribers:
-			if subscriber.host == host and subscriber.port == port:
-				subscriber = subscriber
-		if subscriber != None:
-			MessageBroker.removeSubscriber(self, subscriber)
-		
-
-	@soapmethod(Array(String), Array(String), _returns=Null)
-	def publish(self, keys, values):
-		message = {}
-		if len(keys) != len(values):
-			raise Exception, "Different lengths for keys and values"
-		for i in range(len(keys)):
-			message[keys[i]] = values[i]
-		MessageBroker.publish(self, message)
-
-
-
-class MessageBrokerSoapProxy(object):
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
-	def log(self, message):
-		keys = []
-		values = []
-		for k,v in message.items():
-			keys.append(k)
-			values.append(v)
-		self.connection.log(keys=keys, values=values)
-	def addSubscriber(self, subscriber):
-		self.connection.addSubscriber(host=subscriber.host, port=subscriber.port)
-	def publish(self, message):
-		keys = []
-		values = []
-		for k,v in message.items():
-			keys.append(k)
-			values.append(v)
-		self.connection.publish(keys=keys, values=values)
-	def removeSubscriber(self, subscriber):
-		self.connection.removeSubscriber(host=subscriber.host, port=subscriber.port)
-
-
-
-
-class SubscriberSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, Subscriber):
-	def __init__(self, broker, port, synchronized=False):
-		soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
-		Subscriber.__init__(self, synchronized=synchronized)
-		self.host = socket.gethostname()
-		self.port = port
-		self.broker = broker
-		self.server = None
-		def trdfn():
-			service = self
-			self.server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
-			self.server.start()
-		threading.Thread(target=trdfn).start()
-#		broker.log("Subscriber started")
-		broker.addSubscriber(self)
-	@soapmethod(Array(String), Array(String), _returns=Integer)
-	def publish(self, keys, values):
-		message = {}
-		if len(keys) != len(values):
-			raise Exception, "Different lengths for keys and values"
-		for i in range(len(keys)):
-			message[keys[i]] = values[i]
-		Subscriber.publish(self, message)
-		return 0
-	def stop(self):
-		self.server.stop()
-
-class SubscriberSoapProxy(object):
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
-	def publish(self, message):
-		keys = []
-		values = []
-		for k,v in message.items():
-			keys.append(k)
-			values.append(v)
-		self.connection.publish(keys=keys, values=values)
-
-
-####################
-# Testing Code 
-####################
-
-class CustomSubscriber(SubscriberSoap):
-	def handle(self, message):
-		print "Custom Subscriber: '%s'" % str(message)
-
-class NullSubscriber(SubscriberSoap):
-	def handle(self, message):
-		pass
-
-
-if __name__ == '__main__':
-	try:
-		portnum = 1717
-
-		print "\ntesting message broker"
-		broker = MessageBrokerSoap(portnum)
-		proxy = MessageBrokerSoapProxy("localhost", portnum)
-		portnum = portnum + 1 
-
-		print "\ntesting log function"
-		proxy.log( {"message":"Hello World!"} )
-#		proxy.log("It looks like log works")
-
-		print "\ntesting subscriber proxy"
-		subscriber = SubscriberSoap(proxy, portnum)
-		portnum = portnum + 1
-
-		print "\ntesting custom subscriber"
-		csub = CustomSubscriber(proxy, portnum)
-		portnum = portnum + 1
-
-		print "\ntesting publish"
-		proxy.publish( {"message":"Hello World!"} )
-
-		print "\ntesting stop"
-		subscriber.stop()
-		proxy.publish( {"message":"Everybody here?"} )
-
-		print "\ntesting removeSubscriber"
-		proxy.removeSubscriber(csub)
-		proxy.publish( {"message":"Nobody home"} )
-		proxy.addSubscriber(csub)
-		proxy.publish( {"message":"You're back!"} )
-
-		print "\ntesting filter"
-		csub.setMatch( {"print":"yes"} )
-		proxy.publish( {"print":"yes", "message":"this should be printed"} )
-		proxy.publish( {"print":"no", "message":"this should NOT be printed"} )
-		csub.setMatch()
-
-		print "\ntesting publish performance"
-		proxy.removeSubscriber(csub)
-		nrmsg = 10000
-		tt = time.time()
-		for i in range(nrmsg):
-			proxy.publish( {"message":"msg %i"%i} )
-		tt = time.time() - tt
-		print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
-															   tt,
-															   nrmsg/tt)
-
-		print "\ntesting publish/subscribe performance"
-		nsub = NullSubscriber(proxy, portnum)
-		portnum = portnum + 1
-		nrmsg = 10000
-		tt = time.time()
-		for i in range(nrmsg):
-			proxy.publish( {"message":"msg %i"%i} )
-		tt = time.time() - tt
-		print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
-															   tt,
-															   nrmsg/tt)
-
-																   
-
-	except Exception, e:
-#		raise e
-		print  e
-		sys.exit(0)
-	sys.exit(0)
diff --git a/src/tashi/messaging/tashimessaging.py b/src/tashi/messaging/tashimessaging.py
deleted file mode 100644
index 006400f..0000000
--- a/src/tashi/messaging/tashimessaging.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-from thriftmessaging import *
-import logging
-import Queue
-from ConfigParser import ConfigParser
-import time
-import socket
-import signal
-
-class TashiLogHandler(logging.Handler, PublisherThrift):
-	def __init__(self, config, *args, **kwargs):
-		self.messages = Queue.Queue()
-		self.config = config
-		logging.Handler.__init__(self, *args, **kwargs)
-		PublisherThrift.__init__(self, 
-								 config.get('MessageBroker', 'host'),
-								 int(config.get('MessageBroker', 'port')))
-	def emit(self, record):
-		# 'args', 'created', 'exc_info', 'exc_text', 'filename',
-		# 'funcName', 'getMessage', 'levelname', 'levelno', 'lineno',
-		# 'module', 'msecs', 'msg', 'name', 'pathname', 'process',
-		# 'relativeCreated', 'thread', 'threadName']
-		msg = {}
-		# args
-		# created
-		# exc_info
-		# exc_text
-		msg['log-filename'] = str(record.filename)
-		msg['log-funcname'] = str(record.funcName)
-		msg['log-levelname'] = str(record.levelname)
-		msg['log-level'] = str(record.levelno)
-		msg['log-lineno'] = str(record.lineno)
-		msg['log-module'] = str(record.module)
-		msg['log-msecs'] = str(record.msecs)
-		msg['log-message'] = str(record.msg)
-		msg['log-name'] = str(record.name)
-		msg['log-pathname'] = str(record.pathname)
-		msg['log-process'] = str(record.process)
-		# relativeCreated
-		msg['log-thread'] = str(record.thread)
-		msg['log-threadname'] = str(record.threadName)
-
-		# standard message fields
-		msg['timestamp'] = str(time.time())
-		msg['hostname'] = socket.gethostname()
-		msg['message-type'] = 'log'
-
-		self.messages.put(msg)
-		self.publish(msg)
-
-class TashiSubscriber(SubscriberThrift):
-	def __init__(self, config, port, **kwargs):
-		sys.stdout.flush()
-		brokerPort = int(config.get('MessageBroker', 'port'))
-		self.broker = MessageBrokerThriftProxy(config.get('MessageBroker', 'host'), brokerPort)
-		SubscriberThrift.__init__(self, self.broker, port, **kwargs)
-
-		
-
-##############################
-# Test Code
-##############################
-import unittest
-import sys
-
-class TestTashiSubscriber(TashiSubscriber):
-	def __init__(self, *args, **kwargs):
-		self.messageQueue = Queue.Queue()
-		TashiSubscriber.__init__(self, *args, **kwargs)
-	def handle(self, message):
-		self.messageQueue.put(message)
-
-
-def incrementor(start = 0):
-	while True:
-		a = start
-		start = start + 1
-		yield a
-increment = incrementor()
-
-class TestTashiMessaging(unittest.TestCase):
-	def setUp(self):
-		self.configFiles = [ '../../../etc/TestConfig.cfg']
-		self.config = ConfigParser()
-		self.configFiles = self.config.read(self.configFiles)
-		self.port = int(self.config.get('MessageBroker', 'port'))
-
-		try:
-			self.brokerPid = os.spawnlpe(os.P_NOWAIT, 'python', 'python', 
-										 './messageBroker.py', 
-										 '--port', str(self.port),
-										 os.environ)
-			self.port = self.port + 1
-			# FIXME: what's the best way to wait for the broker to be ready?
-			time.sleep(1)
-		except Exception, e:
-			sys.exit(0)
-		self.initialized = True
-		self.log = logging.getLogger('TestTashiMessaging')
-		self.handler = TashiLogHandler(self.config)
-		self.log.addHandler(self.handler)
-		self.sub = TestTashiSubscriber(self.config, int(self.port) + increment.next())
-	def tearDown(self):
-		os.kill(self.brokerPid, signal.SIGKILL)
-		# FIXME: wait for the port to be ready again
-		time.sleep(2)
-		self.log.removeHandler(self.handler)
-#		 self.sub.broker.removeSubscriber(self.sub)
-		pass
-	def testLog(self):
-		self.log.log(50, "Hello World!")
-		self.handler.messages.get(timeout=5)
-		self.sub.messageQueue.get(timeout=5)
-		self.assertEqual(self.handler.messages.qsize(), 0)
-		self.assertEqual(self.sub.messageQueue.qsize(), 0)
-	def testPublish(self):
-		sys.stdout.flush()
-		self.port = self.port + 1
-		self.handler.publish({'message':'hello world'})
-		self.sub.messageQueue.get(timeout=5)
-		self.assertEqual(self.sub.messageQueue.qsize(), 0)
-		
-
-if __name__=='__main__':
-
-
-#	 logging.basicConfig(level=logging.INFO,
-#						 format="%(asctime)s %(levelname)s:\t %(message)s",
-#						 stream=sys.stdout)
-
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestTashiMessaging)
-	unittest.TextTestRunner(verbosity=2).run(suite)
diff --git a/src/tashi/messaging/threadpool.py b/src/tashi/messaging/threadpool.py
deleted file mode 100644
index 5684ef2..0000000
--- a/src/tashi/messaging/threadpool.py
+++ /dev/null
@@ -1,305 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import threading
-import time
-import Queue
-import logging
-
-_log = logging.getLogger('tashi.messaging.threadpool')
-
-def threaded(func):
-	def fn(*args, **kwargs):
-		thread = threading.Thread(target=func, args=args, kwargs=kwargs)
-		thread.start()
-		return thread
-	return fn
-
-
-class ThreadPool(Queue.Queue):
-	def __init__(self, size=8, maxsize=0):
-		Queue.Queue.__init__(self, maxsize)
-		for i in range(size):
-			thread = threading.Thread(target=self._worker)
-			thread.setDaemon(True)
-			thread.start()
-	def _worker(self):
-		while True:
-			try:
-				func, args, kwargs = self.get()
-				func(*args, **kwargs)
-			except Exception, e:
-				_log.error(e)
-				# FIXME: do something smarter here, backtrace, log,
-				# allow user-defined error handling...
-				
-	def submit(self, func, *args, **kwargs):
-		self.put((func, args, kwargs))
-	def submitlist(self, func, args, kwargs):
-		self.put((func, args, kwargs))
-
-class ThreadPoolClass:
-	def __init__(self, size=8, maxsize=0):
-		self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
-
-
-def threadpool(pool):
-	def dec(func):
-		def fn(*args, **kwargs):
-			pool.submit(func, *args, **kwargs)
-		return fn
-	return dec
-
-def threadpoolmethod(meth):
-	def fn(*args, **kwargs):
-		try:
-			pool = args[0]._threadpool_pool
-		except AttributeError:
-			pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
-		# FIXME: how do we check parent class?
-#		assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
-		pool.submit(meth, *args, **kwargs)
-	return fn
-
-def synchronized(lock=None):
-	_log.debug('synchronized decorator factory called')
-	if lock==None:
-		lock = threading.RLock()
-	def dec(func):
-		_log.debug('synchronized decorator called')
-		def fn(*args, **kwargs):
-			_log.debug('getting sync lock')
-			lock.acquire()
-			_log.debug('got sync lock')
-			ex = None
-			try:
-				r = func(*args, **kwargs)
-			except Exception, e:
-				ex = e
-			_log.debug('releasing sync lock')
-			lock.release()
-			_log.debug('released sync lock')
-			if ex != None:
-				raise e
-			return r
-		return fn
-	return dec
-			
-def synchronizedmethod(func):
-	def fn(*args, **kwargs):
-		try:
-			lock = args[0]._synchronized_lock
-		except AttributeError:
-			lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
-		lock.acquire()
-		ex = None
-		try:
-			func(*args, **kwargs)
-		except Exception, e:
-			ex = e
-		lock.release()
-		if ex != None:
-			raise e
-	return fn
-		
-
-##############################
-# Test Code
-##############################
-import unittest
-import sys
-import time
-
-class TestThreadPool(unittest.TestCase):
-	def setUp(self):
-		self.errmargin = 0.5
-
-	def testUnthreaded(self):
-		queue = Queue.Queue()
-		def slowfunc(sleep=1):
-			time.sleep(sleep)
-			queue.put(None)
-		tt = time.time()
-		for i in range(4):
-			slowfunc()
-		for i in range(4):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 4, 1) 
-
-	def testThreaded(self):
-		queue = Queue.Queue()
-		@threaded
-		def slowthreadfunc(sleep=1):
-			time.sleep(sleep)
-			queue.put(None)
-		tt = time.time()
-		for i in range(8):
-			slowthreadfunc()
-		for i in range(8):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 1, 1) 
-
-	def testThreadPool(self):
-		pool = ThreadPool(size=4)
-		queue = Queue.Queue()
-		@threadpool(pool)
-		def slowpoolfunc(sleep=1):
-			time.sleep(sleep)
-			queue.put(None)
-		tt = time.time()
-		for i in range(8):
-			slowpoolfunc()
-		for i in range(8):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 2, 1) 
-
-	def testUnthreadedMethod(self):
-		queue = Queue.Queue()
-		class slowclass:
-			def __init__(self, sleep=1):
-				self.sleep=sleep
-			def beslow(self):
-				time.sleep(self.sleep)
-				queue.put(None)
-		sc = slowclass()
-		tt = time.time()
-		for i in range(4):
-			sc.beslow()
-		for i in range(4):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 4, 1)
-	
-	def testThreadedMethod(self):
-		queue = Queue.Queue()
-		class slowclass:
-			def __init__(self, sleep=1):
-				self.sleep=sleep
-			@threaded
-			def beslow(self):
-				time.sleep(self.sleep)
-				queue.put(None)
-		sc = slowclass()
-		tt = time.time()
-		for i in range(4):
-			sc.beslow()
-		for i in range(4):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 1, 1)
-	
-	def testThreadPoolMethod(self):
-		queue = Queue.Queue()
-		class slowclass:
-			def __init__(self, sleep=1):
-				self.sleep=sleep
-			@threadpoolmethod
-			def beslow(self):
-				time.sleep(self.sleep)
-				queue.put(None)
-		sc = slowclass()
-		tt = time.time()
-		for i in range(16):
-			sc.beslow()
-		for i in range(16):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 2, 1)
-	
-	def testSynchronized(self):
-		queue = Queue.Queue()
-		@synchronized()
-		def addtoqueue():
-			time.sleep(1)
-			queue.put(None)
-		@threaded
-		def slowthreadfunc():
-			addtoqueue()
-		tt = time.time()
-		for i in range(4):
-			slowthreadfunc()
-		for i in range(4):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 4, 1) 
-
-	def testSynchronizedMethod(self):
-		queue = Queue.Queue()
-		class addtoqueue:
-			@synchronizedmethod
-			def addtoqueue1(self):
-				time.sleep(1)
-				queue.put(None)
-			@synchronizedmethod
-			def addtoqueue2(self):
-				time.sleep(1)
-				queue.put(None)
-		atc = addtoqueue()
-		@threaded
-		def slowthreadfunc1():
-			atc.addtoqueue1()
-		@threaded
-		def slowthreadfunc2():
-			atc.addtoqueue2()
-		tt = time.time()
-		for i in range(4):
-			slowthreadfunc1()
-			slowthreadfunc2()
-		for i in range(8):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 8, 1) 
-
-	def testUnsynchronizedMethod(self):
-		queue = Queue.Queue()
-		class addtoqueue:
-			def addtoqueue1(self):
-				time.sleep(1)
-				queue.put(None)
-			def addtoqueue2(self):
-				time.sleep(1)
-				queue.put(None)
-		atc = addtoqueue()
-		@threaded
-		def slowthreadfunc1():
-			atc.addtoqueue1()
-		@threaded
-		def slowthreadfunc2():
-			atc.addtoqueue2()
-		tt = time.time()
-		for i in range(4):
-			slowthreadfunc1()
-			slowthreadfunc2()
-		for i in range(8):
-			queue.get()
-		tt = time.time() - tt
-		self.assertAlmostEqual(tt, 1, 1) 
-
-
-
-if __name__=='__main__':
-	import sys
-
-	logging.basicConfig(level=logging.INFO,
-						format="%(asctime)s %(levelname)s:\t %(message)s",
-						stream=sys.stdout)
-
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
-	unittest.TextTestRunner(verbosity=2).run(suite)
diff --git a/src/tashi/messaging/thriftmessaging.py b/src/tashi/messaging/thriftmessaging.py
deleted file mode 100755
index 0c73ff0..0000000
--- a/src/tashi/messaging/thriftmessaging.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import sys
-import time
-import socket
-import traceback
-import threading
-
-sys.path.append('./gen-py')
-import tashi.messaging.messagingthrift
-import tashi.messaging.messagingthrift.MessageBrokerThrift
-import tashi.messaging.messagingthrift.SubscriberThrift
-from tashi.messaging.messagingthrift.ttypes import *
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-from thrift.server import TServer
-
-from tashi import ConnectionManager
-
-from tashi.messaging.messaging import *
-from tashi.messaging.threadpool import ThreadPoolClass, threadpool, ThreadPool, threadpoolmethod, threaded
-
-class MessageBrokerThrift(MessageBroker):
-	def __init__(self, port, daemon=True):
-		MessageBroker.__init__(self)
-		self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
-		self.transport = TSocket.TServerSocket(port)
-		self.tfactory = TTransport.TBufferedTransportFactory()
-		self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-		self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
-		self.ready = threading.Event()
-#		 self.server = TServer.TSimpleServer(self.processor,
-#											 self.transport,
-#											 self.tfactory,
-#											 self.pfactory)
-#		 self.server = TServer.TThreadPoolServer(self.processor,
-#												 self.transport,
-#												 self.tfactory,
-#												 self.pfactory)
-		self.server = TServer.TThreadedServer(self.processor,
-												self.transport,
-												self.tfactory,
-												self.pfactory)
-		self.publishCalls = 0
-
-		def ssvrthrd():
-			try:
-				# FIXME: Race condition, the ready event should be set after
-				# starting the server.  However, server.serve()
-				# doesn't return under normal circumstances.  This
-				# seems to work in practice, even though it's clearly
-				# wrong.
-				self.ready.set()
-				self.server.serve()
-			except Exception, e:
-				print e
-				sys.stdout.flush()
-				pass
-		svt = threading.Thread(target=ssvrthrd)
-		svt.setDaemon(daemon)
-		svt.start()
-		self.ready.wait()
-	def log(self, message):
-		MessageBroker.log(self, message)
-	@synchronizedmethod
-	def addSubscriber(self, host, port):
-		subscribers = self.getSubscribers()
-		for sub in subscribers:
-			if sub.host == host and sub.port == port:
-				return
-		subscriber = SubscriberThriftProxy(host, port, self.proxy)
-		MessageBroker.addSubscriber(self, subscriber)
-	def removeSubscriber(self, host, port):
-		subscriber = None
-		subscribers = self.getSubscribers()
-		for sub in subscribers:
-			if sub.host == host and sub.port == port:
-				subscriber = sub
-		if subscriber != None:
-			MessageBroker.removeSubscriber(self, subscriber)
-	@synchronizedmethod
-	def publish(self, message):
-		self.publishCalls  = self.publishCalls + 1
-		sys.stdout.flush()
-		MessageBroker.publish(self, message)
-
-class MessageBrokerThriftProxy:
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
-	@synchronizedmethod
-	def log(self, message):
-		self.proxy[self.host, self.port].log(message)
-	@synchronizedmethod
-	def publish(self, message):
-		self.proxy[self.host, self.port].publish(message)
-	@synchronizedmethod
-	def publishList(self, messages):
-		self.proxy[self.host, self.port].publishList(messages)
-	@synchronizedmethod
-	def addSubscriber(self, subscriber):
-		self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
-	@synchronizedmethod
-	def removeSubscriber(self, subscriber):
-		self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
-
-
-
-class SubscriberThrift(Subscriber, threading.Thread):
-	def __init__(self, broker, port, synchronized=False):
-		self.host = socket.gethostname()
-		self.port = port
-		self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
-		self.transport = TSocket.TServerSocket(port)
-		self.tfactory = TTransport.TBufferedTransportFactory()
-		self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-		self.server = TServer.TThreadedServer(self.processor,
-											  self.transport,
-											  self.tfactory,
-											  self.pfactory)
-		def ssvrthrd():
-			try:
-				self.server.serve()
-			except Exception, e:
-				pass
-
-
-		self.thread = threading.Thread(target=ssvrthrd)
-		self.thread.setDaemon(True)
-		self.thread.start()
-
-		# We have to call this AFTER initializing our server, so that
-		# the broker can contact us
-		# Wrap this in a try/catch because the broker may not be online yet
-		try:
-			Subscriber.__init__(self, broker,  synchronized=synchronized)		
-		except:
-			pass
-		threading.Thread.__init__(self)
-		self.setDaemon(True)
-		self.start()
-
-	def stop(self):
-#		 # FIXME: this is broken, there is no clear way to stop a
-#		 # Thrift server
-		self.broker.removeSubscriber(self)
-		self.transport.close()
-	def run(self):
-		while(True):
-			# renew subscription every 5 min
-			try:
-				self.broker.addSubscriber(self)
-			except:
-				pass
-			time.sleep(5*60)
-
-class SubscriberThriftProxy:
-	def __init__(self, host, port, proxy, aggregate = 100):
-		self.host = host
-		self.port = port
-		self.proxy = proxy
-		# for some reason, thrift clients are not thread-safe, lock during send
-		self.lock = threading.Lock()
-		self.pending = []
-		self.aggregateSize = aggregate
-	def publish(self, message):
-		self.lock.acquire()
-		sys.stdout.flush()
-		if message.has_key('aggregate') and message['aggregate'] == 'True':
-			self.pending.append(message)
-			if len(self.pending) >= self.aggregateSize:
-				try:
-					self.proxy[self.host, self.port].publishList(self.pending)
-				except Exception, e:
-					print e
-					self.lock.release()
-					raise e
-				self.pending = []
-		else:
-			try:
-				self.proxy[self.host, self.port].publish(message)
-			except Exception, e:
-				sys.stdout.flush()
-				print e
-				self.lock.release()
-				raise e
-		self.lock.release()
-
-class PublisherThrift(Publisher):
-	def __init__(self, host, port):
-		self.host = host
-		self.port = port
-		self.broker = MessageBrokerThriftProxy(host, port)
-		Publisher.__init__(self, self.broker)
-		
-####################
-# Testing Code 
-####################
-
-class TestSubscriberThrift(SubscriberThrift):
-	def __init__(self, *args, **kwargs):
-		self.queue = Queue.Queue()
-		SubscriberThrift.__init__(self, *args, **kwargs)
-	def handle(self, message):
-		self.queue.put(message)
-
-portnum = 1718
-class TestThriftMessaging(unittest.TestCase):
-	def setUp(self):
-		global portnum
-		self.broker = MessageBrokerThrift(portnum)
-		self.brokerPort = portnum
-		portnum = portnum + 1 
-		self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
-		self.publisher = PublisherThrift('localhost', self.brokerPort)
-		self.subscriber = TestSubscriberThrift(self.proxy, portnum)
-		portnum = portnum + 1
-	def tearDown(self):
-		pass
-	def testSetUp(self):
-		pass
-	def testPublish(self):
-		self.publisher.publish( {'message':'hello world'} )
-		self.subscriber.queue.get(True, timeout=5)
-		self.assertEqual(self.subscriber.queue.qsize(), 0)
-	def testPublishList(self):
-		nrmsgs = 10
-		msgs = []
-		for i in range(nrmsgs):
-			msgs.append( {'msgnum':str(i)} )
-		self.publisher.publishList( msgs )
-		for i in range(nrmsgs):
-			self.subscriber.queue.get(True, timeout=5)
-		self.assertEqual(self.subscriber.queue.qsize(), 0)
-	def testAggregate(self):
-		nrmsgs = self.publisher.aggregateSize
-		for i in range(nrmsgs):
-			self.assertEqual(self.subscriber.queue.qsize(), 0)
-			self.publisher.aggregate( {'msgnum':str(i)} )
-		for i in range(nrmsgs):
-			self.subscriber.queue.get(True, timeout=5)
-		self.assertEqual(self.subscriber.queue.qsize(), 0)
-	def testAggregateKeyword(self):
-		nrmsgs = self.publisher.aggregateSize
-		for i in range(nrmsgs):
-			self.assertEqual(self.subscriber.queue.qsize(), 0)
-			self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
-		for i in range(nrmsgs):
-			self.subscriber.queue.get(True, timeout=5)
-		self.assertEqual(self.subscriber.queue.qsize(), 0)
-
-
-if __name__=='__main__':
-	suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
-	unittest.TextTestRunner(verbosity=2).run(suite)
-
-
diff --git a/src/tashi/nodemanager/vmcontrol/xenpv.py b/src/tashi/nodemanager/vmcontrol/xenpv.py
index 7c7de75..9401df3 100644
--- a/src/tashi/nodemanager/vmcontrol/xenpv.py
+++ b/src/tashi/nodemanager/vmcontrol/xenpv.py
@@ -28,7 +28,7 @@
 from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
 from tashi.rpycservices.rpyctypes import Instance, Host
 from tashi import boolean, convertExceptions, ConnectionManager, version
-from tashi.util import isolatedRPC, broken
+from tashi.util import broken
 
 import tashi.parallel
 from tashi.parallel import synchronized, synchronizedmethod
diff --git a/src/tashi/thrift/build.py b/src/tashi/thrift/build.py
deleted file mode 100755
index 42b22fa..0000000
--- a/src/tashi/thrift/build.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-import shutil
-import os
-from os import path
-import re
-
-if __name__ == '__main__':
-	if (path.exists('gen-py')):
-		print 'Removing \'gen-py\' directory...'
-		shutil.rmtree('gen-py')
-		
-	if (path.exists('../services')):
-		print 'Removing \'../services\' directory...'
-		shutil.rmtree('../services')
-	
-	if (path.exists('../messaging/messagingthrift')):
-		print 'Removing \'../messaging/messagingthrift\' directory...'
-		shutil.rmtree('../messaging/messagingthrift')
-	
-	print 'Generating Python code for \'services.thrift\'...'
-	os.system('thrift --gen py:new_style services.thrift')
-	
-	print 'Copying generated code to \'tashi.services\' package...'
-	shutil.copytree('gen-py/services', '../services')
-	
-	print 'Generatign Python code for \'messagingthrift\'...'
-	os.system('rm -rf gen-py')
-	os.system('thrift --gen py messagingthrift.thrift')
-	
-	print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
-	shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
-			os.path.join('..', 'messaging', 'messagingthrift'))
-
-	print 'Generating Python code for \'layoutlocality.thrift\'...'
-	os.system('thrift --gen py:new_style layoutlocality.thrift')
- 
-	print 'Copying generated code to \'tashi.services\' package...'
-	shutil.copytree('gen-py/layoutlocality', '../services/layoutlocality')
diff --git a/src/tashi/thrift/layoutlocality.thrift b/src/tashi/thrift/layoutlocality.thrift
deleted file mode 100644
index e14910c..0000000
--- a/src/tashi/thrift/layoutlocality.thrift
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-struct BlockLocation {
-       list<string> hosts,           // hostnames of data nodes
-       list<i32> ports,              // ports for data nodes
-       list<string> names,           // hostname:port of data nodes
-       i64 blocknum,
-       i64 offset,
-       i64 length
-}
-
-struct Pathname {
-       string pathname
-}
-
-exception FileNotFoundException {
-       string message
-}
-
-service layoutservice {
-       list <BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 offset, 3:i64 length)
-                            throws (1:FileNotFoundException ouch),
-}
-
-service localityservice {
-       list <list<double>> getHopCountMatrix(1:list<string> sourceHosts, 2:list<string> destHosts),
-}
diff --git a/src/tashi/thrift/messagingthrift.thrift b/src/tashi/thrift/messagingthrift.thrift
deleted file mode 100644
index 401e9a1..0000000
--- a/src/tashi/thrift/messagingthrift.thrift
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-typedef map<string, string> strstrmap
-
-service SubscriberThrift{
-  # the async keyword seems to slow things down in the simple
-  # tests.  However, with non-trivial subscribers it will be 
-  # necessary to use async here.
-  async void publish(strstrmap message)
-  async void publishList(list<strstrmap> messages)
-}
-
-service MessageBrokerThrift{
-  void log(strstrmap message),
-  void addSubscriber(string host, i16 port)
-  void removeSubscriber(string host, i16 port)
-  async void publish(strstrmap message)
-  async void publishList(list<strstrmap> messages)
-
-}
-
diff --git a/src/tashi/thrift/services.thrift b/src/tashi/thrift/services.thrift
deleted file mode 100644
index fa29c30..0000000
--- a/src/tashi/thrift/services.thrift
+++ /dev/null
@@ -1,166 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-enum Errors {
-	ConvertedException = 1,
-	NoSuchInstanceId = 2,
-	NoSuchVmId = 3,
-	IncorrectVmState = 4,
-	NoSuchHost = 5,
-	NoSuchHostId = 6,
-	InstanceIdAlreadyExists = 7,
-	HostNameMismatch = 8,
-	HostNotUp = 9,
-	HostStateError = 10,
-	InvalidInstance = 11,
-	UnableToResume = 12,
-	UnableToSuspend = 13,
-}
-
-enum InstanceState {
-	Pending = 1,		// Job submitted
-	Activating = 2,		// activateVm has been called, but instantiateVm hasn't finished yet
-	Running = 3,		// Normal state
-	Pausing = 4,		// Beginning pause sequence
-	Paused = 5		// Paused
-	Unpausing = 6,		// Beginning unpause sequence
-	Suspending = 7,		// Beginning suspend sequence
-	Resuming = 8,		// Beginning resume sequence
-	MigratePrep = 9,	// Migrate state #1
-	MigrateTrans = 10,	// Migrate state #2
-	ShuttingDown = 11,	// Beginning exit sequence
-	Destroying = 12,	// Beginning exit sequence
-	Orphaned = 13,		// Host is missing
-	Held = 14,		// Activation failed
-	Exited = 15,		// VM has exited
-	Suspended = 16,		// VM is suspended
-}
-
-enum HostState {
-	Normal = 1,
-	Drained = 2,
-	VersionMismatch = 3
-}
-
-exception TashiException {
-	1: Errors errno
-	2: string msg
-}
-
-struct Host {
-	1:i32 id,
-	2:string name,
-	3:bool up,
-	4:bool decayed,
-	5:HostState state,
-	6:i32 memory,
-	7:i32 cores,
-	8:string version
-	// Other properties (disk?)
-}
-
-struct Network {
-	1:i32 id
-	2:string name
-}
-
-struct User {
-	1:i32 id,
-	2:string name
-}
-
-struct DiskConfiguration {
-	1:string uri,
-	2:bool persistent
-}
-
-struct NetworkConfiguration {
-	1:i32 network,
-	2:string mac,
-	3:string ip
-}
-
-struct Instance {
-	1:i32 id,
-	2:i32 vmId,
-	3:i32 hostId,
-	4:bool decayed,
-	5:InstanceState state,
-	6:i32 userId,
-	7:string name, // User specified
-	8:i32 cores, // User specified
-	9:i32 memory, // User specified
-	10:list<DiskConfiguration> disks, // User specified
-	11:list<NetworkConfiguration> nics // User specified
-	12:map<string, string> hints // User specified
-}
-
-service clustermanagerservice {
-	// Client-facing RPCs
-	Instance createVm(1:Instance instance) throws (1:TashiException e)
-	
-	void shutdownVm(1:i32 instanceId) throws (1:TashiException e)
-	void destroyVm(1:i32 instanceId) throws (1:TashiException e)
-	
-	void suspendVm(1:i32 instanceId) throws (1:TashiException e)
-	Instance resumeVm(1:i32 instanceId) throws (1:TashiException e)
-	
-	void migrateVm(1:i32 instanceId, 2:i32 targetHostId) throws (1:TashiException e)
-	
-	void pauseVm(1:i32 instanceId) throws (1:TashiException e)
-	void unpauseVm(1:i32 instanceId) throws (1:TashiException e)
-	
-	list<Host> getHosts() throws (1:TashiException e)
-	list<Network> getNetworks() throws (1:TashiException e)
-	list<User> getUsers() throws (1:TashiException e)
-
-	list<Instance> getInstances() throws (1:TashiException e)
-	
-	string vmmSpecificCall(1:i32 instanceId, 2:string arg) throws (1:TashiException e)
-	
-	// NodeManager-facing RPCs
-	i32 registerNodeManager(1:Host host, 2:list<Instance> instances) throws (1:TashiException e)
-	void vmUpdate(1:i32 instanceId, 2:Instance instance, 3:InstanceState old) throws (1:TashiException e)
-
-	// Agent-facing RPCs
-	void activateVm(1:i32 instanceId, 2:Host host) throws (1:TashiException e)
-}
-
-service nodemanagerservice {
-	// ClusterManager-facing RPCs
-	i32 instantiateVm(1:Instance instance) throws (1:TashiException e)
-	
-	void shutdownVm(1:i32 vmId) throws (1:TashiException e)
-	void destroyVm(1:i32 vmId) throws (1:TashiException e)
-	
-	void suspendVm(1:i32 vmId, 2:string destination) throws (1:TashiException e)
-	i32 resumeVm(1:Instance instance, 2:string source) throws (1:TashiException e)
-	
-	string prepReceiveVm(1:Instance instance, 2:Host source) throws (1:TashiException e)
-	void migrateVm(1:i32 vmId, 2:Host target, 3:string transportCookie) throws (1:TashiException e)
-	void receiveVm(1:Instance instance, 2:string transportCookie) throws (1:TashiException e)
-	
-	void pauseVm(1:i32 vmId) throws (1:TashiException e)
-	void unpauseVm(1:i32 vmId) throws (1:TashiException e)
-
-	Instance getVmInfo(1:i32 vmId) throws (1:TashiException e)
-	list<i32> listVms() throws (1:TashiException e)
-	
-	string vmmSpecificCall(1:i32 vmId, 2:string arg) throws (1:TashiException e)
-	
-	// Host getHostInfo() throws (1:TashiException e)
-}
diff --git a/src/tashi/util.py b/src/tashi/util.py
index 18b6133..dd8a28d 100644
--- a/src/tashi/util.py
+++ b/src/tashi/util.py
@@ -149,14 +149,6 @@
 	def __delattr__(self, name):
 		return delattr(self.__dict__['__real_obj__'], name)
 
-def isolatedRPC(client, method, *args, **kw):
-	"""Opens and closes a thrift transport for a single RPC call"""
-	if (not client._iprot.trans.isOpen()):
-		client._iprot.trans.open()
-	res = getattr(client, method)(*args, **kw)
-	client._iprot.trans.close()
-	return res
-
 def signalHandler(signalNumber):
 	"""Used to denote a particular function as the signal handler for a 
 	   specific signal"""
@@ -193,7 +185,7 @@
 
 def convertExceptions(oldFunc):
 	"""This converts any exception type into a TashiException so that 
-	   it can be passed over a Thrift RPC"""
+	   it can be passed over an RPC"""
 	def newFunc(*args, **kw):
 		try:
 			return oldFunc(*args, **kw)
diff --git a/src/utils/Makefile b/src/utils/Makefile
deleted file mode 100644
index aea56ee..0000000
--- a/src/utils/Makefile
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.    
-
-all: nmd
-
-clean:
-	rm -f ./nmd
-
-nmd: nmd.c
-	${CC} $< -o $@
diff --git a/src/utils/getLocality.py b/src/utils/getLocality.py
deleted file mode 100755
index 49ecb11..0000000
--- a/src/utils/getLocality.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import sys
-import os
-from os import system
-
-import tashi.services.layoutlocality.localityservice as localityservice
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-
-from tashi.util import getConfig
-
-(config, configFiles) = getConfig(["Client"])
-host = config.get('LocalityService', 'host')
-port = int(config.get('LocalityService', 'port'))
-
-socket = TSocket.TSocket(host, port)
-transport = TTransport.TBufferedTransport(socket)
-protocol = TBinaryProtocol.TBinaryProtocol(transport)
-client = localityservice.Client(protocol)
-transport.open()
-
-while True:
-	line1 = "\n"
-	line2 = "\n"
-	while line1 != "":
-		line1 = sys.stdin.readline()
-		if line1 == "":
-			sys.exit(0)
-		if line1 != "\n":
-			break
-	line1 = line1.strip()
-	while line2 != "":
-		line2 = sys.stdin.readline()
-		if line2 == "":
-			sys.exit(0)
-		if line2 != "\n":
-			break
-	line2 = line2.strip()
-
-	sources = line1.split(" ")
-	destinations = line2.split(" ")
-
-	mat = client.getHopCountMatrix(sources, destinations)
-	for r in mat:
-		for c in r:
-			print '%f\t'%c,
-		print '\n',
-	print '\n',
diff --git a/src/utils/nmd.c b/src/utils/nmd.c
deleted file mode 100644
index effa1d2..0000000
--- a/src/utils/nmd.c
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.    
- */
-
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <assert.h>
-
-#define SLEEP_INTERVAL 10
-#define TASHI_PATH "/usr/local/tashi/"
-#define LOG_FILE "/var/log/nodemanager.log"
-
-/* This function changes (on Linux!) its oom scoring, to make it
- * unattractive to kill
- */
-
-void make_invincible()
-{
-	int oom_adj_fd;
-	int r;
-
-	oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
-	assert(oom_adj_fd != -1);
-	r = write(oom_adj_fd, "-17\n", 4);
-	assert(r == 4);
-	close(oom_adj_fd);
-
-}
-
-/* This function resets (on Linux!) its oom scoring to default
- */
-void make_vulnerable()
-{
-	int oom_adj_fd;
-	int r;
-
-	oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
-	assert(oom_adj_fd != -1);
-	r = write(oom_adj_fd, "0\n", 2);
-	assert(r == 2);
-	close(oom_adj_fd);
-}
-
-int main(int argc, char **argv)
-{
-	char* env[2];
-	int status;
-	DIR* d;
-	int pid;
-	int lfd;
-	int foreground=0;
-
-/* If first argument is "-f", run in foreground */
-	if ((argc > 1) && (strncmp(argv[1], "-f", 3)==0)) {
-		foreground=1;
-	}
-/* If not running in foreground, fork off and exit the parent.
- * The child closes its default file descriptors.
- */
-	if (!foreground) {
-		pid = fork();
-		if (pid != 0) {
-			exit(0);
-		}
-		close(0);
-		close(1);
-		close(2);
-	}
-/* Adjust OOM preference */
-	make_invincible();
-/* Configure environment of children */
-	env[0] = "PYTHONPATH="TASHI_PATH"/src/";
-	env[1] = NULL;
-	while (1) {
-		pid = fork();
-		if (pid == 0) {
-			/* child */
-			/* nodemanagers are vulnerable. Not the supervisor. */
-			make_vulnerable();
-			if (!foreground) {
-				/* If not running fg, open log file */
-				lfd = open(LOG_FILE, O_WRONLY|O_APPEND|O_CREAT);
-				if (lfd < 0) {
-					/* If this failed, open something? */
-					lfd = open("/dev/null", O_WRONLY);
-				}
-				/* Make this fd stdout and stderr */
-				dup2(lfd, 2);
-				dup2(lfd, 1);
-				/* close stdin */
-				close(0);
-			}
-			chdir(TASHI_PATH);
-			/* start node manager with python environment */
-			execle("./bin/nodemanager.py", "./bin/nodemanager.py", NULL, env);
-			exit(-1);
-		}
-		/* sleep before checking for child's status */
-		sleep(SLEEP_INTERVAL);
-		/* catch child exiting and go through loop again */
-		waitpid(pid, &status, 0);
-	}	 /* while (1) */
-}