blob: b25016e680674234b415c8210e02be38edbe4f13 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
from qpid.testlib import testrunner, TestBase
from qpid.reference import Reference, ReferenceId
class MessageTests(TestBase):
"""Tests for 'methods' on the amqp message 'class'"""
def test_consume_no_local(self):
"""
Test that the no_local flag is honoured in the consume method
"""
channel = self.channel
#setup, declare two queues:
channel.queue_declare(queue="test-queue-1a", exclusive=True)
channel.queue_declare(queue="test-queue-1b", exclusive=True)
#establish two consumers one of which excludes delivery of locally sent messages
channel.message_consume(destination="local_included", queue="test-queue-1a")
channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True)
#send a message
channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local")
channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local")
#check the queues of the two consumers
excluded = self.client.queue("local_excluded")
included = self.client.queue("local_included")
msg = included.get(timeout=1)
self.assertEqual("consume_no_local", msg.body)
try:
excluded.get(timeout=1)
self.fail("Received locally published message though no_local=true")
except Empty: None
def test_consume_exclusive(self):
"""
Test that the exclusive flag is honoured in the consume method
"""
channel = self.channel
#setup, declare a queue:
channel.queue_declare(queue="test-queue-2", exclusive=True)
#check that an exclusive consumer prevents other consumer being created:
channel.message_consume(destination="first", queue="test-queue-2", exclusive=True)
try:
channel.message_consume(destination="second", queue="test-queue-2")
self.fail("Expected consume request to fail due to previous exclusive consumer")
except Closed, e:
self.assertChannelException(403, e.args[0])
#open new channel and cleanup last consumer:
channel = self.client.channel(2)
channel.channel_open()
#check that an exclusive consumer cannot be created if a consumer already exists:
channel.message_consume(destination="first", queue="test-queue-2")
try:
channel.message_consume(destination="second", queue="test-queue-2", exclusive=True)
self.fail("Expected exclusive consume request to fail due to previous consumer")
except Closed, e:
self.assertChannelException(403, e.args[0])
def test_consume_queue_errors(self):
"""
Test error conditions associated with the queue field of the consume method:
"""
channel = self.channel
try:
#queue specified but doesn't exist:
channel.message_consume(queue="invalid-queue")
self.fail("Expected failure when consuming from non-existent queue")
except Closed, e:
self.assertChannelException(404, e.args[0])
channel = self.client.channel(2)
channel.channel_open()
try:
#queue not specified and none previously declared for channel:
channel.message_consume(queue="")
self.fail("Expected failure when consuming from unspecified queue")
except Closed, e:
self.assertConnectionException(530, e.args[0])
def test_consume_unique_consumers(self):
"""
Ensure unique consumer tags are enforced
"""
channel = self.channel
#setup, declare a queue:
channel.queue_declare(queue="test-queue-3", exclusive=True)
#check that attempts to use duplicate tags are detected and prevented:
channel.message_consume(destination="first", queue="test-queue-3")
try:
channel.message_consume(destination="first", queue="test-queue-3")
self.fail("Expected consume request to fail due to non-unique tag")
except Closed, e:
self.assertConnectionException(530, e.args[0])
def test_cancel(self):
"""
Test compliance of the basic.cancel method
"""
channel = self.channel
#setup, declare a queue:
channel.queue_declare(queue="test-queue-4", exclusive=True)
channel.message_consume(destination="my-consumer", queue="test-queue-4")
channel.message_transfer(routing_key="test-queue-4", body="One")
#cancel should stop messages being delivered
channel.message_cancel(destination="my-consumer")
channel.message_transfer(routing_key="test-queue-4", body="Two")
myqueue = self.client.queue("my-consumer")
msg = myqueue.get(timeout=1)
self.assertEqual("One", msg.body)
try:
msg = myqueue.get(timeout=1)
self.fail("Got message after cancellation: " + msg)
except Empty: None
#cancellation of non-existant consumers should be handled without error
channel.message_cancel(destination="my-consumer")
channel.message_cancel(destination="this-never-existed")
def test_ack(self):
"""
Test basic ack/recover behaviour
"""
channel = self.channel
channel.queue_declare(queue="test-ack-queue", exclusive=True)
channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False)
queue = self.client.queue("consumer_tag")
channel.message_transfer(routing_key="test-ack-queue", body="One")
channel.message_transfer(routing_key="test-ack-queue", body="Two")
channel.message_transfer(routing_key="test-ack-queue", body="Three")
channel.message_transfer(routing_key="test-ack-queue", body="Four")
channel.message_transfer(routing_key="test-ack-queue", body="Five")
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
msg3 = queue.get(timeout=1)
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
self.assertEqual("One", msg1.body)
self.assertEqual("Two", msg2.body)
self.assertEqual("Three", msg3.body)
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
msg1.ok(batchoffset=1)#One and Two
msg4.ok()
channel.message_recover(requeue=False)
msg3b = queue.get(timeout=1)
msg5b = queue.get(timeout=1)
self.assertEqual("Three", msg3b.body)
self.assertEqual("Five", msg5b.body)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected message: " + extra.body)
except Empty: None
def test_recover_requeue(self):
"""
Test requeing on recovery
"""
channel = self.channel
channel.queue_declare(queue="test-requeue", exclusive=True)
channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False)
queue = self.client.queue("consumer_tag")
channel.message_transfer(routing_key="test-requeue", body="One")
channel.message_transfer(routing_key="test-requeue", body="Two")
channel.message_transfer(routing_key="test-requeue", body="Three")
channel.message_transfer(routing_key="test-requeue", body="Four")
channel.message_transfer(routing_key="test-requeue", body="Five")
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
msg3 = queue.get(timeout=1)
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
self.assertEqual("One", msg1.body)
self.assertEqual("Two", msg2.body)
self.assertEqual("Three", msg3.body)
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
msg1.ok(batchoffset=1) #One and Two
msg4.ok() #Four
channel.message_cancel(destination="consumer_tag")
#publish a new message
channel.message_transfer(routing_key="test-requeue", body="Six")
#requeue unacked messages (Three and Five)
channel.message_recover(requeue=True)
channel.message_consume(queue="test-requeue", destination="consumer_tag")
queue2 = self.client.queue("consumer_tag")
msg3b = queue2.get(timeout=1)
msg5b = queue2.get(timeout=1)
self.assertEqual("Three", msg3b.body)
self.assertEqual("Five", msg5b.body)
self.assertEqual(True, msg3b.redelivered)
self.assertEqual(True, msg5b.redelivered)
self.assertEqual("Six", queue2.get(timeout=1).body)
try:
extra = queue2.get(timeout=1)
self.fail("Got unexpected message in second queue: " + extra.body)
except Empty: None
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected message in original queue: " + extra.body)
except Empty: None
def test_qos_prefetch_count(self):
"""
Test that the prefetch count specified is honoured
"""
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-count", exclusive=True)
subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False)
queue = self.client.queue("consumer_tag")
#set prefetch to 5:
channel.message_qos(prefetch_count=5)
#publish 10 messages:
for i in range(1, 11):
channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
#only 5 messages should have been delivered:
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
except Empty: None
#ack messages and check that the next set arrive ok:
#todo: once batching is implmented, send a single response for all messages
msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 11th message in original queue: " + extra.body)
except Empty: None
def test_qos_prefetch_size(self):
"""
Test that the prefetch size specified is honoured
"""
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-size", exclusive=True)
subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False)
queue = self.client.queue("consumer_tag")
#set prefetch to 50 bytes (each message is 9 or 10 bytes):
channel.message_qos(prefetch_size=50)
#publish 10 messages:
for i in range(1, 11):
channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
#only 5 messages should have been delivered (i.e. 45 bytes worth):
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
except Empty: None
#ack messages and check that the next set arrive ok:
msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 11th message in original queue: " + extra.body)
except Empty: None
#make sure that a single oversized message still gets delivered
large = "abcdefghijklmnopqrstuvwxyz"
large = large + "-" + large;
channel.message_transfer(routing_key="test-prefetch-size", body=large)
msg = queue.get(timeout=1)
self.assertEqual(large, msg.body)
def test_get(self):
"""
Test message_get method
"""
channel = self.channel
channel.queue_declare(queue="test-get", exclusive=True)
#publish some messages (no_ack=True)
for i in range(1, 11):
channel.message_transfer(routing_key="test-get", body="Message %d" % i)
#use message_get to read back the messages, and check that we get an empty at the end
for i in range(1, 11):
tag = "queue %d" % i
reply = channel.message_get(no_ack=True, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "empty")
#repeat for no_ack=False
for i in range(11, 21):
channel.message_transfer(routing_key="test-get", body="Message %d" % i)
for i in range(11, 21):
tag = "queue %d" % i
reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
if (i==13):
msg.ok(batchoffset=-2)#11, 12 & 13
if(i in [15, 17, 19]):
msg.ok()
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "empty")
#recover(requeue=True)
channel.message_recover(requeue=True)
#get the unacked messages again (14, 16, 18, 20)
for i in [14, 16, 18, 20]:
tag = "queue %d" % i
reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
msg.ok()
#channel.message_ack(delivery_tag=reply.delivery_tag)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "empty")
channel.message_recover(requeue=True)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "empty")
def test_reference_simple(self):
"""
Test basic ability to handle references
"""
channel = self.channel
channel.queue_declare(queue="ref_queue", exclusive=True)
channel.message_consume(queue="ref_queue", destination="c1")
queue = self.client.queue("c1")
refId = "myref"
channel.message_open(reference=refId)
channel.message_append(reference=refId, bytes="abcd")
channel.synchronous = False
ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
channel.synchronous = True
channel.message_append(reference=refId, bytes="efgh")
channel.message_append(reference=refId, bytes="ijkl")
channel.message_close(reference=refId)
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl")
def test_reference_large(self):
"""
Test basic ability to handle references whose content exceeds max frame size
"""
channel = self.channel
self.queue_declare(queue="ref_queue")
#generate a big data string (> max frame size of consumer):
data = "0123456789"
for i in range(0, 10):
data += data
#send it inline
channel.synchronous = False
ack = channel.message_transfer(routing_key="ref_queue", body=data)
channel.synchronous = True
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
#create a new connection for consumer, with specific max frame size (< data)
other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
ch2 = other.channel(1)
ch2.channel_open()
ch2.message_consume(queue="ref_queue", destination="c1")
queue = other.queue("c1")
msg = queue.get(timeout=1)
self.assertTrue(isinstance(msg.body, ReferenceId))
self.assertTrue(msg.reference)
self.assertEquals(data, msg.reference.get_complete())
def test_reference_completion(self):
"""
Test that reference transfer are not deemed complete until
closed (therefore are not acked or routed until that point)
"""
channel = self.channel
channel.queue_declare(queue="ref_queue", exclusive=True)
channel.message_consume(queue="ref_queue", destination="c1")
queue = self.client.queue("c1")
refId = "myref"
channel.message_open(reference=refId)
channel.message_append(reference=refId, bytes="abcd")
channel.synchronous = False
ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
channel.synchronous = True
try:
msg = queue.get(timeout=1)
self.fail("Got unexpected message on queue: " + msg)
except Empty: None
self.assertTrue(not ack.is_complete())
channel.message_close(reference=refId)
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
def test_reference_multi_transfer(self):
"""
Test that multiple transfer requests for the same reference are
correctly handled.
"""
channel = self.channel
#declare and consume from two queues
channel.queue_declare(queue="q-one", exclusive=True)
channel.queue_declare(queue="q-two", exclusive=True)
channel.message_consume(queue="q-one", destination="q-one")
channel.message_consume(queue="q-two", destination="q-two")
queue1 = self.client.queue("q-one")
queue2 = self.client.queue("q-two")
#transfer a single ref to both queues (in separate commands)
channel.message_open(reference="my-ref")
channel.synchronous = False
ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
channel.message_append(reference="my-ref", bytes="my data")
ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
channel.synchronous = True
channel.message_close(reference="my-ref")
#check that both queues have the message
self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
self.assertEmpty(queue1)
self.assertEmpty(queue2)
#transfer a single ref to the same queue twice (in separate commands)
channel.message_open(reference="my-ref")
channel.synchronous = False
ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
channel.message_append(reference="my-ref", bytes="second message")
ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
channel.synchronous = True
channel.message_close(reference="my-ref")
msg1 = queue1.get(timeout=1)
msg2 = queue1.get(timeout=1)
#order is undefined
if msg1.message_id == "abc":
self.assertEquals(msg2.message_id, "xyz")
else:
self.assertEquals(msg1.message_id, "xyz")
self.assertEquals(msg2.message_id, "abc")
#would be legal for the incoming messages to be transfered
#inline or by reference in any combination
if isinstance(msg1.body, ReferenceId):
self.assertEquals("second message", msg1.reference.get_complete())
if isinstance(msg2.body, ReferenceId):
if msg1.body != msg2.body:
self.assertEquals("second message", msg2.reference.get_complete())
#else ok, as same ref as msg1
else:
self.assertEquals("second message", msg1.body)
if isinstance(msg2.body, ReferenceId):
self.assertEquals("second message", msg2.reference.get_complete())
else:
self.assertEquals("second message", msg2.body)
self.assertEmpty(queue1)
def test_reference_unopened_on_append_error(self):
channel = self.channel
try:
channel.message_append(reference="unopened")
except Closed, e:
self.assertConnectionException(503, e.args[0])
def test_reference_unopened_on_close_error(self):
channel = self.channel
try:
channel.message_close(reference="unopened")
except Closed, e:
self.assertConnectionException(503, e.args[0])
def test_reference_unopened_on_transfer_error(self):
channel = self.channel
try:
channel.message_transfer(body=ReferenceId("unopened"))
except Closed, e:
self.assertConnectionException(503, e.args[0])
def test_reference_already_opened_error(self):
channel = self.channel
channel.message_open(reference="a")
try:
channel.message_open(reference="a")
except Closed, e:
self.assertConnectionException(503, e.args[0])
def test_empty_reference(self):
channel = self.channel
channel.queue_declare(queue="ref_queue", exclusive=True)
channel.message_consume(queue="ref_queue", destination="c1")
queue = self.client.queue("c1")
refId = "myref"
channel.message_open(reference=refId)
channel.synchronous = False
ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
channel.synchronous = True
channel.message_close(reference=refId)
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
msg = queue.get(timeout=1)
self.assertEquals(msg.message_id, "empty-msg")
self.assertDataEquals(channel, msg, "")
def test_reject(self):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
channel.message_consume(queue = "q", destination = "consumer")
channel.message_transfer(routing_key = "q", body="blah, blah")
msg = self.client.queue("consumer").get(timeout = 1)
self.assertEquals(msg.body, "blah, blah")
channel.message_cancel(destination = "consumer")
msg.reject()
channel.message_consume(queue = "q", destination = "checker")
msg = self.client.queue("checker").get(timeout = 1)
self.assertEquals(msg.body, "blah, blah")
def test_checkpoint(self):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
channel.message_open(reference="my-ref")
channel.message_append(reference="my-ref", bytes="abcdefgh")
channel.message_append(reference="my-ref", bytes="ijklmnop")
channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
channel.channel_close()
channel = self.client.channel(2)
channel.channel_open()
channel.message_consume(queue = "q", destination = "consumer")
offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
self.assertTrue(offset<=16)
channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
channel.synchronous = False
channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
channel.synchronous = True
channel.message_close(reference="my-ref")
self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
self.assertEmpty(self.client.queue("consumer"))
def assertDataEquals(self, channel, msg, expected):
if isinstance(msg.body, ReferenceId):
data = msg.reference.get_complete()
else:
data = msg.body
self.assertEquals(expected, data)