| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import os |
| from optparse import OptionParser, OptionGroup |
| import sys |
| import locale |
| import socket |
| import re |
| from qpid.messaging import Connection |
| |
# Locate the qpid-tools installation: the QPID_TOOLS_HOME environment variable
# wins, otherwise fall back to the standard install location.  Its "python"
# sub-directory is appended to sys.path before qpidtoollibs is imported.
if "QPID_TOOLS_HOME" in os.environ:
    home = os.environ["QPID_TOOLS_HOME"]
else:
    home = os.path.normpath("/usr/share/qpid-tools")
sys.path.append(os.path.join(home, "python"))
| |
| from qpidtoollibs import BrokerAgent |
| from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong |
| |
| |
class Config:
    """ Run-time settings for the tool.

    The values assigned here are only defaults; OptionsAndArguments()
    overwrites every one of them from the parsed command line.
    """

    def __init__(self):
        # Which report to show (the 'show' code chosen on the command line).
        self._types = ""
        # Display settings: sort column name (None = unsorted), sort
        # direction, and the row limit handed to the sorter.
        self._sortcol = None
        self._increasing = False
        self._limit = 50
        # Broker address and connection timeout in seconds.
        self._host = "localhost"
        self._connTimeout = 10
| |
# Module-wide state shared by the option parser, BrokerManager and main():
#   conn_options - keyword arguments forwarded to Connection.establish()
#   config       - the parsed command-line settings
conn_options = dict()
config = Config()
| |
def OptionsAndArguments(argv):
    """ Set global variables for options, return arguments

    Parses argv with optparse (None is passed straight through to
    parse_args), stores the result in the module-level 'config' object and
    fills the module-level 'conn_options' dict with the connection keyword
    arguments implied by the SASL / SSL / HA options.

    Returns the list of positional arguments left after option parsing.
    Invalid usage is reported through parser.error().
    """

    # 'config' and 'conn_options' are only mutated, never rebound, so no
    # 'global' declaration is needed.

    # optparse prefixes the usage text with "Usage: " (7 characters); the
    # continuation lines are indented by the same amount so that every
    # command form lines up in the --help output.
    usage = "\n       ".join([
        "%prog -g [options]",
        "%prog -c [options]",
        "%prog -e [options]",
        "%prog -q [options] [queue-name]",
        "%prog -u [options]",
        "%prog -m [options]",
        "%prog --acl [options]",
    ])

    parser = OptionParser(usage=usage)

    group1 = OptionGroup(parser, "General Options")
    group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>",
                      help="URL of the broker to query")
    group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>",
                      help="Maximum time to wait for broker connection (in seconds)")
    group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
    group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>",
                      help="Client SSL certificate (PEM Format)")
    group1.add_option("--ssl-key", action="store", type="string", metavar="<key>",
                      help="Client SSL private key (PEM Format)")
    group1.add_option("--ha-admin", action="store_true",
                      help="Allow connection to a HA backup broker.")
    parser.add_option_group(group1)

    # All command options write a one-off code into the same 'show'
    # destination, so the last one given on the command line wins.
    group2 = OptionGroup(parser, "Command Options")
    group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show")
    group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show")
    group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show")
    group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show")
    group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show")
    group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show")
    group2.add_option("--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show")
    parser.add_option_group(group2)

    group3 = OptionGroup(parser, "Display Options")
    group3.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name")
    group3.add_option("-I", "--increasing", action="store_true", default=False,
                      help="Sort by increasing value (default = decreasing)")
    group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows")
    parser.add_option_group(group3)

    opts, args = parser.parse_args(args=argv)

    if not opts.show:
        # Keep this list in step with the "Command Options" group above.
        parser.error("You must specify one of these options: -g, -c, -e, -q, -u, -m, or --acl. For details, try $ qpid-stat --help")

    config._types = opts.show
    config._sortcol = opts.sort_by
    config._host = opts.broker
    config._connTimeout = opts.timeout
    config._increasing = opts.increasing
    config._limit = opts.limit

    if opts.sasl_mechanism:
        conn_options['sasl_mechanisms'] = opts.sasl_mechanism
    if opts.ssl_certificate:
        conn_options['ssl_certfile'] = opts.ssl_certificate
    if opts.ssl_key:
        # A private key on its own is rejected: it must accompany a certificate.
        if not opts.ssl_certificate:
            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
        conn_options['ssl_keyfile'] = opts.ssl_key
    if opts.ha_admin:
        conn_options['client_properties'] = {'qpid.ha-admin': 1}

    return args
