| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ Console API for Qpid Management Framework """ |
| |
| import os |
| import qpid |
| import struct |
| import socket |
| import re |
| from qpid.peer import Closed |
| from qpid.connection import Connection, ConnectionFailed |
| from qpid.datatypes import UUID, uuid4, Message, RangedSet |
| from qpid.util import connect, ssl, URL |
| from qpid.codec010 import StringCodec as Codec |
| from threading import Lock, Condition, Thread |
| from time import time, strftime, gmtime |
| from cStringIO import StringIO |
| |
class Console:
    """ Base class for receiving asynchronous QMF notifications.

    Derive from Console and override any combination of the callbacks
    below.  Every default implementation does nothing and returns None. """

    def brokerConnected(self, broker):
        """ Invoked when a connection is established to a broker. """

    def brokerDisconnected(self, broker):
        """ Invoked when the connection to a broker is lost. """

    def newPackage(self, name):
        """ Invoked when a QMF package is discovered. """

    def newClass(self, kind, classKey):
        """ Invoked when a new class is discovered.  Session.getSchema can
        be used to obtain details about the class. """

    def newAgent(self, agent):
        """ Invoked when a QMF agent is discovered. """

    def delAgent(self, agent):
        """ Invoked when a QMF agent disconnects. """

    def objectProps(self, broker, record):
        """ Invoked when an object update carrying properties arrives. """

    def objectStats(self, broker, record):
        """ Invoked when an object update carrying statistics arrives. """

    def event(self, broker, event):
        """ Invoked when an event is raised. """

    def heartbeat(self, agent, timestamp):
        """ Invoked when a heartbeat indication is received for a known
        agent; 'timestamp' is the value carried by the indication. """

    def brokerInfo(self, broker):
        """ Invoked when a broker response has been handled and the
        broker's brokerId has been set. """

    def methodResponse(self, broker, seq, response):
        """ Invoked with the result of a method call that was not issued
        synchronously; 'seq' is the sequence number of the request. """
| |
class BrokerURL(URL):
    """ A broker URL with the default port and credentials filled in. """

    def __init__(self, text):
        """ Parse 'text' as a URL and apply the broker defaults. """
        URL.__init__(self, text)
        # The lookup result is discarded; the call is made so that an
        # unresolvable host raises here, at construction time.
        socket.gethostbyname(self.host)
        if self.port is None:
            defaultPort = 5672
            if self.scheme == URL.AMQPS:
                defaultPort = 5671
            self.port = defaultPort
        self.authName = self.user or "guest"
        self.authPass = self.password or "guest"
        self.authMech = "PLAIN"

    def name(self):
        """ Return the broker name in "host:port" form. """
        return ":".join((self.host, str(self.port)))

    def match(self, host, port):
        """ Return True if 'host' resolves to the same address as this
        URL's host and 'port' equals this URL's port. """
        ownAddress = socket.gethostbyname(self.host)
        otherAddress = socket.gethostbyname(host)
        return ownAddress == otherAddress and self.port == port
| |
| class Session: |
| """ |
| An instance of the Session class represents a console session running |
| against one or more QMF brokers. A single instance of Session is needed |
| to interact with the management framework as a console. |
| """ |
| _CONTEXT_SYNC = 1 |
| _CONTEXT_STARTUP = 2 |
| _CONTEXT_MULTIGET = 3 |
| |
| GET_WAIT_TIME = 60 |
| |
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
             manageConnections=False, userBindings=False):
    """
    Initialize a session.  If the console argument is provided, the
    more advanced asynchronous features are available.  If console is
    defaulted, the session will operate in a simpler, synchronous manner.

    The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console'
    is provided.  They control whether object updates, events, and agent-heartbeats are
    subscribed to.  If the console is not interested in receiving one or more of the above,
    setting the argument to False will reduce the bandwidth used by the API.

    If manageConnections is set to True, the Session object will manage connections to
    the brokers.  This means that if a broker is unreachable, it will retry until a connection
    can be established.  If a connection is lost, the Session will attempt to reconnect.

    If manageConnections is set to False, the user is responsible for handling failures.  In
    this case, an unreachable broker will cause addBroker to raise an exception.

    If userBindings is set to False (the default) and rcvObjects is True, the console will
    receive data for all object classes.  If userBindings is set to True, the user must select
    which classes the console shall receive by invoking the bindPackage or bindClass methods.
    This allows the console to be configured to receive only information that is relevant to
    a particular application.  If rcvObjects is False, userBindings has no meaning.

    Raises Exception if userBindings is set while object updates are not being received
    (rcvObjects is False or no console was supplied).
    """
    self.console = console
    self.brokers = []
    self.packages = {}
    self.seqMgr = SequenceManager()
    self.cv = Condition()
    self.syncSequenceList = []
    self.getResult = []
    self.getSelect = []
    self.error = None

    # Without a console there is nobody to deliver asynchronous data to,
    # so every subscription flag is forced off.
    haveConsole = console is not None
    self.rcvObjects = haveConsole and rcvObjects
    self.rcvEvents = haveConsole and rcvEvents
    self.rcvHeartbeats = haveConsole and rcvHeartbeats
    self.userBindings = userBindings

    # The binding keys depend on the four flags assigned just above.
    self.bindingKeyList = self._bindingKeys()
    self.manageConnections = manageConnections

    if self.userBindings and not self.rcvObjects:
        raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
| |
| def __repr__(self): |
| return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) |
| |
def addBroker(self, target="localhost"):
    """ Connect to a Qpid broker.  Returns an object of type Broker.

    'target' is parsed as a BrokerURL; the resulting host, port and
    credentials are used to construct the Broker, with SSL enabled when
    the URL scheme is URL.AMQPS.

    When the session does not manage connections, an initial query for
    the "agent" class is issued against the new broker before returning.
    """
    url = BrokerURL(target)
    broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
                    ssl = url.scheme == URL.AMQPS)

    self.brokers.append(broker)
    if not self.manageConnections:
        # getObjects only recognises the underscore-prefixed '_broker'
        # keyword as a broker restriction.  A plain 'broker' keyword is
        # treated as a property selector and the query goes to every
        # broker in the session.
        self.getObjects(_broker=broker, _class="agent")
    return broker
