| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import os |
| import optparse |
| import sys |
| import re |
| import socket |
| import qpid |
| from threading import Condition |
| from qmf.console import Session, Console |
| from qpid.peer import Closed |
| from qpid.connection import Connection, ConnectionFailed |
| from time import sleep |
| |
| class BrokerManager(Console): |
| def __init__(self, host): |
| self.url = host |
| self.objects = {} |
| self.filter = None |
| self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, |
| userBindings=True, manageConnections=True) |
| self.broker = self.session.addBroker(self.url) |
| self.firstError = True |
| |
| def setFilter(self,filter): |
| self.filter = filter |
| |
| def brokerConnected(self, broker): |
| if not self.firstError: |
| print "*** Broker connected" |
| self.firstError = False |
| |
| def brokerDisconnected(self, broker): |
| print "*** Broker connection lost - %s, retrying..." % broker.getError() |
| self.firstError = False |
| self.objects.clear() |
| |
| def objectProps(self, broker, record): |
| className = record.getClassKey().getClassName() |
| if className != "queue": |
| return |
| |
| id = record.getObjectId().__repr__() |
| if id not in self.objects: |
| self.objects[id] = (record.name, None, None) |
| |
| def objectStats(self, broker, record): |
| className = record.getClassKey().getClassName() |
| if className != "queue": |
| return |
| |
| id = record.getObjectId().__repr__() |
| if id not in self.objects: |
| return |
| |
| (name, first, last) = self.objects[id] |
| if first == None: |
| self.objects[id] = (name, record, None) |
| return |
| |
| if len(self.filter) > 0 : |
| match = False |
| |
| for x in self.filter: |
| if x.match(name): |
| match = True |
| break |
| if match == False: |
| return |
| |
| if last == None: |
| lastSample = first |
| else: |
| lastSample = last |
| |
| self.objects[id] = (name, first, record) |
| |
| deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0]) |
| if deltaTime < 1000000000.0: |
| return |
| enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \ |
| (deltaTime / 1000000000.0) |
| dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \ |
| (deltaTime / 1000000000.0) |
| print "%-41s%10.2f%11d%13.2f%13.2f" % \ |
| (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate) |
| |
| |
| def Display (self): |
| self.session.bindClass("org.apache.qpid.broker", "queue") |
| print "Queue Name Sec Depth Enq Rate Deq Rate" |
| print "========================================================================================" |
| try: |
| while True: |
| sleep (1) |
| if self.firstError and self.broker.getError(): |
| self.firstError = False |
| print "*** Error: %s, retrying..." % self.broker.getError() |
| except KeyboardInterrupt: |
| print |
| self.session.delBroker(self.broker) |
| |
| ## |
| ## Main Program |
| ## |
| def main(): |
| p = optparse.OptionParser() |
| p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost') |
| p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show') |
| |
| options, arguments = p.parse_args() |
| |
| host = options.broker_address |
| filter = [] |
| if options.filter != None: |
| for s in options.filter.split(","): |
| filter.append(re.compile(s)) |
| |
| bm = BrokerManager(host) |
| bm.setFilter(filter) |
| bm.Display() |
| |
| if __name__ == '__main__': |
| main() |
| |