| |
class IpAddr:
    """ An IPv4 address and port parsed from "[credentials@]host[:port]".

    Attributes set by the constructor:
      port       - integer port (5672 when the text names none)
      dottedQuad - the address string returned by socket.gethostbyname()
      addr       - the same address packed into a single integer
    """

    def __init__(self, text):
        """ Parse text and resolve its host part.

        Whatever socket.gethostbyname() raises for an unresolvable host is
        propagated, as is ValueError for a non-numeric port.
        """
        if text.find("@") != -1:
            # Split on the LAST '@' so that credentials which themselves
            # contain '@' do not get mistaken for the host.
            text = text.rsplit("@", 1)[1]
        if text.find(":") != -1:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = 5672
        self.dottedQuad = socket.gethostbyname(text)
        nums = self.dottedQuad.split(".")
        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])

    def bestAddr(self, addrPortList):
        """ Return the entry of addrPortList whose address is closest to ours.

        Each entry is a sequence whose first element is a host string.
        Closeness is the XOR of the two packed addresses: the smaller the
        value, the longer the shared leading-bit prefix.  On a tie the
        earliest entry wins.  Returns None only for an empty list.
        """
        # Start from "nothing chosen yet" rather than a numeric sentinel, so
        # that a candidate differing in every bit (XOR == 0xFFFFFFFF) is
        # still selected when it is the only one available.
        bestDiff = None
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            if bestDiff is None or diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr
| |
class BrokerManager:
    """ Connects to one broker and prints the report selected on the command line.

    Typical use: SetBroker(url), display(names), Disconnect().
    """

    def __init__(self):
        self.brokerName = None
        self.url = None
        self.connection = None
        self.broker = None
        self.cluster = None

    def SetBroker(self, brokerUrl):
        """ Connect to brokerUrl (using the module-level conn_options) and
        create the BrokerAgent used by all display methods.
        """
        self.url = brokerUrl
        self.connection = Connection.establish(self.url, **conn_options)
        self.broker = BrokerAgent(self.connection)

    def Disconnect(self):
        """ Release any allocated brokers.  Ignore any failures as the tool is
        shutting down.
        """
        if self.connection is None:
            # Never connected (or the connect attempt failed): nothing to close.
            return
        try:
            self.connection.close()
        except Exception:
            # Best effort only.  KeyboardInterrupt/SystemExit are deliberately
            # not caught here so the user can still abort a hung close.
            pass

    def _getCluster(self):
        """ Store the first 'cluster' object in self.cluster; always returns None.

        NOTE(review): this method reads self.qmf and self.brokerAgent, neither
        of which is assigned anywhere in this class, and nothing in this file
        calls it.  It looks like a leftover from an older implementation -
        confirm before relying on it.
        """
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            return None

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print("Clustering is installed but not enabled on the broker.")
            return None

        self.cluster = clusters[0]

    def _getHostList(self, urlList):
        """ Reduce each "amqp:<proto>:<host>:<port>,..." URL to one "host:port".

        Raises Exception("Invalid URL 1") when a URL does not start with
        "amqp:" and Exception("Invalid URL 2") when one of its comma-separated
        addresses does not consist of exactly three colon-separated fields.
        """
        hosts = []
        hostAddr = IpAddr(config._host)
        for url in urlList:
            if not url.startswith("amqp:"):
                raise Exception("Invalid URL 1")
            url = url[5:]
            addrList = []
            for addr in str(url).split(","):
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be in the same subnet as the address
            # with which we made the original QMF connection.  This increases the probability that we will
            # be able to reach the cluster member.

            # addrList is never empty here (split() yields at least one entry),
            # so falling back to its first element is always safe should
            # bestAddr() decline to pick one.
            best = hostAddr.bestAddr(addrList) or addrList[0]
            hosts.append(best[0] + ":" + best[1])
        return hosts

    def _showTable(self, disp, title, heads, rows):
        """ Print rows as a table; when a sort column was requested the rows
        are first passed through Sorter together with the configured limit
        and direction.
        """
        if config._sortcol:
            rows = Sorter(heads, rows, config._sortcol, config._limit, config._increasing).getSorted()
        disp.formattedTable(title, heads, rows)

    def displayBroker(self):
        """ Print the broker summary and, when the broker object carries a
        'queueCount' value, the aggregate broker statistics as well.
        """
        disp = Display(prefix=" ")
        heads = [Header('uptime', Header.DURATION),
                 Header('cluster', Header.NONE),
                 Header('connections', Header.COMMAS),
                 Header('sessions', Header.COMMAS),
                 Header('exchanges', Header.COMMAS),
                 Header('queues', Header.COMMAS)]
        broker = self.broker.getBroker()
        cluster = self.broker.getCluster()
        if cluster:
            clusterInfo = cluster.clusterName + "<" + cluster.status + ">"
        else:
            clusterInfo = "<standalone>"
        connections = self.getConnectionMap()
        sessions = self.getSessionMap()
        exchanges = self.getExchangeMap()
        queues = self.getQueueMap()
        rows = [(broker.getUpdateTime() - broker.getCreateTime(),
                 clusterInfo,
                 len(connections), len(sessions),
                 len(exchanges), len(queues))]
        disp.formattedTable('Broker Summary:', heads, rows)

        # Only print the aggregate table when the broker object carries a
        # 'queueCount' value.
        if 'queueCount' not in broker.values:
            return

        print("")
        heads = [Header('Statistic'),
                 Header('Messages', Header.COMMAS),
                 Header('Bytes', Header.COMMAS)]
        rows = [
            ['queue-depth', broker.msgDepth, broker.byteDepth],
            ['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues],
            ['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues],
            ['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues],
            ['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues],
            ['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues],
            ['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues],
            ['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth],
            ['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues],
            ['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues],
            ['acquires', broker.acquires, None],
            ['releases', broker.releases, None],
            ['discards-no-route', broker.discardsNoRoute, None],
            ['discards-ttl-expired', broker.discardsTtl, None],
            ['discards-limit-overflow', broker.discardsOverflow, None],
            ['discards-ring-overflow', broker.discardsRing, None],
            ['discards-lvq-replace', broker.discardsLvq, None],
            ['discards-subscriber-reject', broker.discardsSubscriber, None],
            ['discards-purged', broker.discardsPurge, None],
            ['reroutes', broker.reroutes, None],
            ['abandoned', broker.abandoned, None],
            ['abandoned-via-alt', broker.abandonedViaAlt, None],
        ]
        disp.formattedTable('Aggregate Broker Statistics:', heads, rows)

    def displayConn(self):
        """ Print one row per broker connection. """
        disp = Display(prefix=" ")
        heads = [Header('connection'),
                 Header('cproc'),
                 Header('cpid'),
                 Header('mech'),
                 Header('auth'),
                 Header('connected', Header.DURATION),
                 Header('idle', Header.DURATION),
                 Header('msgIn', Header.KMG),
                 Header('msgOut', Header.KMG)]
        rows = []
        connections = self.broker.getAllConnections()
        broker = self.broker.getBroker()
        for conn in connections:
            # 'connected' and 'idle' are measured against the broker
            # object's update time, not the local clock.
            rows.append([conn.address,
                         conn.remoteProcessName,
                         conn.remotePid,
                         conn.saslMechanism,
                         conn.authIdentity,
                         broker.getUpdateTime() - conn.getCreateTime(),
                         broker.getUpdateTime() - conn.getUpdateTime(),
                         conn.msgsFromClient,
                         conn.msgsToClient])
        self._showTable(disp, "Connections", heads, rows)

    def displaySession(self):
        """ Placeholder: creates a Display but prints nothing. """
        disp = Display(prefix=" ")

    def displayExchange(self):
        """ Print one row per exchange. """
        disp = Display(prefix=" ")
        heads = [Header("exchange"),
                 Header("type"),
                 Header("dur", Header.Y),
                 Header("bind", Header.KMG),
                 Header("msgIn", Header.KMG),
                 Header("msgOut", Header.KMG),
                 Header("msgDrop", Header.KMG),
                 Header("byteIn", Header.KMG),
                 Header("byteOut", Header.KMG),
                 Header("byteDrop", Header.KMG)]
        rows = []
        for ex in self.broker.getAllExchanges():
            rows.append([ex.name,
                         ex.type,
                         ex.durable,
                         ex.bindingCount,
                         ex.msgReceives,
                         ex.msgRoutes,
                         ex.msgDrops,
                         ex.byteReceives,
                         ex.byteRoutes,
                         ex.byteDrops])
        self._showTable(disp, "Exchanges", heads, rows)

    def displayQueues(self):
        """ Print one row per queue. """
        disp = Display(prefix=" ")
        heads = [Header("queue"),
                 Header("dur", Header.Y),
                 Header("autoDel", Header.Y),
                 Header("excl", Header.Y),
                 Header("msg", Header.KMG),
                 Header("msgIn", Header.KMG),
                 Header("msgOut", Header.KMG),
                 Header("bytes", Header.KMG),
                 Header("bytesIn", Header.KMG),
                 Header("bytesOut", Header.KMG),
                 Header("cons", Header.KMG),
                 Header("bind", Header.KMG)]
        rows = []
        for q in self.broker.getAllQueues():
            rows.append([q.name,
                         q.durable,
                         q.autoDelete,
                         q.exclusive,
                         q.msgDepth,
                         q.msgTotalEnqueues,
                         q.msgTotalDequeues,
                         q.byteDepth,
                         q.byteTotalEnqueues,
                         q.byteTotalDequeues,
                         q.consumerCount,
                         q.bindingCount])
        self._showTable(disp, "Queues", heads, rows)

    def displayQueue(self, name):
        """ Print the properties, optional properties and statistics of the
        queue called name, or a "not found" message when the broker returns
        no such queue.
        """
        queue = self.broker.getQueue(name)
        if not queue:
            print("Queue '%s' not found" % name)
            return

        disp = Display(prefix=" ")
        heads = [Header('Name'),
                 Header('Durable', Header.YN),
                 Header('AutoDelete', Header.YN),
                 Header('Exclusive', Header.YN),
                 Header('FlowStopped', Header.YN),
                 Header('FlowStoppedCount', Header.COMMAS),
                 Header('Consumers', Header.COMMAS),
                 Header('Bindings', Header.COMMAS)]
        rows = [[queue.name, queue.durable, queue.autoDelete, queue.exclusive,
                 queue.flowStopped, queue.flowStoppedCount,
                 queue.consumerCount, queue.bindingCount]]
        disp.formattedTable("Properties:", heads, rows)
        print("")

        heads = [Header('Property'), Header('Value')]
        rows = [['arguments', queue.arguments],
                ['alt-exchange', queue.altExchange]]
        disp.formattedTable("Optional Properties:", heads, rows)
        print("")

        heads = [Header('Statistic'),
                 Header('Messages', Header.COMMAS),
                 Header('Bytes', Header.COMMAS)]
        rows = [
            ['queue-depth', queue.msgDepth, queue.byteDepth],
            ['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues],
            ['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues],
            ['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues],
            ['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues],
            ['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues],
            ['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues],
            ['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth],
            ['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues],
            ['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues],
            ['acquires', queue.acquires, None],
            ['releases', queue.releases, None],
            ['discards-ttl-expired', queue.discardsTtl, None],
            ['discards-limit-overflow', queue.discardsOverflow, None],
            ['discards-ring-overflow', queue.discardsRing, None],
            ['discards-lvq-replace', queue.discardsLvq, None],
            ['discards-subscriber-reject', queue.discardsSubscriber, None],
            ['discards-purged', queue.discardsPurge, None],
            ['reroutes', queue.reroutes, None],
        ]
        disp.formattedTable("Statistics:", heads, rows)

    def displaySubscriptions(self):
        """ Print one row per subscription, joined with its session and
        connection.  Subscriptions whose session or connection cannot be
        looked up are left out of the table.
        """
        disp = Display(prefix=" ")
        heads = [Header("subscr"),
                 Header("queue"),
                 Header("conn"),
                 Header("procName"),
                 Header("procId"),
                 Header("browse", Header.Y),
                 Header("acked", Header.Y),
                 Header("excl", Header.Y),
                 Header("creditMode"),
                 Header("delivered", Header.COMMAS),
                 Header("sessUnacked", Header.COMMAS)]
        rows = []
        subscriptions = self.broker.getAllSubscriptions()
        sessions = self.getSessionMap()
        connections = self.getConnectionMap()
        for s in subscriptions:
            try:
                session = sessions[s.sessionRef]
                connection = connections[session.connectionRef]
                row = [s.name,
                       s.queueRef,
                       connection.address,
                       connection.remoteProcessName,
                       connection.remotePid,
                       s.browsing,
                       s.acknowledged,
                       s.exclusive,
                       s.creditMode,
                       s.delivered,
                       session.unackedMessages]
            except Exception:
                # The three lists are fetched separately, so a subscription
                # may refer to a session/connection that is not in the maps.
                # Skip such rows instead of aborting the report.
                continue
            rows.append(row)
        self._showTable(disp, "Subscriptions", heads, rows)

    def displayMemory(self):
        """ Print every broker memory statistic except the 'name' entry. """
        disp = Display(prefix=" ")
        heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
        memory = self.broker.getMemory()
        rows = [[k, v] for k, v in memory.values.items() if k != 'name']
        disp.formattedTable('Broker Memory Statistics:', heads, rows)

    def displayAcl(self):
        """ Print the ACL statistics, or a notice when the broker returns
        no ACL object.
        """
        acl = self.broker.getAcl()
        if not acl:
            print("ACL Policy Module is not installed")
            return
        disp = Display(prefix=" ")
        heads = [Header('Statistic'), Header('Value')]
        # Values are pre-formatted here because the 'Value' column has no
        # single format that fits all of them.
        rows = [['policy-file', acl.policyFile],
                ['enforcing', YN(acl.enforcingAcl)],
                ['has-transfer-acls', YN(acl.transferAcl)],
                ['last-acl-load', TimeLong(acl.lastAclLoad)],
                ['acl-denials', Commas(acl.aclDenyCount)]]
        disp.formattedTable('ACL Policy Statistics:', heads, rows)

    def getExchangeMap(self):
        """ Return a dict of all exchanges keyed by exchange name. """
        return dict((e.name, e) for e in self.broker.getAllExchanges())

    def getQueueMap(self):
        """ Return a dict of all queues keyed by queue name. """
        return dict((q.name, q) for q in self.broker.getAllQueues())

    def getSessionMap(self):
        """ Return a dict of all sessions keyed by session name. """
        return dict((s.name, s) for s in self.broker.getAllSessions())

    def getConnectionMap(self):
        """ Return a dict of all connections keyed by connection address. """
        return dict((c.address, c) for c in self.broker.getAllConnections())

    def displayMain(self, names, main):
        """ Run the report identified by the code main.

        names is the list of positional command-line arguments; only the
        queue report uses it (first element = queue to show in detail).
        An unrecognised code prints nothing.
        """
        if main == 'g':
            self.displayBroker()
        elif main == 'c':
            self.displayConn()
        elif main == 's':
            self.displaySession()
        elif main == 'e':
            self.displayExchange()
        elif main == 'q':
            if len(names) >= 1:
                self.displayQueue(names[0])
            else:
                self.displayQueues()
        elif main == 'u':
            self.displaySubscriptions()
        elif main == 'm':
            self.displayMemory()
        elif main == 'acl':
            self.displayAcl()

    def display(self, names):
        """ Run the report selected in the module-level config. """
        self.displayMain(names, config._types)
| |
| |
def main(argv=None):
    """ Parse the command line, connect to the broker and print the report.

    argv is handed to OptionsAndArguments(); None means "use the process
    command line".  Returns 0 on success and 1 when the report failed or was
    interrupted with Ctrl-C.  Failures are reported on standard output.
    """
    args = OptionsAndArguments(argv)
    bm = BrokerManager()

    try:
        bm.SetBroker(config._host)
        bm.display(args)
    except KeyboardInterrupt:
        # Finish the line the terminal echoed "^C" on.
        print("")
        return 1
    except Exception as e:
        print("Failed: %s - %s" % (e.__class__.__name__, e))
        return 1
    finally:
        # Runs on every path, so the connection is released exactly once
        # whether the report succeeded, failed or was interrupted.
        bm.Disconnect()
    return 0
| |
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    exit_status = main()
    sys.exit(exit_status)