| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import os |
| import getopt |
| import sys |
| import locale |
| from qmf.console import Session |
| |
# Keys used in the "arguments" map of queue and exchange declarations.
# BrokerManager writes them when adding objects and reads them back when
# listing objects.
FILECOUNT       = "qpid.file_count"
FILESIZE        = "qpid.file_size"
MAX_QUEUE_SIZE  = "qpid.max_size"
MAX_QUEUE_COUNT = "qpid.max_count"
POLICY_TYPE     = "qpid.policy_type"
CLUSTER_DURABLE = "qpid.persist_last_node"
LVQ             = "qpid.last_value_queue"
MSG_SEQUENCE    = "qpid.msg_sequence"
IVE             = "qpid.ive"

# Command-line option state.  These are the defaults; the option-parsing
# loop in the main program overwrites them and the BrokerManager methods
# read them.
_host = "localhost"        # -a / --broker-addr
_recursive = False         # -b / --bindings

# "add queue" / "add exchange" options.
_durable = False           # --durable
_clusterDurable = False    # --cluster-durable
_fileCount = 8             # --file-count
_fileSize = 24             # --file-size
_maxQueueSize = None       # --max-queue-size
_maxQueueCount = None      # --max-queue-count
_policyType = None         # --policy-type
_lvq = False               # --last-value-queue
_msgSequence = False       # --sequence
_ive = False               # --ive
| |
def Usage():
    """Print the command-line help text to stdout and exit with status 1.

    This function never returns: it always ends with sys.exit(1), so
    callers use it both for "--help"-style output and for reporting a
    malformed command line.
    """
    # One entry per output line; an empty string produces a blank line.
    helpLines = (
        "Usage: qpid-config [OPTIONS]",
        " qpid-config [OPTIONS] exchanges [filter-string]",
        " qpid-config [OPTIONS] queues [filter-string]",
        " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]",
        " qpid-config [OPTIONS] del exchange <name>",
        " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]",
        " qpid-config [OPTIONS] del queue <name>",
        " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]",
        " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]",
        "",
        "Options:",
        " -b [ --bindings ] Show bindings in queue or exchange list",
        " -a [ --broker-addr ] Address (localhost) Address of qpidd broker",
        " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]",
        " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost",
        "",
        "Add Queue Options:",
        " --durable Queue is durable",
        " --cluster-durable Queue becomes durable if there is only one functioning cluster node",
        " --file-count N (8) Number of files in queue's persistence journal",
        " --file-size N (24) File size in pages (64Kib/page)",
        " --max-queue-size N Maximum in-memory queue size as bytes",
        " --max-queue-count N Maximum in-memory queue size as a number of messages",
        " --policy-type TYPE Action taken when queue limit is reached (reject, flow_to_disk, ring, ring_strict)",
        " --last-value-queue Enable LVQ behavior on the queue",
        "",
        "Add Exchange Options:",
        " --durable Exchange is durable",
        " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header",
        " with a value that increments for each message forwarded.",
        " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference",
        " to the last message forwarded and enqueuing that message to newly bound",
        " queues.",
        "",
    )
    for text in helpLines:
        print(text)
    sys.exit(1)
