# blob: 132bd7b9877ef22604defd1da9c6a18fafa29c4c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.testlib import TestBase010
from qpid.datatypes import Message, RangedSet
from qpid.session import SessionException
class QueueTests(TestBase010):
    """Tests for 'methods' on the amqp queue 'class'"""

    def _grant_unlimited_credit(self, session, destination):
        """
        Issue the maximum message and byte credit for a subscription.

        session     -- the session on which the subscription was created
        destination -- the destination tag passed to message_subscribe
        """
        session.message_flow(destination=destination, unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination=destination, unit=session.credit_unit.byte, value=0xFFFFFFFF)

    def test_purge(self):
        """
        Test that the purge method removes messages from the queue
        """
        session = self.session
        # setup, declare a queue and add some messages to it:
        session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
        for body in ("one", "two", "three"):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), body))

        # check that the queue now reports 3 messages:
        session.queue_declare(queue="test-queue")
        reply = session.queue_query(queue="test-queue")
        self.assertEqual(3, reply.message_count)

        # now do the purge, then test that the count drops to 0
        session.queue_purge(queue="test-queue")
        reply = session.queue_query(queue="test-queue")
        self.assertEqual(0, reply.message_count)

        # send a further message and consume it, ensuring that the other messages are really gone
        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four"))
        session.message_subscribe(queue="test-queue", destination="tag")
        self._grant_unlimited_credit(session, "tag")
        queue = session.incoming("tag")
        msg = queue.get(timeout=1)
        self.assertEqual("four", msg.body)

    def test_purge_queue_exists(self):
        """
        Test that the correct exception is thrown if no queue exists
        for the name specified in purge
        """
        session = self.session
        try:
            # queue specified but doesn't exist:
            session.queue_purge(queue="invalid-queue")
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Expected failure when purging non-existent queue")

    def test_purge_empty_name(self):
        """
        Test that the correct exception is thrown if no queue name
        is specified for purge
        """
        session = self.session
        try:
            # queue not specified and none previously declared for channel:
            session.queue_purge()
        except SessionException as e:
            self.assertEqual(531, e.args[0].error_code)  # illegal-argument
        else:
            self.fail("Expected failure when purging unspecified queue")

    def test_declare_exclusive(self):
        """
        Test that the exclusive field is honoured in queue.declare

        Each rejected operation is attempted on its own fresh session,
        so that one failure cannot affect the next check.
        """
        # TestBase.setUp has already opened session(1)
        s1 = self.session
        # Here we open a second session on the same connection:
        s2 = self.conn.session("other")

        # declare an exclusive queue and bind it:
        s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
        s1.exchange_bind(exchange="amq.fanout", queue="exclusive-queue")

        try:
            # other session should not be allowed to declare this:
            s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
        except SessionException as e:
            self.assertEqual(405, e.args[0].error_code)  # resource locked
        else:
            self.fail("Expected second exclusive queue_declare to raise a channel exception")

        s3 = self.conn.session("subscriber")
        try:
            # other session should not be allowed to subscribe to this:
            s3.message_subscribe(queue="exclusive-queue")
        except SessionException as e:
            self.assertEqual(405, e.args[0].error_code)
        else:
            self.fail("Expected message_subscribe on an exclusive queue to raise a channel exception")

        s4 = self.conn.session("deleter")
        try:
            # other session should not be allowed to delete this:
            s4.queue_delete(queue="exclusive-queue")
        except SessionException as e:
            self.assertEqual(405, e.args[0].error_code)
        else:
            self.fail("Expected queue_delete on an exclusive queue to raise a channel exception")

        s5 = self.conn.session("binder")
        try:
            # other session should not be allowed to bind this:
            s5.exchange_bind(exchange="amq.direct", queue="exclusive-queue", binding_key="abc")
        except SessionException as e:
            self.assertEqual(405, e.args[0].error_code)
        else:
            self.fail("Expected exchange_bind on an exclusive queue to raise an exception")

        s6 = self.conn.session("unbinder")
        try:
            # other session should not be allowed to unbind this:
            s6.exchange_unbind(exchange="amq.fanout", queue="exclusive-queue")
        except SessionException as e:
            self.assertEqual(405, e.args[0].error_code)
        else:
            self.fail("Expected exchange_unbind on an exclusive queue to raise an exception")

    def test_declare_exclusive_alreadyinuse(self):
        """
        Test that exclusivity is real if granted

        A request for exclusive access to a queue that already has a
        consumer on another session must be refused.
        """
        # TestBase.setUp has already opened session(1)
        s1 = self.session
        # Here we open a second session on the same connection:
        s2 = self.conn.session("other")

        # declare a non-exclusive queue and consume from it:
        s1.queue_declare(queue="a-queue", auto_delete=True)
        s1.message_subscribe(queue="a-queue")

        try:
            # other session should not be granted exclusive access to this:
            s2.queue_declare(queue="a-queue", exclusive=True, auto_delete=True)
        except SessionException as e:
            self.assertEqual(405, e.args[0].error_code)  # resource locked
        else:
            self.fail("Expected request for exclusivity to fail")

    def test_declare_passive(self):
        """
        Test that the passive field is honoured in queue.declare
        """
        s1 = self.session
        s2 = self.conn.session("other")

        s1.queue_declare(queue="passive-queue-1")

        # ensure that same/separate sessions can passively declare same queue
        s1.queue_declare(queue="passive-queue-1", passive=True)
        s2.queue_declare(queue="passive-queue-1", passive=True)

        s1.queue_delete(queue="passive-queue-1")

    def test_declare_passive_queue_not_found(self):
        """
        Test that a passive declare of a queue that does not exist
        fails with not-found
        """
        s1 = self.session
        try:
            s1.queue_declare(queue="passive-queue-not-found", passive=True)
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Expected passive declaration of non-existent queue to raise a channel exception")

    def test_declare_passive_with_exclusive(self):
        """
        Test how the passive field interacts with the exclusive field
        in queue.declare
        """
        s1 = self.session
        s2 = self.conn.session("other")

        # declare exclusive/non-exclusive queues:
        s1.queue_declare(queue="passive-queue-exc", exclusive=True, auto_delete=True)
        s1.queue_declare(queue="passive-queue-nonexc", exclusive=False, auto_delete=True)

        # ensure that same/separate sessions can passively declare same queue *without* the exclusive flag
        # this is important for the request/reply pattern
        s1.queue_declare(queue="passive-queue-exc", passive=True)
        s2.queue_declare(queue="passive-queue-exc", passive=True)

        try:
            s2.queue_declare(queue="passive-queue-nonexc", exclusive=True, passive=True)
        except SessionException as e:
            self.assertEqual(405, e.args[0].error_code)  # resource locked
        else:
            self.fail("Expected exclusive passive declaration of existing queue to raise a channel exception")

    def test_bind(self):
        """
        Test various permutations of the queue.bind method
        """
        session = self.session
        session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)

        # straightforward case, both exchange & queue exist so no errors expected:
        session.exchange_bind(queue="queue-1", exchange="amq.direct", binding_key="key1")

        # use the queue name where the routing key is not specified:
        session.exchange_bind(queue="queue-1", exchange="amq.direct")

        # try and bind to non-existent exchange
        try:
            session.exchange_bind(queue="queue-1", exchange="an-invalid-exchange", binding_key="key1")
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Expected bind to non-existant exchange to fail")

    def test_bind_queue_existence(self):
        """
        Test that binding a queue that does not exist fails with not-found
        """
        session = self.session
        # try and bind non-existent queue:
        try:
            session.exchange_bind(queue="queue-2", exchange="amq.direct", binding_key="key1")
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Expected bind of non-existant queue to fail")

    def test_unbind_direct(self):
        """Run the shared unbind scenario against amq.direct"""
        self.unbind_test(exchange="amq.direct", routing_key="key")

    def test_unbind_topic(self):
        """Run the shared unbind scenario against amq.topic"""
        self.unbind_test(exchange="amq.topic", routing_key="key")

    def test_unbind_fanout(self):
        """Run the shared unbind scenario against amq.fanout"""
        self.unbind_test(exchange="amq.fanout")

    def test_unbind_headers(self):
        """Run the shared unbind scenario against amq.match"""
        self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})

    def unbind_test(self, exchange, routing_key="", args=None, headers=None):
        """
        Shared scenario for the test_unbind_* tests.

        Binds two queues to the exchange, sends a message, unbinds the
        first queue, sends a second message, then checks that the first
        queue received only the first message while the second queue
        received both.

        exchange    -- name of the exchange to bind to and publish through
        routing_key -- used both as the binding key and the routing key
        args        -- binding arguments passed to exchange_bind
        headers     -- if given, sent as application headers on each message
        """
        # bind two queues and consume from them
        session = self.session

        session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
        session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)

        session.message_subscribe(queue="queue-1", destination="queue-1")
        self._grant_unlimited_credit(session, "queue-1")
        session.message_subscribe(queue="queue-2", destination="queue-2")
        self._grant_unlimited_credit(session, "queue-2")

        queue1 = session.incoming("queue-1")
        queue2 = session.incoming("queue-2")

        session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args)
        session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args)

        dp = session.delivery_properties(routing_key=routing_key)
        if headers:
            mp = session.message_properties(application_headers=headers)
            msg1 = Message(dp, mp, "one")
            msg2 = Message(dp, mp, "two")
        else:
            msg1 = Message(dp, "one")
            msg2 = Message(dp, "two")

        # send a message that will match both bindings
        session.message_transfer(destination=exchange, message=msg1)

        # unbind first queue
        session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key)

        # send another message
        session.message_transfer(destination=exchange, message=msg2)

        # check one queue has both messages and the other has only one
        self.assertEqual("one", queue1.get(timeout=1).body)
        try:
            msg = queue1.get(timeout=1)
        except Empty:
            pass
        else:
            self.fail("Got extra message: %s" % msg.body)

        self.assertEqual("one", queue2.get(timeout=1).body)
        self.assertEqual("two", queue2.get(timeout=1).body)
        try:
            msg = queue2.get(timeout=1)
        except Empty:
            pass
        else:
            # report the body; concatenating the Message object itself
            # to a str would raise TypeError instead of failing cleanly
            self.fail("Got extra message: %s" % msg.body)

    def test_delete_simple(self):
        """
        Test core queue deletion behaviour
        """
        session = self.session

        # straight-forward case:
        session.queue_declare(queue="delete-me")
        for body in ("a", "b", "c"):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), body))
        session.queue_delete(queue="delete-me")

        # check that it has gone by declaring passively
        try:
            session.queue_declare(queue="delete-me", passive=True)
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Queue has not been deleted")

    def test_delete_queue_exists(self):
        """
        Test that deleting a queue that does not exist fails with not-found
        """
        # check attempted deletion of non-existent queue is handled correctly:
        session = self.session
        try:
            session.queue_delete(queue="i-dont-exist", if_empty=True)
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Expected delete of non-existant queue to fail")

    def test_delete_ifempty(self):
        """
        Test that if_empty field of queue_delete is honoured
        """
        session = self.session

        # create a queue and add a message to it (use default binding):
        session.queue_declare(queue="delete-me-2")
        session.queue_declare(queue="delete-me-2", passive=True)
        session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message"))

        # try to delete, but only if empty:
        try:
            session.queue_delete(queue="delete-me-2", if_empty=True)
        except SessionException as e:
            self.assertEqual(406, e.args[0].error_code)  # precondition-failed
        else:
            self.fail("Expected delete if_empty to fail for non-empty queue")

        # need new session now (the failed delete was issued on the old one):
        session = self.conn.session("replacement", 2)

        # empty queue:
        session.message_subscribe(destination="consumer_tag", queue="delete-me-2")
        self._grant_unlimited_credit(session, "consumer_tag")
        queue = session.incoming("consumer_tag")
        msg = queue.get(timeout=1)
        self.assertEqual("message", msg.body)
        session.message_accept(RangedSet(msg.id))
        session.message_cancel(destination="consumer_tag")

        # retry deletion on empty queue:
        session.queue_delete(queue="delete-me-2", if_empty=True)

        # check that it has gone by declaring passively:
        try:
            session.queue_declare(queue="delete-me-2", passive=True)
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Queue has not been deleted")

    def test_delete_ifunused(self):
        """
        Test that if_unused field of queue_delete is honoured
        """
        session = self.session

        # create a queue and register a consumer:
        session.queue_declare(queue="delete-me-3")
        session.queue_declare(queue="delete-me-3", passive=True)
        session.message_subscribe(destination="consumer_tag", queue="delete-me-3")

        # issue the delete that is expected to fail on a separate session;
        # the original session is still needed afterwards to cancel the consumer
        session2 = self.conn.session("replacement", 2)

        # try to delete, but only if unused:
        try:
            session2.queue_delete(queue="delete-me-3", if_unused=True)
        except SessionException as e:
            self.assertEqual(406, e.args[0].error_code)  # precondition-failed
        else:
            self.fail("Expected delete if_unused to fail for queue with existing consumer")

        # remove the consumer, after which the conditional delete should succeed:
        session.message_cancel(destination="consumer_tag")
        session.queue_delete(queue="delete-me-3", if_unused=True)

        # check that it has gone by declaring passively:
        try:
            session.queue_declare(queue="delete-me-3", passive=True)
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Queue has not been deleted")

    def test_autodelete_shared(self):
        """
        Test auto-deletion (of non-exclusive queues)
        """
        session = self.session
        session2 = self.conn.session("other", 1)

        session.queue_declare(queue="auto-delete-me", auto_delete=True)

        # consume from both sessions
        tag = "my-tag"
        session.message_subscribe(queue="auto-delete-me", destination=tag)
        session2.message_subscribe(queue="auto-delete-me", destination=tag)

        # implicit cancel
        session2.close()

        # check it is still there
        session.queue_declare(queue="auto-delete-me", passive=True)

        # explicit cancel => queue is now unused again:
        session.message_cancel(destination=tag)

        # NOTE: this assumes there is no timeout in use

        # check that it has gone by declaring it passively
        try:
            session.queue_declare(queue="auto-delete-me", passive=True)
        except SessionException as e:
            self.assertEqual(404, e.args[0].error_code)  # not-found
        else:
            self.fail("Expected queue to have been deleted")