blob: 83e31e979b8be049a28b8fd34813741ef80a4e37 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
from qpid.testlib import TestBase010
from qpid.messaging import Connection
from threading import Thread
from time import sleep, time
from os import environ, popen
class QueueFlowLimitTests(TestBase010):
_timeout = 100
def __getattr__(self, name):
if name == "assertGreater":
return lambda a, b: self.failUnless(a > b)
else:
raise AttributeError
def _create_queue(self, name,
stop_count=None, resume_count=None,
stop_size=None, resume_size=None,
max_size=None, max_count=None):
""" Create a queue with the given flow settings via the queue.declare
command.
"""
args={}
if (stop_count is not None):
args["qpid.flow_stop_count"] = stop_count;
if (resume_count is not None):
args["qpid.flow_resume_count"] = resume_count;
if (stop_size is not None):
args["qpid.flow_stop_size"] = stop_size;
if (resume_size is not None):
args["qpid.flow_resume_size"] = resume_size;
if (max_size is not None):
args["qpid.max_size"] = max_size;
if (max_count is not None):
args["qpid.max_count"] = max_count;
broker = self.qmf.getObjects(_class="broker")[0]
rc = broker.create( "queue", name, args, True )
self.assertEqual(rc.status, 0, rc)
qs = self.qmf.getObjects(_class="queue")
for i in qs:
if i.name == name:
# verify flow settings
if (stop_count is not None):
self.assertEqual(i.arguments.get("qpid.flow_stop_count"), stop_count)
if (resume_count is not None):
self.assertEqual(i.arguments.get("qpid.flow_resume_count"), resume_count)
if (stop_size is not None):
self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size)
if (resume_size is not None):
self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size)
if (max_size is not None):
self.assertEqual(i.arguments.get("qpid.max_size"), max_size)
if (max_count is not None):
self.assertEqual(i.arguments.get("qpid.max_count"), max_count)
self.failIf(i.flowStopped)
return i.getObjectId()
self.fail("Unable to create queue '%s'" % name)
return None
def _delete_queue(self, name):
""" Delete a named queue
"""
broker = self.qmf.getObjects(_class="broker")[0]
rc = broker.delete( "queue", name, {} )
self.assertEqual(rc.status, 0, rc)
def _start_qpid_send(self, queue, count, content="X", capacity=100):
""" Use the qpid-send client to generate traffic to a queue.
"""
command = "qpid-send" + \
" -b" + " %s:%s" % (self.broker.host, self.broker.port) \
+ " -a " + str(queue) \
+ " --messages " + str(count) \
+ " --content-string " + str(content) \
+ " --capacity " + str(capacity)
return popen(command)
def _start_qpid_receive(self, queue, count, timeout=5):
""" Use the qpid-receive client to consume from a queue.
Note well: prints one line of text to stdout for each consumed msg.
"""
command = "qpid-receive" + \
" -b " + "%s:%s" % (self.broker.host, self.broker.port) \
+ " -a " + str(queue) \
+ " --messages " + str(count) \
+ " --timeout " + str(timeout) \
+ " --print-content yes"
return popen(command)
def test_qpid_config_cmd(self):
""" Test the qpid-config command's ability to configure a queue's flow
control thresholds.
"""
tool = environ.get("QPID_CONFIG_EXEC")
if tool:
command = tool + \
" --broker=%s:%s " % (self.broker.host, self.broker.port) \
+ "add queue test01 --flow-stop-count=999" \
+ " --flow-resume-count=55 --flow-stop-size=5000000" \
+ " --flow-resume-size=100000"
cmd = popen(command)
rc = cmd.close()
self.assertEqual(rc, None)
# now verify the settings
self.startQmf();
qs = self.qmf.getObjects(_class="queue")
for i in qs:
if i.name == "test01":
self.assertEqual(i.arguments.get("qpid.flow_stop_count"), 999)
self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55)
self.assertEqual(i.arguments.get("qpid.flow_stop_size"), 5000000)
self.assertEqual(i.arguments.get("qpid.flow_resume_size"), 100000)
self.failIf(i.flowStopped)
break;
self.assertEqual(i.name, "test01")
self._delete_queue("test01")
def test_flow_count(self):
""" Create a queue with count-based flow limit. Spawn several
producers which will exceed the limit. Verify limit exceeded. Consume
all messages. Verify flow control released.
"""
self.startQmf();
oid = self._create_queue("test-q", stop_count=373, resume_count=229)
self.assertEqual(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount, 0)
sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50);
sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13);
sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149);
totalMsgs = 1213 + 797 + 331
# wait until flow control is active
deadline = time() + self._timeout
while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
time() < deadline:
pass
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
self.assertGreater(depth, 373)
# now wait until the enqueues stop happening - ensure that
# not all msgs have been sent (senders are blocked)
sleep(1)
newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
while depth != newDepth:
depth = newDepth;
sleep(1)
newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
self.assertGreater(totalMsgs, depth)
# drain the queue
rcvr = self._start_qpid_receive("test-q",
count=totalMsgs)
count = 0;
x = rcvr.readline() # prints a line for each received msg
while x:
count += 1;
x = rcvr.readline()
sndr1.close();
sndr2.close();
sndr3.close();
rcvr.close();
self.assertEqual(count, totalMsgs)
self.failIf(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount)
self._delete_queue("test-q")
def test_flow_size(self):
""" Create a queue with size-based flow limit. Spawn several
producers which will exceed the limit. Verify limit exceeded. Consume
all messages. Verify flow control released.
"""
self.startQmf();
oid = self._create_queue("test-q", stop_size=351133, resume_size=251143)
sndr1 = self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53);
sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13);
sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149);
totalMsgs = 1699 + 1129 + 881
# wait until flow control is active
deadline = time() + self._timeout
while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
time() < deadline:
pass
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133)
# now wait until the enqueues stop happening - ensure that
# not all msgs have been sent (senders are blocked)
depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
sleep(1)
newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
while depth != newDepth:
depth = newDepth;
sleep(1)
newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
self.assertGreater(totalMsgs, depth)
# drain the queue
rcvr = self._start_qpid_receive("test-q",
count=totalMsgs)
count = 0;
x = rcvr.readline() # prints a line for each received msg
while x:
count += 1;
x = rcvr.readline()
sndr1.close();
sndr2.close();
sndr3.close();
rcvr.close();
self.assertEqual(count, totalMsgs)
self.failIf(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
self._delete_queue("test-q")
def verify_limit(self, testq):
""" run a limit check against the testq object
"""
testq.mgmt = self.qmf.getObjects(_objectId=testq.oid)[0]
# fill up the queue, waiting until flow control is active
sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content)
deadline = time() + self._timeout
while (not testq.mgmt.flowStopped) and time() < deadline:
testq.mgmt.update()
self.failUnless(testq.verifyStopped())
# now consume enough messages to drop below the flow resume point, and
# verify flow control is released.
rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount)
rcvr.readlines() # prints a line for each received msg
rcvr.close();
# we should now be below the resume threshold
self.failUnless(testq.verifyResumed())
self._delete_queue(testq.mgmt.name)
sndr1.close();
def test_default_flow_count(self):
""" Create a queue with count-based size limit, and verify the computed
thresholds using the broker's default ratios.
"""
class TestQ:
def __init__(self, oid):
# Use the broker-wide default flow thresholds of 80%/70% (see
# run_queue_flow_limit_tests) to base the thresholds off the
# queue's max_count configuration parameter
# max_count == 1000 -> stop == 800, resume == 700
self.oid = oid
self.sendCount = 1000
self.consumeCount = 301 # (send - resume) + 1 to reenable flow
self.content = "X"
self.mgmt = None
def verifyStopped(self):
self.mgmt.update()
return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800)
def verifyResumed(self):
self.mgmt.update()
return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700)
self.startQmf();
oid = self._create_queue("test-X", max_count=1000)
self.verify_limit(TestQ(oid))
def test_default_flow_size(self):
""" Create a queue with byte-based size limit, and verify the computed
thresholds using the broker's default ratios.
"""
class TestQ:
def __init__(self, oid):
# Use the broker-wide default flow thresholds of 80%/70% (see
# run_queue_flow_limit_tests) to base the thresholds off the
# queue's max_count configuration parameter
# max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes
self.oid = oid
self.sendCount = 2000
self.consumeCount = 601 # (send - resume) + 1 to reenable flow
self.content = "XXXXX" # 5 bytes per message sent.
self.mgmt = None
def verifyStopped(self):
self.mgmt.update()
return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000)
def verifyResumed(self):
self.mgmt.update()
return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000)
self.startQmf();
oid = self._create_queue("test-Y", max_size=10000)
self.verify_limit(TestQ(oid))
def test_blocked_queue_delete(self):
""" Verify that blocked senders are unblocked when a queue that is flow
controlled is deleted.
"""
class BlockedSender(Thread):
def __init__(self, tester, queue, count, capacity=10):
self.tester = tester
self.queue = queue
self.count = count
self.capacity = capacity
Thread.__init__(self)
self.done = False
self.start()
def run(self):
# spawn qpid-send
p = self.tester._start_qpid_send(self.queue,
self.count,
self.capacity)
p.close() # waits for qpid-send to complete
self.done = True
self.startQmf();
oid = self._create_queue("kill-q", stop_size=10, resume_size=2)
q = self.qmf.getObjects(_objectId=oid)[0]
self.failIf(q.flowStopped)
sender = BlockedSender(self, "kill-q", count=100)
# wait for flow control
deadline = time() + self._timeout
while (not q.flowStopped) and time() < deadline:
q.update()
self.failUnless(q.flowStopped)
self.failIf(sender.done) # sender blocked
self._delete_queue("kill-q")
sender.join(5)
self.failIf(sender.isAlive())
self.failUnless(sender.done)