blob: 00227a98b9e16b5b41bb8f84e34a22190238a0f3 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
from optparse import OptionParser, OptionGroup
import sys
import locale
import socket
import re
from qpid.messaging import Connection
# Root directory of the qpid-tools installation: the QPID_TOOLS_HOME
# environment variable when it is set, otherwise /usr/share/qpid-tools.
home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
# Make <home>/python importable. This runs before the qpidtoollibs imports
# that follow, presumably so that they can be resolved from that directory.
sys.path.append(os.path.join(home, "python"))
from qpidtoollibs import BrokerAgent
from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong
class Config(object):
    """Run-time settings of the tool, filled in from the command line.

    Attributes:
      _host        -- broker URL to connect to
      _connTimeout -- connection timeout in seconds (stored from the -t
                      option; not read anywhere else in this file)
      _types       -- code of the report to show ("g", "c", "e", "q", "u",
                      "m" or "acl"); empty until the options are parsed
      _limit       -- maximum number of rows handed to the sorter
      _increasing  -- True to sort in increasing order
      _sortcol     -- name of the column to sort by, or None for no sorting
    """

    def __init__(self):
        # Defaults mirror the defaults declared on the option parser.
        self._host = "localhost"
        self._connTimeout = 10
        self._types = ""
        self._limit = 50
        self._increasing = False
        self._sortcol = None
# Process-wide settings object; OptionsAndArguments() overwrites its
# attributes with the values parsed from the command line.
config = Config()
# Extra keyword arguments for the broker connection (SASL mechanism, SSL
# certificate/key, client properties). Filled in by OptionsAndArguments() and
# expanded into Connection.establish() by BrokerManager.SetBroker().
conn_options = {}
def OptionsAndArguments(argv):
    """Parse the command line, record the result in the module globals and
    return the positional arguments.

    argv -- list of argument strings passed to OptionParser.parse_args();
            None makes optparse fall back to sys.argv[1:].

    Side effects: sets the attributes of the global ``config`` object and adds
    connection keyword arguments to the global ``conn_options`` dict.

    Returns the list of positional arguments that remain after option parsing.

    Leaves through parser.error() (which prints the usage and raises
    SystemExit) when no command option was given, or when --ssl-key is used
    without --ssl-certificate.
    """
    global config
    global conn_options

    # optparse puts "Usage: " (7 characters) in front of the first usage line,
    # so the continuation lines are indented by 7 spaces to line up under it.
    usage = "\n       ".join([
        "%prog -g [options]",
        "%prog -c [options]",
        "%prog -e [options]",
        "%prog -q [options] [queue-name]",
        "%prog -u [options]",
        "%prog -m [options]",
        "%prog --acl [options]",
    ])
    parser = OptionParser(usage=usage)

    general = OptionGroup(parser, "General Options")
    general.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>",
                       help="URL of the broker to query")
    # NOTE(review): the timeout is stored in config._connTimeout below, but
    # nothing in this file passes it on to the connection -- confirm intent.
    general.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>",
                       help="Maximum time to wait for broker connection (in seconds)")
    general.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
                       help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
    general.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>",
                       help="Client SSL certificate (PEM Format)")
    general.add_option("--ssl-key", action="store", type="string", metavar="<key>",
                       help="Client SSL private key (PEM Format)")
    general.add_option("--ha-admin", action="store_true",
                       help="Allow connection to a HA backup broker.")
    parser.add_option_group(general)

    # Every command option stores its code into the same destination ("show"),
    # so the last one given on the command line wins.
    commands = OptionGroup(parser, "Command Options")
    command_table = (
        (("-g", "--general"), "Show General Broker Stats", "g"),
        (("-c", "--connections"), "Show Connections", "c"),
        (("-e", "--exchanges"), "Show Exchanges", "e"),
        (("-q", "--queues"), "Show Queues", "q"),
        (("-u", "--subscriptions"), "Show Subscriptions", "u"),
        (("-m", "--memory"), "Show Broker Memory Stats", "m"),
        (("--acl",), "Show Access Control List Stats", "acl"),
    )
    for flags, text, code in command_table:
        commands.add_option(*flags, help=text, action="store_const", const=code, dest="show")
    parser.add_option_group(commands)

    display = OptionGroup(parser, "Display Options")
    display.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name")
    display.add_option("-I", "--increasing", action="store_true", default=False,
                       help="Sort by increasing value (default = decreasing)")
    display.add_option("-L", "--limit", type="int", default=50, metavar="<n>",
                       help="Limit output to n rows")
    parser.add_option_group(display)

    opts, args = parser.parse_args(args=argv)

    if not opts.show:
        parser.error("You must specify one of these options: -g, -c, -e, -q, -u, -m, or --acl. For details, try $ qpid-stat --help")

    config._types = opts.show
    config._sortcol = opts.sort_by
    config._host = opts.broker
    config._connTimeout = opts.timeout
    config._increasing = opts.increasing
    config._limit = opts.limit

    # Only options that were actually given end up in conn_options.
    if opts.sasl_mechanism:
        conn_options['sasl_mechanisms'] = opts.sasl_mechanism
    if opts.ssl_certificate:
        conn_options['ssl_certfile'] = opts.ssl_certificate
    if opts.ssl_key:
        # A private key without the matching certificate is rejected.
        if not opts.ssl_certificate:
            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
        conn_options['ssl_keyfile'] = opts.ssl_key
    if opts.ha_admin:
        conn_options['client_properties'] = {'qpid.ha-admin' : 1}

    return args
