| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License |
| # |
| |
"""
AMQP 1.0 QMF client for the Qpid C++ broker.

This client is based on the Qpid-Proton library, which only supports AMQP 1.0.
It is intended for forward-looking projects that want to move to the newer
client libraries. One important feature is that it does not start background
threads, which makes it more suitable for environments that may fork.
"""
| |
| from proton import Message |
| from proton.utils import BlockingConnection, IncomingMessageHandler, ConnectionException |
| import threading, struct, time |
| |
class ReconnectDelays(object):
    """
    An iterable that returns a (possibly unlimited) sequence of delays for
    successive reconnect attempts.

    Every call to iter() starts a fresh sequence, so a single instance can be
    iterated any number of times.
    """

    def __init__(self, shortest, longest, repeat=True):
        """
        First delay is 0, then `shortest`. Successive delays are doubled up to a
        maximum of `longest`. If `repeat` is a number > 0 then the `longest` value
        is generated `repeat` more times. If `repeat` is True, the longest
        value is returned without limit.

        For example: ReconnectDelays(.125, 1, True) will generate the following sequence:
        0, .125, .25, .5, 1, 1, 1, 1, 1, 1 ... (forever)

        @param shortest: First non-zero delay, must be > 0.
        @param longest: Upper bound for the delays, must be >= `shortest`.
        @param repeat: True, or a number >= 0 of extra `longest` values.
        @raise ValueError: If the arguments violate the constraints above.
        """
        if shortest <= 0 or shortest > longest or repeat < 0:
            raise ValueError("invalid arguments for reconnect_delays()")
        self.shortest, self.longest, self.repeat = shortest, longest, repeat

    def _generate(self):
        """Generator producing the delay sequence described in __init__."""
        yield 0
        delay = self.shortest
        while delay < self.longest:
            yield delay
            delay *= 2
        # The capped value is always produced once, even if doubling overshot it.
        yield self.longest
        if self.repeat is True:
            while True:
                yield self.longest
        elif self.repeat:
            # range() exists on Python 2 and 3; xrange() is Python 2 only.
            for _ in range(self.repeat):
                yield self.longest

    def __iter__(self):
        """Return a new generator over the delays."""
        return self._generate()
