#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
from optparse import OptionParser, OptionGroup
import sys
import locale
import socket
import re
from qpid.messaging import Connection

home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
sys.path.append(os.path.join(home, "python"))

from qpidtoollibs import BrokerAgent
from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong


class Config:
    def __init__(self):
        self._host = "localhost"
        self._connTimeout = 10
        self._types = ""
        self._limit = 50
        self._increasing = False
        self._sortcol = None

config = Config()
conn_options = {}

def OptionsAndArguments(argv):
    """ Set global variables for options, return arguments """

    global config
    global conn_options

    usage = \
"""%prog -g [options]
       %prog -c [options]
       %prog -e [options]
       %prog -q [options] [queue-name]
       %prog -u [options]
       %prog -m [options]
       %prog --acl [options]"""

    parser = OptionParser(usage=usage)

    group1 = OptionGroup(parser, "General Options")
    group1.add_option("-b", "--broker",  action="store", type="string", default="localhost", metavar="<url>",
                      help="URL of the broker to query")
    group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>",
                      help="Maximum time to wait for broker connection (in seconds)")
    group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
    group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
    group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
    group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
    parser.add_option_group(group1)

    group2 = OptionGroup(parser, "Command Options")
    group2.add_option("-g", "--general", help="Show General Broker Stats",  action="store_const", const="g",   dest="show")
    group2.add_option("-c", "--connections", help="Show Connections",       action="store_const", const="c",   dest="show")
    group2.add_option("-e", "--exchanges", help="Show Exchanges",           action="store_const", const="e",   dest="show")
    group2.add_option("-q", "--queues", help="Show Queues",                 action="store_const", const="q",   dest="show")
    group2.add_option("-u", "--subscriptions", help="Show Subscriptions",   action="store_const", const="u",   dest="show")
    group2.add_option("-m", "--memory", help="Show Broker Memory Stats",    action="store_const", const="m",   dest="show")
    group2.add_option(      "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show")
    parser.add_option_group(group2)

    group3 = OptionGroup(parser, "Display Options")
    group3.add_option("-S", "--sort-by",  metavar="<colname>",                   help="Sort by column name")
    group3.add_option("-I", "--increasing", action="store_true", default=False,  help="Sort by increasing value (default = decreasing)")
    group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>",    help="Limit output to n rows")
    parser.add_option_group(group3)

    opts, args = parser.parse_args(args=argv)

    if not opts.show:
        parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help")

    config._types = opts.show
    config._sortcol = opts.sort_by
    config._host = opts.broker
    config._connTimeout = opts.timeout
    config._increasing = opts.increasing
    config._limit = opts.limit

    if opts.sasl_mechanism:
        conn_options['sasl_mechanisms'] = opts.sasl_mechanism
    if opts.ssl_certificate:
        conn_options['ssl_certfile'] = opts.ssl_certificate
    if opts.ssl_key:
        if not opts.ssl_certificate:
            parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
        conn_options['ssl_keyfile'] = opts.ssl_key
    if opts.ha_admin:
        conn_options['client_properties'] = {'qpid.ha-admin' : 1}

    return args

class IpAddr:
    def __init__(self, text):
        if text.find("@") != -1:
            tokens = text.split("@")
            text = tokens[1]
        if text.find(":") != -1:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = 5672
        self.dottedQuad = socket.gethostbyname(text)
        nums = self.dottedQuad.split(".")
        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])

    def bestAddr(self, addrPortList):
        bestDiff = 0xFFFFFFFFL
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            if diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr

class BrokerManager:
    def __init__(self):
        self.brokerName = None
        self.connection = None
        self.broker     = None
        self.cluster    = None

    def SetBroker(self, brokerUrl):
        self.url = brokerUrl
        self.connection = Connection.establish(self.url, **conn_options)
        self.broker = BrokerAgent(self.connection)

    def Disconnect(self):
        """ Release any allocated brokers.  Ignore any failures as the tool is
        shutting down.
        """
        try:
            self.connection.close()
        except:
            pass

    def _getCluster(self):
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            return None

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print "Clustering is installed but not enabled on the broker."
            return None

        self.cluster = clusters[0]

    def _getHostList(self, urlList):
        hosts = []
        hostAddr = IpAddr(config._host)
        for url in urlList:
            if url.find("amqp:") != 0:
                raise Exception("Invalid URL 1")
            url = url[5:]
            addrs = str(url).split(",")
            addrList = []
            for addr in addrs:
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be in the same subnet as the address
            # with which we made the original QMF connection.  This increases the probability that we will
            # be able to reach the cluster member.

            best = hostAddr.bestAddr(addrList)
            bestUrl = best[0] + ":" + best[1]
            hosts.append(bestUrl)
        return hosts

    def displayBroker(self):
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header('uptime', Header.DURATION))
        heads.append(Header('cluster', Header.NONE))
        heads.append(Header('connections', Header.COMMAS))
        heads.append(Header('sessions', Header.COMMAS))
        heads.append(Header('exchanges', Header.COMMAS))
        heads.append(Header('queues', Header.COMMAS))
        rows = []
        broker = self.broker.getBroker()
        cluster = self.broker.getCluster()
        clusterInfo = cluster and cluster.clusterName + "<" + cluster.status + ">" or "<standalone>"
        connections = self.getConnectionMap()
        sessions = self.getSessionMap()
        exchanges = self.getExchangeMap()
        queues = self.getQueueMap()
        row = (broker.getUpdateTime() - broker.getCreateTime(),
               clusterInfo,
               len(connections), len(sessions),
               len(exchanges), len(queues))
        rows.append(row)
        disp.formattedTable('Broker Summary:', heads, rows)

        if 'queueCount' not in broker.values:
            return

        print
        heads = []
        heads.append(Header('Statistic'))
        heads.append(Header('Messages', Header.COMMAS))
        heads.append(Header('Bytes', Header.COMMAS))
        rows = []
        rows.append(['queue-depth',         broker.msgDepth, broker.byteDepth])
        rows.append(['total-enqueues',      broker.msgTotalEnqueues, broker.byteTotalEnqueues])
        rows.append(['total-dequeues',      broker.msgTotalDequeues, broker.byteTotalDequeues])
        rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues])
        rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues])
        rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues])
        rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues])
        rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth])
        rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues])
        rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues])
        rows.append(['acquires', broker.acquires, None])
        rows.append(['releases', broker.releases, None])
        rows.append(['discards-no-route', broker.discardsNoRoute, None])
        rows.append(['discards-ttl-expired', broker.discardsTtl, None])
        rows.append(['discards-limit-overflow', broker.discardsOverflow, None])
        rows.append(['discards-ring-overflow', broker.discardsRing, None])
        rows.append(['discards-lvq-replace', broker.discardsLvq, None])
        rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None])
        rows.append(['discards-purged', broker.discardsPurge, None])
        rows.append(['reroutes', broker.reroutes, None])
        rows.append(['abandoned', broker.abandoned, None])
        rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None])
        disp.formattedTable('Aggregate Broker Statistics:', heads, rows)


    def displayConn(self):
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header('connection'))
        heads.append(Header('cproc'))
        heads.append(Header('cpid'))
        heads.append(Header('mech'))
        heads.append(Header('auth'))
        heads.append(Header('connected', Header.DURATION))
        heads.append(Header('idle', Header.DURATION))
        heads.append(Header('msgIn', Header.KMG))
        heads.append(Header('msgOut', Header.KMG))
        rows = []
        connections = self.broker.getAllConnections()
        broker = self.broker.getBroker()
        for conn in connections:
            row = []
            row.append(conn.address)
            row.append(conn.remoteProcessName)
            row.append(conn.remotePid)
            row.append(conn.saslMechanism)
            row.append(conn.authIdentity)
            row.append(broker.getUpdateTime() - conn.getCreateTime())
            row.append(broker.getUpdateTime() - conn.getUpdateTime())
            row.append(conn.msgsFromClient)
            row.append(conn.msgsToClient)
            rows.append(row)
        title = "Connections"
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)

    def displaySession(self):
        disp = Display(prefix="  ")

    def displayExchange(self):
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header("exchange"))
        heads.append(Header("type"))
        heads.append(Header("dur", Header.Y))
        heads.append(Header("bind", Header.KMG))
        heads.append(Header("msgIn", Header.KMG))
        heads.append(Header("msgOut", Header.KMG))
        heads.append(Header("msgDrop", Header.KMG))
        heads.append(Header("byteIn", Header.KMG))
        heads.append(Header("byteOut", Header.KMG))
        heads.append(Header("byteDrop", Header.KMG))
        rows = []
        exchanges = self.broker.getAllExchanges()
        for ex in exchanges:
            row = []
            row.append(ex.name)
            row.append(ex.type)
            row.append(ex.durable)
            row.append(ex.bindingCount)
            row.append(ex.msgReceives)
            row.append(ex.msgRoutes)
            row.append(ex.msgDrops)
            row.append(ex.byteReceives)
            row.append(ex.byteRoutes)
            row.append(ex.byteDrops)
            rows.append(row)
        title = "Exchanges"
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)

    def displayQueues(self):
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header("queue"))
        heads.append(Header("dur", Header.Y))
        heads.append(Header("autoDel", Header.Y))
        heads.append(Header("excl", Header.Y))
        heads.append(Header("msg", Header.KMG))
        heads.append(Header("msgIn", Header.KMG))
        heads.append(Header("msgOut", Header.KMG))
        heads.append(Header("bytes", Header.KMG))
        heads.append(Header("bytesIn", Header.KMG))
        heads.append(Header("bytesOut", Header.KMG))
        heads.append(Header("cons", Header.KMG))
        heads.append(Header("bind", Header.KMG))
        rows = []
        queues = self.broker.getAllQueues()
        for q in queues:
            row = []
            row.append(q.name)
            row.append(q.durable)
            row.append(q.autoDelete)
            row.append(q.exclusive)
            row.append(q.msgDepth)
            row.append(q.msgTotalEnqueues)
            row.append(q.msgTotalDequeues)
            row.append(q.byteDepth)
            row.append(q.byteTotalEnqueues)
            row.append(q.byteTotalDequeues)
            row.append(q.consumerCount)
            row.append(q.bindingCount)
            rows.append(row)
        title = "Queues"
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)


    def displayQueue(self, name):
        queue = self.broker.getQueue(name)
        if not queue:
            print "Queue '%s' not found" % name
            return

        disp = Display(prefix="  ")
        heads = []
        heads.append(Header('Name'))
        heads.append(Header('Durable', Header.YN))
        heads.append(Header('AutoDelete', Header.YN))
        heads.append(Header('Exclusive', Header.YN))
        heads.append(Header('FlowStopped', Header.YN))
        heads.append(Header('FlowStoppedCount', Header.COMMAS))
        heads.append(Header('Consumers', Header.COMMAS))
        heads.append(Header('Bindings', Header.COMMAS))
        rows = []
        rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive,
                     queue.flowStopped, queue.flowStoppedCount,
                     queue.consumerCount, queue.bindingCount])
        disp.formattedTable("Properties:", heads, rows)
        print

        heads = []
        heads.append(Header('Property'))
        heads.append(Header('Value'))
        rows = []
        rows.append(['arguments', queue.arguments])
        rows.append(['alt-exchange', queue.altExchange])
        disp.formattedTable("Optional Properties:", heads, rows)
        print

        heads = []
        heads.append(Header('Statistic'))
        heads.append(Header('Messages', Header.COMMAS))
        heads.append(Header('Bytes', Header.COMMAS))
        rows = []
        rows.append(['queue-depth',                queue.msgDepth,           queue.byteDepth])
        rows.append(['total-enqueues',             queue.msgTotalEnqueues,   queue.byteTotalEnqueues])
        rows.append(['total-dequeues',             queue.msgTotalDequeues,   queue.byteTotalDequeues])
        rows.append(['persistent-enqueues',        queue.msgPersistEnqueues, queue.bytePersistEnqueues])
        rows.append(['persistent-dequeues',        queue.msgPersistDequeues, queue.bytePersistDequeues])
        rows.append(['transactional-enqueues',     queue.msgTxnEnqueues,     queue.byteTxnEnqueues])
        rows.append(['transactional-dequeues',     queue.msgTxnDequeues,     queue.byteTxnDequeues])
        rows.append(['flow-to-disk-depth',         queue.msgFtdDepth,        queue.byteFtdDepth])
        rows.append(['flow-to-disk-enqueues',      queue.msgFtdEnqueues,     queue.byteFtdEnqueues])
        rows.append(['flow-to-disk-dequeues',      queue.msgFtdDequeues,     queue.byteFtdDequeues])
        rows.append(['acquires',                   queue.acquires,           None])
        rows.append(['releases',                   queue.releases,           None])
        rows.append(['discards-ttl-expired',       queue.discardsTtl,        None])
        rows.append(['discards-limit-overflow',    queue.discardsOverflow,   None])
        rows.append(['discards-ring-overflow',     queue.discardsRing,       None])
        rows.append(['discards-lvq-replace',       queue.discardsLvq,        None])
        rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None])
        rows.append(['discards-purged',            queue.discardsPurge,      None])
        rows.append(['reroutes',                   queue.reroutes,           None])
        disp.formattedTable("Statistics:", heads, rows)


    def displaySubscriptions(self):
        disp = Display(prefix="  ")
        heads = []
        heads.append(Header("subscr"))
        heads.append(Header("queue"))
        heads.append(Header("conn"))
        heads.append(Header("procName"))
        heads.append(Header("procId"))
        heads.append(Header("browse", Header.Y))
        heads.append(Header("acked", Header.Y))
        heads.append(Header("excl", Header.Y))
        heads.append(Header("creditMode"))
        heads.append(Header("delivered", Header.COMMAS))
        heads.append(Header("sessUnacked", Header.COMMAS))
        rows = []
        subscriptions = self.broker.getAllSubscriptions()
        sessions = self.getSessionMap()
        connections = self.getConnectionMap()
        for s in subscriptions:
            row = []
            try:
                row.append(s.name)
                row.append(s.queueRef)
                session = sessions[s.sessionRef]
                connection = connections[session.connectionRef]
                row.append(connection.address)
                row.append(connection.remoteProcessName)
                row.append(connection.remotePid)
                row.append(s.browsing)
                row.append(s.acknowledged)
                row.append(s.exclusive)
                row.append(s.creditMode)
                row.append(s.delivered)
                row.append(session.unackedMessages)
                rows.append(row)
            except:
                pass
        title = "Subscriptions"
        if config._sortcol:
            sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
            dispRows = sorter.getSorted()
        else:
            dispRows = rows
        disp.formattedTable(title, heads, dispRows)

    def displayMemory(self):
        disp = Display(prefix="  ")
        heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
        rows = []
        memory = self.broker.getMemory()
        for k,v in memory.values.items():
            if k != 'name':
                rows.append([k, v])
        disp.formattedTable('Broker Memory Statistics:', heads, rows)

    def displayAcl(self):
        acl = self.broker.getAcl()
        if not acl:
            print "ACL Policy Module is not installed"
            return
        disp = Display(prefix="  ")
        heads = [Header('Statistic'), Header('Value')]
        rows = []
        rows.append(['policy-file',       acl.policyFile])
        rows.append(['enforcing',         YN(acl.enforcingAcl)])
        rows.append(['has-transfer-acls', YN(acl.transferAcl)])
        rows.append(['last-acl-load',     TimeLong(acl.lastAclLoad)])
        rows.append(['acl-denials',       Commas(acl.aclDenyCount)])
        disp.formattedTable('ACL Policy Statistics:', heads, rows)

    def getExchangeMap(self):
        exchanges = self.broker.getAllExchanges()
        emap = {}
        for e in exchanges:
            emap[e.name] = e
        return emap

    def getQueueMap(self):
        queues = self.broker.getAllQueues()
        qmap = {}
        for q in queues:
            qmap[q.name] = q
        return qmap

    def getSessionMap(self):
        sessions = self.broker.getAllSessions()
        smap = {}
        for s in sessions:
            smap[s.name] = s
        return smap

    def getConnectionMap(self):
        connections = self.broker.getAllConnections()
        cmap = {}
        for c in connections:
            cmap[c.address] = c
        return cmap

    def displayMain(self, names, main):
        if   main == 'g': self.displayBroker()
        elif main == 'c': self.displayConn()
        elif main == 's': self.displaySession()
        elif main == 'e': self.displayExchange()
        elif main == 'q':
            if len(names) >= 1:
                self.displayQueue(names[0])
            else:
                self.displayQueues()
        elif main == 'u':   self.displaySubscriptions()
        elif main == 'm':   self.displayMemory()
        elif main == 'acl': self.displayAcl()

    def display(self, names):
        self.displayMain(names, config._types)


def main(argv=None):

    args = OptionsAndArguments(argv)
    bm   = BrokerManager()

    try:
        bm.SetBroker(config._host)
        bm.display(args)
        bm.Disconnect()
        return 0
    except KeyboardInterrupt:
        print
    except Exception,e:
        print "Failed: %s - %s" % (e.__class__.__name__, e)

    bm.Disconnect()   # try to deallocate brokers
    return 1

if __name__ == "__main__":
        sys.exit(main())