| |
| def delBroker(self, broker): |
| """ Disconnect from a broker. The 'broker' argument is the object |
| returned from the addBroker call """ |
| broker._shutdown() |
| self.brokers.remove(broker) |
| del broker |
| |
| def getPackages(self): |
| """ Get the list of known QMF packages """ |
| for broker in self.brokers: |
| broker._waitForStable() |
| list = [] |
| for package in self.packages: |
| list.append(package) |
| return list |
| |
| def getClasses(self, packageName): |
| """ Get the list of known classes within a QMF package """ |
| for broker in self.brokers: |
| broker._waitForStable() |
| list = [] |
| if packageName in self.packages: |
| for pkey in self.packages[packageName]: |
| list.append(self.packages[packageName][pkey].getKey()) |
| return list |
| |
| def getSchema(self, classKey): |
| """ Get the schema for a QMF class """ |
| for broker in self.brokers: |
| broker._waitForStable() |
| pname = classKey.getPackageName() |
| pkey = classKey.getPackageKey() |
| if pname in self.packages: |
| if pkey in self.packages[pname]: |
| return self.packages[pname][pkey] |
| |
| def bindPackage(self, packageName): |
| """ Request object updates for all table classes within a package. """ |
| if not self.userBindings or not self.rcvObjects: |
| raise Exception("userBindings option not set for Session") |
| key = "console.obj.*.*.%s.#" % packageName |
| self.bindingKeyList.append(key) |
| for broker in self.brokers: |
| if broker.isConnected(): |
| broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, |
| binding_key=key) |
| |
| def bindClass(self, pname, cname): |
| """ Request object updates for a particular table class by package and class name. """ |
| if not self.userBindings or not self.rcvObjects: |
| raise Exception("userBindings option not set for Session") |
| key = "console.obj.*.*.%s.%s.#" % (pname, cname) |
| self.bindingKeyList.append(key) |
| for broker in self.brokers: |
| if broker.isConnected(): |
| broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, |
| binding_key=key) |
| |
| def bindClassKey(self, classKey): |
| """ Request object updates for a particular table class by class key. """ |
| pname = classKey.getPackageName() |
| cname = classKey.getClassName() |
| self.bindClass(pname, cname) |
| |
| def getAgents(self, broker=None): |
| """ Get a list of currently known agents """ |
| brokerList = [] |
| if broker == None: |
| for b in self.brokers: |
| brokerList.append(b) |
| else: |
| brokerList.append(broker) |
| |
| for b in brokerList: |
| b._waitForStable() |
| agentList = [] |
| for b in brokerList: |
| for a in b.getAgents(): |
| agentList.append(a) |
| return agentList |
| |
| def getObjects(self, **kwargs): |
| """ Get a list of objects from QMF agents. |
| All arguments are passed by name(keyword). |
| |
| The class for queried objects may be specified in one of the following ways: |
| |
| _schema = <schema> - supply a schema object returned from getSchema. |
| _key = <key> - supply a classKey from the list returned by getClasses. |
| _class = <name> - supply a class name as a string. If the class name exists |
| in multiple packages, a _package argument may also be supplied. |
| _objectId = <id> - get the object referenced by the object-id |
| |
| If objects should be obtained from only one agent, use the following argument. |
| Otherwise, the query will go to all agents. |
| |
| _agent = <agent> - supply an agent from the list returned by getAgents. |
| |
| If the get query is to be restricted to one broker (as opposed to all connected brokers), |
| add the following argument: |
| |
| _broker = <broker> - supply a broker as returned by addBroker. |
| |
| If additional arguments are supplied, they are used as property selectors. For example, |
| if the argument name="test" is supplied, only objects whose "name" property is "test" |
| will be returned in the result. |
| """ |
| if "_broker" in kwargs: |
| brokerList = [] |
| brokerList.append(kwargs["_broker"]) |
| else: |
| brokerList = self.brokers |
| for broker in brokerList: |
| broker._waitForStable() |
| |
| agentList = [] |
| if "_agent" in kwargs: |
| agent = kwargs["_agent"] |
| if agent.broker not in brokerList: |
| raise Exception("Supplied agent is not accessible through the supplied broker") |
| if agent.broker.isConnected(): |
| agentList.append(agent) |
| else: |
| if "_objectId" in kwargs: |
| oid = kwargs["_objectId"] |
| for broker in brokerList: |
| for agent in broker.getAgents(): |
| if agent.getBrokerBank() == oid.getBrokerBank() and agent.getAgentBank() == oid.getAgentBank(): |
| agentList.append(agent) |
| else: |
| for broker in brokerList: |
| for agent in broker.getAgents(): |
| if agent.broker.isConnected(): |
| agentList.append(agent) |
| |
| if len(agentList) == 0: |
| return [] |
| |
| pname = None |
| cname = None |
| hash = None |
| classKey = None |
| if "_schema" in kwargs: classKey = kwargs["_schema"].getKey() |
| elif "_key" in kwargs: classKey = kwargs["_key"] |
| elif "_class" in kwargs: |
| cname = kwargs["_class"] |
| if "_package" in kwargs: |
| pname = kwargs["_package"] |
| if cname == None and classKey == None and "_objectId" not in kwargs: |
| raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") |
| |
| map = {} |
| self.getSelect = [] |
| if "_objectId" in kwargs: |
| map["_objectid"] = kwargs["_objectId"].__repr__() |
| else: |
| if cname == None: |
| cname = classKey.getClassName() |
| pname = classKey.getPackageName() |
| hash = classKey.getHash() |
| map["_class"] = cname |
| if pname != None: map["_package"] = pname |
| if hash != None: map["_hash"] = hash |
| for item in kwargs: |
| if item[0] != '_': |
| self.getSelect.append((item, kwargs[item])) |
| |
| self.getResult = [] |
| for agent in agentList: |
| broker = agent.broker |
| sendCodec = Codec(broker.conn.spec) |
| try: |
| self.cv.acquire() |
| seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) |
| self.syncSequenceList.append(seq) |
| finally: |
| self.cv.release() |
| broker._setHeader(sendCodec, 'G', seq) |
| sendCodec.write_map(map) |
| smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank)) |
| broker._send(smsg) |
| |
| starttime = time() |
| timeout = False |
| try: |
| self.cv.acquire() |
| while len(self.syncSequenceList) > 0 and self.error == None: |
| self.cv.wait(self.GET_WAIT_TIME) |
| if time() - starttime > self.GET_WAIT_TIME: |
| for pendingSeq in self.syncSequenceList: |
| self.seqMgr._release(pendingSeq) |
| self.syncSequenceList = [] |
| timeout = True |
| finally: |
| self.cv.release() |
| |
| if self.error: |
| errorText = self.error |
| self.error = None |
| raise Exception(errorText) |
| |
| if len(self.getResult) == 0 and timeout: |
| raise RuntimeError("No agent responded within timeout period") |
| return self.getResult |
| |
| def setEventFilter(self, **kwargs): |
| """ """ |
| pass |
| |
| def _bindingKeys(self): |
| keyList = [] |
| keyList.append("schema.#") |
| if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings: |
| keyList.append("console.#") |
| else: |
| if self.rcvObjects and not self.userBindings: |
| keyList.append("console.obj.#") |
| else: |
| keyList.append("console.obj.*.*.org.apache.qpid.broker.agent") |
| if self.rcvEvents: |
| keyList.append("console.event.#") |
| if self.rcvHeartbeats: |
| keyList.append("console.heartbeat.#") |
| return keyList |
| |
| def _handleBrokerConnect(self, broker): |
| if self.console: |
| self.console.brokerConnected(broker) |
| |
| def _handleBrokerDisconnect(self, broker): |
| if self.console: |
| self.console.brokerDisconnected(broker) |
| |
def _handleBrokerResp(self, broker, codec, seq):
    """ Handle a broker response: record the broker's id, tell the
    console, and follow up with a package request. """
    broker.brokerId = UUID(codec.read_uuid())
    if self.console is not None:
        self.console.brokerInfo(broker)

    # Send a package request.  The broker's outstanding count is left
    # untouched here (effectively an increment and a decrement).
    request = Codec(broker.conn.spec)
    requestSeq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
    broker._setHeader(request, 'P', requestSeq)
    broker._send(broker._message(request.encoded))