| |
| |
class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-response (aka RPC) pattern.
    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many requests
        to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests.
            If not specified, each request must have the address property set.
            Successive messages may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.address = address
        self.response = None        # Most recent message delivered to on_message().
        self._cid = 0               # Counter used to build correlation IDs.
        self.lock = threading.Lock()  # Guards self._cid.
        self.reconnect(connection)

    def reconnect(self, connection):
        """
        (Re)create the sender and the reply receiver on `connection`.

        @param connection: A L{BlockingConnection} that replaces self.connection.
        """
        self.connection = connection
        self.sender = self.connection.create_sender(self.address)
        # dynamic=true generates a unique address dynamically for this receiver.
        # credit=1 because we want to receive 1 response message initially.
        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)

    def _next(self):
        """
        Get the next correlation ID.

        The ID is the decimal text of an incrementing counter. (Decoding the
        packed binary counter as ASCII fails once the counter reaches 128.)
        """
        with self.lock:
            self._cid += 1
            return u"%d" % self._cid

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If L{self.address} is not set the
            request's own address must be set.
        @raise ValueError: If neither self.address nor request.address is set.
        """
        return self.wait(self.send(request))

    def send(self, request):
        """
        Send a request and return the correlation_id immediately. Use wait() to get the response.

        The request's reply_to and correlation_id are overwritten before sending.

        @param request: A L{proton.Message}. If L{self.address} is not set the
            request's own address must be set.
        @raise ValueError: If neither self.address nor request.address is set.
        """
        if not self.address and not request.address:
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        request.correlation_id = self._next()
        self.sender.send(request)
        return request.correlation_id

    def wait(self, correlation_id):
        """
        Wait for and return a single response to a request previously sent with send().

        @param correlation_id: The value returned by send() for the request.
        """
        def wakeup():
            # Only a stored response carrying the expected correlation ID ends the wait.
            return self.response and (self.response.correlation_id == correlation_id)
        self.connection.wait(wakeup, msg="Waiting for response")
        response = self.response
        self.response = None    # Ready for next response.
        self.receiver.flow(1)   # Set up credit for the next response.
        return response

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.
| |
| |
class BrokerAgent(object):
    """
    Proxy for a manageable Qpid broker.

    Requests are sent as QMF messages through a L{SyncRequestResponse} client.
    Agents created with connect() can re-establish the connection automatically
    when a request fails with a ConnectionException.
    """

    @staticmethod
    def connection(url=None, timeout=10, ssl_domain=None, sasl=None):
        """
        Return a BlockingConnection suitable for use with a BrokerAgent.

        @param sasl: Optional object with `mechs`, `user` and `password`
            attributes; each is converted with str() and passed to the connection.
        """
        return BlockingConnection(url,
                                  timeout=timeout,
                                  ssl_domain=ssl_domain,
                                  allowed_mechs=str(sasl.mechs) if sasl else None,
                                  user=str(sasl.user) if sasl else None,
                                  password=str(sasl.password) if sasl else None)

    @staticmethod
    def connect(url=None, timeout=10, ssl_domain=None, sasl=None, reconnect_delays=None):
        """
        Return a BrokerAgent connected with the given parameters.
        @param reconnect_delays: iterable of delays for successive automatic re-connect attempts,
            see class ReconnectDelays. If None there is no automatic re-connect
        """
        def make_connection():
            return BrokerAgent.connection(url, timeout, ssl_domain, sasl)
        ba = BrokerAgent(make_connection())
        # Kept so that _reconnect() can open a replacement connection later.
        ba.make_connection = make_connection
        ba.reconnect_delays = reconnect_delays or []
        return ba

    def __init__(self, connection):
        """
        Create a management node proxy using the given connection.
        @param connection: a L{BlockingConnection} to the management agent.
        """
        path = connection.url.path or "qmf.default.direct"
        self._client = SyncRequestResponse(connection, path)
        # Both stay unset unless the agent is built by connect(); in that
        # state no automatic re-connect is attempted.
        self.reconnect_delays = None
        self.make_connection = None

    def reconnect(self, connection):
        """Switch the underlying client to a new connection."""
        self._client.reconnect(connection)

    def close(self):
        """Shut down the node"""
        if self._client:
            self._client.connection.close()
            self._client = None

    def __repr__(self):
        # After close() there is no client; report None instead of failing.
        url = self._client.connection.url if self._client else None
        return "%s(%s)" % (self.__class__.__name__, url)

    def _reconnect(self):
        """
        Try to re-connect, sleeping for each of the configured delays in turn.
        @return: True once a connection is re-established, False if automatic
            re-connect is not configured or every attempt failed.
        """
        if self.make_connection is None:
            return False
        for delay in self.reconnect_delays or ():
            time.sleep(delay)
            try:
                self.reconnect(self.make_connection())
                return True
            except ConnectionException:
                pass            # Keep trying with the next delay.
        return False

    def _retry(self, f, *args, **kwargs):
        """
        Call f(*args, **kwargs); on ConnectionException re-connect and call again.
        The exception propagates when re-connecting is not possible.
        """
        while True:
            try:
                return f(*args, **kwargs)
            except ConnectionException:
                if not self._reconnect():
                    raise

    def _request(self, opcode, content):
        """Send one QMF request with the given opcode and body; return the response message."""
        props = {u'method': u'request',
                 u'qmf.opcode': opcode,
                 u'x-amqp-0-10.app-id': u'qmf2'}
        return self._client.call(Message(body=content, properties=props, subject="broker"))

    def _method(self, method, arguments=None, addr="org.apache.qpid.broker:broker:amqp-broker"):
        """
        Invoke a QMF method on the object named `addr` and return its output arguments.

        @param method: QMF method name.
        @param arguments: Map of method arguments, defaults to an empty map.
        @param addr: Object name of the target managed object.
        @raise Exception: If the broker answers with an exception or with an
            unexpected opcode.
        """
        content = {'_object_id': {'_object_name': addr},
                   '_method_name': method,
                   '_arguments': arguments or {}}
        response = self._retry(self._request, u'_method_request', content)
        opcode = response.properties[u'qmf.opcode']
        if opcode == u'_exception':
            raise Exception("management error: %r" % response.body['_values'])
        if opcode != u'_method_response':
            raise Exception("bad response: %r" % response.properties)
        return response.body['_arguments']

    def _gather(self, response):
        """
        Collect the items of a query response that may arrive in several parts.

        While the latest message carries the 'partial' property, wait for the
        next message with the same correlation id and append its body.
        """
        items = response.body
        while u'partial' in response.properties:
            response = self._client.wait(response.correlation_id)
            items += response.body
        return items

    def _query(self, query):
        """Send a QMF query request and return the gathered list of result items."""
        response = self._request(u'_query_request', query)
        if response.properties[u'qmf.opcode'] != u'_query_response':
            raise Exception("bad response")
        return self._gather(response)

    def _classQuery(self, class_name):
        """Return the content of every object of QMF class `class_name`."""
        query = {'_what': 'OBJECT', '_schema_id': {'_class_name': class_name}}
        return self._retry(self._query, query)

    def _nameQuery(self, object_id):
        """Return the content of the object named `object_id`, or None unless exactly one matches."""
        query = {'_what': 'OBJECT', '_object_id': {'_object_name': object_id}}
        items = self._retry(self._query, query)
        if len(items) == 1:
            return items[0]
        return None

    def _getAll(self, cls):
        """Return a `cls` wrapper for every object whose QMF class is cls.__name__ lower-cased."""
        return [cls(self, x) for x in self._classQuery(cls.__name__.lower())]

    def _getSingle(self, cls):
        """Return the first object of class `cls`, or an empty list if there is none."""
        found = self._getAll(cls)
        return found and found[0]

    def _get(self, cls, oid):
        """Return a `cls` wrapper for the object named `oid`, or None if the lookup yields nothing."""
        content = self._nameQuery(oid)
        return content and cls(self, content)

    def getBroker(self): return self._getSingle(Broker)
    def getCluster(self): return self._getSingle(Cluster)
    def getHaBroker(self): return self._getSingle(HaBroker)
    def getAllConnections(self): return self._getAll(Connection)
    def getConnection(self, oid): return self._get(Connection, "org.apache.qpid.broker:connection:%s" % oid)
    def getAllSessions(self): return self._getAll(Session)
    def getSession(self, oid): return self._get(Session, "org.apache.qpid.broker:session:%s" % oid)
    def getAllSubscriptions(self): return self._getAll(Subscription)
    def getSubscription(self, oid): return self._get(Subscription, "org.apache.qpid.broker:subscription:%s" % oid)
    def getAllExchanges(self): return self._getAll(Exchange)
    def getExchange(self, name): return self._get(Exchange, "org.apache.qpid.broker:exchange:%s" % name)
    def getAllQueues(self): return self._getAll(Queue)
    def getQueue(self, name): return self._get(Queue, "org.apache.qpid.broker:queue:%s" % name)
    def getAllBindings(self): return self._getAll(Binding)
    def getAllLinks(self): return self._getAll(Link)
    def getAcl(self): return self._getSingle(Acl)
    def getMemory(self): return self._getSingle(Memory)

    def echo(self, sequence=1, body="Body"):
        """Request a response to test the path to the management broker"""
        return self._method('echo', {'sequence': sequence, 'body': body})

    def queueMoveMessages(self, srcQueue, destQueue, qty):
        """Move messages from one queue to another"""
        self._method("queueMoveMessages", {'srcQueue': srcQueue, 'destQueue': destQueue, 'qty': qty})

    def queueRedirect(self, sourceQueue, targetQueue):
        """Enable/disable delivery redirect for indicated queues"""
        self._method("queueRedirect", {'sourceQueue': sourceQueue, 'targetQueue': targetQueue})

    def setLogLevel(self, level):
        """Set the log level"""
        self._method("setLogLevel", {'level': level})

    def getLogLevel(self):
        """Get the log level"""
        return self._method('getLogLevel')

    def setTimestampConfig(self, receive):
        """Set the message timestamping configuration"""
        self._method("setTimestampConfig", {'receive': receive})

    def getTimestampConfig(self):
        """Get the message timestamping configuration"""
        return self._method('getTimestampConfig')

    def setLogHiresTimestamp(self, logHires):
        """Set the high resolution timestamp in logs"""
        self._method("setLogHiresTimestamp", {'logHires': logHires})

    def getLogHiresTimestamp(self):
        """Get the high resolution timestamp in logs"""
        return self._method('getLogHiresTimestamp')

    def addExchange(self, exchange_type, name, options=None, **kwargs):
        """
        Create an exchange.

        @param options: Optional map of properties; it is not modified.
        @param kwargs: Additional properties, overriding `options`.
        """
        properties = {'exchange-type': exchange_type}
        properties.update(options or {})
        properties.update(kwargs)
        args = {'type': 'exchange',
                'name': name,
                'properties': properties,
                'strict': True}
        self._method('create', args)

    def delExchange(self, name):
        """Delete the named exchange."""
        args = {'type': 'exchange', 'name': name}
        self._method('delete', args)

    def addQueue(self, name, options=None, **kwargs):
        """
        Create a queue.

        @param options: Optional map of properties; it is not modified.
        @param kwargs: Additional properties, overriding `options`.
        """
        # Copy so that neither the caller's map nor a shared default is mutated.
        properties = dict(options or {})
        properties.update(kwargs)
        args = {'type': 'queue',
                'name': name,
                'properties': properties,
                'strict': True}
        self._method('create', args)

    def delQueue(self, name, if_empty=True, if_unused=True):
        """Delete the named queue, passing `if_empty` and `if_unused` as delete options."""
        options = {'if_empty': if_empty,
                   'if_unused': if_unused}
        args = {'type': 'queue',
                'name': name,
                'options': options}
        self._method('delete', args)

    def bind(self, exchange, queue, key="", options=None, **kwargs):
        """
        Create a binding named "exchange/queue/key".

        @param options: Optional map of properties; it is not modified.
        @param kwargs: Additional properties, overriding `options`.
        """
        # Copy so that neither the caller's map nor a shared default is mutated.
        properties = dict(options or {})
        properties.update(kwargs)
        args = {'type': 'binding',
                'name': "%s/%s/%s" % (exchange, queue, key),
                'properties': properties,
                'strict': True}
        self._method('create', args)

    def unbind(self, exchange, queue, key, **kwargs):
        """Delete the binding named "exchange/queue/key". Extra keyword arguments are ignored."""
        args = {'type': 'binding',
                'name': "%s/%s/%s" % (exchange, queue, key),
                'strict': True}
        self._method('delete', args)

    def reloadAclFile(self):
        """Invoke 'reloadACLFile' on the broker's ACL object."""
        self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def acl_lookup(self, userName, action, aclObj, aclObjName, propMap):
        """Invoke 'Lookup' on the broker's ACL object and return its output arguments."""
        args = {'userId': userName,
                'action': action,
                'object': aclObj,
                'objectName': aclObjName,
                'propertyMap': propMap}
        return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def acl_lookupPublish(self, userName, exchange, key):
        """Invoke 'LookupPublish' on the broker's ACL object and return its output arguments."""
        args = {'userId': userName,
                'exchangeName': exchange,
                'routingKey': key}
        return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def Redirect(self, sourceQueue, targetQueue):
        """Same request as queueRedirect(), but returns the method's output arguments."""
        args = {'sourceQueue': sourceQueue,
                'targetQueue': targetQueue}
        return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker")

    def create(self, _type, name, properties=None, strict=False):
        """Create an object of the specified type"""
        args = {'type': _type,
                'name': name,
                'properties': properties or {},
                'strict': strict}
        return self._method('create', args)

    def delete(self, _type, name, options):
        """Delete an object of the specified type"""
        args = {'type': _type,
                'name': name,
                'options': options}
        return self._method('delete', args)

    def list(self, _type):
        """List objects of the specified type"""
        return [i["_values"] for i in self._classQuery(_type.lower())]

    def query(self, _type, oid):
        """
        Query the current state of an object

        @param _type: Callable used to wrap the result, called as
            _type(agent, content), e.g. one of the BrokerObject subclasses.
        @param oid: Object name passed unchanged to the name query.
        @return: The wrapped object, or None if the lookup yields nothing.
        """
        # NOTE(review): list() takes a type *name*; here the wrapper class is
        # needed because _get() instantiates it. Confirm intended usage.
        return self._get(_type, oid)
