blob: 4f3931b78bedfa3c931018d2d272240302a4fe6d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from qpid.tests.messaging.implementation import *
from qpid.tests.messaging import Base
from time import sleep
from qpidtoollibs.broker import BrokerAgent
#
# Tests the Broker's statistics reporting
#
class BrokerStatsTests(Base):
"""
Tests of the broker's statistics
"""
def assertEqual(self, left, right, text=None):
if not left == right:
print "assertEqual failure: %r != %r" % (left, right)
if text:
print " %r" % text
assert None
def failUnless(self, value, text=None):
if value:
return
print "failUnless failure",
if text:
print ": %r" % text
else:
print
assert None
def fail(self, text=None):
if text:
print "Fail: %r" % text
assert None
def setup_connection(self):
return Connection.establish(self.broker, **self.connection_options())
def setup_session(self, tx=False):
return self.conn.session(transactional=tx)
def setup_access(self):
return BrokerAgent(self.conn)
def test_exchange_stats(self):
agent = self.setup_access()
start_broker = agent.getBroker()
agent.addExchange("direct", "stats-test-exchange")
try:
sess = self.setup_session()
tx_a = sess.sender("stats-test-exchange/a")
tx_b = sess.sender("stats-test-exchange/b")
rx_a = sess.receiver("stats-test-exchange/a")
exchange = agent.getExchange("stats-test-exchange")
self.failUnless(exchange, "expected a valid exchange object")
self.assertEqual(exchange.msgReceives, 0, "msgReceives")
self.assertEqual(exchange.msgDrops, 0, "msgDrops")
self.assertEqual(exchange.msgRoutes, 0, "msgRoutes")
self.assertEqual(exchange.byteReceives, 0, "byteReceives")
self.assertEqual(exchange.byteDrops, 0, "byteDrops")
self.assertEqual(exchange.byteRoutes, 0, "byteRoutes")
tx_a.send("0123456789")
tx_b.send("01234567890123456789")
tx_a.send("012345678901234567890123456789")
tx_b.send("0123456789012345678901234567890123456789")
overhead = 63 #overhead added to message from headers
exchange.update()
self.assertEqual(exchange.msgReceives, 4, "msgReceives")
self.assertEqual(exchange.msgDrops, 2, "msgDrops")
self.assertEqual(exchange.msgRoutes, 2, "msgRoutes")
self.assertEqual(exchange.byteReceives, 100+(4*overhead), "byteReceives")
self.assertEqual(exchange.byteDrops, 60+(2*overhead), "byteDrops")
self.assertEqual(exchange.byteRoutes, 40+(2*overhead), "byteRoutes")
finally:
agent.delExchange("stats-test-exchange")
def test_enqueues_dequeues(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("enqueue_test;{create:always,delete:always}")
rx = sess.receiver("enqueue_test")
queue = agent.getQueue("enqueue_test")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues")
self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
self.assertEqual(queue.msgDepth, 0, "msgDepth")
self.assertEqual(queue.byteDepth, 0, "byteDepth")
tx.send("0123456789")
tx.send("01234567890123456789")
tx.send("012345678901234567890123456789")
tx.send("0123456789012345678901234567890123456789")
overhead = 38 #overhead added to message from headers
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
self.assertEqual(queue.msgDepth, 4, "msgDepth")
self.assertEqual(queue.byteDepth, 100+(4*overhead), "byteDepth")
now_broker = agent.getBroker()
self.failUnless((now_broker.msgTotalEnqueues - start_broker.msgTotalEnqueues) >= 4, "broker msgTotalEnqueues")
self.failUnless((now_broker.byteTotalEnqueues - start_broker.byteTotalEnqueues) >= 100, "broker byteTotalEnqueues")
m = rx.fetch()
m = rx.fetch()
sess.acknowledge()
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues")
self.assertEqual(queue.msgTotalDequeues, 2, "msgTotalDequeues")
self.assertEqual(queue.byteTotalDequeues, 30+(2*overhead), "byteTotalDequeues")
self.assertEqual(queue.msgDepth, 2, "msgDepth")
self.assertEqual(queue.byteDepth, 70+(2*overhead), "byteDepth")
now_broker = agent.getBroker()
self.failUnless((now_broker.msgTotalDequeues - start_broker.msgTotalDequeues) >= 2, "broker msgTotalDequeues")
self.failUnless((now_broker.byteTotalDequeues - start_broker.byteTotalDequeues) >= 30, "broker byteTotalDequeues")
sess.close()
now_broker = agent.getBroker()
self.assertEqual(now_broker.abandoned - start_broker.abandoned, 2, "expect 2 abandoned messages")
self.assertEqual(now_broker.msgDepth, start_broker.msgDepth, "expect broker message depth to be unchanged")
self.assertEqual(now_broker.byteDepth, start_broker.byteDepth, "expect broker byte depth to be unchanged")
def test_transactional_enqueues_dequeues(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session(True)
tx = sess.sender("tx_enqueue_test;{create:always,delete:always}")
tx.send("0123456789")
tx.send("0123456789")
tx.send("0123456789")
tx.send("0123456789")
overhead = 41 #overhead added to message from headers
queue = agent.getQueue("tx_enqueue_test")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues pre-tx-commit")
self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues pre-tx-commit")
self.assertEqual(queue.msgTxnEnqueues, 0, "msgTxnEnqueues pre-tx-commit")
self.assertEqual(queue.byteTxnEnqueues, 0, "byteTxnEnqueues pre-tx-commit")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-tx-commit")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-tx-commit")
self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-tx-commit")
self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-tx-commit")
sess.commit()
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-tx-commit")
self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-tx-commit")
self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-tx-commit")
self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-tx-commit")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues post-tx-commit")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues post-tx-commit")
self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues post-tx-commit")
self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues post-tx-commit")
sess2 = self.setup_session(True)
rx = sess2.receiver("tx_enqueue_test")
m = rx.fetch()
m = rx.fetch()
m = rx.fetch()
m = rx.fetch()
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues pre-rx-commit")
self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues pre-rx-commit")
self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues pre-rx-commit")
self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues pre-rx-commit")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-rx-commit")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-rx-commit")
self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-rx-commit")
self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-rx-commit")
sess2.acknowledge()
sess2.commit()
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-rx-commit")
self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-rx-commit")
self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-rx-commit")
self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-rx-commit")
self.assertEqual(queue.msgTotalDequeues, 4, "msgTotalDequeues post-rx-commit")
self.assertEqual(queue.byteTotalDequeues, 40+(4*overhead), "byteTotalDequeues post-rx-commit")
self.assertEqual(queue.msgTxnDequeues, 4, "msgTxnDequeues post-rx-commit")
self.assertEqual(queue.byteTxnDequeues, 40+(4*overhead), "byteTxnDequeues post-rx-commit")
sess.close()
sess2.close()
now_broker = agent.getBroker()
self.assertEqual(now_broker.msgTxnEnqueues - start_broker.msgTxnEnqueues, 4, "broker msgTxnEnqueues")
self.assertEqual(now_broker.byteTxnEnqueues - start_broker.byteTxnEnqueues, 40+(4*overhead), "broker byteTxnEnqueues")
self.assertEqual(now_broker.msgTxnDequeues - start_broker.msgTxnDequeues, 4, "broker msgTxnDequeues")
self.assertEqual(now_broker.byteTxnDequeues - start_broker.byteTxnDequeues, 40+(4*overhead), "broker byteTxnDequeues")
def test_discards_no_route(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("amq.topic/non.existing.key")
tx.send("NO_ROUTE")
tx.send("NO_ROUTE")
tx.send("NO_ROUTE")
tx.send("NO_ROUTE")
tx.send("NO_ROUTE")
now_broker = agent.getBroker()
self.failUnless((now_broker.discardsNoRoute - start_broker.discardsNoRoute) >= 5, "Expect at least 5 no-routes")
sess.close()
def test_abandoned_alt(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("abandon_alt;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}")
rx = sess.receiver("abandon_alt")
rx.capacity = 2
tx.send("ABANDON_ALT")
tx.send("ABANDON_ALT")
tx.send("ABANDON_ALT")
tx.send("ABANDON_ALT")
tx.send("ABANDON_ALT")
rx.fetch()
sess.close()
now_broker = agent.getBroker()
self.assertEqual(now_broker.abandonedViaAlt - start_broker.abandonedViaAlt, 5, "Expect 5 abandonedViaAlt")
self.assertEqual(now_broker.abandoned - start_broker.abandoned, 0, "Expect 0 abandoned")
def test_discards_ttl(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("discards_ttl;{create:always,delete:always}")
msg = Message("TTL")
msg.ttl = 1
tx.send(msg)
tx.send(msg)
tx.send(msg)
tx.send(msg)
tx.send(msg)
tx.send(msg)
sleep(2)
rx = sess.receiver("discards_ttl")
try:
rx.fetch(0)
except:
pass
now_broker = agent.getBroker()
queue = agent.getQueue("discards_ttl")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.discardsTtl, 6, "expect 6 TTL discards on queue")
self.assertEqual(now_broker.discardsTtl - start_broker.discardsTtl, 6, "expect 6 TTL discards on broker")
self.assertEqual(queue.msgTotalDequeues, 6, "expect 6 total dequeues on queue")
sess.close()
def test_discards_limit_overflow(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("discards_limit;{create:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0}}}}")
tx.send("LIMIT")
tx.send("LIMIT")
tx.send("LIMIT")
try:
tx.send("LIMIT")
self.fail("expected to fail sending 4th message")
except:
pass
now_broker = agent.getBroker()
queue = agent.getQueue("discards_limit")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.discardsOverflow, 1, "expect 1 overflow discard on queue")
self.assertEqual(now_broker.discardsOverflow - start_broker.discardsOverflow, 1, "expect 1 overflow discard on broker")
##
## Shut down and restart the connection to clear the error condition.
##
try:
self.conn.close(timeout=.1)
except:
pass
self.conn = self.setup_connection()
##
## Re-create the session to delete the queue.
##
sess = self.setup_session()
tx = sess.sender("discards_limit;{create:always,delete:always}")
sess.close()
def test_discards_ring_overflow(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("discards_ring;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.policy_type':ring}}}}")
tx.send("RING")
tx.send("RING")
tx.send("RING")
tx.send("RING")
tx.send("RING")
now_broker = agent.getBroker()
queue = agent.getQueue("discards_ring")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.discardsRing, 2, "expect 2 ring discards on queue")
self.assertEqual(now_broker.discardsRing - start_broker.discardsRing, 2, "expect 2 ring discards on broker")
self.assertEqual(queue.msgTotalDequeues, 2, "expect 2 total dequeues on queue")
sess.close()
def test_discards_lvq_replace(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("discards_lvq;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.last_value_queue_key':key}}}}")
msgA = Message("LVQ_A")
msgA.properties['key'] = 'AAA'
msgB = Message("LVQ_B")
msgB.properties['key'] = 'BBB'
tx.send(msgA)
tx.send(msgB)
tx.send(msgA)
tx.send(msgA)
tx.send(msgB)
now_broker = agent.getBroker()
queue = agent.getQueue("discards_lvq")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.discardsLvq, 3, "expect 3 lvq discards on queue")
self.assertEqual(now_broker.discardsLvq - start_broker.discardsLvq, 3, "expect 3 lvq discards on broker")
self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
sess.close()
def test_discards_reject(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("discards_reject;{create:always,delete:always}")
tx.send("REJECT")
tx.send("REJECT")
tx.send("REJECT")
rx = sess.receiver("discards_reject")
m = rx.fetch()
sess.acknowledge()
m1 = rx.fetch()
m2 = rx.fetch()
sess.acknowledge(m1, Disposition(REJECTED))
sess.acknowledge(m2, Disposition(REJECTED))
now_broker = agent.getBroker()
queue = agent.getQueue("discards_reject")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.discardsSubscriber, 2, "expect 2 reject discards on queue")
self.assertEqual(now_broker.discardsSubscriber - start_broker.discardsSubscriber, 2, "expect 2 reject discards on broker")
self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
sess.close()
def test_message_release(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("message_release;{create:always,delete:always}")
tx.send("RELEASE")
tx.send("RELEASE")
tx.send("RELEASE")
tx.send("RELEASE")
tx.send("RELEASE")
rx = sess.receiver("message_release")
m1 = rx.fetch()
m2 = rx.fetch()
sess.acknowledge(m1, Disposition(RELEASED))
sess.acknowledge(m2, Disposition(RELEASED))
now_broker = agent.getBroker()
queue = agent.getQueue("message_release")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.acquires, 2, "expect 2 acquires on queue")
self.failUnless(now_broker.acquires - start_broker.acquires >= 2, "expect at least 2 acquires on broker")
self.assertEqual(queue.msgTotalDequeues, 0, "expect 0 total dequeues on queue")
self.assertEqual(queue.releases, 2, "expect 2 releases on queue")
self.failUnless(now_broker.releases - start_broker.releases >= 2, "expect at least 2 releases on broker")
sess.close()
def test_discards_purge(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("discards_purge;{create:always,delete:always}")
tx.send("PURGE")
tx.send("PURGE")
tx.send("PURGE")
tx.send("PURGE")
tx.send("PURGE")
queue = agent.getQueue("discards_purge")
self.failUnless(queue, "expected a valid queue object")
queue.purge(3)
queue.update()
now_broker = agent.getBroker()
self.assertEqual(queue.discardsPurge, 3, "expect 3 purge discards on queue")
self.assertEqual(now_broker.discardsPurge - start_broker.discardsPurge, 3, "expect 3 purge discards on broker")
self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
sess.close()
def test_reroutes(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("reroute;{create:always,delete:always}")
tx.send("REROUTE")
tx.send("REROUTE")
tx.send("REROUTE")
tx.send("REROUTE")
tx.send("REROUTE")
tx.send("REROUTE")
tx.send("REROUTE")
tx.send("REROUTE")
queue = agent.getQueue("reroute")
self.failUnless(queue, "expected a valid queue object")
queue.reroute(5, False, 'amq.fanout')
queue.update()
now_broker = agent.getBroker()
self.assertEqual(queue.reroutes, 5, "expect 5 reroutes on queue")
self.assertEqual(now_broker.reroutes - start_broker.reroutes, 5, "expect 5 reroutes on broker")
self.assertEqual(queue.msgTotalDequeues, 5, "expect 5 total dequeues on queue")
sess.close()