| |
def _handlePackageInd(self, broker, codec, seq):
    """ Handle a package indication: register the package if it is new
    (notifying the console), then request the classes it contains. """
    pname = str(codec.read_str8())

    self.cv.acquire()
    try:
        isNew = pname not in self.packages
        if isNew:
            self.packages[pname] = {}
    finally:
        self.cv.release()

    # The console is called outside the lock.
    if isNew and self.console is not None:
        self.console.newPackage(pname)

    # Send a class request for the package.
    broker._incOutstanding()
    request = Codec(broker.conn.spec)
    requestSeq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
    broker._setHeader(request, 'Q', requestSeq)
    request.write_str8(pname)
    broker._send(broker._message(request.encoded))
| |
| def _handleCommandComplete(self, broker, codec, seq): |
| code = codec.read_uint32() |
| text = codec.read_str8() |
| context = self.seqMgr._release(seq) |
| if context == self._CONTEXT_STARTUP: |
| broker._decOutstanding() |
| elif context == self._CONTEXT_SYNC and seq == broker.syncSequence: |
| try: |
| broker.cv.acquire() |
| broker.syncInFlight = False |
| broker.cv.notify() |
| finally: |
| broker.cv.release() |
| elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList: |
| try: |
| self.cv.acquire() |
| self.syncSequenceList.remove(seq) |
| if len(self.syncSequenceList) == 0: |
| self.cv.notify() |
| finally: |
| self.cv.release() |
| |
def _handleClassInd(self, broker, codec, seq):
    """ Handle a class indication.  If the class belongs to a known
    package but its schema has not been stored yet, request the schema. """
    kind = codec.read_uint8()   # read to follow the wire layout; not used
    classKey = ClassKey(codec)
    pname = classKey.getPackageName()
    pkey = classKey.getPackageKey()

    self.cv.acquire()
    try:
        needSchema = pname in self.packages and pkey not in self.packages[pname]
    finally:
        self.cv.release()

    if not needSchema:
        return

    # Send a schema request for the unknown class
    broker._incOutstanding()
    request = Codec(broker.conn.spec)
    requestSeq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
    broker._setHeader(request, 'S', requestSeq)
    classKey.encode(request)
    broker._send(broker._message(request.encoded))
| |
def _handleMethodResp(self, broker, codec, seq):
    """ Handle a method response for sequence number 'seq'.

    The status code and text are read first.  On a zero status the
    method's output arguments (direction containing "O") are decoded in
    schema order.  The result is handed to the thread waiting on the
    broker for a synchronous call, or to the console otherwise. """
    code = codec.read_uint32()
    text = codec.read_str16()
    # The context stored at request time is the pair (method, synchronous).
    method, synchronous = self.seqMgr._release(seq)

    outArgs = {}
    if code == 0:
        for arg in method.arguments:
            if "O" in arg.dir:
                outArgs[arg.name] = self._decodeValue(codec, arg.type)
    result = MethodResult(code, text, outArgs)

    if not synchronous:
        if self.console:
            self.console.methodResponse(broker, seq, result)
        return

    broker.cv.acquire()
    try:
        broker.syncResult = result
        broker.syncInFlight = False
        broker.cv.notify()
    finally:
        broker.cv.release()
| |
| def _handleHeartbeatInd(self, broker, codec, seq, msg): |
| brokerBank = 1 |
| agentBank = 0 |
| dp = msg.get("delivery_properties") |
| if dp: |
| key = dp["routing_key"] |
| keyElements = key.split(".") |
| if len(keyElements) == 4: |
| brokerBank = int(keyElements[2]) |
| agentBank = int(keyElements[3]) |
| |
| agent = broker.getAgent(brokerBank, agentBank) |
| timestamp = codec.read_uint64() |
| if self.console != None and agent != None: |
| self.console.heartbeat(agent, timestamp) |
| |
| def _handleEventInd(self, broker, codec, seq): |
| if self.console != None: |
| event = Event(self, broker, codec) |
| self.console.event(broker, event) |
| |
def _handleSchemaResp(self, broker, codec, seq):
    """ Handle a schema response: decode the class schema, store it under
    its package, release the request sequence and notify the console. """
    kind = codec.read_uint8()
    classKey = ClassKey(codec)
    schema = SchemaClass(kind, classKey, codec)
    pname = classKey.getPackageName()
    pkey = classKey.getPackageKey()

    self.cv.acquire()
    try:
        self.packages[pname][pkey] = schema
    finally:
        self.cv.release()

    self.seqMgr._release(seq)
    broker._decOutstanding()
    if self.console is not None:
        self.console.newClass(kind, classKey)
| |
def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
    """ Handle an object content indication.

    The object is decoded against its stored schema; content for a class
    with no stored schema is dropped.  Property updates of the broker's
    "agent" class are also fed to broker._updateAgent.  Content that
    answers a pending get-request is collected into getResult (when the
    object is not deleted and matches the selectors) and is not passed
    to the console; any other content goes to the console callbacks. """
    classKey = ClassKey(codec)
    pname = classKey.getPackageName()
    pkey = classKey.getPackageKey()

    self.cv.acquire()
    try:
        if pname not in self.packages or pkey not in self.packages[pname]:
            return
        schema = self.packages[pname][pkey]
    finally:
        self.cv.release()

    record = Object(self, broker, schema, codec, prop, stat)
    if prop and pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent":
        broker._updateAgent(record)

    self.cv.acquire()
    try:
        if seq in self.syncSequenceList:
            # Third timestamp is the delete time; zero means not deleted.
            if record.getTimestamps()[2] == 0 and self._selectMatch(record):
                self.getResult.append(record)
            return
    finally:
        self.cv.release()

    if self.console and self.rcvObjects:
        if prop:
            self.console.objectProps(broker, record)
        if stat:
            self.console.objectStats(broker, record)