| |
| |
class BrokerObject(object):
    """
    Base class for wrappers around the content of one QMF managed object.

    Entries of the object's '_values' map can be read as attributes; a name
    that is not in the map reads as None.
    """

    def __init__(self, broker, content):
        """
        @param broker: The agent used for follow-up requests; must provide
            _get() (used by update()).
        @param content: Map describing the object, with '_values' and
            '_object_id' entries.
        """
        self.broker = broker
        self.content = content

    @property
    def values(self):
        """The '_values' map of the current content."""
        return self.content['_values']

    def __getattr__(self, key):
        """
        Look up attributes that are not found normally in the values map.

        @raise AttributeError: For the instance's own fields and for special
            (double-underscore) names.
        """
        # Only reached when normal lookup fails. If 'content' is not set yet,
        # going through self.values would recurse forever; special names must
        # not be reported as present with value None.
        if key in ('broker', 'content') or (key.startswith('__') and key.endswith('__')):
            raise AttributeError(key)
        return self.values.get(key)

    def getObjectId(self):
        """Return the object's name from its '_object_id' entry."""
        return self.content['_object_id']['_object_name']

    def getAttributes(self):
        """Return the values map."""
        return self.values

    def getCreateTime(self):
        """Return the '_create_ts' entry of the content."""
        return self.content['_create_ts']

    def getDeleteTime(self):
        """Return the '_delete_ts' entry of the content."""
        return self.content['_delete_ts']

    def getUpdateTime(self):
        """Return the '_update_ts' entry of the content."""
        return self.content['_update_ts']

    def update(self):
        """
        Reload the property values from the agent.

        @raise Exception: If the agent no longer returns the object.
        """
        refreshed = self.broker._get(self.__class__, self.getObjectId())
        if not refreshed:
            raise Exception("No longer exists on the broker")
        # 'values' is a read-only property derived from content, so replacing
        # content is all that is needed (assigning to it would raise).
        self.content = refreshed.content

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self.content)

    def __str__(self):
        return self.getObjectId()
