blob: 0009726fe7fbb6011fb440dfe8f498cf5b73c446 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
""" Console API for Qpid Management Framework """
import os
import qpid
import struct
import socket
import re
from qpid.peer import Closed
from qpid.connection import Connection, ConnectionFailed
from qpid.datatypes import UUID, uuid4, Message, RangedSet
from qpid.util import connect, ssl, URL
from qpid.codec010 import StringCodec as Codec
from threading import Lock, Condition, Thread
from time import time, strftime, gmtime
from cStringIO import StringIO
class Console:
    """ Base class for receiving asynchronous QMF notifications.

    Derive from Console and override any combination of the callbacks
    below.  Every callback defined here does nothing, so a subclass only
    needs to implement the ones it cares about. """

    def brokerConnected(self, broker):
        """ Called when a connection to 'broker' has been established. """

    def brokerDisconnected(self, broker):
        """ Called when the connection to 'broker' has been lost. """

    def newPackage(self, name):
        """ Called when a QMF package called 'name' is discovered. """

    def newClass(self, kind, classKey):
        """ Called when a new class is discovered.  Details about the class
        can be obtained by passing 'classKey' to Session.getSchema. """

    def newAgent(self, agent):
        """ Called when a QMF agent is discovered. """

    def delAgent(self, agent):
        """ Called when a QMF agent disconnects. """

    def objectProps(self, broker, record):
        """ Called when an object update carrying properties is received. """

    def objectStats(self, broker, record):
        """ Called when an object update carrying statistics is received. """

    def event(self, broker, event):
        """ Called when an event is raised. """

    def heartbeat(self, agent, timestamp):
        """ Called with the timestamp carried by a heartbeat from 'agent'. """

    def brokerInfo(self, broker):
        """ Called when a broker response has supplied the broker's id. """

    def methodResponse(self, broker, seq, response):
        """ Called with the result of an asynchronous method call; 'seq' is
        the sequence number of the request being answered. """
class BrokerURL(URL):
    """ A broker URL with the port and credentials defaulted when the URL
    text does not supply them. """

    def __init__(self, text):
        """ Parse 'text', check that the host resolves, and fill in defaults. """
        URL.__init__(self, text)
        # Called only for its side effect: an unresolvable host raises here.
        socket.gethostbyname(self.host)
        if self.port is None:
            self.port = 5672
            if self.scheme == URL.AMQPS:
                self.port = 5671
        self.authName = self.user or "guest"
        self.authPass = self.password or "guest"
        self.authMech = "PLAIN"

    def name(self):
        """ Return the broker's name in "host:port" form. """
        portText = str(self.port)
        return self.host + ":" + portText

    def match(self, host, port):
        """ Return True if 'host' resolves to the same address as this URL's
        host and 'port' equals this URL's port. """
        ownAddress = socket.gethostbyname(self.host)
        otherAddress = socket.gethostbyname(host)
        return ownAddress == otherAddress and self.port == port