| |
| def _handleError(self, error): |
| self.error = error |
| try: |
| self.cv.acquire() |
| self.syncSequenceList = [] |
| self.cv.notify() |
| finally: |
| self.cv.release() |
| |
| def _selectMatch(self, object): |
| """ Check the object against self.getSelect to check for a match """ |
| for key, value in self.getSelect: |
| for prop, propval in object.getProperties(): |
| if key == prop.name and value != propval: |
| return False |
| return True |
| |
| def _decodeValue(self, codec, typecode): |
| """ Decode, from the codec, a value based on its typecode. """ |
| if typecode == 1: data = codec.read_uint8() # U8 |
| elif typecode == 2: data = codec.read_uint16() # U16 |
| elif typecode == 3: data = codec.read_uint32() # U32 |
| elif typecode == 4: data = codec.read_uint64() # U64 |
| elif typecode == 6: data = codec.read_str8() # SSTR |
| elif typecode == 7: data = codec.read_str16() # LSTR |
| elif typecode == 8: data = codec.read_int64() # ABSTIME |
| elif typecode == 9: data = codec.read_uint64() # DELTATIME |
| elif typecode == 10: data = ObjectId(codec) # REF |
| elif typecode == 11: data = codec.read_uint8() != 0 # BOOL |
| elif typecode == 12: data = codec.read_float() # FLOAT |
| elif typecode == 13: data = codec.read_double() # DOUBLE |
| elif typecode == 14: data = UUID(codec.read_uuid()) # UUID |
| elif typecode == 15: data = codec.read_map() # FTABLE |
| elif typecode == 16: data = codec.read_int8() # S8 |
| elif typecode == 17: data = codec.read_int16() # S16 |
| elif typecode == 18: data = codec.read_int32() # S32 |
| elif typecode == 19: data = codec.read_int64() # S63 |
| else: |
| raise ValueError("Invalid type code: %d" % typecode) |
| return data |
| |
| def _encodeValue(self, codec, value, typecode): |
| """ Encode, into the codec, a value based on its typecode. """ |
| if typecode == 1: codec.write_uint8 (int(value)) # U8 |
| elif typecode == 2: codec.write_uint16 (int(value)) # U16 |
| elif typecode == 3: codec.write_uint32 (long(value)) # U32 |
| elif typecode == 4: codec.write_uint64 (long(value)) # U64 |
| elif typecode == 6: codec.write_str8 (value) # SSTR |
| elif typecode == 7: codec.write_str16 (value) # LSTR |
| elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME |
| elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME |
| elif typecode == 10: value.encode (codec) # REF |
| elif typecode == 11: codec.write_uint8 (int(value)) # BOOL |
| elif typecode == 12: codec.write_float (float(value)) # FLOAT |
| elif typecode == 13: codec.write_double (float(value)) # DOUBLE |
| elif typecode == 14: codec.write_uuid (value.bytes) # UUID |
| elif typecode == 15: codec.write_map (value) # FTABLE |
| elif typecode == 16: codec.write_int8 (int(value)) # S8 |
| elif typecode == 17: codec.write_int16 (int(value)) # S16 |
| elif typecode == 18: codec.write_int32 (int(value)) # S32 |
| elif typecode == 19: codec.write_int64 (int(value)) # S64 |
| else: |
| raise ValueError ("Invalid type code: %d" % typecode) |
| |
| def _displayValue(self, value, typecode): |
| """ """ |
| if typecode == 1: return unicode(value) |
| elif typecode == 2: return unicode(value) |
| elif typecode == 3: return unicode(value) |
| elif typecode == 4: return unicode(value) |
| elif typecode == 6: return value |
| elif typecode == 7: return value |
| elif typecode == 8: return unicode(strftime("%c", gmtime(value / 1000000000))) |
| elif typecode == 9: return unicode(value) |
| elif typecode == 10: return unicode(value.__repr__()) |
| elif typecode == 11: |
| if value: return u"T" |
| else: return u"F" |
| elif typecode == 12: return unicode(value) |
| elif typecode == 13: return unicode(value) |
| elif typecode == 14: return unicode(value.__repr__()) |
| elif typecode == 15: return unicode(value.__repr__()) |
| elif typecode == 16: return unicode(value) |
| elif typecode == 17: return unicode(value) |
| elif typecode == 18: return unicode(value) |
| elif typecode == 19: return unicode(value) |
| else: |
| raise ValueError ("Invalid type code: %d" % typecode) |
| |
| def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): |
| """ This function can be used to send a method request to an object given only the |
| broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are |
| normally invoked on the object itself. |
| """ |
| schema = self.getSchema(schemaKey) |
| for method in schema.getMethods(): |
| if name == method.name: |
| aIdx = 0 |
| sendCodec = Codec(broker.conn.spec) |
| seq = self.seqMgr._reserve((method, False)) |
| broker._setHeader(sendCodec, 'M', seq) |
| objectId.encode(sendCodec) |
| schemaKey.encode(sendCodec) |
| sendCodec.write_str8(name) |
| |
| count = 0 |
| for arg in method.arguments: |
| if arg.dir.find("I") != -1: |
| count += 1 |
| if count != len(argList): |
| raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) |
| |
| for arg in method.arguments: |
| if arg.dir.find("I") != -1: |
| self._encodeValue(sendCodec, argList[aIdx], arg.type) |
| aIdx += 1 |
| smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % |
| (objectId.getBrokerBank(), objectId.getAgentBank())) |
| broker._send(smsg) |
| return seq |
| return None |
| |
class Package:
    """ Holds the name of a QMF package. """

    def __init__(self, name):
        """ Remember the package name in the 'name' attribute. """
        self.name = name
| |
class ClassKey:
    """ A ClassKey uniquely identifies a class from the schema.

    It is made of a package name, a class name and a 128-bit schema hash. """

    def __init__(self, constructor):
        """ Build a key either from its __repr__ string, of the form
        "package:class(hhhhhhhh-hhhhhhhh-hhhhhhhh-hhhhhhhh)", or by reading
        package name, class name and hash from a codec.

        Raises Exception("Invalid ClassKey format") for a malformed string. """
        # isinstance (rather than an exact type comparison) so that
        # subclasses of str are parsed as strings too.
        if isinstance(constructor, str):
            # construct from __repr__ string
            try:
                self.pname, cls = constructor.split(":")
                self.cname, hsh = cls.split("(")
                hexValues = hsh.strip(")").split("-")
                words = [int(hexValues[idx], 16) for idx in range(4)]
                self.hash = struct.pack("!LLLL", words[0], words[1], words[2], words[3])
            except Exception:
                # Any parsing failure means the text is not a valid key.
                # KeyboardInterrupt / SystemExit are deliberately not caught.
                raise Exception("Invalid ClassKey format")
        else:
            # construct from codec
            codec = constructor
            self.pname = str(codec.read_str8())
            self.cname = str(codec.read_str8())
            self.hash = codec.read_bin128()

    def encode(self, codec):
        """ Write package name, class name and hash to the codec. """
        codec.write_str8(self.pname)
        codec.write_str8(self.cname)
        codec.write_bin128(self.hash)

    def getPackageName(self):
        """ Return the package name. """
        return self.pname

    def getClassName(self):
        """ Return the class name. """
        return self.cname

    def getHash(self):
        """ Return the schema hash as a 16-byte packed value. """
        return self.hash

    def getHashString(self):
        """ Return the hash as four dash-separated 8-digit hex words. """
        return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash)

    def getPackageKey(self):
        """ Return the (class name, hash) pair that identifies the class
        within its package. """
        return (self.cname, self.hash)

    def __repr__(self):
        """ Return the "package:class(hash)" form accepted by __init__. """
        return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
| |
class SchemaClass:
    """ The schema of one QMF class: either a table class (properties,
    statistics and methods) or an event class (arguments). """
    CLASS_KIND_TABLE = 1
    CLASS_KIND_EVENT = 2

    def __init__(self, kind, key, codec):
        """ Decode the schema body for class 'key' of the given kind from
        the codec.  A kind that is neither table nor event leaves all four
        lists empty and reads nothing. """
        self.kind = kind
        self.classKey = key
        self.properties = []
        self.statistics = []
        self.methods = []
        self.arguments = []

        if kind == self.CLASS_KIND_TABLE:
            # All three counts come first, followed by the entries.
            propCount = codec.read_uint16()
            statCount = codec.read_uint16()
            methodCount = codec.read_uint16()
            for _ in range(propCount):
                self.properties.append(SchemaProperty(codec))
            for _ in range(statCount):
                self.statistics.append(SchemaStatistic(codec))
            for _ in range(methodCount):
                self.methods.append(SchemaMethod(codec))
        elif kind == self.CLASS_KIND_EVENT:
            argCount = codec.read_uint16()
            for _ in range(argCount):
                self.arguments.append(SchemaArgument(codec, methodArg=False))

    def __repr__(self):
        """ Return "<kind> Class: <class key> " (with a trailing space). """
        kindStr = "Unsupported"
        if self.kind == self.CLASS_KIND_TABLE:
            kindStr = "Table"
        elif self.kind == self.CLASS_KIND_EVENT:
            kindStr = "Event"
        return "%s Class: %s " % (kindStr, self.classKey.__repr__())

    def getKey(self):
        """ Return the class-key for this class. """
        return self.classKey

    def getProperties(self):
        """ Return the list of properties for the class. """
        return self.properties

    def getStatistics(self):
        """ Return the list of statistics for the class. """
        return self.statistics

    def getMethods(self):
        """ Return the list of methods for the class. """
        return self.methods

    def getArguments(self):
        """ Return the list of arguments for the class (filled for event
        classes). """
        return self.arguments