| |
| |
class Broker(BrokerObject):
    """Wrapper for a QMF object of class 'broker'."""

    def __init__(self, broker, values):
        super(Broker, self).__init__(broker, values)
| |
class Cluster(BrokerObject):
    """Wrapper for a QMF object of class 'cluster'."""

    def __init__(self, broker, values):
        super(Cluster, self).__init__(broker, values)
| |
class HaBroker(BrokerObject):
    """Wrapper for a QMF object of class 'habroker'."""

    def __init__(self, broker, values):
        super(HaBroker, self).__init__(broker, values)
| |
class Memory(BrokerObject):
    """Wrapper for a QMF object of class 'memory'."""

    def __init__(self, broker, values):
        super(Memory, self).__init__(broker, values)
| |
class Connection(BrokerObject):
    """Wrapper for a QMF object of class 'connection'."""

    def __init__(self, broker, values):
        super(Connection, self).__init__(broker, values)

    def close(self):
        """Invoke the QMF 'close' method on this connection, named by its 'address' value."""
        object_name = "org.apache.qpid.broker:connection:%s" % self.address
        self.broker._method("close", {}, object_name)
| |
class Session(BrokerObject):
    """Wrapper for a QMF object of class 'session'."""

    def __init__(self, broker, values):
        super(Session, self).__init__(broker, values)