| |
class BrokerManager:
    """Inspect and modify the configuration of a qpidd broker.

    The listing methods (Overview, ExchangeList, QueueList and their
    "Recurse" variants) query "exchange", "queue" and "binding" objects
    through a QMF console Session and print the result to stdout.  The
    Add*/Del*/Bind/Unbind methods issue the matching command on the
    broker's AMQP session.

    The add methods read the module-level option variables (_durable,
    _fileCount, _fileSize, _maxQueueSize, _maxQueueCount, _policyType,
    _clusterDurable, _lvq, _msgSequence, _ive) that the main program sets
    from the command line.
    """

    def __init__(self):
        self.brokerName = None
        self.url = None
        self.qmf = None
        self.broker = None
        # Stays None until SetBroker() finds an agent whose bank is 0.
        self.brokerAgent = None

    def SetBroker(self, brokerUrl):
        """Open a QMF session to brokerUrl and record its bank-0 agent.

        brokerUrl is passed unchanged to Session.addBroker().  If several
        agents report bank 0 the last one returned by getAgents() wins; if
        none does, brokerAgent is left as None.
        """
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl)
        for agent in self.qmf.getAgents():
            if agent.getAgentBank() == 0:
                self.brokerAgent = agent

    def Disconnect(self):
        """Remove the broker from the QMF session if one was added."""
        if self.broker:
            self.qmf.delBroker(self.broker)

    def _getObjects(self, className):
        """Return the QMF objects of class className from the broker agent.

        Raises Exception with a readable message when no bank-0 agent was
        recorded by SetBroker(), rather than failing later on a missing
        attribute.
        """
        if self.brokerAgent is None:
            raise Exception("No broker agent available (broker: %s)" % self.url)
        return self.qmf.getObjects(_class=className, _agent=self.brokerAgent)

    def Overview(self):
        """Print exchange totals per type and durable/non-durable queue totals."""
        exchanges = self._getObjects("exchange")
        queues = self._getObjects("queue")

        print("Total Exchanges: %d" % len(exchanges))
        typeCounts = {}
        for ex in exchanges:
            typeCounts[ex.type] = typeCounts.get(ex.type, 0) + 1
        for typ in typeCounts:
            print("%15s: %d" % (typ, typeCounts[typ]))

        print("")
        print(" Total Queues: %d" % len(queues))
        # Named durableCount so it cannot be confused with the module-level
        # _durable option flag.
        durableCount = len([q for q in queues if q.durable])
        print(" durable: %d" % durableCount)
        print(" non-durable: %d" % (len(queues) - durableCount))

    def ExchangeList(self, filter):
        """Print a table of exchanges whose name contains filter.

        Each row shows the exchange type, its name, and the options
        (--durable, --sequence, --ive) that apply to it.
        """
        exchanges = self._getObjects("exchange")
        shown = [ex for ex in exchanges if self.match(ex.name, filter)]

        # The header's type column uses the same width as the rows so the
        # captions line up with the data beneath them.
        typeWidth = 10
        caption1 = "%-*s" % (typeWidth, "Type")
        caption2 = "Exchange Name"
        maxNameLen = max([len(caption2)] + [len(ex.name) for ex in shown])

        print("%s%-*s Attributes" % (caption1, maxNameLen, caption2))
        # Integer division keeps the repeat count an int on every Python.
        print("=====" * (((maxNameLen + len(caption1)) // 5) + 5))

        for ex in shown:
            parts = ["%-*s%-*s " % (typeWidth, ex.type, maxNameLen, ex.name)]
            args = ex.arguments
            if ex.durable:
                parts.append("--durable")
            if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1:
                parts.append("--sequence")
            if IVE in args and args[IVE] == 1:
                parts.append("--ive")
            print(" ".join(parts))

    def ExchangeListRecurse(self, filter):
        """Print each matching exchange followed by its bindings.

        A binding whose queue reference cannot be resolved is shown with
        the queue name "<unknown>".
        """
        exchanges = self._getObjects("exchange")
        bindings = self._getObjects("binding")
        queues = self._getObjects("queue")
        for ex in exchanges:
            if not self.match(ex.name, filter):
                continue
            print("Exchange '%s' (%s)" % (ex.name, ex.type))
            exId = ex.getObjectId()
            for bind in bindings:
                if bind.exchangeRef == exId:
                    qname = "<unknown>"
                    queue = self.findById(queues, bind.queueRef)
                    if queue is not None:
                        qname = queue.name
                    print(" bind [%s] => %s" % (bind.bindingKey, qname))

    def QueueList(self, filter):
        """Print a table of queues whose name contains filter.

        Each row shows the queue name followed by its flags and the
        declaration arguments that are present.
        """
        queues = self._getObjects("queue")
        shown = [q for q in queues if self.match(q.name, filter)]

        caption = "Queue Name"
        maxNameLen = max([len(caption)] + [len(q.name) for q in shown])

        print("%-*s Attributes" % (maxNameLen, caption))
        print("=====" * ((maxNameLen // 5) + 5))

        for q in shown:
            parts = ["%-*s " % (maxNameLen, q.name)]
            args = q.arguments
            if q.durable:
                parts.append("--durable")
            if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1:
                parts.append("--cluster-durable")
            if q.autoDelete:
                parts.append("auto-del")
            if q.exclusive:
                parts.append("excl")
            if FILESIZE in args:
                parts.append("--file-size=%d" % args[FILESIZE])
            if FILECOUNT in args:
                parts.append("--file-count=%d" % args[FILECOUNT])
            if MAX_QUEUE_SIZE in args:
                parts.append("--max-queue-size=%d" % args[MAX_QUEUE_SIZE])
            if MAX_QUEUE_COUNT in args:
                parts.append("--max-queue-count=%d" % args[MAX_QUEUE_COUNT])
            if POLICY_TYPE in args:
                parts.append("--policy-type=%s" % args[POLICY_TYPE])
            if LVQ in args and args[LVQ] == 1:
                parts.append("--last-value-queue")
            print(" ".join(parts))

    def QueueListRecurse(self, filter):
        """Print each matching queue followed by the exchanges bound to it.

        An unresolvable exchange reference is shown as "<unknown>" and an
        exchange with an empty name is shown as ''.
        """
        exchanges = self._getObjects("exchange")
        bindings = self._getObjects("binding")
        queues = self._getObjects("queue")
        for queue in queues:
            if not self.match(queue.name, filter):
                continue
            print("Queue '%s'" % queue.name)
            queueId = queue.getObjectId()
            for bind in bindings:
                if bind.queueRef == queueId:
                    ename = "<unknown>"
                    ex = self.findById(exchanges, bind.exchangeRef)
                    if ex is not None:
                        ename = ex.name
                        if ename == "":
                            ename = "''"
                    print(" bind [%s] => %s" % (bind.bindingKey, ename))

    def AddExchange(self, args):
        """Declare an exchange; args is [type, name, ...].

        Calls Usage() (which exits) when fewer than two arguments are given.
        """
        if len(args) < 2:
            Usage()
        etype = args[0]
        ename = args[1]
        declArgs = {}
        if _msgSequence:
            declArgs[MSG_SEQUENCE] = 1
        if _ive:
            declArgs[IVE] = 1
        self.broker.getAmqpSession().exchange_declare(
            exchange=ename, type=etype, durable=_durable, arguments=declArgs)

    def DelExchange(self, args):
        """Delete the exchange named args[0]; calls Usage() if args is empty."""
        if len(args) < 1:
            Usage()
        ename = args[0]
        self.broker.getAmqpSession().exchange_delete(exchange=ename)

    def AddQueue(self, args):
        """Declare the queue named args[0] using the module-level options.

        The journal file count and size are only sent for durable queues;
        the remaining arguments are only sent when their option is set.
        Calls Usage() (which exits) if args is empty.
        """
        if len(args) < 1:
            Usage()
        qname = args[0]
        declArgs = {}
        if _durable:
            declArgs[FILECOUNT] = _fileCount
            declArgs[FILESIZE] = _fileSize

        if _maxQueueSize:
            declArgs[MAX_QUEUE_SIZE] = _maxQueueSize
        if _maxQueueCount:
            declArgs[MAX_QUEUE_COUNT] = _maxQueueCount
        if _policyType:
            declArgs[POLICY_TYPE] = _policyType
        if _clusterDurable:
            declArgs[CLUSTER_DURABLE] = 1
        if _lvq:
            declArgs[LVQ] = 1

        self.broker.getAmqpSession().queue_declare(
            queue=qname, durable=_durable, arguments=declArgs)

    def DelQueue(self, args):
        """Delete the queue named args[0]; calls Usage() if args is empty."""
        if len(args) < 1:
            Usage()
        qname = args[0]
        self.broker.getAmqpSession().queue_delete(queue=qname)

    def Bind(self, args):
        """Bind a queue to an exchange; args is [exchange, queue, [key]].

        The binding key defaults to the empty string.  Calls Usage() (which
        exits) when fewer than two arguments are given.
        """
        if len(args) < 2:
            Usage()
        ename = args[0]
        qname = args[1]
        key = ""
        if len(args) > 2:
            key = args[2]
        self.broker.getAmqpSession().exchange_bind(
            queue=qname, exchange=ename, binding_key=key)

    def Unbind(self, args):
        """Remove a binding; args is [exchange, queue, [key]].

        The binding key defaults to the empty string.  Calls Usage() (which
        exits) when fewer than two arguments are given.
        """
        if len(args) < 2:
            Usage()
        ename = args[0]
        qname = args[1]
        key = ""
        if len(args) > 2:
            key = args[2]
        self.broker.getAmqpSession().exchange_unbind(
            queue=qname, exchange=ename, binding_key=key)

    def findById(self, items, id):
        """Return the first item whose getObjectId() equals id, else None."""
        for item in items:
            if item.getObjectId() == id:
                return item
        return None

    def match(self, name, filter):
        """Return True if filter is empty or occurs as a substring of name."""
        return filter == "" or filter in name
| |
def YN(bool):
    """Return 'Y' when the argument is truthy and 'N' otherwise."""
    if not bool:
        return 'N'
    return 'Y'
| |
| |
| ## |
| ## Main Program |
| ## |
| |
# Parse the command line.  Only a getopt parsing error leads to the usage
# text; anything else (for example Ctrl-C) is allowed to propagate.
try:
    longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
                "file-size=", "max-queue-size=", "max-queue-count=", "policy-type=",
                "last-value-queue", "sequence", "ive")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "a:b", longOpts)
except getopt.GetoptError:
    Usage()

# Best effort: decode the positional arguments with the locale's preferred
# encoding and fall back to the raw arguments if that is not possible.
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

# Transfer the options into the module-level variables read by
# BrokerManager.  A non-integer value for a numeric option is reported
# with the usage text instead of a traceback.
try:
    for optName, optValue in optlist:
        if optName in ("-b", "--bindings"):
            _recursive = True
        elif optName in ("-a", "--broker-addr"):
            _host = optValue
        elif optName == "--durable":
            _durable = True
        elif optName == "--cluster-durable":
            _clusterDurable = True
        elif optName == "--file-count":
            _fileCount = int(optValue)
        elif optName == "--file-size":
            _fileSize = int(optValue)
        elif optName == "--max-queue-size":
            _maxQueueSize = int(optValue)
        elif optName == "--max-queue-count":
            _maxQueueCount = int(optValue)
        elif optName == "--policy-type":
            _policyType = optValue
        elif optName == "--last-value-queue":
            _lvq = True
        elif optName == "--sequence":
            _msgSequence = True
        elif optName == "--ive":
            _ive = True
except ValueError:
    print("Invalid numeric value for option %s: %s" % (optName, optValue))
    Usage()

nargs = len(cargs)
bm = BrokerManager()

# The outer try/finally guarantees the broker connection is released on
# every exit path, including the sys.exit() raised by Usage() and by the
# failure handler below.
try:
    try:
        bm.SetBroker(_host)
        if nargs == 0:
            bm.Overview()
        else:
            cmd = cargs[0]
            modifier = ""
            if nargs > 1:
                modifier = cargs[1]
            if cmd == "exchanges":
                if _recursive:
                    bm.ExchangeListRecurse(modifier)
                else:
                    bm.ExchangeList(modifier)
            elif cmd == "queues":
                if _recursive:
                    bm.QueueListRecurse(modifier)
                else:
                    bm.QueueList(modifier)
            elif cmd == "add":
                if modifier == "exchange":
                    bm.AddExchange(cargs[2:])
                elif modifier == "queue":
                    bm.AddQueue(cargs[2:])
                else:
                    Usage()
            elif cmd == "del":
                if modifier == "exchange":
                    bm.DelExchange(cargs[2:])
                elif modifier == "queue":
                    bm.DelQueue(cargs[2:])
                else:
                    Usage()
            elif cmd == "bind":
                bm.Bind(cargs[1:])
            elif cmd == "unbind":
                bm.Unbind(cargs[1:])
            else:
                Usage()
    except KeyboardInterrupt:
        print("")
    except Exception:
        # sys.exc_info() is used instead of binding the exception in the
        # except clause so this handler parses on both Python 2 and 3.
        e = sys.exc_info()[1]
        print("Failed: %s" % (e.args,))
        sys.exit(1)
finally:
    bm.Disconnect()