| |
class SchemaProperty:
    """ Schema description of one property of a table class. """

    def __init__(self, codec):
        """ Read the property description as a map from the codec.

        "name", "type", "access", "index" and "optional" are required
        entries; "unit", "min", "max", "maxlen" and "desc" are optional
        and default to None. """
        attrs = codec.read_map()
        self.name = str(attrs["name"])
        self.type = attrs["type"]
        self.access = str(attrs["access"])
        self.index = attrs["index"] != 0
        self.optional = attrs["optional"] != 0

        for optionalKey in ("unit", "min", "max", "maxlen", "desc"):
            value = None
            if optionalKey in attrs:
                value = attrs[optionalKey]
            setattr(self, optionalKey, value)

    def __repr__(self):
        """ Return the property name. """
        return self.name
| |
class SchemaStatistic:
    """ Schema description of one statistic of a table class. """

    def __init__(self, codec):
        """ Read the statistic description as a map from the codec.

        "name" and "type" are required entries; "unit" and "desc" are
        optional and default to None. """
        attrs = codec.read_map()
        self.name = str(attrs["name"])
        self.type = attrs["type"]

        for optionalKey in ("unit", "desc"):
            value = None
            if optionalKey in attrs:
                value = attrs[optionalKey]
            setattr(self, optionalKey, value)

    def __repr__(self):
        """ Return the statistic name. """
        return self.name
| |
class SchemaMethod:
    """ Schema description of one method of a table class. """

    def __init__(self, codec):
        """ Read the method description as a map from the codec, followed
        by "argCount" argument descriptions.  "desc" is optional and
        defaults to None. """
        attrs = codec.read_map()
        self.name = str(attrs["name"])
        argCount = attrs["argCount"]
        self.desc = None
        if "desc" in attrs:
            self.desc = attrs["desc"]

        self.arguments = []
        for _ in range(argCount):
            self.arguments.append(SchemaArgument(codec, methodArg=True))

    def __repr__(self):
        """ Return "name(arg, ...)", listing only the arguments whose
        direction contains "I". """
        inputNames = [arg.name for arg in self.arguments if arg.dir.find("I") != -1]
        return self.name + "(" + ", ".join(inputNames) + ")"
| |
class SchemaArgument:
    """ Schema description of one argument of a method or of an event. """

    def __init__(self, codec, methodArg):
        """ Read the argument description as a map from the codec.

        "name" and "type" are required entries.  For method arguments
        ('methodArg' true) the "dir" entry is also required and is stored
        upper-cased in 'dir'; otherwise no 'dir' attribute is set.
        "unit", "min", "max", "maxlen", "desc" and "default" are optional
        and default to None. """
        attrs = codec.read_map()
        self.name = str(attrs["name"])
        self.type = attrs["type"]
        if methodArg:
            self.dir = str(attrs["dir"]).upper()

        for optionalKey in ("unit", "min", "max", "maxlen", "desc", "default"):
            value = None
            if optionalKey in attrs:
                value = attrs[optionalKey]
            setattr(self, optionalKey, value)
| |
class ObjectId:
    """ Object that represents QMF object identifiers.

    An identifier is a pair of 64-bit words.  The first word packs, from
    the most significant bits down: 4 bits of flags, 12 bits of sequence,
    20 bits of broker bank and 28 bits of agent bank.  The second word is
    the object number. """

    def __init__(self, codec, first=0, second=0):
        """ Read both words from 'codec' when one is supplied; otherwise
        use the 'first' and 'second' arguments. """
        if codec:
            self.first = codec.read_uint64()
            self.second = codec.read_uint64()
        else:
            self.first = first
            self.second = second

    def __cmp__(self, other):
        """ Order identifiers by (first, second).  Anything that is not an
        ObjectId (including None) compares as smaller. """
        if other is None or not isinstance(other, ObjectId):
            return 1
        mine = self.index()
        theirs = other.index()
        if mine < theirs:
            return -1
        if mine > theirs:
            return 1
        return 0

    def __repr__(self):
        """ Return "flags-sequence-brokerBank-agentBank-object". """
        return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(),
                                   self.getBrokerBank(), self.getAgentBank(), self.getObject())

    def index(self):
        """ Return the identifier as a (first, second) tuple. """
        return (self.first, self.second)

    def getFlags(self):
        """ Return the 4-bit flags field. """
        return (self.first & 0xF000000000000000) >> 60

    def getSequence(self):
        """ Return the 12-bit sequence field. """
        return (self.first & 0x0FFF000000000000) >> 48

    def getBrokerBank(self):
        """ Return the 20-bit broker bank field. """
        return (self.first & 0x0000FFFFF0000000) >> 28

    def getAgentBank(self):
        """ Return the 28-bit agent bank field. """
        return self.first & 0x000000000FFFFFFF

    def getObject(self):
        """ Return the object number (the second word). """
        return self.second

    def isDurable(self):
        """ Return True when the sequence field is zero. """
        return self.getSequence() == 0

    def encode(self, codec):
        """ Write both words to the codec. """
        codec.write_uint64(self.first)
        codec.write_uint64(self.second)

    def __hash__(self):
        """ Hash consistently with __eq__: the hash of (first, second). """
        return (self.first, self.second).__hash__()

    def __eq__(self, other):
        """ Return True when 'other' is an ObjectId with the same two
        words, or a (first, second) tuple equal to this id's index. """
        # Comparing the tuple against the ObjectId itself would make
        # tuple.__eq__ return NotImplemented (which is truthy), so the
        # other id is unwrapped to its tuple form first.
        if isinstance(other, ObjectId):
            other = other.index()
        return self.index() == other

    def __ne__(self, other):
        """ Inverse of __eq__. """
        return not self.__eq__(other)
| |
| class Object(object): |
| """ """ |
def __init__(self, session, broker, schema, codec, prop, stat):
    """ Decode one object update from the codec.

    The three timestamps and the object id are always read.  Property
    values are read when 'prop' is true and statistic values when 'stat'
    is true, each in schema order and stored as (schema entry, value)
    pairs.  A property reported as not present gets the value None and
    nothing is read for it. """
    self._session = session
    self._broker = broker
    self._schema = schema
    self._currentTime = codec.read_uint64()
    self._createTime = codec.read_uint64()
    self._deleteTime = codec.read_uint64()
    self._objectId = ObjectId(codec)
    self._properties = []
    self._statistics = []

    if prop:
        notPresent = self._parsePresenceMasks(codec, schema)
        for property in schema.getProperties():
            if property.name in notPresent:
                value = None
            else:
                value = self._session._decodeValue(codec, property.type)
            self._properties.append((property, value))

    if stat:
        for statistic in schema.getStatistics():
            value = self._session._decodeValue(codec, statistic.type)
            self._statistics.append((statistic, value))