| |
class Subscription(BrokerObject):
    """Wrapper for a QMF object of class 'subscription'."""

    def __init__(self, broker, values):
        super(Subscription, self).__init__(broker, values)
| |
class Exchange(BrokerObject):
    """Wrapper for a QMF object of class 'exchange'."""

    def __init__(self, broker, values):
        super(Exchange, self).__init__(broker, values)
| |
class Binding(BrokerObject):
    """Wrapper for a QMF object of class 'binding'."""

    def __init__(self, broker, values):
        super(Binding, self).__init__(broker, values)
| |
class Queue(BrokerObject):
    """Wrapper for a QMF object of class 'queue'."""

    def __init__(self, broker, values):
        super(Queue, self).__init__(broker, values)

    def purge(self, request):
        """Discard all or some messages on a queue"""
        target = "org.apache.qpid.broker:queue:%s" % self.name
        arguments = {'request': request}
        self.broker._method("purge", arguments, target)

    def reroute(self, request, useAltExchange, exchange, filter={}):
        """Remove all or some messages on this queue and route them to an exchange"""
        target = "org.apache.qpid.broker:queue:%s" % self.name
        arguments = {
            'request': request,
            'useAltExchange': useAltExchange,
            'exchange': exchange,
            'filter': filter,
        }
        self.broker._method("reroute", arguments, target)
| |
class Link(BrokerObject):
    """Wrapper for a QMF object of class 'link'."""

    def __init__(self, broker, values):
        super(Link, self).__init__(broker, values)
| |
class Acl(BrokerObject):
    """Wrapper for a QMF object of class 'acl'."""

    def __init__(self, broker, values):
        super(Acl, self).__init__(broker, values)
| |