class Session:
    """
    An instance of the Session class represents a console session running
    against one or more QMF brokers.  A single instance of Session is needed
    to interact with the management framework as a console.
    """
    # Context values stored with reserved sequence numbers so that the
    # command-complete handler knows what kind of request has finished.
    _CONTEXT_SYNC = 1
    _CONTEXT_STARTUP = 2
    _CONTEXT_MULTIGET = 3

    # Seconds getObjects waits for the queried agents to respond.
    GET_WAIT_TIME = 60

    def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
                 manageConnections=False, userBindings=False):
        """
        Initialize a session.  If the console argument is provided, the
        more advanced asynchronous features are available.  If console is
        defaulted, the session will operate in a simpler, synchronous manner.

        The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful
        only if 'console' is provided.  They control whether object updates,
        events, and agent-heartbeats are subscribed to.  If the console is not
        interested in receiving one or more of the above, setting the argument
        to False will reduce the bandwidth used by the API.

        If manageConnections is set to True, the Session object will manage
        connections to the brokers.  This means that if a broker is
        unreachable, it will retry until a connection can be established.  If
        a connection is lost, the Session will attempt to reconnect.

        If manageConnections is set to False, the user is responsible for
        handling failures.  In this case, an unreachable broker will cause
        addBroker to raise an exception.

        If userBindings is set to False (the default) and rcvObjects is True,
        the console will receive data for all object classes.  If userBindings
        is set to True, the user must select which classes the console shall
        receive by invoking the bindPackage or bindClass methods.  This allows
        the console to be configured to receive only information that is
        relevant to a particular application.  If rcvObjects is False,
        userBindings has no meaning.

        Raises Exception if userBindings is set while object reception is
        disabled (rcvObjects False or no console supplied).
        """
        self.console = console
        self.brokers = []
        self.packages = {}
        self.seqMgr = SequenceManager()
        self.cv = Condition()
        self.syncSequenceList = []
        self.getResult = []
        self.getSelect = []
        self.error = None
        self.rcvObjects = rcvObjects
        self.rcvEvents = rcvEvents
        self.rcvHeartbeats = rcvHeartbeats
        self.userBindings = userBindings
        if self.console is None:
            # Without a console there is nobody to deliver unsolicited data to.
            self.rcvObjects = False
            self.rcvEvents = False
            self.rcvHeartbeats = False
        self.bindingKeyList = self._bindingKeys()
        self.manageConnections = manageConnections

        if self.userBindings and not self.rcvObjects:
            raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")

    def __repr__(self):
        return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)

    def addBroker(self, target="localhost"):
        """ Connect to a Qpid broker.  Returns an object of type Broker. """
        url = BrokerURL(target)
        broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
                        ssl = url.scheme == URL.AMQPS)

        self.brokers.append(broker)
        if not self.manageConnections:
            # Query the agent objects of the new broker only.  The keyword
            # must be '_broker': a plain 'broker' keyword is interpreted by
            # getObjects as a property selector and does not restrict the
            # query to this broker.
            self.getObjects(_broker=broker, _class="agent")
        return broker

    def delBroker(self, broker):
        """ Disconnect from a broker.  The 'broker' argument is the object
        returned from the addBroker call """
        broker._shutdown()
        self.brokers.remove(broker)
        del broker

    def getPackages(self):
        """ Get the list of known QMF packages """
        for broker in self.brokers:
            broker._waitForStable()
        return [package for package in self.packages]

    def getClasses(self, packageName):
        """ Get the list of known classes within a QMF package """
        for broker in self.brokers:
            broker._waitForStable()
        classKeys = []
        if packageName in self.packages:
            for pkey in self.packages[packageName]:
                classKeys.append(self.packages[packageName][pkey].getKey())
        return classKeys

    def getSchema(self, classKey):
        """ Get the schema for a QMF class.  Returns None if the class is
        not known. """
        for broker in self.brokers:
            broker._waitForStable()
        pname = classKey.getPackageName()
        pkey = classKey.getPackageKey()
        if pname in self.packages:
            if pkey in self.packages[pname]:
                return self.packages[pname][pkey]
        return None

    def bindPackage(self, packageName):
        """ Request object updates for all table classes within a package.
        Raises Exception unless the session was created with userBindings. """
        if not self.userBindings or not self.rcvObjects:
            raise Exception("userBindings option not set for Session")
        key = "console.obj.*.*.%s.#" % packageName
        # Remember the key, then bind it on every broker connected right now.
        self.bindingKeyList.append(key)
        for broker in self.brokers:
            if broker.isConnected():
                broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
                                                 binding_key=key)

    def bindClass(self, pname, cname):
        """ Request object updates for a particular table class by package and
        class name.  Raises Exception unless the session was created with
        userBindings. """
        if not self.userBindings or not self.rcvObjects:
            raise Exception("userBindings option not set for Session")
        key = "console.obj.*.*.%s.%s.#" % (pname, cname)
        self.bindingKeyList.append(key)
        for broker in self.brokers:
            if broker.isConnected():
                broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
                                                 binding_key=key)

    def bindClassKey(self, classKey):
        """ Request object updates for a particular table class by class key. """
        pname = classKey.getPackageName()
        cname = classKey.getClassName()
        self.bindClass(pname, cname)

    def getAgents(self, broker=None):
        """ Get a list of currently known agents, from 'broker' only if it is
        supplied, otherwise from every broker of the session. """
        if broker is None:
            brokerList = [b for b in self.brokers]
        else:
            brokerList = [broker]

        for b in brokerList:
            b._waitForStable()
        agentList = []
        for b in brokerList:
            for a in b.getAgents():
                agentList.append(a)
        return agentList

    def getObjects(self, **kwargs):
        """ Get a list of objects from QMF agents.
        All arguments are passed by name(keyword).

        The class for queried objects may be specified in one of the following ways:

        _schema = <schema> - supply a schema object returned from getSchema.
        _key = <key>       - supply a classKey from the list returned by getClasses.
        _class = <name>    - supply a class name as a string.  If the class name exists
                             in multiple packages, a _package argument may also be supplied.
        _objectId = <id>   - get the object referenced by the object-id

        If objects should be obtained from only one agent, use the following argument.
        Otherwise, the query will go to all agents.

        _agent = <agent> - supply an agent from the list returned by getAgents.

        If the get query is to be restricted to one broker (as opposed to all connected brokers),
        add the following argument:

        _broker = <broker> - supply a broker as returned by addBroker.

        If additional arguments are supplied, they are used as property selectors.  For example,
        if the argument name="test" is supplied, only objects whose "name" property is "test"
        will be returned in the result.

        Raises Exception if no class/object-id is supplied, if the supplied
        agent does not belong to the selected broker, or if an error was
        reported while waiting.  Raises RuntimeError if the wait timed out
        and nothing was received.
        """
        if "_broker" in kwargs:
            brokerList = [kwargs["_broker"]]
        else:
            brokerList = self.brokers
        for broker in brokerList:
            broker._waitForStable()

        # Work out which agents the query is sent to.
        agentList = []
        if "_agent" in kwargs:
            agent = kwargs["_agent"]
            if agent.broker not in brokerList:
                raise Exception("Supplied agent is not accessible through the supplied broker")
            if agent.broker.isConnected():
                agentList.append(agent)
        else:
            if "_objectId" in kwargs:
                # Only the agent whose banks match the object-id is asked.
                oid = kwargs["_objectId"]
                for broker in brokerList:
                    for agent in broker.getAgents():
                        if agent.getBrokerBank() == oid.getBrokerBank() and agent.getAgentBank() == oid.getAgentBank():
                            agentList.append(agent)
            else:
                for broker in brokerList:
                    for agent in broker.getAgents():
                        if agent.broker.isConnected():
                            agentList.append(agent)

        if len(agentList) == 0:
            return []

        pname = None
        cname = None
        classHash = None
        classKey = None
        if "_schema" in kwargs:
            classKey = kwargs["_schema"].getKey()
        elif "_key" in kwargs:
            classKey = kwargs["_key"]
        elif "_class" in kwargs:
            cname = kwargs["_class"]
            if "_package" in kwargs:
                pname = kwargs["_package"]
        if cname is None and classKey is None and "_objectId" not in kwargs:
            raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")

        # Build the query map sent to each agent.
        query = {}
        self.getSelect = []
        if "_objectId" in kwargs:
            query["_objectid"] = kwargs["_objectId"].__repr__()
        else:
            if cname is None:
                cname = classKey.getClassName()
                pname = classKey.getPackageName()
                classHash = classKey.getHash()
            query["_class"] = cname
            if pname is not None:
                query["_package"] = pname
            if classHash is not None:
                query["_hash"] = classHash
            # Keywords without a leading underscore are property selectors.
            for item in kwargs:
                if item[0] != '_':
                    self.getSelect.append((item, kwargs[item]))

        self.getResult = []
        for agent in agentList:
            broker = agent.broker
            sendCodec = Codec(broker.conn.spec)
            self.cv.acquire()
            try:
                seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
                self.syncSequenceList.append(seq)
            finally:
                self.cv.release()
            broker._setHeader(sendCodec, 'G', seq)
            sendCodec.write_map(query)
            smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank))
            broker._send(smsg)

        # Wait until every request has completed, an error is reported, or
        # the wait time has elapsed.
        starttime = time()
        timeout = False
        self.cv.acquire()
        try:
            while len(self.syncSequenceList) > 0 and self.error is None:
                self.cv.wait(self.GET_WAIT_TIME)
                if time() - starttime > self.GET_WAIT_TIME:
                    # Give up on the requests that are still outstanding.
                    for pendingSeq in self.syncSequenceList:
                        self.seqMgr._release(pendingSeq)
                    self.syncSequenceList = []
                    timeout = True
        finally:
            self.cv.release()

        if self.error:
            errorText = self.error
            self.error = None
            raise Exception(errorText)

        if len(self.getResult) == 0 and timeout:
            raise RuntimeError("No agent responded within timeout period")
        return self.getResult

    def setEventFilter(self, **kwargs):
        """ Not implemented; accepts any keyword arguments and does nothing. """
        pass

    def _bindingKeys(self):
        """ Return the list of binding keys implied by the rcv* and
        userBindings settings. """
        keyList = ["schema.#"]
        if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings:
            # Everything is wanted: one wildcard key covers it all.
            keyList.append("console.#")
        else:
            if self.rcvObjects and not self.userBindings:
                keyList.append("console.obj.#")
            else:
                # Agent objects are always subscribed to.
                keyList.append("console.obj.*.*.org.apache.qpid.broker.agent")
            if self.rcvEvents:
                keyList.append("console.event.#")
            if self.rcvHeartbeats:
                keyList.append("console.heartbeat.#")
        return keyList

    def _handleBrokerConnect(self, broker):
        """ Forward a broker-connected notification to the console, if any. """
        if self.console:
            self.console.brokerConnected(broker)

    def _handleBrokerDisconnect(self, broker):
        """ Forward a broker-disconnected notification to the console, if any. """
        if self.console:
            self.console.brokerDisconnected(broker)

    def _handleBrokerResp(self, broker, codec, seq):
        """ Record the broker id read from 'codec', notify the console and
        send a package request. """
        broker.brokerId = UUID(codec.read_uuid())
        if self.console is not None:
            self.console.brokerInfo(broker)

        # Send a package request
        # (effectively inc and dec outstanding by not doing anything)
        sendCodec = Codec(broker.conn.spec)
        seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
        broker._setHeader(sendCodec, 'P', seq)
        smsg = broker._message(sendCodec.encoded)
        broker._send(smsg)

    def _handlePackageInd(self, broker, codec, seq):
        """ Register the package named in 'codec' (notifying the console if
        it is new) and send a class request for it. """
        pname = str(codec.read_str8())
        notify = False
        self.cv.acquire()
        try:
            if pname not in self.packages:
                self.packages[pname] = {}
                notify = True
        finally:
            self.cv.release()
        if notify and self.console is not None:
            self.console.newPackage(pname)

        # Send a class request
        broker._incOutstanding()
        sendCodec = Codec(broker.conn.spec)
        seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
        broker._setHeader(sendCodec, 'Q', seq)
        sendCodec.write_str8(pname)
        smsg = broker._message(sendCodec.encoded)
        broker._send(smsg)

    def _handleCommandComplete(self, broker, codec, seq):
        """ Release sequence number 'seq' and finish whatever kind of request
        it was reserved for. """
        # The code and text are not used, but are read to consume them.
        code = codec.read_uint32()
        text = codec.read_str8()
        context = self.seqMgr._release(seq)
        if context == self._CONTEXT_STARTUP:
            broker._decOutstanding()
        elif context == self._CONTEXT_SYNC and seq == broker.syncSequence:
            broker.cv.acquire()
            try:
                broker.syncInFlight = False
                broker.cv.notify()
            finally:
                broker.cv.release()
        elif context == self._CONTEXT_MULTIGET:
            self.cv.acquire()
            try:
                # The membership test is made while holding the lock because
                # getObjects and _handleError replace the list under the
                # same lock.
                if seq in self.syncSequenceList:
                    self.syncSequenceList.remove(seq)
                    if len(self.syncSequenceList) == 0:
                        self.cv.notify()
            finally:
                self.cv.release()

    def _handleClassInd(self, broker, codec, seq):
        """ Send a schema request if the indicated class belongs to a known
        package but has no stored schema. """
        kind = codec.read_uint8()
        classKey = ClassKey(codec)
        unknown = False

        self.cv.acquire()
        try:
            if classKey.getPackageName() in self.packages:
                if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
                    unknown = True
        finally:
            self.cv.release()

        if unknown:
            # Send a schema request for the unknown class
            broker._incOutstanding()
            sendCodec = Codec(broker.conn.spec)
            seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
            broker._setHeader(sendCodec, 'S', seq)
            classKey.encode(sendCodec)
            smsg = broker._message(sendCodec.encoded)
            broker._send(smsg)

    def _handleMethodResp(self, broker, codec, seq):
        """ Decode a method response and hand the MethodResult either to the
        waiting synchronous caller or to the console. """
        code = codec.read_uint32()
        text = codec.read_str16()
        outArgs = {}
        # The context stored by _sendMethodRequest is (method, synchronous).
        method, synchronous = self.seqMgr._release(seq)
        if code == 0:
            # Output arguments are only decoded for a zero status code.
            for arg in method.arguments:
                if arg.dir.find("O") != -1:
                    outArgs[arg.name] = self._decodeValue(codec, arg.type)
        result = MethodResult(code, text, outArgs)
        if synchronous:
            broker.cv.acquire()
            try:
                broker.syncResult = result
                broker.syncInFlight = False
                broker.cv.notify()
            finally:
                broker.cv.release()
        else:
            if self.console:
                self.console.methodResponse(broker, seq, result)

    def _handleHeartbeatInd(self, broker, codec, seq, msg):
        """ Pass a heartbeat timestamp to the console for the agent
        identified by the message's routing key. """
        # Banks used when the routing key does not carry them.
        brokerBank = 1
        agentBank = 0
        dp = msg.get("delivery_properties")
        if dp:
            key = dp["routing_key"]
            keyElements = key.split(".")
            if len(keyElements) == 4:
                brokerBank = int(keyElements[2])
                agentBank = int(keyElements[3])

        agent = broker.getAgent(brokerBank, agentBank)
        timestamp = codec.read_uint64()
        if self.console is not None and agent is not None:
            self.console.heartbeat(agent, timestamp)

    def _handleEventInd(self, broker, codec, seq):
        """ Decode an event and deliver it to the console, if any. """
        if self.console is not None:
            event = Event(self, broker, codec)
            self.console.event(broker, event)

    def _handleSchemaResp(self, broker, codec, seq):
        """ Store the schema decoded from 'codec' and announce the new class
        to the console. """
        kind = codec.read_uint8()
        classKey = ClassKey(codec)
        _class = SchemaClass(kind, classKey, codec)
        self.cv.acquire()
        try:
            self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
        finally:
            self.cv.release()

        self.seqMgr._release(seq)
        broker._decOutstanding()
        if self.console is not None:
            self.console.newClass(kind, classKey)

    def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
        """ Decode an object update.  The object is added to the getObjects
        result when 'seq' belongs to a pending get, and is otherwise
        delivered to the console.  Updates for unknown classes are dropped. """
        classKey = ClassKey(codec)
        self.cv.acquire()
        try:
            pname = classKey.getPackageName()
            if pname not in self.packages:
                return
            pkey = classKey.getPackageKey()
            if pkey not in self.packages[pname]:
                return
            schema = self.packages[pname][pkey]
        finally:
            self.cv.release()

        update = Object(self, broker, schema, codec, prop, stat)
        if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
            broker._updateAgent(update)

        self.cv.acquire()
        try:
            if seq in self.syncSequenceList:
                # Response to a pending get: keep it if its delete time is
                # zero and it passes the selectors; never pass it to the
                # console.
                if update.getTimestamps()[2] == 0 and self._selectMatch(update):
                    self.getResult.append(update)
                return
        finally:
            self.cv.release()

        if self.console and self.rcvObjects:
            if prop:
                self.console.objectProps(broker, update)
            if stat:
                self.console.objectStats(broker, update)

    def _handleError(self, error):
        """ Record 'error' and wake up a getObjects call that is waiting. """
        self.error = error
        self.cv.acquire()
        try:
            self.syncSequenceList = []
            self.cv.notify()
        finally:
            self.cv.release()

    def _selectMatch(self, object):
        """ Check the object against self.getSelect to check for a match.
        Returns False if a selected property has a different value. """
        for key, value in self.getSelect:
            for prop, propval in object.getProperties():
                if key == prop.name and value != propval:
                    return False
        return True

    def _decodeValue(self, codec, typecode):
        """ Decode, from the codec, a value based on its typecode.
        Raises ValueError for an unknown typecode. """
        if   typecode == 1:  data = codec.read_uint8()       # U8
        elif typecode == 2:  data = codec.read_uint16()      # U16
        elif typecode == 3:  data = codec.read_uint32()      # U32
        elif typecode == 4:  data = codec.read_uint64()      # U64
        elif typecode == 6:  data = codec.read_str8()        # SSTR
        elif typecode == 7:  data = codec.read_str16()       # LSTR
        elif typecode == 8:  data = codec.read_int64()       # ABSTIME
        elif typecode == 9:  data = codec.read_uint64()      # DELTATIME
        elif typecode == 10: data = ObjectId(codec)          # REF
        elif typecode == 11: data = codec.read_uint8() != 0  # BOOL
        elif typecode == 12: data = codec.read_float()       # FLOAT
        elif typecode == 13: data = codec.read_double()      # DOUBLE
        elif typecode == 14: data = UUID(codec.read_uuid())  # UUID
        elif typecode == 15: data = codec.read_map()         # FTABLE
        elif typecode == 16: data = codec.read_int8()        # S8
        elif typecode == 17: data = codec.read_int16()       # S16
        elif typecode == 18: data = codec.read_int32()       # S32
        elif typecode == 19: data = codec.read_int64()       # S64
        else:
            raise ValueError("Invalid type code: %d" % typecode)
        return data

    def _encodeValue(self, codec, value, typecode):
        """ Encode, into the codec, a value based on its typecode.
        Raises ValueError for an unknown typecode. """
        if   typecode == 1:  codec.write_uint8  (int(value))    # U8
        elif typecode == 2:  codec.write_uint16 (int(value))    # U16
        elif typecode == 3:  codec.write_uint32 (long(value))   # U32
        elif typecode == 4:  codec.write_uint64 (long(value))   # U64
        elif typecode == 6:  codec.write_str8   (value)         # SSTR
        elif typecode == 7:  codec.write_str16  (value)         # LSTR
        elif typecode == 8:  codec.write_int64  (long(value))   # ABSTIME
        elif typecode == 9:  codec.write_uint64 (long(value))   # DELTATIME
        elif typecode == 10: value.encode       (codec)         # REF
        elif typecode == 11: codec.write_uint8  (int(value))    # BOOL
        elif typecode == 12: codec.write_float  (float(value))  # FLOAT
        elif typecode == 13: codec.write_double (float(value))  # DOUBLE
        elif typecode == 14: codec.write_uuid   (value.bytes)   # UUID
        elif typecode == 15: codec.write_map    (value)         # FTABLE
        elif typecode == 16: codec.write_int8   (int(value))    # S8
        elif typecode == 17: codec.write_int16  (int(value))    # S16
        elif typecode == 18: codec.write_int32  (int(value))    # S32
        elif typecode == 19: codec.write_int64  (int(value))    # S64
        else:
            raise ValueError ("Invalid type code: %d" % typecode)

    def _displayValue(self, value, typecode):
        """ Return a displayable (unicode) form of a value based on its
        typecode.  Raises ValueError for an unknown typecode. """
        if typecode in (1, 2, 3, 4, 9, 12, 13, 16, 17, 18, 19):
            # Integer, delta-time and floating point types.
            return unicode(value)
        elif typecode in (6, 7):
            # SSTR and LSTR values are returned unchanged.
            return value
        elif typecode == 8:
            # ABSTIME: the value is divided by 10**9 before being formatted.
            return unicode(strftime("%c", gmtime(value / 1000000000)))
        elif typecode in (10, 14, 15):
            # REF, UUID and FTABLE are shown through their repr.
            return unicode(value.__repr__())
        elif typecode == 11:
            if value:
                return u"T"
            else:
                return u"F"
        else:
            raise ValueError ("Invalid type code: %d" % typecode)

    def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
        """ This function can be used to send a method request to an object given only the
        broker, schemaKey, and objectId.  This is an uncommon usage pattern as methods are
        normally invoked on the object itself.

        Returns the sequence number of the request, or None if the schema or
        the method is not known.  Raises Exception if the number of supplied
        arguments differs from the number of input arguments of the method.
        """
        schema = self.getSchema(schemaKey)
        if schema is None:
            return None
        for method in schema.getMethods():
            if name == method.name:
                # Check the arguments before reserving a sequence number so
                # that a bad call does not leave a reservation behind.
                inArgs = [arg for arg in method.arguments if arg.dir.find("I") != -1]
                if len(inArgs) != len(argList):
                    raise Exception("Incorrect number of arguments: expected %d, got %d" % (len(inArgs), len(argList)))

                sendCodec = Codec(broker.conn.spec)
                # The context (method, False) marks the request asynchronous.
                seq = self.seqMgr._reserve((method, False))
                broker._setHeader(sendCodec, 'M', seq)
                objectId.encode(sendCodec)
                schemaKey.encode(sendCodec)
                sendCodec.write_str8(name)

                aIdx = 0
                for arg in inArgs:
                    self._encodeValue(sendCodec, argList[aIdx], arg.type)
                    aIdx += 1
                smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
                                       (objectId.getBrokerBank(), objectId.getAgentBank()))
                broker._send(smsg)
                return seq
        return None