| |
| def getBroker(self): |
| """ Return the broker from which this object was sent """ |
| return self._broker |
| |
| def getObjectId(self): |
| """ Return the object identifier for this object """ |
| return self._objectId |
| |
| def getClassKey(self): |
| """ Return the class-key that references the schema describing this object. """ |
| return self._schema.getKey() |
| |
| def getSchema(self): |
| """ Return the schema that describes this object. """ |
| return self._schema |
| |
| def getMethods(self): |
| """ Return a list of methods available for this object. """ |
| return self._schema.getMethods() |
| |
| def getTimestamps(self): |
| """ Return the current, creation, and deletion times for this object. """ |
| return self._currentTime, self._createTime, self._deleteTime |
| |
| def getIndex(self): |
| """ Return a string describing this object's primary key. """ |
| result = u"" |
| for property, value in self._properties: |
| if property.index: |
| if result != u"": |
| result += u":" |
| try: |
| valstr = unicode(self._session._displayValue(value, property.type)) |
| except: |
| valstr = u"<undecodable>" |
| result += valstr |
| return result |
| |
| def getProperties(self): |
| return self._properties |
| |
| def getStatistics(self): |
| return self._statistics |
| |
| def mergeUpdate(self, newer): |
| """ Replace properties and/or statistics with a newly received update """ |
| if self._objectId != newer._objectId: |
| raise Exception("Objects with different object-ids") |
| if len(newer.getProperties()) > 0: |
| self.properties = newer.getProperties() |
| if len(newer.getStatistics()) > 0: |
| self.statistics = newer.getStatistics() |
| |
| def __repr__(self): |
| key = self.getClassKey() |
| return key.getPackageName() + ":" + key.getClassName() +\ |
| "[" + self.getObjectId().__repr__() + "] " + self.getIndex().encode("utf8") |
| |
| def __getattr__(self, name): |
| for method in self._schema.getMethods(): |
| if name == method.name: |
| return lambda *args, **kwargs : self._invoke(name, args, kwargs) |
| for property, value in self._properties: |
| if name == property.name: |
| return value |
| if name == "_" + property.name + "_" and property.type == 10: # Dereference references |
| deref = self._session.getObjects(_objectId=value, _broker=self._broker) |
| if len(deref) != 1: |
| return None |
| else: |
| return deref[0] |
| for statistic, value in self._statistics: |
| if name == statistic.name: |
| return value |
| raise Exception("Type Object has no attribute '%s'" % name) |
| |
| def _sendMethodRequest(self, name, args, kwargs, synchronous=False): |
| for method in self._schema.getMethods(): |
| if name == method.name: |
| aIdx = 0 |
| sendCodec = Codec(self._broker.conn.spec) |
| seq = self._session.seqMgr._reserve((method, synchronous)) |
| self._broker._setHeader(sendCodec, 'M', seq) |
| self._objectId.encode(sendCodec) |
| self._schema.getKey().encode(sendCodec) |
| sendCodec.write_str8(name) |
| |
| count = 0 |
| for arg in method.arguments: |
| if arg.dir.find("I") != -1: |
| count += 1 |
| if count != len(args): |
| raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) |
| |
| for arg in method.arguments: |
| if arg.dir.find("I") != -1: |
| self._session._encodeValue(sendCodec, args[aIdx], arg.type) |
| aIdx += 1 |
| smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % |
| (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) |
| if synchronous: |
| try: |
| self._broker.cv.acquire() |
| self._broker.syncInFlight = True |
| finally: |
| self._broker.cv.release() |
| self._broker._send(smsg) |
| return seq |
| return None |
| |
| def _invoke(self, name, args, kwargs): |
| if self._sendMethodRequest(name, args, kwargs, True): |
| try: |
| self._broker.cv.acquire() |
| starttime = time() |
| while self._broker.syncInFlight and self._broker.error == None: |
| self._broker.cv.wait(self._broker.SYNC_TIME) |
| if time() - starttime > self._broker.SYNC_TIME: |
| self._session.seqMgr._release(seq) |
| raise RuntimeError("Timed out waiting for method to respond") |
| finally: |
| self._broker.cv.release() |
| if self._broker.error != None: |
| errorText = self._broker.error |
| self._broker.error = None |
| raise Exception(errorText) |
| return self._broker.syncResult |
| raise Exception("Invalid Method (software defect) [%s]" % name) |
| |
| def _parsePresenceMasks(self, codec, schema): |
| excludeList = [] |
| bit = 0 |
| for property in schema.getProperties(): |
| if property.optional: |
| if bit == 0: |
| mask = codec.read_uint8() |
| bit = 1 |
| if (mask & bit) == 0: |
| excludeList.append(property.name) |
| bit *= 2 |
| if bit == 256: |
| bit = 0 |
| return excludeList |
| |
class MethodResult(object):
    """ Result of a method invocation: a status code, a status text and a
    dictionary of output arguments.  Output arguments can also be read as
    attributes of the result. """

    def __init__(self, status, text, outArgs):
        """ Store the status code, status text and output-argument dict. """
        self.status = status
        self.text = text
        self.outArgs = outArgs

    def __getattr__(self, name):
        """ Return the output argument called 'name', or None when there is
        no such argument.

        Special-method probes ('__xxx__') raise AttributeError so that
        protocols such as copy and pickle see a normal object.
        """
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        # Read through __dict__: 'self.outArgs' would re-enter __getattr__
        # and recurse forever on an instance whose __init__ has not run.
        outArgs = self.__dict__.get("outArgs")
        if outArgs is not None and name in outArgs:
            return outArgs[name]
        return None

    def __repr__(self):
        """ Render as 'text (status) - outArgs'. """
        return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
| |
class ManagedConnection(Thread):
    """ Thread class for managing a connection.

    The thread connects its broker, sleeps until the connection is lost or
    the thread is stopped, and reconnects.  After a failed attempt the retry
    delay is multiplied by DELAY_FACTOR until it reaches DELAY_MAX; after a
    successful connection it is reset to DELAY_MIN.
    """
    DELAY_MIN = 1
    DELAY_MAX = 128
    DELAY_FACTOR = 2

    def __init__(self, broker):
        """ Prepare (but do not start) the thread for 'broker'. """
        Thread.__init__(self)
        self.broker = broker
        self.cv = Condition()
        self.canceled = False

    def stop(self):
        """ Tell this thread to stop running and return. """
        self.cv.acquire()
        try:
            self.canceled = True
            self.cv.notify()
        finally:
            self.cv.release()

    def disconnected(self):
        """ Notify the thread that the connection was lost. """
        self.cv.acquire()
        try:
            self.cv.notify()
        finally:
            self.cv.release()

    def run(self):
        """ Main body of the running thread. """
        # SessionDetached is not among the names imported at the top of this
        # module, so naming it in an except clause raised NameError as soon
        # as a non-socket error reached the handlers.
        # NOTE(review): assumes qpid.session exports SessionDetached - confirm.
        from qpid.session import SessionDetached

        delay = self.DELAY_MIN
        while True:
            try:
                self.broker._tryToConnect()
                self.cv.acquire()
                try:
                    # Connected: sleep until stop() or disconnected() wakes us.
                    while (not self.canceled) and self.broker.connected:
                        self.cv.wait()
                    if self.canceled:
                        return
                    delay = self.DELAY_MIN
                finally:
                    self.cv.release()
            except (socket.error, SessionDetached, Closed):
                # Failed attempt: back off, up to DELAY_MAX.
                if delay < self.DELAY_MAX:
                    delay *= self.DELAY_FACTOR

            # Pause before the next attempt; stop() cuts the pause short.
            self.cv.acquire()
            try:
                self.cv.wait(delay)
                if self.canceled:
                    return
            finally:
                self.cv.release()
