merge stroucki-dropthrift from trunk
NodeManager.cfg, TashiDefaults.cfg: remove thrift config items
git-svn-id: https://svn.apache.org/repos/asf/incubator/tashi/branches/stroucki-dropthrift@1297655 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/TODO b/TODO
new file mode 100644
index 0000000..08150be
--- /dev/null
+++ b/TODO
@@ -0,0 +1 @@
+remove thrift cruft
diff --git a/etc/NodeManager.cfg b/etc/NodeManager.cfg
index a47bccf..48f4044 100644
--- a/etc/NodeManager.cfg
+++ b/etc/NodeManager.cfg
@@ -80,7 +80,6 @@
statsInterval = 0.0
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Security]
authAndEncrypt = False
diff --git a/etc/TashiDefaults.cfg b/etc/TashiDefaults.cfg
index fd034eb..101da69 100644
--- a/etc/TashiDefaults.cfg
+++ b/etc/TashiDefaults.cfg
@@ -57,7 +57,6 @@
allowDuplicateNames = False
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[GetentOverride]
baseData = tashi.clustermanager.data.Pickled
@@ -110,7 +109,6 @@
clusterManagerHost = localhost
clusterManagerPort = 9882
statsInterval = 0.0
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Qemu]
qemuBin = /usr/bin/kvm
diff --git a/src/tashi/agents/locality-server.py b/src/tashi/agents/locality-server.py
deleted file mode 100755
index a2b64a5..0000000
--- a/src/tashi/agents/locality-server.py
+++ /dev/null
@@ -1,228 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# this module provides a service to locate servers that are close
-# to a VM. Uses all-pairs shortest path algorithm. Need to provide
-# a topology for the underlying physical infrastructure.
-
-from socket import gethostname
-import os
-import threading
-import time
-import socket
-
-from tashi.services.ttypes import *
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-from thrift.server import TServer
-
-from tashi.services import clustermanagerservice
-from tashi.util import getConfig, createClient
-from tashi.parallel import *
-
-import tashi.services.layoutlocality.localityservice as localityservice
-
-from numpy import *
-from scipy import *
-
-cnames = {}
-def cannonicalName(hn):
- try:
- if cnames.has_key(hn):
- return cnames[hn]
- r = socket.gethostbyname_ex(hn)[0]
- cnames[hn] = r
- return r
- except:
- return hn
-
-# define matrix multiplication that can be used to calculate a min-plus
-# distance product
-def genMul(A, B, add, mult):
- '''generalized matrix multiplication'''
- C = zeros((shape(A)[0], shape(B)[1]))
- for i in range(shape(C)[0]):
- for j in range(shape(C)[1]):
- C[i,j] = add(mult(A[i,:], B[:,j]))
- return C
-
-def addHost(graph, hostVals, host):
- if not graph.has_key(host):
- graph[host] = []
- if not hostVals.has_key(host):
- hostVals[host] = len(hostVals)
-
-def graphConnect(graph, h1, h2):
- if not h1 in graph[h2]:
- graph[h2].append(h1)
- if not h2 in graph[h1]:
- graph[h1].append(h2)
-
-def graphFromFile(fn = 'serverLayout', graph = {}, hostVals = {}):
- f = open(fn)
- for line in f.readlines():
- line = line.split()
- if len(line) < 1:
- continue
- server = cannonicalName(line[0].strip())
-
- addHost(graph, hostVals, server)
- for peer in line[1:]:
- peer = cannonicalName(peer.strip())
- addHost(graph, hostVals, peer)
- graphConnect(graph, server, peer)
- return graph, hostVals
-
-def graphFromTashi(client, transport, graph={}, hostVals={}):
- print 'getting graph'
- if not transport.isOpen():
- transport.open()
- hosts = client.getHosts()
- instances = client.getInstances()
- for instance in instances:
- host = [cannonicalName(h.name) for h in hosts if h.id == instance.hostId]
- if len(host) <1 :
- print 'cant find vm host'
- continue
- host = host[0]
- print 'host is ', host
- addHost(graph, hostVals, host)
- print 'added host'
- vmhost = cannonicalName(instance.name)
- addHost(graph, hostVals, vmhost)
- print 'added vm'
- graphConnect(graph, host, vmhost)
- print 'connected'
- print 'returning from graphFromTashi'
- return graph, hostVals
-
-
-
-def graphToArray(graph, hostVals):
- a = zeros((len(hostVals), len(hostVals)))
- for host in graph.keys():
- if not hostVals.has_key(host):
- continue
- a[hostVals[host], hostVals[host]] = 1
- for peer in graph[host]:
- if not hostVals.has_key(peer):
- continue
- a[hostVals[host], hostVals[peer]] = 1
- a[a==0] = inf
- for i in range(shape(a)[0]):
- a[i,i]=0
- return a
-
-def shortestPaths(graphArray):
- a = graphArray
- for i in range(math.ceil(math.log(shape(a)[0],2))):
- a = genMul(a,a,min,plus)
- return a
-
-def plus(A, B):
- return A + B
-
-
-def getHopCountMatrix(sourceHosts, destHosts, array, hostVals):
- a = zeros((len(sourceHosts), len(destHosts)))
- a[a==0] = inf
- for i in range(len(sourceHosts)):
- sh = cannonicalName(sourceHosts[i])
- shv = None
- if hostVals.has_key(sh):
- shv = hostVals[sh]
- else:
- print 'host not found', sh
- continue
- for j in range(len(destHosts)):
- dh = cannonicalName(destHosts[j])
- dhv = None
- if hostVals.has_key(dh):
- dhv = hostVals[dh]
- else:
- print 'dest not found', dh
- continue
- print sh, dh, i,j, shv, dhv, array[shv, dhv]
- a[i,j] = array[shv, dhv]
- return a
-
-
-class LocalityService:
- def __init__(self):
- (config, configFiles) = getConfig(["Agent"])
- self.port = int(config.get('LocalityService', 'port'))
- print 'Locality service on port %i' % self.port
- self.processor = localityservice.Processor(self)
- self.transport = TSocket.TServerSocket(self.port)
- self.tfactory = TTransport.TBufferedTransportFactory()
- self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- self.server = TServer.TThreadedServer(self.processor,
- self.transport,
- self.tfactory,
- self.pfactory)
-
- self.hostVals =[]
- self.array = array([[]])
- self.rtime = 0
-
-
- self.fileName = os.path.expanduser(config.get("LocalityService", "staticLayout"))
- (self.client, self.transport) = createClient(config)
-
- self.server.serve()
-
- @synchronizedmethod
- def refresh(self):
- if time.time() - self.rtime < 10:
- return
- g, self.hostVals = graphFromFile(self.fileName)
- try:
- g, self.hostVals = graphFromTashi(self.client, self.transport, g, self.hostVals)
- except e:
- print e
- print 'could not get instance list from cluster manager'
- print 'graph to array'
- a = graphToArray(g, self.hostVals)
- print 'calling shortest paths ', a.shape
- self.array = shortestPaths(a)
- print 'computed shortest paths'
- print self.array
- print self.hostVals
- @synchronizedmethod
- def getHopCountMatrix(self, sourceHosts, destHosts):
- self.refresh()
- print 'getting hop count matrix for', sourceHosts, destHosts
- hcm = getHopCountMatrix(sourceHosts, destHosts, self.array, self.hostVals)
- print hcm
- return hcm
-
-
-def main():
-
- #XXXstroucki This code has not been updated for several years.
- # It may still be useful as an example.
- import sys
- sys.exit(0);
-
- ls = LocalityService()
-
-if __name__ == "__main__":
- main()
diff --git a/src/tashi/client/client.py b/src/tashi/client/client.py
deleted file mode 100755
index 71b5b20..0000000
--- a/src/tashi/client/client.py
+++ /dev/null
@@ -1,213 +0,0 @@
-#! /usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import inspect
-import os
-import sys
-import types
-from tashi.services.ttypes import *
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from thrift.transport.TSocket import TSocket
-
-from tashi.services import clustermanagerservice
-from tashi import vmStates
-
-from tashi.util import getConfig
-
-def makeHTMLTable(list):
- (stdin_r, stdin_w) = os.pipe()
-
-# XXXpipe: find number of columns in current window
- pipe = os.popen("tput cols")
- columns = pipe.read().strip()
- keys = {}
- for k in list:
- for k2 in k.__dict__.keys():
- if (not k2.endswith("Obj")):
- keys[k2] = k2
- if ('id' in keys):
- del keys['id']
- keylist = ['id'] + keys.keys()
- else:
- keylist = keys.keys()
- output = "<html>"
- output = output + "<table>"
- output = output + "<tr>"
- for k in keylist:
- output = output + "<td>%s</td>" % (k)
- output = output + "</tr>"
- for k in list:
- output = output + "<tr>"
- for k2 in keylist:
- if (k2 == "state"):
- output = output + "<td>%s</td>" % (str(vmStates[k.__dict__.get(k2, None)]))
- else:
- output = output + "<td>%s</td>" % (str(k.__dict__.get(k2, None)))
- output = output + "</tr>"
- output = output + "</table>"
- output = output + "</html>"
- pid = os.fork()
- if (pid == 0):
- os.close(stdin_w)
- os.dup2(stdin_r, 0)
- os.close(stdin_r)
- os.execl("/usr/bin/lynx", "/usr/bin/lynx", "-width=%s" % (columns), "-dump", "-stdin")
- sys.exit(-1)
- os.close(stdin_r)
- os.write(stdin_w, output)
- os.close(stdin_w)
- os.waitpid(pid, 0)
-
-def getFunction(argv):
- """Tries to determine the name of the function requested by the user -- may be called multiple times if the binary name is 'client'"""
- function = "None"
- if (len(argv) > 0):
- function = argv[0].strip()
- if (function.rfind("/") != -1):
- function = function[function.rfind("/")+1:]
- if (function.rfind(".") != -1):
- function = function[:function.rfind(".")]
- return function
-
-def getFunctionInfo(m):
- """Gets a string that describes a function from the interface"""
- f = getattr(clustermanagerservice.Iface, m)
- argspec = inspect.getargspec(f)[0][1:]
- return m + inspect.formatargspec(argspec)
-
-def usage():
- """Print program usage"""
- print "Available methods:"
- for m in methods:
- print "\t" + getFunctionInfo(m)
- print
- print "Examples:"
- print "\tgetInstances"
- print "\taddUser 'User(d={\"username\":\"foobar\"})'"
- print "\tremoveUser 2"
- print "\tcreateVM 1 1"
-
-def simpleType(obj):
- """Determines whether an object is a simple type -- used as a helper function to pprint"""
- if (type(obj) is not type([])):
- if (not getattr(obj, "__dict__", None)):
- return True
- return False
-
-def pprint(obj, depth = 0, key = None):
- """My own version of pprint that prints out a dict in a readable, but slightly more compact format"""
- valueManip = lambda x: x
- if (key):
- keyString = key + ": "
- if (key == "state"):
- valueManip = lambda x: vmStates[x]
- else:
- keyString = ""
- if (type(obj) is type([])):
- if (reduce(lambda x, y: x and simpleType(y), obj, True)):
- print (" " * (depth * INDENT)) + keyString + str(obj)
- else:
- print (" " * (depth * INDENT)) + keyString + "["
- for o in obj:
- pprint(o, depth + 1)
- print (" " * (depth * INDENT)) + "]"
- elif (getattr(obj, "__dict__", None)):
- if (reduce(lambda x, y: x and simpleType(y), obj.__dict__.itervalues(), True)):
- print (" " * (depth * INDENT)) + keyString + str(obj)
- else:
- print (" " * (depth * INDENT)) + keyString + "{"
- for (k, v) in obj.__dict__.iteritems():
- pprint(v, depth + 1, k)
- print (" " * (depth * INDENT)) + "}"
- else:
- print (" " * (depth * INDENT)) + keyString + str(valueManip(obj))
-
-def main():
- """Main function for the client program"""
- global INDENT, methods, exitCode
- exitCode = 0
- INDENT = (os.getenv("INDENT", 4))
- methods = filter(lambda x: not x.startswith("__"), clustermanagerservice.Iface.__dict__.keys())
- function = getFunction(sys.argv)
- if (function == "client"):
- function = getFunction(sys.argv[1:])
- if (function == "--makesyms"):
- for m in methods:
- os.symlink(sys.argv[0], m)
- sys.exit(0)
- if (function == "--rmsyms"):
- for m in methods:
- os.unlink(m)
- sys.exit(0)
-
- (config,configFiles) = getConfig(["Client"])
- cfgHost = config.get('Client', 'clusterManagerHost')
- cfgPort = config.get('Client', 'clusterManagerPort')
- cfgTimeout = float(config.get('Client', 'clusterManagerTimeout'))
- host = os.getenv('TASHI_CM_HOST', cfgHost)
- port = os.getenv('TASHI_CM_PORT', cfgPort)
- timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
-
- socket = TSocket(host, int(port))
- socket.setTimeout(timeout)
- transport = TBufferedTransport(socket)
- protocol = TBinaryProtocol(transport)
- client = clustermanagerservice.Client(protocol)
- client._transport = transport
- client._transport.open()
- f = getattr(client, function, None)
- if not f:
- usage()
- sys.exit(-1)
- args = map(lambda x: eval(x), sys.argv[1:])
- try:
- res = f(*args)
- def cmp(x, y):
- try:
- if (x.id < y.id):
- return -1
- elif (y.id < x.id):
- return 1
- else:
- return 0
- except Exception, e:
- return 0
- if (type(res) == types.ListType):
- res.sort(cmp)
- if (os.getenv("USE_HTML_TABLES")):
- try:
- makeHTMLTable(res)
- except:
- pprint(res)
- else:
- pprint(res)
- except TashiException, e:
- print e.msg
- exitCode = e.errno
- except TypeError, e:
- print e
- print "\t" + getFunctionInfo(function)
- exitCode = -1
- finally:
- client._transport.close()
- sys.exit(exitCode)
-
-if __name__ == "__main__":
- main()
diff --git a/src/tashi/client/test.py b/src/tashi/client/test.py
deleted file mode 100644
index a53eefa..0000000
--- a/src/tashi/client/test.py
+++ /dev/null
@@ -1,314 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import unittest
-import logging
-import sys
-import signal
-import os.path
-import copy
-import time
-import random
-from ConfigParser import ConfigParser
-
-from tashi.services.ttypes import *
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-
-from tashi.services import clustermanagerservice
-from tashi.messaging.threadpool import synchronized
-from tashi.messaging.tashimessaging import TestTashiSubscriber
-
-from tashi.util import getConfig
-
-import tashi.client.client
-
-class ClientConnection(object):
- '''Creates an rpc proxy'''
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.transport = TBufferedTransport(TSocket(host, int(port)))
- self.protocol = TBinaryProtocol(self.transport)
- self.client = clustermanagerservice.Client(self.protocol)
- self.client._transport = self.transport
- self.client._transport.open()
- def __del__(self):
- self.client._transport.close()
-
-def incrementor(init=0):
- while 1:
- yield init
- init = init + 1
-
-# FIXME: don't duplicate code from clustermanager
-# def getConfig(args):
-# config = ConfigParser()
-# configFiles = [
-# '/usr/share/tashi/ClusterManagerDefaults.cfg',
-# '/etc/tashi/ClusterManager.cfg',
-# os.path.expanduser('~/.tashi/ClusterManager.cfg')
-# ] + ([args[0]] if len(args) > 0 else [])
-
-# configFiles = config.read(configFiles)
-# if len(configFiles) == 0:
-# print >>sys.stderr, 'Unable to find the configuration file\n'
-# sys.exit(3)
-
-# return config
-
-
-class TestClient(unittest.TestCase):
- @synchronized()
- def getPortNum(self):
- return self.portnum.next()
-
- """macro test cases for single-host tests
-
- Assumes cwd is 'src/tashi/client/'
- """
- def setUp(self):
- """Create a CM and single NM on local host"""
- logging.info('setting up test')
-
- (self.config, self.configfiles) = getConfig([])
-
- self.port = 1717 # FIXME: take this (and other things) from config file
- self.portnum = incrementor(self.port)
-
- self.cwd = os.getcwd()
- self.srcd = os.path.dirname(os.path.dirname(self.cwd))
-
- self.environ = copy.copy(os.environ)
- self.environ['PYTHONPATH'] = self.srcd
- logging.info('base path = %s' % self.srcd)
-
- self.nm = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
- os.path.join(self.srcd, 'tashi', 'nodemanager', 'nodemanager.py'),
- self.environ)
- self.cm = os.spawnlpe(os.P_WAIT, 'python', 'python',
- os.path.join(self.srcd, 'tashi', 'clustermanager', 'clustermanager.py'),
- '--drop', '--create',
- os.path.expanduser('~/.tashi/ClusterManager.cfg'),
- self.environ)
- self.cm = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
- os.path.join(self.srcd, 'tashi', 'clustermanager', 'clustermanager.py'),
- os.path.expanduser('~/.tashi/ClusterManager.cfg'),
- self.environ)
- # since we are spawning with P_NOWAIT, we need to sleep to ensure that the CM is listening
- time.sleep(1)
- try:
- self.connection = ClientConnection('localhost', self.config.get('ClusterManagerService', 'port'))
- except Exception, e:
- logging.warning('client connection failed')
- ex = None
- try:
- logging.warning("setUp killing node manager " + str(self.nm))
- os.kill(self.nm, signal.SIGKILL)
- except Exception, e:
- ex = e
- logging.warning('could not kill node manager: '+ str(e))
- try:
- logging.warning('setUp killing cluster manager ' + str(self.cm))
- os.kill(self.cm, signal.SIGKILL)
- except Exception, e:
- ex = e
- logging.warning('could not kill cluster manager: ' + str(e))
- if e != None:
- raise e
-
- logging.info('node manager PID: %i' % self.nm)
- def tearDown(self):
- '''Kill the CM and NM that were created by setUP'''
- logging.info('tearing down test')
- ex = None
- try:
- logging.debug("killing cluster manager " + str(self.cm))
- os.kill(self.cm, signal.SIGKILL)
- except Exception, e:
- ex = e
- logging.error('Could not kill cluster manager: ' + str(e))
-
- try:
- logging.debug("killing node manager " + str(self.nm))
- os.kill(self.nm, signal.SIGKILL)
- except Exception, e:
- ex = e
- logging.error('Could not kill node manager: ' + str(e))
- if ex != None:
- raise ex
- def testSetup(self):
- '''empty test to ensure that setUp code works'''
- logging.info('setting up')
- def testHostManagement(self):
- '''test adding/removing/listing hosts
-
- Right now this just adds a single host: localhost. Eventually
- it should 1) take a list of hosts from a test configuration
- file, 2) ensure that all were added, 3) remove a random
- subset, 4) ensure that they were correctly removed, 5) remove
- all, 6) ensure that they were correctly removed.'''
-
- # get empty host list
- hosts = self.connection.client.getHosts()
- self.assertEqual(hosts, [], 'starting host list not empty: ' + str(hosts) )
-
- # add a host
- host = Host()
- host.hostname = 'localhost'
- host.enabled=True
- self.connection.client.addHost(host)
- hosts = self.connection.client.getHosts()
- self.assertEqual(len(hosts), 1, 'wrong number of hosts %i, should be %i' % (len(hosts), 1) )
- self.assertEqual(hosts[0].hostname, 'localhost', 'wrong hostname: ' + str(hosts[0].hostname) )
-
- # remove first host
- hid = hosts[0].id
- self.connection.client.removeHost(hid)
- hosts = self.connection.client.getHosts()
- self.assertEqual(hosts, [], 'host list not empty after remove: ' + str(hosts) )
-
- def testMessaging(self):
- '''test messaging system started by CM
-
- tests messages published directly, through events in the CM,
- and the log system'''
- # FIXME: add tests for generating events as a side-effect of
- # rpc commands, as well as logging in the CM
- portnum = self.getPortNum()
- self.sub = TestTashiSubscriber(self.config, portnum)
- self.assertEqual(self.sub.messageQueue.qsize(), 0)
- self.pub = tashi.messaging.thriftmessaging.PublisherThrift(self.config.get('MessageBroker', 'host'),
- int(self.config.get('MessageBroker', 'port')))
- self.pub.publish({'message-type':'text', 'message':'Hello World!'})
- time.sleep(0.5)
- print '*** QSIZE', self.sub.messageQueue.qsize()
- self.assertEqual(self.sub.messageQueue.qsize(), 1)
-
- self.log = logging.getLogger(__name__)
- messageHandler = tashi.messaging.tashimessaging.TashiLogHandler(self.config)
- self.log.addHandler(messageHandler)
- # FIXME: why can't we log messages with severity below 'warning'?
- self.log.warning('test log message')
- time.sleep(0.5)
- self.assertEqual(self.sub.messageQueue.qsize(), 2)
-
- # This should generate at least one log message
-# hosts = self.connection.client.getHosts()
-# time.sleep(0.5)
-# if (self.sub.messageQueue.qsize() <= 2):
-# self.fail()
-
- def testUserManagement(self):
- '''test adding/removing/listing users
-
- same as testHostManagement, but with users'''
- usernames = ['sleepy', 'sneezy', 'dopey', 'doc',
- 'grumpy', 'bashful', 'happy']
- # add all users
- for un in usernames:
- user = User()
- user.username = un
- self.connection.client.addUser(user)
- # ensure that all were added
- users = self.connection.client.getUsers()
- self.assertEqual(len(usernames), len(users))
- for user in users:
- usernames.remove(user.username)
- self.assertEqual(0, len(usernames))
- # remove a random subset
- rm = random.sample(users, 4)
- for user in rm:
- self.connection.client.removeUser(user.id)
- users.remove(user)
- newUsers = self.connection.client.getUsers()
- # This ensures that the remaining ones are what we expect:
- for user in newUsers:
- # if there is a user remaining that we asked to be removed,
- # this will throw an exception
- users.remove(user)
- # if a user was removed that we did not intend, this will
- # throw an exception
- self.assertEqual(0, len(users))
-
-# def testInstanceConfigurationManagement(self):
-# '''test adding/removing/listing instance configurations
-
-# same as testHostManagement, but with instance configurations'''
-# self.fail('test not implemented')
- def testHardDiskConfigurationManagement(self):
- '''test adding/removing/listing hard disk configurations
-
- same as testHostManagement, but with hard disk configurations'''
-
- user = User(d={'username':'sleepy'})
- self.connection.client.addUser(user)
- users = self.connection.client.getUsers()
-
- per = PersistentImage()
- per.userId = users[0].id
- per.name = 'sleepy-PersistentImage'
- self.connection.client.addPersistentImage(per)
- pers = self.connection.client.getPersistentImages()
-
- inst = InstanceConfiguration()
- inst.name = 'sleepy-inst'
- inst.memory = 512
- inst.cores = 1
- self.connection.client.addInstanceConfiguration(inst)
- insts = self.connection.client.getInstanceConfigurations()
-
- hdc = HardDiskConfiguration()
- hdc.index = 0
- hdc.persistentImageId = pers[0].id
- hdc.persistent = False
- hdc.instanceConfigurationId = insts[0].id
-
-# def testCreateDestroyShutdown(self):
-# '''test creating/destroying/shutting down VMs
-
-# not implemented'''
-# self.fail('test not implemented')
-# def testSuspendResume(self):
-# '''test suspending/resuming VMs
-
-# not implemented'''
-# self.fail('test not implemented')
-# def testMigrate(self):
-# '''test migration
-
-# not implemented'''
-# self.fail('test not implemented')
-# def testPauseUnpause(self):
-# '''test pausing/unpausing VMs
-
-# not implemented'''
-# self.fail('test not implemented')
-
-
-##############################
-# Test Code
-##############################
-if __name__ == '__main__':
- logging.basicConfig(level=logging.NOTSET,
- format="%(asctime)s %(levelname)s:\t %(message)s",
- stream=sys.stdout)
-
- suite = unittest.TestLoader().loadTestsFromTestCase(TestClient)
- unittest.TextTestRunner(verbosity=2).run(suite)
-
diff --git a/src/tashi/messaging/messageBroker.py b/src/tashi/messaging/messageBroker.py
deleted file mode 100644
index c21b57a..0000000
--- a/src/tashi/messaging/messageBroker.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import ConfigParser
-import getopt
-
-import os
-import sys
-import time
-
-import thriftmessaging
-
-options = []
-long_options = ['port=']
-
-# FIXME: should initialize from config file
-params = {"port":1717}
-
-try:
- optlist, args = getopt.getopt(sys.argv[1:], options, long_options)
-except getopt.GetoptError, err:
- print str(err)
- sys.exit(2)
-
-for opt in optlist:
- if opt[0] == "--port":
- try:
- params["port"] = int(opt[1])
- except:
- print "--port expects an integer, got %s" % opt[1]
- sys.exit(0)
-
-print "Starting message broker on port %i" % params["port"]
-broker = thriftmessaging.MessageBrokerThrift(params["port"], daemon=False)
-
diff --git a/src/tashi/messaging/messaging.py b/src/tashi/messaging/messaging.py
deleted file mode 100644
index c421d5c..0000000
--- a/src/tashi/messaging/messaging.py
+++ /dev/null
@@ -1,337 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import threading
-import thread
-import sys
-import os
-import socket
-import Queue
-import copy
-import random
-import traceback
-
-from threadpool import ThreadPoolClass, threadpool, ThreadPool
-from threadpool import threadpoolmethod, threaded, synchronized, synchronizedmethod
-
-class RWLock(object):
- """RWLock: Simple reader/writer lock implementation
- FIXME: this implementation will starve writers!
- Methods:
- acquire() : take lock for read access
- release() : release lock from read access
- acquireWrite() : take lock for write access
- releaseWrite() : release lock from write access"""
- def __init__(self):
- self.lock = threading.Condition()
- self.readers = 0
- def acquire(self):
- self.lock.acquire()
- self.readers = self.readers + 1
- self.lock.release()
- def release(self):
- self.lock.acquire()
- self.readers = self.readers - 1
- self.lock.notify()
- self.lock.release()
- def acquireWrite(self):
- self.lock.acquire()
- while self.readers > 0:
- self.lock.wait()
- def releaseWrite(self):
- self.lock.notify()
- self.lock.release()
-
-
-
-class MessageBroker(object):
- def __init__(self):
- self.sublock = RWLock()
- self.subscribers = []
- self.random = random.Random()
- def log(self, msg):
- print "MessageBroker: Got log: '%s'" % str(msg)
- return msg
- def addSubscriber(self, subscriber):
- self.sublock.acquireWrite()
- self.subscribers.append(subscriber)
- l = len(self.subscribers)
- self.sublock.releaseWrite()
- return l
- def publish(self, message):
- removesubs = []
- i = self.random.randint(0,100)
-
-# subscribers = self.getSubscribers()
-# random.shuffle(subscribers)
-
- self.sublock.acquire()
-
- sys.stdout.flush()
-
- for subscriber in self.subscribers:
- try:
- sys.stdout.flush()
- assert(subscriber != self)
- subscriber.publish(message)
- sys.stdout.flush()
- except Exception, e:
- print e
- removesubs.append(subscriber)
-
- self.sublock.release()
-
- if len(removesubs) > 0:
- print "detected %i failed subscribers" % len(removesubs)
- sys.stdout.flush()
- self.sublock.acquireWrite()
- for subscriber in removesubs:
- try:
- self.subscribers.remove(subscriber)
- except:
- pass
- self.sublock.releaseWrite()
- def getSubscribers(self):
- self.sublock.acquire()
- subs = copy.copy(self.subscribers)
- self.sublock.release()
- return subs
- def removeSubscriber(self, subscriber):
- self.sublock.acquireWrite()
- try:
- self.subscribers.remove(subscriber)
- except:
- pass
- self.sublock.releaseWrite()
- def publishList(self, messages):
- for message in messages:
- self.publish(message)
-
-class Subscriber(object):
- def __init__(self, broker, pmatch={}, nmatch={}, synchronized=False):
- self.broker = broker
- self.lock = threading.Lock()
- self.synchronized = synchronized
- self.pmatch={}
- self.nmatch={}
- broker.addSubscriber(self)
- def publish(self, message):
- sys.stdout.flush()
- msg = message
- try:
- if self.synchronized:
- self.lock.acquire()
- msg = self.filter(msg)
- if (msg != None):
- self.handle(msg)
- if self.synchronized:
- self.lock.release()
- except Exception, x:
- if self.synchronized:
- self.lock.release()
- print '%s, %s, %s' % (type(x), x, traceback.format_exc())
- def publishList(self, messages):
- for message in messages:
- self.publish(message)
- def handle(self, message):
- print "Subscriber Default Handler: '%s'" % message
- def setMatch(self, pmatch={}, nmatch={}):
- self.lock.acquire()
- self.pmatch=pmatch
- self.nmatch=nmatch
- self.lock.release()
- def filter(self, message):
- """filter(self, message) : the filter function returns
- the message, modified to be passed to the handler.
- Returning (None) indicates that this is not a message
- we are interested in, and it will not be passed to the
- handler."""
- send = True
- for key in self.pmatch.keys():
- if (not message.has_key(key)):
- send = False
- break
- if self.pmatch[key] != None:
- if message[key] != self.pmatch[key]:
- send = False
- break
- if send == False:
- return None
- for key in message.keys():
- if self.nmatch.has_key(key):
- if self.nmatch[key] == None:
- send = False
- break
- if self.nmatch[key] == message[key]:
- send = False
- break
- if send == False:
- return None
- return message
-
-
-
-class Publisher(object):
- '''Superclass for pub/sub publishers
-
- FIXME: use finer-grained locking'''
- def __init__(self, broker, aggregate=100):
- self.pending = []
- self.pendingLock = threading.Lock()
- self.aggregateSize = aggregate
- self.broker = broker
- @synchronizedmethod
- def publish(self, message):
- if message.has_key('aggregate') and message['aggregate'] == 'True':
- self.aggregate(message)
- return
- else:
- self.broker.publish(message)
- @synchronizedmethod
- def publishList(self, messages):
- self.broker.publishList(messages)
- @synchronizedmethod
- def aggregate(self, message):
- # we can make this lock-less by using a queue for pending
- # messages
- self.pendingLock.acquire()
- self.pending.append(message)
- if len(self.pending) >= self.aggregateSize:
- self.broker.publishList(self.pending)
- self.pending = []
- self.pendingLock.release()
- @synchronizedmethod
- def setBroker(self, broker):
- self.broker = broker
-
-##############################
-# Testing Code
-##############################
-import time
-import unittest
-import sys
-import logging
-
-
-class TestSubscriber(Subscriber):
- def __init__(self, *args, **kwargs):
- self.queue = Queue.Queue()
- Subscriber.__init__(self, *args, **kwargs)
- def handle(self, message):
- self.queue.put(message)
-
-class TestMessaging(unittest.TestCase):
- def setUp(self):
- self.broker = MessageBroker()
- self.publisher = Publisher(self.broker)
- self.subscriber = TestSubscriber(self.broker)
- def testPublish(self):
- self.publisher.publish( {'message':'hello world'} )
- self.assertEqual(self.subscriber.queue.qsize(), 1)
- def testPublishList(self):
- nrmsgs = 10
- msgs = []
- for i in range(nrmsgs):
- msgs.append( {'msgnum':str(i)} )
- self.publisher.publishList( msgs )
- self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
- def testAggregate(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.aggregate( {'msgnum':str(i)} )
- self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
- def testAggregateKeyword(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
- self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
-
-if __name__ == '__main__':
-
- logging.basicConfig(level=logging.INFO,
- format="%(asctime)s %(levelname)s:\t %(message)s",
- stream=sys.stdout)
-
- suite = unittest.TestLoader().loadTestsFromTestCase(TestMessaging)
- unittest.TextTestRunner(verbosity=2).run(suite)
-
- sys.exit(0)
-
-
-##############################
-# Old/Unused testing code
-##############################
-
-
-
- print 'testing removeSubscriber'
- broker.removeSubscriber(subscriber)
- publisher.publish( {'message':"you shouldn't see this"} )
-
- nsub = NullSubscriber(broker)
- print 'timing publish'
- nrmsg = 100000
- tt = time.time()
- for i in range(nrmsg):
-# publisher.publish( {"message":"hello world!"} )
- publisher.publish( {} )
- tt = time.time() - tt
- print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
- tt,
- nrmsg/tt)
- broker.removeSubscriber(nsub)
-
- class SlowSubscriber(Subscriber):
- def handle(self, message):
- print 'called slow subscriber with message', message
- time.sleep(1)
- print 'returning from slow subscriber with message', message
- class ThreadedSubscriber(Subscriber):
- @threaded
- def handle(self, message):
- print 'called threaded subscriber with message', message
- time.sleep(1)
- print 'returning from threaded subscriber with message', message
- class ThreadPoolSubscriber(Subscriber, ThreadPoolClass):
- @threadpoolmethod
- def handle(self, message):
- print 'called threadpool subscriber with message', message
- time.sleep(1)
- print 'returning from threadpool subscriber with message', message
-
-
-
- tsub = ThreadedSubscriber(broker)
- for i in range(8):
- publisher.publish( {"msg":str(i)} )
- broker.removeSubscriber(tsub)
- time.sleep(3)
-
- tpsub = ThreadPoolSubscriber(broker)
- for i in range(8):
- publisher.publish( {"msg":str(i)} )
- broker.removeSubscriber(tpsub)
- time.sleep(3)
-
- ssub = SlowSubscriber(broker)
- for i in range(4):
- publisher.publish( {"msg":str(i)} )
- broker.removeSubscriber(ssub)
diff --git a/src/tashi/messaging/soapmessaging.py b/src/tashi/messaging/soapmessaging.py
deleted file mode 100755
index be35fc9..0000000
--- a/src/tashi/messaging/soapmessaging.py
+++ /dev/null
@@ -1,229 +0,0 @@
-#! /usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from messaging import *
-
-import cPickle
-import soaplib.wsgi_soap
-import cherrypy.wsgiserver
-from soaplib.service import soapmethod
-from soaplib.serializers.primitive import *
-import SOAPpy.WSDL
-import time
-
-class MessageBrokerSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, MessageBroker):
- def __init__(self, port):
- soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
- MessageBroker.__init__(self)
- self.port = port
- def trdfn():
- service = self
- server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
- server.start()
- threading.Thread(target=trdfn).start()
-
-
- @soapmethod(Array(String), Array(String), _returns=Null)
- def log(self, keys, values):
- message = {}
- if len(keys) != len(values):
- raise Exception, "Different lengths for keys and values"
- for i in range(len(keys)):
- message[keys[i]] = values[i]
- MessageBroker.log(self, message)
-
- @soapmethod(String, Integer, _returns=Null)
- def addSubscriber(self, host, port):
- subscriber = SubscriberSoapProxy(host, port)
- MessageBroker.addSubscriber(self, subscriber)
-
- @soapmethod(String, Integer, _returns=Null)
- def removeSubscriber(self, host, port):
- # should this method really be able to peek into subscriber.host/port
- subscriber = None
- subscribers = self.getSubscribers()
- for subscriber in subscribers:
- if subscriber.host == host and subscriber.port == port:
- subscriber = subscriber
- if subscriber != None:
- MessageBroker.removeSubscriber(self, subscriber)
-
-
- @soapmethod(Array(String), Array(String), _returns=Null)
- def publish(self, keys, values):
- message = {}
- if len(keys) != len(values):
- raise Exception, "Different lengths for keys and values"
- for i in range(len(keys)):
- message[keys[i]] = values[i]
- MessageBroker.publish(self, message)
-
-
-
-class MessageBrokerSoapProxy(object):
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
- def log(self, message):
- keys = []
- values = []
- for k,v in message.items():
- keys.append(k)
- values.append(v)
- self.connection.log(keys=keys, values=values)
- def addSubscriber(self, subscriber):
- self.connection.addSubscriber(host=subscriber.host, port=subscriber.port)
- def publish(self, message):
- keys = []
- values = []
- for k,v in message.items():
- keys.append(k)
- values.append(v)
- self.connection.publish(keys=keys, values=values)
- def removeSubscriber(self, subscriber):
- self.connection.removeSubscriber(host=subscriber.host, port=subscriber.port)
-
-
-
-
-class SubscriberSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, Subscriber):
- def __init__(self, broker, port, synchronized=False):
- soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
- Subscriber.__init__(self, synchronized=synchronized)
- self.host = socket.gethostname()
- self.port = port
- self.broker = broker
- self.server = None
- def trdfn():
- service = self
- self.server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
- self.server.start()
- threading.Thread(target=trdfn).start()
-# broker.log("Subscriber started")
- broker.addSubscriber(self)
- @soapmethod(Array(String), Array(String), _returns=Integer)
- def publish(self, keys, values):
- message = {}
- if len(keys) != len(values):
- raise Exception, "Different lengths for keys and values"
- for i in range(len(keys)):
- message[keys[i]] = values[i]
- Subscriber.publish(self, message)
- return 0
- def stop(self):
- self.server.stop()
-
-class SubscriberSoapProxy(object):
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
- def publish(self, message):
- keys = []
- values = []
- for k,v in message.items():
- keys.append(k)
- values.append(v)
- self.connection.publish(keys=keys, values=values)
-
-
-####################
-# Testing Code
-####################
-
-class CustomSubscriber(SubscriberSoap):
- def handle(self, message):
- print "Custom Subscriber: '%s'" % str(message)
-
-class NullSubscriber(SubscriberSoap):
- def handle(self, message):
- pass
-
-
-if __name__ == '__main__':
- try:
- portnum = 1717
-
- print "\ntesting message broker"
- broker = MessageBrokerSoap(portnum)
- proxy = MessageBrokerSoapProxy("localhost", portnum)
- portnum = portnum + 1
-
- print "\ntesting log function"
- proxy.log( {"message":"Hello World!"} )
-# proxy.log("It looks like log works")
-
- print "\ntesting subscriber proxy"
- subscriber = SubscriberSoap(proxy, portnum)
- portnum = portnum + 1
-
- print "\ntesting custom subscriber"
- csub = CustomSubscriber(proxy, portnum)
- portnum = portnum + 1
-
- print "\ntesting publish"
- proxy.publish( {"message":"Hello World!"} )
-
- print "\ntesting stop"
- subscriber.stop()
- proxy.publish( {"message":"Everybody here?"} )
-
- print "\ntesting removeSubscriber"
- proxy.removeSubscriber(csub)
- proxy.publish( {"message":"Nobody home"} )
- proxy.addSubscriber(csub)
- proxy.publish( {"message":"You're back!"} )
-
- print "\ntesting filter"
- csub.setMatch( {"print":"yes"} )
- proxy.publish( {"print":"yes", "message":"this should be printed"} )
- proxy.publish( {"print":"no", "message":"this should NOT be printed"} )
- csub.setMatch()
-
- print "\ntesting publish performance"
- proxy.removeSubscriber(csub)
- nrmsg = 10000
- tt = time.time()
- for i in range(nrmsg):
- proxy.publish( {"message":"msg %i"%i} )
- tt = time.time() - tt
- print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
- tt,
- nrmsg/tt)
-
- print "\ntesting publish/subscribe performance"
- nsub = NullSubscriber(proxy, portnum)
- portnum = portnum + 1
- nrmsg = 10000
- tt = time.time()
- for i in range(nrmsg):
- proxy.publish( {"message":"msg %i"%i} )
- tt = time.time() - tt
- print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
- tt,
- nrmsg/tt)
-
-
-
- except Exception, e:
-# raise e
- print e
- sys.exit(0)
- sys.exit(0)
diff --git a/src/tashi/messaging/tashimessaging.py b/src/tashi/messaging/tashimessaging.py
deleted file mode 100644
index 006400f..0000000
--- a/src/tashi/messaging/tashimessaging.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from thriftmessaging import *
-import logging
-import Queue
-from ConfigParser import ConfigParser
-import time
-import socket
-import signal
-
-class TashiLogHandler(logging.Handler, PublisherThrift):
- def __init__(self, config, *args, **kwargs):
- self.messages = Queue.Queue()
- self.config = config
- logging.Handler.__init__(self, *args, **kwargs)
- PublisherThrift.__init__(self,
- config.get('MessageBroker', 'host'),
- int(config.get('MessageBroker', 'port')))
- def emit(self, record):
- # 'args', 'created', 'exc_info', 'exc_text', 'filename',
- # 'funcName', 'getMessage', 'levelname', 'levelno', 'lineno',
- # 'module', 'msecs', 'msg', 'name', 'pathname', 'process',
- # 'relativeCreated', 'thread', 'threadName']
- msg = {}
- # args
- # created
- # exc_info
- # exc_text
- msg['log-filename'] = str(record.filename)
- msg['log-funcname'] = str(record.funcName)
- msg['log-levelname'] = str(record.levelname)
- msg['log-level'] = str(record.levelno)
- msg['log-lineno'] = str(record.lineno)
- msg['log-module'] = str(record.module)
- msg['log-msecs'] = str(record.msecs)
- msg['log-message'] = str(record.msg)
- msg['log-name'] = str(record.name)
- msg['log-pathname'] = str(record.pathname)
- msg['log-process'] = str(record.process)
- # relativeCreated
- msg['log-thread'] = str(record.thread)
- msg['log-threadname'] = str(record.threadName)
-
- # standard message fields
- msg['timestamp'] = str(time.time())
- msg['hostname'] = socket.gethostname()
- msg['message-type'] = 'log'
-
- self.messages.put(msg)
- self.publish(msg)
-
-class TashiSubscriber(SubscriberThrift):
- def __init__(self, config, port, **kwargs):
- sys.stdout.flush()
- brokerPort = int(config.get('MessageBroker', 'port'))
- self.broker = MessageBrokerThriftProxy(config.get('MessageBroker', 'host'), brokerPort)
- SubscriberThrift.__init__(self, self.broker, port, **kwargs)
-
-
-
-##############################
-# Test Code
-##############################
-import unittest
-import sys
-
-class TestTashiSubscriber(TashiSubscriber):
- def __init__(self, *args, **kwargs):
- self.messageQueue = Queue.Queue()
- TashiSubscriber.__init__(self, *args, **kwargs)
- def handle(self, message):
- self.messageQueue.put(message)
-
-
-def incrementor(start = 0):
- while True:
- a = start
- start = start + 1
- yield a
-increment = incrementor()
-
-class TestTashiMessaging(unittest.TestCase):
- def setUp(self):
- self.configFiles = [ '../../../etc/TestConfig.cfg']
- self.config = ConfigParser()
- self.configFiles = self.config.read(self.configFiles)
- self.port = int(self.config.get('MessageBroker', 'port'))
-
- try:
- self.brokerPid = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
- './messageBroker.py',
- '--port', str(self.port),
- os.environ)
- self.port = self.port + 1
- # FIXME: what's the best way to wait for the broker to be ready?
- time.sleep(1)
- except Exception, e:
- sys.exit(0)
- self.initialized = True
- self.log = logging.getLogger('TestTashiMessaging')
- self.handler = TashiLogHandler(self.config)
- self.log.addHandler(self.handler)
- self.sub = TestTashiSubscriber(self.config, int(self.port) + increment.next())
- def tearDown(self):
- os.kill(self.brokerPid, signal.SIGKILL)
- # FIXME: wait for the port to be ready again
- time.sleep(2)
- self.log.removeHandler(self.handler)
-# self.sub.broker.removeSubscriber(self.sub)
- pass
- def testLog(self):
- self.log.log(50, "Hello World!")
- self.handler.messages.get(timeout=5)
- self.sub.messageQueue.get(timeout=5)
- self.assertEqual(self.handler.messages.qsize(), 0)
- self.assertEqual(self.sub.messageQueue.qsize(), 0)
- def testPublish(self):
- sys.stdout.flush()
- self.port = self.port + 1
- self.handler.publish({'message':'hello world'})
- self.sub.messageQueue.get(timeout=5)
- self.assertEqual(self.sub.messageQueue.qsize(), 0)
-
-
-if __name__=='__main__':
-
-
-# logging.basicConfig(level=logging.INFO,
-# format="%(asctime)s %(levelname)s:\t %(message)s",
-# stream=sys.stdout)
-
- suite = unittest.TestLoader().loadTestsFromTestCase(TestTashiMessaging)
- unittest.TextTestRunner(verbosity=2).run(suite)
diff --git a/src/tashi/messaging/threadpool.py b/src/tashi/messaging/threadpool.py
deleted file mode 100644
index 5684ef2..0000000
--- a/src/tashi/messaging/threadpool.py
+++ /dev/null
@@ -1,305 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import threading
-import time
-import Queue
-import logging
-
-_log = logging.getLogger('tashi.messaging.threadpool')
-
-def threaded(func):
- def fn(*args, **kwargs):
- thread = threading.Thread(target=func, args=args, kwargs=kwargs)
- thread.start()
- return thread
- return fn
-
-
-class ThreadPool(Queue.Queue):
- def __init__(self, size=8, maxsize=0):
- Queue.Queue.__init__(self, maxsize)
- for i in range(size):
- thread = threading.Thread(target=self._worker)
- thread.setDaemon(True)
- thread.start()
- def _worker(self):
- while True:
- try:
- func, args, kwargs = self.get()
- func(*args, **kwargs)
- except Exception, e:
- _log.error(e)
- # FIXME: do something smarter here, backtrace, log,
- # allow user-defined error handling...
-
- def submit(self, func, *args, **kwargs):
- self.put((func, args, kwargs))
- def submitlist(self, func, args, kwargs):
- self.put((func, args, kwargs))
-
-class ThreadPoolClass:
- def __init__(self, size=8, maxsize=0):
- self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
-
-
-def threadpool(pool):
- def dec(func):
- def fn(*args, **kwargs):
- pool.submit(func, *args, **kwargs)
- return fn
- return dec
-
-def threadpoolmethod(meth):
- def fn(*args, **kwargs):
- try:
- pool = args[0]._threadpool_pool
- except AttributeError:
- pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
- # FIXME: how do we check parent class?
-# assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
- pool.submit(meth, *args, **kwargs)
- return fn
-
-def synchronized(lock=None):
- _log.debug('synchronized decorator factory called')
- if lock==None:
- lock = threading.RLock()
- def dec(func):
- _log.debug('synchronized decorator called')
- def fn(*args, **kwargs):
- _log.debug('getting sync lock')
- lock.acquire()
- _log.debug('got sync lock')
- ex = None
- try:
- r = func(*args, **kwargs)
- except Exception, e:
- ex = e
- _log.debug('releasing sync lock')
- lock.release()
- _log.debug('released sync lock')
- if ex != None:
- raise e
- return r
- return fn
- return dec
-
-def synchronizedmethod(func):
- def fn(*args, **kwargs):
- try:
- lock = args[0]._synchronized_lock
- except AttributeError:
- lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
- lock.acquire()
- ex = None
- try:
- func(*args, **kwargs)
- except Exception, e:
- ex = e
- lock.release()
- if ex != None:
- raise e
- return fn
-
-
-##############################
-# Test Code
-##############################
-import unittest
-import sys
-import time
-
-class TestThreadPool(unittest.TestCase):
- def setUp(self):
- self.errmargin = 0.5
-
- def testUnthreaded(self):
- queue = Queue.Queue()
- def slowfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(4):
- slowfunc()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testThreaded(self):
- queue = Queue.Queue()
- @threaded
- def slowthreadfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(8):
- slowthreadfunc()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
-
- def testThreadPool(self):
- pool = ThreadPool(size=4)
- queue = Queue.Queue()
- @threadpool(pool)
- def slowpoolfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(8):
- slowpoolfunc()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 2, 1)
-
- def testUnthreadedMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(4):
- sc.beslow()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testThreadedMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- @threaded
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(4):
- sc.beslow()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
-
- def testThreadPoolMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- @threadpoolmethod
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(16):
- sc.beslow()
- for i in range(16):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 2, 1)
-
- def testSynchronized(self):
- queue = Queue.Queue()
- @synchronized()
- def addtoqueue():
- time.sleep(1)
- queue.put(None)
- @threaded
- def slowthreadfunc():
- addtoqueue()
- tt = time.time()
- for i in range(4):
- slowthreadfunc()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testSynchronizedMethod(self):
- queue = Queue.Queue()
- class addtoqueue:
- @synchronizedmethod
- def addtoqueue1(self):
- time.sleep(1)
- queue.put(None)
- @synchronizedmethod
- def addtoqueue2(self):
- time.sleep(1)
- queue.put(None)
- atc = addtoqueue()
- @threaded
- def slowthreadfunc1():
- atc.addtoqueue1()
- @threaded
- def slowthreadfunc2():
- atc.addtoqueue2()
- tt = time.time()
- for i in range(4):
- slowthreadfunc1()
- slowthreadfunc2()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 8, 1)
-
- def testUnsynchronizedMethod(self):
- queue = Queue.Queue()
- class addtoqueue:
- def addtoqueue1(self):
- time.sleep(1)
- queue.put(None)
- def addtoqueue2(self):
- time.sleep(1)
- queue.put(None)
- atc = addtoqueue()
- @threaded
- def slowthreadfunc1():
- atc.addtoqueue1()
- @threaded
- def slowthreadfunc2():
- atc.addtoqueue2()
- tt = time.time()
- for i in range(4):
- slowthreadfunc1()
- slowthreadfunc2()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
-
-
-
-if __name__=='__main__':
- import sys
-
- logging.basicConfig(level=logging.INFO,
- format="%(asctime)s %(levelname)s:\t %(message)s",
- stream=sys.stdout)
-
- suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
- unittest.TextTestRunner(verbosity=2).run(suite)
diff --git a/src/tashi/messaging/thriftmessaging.py b/src/tashi/messaging/thriftmessaging.py
deleted file mode 100755
index 0c73ff0..0000000
--- a/src/tashi/messaging/thriftmessaging.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import sys
-import time
-import socket
-import traceback
-import threading
-
-sys.path.append('./gen-py')
-import tashi.messaging.messagingthrift
-import tashi.messaging.messagingthrift.MessageBrokerThrift
-import tashi.messaging.messagingthrift.SubscriberThrift
-from tashi.messaging.messagingthrift.ttypes import *
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-from thrift.server import TServer
-
-from tashi import ConnectionManager
-
-from tashi.messaging.messaging import *
-from tashi.messaging.threadpool import ThreadPoolClass, threadpool, ThreadPool, threadpoolmethod, threaded
-
-class MessageBrokerThrift(MessageBroker):
- def __init__(self, port, daemon=True):
- MessageBroker.__init__(self)
- self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
- self.transport = TSocket.TServerSocket(port)
- self.tfactory = TTransport.TBufferedTransportFactory()
- self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
- self.ready = threading.Event()
-# self.server = TServer.TSimpleServer(self.processor,
-# self.transport,
-# self.tfactory,
-# self.pfactory)
-# self.server = TServer.TThreadPoolServer(self.processor,
-# self.transport,
-# self.tfactory,
-# self.pfactory)
- self.server = TServer.TThreadedServer(self.processor,
- self.transport,
- self.tfactory,
- self.pfactory)
- self.publishCalls = 0
-
- def ssvrthrd():
- try:
- # FIXME: Race condition, the ready event should be set after
- # starting the server. However, server.serve()
- # doesn't return under normal circumstances. This
- # seems to work in practice, even though it's clearly
- # wrong.
- self.ready.set()
- self.server.serve()
- except Exception, e:
- print e
- sys.stdout.flush()
- pass
- svt = threading.Thread(target=ssvrthrd)
- svt.setDaemon(daemon)
- svt.start()
- self.ready.wait()
- def log(self, message):
- MessageBroker.log(self, message)
- @synchronizedmethod
- def addSubscriber(self, host, port):
- subscribers = self.getSubscribers()
- for sub in subscribers:
- if sub.host == host and sub.port == port:
- return
- subscriber = SubscriberThriftProxy(host, port, self.proxy)
- MessageBroker.addSubscriber(self, subscriber)
- def removeSubscriber(self, host, port):
- subscriber = None
- subscribers = self.getSubscribers()
- for sub in subscribers:
- if sub.host == host and sub.port == port:
- subscriber = sub
- if subscriber != None:
- MessageBroker.removeSubscriber(self, subscriber)
- @synchronizedmethod
- def publish(self, message):
- self.publishCalls = self.publishCalls + 1
- sys.stdout.flush()
- MessageBroker.publish(self, message)
-
-class MessageBrokerThriftProxy:
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
- @synchronizedmethod
- def log(self, message):
- self.proxy[self.host, self.port].log(message)
- @synchronizedmethod
- def publish(self, message):
- self.proxy[self.host, self.port].publish(message)
- @synchronizedmethod
- def publishList(self, messages):
- self.proxy[self.host, self.port].publishList(messages)
- @synchronizedmethod
- def addSubscriber(self, subscriber):
- self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
- @synchronizedmethod
- def removeSubscriber(self, subscriber):
- self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
-
-
-
-class SubscriberThrift(Subscriber, threading.Thread):
- def __init__(self, broker, port, synchronized=False):
- self.host = socket.gethostname()
- self.port = port
- self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
- self.transport = TSocket.TServerSocket(port)
- self.tfactory = TTransport.TBufferedTransportFactory()
- self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- self.server = TServer.TThreadedServer(self.processor,
- self.transport,
- self.tfactory,
- self.pfactory)
- def ssvrthrd():
- try:
- self.server.serve()
- except Exception, e:
- pass
-
-
- self.thread = threading.Thread(target=ssvrthrd)
- self.thread.setDaemon(True)
- self.thread.start()
-
- # We have to call this AFTER initializing our server, so that
- # the broker can contact us
- # Wrap this in a try/catch because the broker may not be online yet
- try:
- Subscriber.__init__(self, broker, synchronized=synchronized)
- except:
- pass
- threading.Thread.__init__(self)
- self.setDaemon(True)
- self.start()
-
- def stop(self):
-# # FIXME: this is broken, there is no clear way to stop a
-# # Thrift server
- self.broker.removeSubscriber(self)
- self.transport.close()
- def run(self):
- while(True):
- # renew subscription every 5 min
- try:
- self.broker.addSubscriber(self)
- except:
- pass
- time.sleep(5*60)
-
-class SubscriberThriftProxy:
- def __init__(self, host, port, proxy, aggregate = 100):
- self.host = host
- self.port = port
- self.proxy = proxy
- # for some reason, thrift clients are not thread-safe, lock during send
- self.lock = threading.Lock()
- self.pending = []
- self.aggregateSize = aggregate
- def publish(self, message):
- self.lock.acquire()
- sys.stdout.flush()
- if message.has_key('aggregate') and message['aggregate'] == 'True':
- self.pending.append(message)
- if len(self.pending) >= self.aggregateSize:
- try:
- self.proxy[self.host, self.port].publishList(self.pending)
- except Exception, e:
- print e
- self.lock.release()
- raise e
- self.pending = []
- else:
- try:
- self.proxy[self.host, self.port].publish(message)
- except Exception, e:
- sys.stdout.flush()
- print e
- self.lock.release()
- raise e
- self.lock.release()
-
-class PublisherThrift(Publisher):
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.broker = MessageBrokerThriftProxy(host, port)
- Publisher.__init__(self, self.broker)
-
-####################
-# Testing Code
-####################
-
-class TestSubscriberThrift(SubscriberThrift):
- def __init__(self, *args, **kwargs):
- self.queue = Queue.Queue()
- SubscriberThrift.__init__(self, *args, **kwargs)
- def handle(self, message):
- self.queue.put(message)
-
-portnum = 1718
-class TestThriftMessaging(unittest.TestCase):
- def setUp(self):
- global portnum
- self.broker = MessageBrokerThrift(portnum)
- self.brokerPort = portnum
- portnum = portnum + 1
- self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
- self.publisher = PublisherThrift('localhost', self.brokerPort)
- self.subscriber = TestSubscriberThrift(self.proxy, portnum)
- portnum = portnum + 1
- def tearDown(self):
- pass
- def testSetUp(self):
- pass
- def testPublish(self):
- self.publisher.publish( {'message':'hello world'} )
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testPublishList(self):
- nrmsgs = 10
- msgs = []
- for i in range(nrmsgs):
- msgs.append( {'msgnum':str(i)} )
- self.publisher.publishList( msgs )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testAggregate(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.aggregate( {'msgnum':str(i)} )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testAggregateKeyword(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
-
-
-if __name__=='__main__':
- suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
- unittest.TextTestRunner(verbosity=2).run(suite)
-
-
diff --git a/src/tashi/nodemanager/vmcontrol/xenpv.py b/src/tashi/nodemanager/vmcontrol/xenpv.py
index 7c7de75..9401df3 100644
--- a/src/tashi/nodemanager/vmcontrol/xenpv.py
+++ b/src/tashi/nodemanager/vmcontrol/xenpv.py
@@ -28,7 +28,7 @@
from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
from tashi.rpycservices.rpyctypes import Instance, Host
from tashi import boolean, convertExceptions, ConnectionManager, version
-from tashi.util import isolatedRPC, broken
+from tashi.util import broken
import tashi.parallel
from tashi.parallel import synchronized, synchronizedmethod
diff --git a/src/tashi/thrift/build.py b/src/tashi/thrift/build.py
deleted file mode 100755
index 42b22fa..0000000
--- a/src/tashi/thrift/build.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import shutil
-import os
-from os import path
-import re
-
-if __name__ == '__main__':
- if (path.exists('gen-py')):
- print 'Removing \'gen-py\' directory...'
- shutil.rmtree('gen-py')
-
- if (path.exists('../services')):
- print 'Removing \'../services\' directory...'
- shutil.rmtree('../services')
-
- if (path.exists('../messaging/messagingthrift')):
- print 'Removing \'../messaging/messagingthrift\' directory...'
- shutil.rmtree('../messaging/messagingthrift')
-
- print 'Generating Python code for \'services.thrift\'...'
- os.system('thrift --gen py:new_style services.thrift')
-
- print 'Copying generated code to \'tashi.services\' package...'
- shutil.copytree('gen-py/services', '../services')
-
- print 'Generatign Python code for \'messagingthrift\'...'
- os.system('rm -rf gen-py')
- os.system('thrift --gen py messagingthrift.thrift')
-
- print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
- shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
- os.path.join('..', 'messaging', 'messagingthrift'))
-
- print 'Generating Python code for \'layoutlocality.thrift\'...'
- os.system('thrift --gen py:new_style layoutlocality.thrift')
-
- print 'Copying generated code to \'tashi.services\' package...'
- shutil.copytree('gen-py/layoutlocality', '../services/layoutlocality')
diff --git a/src/tashi/thrift/layoutlocality.thrift b/src/tashi/thrift/layoutlocality.thrift
deleted file mode 100644
index e14910c..0000000
--- a/src/tashi/thrift/layoutlocality.thrift
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-struct BlockLocation {
- list<string> hosts, // hostnames of data nodes
- list<i32> ports, // ports for data nodes
- list<string> names, // hostname:port of data nodes
- i64 blocknum,
- i64 offset,
- i64 length
-}
-
-struct Pathname {
- string pathname
-}
-
-exception FileNotFoundException {
- string message
-}
-
-service layoutservice {
- list <BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 offset, 3:i64 length)
- throws (1:FileNotFoundException ouch),
-}
-
-service localityservice {
- list <list<double>> getHopCountMatrix(1:list<string> sourceHosts, 2:list<string> destHosts),
-}
diff --git a/src/tashi/thrift/messagingthrift.thrift b/src/tashi/thrift/messagingthrift.thrift
deleted file mode 100644
index 401e9a1..0000000
--- a/src/tashi/thrift/messagingthrift.thrift
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-typedef map<string, string> strstrmap
-
-service SubscriberThrift{
- # the async keyword seems to slow things down in the simple
- # tests. However, with non-trivial subscribers it will be
- # necessary to use async here.
- async void publish(strstrmap message)
- async void publishList(list<strstrmap> messages)
-}
-
-service MessageBrokerThrift{
- void log(strstrmap message),
- void addSubscriber(string host, i16 port)
- void removeSubscriber(string host, i16 port)
- async void publish(strstrmap message)
- async void publishList(list<strstrmap> messages)
-
-}
-
diff --git a/src/tashi/thrift/services.thrift b/src/tashi/thrift/services.thrift
deleted file mode 100644
index fa29c30..0000000
--- a/src/tashi/thrift/services.thrift
+++ /dev/null
@@ -1,166 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-enum Errors {
- ConvertedException = 1,
- NoSuchInstanceId = 2,
- NoSuchVmId = 3,
- IncorrectVmState = 4,
- NoSuchHost = 5,
- NoSuchHostId = 6,
- InstanceIdAlreadyExists = 7,
- HostNameMismatch = 8,
- HostNotUp = 9,
- HostStateError = 10,
- InvalidInstance = 11,
- UnableToResume = 12,
- UnableToSuspend = 13,
-}
-
-enum InstanceState {
- Pending = 1, // Job submitted
- Activating = 2, // activateVm has been called, but instantiateVm hasn't finished yet
- Running = 3, // Normal state
- Pausing = 4, // Beginning pause sequence
- Paused = 5 // Paused
- Unpausing = 6, // Beginning unpause sequence
- Suspending = 7, // Beginning suspend sequence
- Resuming = 8, // Beginning resume sequence
- MigratePrep = 9, // Migrate state #1
- MigrateTrans = 10, // Migrate state #2
- ShuttingDown = 11, // Beginning exit sequence
- Destroying = 12, // Beginning exit sequence
- Orphaned = 13, // Host is missing
- Held = 14, // Activation failed
- Exited = 15, // VM has exited
- Suspended = 16, // VM is suspended
-}
-
-enum HostState {
- Normal = 1,
- Drained = 2,
- VersionMismatch = 3
-}
-
-exception TashiException {
- 1: Errors errno
- 2: string msg
-}
-
-struct Host {
- 1:i32 id,
- 2:string name,
- 3:bool up,
- 4:bool decayed,
- 5:HostState state,
- 6:i32 memory,
- 7:i32 cores,
- 8:string version
- // Other properties (disk?)
-}
-
-struct Network {
- 1:i32 id
- 2:string name
-}
-
-struct User {
- 1:i32 id,
- 2:string name
-}
-
-struct DiskConfiguration {
- 1:string uri,
- 2:bool persistent
-}
-
-struct NetworkConfiguration {
- 1:i32 network,
- 2:string mac,
- 3:string ip
-}
-
-struct Instance {
- 1:i32 id,
- 2:i32 vmId,
- 3:i32 hostId,
- 4:bool decayed,
- 5:InstanceState state,
- 6:i32 userId,
- 7:string name, // User specified
- 8:i32 cores, // User specified
- 9:i32 memory, // User specified
- 10:list<DiskConfiguration> disks, // User specified
- 11:list<NetworkConfiguration> nics // User specified
- 12:map<string, string> hints // User specified
-}
-
-service clustermanagerservice {
- // Client-facing RPCs
- Instance createVm(1:Instance instance) throws (1:TashiException e)
-
- void shutdownVm(1:i32 instanceId) throws (1:TashiException e)
- void destroyVm(1:i32 instanceId) throws (1:TashiException e)
-
- void suspendVm(1:i32 instanceId) throws (1:TashiException e)
- Instance resumeVm(1:i32 instanceId) throws (1:TashiException e)
-
- void migrateVm(1:i32 instanceId, 2:i32 targetHostId) throws (1:TashiException e)
-
- void pauseVm(1:i32 instanceId) throws (1:TashiException e)
- void unpauseVm(1:i32 instanceId) throws (1:TashiException e)
-
- list<Host> getHosts() throws (1:TashiException e)
- list<Network> getNetworks() throws (1:TashiException e)
- list<User> getUsers() throws (1:TashiException e)
-
- list<Instance> getInstances() throws (1:TashiException e)
-
- string vmmSpecificCall(1:i32 instanceId, 2:string arg) throws (1:TashiException e)
-
- // NodeManager-facing RPCs
- i32 registerNodeManager(1:Host host, 2:list<Instance> instances) throws (1:TashiException e)
- void vmUpdate(1:i32 instanceId, 2:Instance instance, 3:InstanceState old) throws (1:TashiException e)
-
- // Agent-facing RPCs
- void activateVm(1:i32 instanceId, 2:Host host) throws (1:TashiException e)
-}
-
-service nodemanagerservice {
- // ClusterManager-facing RPCs
- i32 instantiateVm(1:Instance instance) throws (1:TashiException e)
-
- void shutdownVm(1:i32 vmId) throws (1:TashiException e)
- void destroyVm(1:i32 vmId) throws (1:TashiException e)
-
- void suspendVm(1:i32 vmId, 2:string destination) throws (1:TashiException e)
- i32 resumeVm(1:Instance instance, 2:string source) throws (1:TashiException e)
-
- string prepReceiveVm(1:Instance instance, 2:Host source) throws (1:TashiException e)
- void migrateVm(1:i32 vmId, 2:Host target, 3:string transportCookie) throws (1:TashiException e)
- void receiveVm(1:Instance instance, 2:string transportCookie) throws (1:TashiException e)
-
- void pauseVm(1:i32 vmId) throws (1:TashiException e)
- void unpauseVm(1:i32 vmId) throws (1:TashiException e)
-
- Instance getVmInfo(1:i32 vmId) throws (1:TashiException e)
- list<i32> listVms() throws (1:TashiException e)
-
- string vmmSpecificCall(1:i32 vmId, 2:string arg) throws (1:TashiException e)
-
- // Host getHostInfo() throws (1:TashiException e)
-}
diff --git a/src/tashi/util.py b/src/tashi/util.py
index 18b6133..dd8a28d 100644
--- a/src/tashi/util.py
+++ b/src/tashi/util.py
@@ -149,14 +149,6 @@
def __delattr__(self, name):
return delattr(self.__dict__['__real_obj__'], name)
-def isolatedRPC(client, method, *args, **kw):
- """Opens and closes a thrift transport for a single RPC call"""
- if (not client._iprot.trans.isOpen()):
- client._iprot.trans.open()
- res = getattr(client, method)(*args, **kw)
- client._iprot.trans.close()
- return res
-
def signalHandler(signalNumber):
"""Used to denote a particular function as the signal handler for a
specific signal"""
@@ -193,7 +185,7 @@
def convertExceptions(oldFunc):
"""This converts any exception type into a TashiException so that
- it can be passed over a Thrift RPC"""
+ it can be passed over an RPC"""
def newFunc(*args, **kw):
try:
return oldFunc(*args, **kw)
diff --git a/src/utils/Makefile b/src/utils/Makefile
deleted file mode 100644
index aea56ee..0000000
--- a/src/utils/Makefile
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-all: nmd
-
-clean:
- rm -f ./nmd
-
-nmd: nmd.c
- ${CC} $< -o $@
diff --git a/src/utils/getLocality.py b/src/utils/getLocality.py
deleted file mode 100755
index 49ecb11..0000000
--- a/src/utils/getLocality.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import sys
-import os
-from os import system
-
-import tashi.services.layoutlocality.localityservice as localityservice
-
-from thrift import Thrift
-from thrift.transport import TSocket
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol
-
-from tashi.util import getConfig
-
-(config, configFiles) = getConfig(["Client"])
-host = config.get('LocalityService', 'host')
-port = int(config.get('LocalityService', 'port'))
-
-socket = TSocket.TSocket(host, port)
-transport = TTransport.TBufferedTransport(socket)
-protocol = TBinaryProtocol.TBinaryProtocol(transport)
-client = localityservice.Client(protocol)
-transport.open()
-
-while True:
- line1 = "\n"
- line2 = "\n"
- while line1 != "":
- line1 = sys.stdin.readline()
- if line1 == "":
- sys.exit(0)
- if line1 != "\n":
- break
- line1 = line1.strip()
- while line2 != "":
- line2 = sys.stdin.readline()
- if line2 == "":
- sys.exit(0)
- if line2 != "\n":
- break
- line2 = line2.strip()
-
- sources = line1.split(" ")
- destinations = line2.split(" ")
-
- mat = client.getHopCountMatrix(sources, destinations)
- for r in mat:
- for c in r:
- print '%f\t'%c,
- print '\n',
- print '\n',
diff --git a/src/utils/nmd.c b/src/utils/nmd.c
deleted file mode 100644
index effa1d2..0000000
--- a/src/utils/nmd.c
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <assert.h>
-
-#define SLEEP_INTERVAL 10
-#define TASHI_PATH "/usr/local/tashi/"
-#define LOG_FILE "/var/log/nodemanager.log"
-
-/* This function changes (on Linux!) its oom scoring, to make it
- * unattractive to kill
- */
-
-void make_invincible()
-{
- int oom_adj_fd;
- int r;
-
- oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
- assert(oom_adj_fd != -1);
- r = write(oom_adj_fd, "-17\n", 4);
- assert(r == 4);
- close(oom_adj_fd);
-
-}
-
-/* This function resets (on Linux!) its oom scoring to default
- */
-void make_vulnerable()
-{
- int oom_adj_fd;
- int r;
-
- oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
- assert(oom_adj_fd != -1);
- r = write(oom_adj_fd, "0\n", 2);
- assert(r == 2);
- close(oom_adj_fd);
-}
-
-int main(int argc, char **argv)
-{
- char* env[2];
- int status;
- DIR* d;
- int pid;
- int lfd;
- int foreground=0;
-
-/* If first argument is "-f", run in foreground */
- if ((argc > 1) && (strncmp(argv[1], "-f", 3)==0)) {
- foreground=1;
- }
-/* If not running in foreground, fork off and exit the parent.
- * The child closes its default file descriptors.
- */
- if (!foreground) {
- pid = fork();
- if (pid != 0) {
- exit(0);
- }
- close(0);
- close(1);
- close(2);
- }
-/* Adjust OOM preference */
- make_invincible();
-/* Configure environment of children */
- env[0] = "PYTHONPATH="TASHI_PATH"/src/";
- env[1] = NULL;
- while (1) {
- pid = fork();
- if (pid == 0) {
- /* child */
- /* nodemanagers are vulnerable. Not the supervisor. */
- make_vulnerable();
- if (!foreground) {
- /* If not running fg, open log file */
- lfd = open(LOG_FILE, O_WRONLY|O_APPEND|O_CREAT);
- if (lfd < 0) {
- /* If this failed, open something? */
- lfd = open("/dev/null", O_WRONLY);
- }
- /* Make this fd stdout and stderr */
- dup2(lfd, 2);
- dup2(lfd, 1);
- /* close stdin */
- close(0);
- }
- chdir(TASHI_PATH);
- /* start node manager with python environment */
- execle("./bin/nodemanager.py", "./bin/nodemanager.py", NULL, env);
- exit(-1);
- }
- /* sleep before checking for child's status */
- sleep(SLEEP_INTERVAL);
- /* catch child exiting and go through loop again */
- waitpid(pid, &status, 0);
- } /* while (1) */
-}