class Package:
    """ Holder for the name of a QMF package. """

    def __init__(self, name):
        """ Remember the package name. """
        self.name = name
class ClassKey:
    """ A ClassKey uniquely identifies a class from the schema. """

    def __init__(self, constructor):
        """ Build a key either from its __repr__ string, of the form
        "package:class(xxxxxxxx-xxxxxxxx-xxxxxxxx-xxxxxxxx)", or by reading
        the package name, class name and 128-bit hash from a codec.

        Raises Exception("Invalid ClassKey format") if a string cannot be
        parsed. """
        if isinstance(constructor, str):
            # construct from __repr__ string
            try:
                self.pname, cls = constructor.split(":")
                self.cname, hsh = cls.split("(")
                hexValues = hsh.strip(")").split("-")
                h0 = int(hexValues[0], 16)
                h1 = int(hexValues[1], 16)
                h2 = int(hexValues[2], 16)
                h3 = int(hexValues[3], 16)
                self.hash = struct.pack("!LLLL", h0, h1, h2, h3)
            except (ValueError, IndexError, struct.error):
                # Only parse failures are translated; anything else (for
                # example KeyboardInterrupt) is left to propagate.
                raise Exception("Invalid ClassKey format")
        else:
            # construct from codec
            codec = constructor
            self.pname = str(codec.read_str8())
            self.cname = str(codec.read_str8())
            self.hash = codec.read_bin128()

    def encode(self, codec):
        """ Write the package name, class name and hash to 'codec'. """
        codec.write_str8(self.pname)
        codec.write_str8(self.cname)
        codec.write_bin128(self.hash)

    def getPackageName(self):
        """ Return the package name. """
        return self.pname

    def getClassName(self):
        """ Return the class name. """
        return self.cname

    def getHash(self):
        """ Return the packed 128-bit schema hash. """
        return self.hash

    def getHashString(self):
        """ Return the hash as four dash-separated 8-digit hex groups. """
        return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash)

    def getPackageKey(self):
        """ Return the (class name, hash) tuple that identifies the class
        within its package. """
        return (self.cname, self.hash)

    def __repr__(self):
        return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