| |
| class Broker: |
| """ This object represents a connection (or potential connection) to a QMF broker. """ |
| SYNC_TIME = 60 |
| |
| def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False): |
| self.session = session |
| self.host = host |
| self.port = port |
| self.ssl = ssl |
| self.authUser = authUser |
| self.authPass = authPass |
| self.cv = Condition() |
| self.error = None |
| self.brokerId = None |
| self.connected = False |
| self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) |
| if self.session.manageConnections: |
| self.thread = ManagedConnection(self) |
| self.thread.start() |
| else: |
| self.thread = None |
| self._tryToConnect() |
| |
| def isConnected(self): |
| """ Return True if there is an active connection to the broker. """ |
| return self.connected |
| |
| def getError(self): |
| """ Return the last error message seen while trying to connect to the broker. """ |
| return self.error |
| |
| def getBrokerId(self): |
| """ Get broker's unique identifier (UUID) """ |
| return self.brokerId |
| |
| def getBrokerBank(self): |
| """ Return the broker-bank value. This is the value that the broker assigns to |
| objects within its control. This value appears as a field in the ObjectId |
| of objects created by agents controlled by this broker. """ |
| return 1 |
| |
| def getAgent(self, brokerBank, agentBank): |
| """ Return the agent object associated with a particular broker and agent bank value.""" |
| bankKey = (brokerBank, agentBank) |
| if bankKey in self.agents: |
| return self.agents[bankKey] |
| return None |
| |
| def getSessionId(self): |
| """ Get the identifier of the AMQP session to the broker """ |
| return self.amqpSessionId |
| |
| def getAgents(self): |
| """ Get the list of agents reachable via this broker """ |
| return self.agents.values() |
| |
| def getAmqpSession(self): |
| """ Get the AMQP session object for this connected broker. """ |
| return self.amqpSession |
| |
| def getUrl(self): |
| """ """ |
| return "%s:%d" % (self.host, self.port) |
| |
| def getFullUrl(self, noAuthIfGuestDefault=True): |
| """ """ |
| ssl = "" |
| if self.ssl: |
| ssl = "s" |
| auth = "%s/%s@" % (self.authUser, self.authPass) |
| if self.authUser == "" or \ |
| (noAuthIfGuestDefault and self.authUser == "guest" and self.authPass == "guest"): |
| auth = "" |
| return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672) |
| |
| def __repr__(self): |
| if self.connected: |
| return "Broker connected at: %s" % self.getUrl() |
| else: |
| return "Disconnected Broker" |
| |
| def _tryToConnect(self): |
| try: |
| self.agents = {} |
| self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") |
| self.topicBound = False |
| self.syncInFlight = False |
| self.syncRequest = 0 |
| self.syncResult = None |
| self.reqsOutstanding = 1 |
| |
| sock = connect(self.host, self.port) |
| if self.ssl: |
| sock = ssl(sock) |
| self.conn = Connection(sock, username=self.authUser, password=self.authPass) |
| self.conn.start() |
| self.replyName = "reply-%s" % self.amqpSessionId |
| self.amqpSession = self.conn.session(self.amqpSessionId) |
| self.amqpSession.auto_sync = True |
| self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) |
| self.amqpSession.exchange_bind(exchange="amq.direct", |
| queue=self.replyName, binding_key=self.replyName) |
| self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", |
| accept_mode=self.amqpSession.accept_mode.none, |
| acquire_mode=self.amqpSession.acquire_mode.pre_acquired) |
| self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) |
| self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) |
| self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF) |
| self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFF) |
| |
| self.topicName = "topic-%s" % self.amqpSessionId |
| self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True) |
| self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", |
| accept_mode=self.amqpSession.accept_mode.none, |
| acquire_mode=self.amqpSession.acquire_mode.pre_acquired) |
| self.amqpSession.incoming("tdest").listen(self._replyCb) |
| self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) |
| self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF) |
| self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF) |
| |
| self.connected = True |
| self.session._handleBrokerConnect(self) |
| |
| codec = Codec(self.conn.spec) |
| self._setHeader(codec, 'B') |
| msg = self._message(codec.encoded) |
| self._send(msg) |
| |
| except socket.error, e: |
| self.error = "Socket Error %s - %s" % (e[0], e[1]) |
| raise |
| except Closed, e: |
| self.error = "Connect Failed %d - %s" % (e[0], e[1]) |
| raise |
| except ConnectionFailed, e: |
| self.error = "Connect Failed %d - %s" % (e[0], e[1]) |
| raise |
| |
| def _updateAgent(self, obj): |
| bankKey = (obj.brokerBank, obj.agentBank) |
| if obj._deleteTime == 0: |
| if bankKey not in self.agents: |
| agent = Agent(self, obj.agentBank, obj.label) |
| self.agents[bankKey] = agent |
| if self.session.console != None: |
| self.session.console.newAgent(agent) |
| else: |
| agent = self.agents.pop(bankKey, None) |
| if agent != None and self.session.console != None: |
| self.session.console.delAgent(agent) |
| |
| def _setHeader(self, codec, opcode, seq=0): |
| """ Compose the header of a management message. """ |
| codec.write_uint8(ord('A')) |
| codec.write_uint8(ord('M')) |
| codec.write_uint8(ord('2')) |
| codec.write_uint8(ord(opcode)) |
| codec.write_uint32(seq) |
| |
| def _checkHeader(self, codec): |
| """ Check the header of a management message and extract the opcode and class. """ |
| try: |
| octet = chr(codec.read_uint8()) |
| if octet != 'A': |
| return None, None |
| octet = chr(codec.read_uint8()) |
| if octet != 'M': |
| return None, None |
| octet = chr(codec.read_uint8()) |
| if octet != '2': |
| return None, None |
| opcode = chr(codec.read_uint8()) |
| seq = codec.read_uint32() |
| return opcode, seq |
| except: |
| return None, None |
| |
| def _message (self, body, routing_key="broker"): |
| dp = self.amqpSession.delivery_properties() |
| dp.routing_key = routing_key |
| mp = self.amqpSession.message_properties() |
| mp.content_type = "x-application/qmf" |
| mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) |
| return Message(dp, mp, body) |
| |
| def _send(self, msg, dest="qpid.management"): |
| self.amqpSession.message_transfer(destination=dest, message=msg) |
| |
| def _shutdown(self): |
| if self.thread: |
| self.thread.stop() |
| self.thread.join() |
| if self.connected: |
| self.amqpSession.incoming("rdest").stop() |
| if self.session.console != None: |
| self.amqpSession.incoming("tdest").stop() |
| self.amqpSession.close() |
| self.conn.close() |
| self.connected = False |
| |
| def _waitForStable(self): |
| try: |
| self.cv.acquire() |
| if not self.connected: |
| return |
| if self.reqsOutstanding == 0: |
| return |
| self.syncInFlight = True |
| starttime = time() |
| while self.reqsOutstanding != 0: |
| self.cv.wait(self.SYNC_TIME) |
| if time() - starttime > self.SYNC_TIME: |
| raise RuntimeError("Timed out waiting for broker to synchronize") |
| finally: |
| self.cv.release() |
| |
| def _incOutstanding(self): |
| try: |
| self.cv.acquire() |
| self.reqsOutstanding += 1 |
| finally: |
| self.cv.release() |
| |
| def _decOutstanding(self): |
| try: |
| self.cv.acquire() |
| self.reqsOutstanding -= 1 |
| if self.reqsOutstanding == 0 and not self.topicBound: |
| self.topicBound = True |
| for key in self.session.bindingKeyList: |
| self.amqpSession.exchange_bind(exchange="qpid.management", |
| queue=self.topicName, binding_key=key) |
| if self.reqsOutstanding == 0 and self.syncInFlight: |
| self.syncInFlight = False |
| self.cv.notify() |
| finally: |
| self.cv.release() |
| |
| def _replyCb(self, msg): |
| codec = Codec(self.conn.spec, msg.body) |
| while True: |
| opcode, seq = self._checkHeader(codec) |
| if opcode == None: return |
| if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) |
| elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) |
| elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) |
| elif opcode == 'q': self.session._handleClassInd (self, codec, seq) |
| elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) |
| elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) |
| elif opcode == 'e': self.session._handleEventInd (self, codec, seq) |
| elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) |
| elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) |
| elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) |
| elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) |
| |
| def _exceptionCb(self, data): |
| self.connected = False |
| self.error = data |
| try: |
| self.cv.acquire() |
| if self.syncInFlight: |
| self.cv.notify() |
| finally: |
| self.cv.release() |
| self.session._handleError(self.error) |
| self.session._handleBrokerDisconnect(self) |
| if self.thread: |
| self.thread.disconnected() |
| |
class Agent:
    """ A management agent reachable through a broker, identified by the
    broker-bank of that broker and its own agent-bank. """

    def __init__(self, broker, agentBank, label):
        """ Remember the owning broker, the agent bank and the label; the
        broker bank is taken from the broker. """
        self.label = label
        self.agentBank = agentBank
        self.broker = broker
        self.brokerBank = broker.getBrokerBank()

    def __repr__(self):
        """ Render as 'Agent at bank <brokerBank>.<agentBank> (<label>)'. """
        fields = (self.brokerBank, self.agentBank, self.label)
        template = "Agent at bank %d.%d (%s)"
        return template % fields

    def getBroker(self):
        """ Return the broker through which this agent is reached. """
        owner = self.broker
        return owner

    def getBrokerBank(self):
        """ Return the broker-bank value of this agent. """
        bank = self.brokerBank
        return bank

    def getAgentBank(self):
        """ Return the agent-bank value of this agent. """
        bank = self.agentBank
        return bank
