| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import sys |
| from qpidtoollibs.disp import TimeLong |
| try: |
| from uuid import uuid4 |
| except ImportError: |
| from qpid.datatypes import uuid4 |
| |
| class BrokerAgent(object): |
| """ |
| Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection |
| or qpid_messaging.Connection |
| """ |
| def __init__(self, conn): |
| # Use the Message class from the same module as conn which could be qpid.messaging |
| # or qpid_messaging |
| self.message_class = sys.modules[conn.__class__.__module__].Message |
| self.conn = conn |
| self.sess = self.conn.session() |
| self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}}" % str(uuid4()) |
| self.reply_rx = self.sess.receiver(self.reply_to) |
| self.reply_rx.capacity = 10 |
| self.tx = self.sess.sender("qmf.default.direct/broker") |
| self.next_correlator = 1 |
| |
| def close(self): |
| """ |
| Close the proxy session. This will not affect the connection used in creating the object. |
| """ |
| self.sess.close() |
| |
| def _method(self, method, arguments=None, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): |
| props = {'method' : 'request', |
| 'qmf.opcode' : '_method_request', |
| 'x-amqp-0-10.app-id' : 'qmf2'} |
| correlator = str(self.next_correlator) |
| self.next_correlator += 1 |
| |
| content = {'_object_id' : {'_object_name' : addr}, |
| '_method_name' : method, |
| '_arguments' : arguments or {}} |
| |
| message = self.message_class( |
| content, reply_to=self.reply_to, correlation_id=correlator, |
| properties=props, subject="broker") |
| self.tx.send(message) |
| response = self.reply_rx.fetch(timeout) |
| self.sess.acknowledge() |
| if response.properties['qmf.opcode'] == '_exception': |
| raise Exception("Exception from Agent: %r" % response.content['_values']) |
| if response.properties['qmf.opcode'] != '_method_response': |
| raise Exception("bad response: %r" % response.properties) |
| return response.content['_arguments'] |
| |
| def _sendRequest(self, opcode, content): |
| props = {'method' : 'request', |
| 'qmf.opcode' : opcode, |
| 'x-amqp-0-10.app-id' : 'qmf2'} |
| correlator = str(self.next_correlator) |
| self.next_correlator += 1 |
| message = self.message_class( |
| content, reply_to=self.reply_to, correlation_id=correlator, |
| properties=props, subject="broker") |
| self.tx.send(message) |
| return correlator |
| |
| def _doClassQuery(self, class_name): |
| query = {'_what' : 'OBJECT', |
| '_schema_id' : {'_class_name' : class_name}} |
| correlator = self._sendRequest('_query_request', query) |
| response = self.reply_rx.fetch(10) |
| if response.properties['qmf.opcode'] != '_query_response': |
| raise Exception("bad response") |
| items = [] |
| done = False |
| while not done: |
| for item in response.content: |
| items.append(item) |
| if 'partial' in response.properties: |
| response = self.reply_rx.fetch(10) |
| else: |
| done = True |
| self.sess.acknowledge() |
| return items |
| |
| def _doNameQuery(self, object_id): |
| query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}} |
| correlator = self._sendRequest('_query_request', query) |
| response = self.reply_rx.fetch(10) |
| if response.properties['qmf.opcode'] != '_query_response': |
| raise Exception("bad response") |
| items = [] |
| done = False |
| while not done: |
| for item in response.content: |
| items.append(item) |
| if 'partial' in response.properties: |
| response = self.reply_rx.fetch(10) |
| else: |
| done = True |
| self.sess.acknowledge() |
| if len(items) == 1: |
| return items[0] |
| return None |
| |
| def _getAllBrokerObjects(self, cls): |
| items = self._doClassQuery(cls.__name__.lower()) |
| objs = [] |
| for item in items: |
| objs.append(cls(self, item)) |
| return objs |
| |
| def _getBrokerObject(self, cls, oid): |
| obj = self._doNameQuery(oid) |
| if obj: |
| return cls(self, obj) |
| return None |
| |
| def _getSingleObject(self, cls): |
| # |
| # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because |
| # of a bug that used to be in the broker whereby by-name queries did not return the |
| # object timestamps. |
| # |
| objects = self._getAllBrokerObjects(cls) |
| if objects: return objects[0] |
| return None |
| |
| def getBroker(self): |
| """ |
| Get the Broker object that contains broker-scope statistics and operations. |
| """ |
| return self._getSingleObject(Broker) |
| |
| |
| def getCluster(self): |
| return self._getSingleObject(Cluster) |
| |
| def getHaBroker(self): |
| return self._getSingleObject(HaBroker) |
| |
| def getAllConnections(self): |
| return self._getAllBrokerObjects(Connection) |
| |
| def getConnection(self, oid): |
| return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid) |
| |
| def getAllSessions(self): |
| return self._getAllBrokerObjects(Session) |
| |
| def getSession(self, oid): |
| return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid) |
| |
| def getAllSubscriptions(self): |
| return self._getAllBrokerObjects(Subscription) |
| |
| def getSubscription(self, oid): |
| return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid) |
| |
| def getAllExchanges(self): |
| return self._getAllBrokerObjects(Exchange) |
| |
| def getExchange(self, name): |
| return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name) |
| |
| def getAllQueues(self): |
| return self._getAllBrokerObjects(Queue) |
| |
| def getQueue(self, name): |
| return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name) |
| |
| def getAllBindings(self): |
| return self._getAllBrokerObjects(Binding) |
| |
| def getAllLinks(self): |
| return self._getAllBrokerObjects(Link) |
| |
| def getAcl(self): |
| return self._getSingleObject(Acl) |
| |
| def getMemory(self): |
| return self._getSingleObject(Memory) |
| |
| def echo(self, sequence = 1, body = "Body"): |
| """Request a response to test the path to the management broker""" |
| args = {'sequence' : sequence, 'body' : body} |
| return self._method('echo', args) |
| |
| def connect(self, host, port, durable, authMechanism, username, password, transport): |
| """Establish a connection to another broker""" |
| pass |
| |
| def queueMoveMessages(self, srcQueue, destQueue, qty): |
| """Move messages from one queue to another""" |
| self._method("queueMoveMessages", {'srcQueue':srcQueue,'destQueue':destQueue,'qty':qty}) |
| |
| def queueRedirect(self, sourceQueue, targetQueue): |
| """Enable/disable delivery redirect for indicated queues""" |
| self._method("queueRedirect", {'sourceQueue':sourceQueue,'targetQueue':targetQueue}) |
| |
| def setLogLevel(self, level): |
| """Set the log level""" |
| self._method("setLogLevel", {'level':level}) |
| |
| def getLogLevel(self): |
| """Get the log level""" |
| return self._method('getLogLevel') |
| |
| def setTimestampConfig(self, receive): |
| """Set the message timestamping configuration""" |
| self._method("setTimestampConfig", {'receive':receive}) |
| |
| def getTimestampConfig(self): |
| """Get the message timestamping configuration""" |
| return self._method('getTimestampConfig') |
| |
| def setLogHiresTimestamp(self, logHires): |
| """Set the high resolution timestamp in logs""" |
| self._method("setLogHiresTimestamp", {'logHires':logHires}) |
| |
| def getLogHiresTimestamp(self): |
| """Get the high resolution timestamp in logs""" |
| return self._method('getLogHiresTimestamp') |
| |
| def addExchange(self, exchange_type, name, options={}, **kwargs): |
| properties = {} |
| properties['exchange-type'] = exchange_type |
| for k,v in options.items(): |
| properties[k] = v |
| for k,v in kwargs.items(): |
| properties[k] = v |
| args = {'type': 'exchange', |
| 'name': name, |
| 'properties': properties, |
| 'strict': True} |
| self._method('create', args) |
| |
| def delExchange(self, name): |
| args = {'type': 'exchange', 'name': name} |
| self._method('delete', args) |
| |
| def addQueue(self, name, options={}, **kwargs): |
| properties = options |
| for k,v in kwargs.items(): |
| properties[k] = v |
| args = {'type': 'queue', |
| 'name': name, |
| 'properties': properties, |
| 'strict': True} |
| self._method('create', args) |
| |
| def delQueue(self, name, if_empty=True, if_unused=True): |
| options = {'if_empty': if_empty, |
| 'if_unused': if_unused} |
| |
| args = {'type': 'queue', |
| 'name': name, |
| 'options': options} |
| self._method('delete', args) |
| |
| def bind(self, exchange, queue, key="", options={}, **kwargs): |
| properties = options |
| for k,v in kwargs.items(): |
| properties[k] = v |
| args = {'type': 'binding', |
| 'name': "%s/%s/%s" % (exchange, queue, key), |
| 'properties': properties, |
| 'strict': True} |
| self._method('create', args) |
| |
| def unbind(self, exchange, queue, key, **kwargs): |
| args = {'type': 'binding', |
| 'name': "%s/%s/%s" % (exchange, queue, key), |
| 'strict': True} |
| self._method('delete', args) |
| |
| def reloadAclFile(self): |
| self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") |
| |
| def acl_lookup(self, userName, action, aclObj, aclObjName, propMap): |
| args = {'userId': userName, |
| 'action': action, |
| 'object': aclObj, |
| 'objectName': aclObjName, |
| 'propertyMap': propMap} |
| return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") |
| |
| def acl_lookupPublish(self, userName, exchange, key): |
| args = {'userId': userName, |
| 'exchangeName': exchange, |
| 'routingKey': key} |
| return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") |
| |
| def Redirect(self, sourceQueue, targetQueue): |
| args = {'sourceQueue': sourceQueue, |
| 'targetQueue': targetQueue} |
| return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker") |
| |
| def create(self, _type, name, properties={}, strict=False): |
| """Create an object of the specified type""" |
| args = {'type': _type, |
| 'name': name, |
| 'properties': properties, |
| 'strict': strict} |
| return self._method('create', args) |
| |
| def delete(self, _type, name, options): |
| """Delete an object of the specified type""" |
| args = {'type': _type, |
| 'name': name, |
| 'options': options} |
| return self._method('delete', args) |
| |
| def list(self, _type): |
| """List objects of the specified type""" |
| return [i["_values"] for i in self._doClassQuery(_type.lower())] |
| |
| def query(self, _type, oid): |
| """Query the current state of an object""" |
| return self._getBrokerObject(self, _type, oid) |
| |
| |
| class EventHelper(object): |
| def eventAddress(self, pkg='*', cls='*', sev='*'): |
| return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev) |
| |
| def event(self, msg): |
| return BrokerEvent(msg) |
| |
| |
| class BrokerEvent(object): |
| def __init__(self, msg): |
| self.msg = msg |
| self.content = msg.content[0] |
| self.values = self.content['_values'] |
| self.schema_id = self.content['_schema_id'] |
| self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name']) |
| |
| def __repr__(self): |
| rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name) |
| for k,v in self.values.items(): |
| rep = rep + " %s=%s" % (k, v) |
| return rep |
| |
| def __getattr__(self, key): |
| if key not in self.values: |
| return None |
| value = self.values[key] |
| return value |
| |
| def getAttributes(self): |
| return self.values |
| |
| def getTimestamp(self): |
| return self.content['_timestamp'] |
| |
| |
| class BrokerObject(object): |
| def __init__(self, broker, content): |
| self.broker = broker |
| self.content = content |
| self.values = content['_values'] |
| |
| def __getattr__(self, key): |
| if key not in self.values: |
| return None |
| value = self.values[key] |
| if value.__class__ == dict and '_object_name' in value: |
| full_name = value['_object_name'] |
| colon = full_name.find(':') |
| if colon > 0: |
| full_name = full_name[colon+1:] |
| colon = full_name.find(':') |
| if colon > 0: |
| return full_name[colon+1:] |
| return value |
| |
| def getObjectId(self): |
| return self.content['_object_id']['_object_name'] |
| |
| def getAttributes(self): |
| return self.values |
| |
| def getCreateTime(self): |
| return self.content['_create_ts'] |
| |
| def getDeleteTime(self): |
| return self.content['_delete_ts'] |
| |
| def getUpdateTime(self): |
| return self.content['_update_ts'] |
| |
| def update(self): |
| """ |
| Reload the property values from the agent. |
| """ |
| refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId()) |
| if refreshed: |
| self.content = refreshed.content |
| self.values = self.content['_values'] |
| else: |
| raise Exception("No longer exists on the broker") |
| |
| class Broker(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| class Cluster(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| class HaBroker(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| class Memory(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| class Connection(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| def close(self): |
| self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address) |
| |
| class Session(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| class Subscription(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| def __repr__(self): |
| return "subscription name undefined" |
| |
| class Exchange(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| class Binding(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| def __repr__(self): |
| return "Binding key: %s" % self.values['bindingKey'] |
| |
| class Queue(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| def purge(self, request): |
| """Discard all or some messages on a queue""" |
| self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name) |
| |
| def reroute(self, request, useAltExchange, exchange, filter={}): |
| """Remove all or some messages on this queue and route them to an exchange""" |
| self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, |
| "org.apache.qpid.broker:queue:%s" % self.name) |
| |
| class Link(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |
| |
| class Acl(BrokerObject): |
| def __init__(self, broker, values): |
| BrokerObject.__init__(self, broker, values) |