class SchemaClass:
    """ Schema of one QMF class: a table class carries properties,
    statistics and methods, an event class carries arguments. """
    CLASS_KIND_TABLE = 1
    CLASS_KIND_EVENT = 2

    def __init__(self, kind, key, codec):
        """ Decode the schema body for a class of the given 'kind' from
        'codec'.  Nothing is read for any other kind. """
        self.kind = kind
        self.classKey = key
        self.properties = []
        self.statistics = []
        self.methods = []
        self.arguments = []

        if self.kind == self.CLASS_KIND_TABLE:
            # All three counts come first, followed by the elements.
            nProps = codec.read_uint16()
            nStats = codec.read_uint16()
            nMethods = codec.read_uint16()
            for unused in range(nProps):
                self.properties.append(SchemaProperty(codec))
            for unused in range(nStats):
                self.statistics.append(SchemaStatistic(codec))
            for unused in range(nMethods):
                self.methods.append(SchemaMethod(codec))
        elif self.kind == self.CLASS_KIND_EVENT:
            nArgs = codec.read_uint16()
            for unused in range(nArgs):
                self.arguments.append(SchemaArgument(codec, methodArg=False))

    def __repr__(self):
        kindStr = "Unsupported"
        if self.kind == self.CLASS_KIND_TABLE:
            kindStr = "Table"
        elif self.kind == self.CLASS_KIND_EVENT:
            kindStr = "Event"
        return "%s Class: %s " % (kindStr, self.classKey.__repr__())

    def getKey(self):
        """ Return the class-key for this class. """
        return self.classKey

    def getProperties(self):
        """ Return the list of properties for the class. """
        return self.properties

    def getStatistics(self):
        """ Return the list of statistics for the class. """
        return self.statistics

    def getMethods(self):
        """ Return the list of methods for the class. """
        return self.methods

    def getArguments(self):
        """ Return the list of arguments for the class (filled in for event
        classes only). """
        return self.arguments