class IpAddr(object):
    """IPv4 address and port parsed from a "[credentials@]host[:port]" string.

    Attributes:
      port       -- port number as an int (DEFAULT_PORT when none is given)
      dottedQuad -- the host as returned by socket.gethostbyname()
      addr       -- the four octets of dottedQuad packed into one integer,
                    first octet in the most significant byte
    """

    # Port assumed when the text carries no ":port" suffix.
    DEFAULT_PORT = 5672

    def __init__(self, text):
        """Parse *text* and resolve its host part.

        Raises ValueError when the port is not an integer, and whatever
        socket.gethostbyname() raises when the host cannot be resolved.
        """
        # Drop the credentials. The host is whatever follows the LAST "@", so
        # an "@" inside the user/password part does not truncate the host.
        if "@" in text:
            text = text.rsplit("@", 1)[1]

        if ":" in text:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = self.DEFAULT_PORT

        self.dottedQuad = socket.gethostbyname(text)

        # Fold the octets into a single integer, most significant first.
        self.addr = 0
        for octet in self.dottedQuad.split("."):
            self.addr = (self.addr << 8) + int(octet)

    def bestAddr(self, addrPortList):
        """Return the entry of *addrPortList* whose address is closest to ours.

        addrPortList -- sequence of entries whose first element is a host
                        string accepted by IpAddr().

        Closeness is the XOR of the two packed addresses: the more leading
        bits two addresses share, the smaller that value. The first entry
        wins on ties. Returns None only when the list is empty.
        """
        bestDiff = None
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            # Starting from None (instead of a 0xFFFFFFFF sentinel) lets even
            # a candidate that differs in every bit be selected.
            if bestDiff is None or diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr
