blob: dec7cfb3af945799ab8c43f9f974326f53a62140 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
from qpid.testlib import TestBase010
from qpid import datatypes, messaging
from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
from time import sleep, time
from os import environ, popen
class QueueFlowLimitTests(TestBase010):
def __getattr__(self, name):
if name == "assertGreater":
return lambda a, b: self.failUnless(a > b)
else:
raise AttributeError
def _create_queue(self, name,
stop_count=None, resume_count=None,
stop_size=None, resume_size=None,
max_size=None, max_count=None):
""" Create a queue with the given flow settings via the queue.declare
command.
"""
args={}
if (stop_count is not None):
args["qpid.flow_stop_count"] = stop_count;
if (resume_count is not None):
args["qpid.flow_resume_count"] = resume_count;
if (stop_size is not None):
args["qpid.flow_stop_size"] = stop_size;
if (resume_size is not None):
args["qpid.flow_resume_size"] = resume_size;
if (max_size is not None):
args["qpid.max_size"] = max_size;
if (max_count is not None):
args["qpid.max_count"] = max_count;
self.session.queue_declare(queue=name, arguments=args)
qs = self.qmf.getObjects(_class="queue")
for i in qs:
if i.name == name:
# verify flow settings
if (stop_count is not None):
self.assertEqual(i.arguments.get("qpid.flow_stop_count"), stop_count)
if (resume_count is not None):
self.assertEqual(i.arguments.get("qpid.flow_resume_count"), resume_count)
if (stop_size is not None):
self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size)
if (resume_size is not None):
self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size)
if (max_size is not None):
self.assertEqual(i.arguments.get("qpid.max_size"), max_size)
if (max_count is not None):
self.assertEqual(i.arguments.get("qpid.max_count"), max_count)
self.failIf(i.flowStopped)
return i.getObjectId()
self.fail("Unable to create queue '%s'" % name)
return None
def _delete_queue(self, name):
""" Delete a named queue
"""
self.session.queue_delete(queue=name)
def _start_qpid_send(self, queue, count, content="X", capacity=100):
""" Use the qpid-send client to generate traffic to a queue.
"""
command = "qpid-send" + \
" -b" + " %s:%s" % (self.broker.host, self.broker.port) \
+ " -a " + str(queue) \
+ " --messages " + str(count) \
+ " --content-string " + str(content) \
+ " --capacity " + str(capacity)
return popen(command)
def _start_qpid_receive(self, queue, count, timeout=5):
""" Use the qpid-receive client to consume from a queue.
Note well: prints one line of text to stdout for each consumed msg.
"""
command = "qpid-receive" + \
" -b " + "%s:%s" % (self.broker.host, self.broker.port) \
+ " -a " + str(queue) \
+ " --messages " + str(count) \
+ " --timeout " + str(timeout) \
+ " --print-content yes"
return popen(command)
def test_qpid_config_cmd(self):
""" Test the qpid-config command's ability to configure a queue's flow
control thresholds.
"""
tool = environ.get("QPID_CONFIG_EXEC")
if tool:
command = tool + \
" --broker-addr=%s:%s " % (self.broker.host, self.broker.port) \
+ "add queue test01 --flow-stop-count=999" \
+ " --flow-resume-count=55 --flow-stop-size=5000000" \
+ " --flow-resume-size=100000"
cmd = popen(command)
rc = cmd.close()
self.assertEqual(rc, None)
# now verify the settings
self.startQmf();
qs = self.qmf.getObjects(_class="queue")
for i in qs:
if i.name == "test01":
self.assertEqual(i.arguments.get("qpid.flow_stop_count"), 999)
self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55)
self.assertEqual(i.arguments.get("qpid.flow_stop_size"), 5000000)
self.assertEqual(i.arguments.get("qpid.flow_resume_size"), 100000)
self.failIf(i.flowStopped)
break;
self.assertEqual(i.name, "test01")
self._delete_queue("test01")
def test_flow_count(self):
""" Create a queue with count-based flow limit. Spawn several
producers which will exceed the limit. Verify limit exceeded. Consume
all messages. Verify flow control released.
"""
self.startQmf();
oid = self._create_queue("test-q", stop_count=373, resume_count=229)
self.assertEqual(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount, 0)
sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50);
sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13);
sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149);
totalMsgs = 1213 + 797 + 331
# wait until flow control is active
deadline = time() + 10
while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
time() < deadline:
pass
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
self.assertGreater(depth, 373)
# now wait until the enqueues stop happening - ensure that
# not all msgs have been sent (senders are blocked)
sleep(1)
newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
while depth != newDepth:
depth = newDepth;
sleep(1)
newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
self.assertGreater(totalMsgs, depth)
# drain the queue
rcvr = self._start_qpid_receive("test-q",
count=totalMsgs)
count = 0;
x = rcvr.readline() # prints a line for each received msg
while x:
count += 1;
x = rcvr.readline()
sndr1.close();
sndr2.close();
sndr3.close();
rcvr.close();
self.assertEqual(count, totalMsgs)
self.failIf(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount)
self._delete_queue("test-q")
def test_flow_size(self):
""" Create a queue with size-based flow limit. Spawn several
producers which will exceed the limit. Verify limit exceeded. Consume
all messages. Verify flow control released.
"""
self.startQmf();
oid = self._create_queue("test-q", stop_size=351133, resume_size=251143)
sndr1 = self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53);
sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13);
sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149);
totalMsgs = 1699 + 1129 + 881
# wait until flow control is active
deadline = time() + 10
while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
time() < deadline:
pass
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133)
# now wait until the enqueues stop happening - ensure that
# not all msgs have been sent (senders are blocked)
depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
sleep(1)
newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
while depth != newDepth:
depth = newDepth;
sleep(1)
newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
self.assertGreater(totalMsgs, depth)
# drain the queue
rcvr = self._start_qpid_receive("test-q",
count=totalMsgs)
count = 0;
x = rcvr.readline() # prints a line for each received msg
while x:
count += 1;
x = rcvr.readline()
sndr1.close();
sndr2.close();
sndr3.close();
rcvr.close();
self.assertEqual(count, totalMsgs)
self.failIf(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
self._delete_queue("test-q")
def verify_limit(self, testq):
""" run a limit check against the testq object
"""
testq.mgmt = self.qmf.getObjects(_objectId=testq.oid)[0]
# fill up the queue, waiting until flow control is active
sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content)
deadline = time() + 10
while (not testq.mgmt.flowStopped) and time() < deadline:
testq.mgmt.update()
self.failUnless(testq.verifyStopped())
# now consume enough messages to drop below the flow resume point, and
# verify flow control is released.
rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount)
rcvr.readlines() # prints a line for each received msg
rcvr.close();
# we should now be below the resume threshold
self.failUnless(testq.verifyResumed())
self._delete_queue(testq.mgmt.name)
sndr1.close();
def test_default_flow_count(self):
""" Create a queue with count-based size limit, and verify the computed
thresholds using the broker's default ratios.
"""
class TestQ:
def __init__(self, oid):
# Use the broker-wide default flow thresholds of 80%/70% (see
# run_queue_flow_limit_tests) to base the thresholds off the
# queue's max_count configuration parameter
# max_count == 1000 -> stop == 800, resume == 700
self.oid = oid
self.sendCount = 1000
self.consumeCount = 301 # (send - resume) + 1 to reenable flow
self.content = "X"
def verifyStopped(self):
self.mgmt.update()
return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800)
def verifyResumed(self):
self.mgmt.update()
return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700)
self.startQmf();
oid = self._create_queue("test-X", max_count=1000)
self.verify_limit(TestQ(oid))
def test_default_flow_size(self):
""" Create a queue with byte-based size limit, and verify the computed
thresholds using the broker's default ratios.
"""
class TestQ:
def __init__(self, oid):
# Use the broker-wide default flow thresholds of 80%/70% (see
# run_queue_flow_limit_tests) to base the thresholds off the
# queue's max_count configuration parameter
# max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes
self.oid = oid
self.sendCount = 2000
self.consumeCount = 601 # (send - resume) + 1 to reenable flow
self.content = "XXXXX" # 5 bytes per message sent.
def verifyStopped(self):
self.mgmt.update()
return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000)
def verifyResumed(self):
self.mgmt.update()
return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000)
self.startQmf();
oid = self._create_queue("test-Y", max_size=10000)
self.verify_limit(TestQ(oid))
def test_blocked_queue_delete(self):
""" Verify that blocked senders are unblocked when a queue that is flow
controlled is deleted.
"""
class BlockedSender(Thread):
def __init__(self, tester, queue, count, capacity=10):
self.tester = tester
self.queue = queue
self.count = count
self.capacity = capacity
Thread.__init__(self)
self.done = False
self.start()
def run(self):
# spawn qpid-send
p = self.tester._start_qpid_send(self.queue,
self.count,
self.capacity)
p.close() # waits for qpid-send to complete
self.done = True
self.startQmf();
oid = self._create_queue("kill-q", stop_size=10, resume_size=2)
q = self.qmf.getObjects(_objectId=oid)[0]
self.failIf(q.flowStopped)
sender = BlockedSender(self, "kill-q", count=100)
# wait for flow control
deadline = time() + 10
while (not q.flowStopped) and time() < deadline:
q.update()
self.failUnless(q.flowStopped)
self.failIf(sender.done) # sender blocked
self._delete_queue("kill-q")
sender.join(5)
self.failIf(sender.isAlive())
self.failUnless(sender.done)