class SchemaProperty:
    """ Description of one property of a table class. """

    def __init__(self, codec):
        """ Read the property description map from 'codec'.  The name, type,
        access, index and optional entries are required; unit, min, max,
        maxlen and desc default to None when absent. """
        attrs = codec.read_map()
        self.name = str(attrs["name"])
        self.type = attrs["type"]
        self.access = str(attrs["access"])
        # 'index' and 'optional' are stored as booleans.
        self.index = attrs["index"] != 0
        self.optional = attrs["optional"] != 0
        self.unit = attrs.get("unit")
        self.min = attrs.get("min")
        self.max = attrs.get("max")
        self.maxlen = attrs.get("maxlen")
        self.desc = attrs.get("desc")

    def __repr__(self):
        return self.name
class SchemaStatistic:
    """ Description of one statistic of a table class. """

    def __init__(self, codec):
        """ Read the statistic description map from 'codec'.  The name and
        type entries are required; unit and desc default to None. """
        attrs = codec.read_map()
        self.name = str(attrs["name"])
        self.type = attrs["type"]
        self.unit = attrs.get("unit")
        self.desc = attrs.get("desc")

    def __repr__(self):
        return self.name
class SchemaMethod:
    """ Description of one method of a table class, with its arguments. """

    def __init__(self, codec):
        """ Read the method description map from 'codec', followed by one
        argument description per declared argument. """
        attrs = codec.read_map()
        self.name = str(attrs["name"])
        argCount = attrs["argCount"]
        self.desc = attrs.get("desc")
        self.arguments = []

        remaining = argCount
        while remaining > 0:
            self.arguments.append(SchemaArgument(codec, methodArg=True))
            remaining -= 1

    def __repr__(self):
        # Only the arguments whose direction includes input are listed.
        inputNames = []
        for arg in self.arguments:
            if arg.dir.find("I") != -1:
                inputNames.append(arg.name)
        return self.name + "(" + ", ".join(inputNames) + ")"
class SchemaArgument:
    """ Description of one argument of a method or of an event. """

    def __init__(self, codec, methodArg):
        """ Read the argument description map from 'codec'.  The direction
        ('dir', upper-cased) is read and stored only when 'methodArg' is
        true; otherwise the instance has no 'dir' attribute.  The unit, min,
        max, maxlen, desc and default entries default to None. """
        attrs = codec.read_map()
        self.name = str(attrs["name"])
        self.type = attrs["type"]
        if methodArg:
            self.dir = str(attrs["dir"]).upper()
        self.unit = attrs.get("unit")
        self.min = attrs.get("min")
        self.max = attrs.get("max")
        self.maxlen = attrs.get("maxlen")
        self.desc = attrs.get("desc")
        self.default = attrs.get("default")