class BrokerManager(object):
    """Connects to one broker and prints the report selected on the command line.

    Typical use: SetBroker(url), then display(names), then Disconnect().
    The display* methods query the broker through self.broker (a BrokerAgent)
    and print tables through a Display object.
    """

    def __init__(self):
        self.brokerName = None
        self.url = None          # set by SetBroker()
        self.connection = None   # set by SetBroker()
        self.broker = None       # BrokerAgent, set by SetBroker()
        self.cluster = None

    def SetBroker(self, brokerUrl):
        """Open a connection to *brokerUrl* and create the BrokerAgent on it.

        The global conn_options dict is expanded into keyword arguments of
        Connection.establish().
        """
        # NOTE(review): config._connTimeout is not passed on here, so the -t
        # option currently has no visible effect -- confirm whether
        # Connection.establish() should receive it.
        self.url = brokerUrl
        self.connection = Connection.establish(self.url, **conn_options)
        self.broker = BrokerAgent(self.connection)

    def Disconnect(self):
        """ Release any allocated brokers.  Ignore any failures as the tool is
        shutting down.
        """
        try:
            if self.connection is not None:
                self.connection.close()
        except Exception:
            # Best effort only; KeyboardInterrupt/SystemExit still propagate.
            pass

    def _getCluster(self):
        """Look up the cluster object and store it in self.cluster.

        Returns None on every path.
        """
        # NOTE(review): self.qmf and self.brokerAgent are not assigned anywhere
        # in this class, and nothing in this file calls this method; calling it
        # as-is would raise AttributeError. Logic left unchanged.
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            return None

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print("Clustering is installed but not enabled on the broker.")
            return None

        self.cluster = clusters[0]

    def _getHostList(self, urlList):
        """Pick one "host:port" string for every URL in *urlList*.

        Each URL must start with "amqp:" followed by a comma-separated list of
        three-part, colon-separated addresses; the last two parts are used as
        host and port. Raises Exception("Invalid URL 1") or
        Exception("Invalid URL 2") when a URL does not have that form.
        """
        hosts = []
        hostAddr = IpAddr(config._host)
        for url in urlList:
            if url.find("amqp:") != 0:
                raise Exception("Invalid URL 1")
            url = url[5:]
            addrList = []
            for addr in str(url).split(","):
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be in the same subnet as the address
            # with which we made the original QMF connection.  This increases the probability that we will
            # be able to reach the cluster member.
            best = hostAddr.bestAddr(addrList)
            hosts.append(best[0] + ":" + best[1])
        return hosts

    def _displayTable(self, disp, title, heads, rows):
        """Print *rows* as a table, sorted first when a sort column was requested.

        The row limit (config._limit) is only handed to the Sorter, so it takes
        effect only together with a sort column; without one all rows are
        printed in the order given.
        """
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            rows = sorter.getSorted()
        disp.formattedTable(title, heads, rows)

    def displayBroker(self):
        """Print the broker summary and, when available, the aggregate statistics."""
        disp = Display(prefix=" ")
        heads = [
            Header('uptime', Header.DURATION),
            Header('cluster', Header.NONE),
            Header('connections', Header.COMMAS),
            Header('sessions', Header.COMMAS),
            Header('exchanges', Header.COMMAS),
            Header('queues', Header.COMMAS),
        ]

        broker = self.broker.getBroker()
        cluster = self.broker.getCluster()
        if cluster:
            clusterInfo = cluster.clusterName + "<" + cluster.status + ">"
        else:
            clusterInfo = "<standalone>"
        connections = self.getConnectionMap()
        sessions = self.getSessionMap()
        exchanges = self.getExchangeMap()
        queues = self.getQueueMap()

        rows = [(broker.getUpdateTime() - broker.getCreateTime(),
                 clusterInfo,
                 len(connections), len(sessions),
                 len(exchanges), len(queues))]
        disp.formattedTable('Broker Summary:', heads, rows)

        # The aggregate table is skipped when the broker object does not
        # report a 'queueCount' value.
        if 'queueCount' not in broker.values:
            return

        print("")
        heads = [
            Header('Statistic'),
            Header('Messages', Header.COMMAS),
            Header('Bytes', Header.COMMAS),
        ]
        rows = [
            ['queue-depth', broker.msgDepth, broker.byteDepth],
            ['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues],
            ['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues],
            ['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues],
            ['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues],
            ['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues],
            ['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues],
            ['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth],
            ['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues],
            ['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues],
            ['acquires', broker.acquires, None],
            ['releases', broker.releases, None],
            ['discards-no-route', broker.discardsNoRoute, None],
            ['discards-ttl-expired', broker.discardsTtl, None],
            ['discards-limit-overflow', broker.discardsOverflow, None],
            ['discards-ring-overflow', broker.discardsRing, None],
            ['discards-lvq-replace', broker.discardsLvq, None],
            ['discards-subscriber-reject', broker.discardsSubscriber, None],
            ['discards-purged', broker.discardsPurge, None],
            ['reroutes', broker.reroutes, None],
            ['abandoned', broker.abandoned, None],
            ['abandoned-via-alt', broker.abandonedViaAlt, None],
        ]
        disp.formattedTable('Aggregate Broker Statistics:', heads, rows)

    def displayConn(self):
        """Print one row per connection known to the broker."""
        disp = Display(prefix=" ")
        heads = [
            Header('connection'),
            Header('cproc'),
            Header('cpid'),
            Header('mech'),
            Header('auth'),
            Header('connected', Header.DURATION),
            Header('idle', Header.DURATION),
            Header('msgIn', Header.KMG),
            Header('msgOut', Header.KMG),
        ]
        rows = []
        connections = self.broker.getAllConnections()
        broker = self.broker.getBroker()
        for conn in connections:
            rows.append([
                conn.address,
                conn.remoteProcessName,
                conn.remotePid,
                conn.saslMechanism,
                conn.authIdentity,
                # Both durations are measured against the broker's update time.
                broker.getUpdateTime() - conn.getCreateTime(),
                broker.getUpdateTime() - conn.getUpdateTime(),
                conn.msgsFromClient,
                conn.msgsToClient,
            ])
        self._displayTable(disp, "Connections", heads, rows)

    def displaySession(self):
        """Placeholder: creates a Display but prints nothing."""
        disp = Display(prefix=" ")

    def displayExchange(self):
        """Print one row per exchange known to the broker."""
        disp = Display(prefix=" ")
        heads = [
            Header("exchange"),
            Header("type"),
            Header("dur", Header.Y),
            Header("bind", Header.KMG),
            Header("msgIn", Header.KMG),
            Header("msgOut", Header.KMG),
            Header("msgDrop", Header.KMG),
            Header("byteIn", Header.KMG),
            Header("byteOut", Header.KMG),
            Header("byteDrop", Header.KMG),
        ]
        rows = []
        for ex in self.broker.getAllExchanges():
            rows.append([
                ex.name,
                ex.type,
                ex.durable,
                ex.bindingCount,
                ex.msgReceives,
                ex.msgRoutes,
                ex.msgDrops,
                ex.byteReceives,
                ex.byteRoutes,
                ex.byteDrops,
            ])
        self._displayTable(disp, "Exchanges", heads, rows)

    def displayQueues(self):
        """Print one row per queue known to the broker."""
        disp = Display(prefix=" ")
        heads = [
            Header("queue"),
            Header("dur", Header.Y),
            Header("autoDel", Header.Y),
            Header("excl", Header.Y),
            Header("msg", Header.KMG),
            Header("msgIn", Header.KMG),
            Header("msgOut", Header.KMG),
            Header("bytes", Header.KMG),
            Header("bytesIn", Header.KMG),
            Header("bytesOut", Header.KMG),
            Header("cons", Header.KMG),
            Header("bind", Header.KMG),
        ]
        rows = []
        for q in self.broker.getAllQueues():
            rows.append([
                q.name,
                q.durable,
                q.autoDelete,
                q.exclusive,
                q.msgDepth,
                q.msgTotalEnqueues,
                q.msgTotalDequeues,
                q.byteDepth,
                q.byteTotalEnqueues,
                q.byteTotalDequeues,
                q.consumerCount,
                q.bindingCount,
            ])
        self._displayTable(disp, "Queues", heads, rows)

    def displayQueue(self, name):
        """Print properties, optional properties and statistics of queue *name*.

        Prints a "not found" message instead when the broker returns no such
        queue.
        """
        queue = self.broker.getQueue(name)
        if not queue:
            print("Queue '%s' not found" % name)
            return

        disp = Display(prefix=" ")
        heads = [
            Header('Name'),
            Header('Durable', Header.YN),
            Header('AutoDelete', Header.YN),
            Header('Exclusive', Header.YN),
            Header('FlowStopped', Header.YN),
            Header('FlowStoppedCount', Header.COMMAS),
            Header('Consumers', Header.COMMAS),
            Header('Bindings', Header.COMMAS),
        ]
        rows = [[queue.name, queue.durable, queue.autoDelete, queue.exclusive,
                 queue.flowStopped, queue.flowStoppedCount,
                 queue.consumerCount, queue.bindingCount]]
        disp.formattedTable("Properties:", heads, rows)
        print("")

        heads = [Header('Property'), Header('Value')]
        rows = [
            ['arguments', queue.arguments],
            ['alt-exchange', queue.altExchange],
        ]
        disp.formattedTable("Optional Properties:", heads, rows)
        print("")

        heads = [
            Header('Statistic'),
            Header('Messages', Header.COMMAS),
            Header('Bytes', Header.COMMAS),
        ]
        rows = [
            ['queue-depth', queue.msgDepth, queue.byteDepth],
            ['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues],
            ['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues],
            ['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues],
            ['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues],
            ['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues],
            ['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues],
            ['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth],
            ['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues],
            ['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues],
            ['acquires', queue.acquires, None],
            ['releases', queue.releases, None],
            ['discards-ttl-expired', queue.discardsTtl, None],
            ['discards-limit-overflow', queue.discardsOverflow, None],
            ['discards-ring-overflow', queue.discardsRing, None],
            ['discards-lvq-replace', queue.discardsLvq, None],
            ['discards-subscriber-reject', queue.discardsSubscriber, None],
            ['discards-purged', queue.discardsPurge, None],
            ['reroutes', queue.reroutes, None],
        ]
        disp.formattedTable("Statistics:", heads, rows)

    def displaySubscriptions(self):
        """Print one row per subscription, joined with its session and connection.

        A subscription whose session or connection cannot be looked up (or
        whose row cannot be built for any other reason) is left out of the
        table.
        """
        disp = Display(prefix=" ")
        heads = [
            Header("subscr"),
            Header("queue"),
            Header("conn"),
            Header("procName"),
            Header("procId"),
            Header("browse", Header.Y),
            Header("acked", Header.Y),
            Header("excl", Header.Y),
            Header("creditMode"),
            Header("delivered", Header.COMMAS),
            Header("sessUnacked", Header.COMMAS),
        ]
        rows = []
        subscriptions = self.broker.getAllSubscriptions()
        sessions = self.getSessionMap()
        connections = self.getConnectionMap()
        for s in subscriptions:
            try:
                session = sessions[s.sessionRef]
                connection = connections[session.connectionRef]
                row = [
                    s.name,
                    s.queueRef,
                    connection.address,
                    connection.remoteProcessName,
                    connection.remotePid,
                    s.browsing,
                    s.acknowledged,
                    s.exclusive,
                    s.creditMode,
                    s.delivered,
                    session.unackedMessages,
                ]
            except Exception:
                # Skip rows that cannot be completed, but let
                # KeyboardInterrupt/SystemExit through.
                continue
            rows.append(row)
        self._displayTable(disp, "Subscriptions", heads, rows)

    def displayMemory(self):
        """Print every value of the broker's memory object except 'name'."""
        disp = Display(prefix=" ")
        heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
        memory = self.broker.getMemory()
        rows = [[k, v] for k, v in memory.values.items() if k != 'name']
        disp.formattedTable('Broker Memory Statistics:', heads, rows)

    def displayAcl(self):
        """Print the ACL statistics, or a notice when the broker has no ACL object."""
        acl = self.broker.getAcl()
        if not acl:
            print("ACL Policy Module is not installed")
            return
        disp = Display(prefix=" ")
        heads = [Header('Statistic'), Header('Value')]
        rows = [
            ['policy-file', acl.policyFile],
            ['enforcing', YN(acl.enforcingAcl)],
            ['has-transfer-acls', YN(acl.transferAcl)],
            ['last-acl-load', TimeLong(acl.lastAclLoad)],
            ['acl-denials', Commas(acl.aclDenyCount)],
        ]
        disp.formattedTable('ACL Policy Statistics:', heads, rows)

    def getExchangeMap(self):
        """Return a dict mapping exchange name to exchange object."""
        return dict((e.name, e) for e in self.broker.getAllExchanges())

    def getQueueMap(self):
        """Return a dict mapping queue name to queue object."""
        return dict((q.name, q) for q in self.broker.getAllQueues())

    def getSessionMap(self):
        """Return a dict mapping session name to session object."""
        return dict((s.name, s) for s in self.broker.getAllSessions())

    def getConnectionMap(self):
        """Return a dict mapping connection address to connection object."""
        return dict((c.address, c) for c in self.broker.getAllConnections())

    def displayMain(self, names, main):
        """Run the report selected by the code *main*.

        names -- positional command-line arguments; for 'q' a first element
                 selects the single-queue report instead of the queue list.
        An unknown code prints nothing.
        """
        if main == 'q':
            if len(names) >= 1:
                self.displayQueue(names[0])
            else:
                self.displayQueues()
            return

        handlers = {
            'g': self.displayBroker,
            'c': self.displayConn,
            's': self.displaySession,
            'e': self.displayExchange,
            'u': self.displaySubscriptions,
            'm': self.displayMemory,
            'acl': self.displayAcl,
        }
        handler = handlers.get(main)
        if handler is not None:
            handler()

    def display(self, names):
        """Run the report selected on the command line (config._types)."""
        self.displayMain(names, config._types)
def main(argv=None):
    """Entry point: parse the options, connect, print the report, disconnect.

    argv -- list of command-line arguments; None makes the option parser use
            sys.argv[1:].

    Returns 0 when the report was printed, 1 when the run was interrupted with
    Ctrl-C or failed with an exception (the failure is printed, not raised).
    """
    args = OptionsAndArguments(argv)
    bm = BrokerManager()

    status = 1
    try:
        bm.SetBroker(config._host)
        bm.display(args)
        status = 0
    except KeyboardInterrupt:
        # Finish the line that the interrupt left on the terminal.
        print("")
    except Exception as e:
        print("Failed: %s - %s" % (e.__class__.__name__, e))
    finally:
        # try to deallocate brokers, whichever way the block above was left
        bm.Disconnect()
    return status
# Run the tool only when this file is executed as a script; the value returned
# by main() becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)