| |
class Event:
    """ An event received from a broker: class key, timestamp, severity
    and, when the schema of the event class is known to the session, the
    decoded event arguments. """

    # Display names for severities 0..7, padded to five characters.
    _SEVERITY_NAMES = ("EMER ", "ALERT", "CRIT ", "ERROR",
                       "WARN ", "NOTIC", "INFO ", "DEBUG")

    def __init__(self, session, broker, codec):
        """ Decode an event from 'codec'.

        The schema is looked up in session.packages by package name and
        package key.  When it is not found, 'schema' stays None and
        'arguments' stays empty, because the argument values cannot be
        decoded without it.
        """
        self.session = session
        self.broker = broker
        self.classKey = ClassKey(codec)
        self.timestamp = codec.read_int64()
        self.severity = codec.read_uint8()
        self.schema = None
        self.arguments = {}
        pname = self.classKey.getPackageName()
        pkey = self.classKey.getPackageKey()
        if pname in session.packages and pkey in session.packages[pname]:
            self.schema = session.packages[pname][pkey]
            for arg in self.schema.arguments:
                self.arguments[arg.name] = session._decodeValue(codec, arg.type)

    def __repr__(self):
        """ Render as one line: time, severity, package:class, broker and
        'name=value' pairs; '<uninterpretable>' when the schema is unknown. """
        if self.schema == None:
            return "<uninterpretable>"
        # The timestamp is divided by 10**9 before being passed to gmtime().
        out = strftime("%c", gmtime(self.timestamp // 1000000000))
        out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName()
        out += " broker=" + self.broker.getUrl()
        for arg in self.schema.arguments:
            disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8")
            if " " in disp:
                disp = "\"" + disp + "\""
            out += " " + arg.name + "=" + disp
        return out

    def _sevName(self):
        """ Return the five-character name of the severity, or 'INV-<n>'
        for a value outside 0..7. """
        if 0 <= self.severity < len(self._SEVERITY_NAMES):
            return self._SEVERITY_NAMES[self.severity]
        return "INV-%d" % self.severity

    def getClassKey(self):
        """ Return the class key of the event class. """
        return self.classKey

    def getArguments(self):
        """ Return the dict of decoded arguments (empty when the schema is
        unknown). """
        return self.arguments

    def getTimestamp(self):
        """ Return the event timestamp as decoded from the message. """
        return self.timestamp

    def getName(self):
        """ Return the class name of the event.

        This used to read an attribute that is never assigned and so always
        failed.  NOTE(review): the class name from the class key is assumed
        to be the intended 'name' - confirm.
        """
        return self.classKey.getClassName()

    def getSchema(self):
        """ Return the schema of the event class, or None when unknown. """
        return self.schema
| |
class SequenceManager:
    """ Hands out sequence numbers for asynchronous method calls and keeps
    the data registered with each number until it is released. """

    def __init__(self):
        """ Start numbering at zero with nothing pending. """
        self.pending = {}
        self.sequence = 0
        self.lock = Lock()

    def _reserve(self, data):
        """ Allocate the next sequence number, remember 'data' under it and
        return the number. """
        try:
            self.lock.acquire()
            allocated = self.sequence
            self.pending[allocated] = data
            self.sequence = allocated + 1
        finally:
            self.lock.release()
        return allocated

    def _release(self, seq):
        """ Forget sequence number 'seq' and return the data registered with
        it, or None when the number is not pending. """
        try:
            self.lock.acquire()
            registered = self.pending.pop(seq, None)
        finally:
            self.lock.release()
        return registered
| |
| |
| class DebugConsole(Console): |
| """ """ |
| def brokerConnected(self, broker): |
| print "brokerConnected:", broker |
| |
| def brokerDisconnected(self, broker): |
| print "brokerDisconnected:", broker |
| |
| def newPackage(self, name): |
| print "newPackage:", name |
| |
| def newClass(self, kind, classKey): |
| print "newClass:", kind, classKey |
| |
| def newAgent(self, agent): |
| print "newAgent:", agent |
| |
| def delAgent(self, agent): |
| print "delAgent:", agent |
| |
| def objectProps(self, broker, record): |
| print "objectProps:", record |
| |
| def objectStats(self, broker, record): |
| print "objectStats:", record |
| |
| def event(self, broker, event): |
| print "event:", event |
| |
| def heartbeat(self, agent, timestamp): |
| print "heartbeat:", agent |
| |
| def brokerInfo(self, broker): |
| print "brokerInfo:", broker |
| |