class ObjectId:
    """ Object that represents QMF object identifiers """

    def __init__(self, codec, first=0, second=0):
        """ Read the two 64-bit words of the identifier from 'codec', or, if
        no codec is supplied, take them from 'first' and 'second'. """
        if codec:
            self.first = codec.read_uint64()
            self.second = codec.read_uint64()
        else:
            self.first = first
            self.second = second

    def __cmp__(self, other):
        """ Order identifiers by (first, second); anything that is not an
        ObjectId sorts before an ObjectId. """
        if not isinstance(other, ObjectId):
            return 1
        if self.first < other.first:
            return -1
        if self.first > other.first:
            return 1
        if self.second < other.second:
            return -1
        if self.second > other.second:
            return 1
        return 0

    def __repr__(self):
        return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(),
                                   self.getBrokerBank(), self.getAgentBank(), self.getObject())

    def index(self):
        """ Return the identifier as a (first, second) tuple. """
        return (self.first, self.second)

    def getFlags(self):
        """ Return the top 4 bits of the first word. """
        return (self.first & 0xF000000000000000) >> 60

    def getSequence(self):
        """ Return the 12-bit sequence field of the first word. """
        return (self.first & 0x0FFF000000000000) >> 48

    def getBrokerBank(self):
        """ Return the 20-bit broker bank field of the first word. """
        return (self.first & 0x0000FFFFF0000000) >> 28

    def getAgentBank(self):
        """ Return the low 28 bits of the first word, the agent bank. """
        return self.first & 0x000000000FFFFFFF

    def getObject(self):
        """ Return the second word. """
        return self.second

    def isDurable(self):
        """ Return True if the sequence field is zero. """
        return self.getSequence() == 0

    def encode(self, codec):
        """ Write both 64-bit words to 'codec'. """
        codec.write_uint64(self.first)
        codec.write_uint64(self.second)

    def __hash__(self):
        return (self.first, self.second).__hash__()

    def __eq__(self, other):
        """ Two identifiers are equal when both words are equal.  A plain
        (first, second) tuple compares equal as well, which keeps the result
        consistent with __hash__. """
        if isinstance(other, ObjectId):
            return self.index() == other.index()
        if isinstance(other, tuple):
            return self.index() == other
        # Let Python fall back to its default handling for other types.
        return NotImplemented

    def __ne__(self, other):
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal
class Object(object):
""" """
def __init__(self, session, broker, schema, codec, prop, stat):
""" """
self._session = session
self._broker = broker
self._schema = schema
self._currentTime = codec.read_uint64()
self._createTime = codec.read_uint64()
self._deleteTime = codec.read_uint64()
self._objectId = ObjectId(codec)
self._properties = []
self._statistics = []
if prop:
notPresent = self._parsePresenceMasks(codec, schema)
for property in schema.getProperties():
if property.name in notPresent:
self._properties.append((property, None))
else:
self._properties.append((property, self._session._decodeValue(codec, property.type)))
if stat:
for statistic in schema.getStatistics():
self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
def getBroker(self):
""" Return the broker from which this object was sent """
return self._broker
def getObjectId(self):
""" Return the object identifier for this object """
return self._objectId
def getClassKey(self):
""" Return the class-key that references the schema describing this object. """
return self._schema.getKey()
def getSchema(self):
""" Return the schema that describes this object. """
return self._schema
def getMethods(self):
""" Return a list of methods available for this object. """
return self._schema.getMethods()
def getTimestamps(self):
""" Return the current, creation, and deletion times for this object. """
return self._currentTime, self._createTime, self._deleteTime
def getIndex(self):
""" Return a string describing this object's primary key. """
result = u""
for property, value in self._properties:
if property.index:
if result != u"":
result += u":"
try:
valstr = unicode(self._session._displayValue(value, property.type))
except:
valstr = u"<undecodable>"
result += valstr
return result
def getProperties(self):
return self._properties
def getStatistics(self):
return self._statistics
def mergeUpdate(self, newer):
""" Replace properties and/or statistics with a newly received update """
if self._objectId != newer._objectId:
raise Exception("Objects with different object-ids")
if len(newer.getProperties()) > 0:
self.properties = newer.getProperties()
if len(newer.getStatistics()) > 0:
self.statistics = newer.getStatistics()
def __repr__(self):
key = self.getClassKey()
return key.getPackageName() + ":" + key.getClassName() +\
"[" + self.getObjectId().__repr__() + "] " + self.getIndex().encode("utf8")
def __getattr__(self, name):
for method in self._schema.getMethods():
if name == method.name:
return lambda *args, **kwargs : self._invoke(name, args, kwargs)
for property, value in self._properties:
if name == property.name:
return value
if name == "_" + property.name + "_" and property.type == 10: # Dereference references
deref = self._session.getObjects(_objectId=value, _broker=self._broker)
if len(deref) != 1:
return None
else:
return deref[0]
for statistic, value in self._statistics:
if name == statistic.name:
return value
raise Exception("Type Object has no attribute '%s'" % name)
def _sendMethodRequest(self, name, args, kwargs, synchronous=False):
for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
sendCodec = Codec(self._broker.conn.spec)
seq = self._session.seqMgr._reserve((method, synchronous))
self._broker._setHeader(sendCodec, 'M', seq)
self._objectId.encode(sendCodec)
self._schema.getKey().encode(sendCodec)
sendCodec.write_str8(name)
count = 0
for arg in method.arguments:
if arg.dir.find("I") != -1:
count += 1
if count != len(args):
raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
for arg in method.arguments:
if arg.dir.find("I") != -1:
self._session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
(self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
if synchronous:
try:
self._broker.cv.acquire()
self._broker.syncInFlight = True
finally:
self._broker.cv.release()
self._broker._send(smsg)
return seq
return None
def _invoke(self, name, args, kwargs):
  """ Call the schema method 'name' synchronously and return its result.

  Sends the request through _sendMethodRequest and then waits on the broker's
  condition variable until the syncInFlight flag is cleared or a broker error
  is recorded.

  Returns the broker's syncResult.
  Raises RuntimeError if more than broker.SYNC_TIME seconds pass, Exception
  carrying the broker's error text if an error was recorded (the error is
  cleared), and Exception if the method name is unknown. """
  seq = self._sendMethodRequest(name, args, kwargs, True)
  # _sendMethodRequest returns None for an unknown method.  A valid sequence
  # number may be 0, so compare against None instead of testing truthiness.
  if seq is None:
    raise Exception("Invalid Method (software defect) [%s]" % name)

  broker = self._broker
  broker.cv.acquire()
  try:
    starttime = time()
    while broker.syncInFlight and broker.error is None:
      broker.cv.wait(broker.SYNC_TIME)
      if time() - starttime > broker.SYNC_TIME:
        # Give the sequence number back; no response is expected any more.
        self._session.seqMgr._release(seq)
        raise RuntimeError("Timed out waiting for method to respond")
  finally:
    broker.cv.release()

  if broker.error is not None:
    errorText = broker.error
    broker.error = None
    raise Exception(errorText)
  return broker.syncResult
def _parsePresenceMasks(self, codec, schema):
  """ Return the names of the optional properties that are marked absent.

  One mask octet is read from the codec for every group of up to eight
  optional properties, in schema order, lowest bit first.  A cleared bit
  means the corresponding property is excluded. """
  absent = []
  flag = 0
  for prop in schema.getProperties():
    if not prop.optional:
      continue
    if flag == 0:
      # Start of a new group: fetch the next mask octet.
      presence = codec.read_uint8()
      flag = 1
    if not (presence & flag):
      absent.append(prop.name)
    flag <<= 1
    if flag == 256:
      flag = 0
  return absent
class MethodResult(object):
  """ Result of a method call: a status code, a status text and the output
  arguments.  Output arguments can also be read as attributes of the result. """

  def __init__(self, status, text, outArgs):
    """ Store the status code, the status text and the mapping of output
    arguments. """
    self.status = status
    self.text = text
    self.outArgs = outArgs

  def __getattr__(self, name):
    """ Return the output argument called 'name', or None if there is none.

    Raises AttributeError for special-method names ("__x__") and whenever
    outArgs itself is not available.  Without that, protocol probes such as
    those made by the copy module would receive None instead of "not
    present", and a lookup on an instance without outArgs would recurse
    without end. """
    # Go through __dict__ so that a missing outArgs cannot re-enter this method.
    outArgs = self.__dict__.get("outArgs")
    if outArgs is None or (name.startswith("__") and name.endswith("__")):
      raise AttributeError(name)
    if name in outArgs:
      return outArgs[name]
    return None

  def __repr__(self):
    return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
class ManagedConnection(Thread):
  """ Thread class for managing a connection.

  The thread repeatedly asks its broker to connect.  Each failed attempt
  multiplies the retry delay by DELAY_FACTOR until it reaches DELAY_MAX; the
  delay goes back to DELAY_MIN once an established connection is lost. """
  DELAY_MIN = 1
  DELAY_MAX = 128
  DELAY_FACTOR = 2

  def __init__(self, broker):
    Thread.__init__(self)
    self.broker = broker
    self.cv = Condition()
    self.canceled = False

  def stop(self):
    """ Tell this thread to stop running and return. """
    # Acquire outside the try so the release only runs for a held lock.
    self.cv.acquire()
    try:
      self.canceled = True
      self.cv.notify()
    finally:
      self.cv.release()

  def disconnected(self):
    """ Notify the thread that the connection was lost. """
    self.cv.acquire()
    try:
      self.cv.notify()
    finally:
      self.cv.release()

  def run(self):
    """ Main body of the running thread. """
    # SessionDetached is not among the module-level imports, so naming it in
    # an except clause failed with NameError.  Import it where it is needed.
    from qpid.session import SessionDetached
    # NOTE(review): ConnectionFailed, which Broker._tryToConnect re-raises, is
    # not in this list and therefore ends the thread -- confirm this is intended.
    retryable = (socket.error, SessionDetached, Closed)

    delay = self.DELAY_MIN
    while True:
      try:
        self.broker._tryToConnect()
        self.cv.acquire()
        try:
          # Sleep for as long as the connection is up and nobody asked to stop.
          while (not self.canceled) and self.broker.connected:
            self.cv.wait()
          if self.canceled:
            return
          delay = self.DELAY_MIN
        finally:
          self.cv.release()
      except retryable:
        if delay < self.DELAY_MAX:
          delay *= self.DELAY_FACTOR

      self.cv.acquire()
      try:
        # A stop() issued before this point has already sent its notify; check
        # the flag first so that the request is not delayed by the back-off.
        if not self.canceled:
          self.cv.wait(delay)
        if self.canceled:
          return
      finally:
        self.cv.release()
class Broker:
  """ This object represents a connection (or potential connection) to a QMF broker. """
  SYNC_TIME = 60

  def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
    """ Record the connection parameters and start connecting.

    When session.manageConnections is set, a ManagedConnection thread is
    started to do the connecting.  Otherwise the connection is attempted right
    away and any failure propagates to the caller. """
    # NOTE(review): authMech is accepted but neither stored nor used here.
    self.session = session
    self.host = host
    self.port = port
    self.ssl = ssl
    self.authUser = authUser
    self.authPass = authPass
    self.cv = Condition()
    self.error = None
    self.brokerId = None
    self.connected = False
    self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
    if self.session.manageConnections:
      self.thread = ManagedConnection(self)
      self.thread.start()
    else:
      self.thread = None
      self._tryToConnect()

  def isConnected(self):
    """ Return True if there is an active connection to the broker. """
    return self.connected

  def getError(self):
    """ Return the last error message seen while trying to connect to the broker. """
    return self.error

  def getBrokerId(self):
    """ Get broker's unique identifier (UUID) """
    return self.brokerId

  def getBrokerBank(self):
    """ Return the broker-bank value.  This is the value that the broker assigns to
    objects within its control.  This value appears as a field in the ObjectId
    of objects created by agents controlled by this broker. """
    return 1

  def getAgent(self, brokerBank, agentBank):
    """ Return the agent object associated with a particular broker and agent bank value."""
    bankKey = (brokerBank, agentBank)
    if bankKey in self.agents:
      return self.agents[bankKey]
    return None

  def getSessionId(self):
    """ Get the identifier of the AMQP session to the broker """
    return self.amqpSessionId

  def getAgents(self):
    """ Get the list of agents reachable via this broker """
    return self.agents.values()

  def getAmqpSession(self):
    """ Get the AMQP session object for this connected broker. """
    return self.amqpSession

  def getUrl(self):
    """ Return the broker address as "<host>:<port>". """
    return "%s:%d" % (self.host, self.port)

  def getFullUrl(self, noAuthIfGuestDefault=True):
    """ Return the broker address as an "amqp://" or "amqps://" URL.

    The "<user>/<password>@" part is left out when the user name is empty, or
    when noAuthIfGuestDefault is set and the credentials are guest/guest.
    Port 5672 is used when no port is set. """
    # Named 'scheme' so the module-level ssl import is not shadowed.
    scheme = ""
    if self.ssl:
      scheme = "s"
    auth = "%s/%s@" % (self.authUser, self.authPass)
    if self.authUser == "" or \
          (noAuthIfGuestDefault and self.authUser == "guest" and self.authPass == "guest"):
      auth = ""
    return "amqp%s://%s%s:%d" % (scheme, auth, self.host, self.port or 5672)

  def __repr__(self):
    if self.connected:
      return "Broker connected at: %s" % self.getUrl()
    else:
      return "Disconnected Broker"

  def _errorText(self, prefix, exc):
    """ Build the text stored in self.error for a failed connection attempt.

    Exceptions carrying at least two arguments are rendered as
    "<prefix> <arg0> - <arg1>".  Anything else falls back to
    "<prefix> <str(exc)>" so that a short argument list cannot raise a second
    exception and hide the original failure. """
    details = getattr(exc, "args", ())
    if len(details) >= 2:
      return "%s %s - %s" % (prefix, details[0], details[1])
    return "%s %s" % (prefix, exc)

  def _tryToConnect(self):
    """ Reset the per-connection state and connect to the broker.

    Opens the socket (wrapped for SSL if requested), starts the AMQP
    connection and session, declares and subscribes to the reply queue and the
    topic queue, tells the session that the broker is connected and sends the
    initial 'B' request.

    On socket.error, Closed or ConnectionFailed the error text is stored in
    self.error and the exception is re-raised. """
    # sys.exc_info() keeps this parseable by every Python version in use.
    import sys
    try:
      self.agents = {}
      self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
      self.topicBound = False
      self.syncInFlight = False
      self.syncRequest = 0
      self.syncResult = None
      self.reqsOutstanding = 1

      sock = connect(self.host, self.port)
      if self.ssl:
        sock = ssl(sock)
      self.conn = Connection(sock, username=self.authUser, password=self.authPass)
      self.conn.start()

      # Reply queue: bound to amq.direct under its own name.
      self.replyName = "reply-%s" % self.amqpSessionId
      self.amqpSession = self.conn.session(self.amqpSessionId)
      self.amqpSession.auto_sync = True
      self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
      self.amqpSession.exchange_bind(exchange="amq.direct",
                                     queue=self.replyName, binding_key=self.replyName)
      self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
                                         accept_mode=self.amqpSession.accept_mode.none,
                                         acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
      self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
      self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
      self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF)
      self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFF)

      # Topic queue: its bindings are added later by _decOutstanding.
      self.topicName = "topic-%s" % self.amqpSessionId
      self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
      self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
                                         accept_mode=self.amqpSession.accept_mode.none,
                                         acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
      self.amqpSession.incoming("tdest").listen(self._replyCb)
      self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
      self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
      self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF)

      self.connected = True
      self.session._handleBrokerConnect(self)

      codec = Codec(self.conn.spec)
      self._setHeader(codec, 'B')
      msg = self._message(codec.encoded)
      self._send(msg)

    except socket.error:
      self.error = self._errorText("Socket Error", sys.exc_info()[1])
      raise
    except Closed:
      self.error = self._errorText("Connect Failed", sys.exc_info()[1])
      raise
    except ConnectionFailed:
      self.error = self._errorText("Connect Failed", sys.exc_info()[1])
      raise

  def _updateAgent(self, obj):
    """ Add or remove an agent according to the given agent object.

    An object whose _deleteTime is 0 registers the agent if it is not known
    yet; any other value removes it.  The console, if any, is told through
    newAgent / delAgent. """
    bankKey = (obj.brokerBank, obj.agentBank)
    if obj._deleteTime == 0:
      if bankKey not in self.agents:
        agent = Agent(self, obj.agentBank, obj.label)
        self.agents[bankKey] = agent
        if self.session.console != None:
          self.session.console.newAgent(agent)
    else:
      agent = self.agents.pop(bankKey, None)
      if agent != None and self.session.console != None:
        self.session.console.delAgent(agent)

  def _setHeader(self, codec, opcode, seq=0):
    """ Compose the header of a management message. """
    for octet in ('A', 'M', '2', opcode):
      codec.write_uint8(ord(octet))
    codec.write_uint32(seq)

  def _checkHeader(self, codec):
    """ Check the header of a management message and extract the opcode and
    sequence number.  Returns (None, None) if the magic "AM2" is missing or the
    header cannot be read. """
    try:
      for expected in "AM2":
        if chr(codec.read_uint8()) != expected:
          return None, None
      opcode = chr(codec.read_uint8())
      seq = codec.read_uint32()
      return opcode, seq
    except Exception:
      # A short or malformed buffer ends parsing.  Interpreter-level signals
      # such as KeyboardInterrupt are not swallowed.
      return None, None

  def _message (self, body, routing_key="broker"):
    """ Wrap 'body' in a Message addressed to 'routing_key', with this
    broker's reply queue as the reply-to address. """
    dp = self.amqpSession.delivery_properties()
    dp.routing_key = routing_key
    mp = self.amqpSession.message_properties()
    mp.content_type = "x-application/qmf"
    mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName)
    return Message(dp, mp, body)

  def _send(self, msg, dest="qpid.management"):
    """ Transfer 'msg' to the exchange 'dest'. """
    self.amqpSession.message_transfer(destination=dest, message=msg)

  def _shutdown(self):
    """ Stop the connection thread, if any, and close an open connection. """
    if self.thread:
      self.thread.stop()
      self.thread.join()
    if self.connected:
      self.amqpSession.incoming("rdest").stop()
      if self.session.console != None:
        self.amqpSession.incoming("tdest").stop()
      self.amqpSession.close()
      self.conn.close()
      self.connected = False

  def _waitForStable(self):
    """ Block until no requests are outstanding.

    Returns at once when the broker is not connected or nothing is
    outstanding.  Raises RuntimeError after SYNC_TIME seconds. """
    # Acquire outside the try so the release only runs for a held lock.
    self.cv.acquire()
    try:
      if not self.connected:
        return
      if self.reqsOutstanding == 0:
        return
      self.syncInFlight = True
      starttime = time()
      while self.reqsOutstanding != 0:
        self.cv.wait(self.SYNC_TIME)
        if time() - starttime > self.SYNC_TIME:
          raise RuntimeError("Timed out waiting for broker to synchronize")
    finally:
      self.cv.release()

  def _incOutstanding(self):
    """ Count one more outstanding request. """
    self.cv.acquire()
    try:
      self.reqsOutstanding += 1
    finally:
      self.cv.release()

  def _decOutstanding(self):
    """ Count one request as completed.

    When the count reaches zero, the topic queue is bound to the session's
    binding keys (first time only) and a waiter flagged through syncInFlight
    is notified. """
    self.cv.acquire()
    try:
      self.reqsOutstanding -= 1
      if self.reqsOutstanding == 0 and not self.topicBound:
        self.topicBound = True
        for key in self.session.bindingKeyList:
          self.amqpSession.exchange_bind(exchange="qpid.management",
                                         queue=self.topicName, binding_key=key)
      if self.reqsOutstanding == 0 and self.syncInFlight:
        self.syncInFlight = False
        self.cv.notify()
    finally:
      self.cv.release()

  def _replyCb(self, msg):
    """ Decode every management message in msg.body and hand each one to the
    session handler selected by its opcode.  Parsing stops at the first header
    that does not check out. """
    codec = Codec(self.conn.spec, msg.body)
    while True:
      opcode, seq = self._checkHeader(codec)
      if opcode == None: return
      if   opcode == 'b': self.session._handleBrokerResp      (self, codec, seq)
      elif opcode == 'p': self.session._handlePackageInd      (self, codec, seq)
      elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
      elif opcode == 'q': self.session._handleClassInd        (self, codec, seq)
      elif opcode == 'm': self.session._handleMethodResp      (self, codec, seq)
      elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq, msg)
      elif opcode == 'e': self.session._handleEventInd        (self, codec, seq)
      elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq)
      elif opcode == 'c': self.session._handleContentInd      (self, codec, seq, prop=True)
      elif opcode == 'i': self.session._handleContentInd      (self, codec, seq, stat=True)
      elif opcode == 'g': self.session._handleContentInd      (self, codec, seq, prop=True, stat=True)

  def _exceptionCb(self, data):
    """ Listener callback for a failure on the reply subscription: mark the
    broker disconnected, record the error, wake a synchronous waiter and
    inform the session and the connection thread. """
    self.connected = False
    self.error = data
    self.cv.acquire()
    try:
      if self.syncInFlight:
        self.cv.notify()
    finally:
      self.cv.release()
    self.session._handleError(self.error)
    self.session._handleBrokerDisconnect(self)
    if self.thread:
      self.thread.disconnected()
class Agent:
  """ An agent reachable through a broker.  It carries the broker's bank
  value, its own agent-bank value and a label. """

  def __init__(self, broker, agentBank, label):
    """ Remember the owning broker, the agent bank and the label; the broker
    bank is taken from the broker. """
    self.label = label
    self.agentBank = agentBank
    self.broker = broker
    self.brokerBank = broker.getBrokerBank()

  def getAgentBank(self):
    """ Return the agent-bank value. """
    return self.agentBank

  def getBrokerBank(self):
    """ Return the broker-bank value obtained from the broker. """
    return self.brokerBank

  def getBroker(self):
    """ Return the broker this agent was created for. """
    return self.broker

  def __repr__(self):
    fields = (self.brokerBank, self.agentBank, self.label)
    return "Agent at bank %d.%d (%s)" % fields
class Event:
  """ An event read from a codec: class key, timestamp, severity and, when the
  session knows the event's schema, its decoded arguments. """

  # Fixed-width display names, indexed by severity value.
  _SEVERITY_NAMES = {
    0: "EMER ",
    1: "ALERT",
    2: "CRIT ",
    3: "ERROR",
    4: "WARN ",
    5: "NOTIC",
    6: "INFO ",
    7: "DEBUG",
  }

  def __init__(self, session, broker, codec):
    """ Decode the event header from 'codec' and, if the session has a schema
    for the event's class key, decode the arguments as well. """
    self.session = session
    self.broker = broker
    self.classKey = ClassKey(codec)
    self.timestamp = codec.read_int64()
    self.severity = codec.read_uint8()
    self.schema = None
    # Always present, so getArguments() also works for events whose schema is
    # unknown.
    self.arguments = {}
    pname = self.classKey.getPackageName()
    pkey = self.classKey.getPackageKey()
    if pname in session.packages:
      if pkey in session.packages[pname]:
        self.schema = session.packages[pname][pkey]
        for arg in self.schema.arguments:
          self.arguments[arg.name] = session._decodeValue(codec, arg.type)

  def __repr__(self):
    if self.schema == None:
      return "<uninterpretable>"
    # NOTE(review): the division suggests the timestamp is in nanoseconds -- confirm.
    out = strftime("%c", gmtime(self.timestamp // 1000000000))
    out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName()
    out += " broker=" + self.broker.getUrl()
    for arg in self.schema.arguments:
      disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8")
      if " " in disp:
        disp = "\"" + disp + "\""
      out += " " + arg.name + "=" + disp
    return out

  def _sevName(self):
    """ Return the display name of the severity, or "INV-<n>" for values
    outside 0..7. """
    label = self._SEVERITY_NAMES.get(self.severity)
    if label is None:
      return "INV-%d" % self.severity
    return label

  def getClassKey(self):
    """ Return the class key decoded from the event. """
    return self.classKey

  def getArguments(self):
    """ Return the decoded arguments as a dict (empty if the schema is unknown). """
    return self.arguments

  def getTimestamp(self):
    """ Return the raw timestamp read from the event. """
    return self.timestamp

  def getName(self):
    """ Return the name of the event, taken from its class key. """
    # No 'name' attribute is ever assigned on this class, so returning
    # self.name always failed.
    # NOTE(review): the class name is assumed to be the intended event name.
    return self.classKey.getClassName()

  def getSchema(self):
    """ Return the schema for this event, or None if it is not known. """
    return self.schema
class SequenceManager:
  """ Manage sequence numbers for asynchronous method calls """

  def __init__(self):
    """ Start counting at zero with nothing pending. """
    self.lock = Lock()
    self.sequence = 0
    self.pending = {}

  def _reserve(self, data):
    """ Reserve a unique sequence number and remember 'data' under it. """
    self.lock.acquire()
    try:
      allotted = self.sequence
      self.sequence = allotted + 1
      self.pending[allotted] = data
    finally:
      self.lock.release()
    return allotted

  def _release(self, seq):
    """ Release a reserved sequence number.  Returns the data stored with it,
    or None if the number is not pending. """
    self.lock.acquire()
    try:
      return self.pending.pop(seq, None)
    finally:
      self.lock.release()
class DebugConsole(Console):
  """ Console that prints each callback it receives, as the callback name
  followed by the printed argument(s). """

  def _debugPrint(self, *fields):
    """ Print the fields on one line, separated by single spaces. """
    print(" ".join(["%s" % (field,) for field in fields]))

  def brokerConnected(self, broker):
    self._debugPrint("brokerConnected:", broker)

  def brokerDisconnected(self, broker):
    self._debugPrint("brokerDisconnected:", broker)

  def newPackage(self, name):
    self._debugPrint("newPackage:", name)

  def newClass(self, kind, classKey):
    self._debugPrint("newClass:", kind, classKey)

  def newAgent(self, agent):
    self._debugPrint("newAgent:", agent)

  def delAgent(self, agent):
    self._debugPrint("delAgent:", agent)

  def objectProps(self, broker, record):
    self._debugPrint("objectProps:", record)

  def objectStats(self, broker, record):
    self._debugPrint("objectStats:", record)

  def event(self, broker, event):
    self._debugPrint("event:", event)

  def heartbeat(self, agent, timestamp):
    # Only the agent is printed; the timestamp is not shown.
    self._debugPrint("heartbeat:", agent)

  def brokerInfo(self, broker):
    self._debugPrint("brokerInfo:", broker)