| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| from qpid.client import Client, Closed |
| from qpid.queue import Empty |
| from qpid.testlib import TestBase010 |
| from qpid.datatypes import Message, RangedSet |
| from qpid.session import SessionException |
| |
| from qpid.content import Content |
| from time import sleep |
| |
| class MessageTests(TestBase010): |
| """Tests for 'methods' on the amqp message 'class'""" |
| |
| def test_no_local(self): |
| """ |
| NOTE: this is a test of a QPID specific feature |
| |
| Test that the qpid specific no_local arg is honoured. |
| """ |
| session = self.session |
| #setup, declare two queues one of which excludes delivery of locally sent messages |
| session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) |
| session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) |
| #establish two consumers |
| self.subscribe(destination="local_included", queue="test-queue-1a") |
| self.subscribe(destination="local_excluded", queue="test-queue-1b") |
| |
| #send a message |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) |
| |
| #send a message from another session on the same connection to each queue |
| session2 = self.conn.session("my-local-session") |
| session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) |
| session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) |
| |
| #send a message from a session on another connection to each queue |
| for q in ["test-queue-1a", "test-queue-1b"]: |
| session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") |
| other = self.connect() |
| session3 = other.session("my-other-session") |
| session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) |
| other.close() |
| |
| #check the queues of the two consumers |
| excluded = session.incoming("local_excluded") |
| included = session.incoming("local_included") |
| for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: |
| msg = included.get(timeout=1) |
| self.assertEqual(b, msg.body) |
| msg = excluded.get(timeout=1) |
| self.assertEqual("i-am-not-local", msg.body) |
| try: |
| excluded.get(timeout=1) |
| self.fail("Received locally published message though no_local=true") |
| except Empty: None |
| |
| def test_no_local_awkward(self): |
| |
| """ |
| NOTE: this is a test of a QPID specific feature |
| |
| Check that messages which will be excluded through no-local |
| processing will not block subsequent deliveries |
| """ |
| |
| session = self.session |
| #setup: |
| session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) |
| #establish consumer which excludes delivery of locally sent messages |
| self.subscribe(destination="local_excluded", queue="test-queue") |
| |
| #send a 'local' message |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local")) |
| |
| #send a non local message |
| other = self.connect() |
| session2 = other.session("my-session", 1) |
| session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign")) |
| session2.close() |
| other.close() |
| |
| #check that the second message only is delivered |
| excluded = session.incoming("local_excluded") |
| msg = excluded.get(timeout=1) |
| self.assertEqual("foreign", msg.body) |
| try: |
| excluded.get(timeout=1) |
| self.fail("Received extra message") |
| except Empty: None |
| #check queue is empty |
| self.assertEqual(0, session.queue_query(queue="test-queue").message_count) |
| |
| def test_no_local_exclusive_subscribe(self): |
| """ |
| NOTE: this is a test of a QPID specific feature |
| |
| Test that the no_local processing works on queues not declared |
| as exclusive, but with an exclusive subscription |
| """ |
| session = self.session |
| |
| #setup, declare two queues one of which excludes delivery of |
| #locally sent messages but is not declared as exclusive |
| session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) |
| session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'}) |
| #establish two consumers |
| self.subscribe(destination="local_included", queue="test-queue-1a") |
| self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True) |
| |
| #send a message from the same session to each queue |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) |
| |
| #send a message from another session on the same connection to each queue |
| session2 = self.conn.session("my-session") |
| session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) |
| session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) |
| |
| #send a message from a session on another connection to each queue |
| for q in ["test-queue-1a", "test-queue-1b"]: |
| session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") |
| other = self.connect() |
| session3 = other.session("my-other-session") |
| session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) |
| other.close() |
| |
| #check the queues of the two consumers |
| excluded = session.incoming("local_excluded") |
| included = session.incoming("local_included") |
| for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: |
| msg = included.get(timeout=1) |
| self.assertEqual(b, msg.body) |
| msg = excluded.get(timeout=1) |
| self.assertEqual("i-am-not-local", msg.body) |
| try: |
| excluded.get(timeout=1) |
| self.fail("Received locally published message though no_local=true") |
| except Empty: None |
| |
| |
| def test_consume_exclusive(self): |
| """ |
| Test an exclusive consumer prevents other consumer being created |
| """ |
| session = self.session |
| session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) |
| session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) |
| try: |
| session.message_subscribe(destination="second", queue="test-queue-2") |
| self.fail("Expected consume request to fail due to previous exclusive consumer") |
| except SessionException, e: |
| self.assertEquals(405, e.args[0].error_code) |
| |
| def test_consume_exclusive2(self): |
| """ |
| Check that an exclusive consumer cannot be created if a consumer already exists: |
| """ |
| session = self.session |
| session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) |
| session.message_subscribe(destination="first", queue="test-queue-2") |
| try: |
| session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) |
| self.fail("Expected exclusive consume request to fail due to previous consumer") |
| except SessionException, e: |
| self.assertEquals(405, e.args[0].error_code) |
| |
| def test_consume_queue_not_found(self): |
| """ |
| Test error conditions associated with the queue field of the consume method: |
| """ |
| session = self.session |
| try: |
| #queue specified but doesn't exist: |
| session.message_subscribe(queue="invalid-queue", destination="a") |
| self.fail("Expected failure when consuming from non-existent queue") |
| except SessionException, e: |
| self.assertEquals(404, e.args[0].error_code) |
| |
| def test_consume_queue_not_specified(self): |
| session = self.session |
| try: |
| #queue not specified and none previously declared for channel: |
| session.message_subscribe(destination="a") |
| self.fail("Expected failure when consuming from unspecified queue") |
| except SessionException, e: |
| self.assertEquals(531, e.args[0].error_code) |
| |
| def test_consume_unique_consumers(self): |
| """ |
| Ensure unique consumer tags are enforced |
| """ |
| session = self.session |
| #setup, declare a queue: |
| session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) |
| |
| #check that attempts to use duplicate tags are detected and prevented: |
| session.message_subscribe(destination="first", queue="test-queue-3") |
| try: |
| session.message_subscribe(destination="first", queue="test-queue-3") |
| self.fail("Expected consume request to fail due to non-unique tag") |
| except SessionException, e: |
| self.assertEquals(530, e.args[0].error_code) |
| |
| def test_cancel(self): |
| """ |
| Test compliance of the basic.cancel method |
| """ |
| session = self.session |
| #setup, declare a queue: |
| session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) |
| |
| session.message_subscribe(destination="my-consumer", queue="test-queue-4") |
| myqueue = session.incoming("my-consumer") |
| session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) |
| session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) |
| |
| #should flush here |
| |
| #cancel should stop messages being delivered |
| session.message_cancel(destination="my-consumer") |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two")) |
| msg = myqueue.get(timeout=1) |
| self.assertEqual("One", msg.body) |
| try: |
| msg = myqueue.get(timeout=1) |
| self.fail("Got message after cancellation: " + msg) |
| except Empty: None |
| |
| #cancellation of non-existant consumers should be result in 404s |
| try: |
| session.message_cancel(destination="my-consumer") |
| self.fail("Expected 404 for recancellation of subscription.") |
| except SessionException, e: |
| self.assertEquals(404, e.args[0].error_code) |
| |
| session = self.conn.session("alternate-session", timeout=10) |
| try: |
| session.message_cancel(destination="this-never-existed") |
| self.fail("Expected 404 for cancellation of unknown subscription.") |
| except SessionException, e: |
| self.assertEquals(404, e.args[0].error_code) |
| |
| |
| def test_ack(self): |
| """ |
| Test basic ack/recover behaviour using a combination of implicit and |
| explicit accept subscriptions. |
| """ |
| self.startQmf() |
| session1 = self.conn.session("alternate-session", timeout=10) |
| session1.queue_declare(queue="test-ack-queue", auto_delete=True) |
| |
| delivery_properties = session1.delivery_properties(routing_key="test-ack-queue") |
| for i in ["One", "Two", "Three", "Four", "Five"]: |
| session1.message_transfer(message=Message(delivery_properties, i)) |
| |
| # verify enqueued message count, use both QMF and session query to verify consistency |
| self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count) |
| queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0] |
| self.assertEquals(queueObj.msgDepth, 5) |
| self.assertEquals(queueObj.msgTotalEnqueues, 5) |
| self.assertEquals(queueObj.msgTotalDequeues, 0) |
| |
| # subscribe with implied acquire, explicit accept: |
| session1.message_subscribe(queue = "test-ack-queue", destination = "consumer") |
| session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFFL) |
| session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFFL) |
| queue = session1.incoming("consumer") |
| |
| msg1 = queue.get(timeout=1) |
| msg2 = queue.get(timeout=1) |
| msg3 = queue.get(timeout=1) |
| msg4 = queue.get(timeout=1) |
| msg5 = queue.get(timeout=1) |
| |
| self.assertEqual("One", msg1.body) |
| self.assertEqual("Two", msg2.body) |
| self.assertEqual("Three", msg3.body) |
| self.assertEqual("Four", msg4.body) |
| self.assertEqual("Five", msg5.body) |
| |
| # messages should not be on the queue: |
| self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count) |
| # QMF shows the dequeues as not having happened yet, since they are have |
| # not been accepted |
| queueObj.update() |
| self.assertEquals(queueObj.msgDepth, 5) |
| self.assertEquals(queueObj.msgTotalEnqueues, 5) |
| self.assertEquals(queueObj.msgTotalDequeues, 0) |
| |
| session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four |
| |
| # QMF should now reflect the accepted messages as being dequeued |
| self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count) |
| queueObj.update() |
| self.assertEquals(queueObj.msgDepth, 2) |
| self.assertEquals(queueObj.msgTotalEnqueues, 5) |
| self.assertEquals(queueObj.msgTotalDequeues, 3) |
| |
| #subscribe from second session here to ensure queue is not auto-deleted |
| #when alternate session closes. Use implicit accept mode to test that |
| #we don't need to explicitly accept |
| session2 = self.conn.session("alternate-session-2", timeout=10) |
| session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) |
| |
| #now close the first session, and see that the unaccepted messages are |
| #then redelivered to another subscriber: |
| session1.close(timeout=10) |
| |
| # check the statistics - the queue_query will show the non-accepted |
| # messages have been released. QMF never considered them dequeued, so |
| # those counts won't change |
| self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count) |
| queueObj.update() |
| self.assertEquals(queueObj.msgDepth, 2) |
| self.assertEquals(queueObj.msgTotalEnqueues, 5) |
| self.assertEquals(queueObj.msgTotalDequeues, 3) |
| |
| session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFFL) |
| session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFFL) |
| queue = session2.incoming("checker") |
| |
| msg3b = queue.get(timeout=1) |
| msg5b = queue.get(timeout=1) |
| |
| self.assertEqual("Three", msg3b.body) |
| self.assertEqual("Five", msg5b.body) |
| |
| try: |
| extra = queue.get(timeout=1) |
| self.fail("Got unexpected message: " + extra.body) |
| except Empty: None |
| |
| self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count) |
| queueObj.update() |
| self.assertEquals(queueObj.msgDepth, 0) |
| self.assertEquals(queueObj.msgTotalEnqueues, 5) |
| self.assertEquals(queueObj.msgTotalDequeues, 5) |
| |
| # Subscribe one last time to keep the queue available, and to verify |
| # that the implied accept worked by verifying no messages have been |
| # returned when session2 is closed. |
| self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker") |
| |
| session2.close(timeout=10) |
| |
| # check the statistics - they should not have changed |
| self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count) |
| queueObj.update() |
| self.assertEquals(queueObj.msgDepth, 0) |
| self.assertEquals(queueObj.msgTotalEnqueues, 5) |
| self.assertEquals(queueObj.msgTotalDequeues, 5) |
| |
| self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFFL) |
| self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) |
| try: |
| extra = self.session.incoming("final-checker").get(timeout=1) |
| self.fail("Got unexpected message: " + extra.body) |
| except Empty: None |
| |
| def test_reject(self): |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") |
| session.queue_declare(queue = "r", exclusive=True, auto_delete=True) |
| session.exchange_bind(queue = "r", exchange = "amq.fanout") |
| |
| session.message_subscribe(queue = "q", destination = "consumer") |
| session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) |
| session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah")) |
| msg = session.incoming("consumer").get(timeout = 1) |
| self.assertEquals(msg.body, "blah, blah") |
| session.message_reject(RangedSet(msg.id)) |
| |
| session.message_subscribe(queue = "r", destination = "checker") |
| session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) |
| session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) |
| msg = session.incoming("checker").get(timeout = 1) |
| self.assertEquals(msg.body, "blah, blah") |
| |
| def test_credit_flow_messages(self): |
| """ |
| Test basic credit based flow control with unit = message |
| """ |
| #declare an exclusive queue |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| #create consumer (for now that defaults to infinite credit) |
| session.message_subscribe(queue = "q", destination = "c") |
| session.message_set_flow_mode(flow_mode = 0, destination = "c") |
| #send batch of messages to queue |
| for i in range(1, 11): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) |
| |
| #set message credit to finite amount (less than enough for all messages) |
| session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") |
| #set infinite byte credit |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") |
| #check that expected number were received |
| q = session.incoming("c") |
| for i in range(1, 6): |
| self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) |
| self.assertEmpty(q) |
| |
| #increase credit again and check more are received |
| for i in range(6, 11): |
| session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c") |
| self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) |
| self.assertEmpty(q) |
| |
| def test_credit_flow_bytes(self): |
| """ |
| Test basic credit based flow control with unit = bytes |
| """ |
| #declare an exclusive queue |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| #create consumer (for now that defaults to infinite credit) |
| session.message_subscribe(queue = "q", destination = "c") |
| session.message_set_flow_mode(flow_mode = 0, destination = "c") |
| #send batch of messages to queue |
| for i in range(10): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) |
| |
| #each message is currently interpreted as requiring msg_size bytes of credit |
| msg_size = 19 |
| |
| #set byte credit to finite amount (less than enough for all messages) |
| session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") |
| #set infinite message credit |
| session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c") |
| #check that expected number were received |
| q = session.incoming("c") |
| for i in range(5): |
| self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") |
| self.assertEmpty(q) |
| |
| #increase credit again and check more are received |
| for i in range(5): |
| session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c") |
| self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") |
| self.assertEmpty(q) |
| |
| |
| def test_window_flow_messages(self): |
| """ |
| Test basic window based flow control with unit = message |
| """ |
| #declare an exclusive queue |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| #create consumer (for now that defaults to infinite credit) |
| session.message_subscribe(queue = "q", destination = "c") |
| session.message_set_flow_mode(flow_mode = 1, destination = "c") |
| #send batch of messages to queue |
| for i in range(1, 11): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) |
| |
| #set message credit to finite amount (less than enough for all messages) |
| session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") |
| #set infinite byte credit |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") |
| #check that expected number were received |
| q = session.incoming("c") |
| for i in range(1, 6): |
| msg = q.get(timeout = 1) |
| session.receiver._completed.add(msg.id)#TODO: this may be done automatically |
| self.assertDataEquals(session, msg, "Message %d" % i) |
| self.assertEmpty(q) |
| |
| #acknowledge messages and check more are received |
| #TODO: there may be a nicer way of doing this |
| session.channel.session_completed(session.receiver._completed) |
| |
| for i in range(6, 11): |
| self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) |
| self.assertEmpty(q) |
| |
| |
| def test_window_flow_bytes(self): |
| """ |
| Test basic window based flow control with unit = bytes |
| """ |
| #declare an exclusive queue |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| #create consumer (for now that defaults to infinite credit) |
| session.message_subscribe(queue = "q", destination = "c") |
| session.message_set_flow_mode(flow_mode = 1, destination = "c") |
| #send batch of messages to queue |
| for i in range(10): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) |
| |
| #each message is currently interpreted as requiring msg_size bytes of credit |
| msg_size = 19 |
| |
| #set byte credit to finite amount (less than enough for all messages) |
| session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") |
| #set infinite message credit |
| session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c") |
| #check that expected number were received |
| q = session.incoming("c") |
| msgs = [] |
| for i in range(5): |
| msg = q.get(timeout = 1) |
| msgs.append(msg) |
| self.assertDataEquals(session, msg, "abcdefgh") |
| self.assertEmpty(q) |
| |
| #ack each message individually and check more are received |
| for i in range(5): |
| msg = msgs.pop() |
| #TODO: there may be a nicer way of doing this |
| session.receiver._completed.add(msg.id) |
| session.channel.session_completed(session.receiver._completed) |
| self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") |
| self.assertEmpty(q) |
| |
| def test_window_flush_ack_flow(self): |
| """ |
| Test basic window based flow control with unit = bytes |
| """ |
| #declare an exclusive queue |
| ssn = self.session |
| ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| #create consumer |
| ssn.message_subscribe(queue = "q", destination = "c", |
| accept_mode=ssn.accept_mode.explicit) |
| ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c") |
| |
| #send message A |
| ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A")) |
| |
| for unit in ssn.credit_unit.VALUES: |
| ssn.message_flow("c", unit, 0xFFFFFFFFL) |
| |
| q = ssn.incoming("c") |
| msgA = q.get(timeout=10) |
| |
| ssn.message_flush(destination="c") |
| |
| # XXX |
| ssn.receiver._completed.add(msgA.id) |
| ssn.channel.session_completed(ssn.receiver._completed) |
| ssn.message_accept(RangedSet(msgA.id)) |
| |
| for unit in ssn.credit_unit.VALUES: |
| ssn.message_flow("c", unit, 0xFFFFFFFFL) |
| |
| #send message B |
| ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B")) |
| |
| msgB = q.get(timeout=10) |
| |
| def test_window_stop(self): |
| """ |
| Ensure window based flow control reacts to stop correctly |
| """ |
| session = self.session |
| #setup subscriber on a test queue |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| session.message_subscribe(queue = "q", destination = "c") |
| session.message_set_flow_mode(flow_mode = 1, destination = "c") |
| session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") |
| |
| |
| #send batch of messages to queue |
| for i in range(0, 10): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % (i+1))) |
| |
| #retrieve all delivered messages |
| q = session.incoming("c") |
| for i in range(0, 5): |
| msg = q.get(timeout = 1) |
| session.receiver._completed.add(msg.id)#TODO: this may be done automatically |
| self.assertDataEquals(session, msg, "Message %d" % (i+1)) |
| |
| session.message_stop(destination = "c") |
| |
| #now send completions, normally used to move window forward, |
| #but after a stop should not do so |
| session.channel.session_completed(session.receiver._completed) |
| |
| #check no more messages are sent |
| self.assertEmpty(q) |
| |
| #re-establish window and check remaining messages |
| session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") |
| for i in range(0, 5): |
| msg = q.get(timeout = 1) |
| self.assertDataEquals(session, msg, "Message %d" % (i+6)) |
| |
| |
| def test_subscribe_not_acquired(self): |
| """ |
| Test the not-acquired modes works as expected for a simple case |
| """ |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| for i in range(1, 6): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) |
| |
| session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) |
| session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") |
| session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) |
| session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "b") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") |
| |
| for i in range(6, 11): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) |
| |
| #both subscribers should see all messages |
| qA = session.incoming("a") |
| qB = session.incoming("b") |
| for i in range(1, 11): |
| for q in [qA, qB]: |
| msg = q.get(timeout = 1) |
| self.assertEquals("Message %s" % i, msg.body) |
| #TODO: tidy up completion |
| session.receiver._completed.add(msg.id) |
| |
| #TODO: tidy up completion |
| session.channel.session_completed(session.receiver._completed) |
| #messages should still be on the queue: |
| self.assertEquals(10, session.queue_query(queue = "q").message_count) |
| |
| def test_acquire_with_no_accept_and_credit_flow(self): |
| """ |
| Test that messages recieved unacquired, with accept not |
| required in windowing mode can be acquired. |
| """ |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) |
| |
| session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1) |
| session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a") |
| session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") |
| msg = session.incoming("a").get(timeout = 1) |
| self.assertEquals("acquire me", msg.body) |
| #message should still be on the queue: |
| self.assertEquals(1, session.queue_query(queue = "q").message_count) |
| |
| transfers = RangedSet(msg.id) |
| response = session.message_acquire(transfers) |
| #check that we get notification (i.e. message_acquired) |
| self.assert_(msg.id in response.transfers) |
| #message should have been removed from the queue: |
| self.assertEquals(0, session.queue_query(queue = "q").message_count) |
| |
| def test_acquire(self): |
| """ |
| Test explicit acquire function |
| """ |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) |
| |
| session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) |
| session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) |
| session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) |
| msg = session.incoming("a").get(timeout = 1) |
| self.assertEquals("acquire me", msg.body) |
| #message should still be on the queue: |
| self.assertEquals(1, session.queue_query(queue = "q").message_count) |
| |
| transfers = RangedSet(msg.id) |
| response = session.message_acquire(transfers) |
| #check that we get notification (i.e. message_acquired) |
| self.assert_(msg.id in response.transfers) |
| #message should have been removed from the queue: |
| self.assertEquals(0, session.queue_query(queue = "q").message_count) |
| session.message_accept(transfers) |
| |
| |
| def test_release(self): |
| """ |
| Test explicit release function |
| """ |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me")) |
| |
| session.message_subscribe(queue = "q", destination = "a") |
| session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) |
| session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) |
| msg = session.incoming("a").get(timeout = 1) |
| self.assertEquals("release me", msg.body) |
| session.message_cancel(destination = "a") |
| session.message_release(RangedSet(msg.id)) |
| |
| #message should not have been removed from the queue: |
| self.assertEquals(1, session.queue_query(queue = "q").message_count) |
| |
| def test_release_ordering(self): |
| """ |
| Test order of released messages is as expected |
| """ |
| session = self.session |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| for i in range (1, 11): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i))) |
| |
| session.message_subscribe(queue = "q", destination = "a") |
| session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") |
| queue = session.incoming("a") |
| first = queue.get(timeout = 1) |
| for i in range(2, 10): |
| msg = queue.get(timeout = 1) |
| self.assertEquals("released message %s" % (i), msg.body) |
| |
| last = queue.get(timeout = 1) |
| self.assertEmpty(queue) |
| released = RangedSet() |
| released.add(first.id, last.id) |
| session.message_release(released) |
| |
| #TODO: may want to clean this up... |
| session.receiver._completed.add(first.id, last.id) |
| session.channel.session_completed(session.receiver._completed) |
| |
| for i in range(1, 11): |
| self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body) |
| |
| def test_ranged_ack(self): |
| """ |
| Test acking of messages ranges |
| """ |
| session = self.conn.session("alternate-session", timeout=10) |
| |
| session.queue_declare(queue = "q", auto_delete=True) |
| delivery_properties = session.delivery_properties(routing_key="q") |
| for i in range (1, 11): |
| session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) |
| |
| session.message_subscribe(queue = "q", destination = "a") |
| session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") |
| queue = session.incoming("a") |
| ids = [] |
| for i in range (1, 11): |
| msg = queue.get(timeout = 1) |
| self.assertEquals("message %s" % (i), msg.body) |
| ids.append(msg.id) |
| |
| self.assertEmpty(queue) |
| |
| #ack all but the fourth message (command id 2) |
| accepted = RangedSet() |
| accepted.add(ids[0], ids[2]) |
| accepted.add(ids[4], ids[9]) |
| session.message_accept(accepted) |
| |
| #subscribe from second session here to ensure queue is not |
| #auto-deleted when alternate session closes (no need to ack on these): |
| self.session.message_subscribe(queue = "q", destination = "checker") |
| |
| #now close the session, and see that the unacked messages are |
| #then redelivered to another subscriber: |
| session.close(timeout=10) |
| |
| session = self.session |
| session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) |
| session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) |
| queue = session.incoming("checker") |
| |
| self.assertEquals("message 4", queue.get(timeout = 1).body) |
| self.assertEmpty(queue) |
| |
| def test_subscribe_not_acquired_2(self): |
| session = self.session |
| |
| #publish some messages |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| for i in range(1, 11): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) |
| |
| #consume some of them |
| session.message_subscribe(queue = "q", destination = "a") |
| session.message_set_flow_mode(flow_mode = 0, destination = "a") |
| session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") |
| |
| queue = session.incoming("a") |
| for i in range(1, 6): |
| msg = queue.get(timeout = 1) |
| self.assertEquals("message-%d" % (i), msg.body) |
| #complete and accept |
| session.message_accept(RangedSet(msg.id)) |
| #TODO: tidy up completion |
| session.receiver._completed.add(msg.id) |
| session.channel.session_completed(session.receiver._completed) |
| self.assertEmpty(queue) |
| |
| #now create a not-acquired subscriber |
| session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") |
| |
| #check it gets those not consumed |
| queue = session.incoming("b") |
| session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") |
| for i in range(6, 11): |
| msg = queue.get(timeout = 1) |
| self.assertEquals("message-%d" % (i), msg.body) |
| session.message_release(RangedSet(msg.id)) |
| #TODO: tidy up completion |
| session.receiver._completed.add(msg.id) |
| session.channel.session_completed(session.receiver._completed) |
| session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") |
| self.assertEmpty(queue) |
| |
| #check all 'browsed' messages are still on the queue |
| self.assertEqual(5, session.queue_query(queue="q").message_count) |
| |
| def test_subscribe_not_acquired_3(self): |
| session = self.session |
| |
| #publish some messages |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| for i in range(1, 11): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) |
| |
| #create a not-acquired subscriber |
| session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") |
| session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") |
| |
| #browse through messages |
| queue = session.incoming("a") |
| for i in range(1, 11): |
| msg = queue.get(timeout = 1) |
| self.assertEquals("message-%d" % (i), msg.body) |
| if (i % 2): |
| #try to acquire every second message |
| response = session.message_acquire(RangedSet(msg.id)) |
| #check that acquire succeeds |
| self.assert_(msg.id in response.transfers) |
| session.message_accept(RangedSet(msg.id)) |
| else: |
| session.message_release(RangedSet(msg.id)) |
| session.receiver._completed.add(msg.id) |
| session.channel.session_completed(session.receiver._completed) |
| self.assertEmpty(queue) |
| |
| #create a second not-acquired subscriber |
| session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") |
| session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") |
| #check it gets those not consumed |
| queue = session.incoming("b") |
| for i in [2,4,6,8,10]: |
| msg = queue.get(timeout = 1) |
| self.assertEquals("message-%d" % (i), msg.body) |
| session.message_release(RangedSet(msg.id)) |
| session.receiver._completed.add(msg.id) |
| session.channel.session_completed(session.receiver._completed) |
| session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") |
| self.assertEmpty(queue) |
| |
| #check all 'browsed' messages are still on the queue |
| self.assertEqual(5, session.queue_query(queue="q").message_count) |
| |
| def test_release_unacquired(self): |
| session = self.session |
| |
| #create queue |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| |
| #send message |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message")) |
| |
| #create two 'browsers' |
| session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") |
| session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") |
| queueA = session.incoming("a") |
| |
| session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") |
| session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b") |
| queueB = session.incoming("b") |
| |
| #have each browser release the message |
| msgA = queueA.get(timeout = 1) |
| session.message_release(RangedSet(msgA.id)) |
| |
| msgB = queueB.get(timeout = 1) |
| session.message_release(RangedSet(msgB.id)) |
| |
| #cancel browsers |
| session.message_cancel(destination = "a") |
| session.message_cancel(destination = "b") |
| |
| #create consumer |
| session.message_subscribe(queue = "q", destination = "c") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") |
| session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c") |
| queueC = session.incoming("c") |
| #consume the message then ack it |
| msgC = queueC.get(timeout = 1) |
| session.message_accept(RangedSet(msgC.id)) |
| #ensure there are no other messages |
| self.assertEmpty(queueC) |
| |
| def test_release_order(self): |
| session = self.session |
| |
| #create queue |
| session.queue_declare(queue = "q", exclusive=True, auto_delete=True) |
| |
| #send messages |
| for i in range(1, 11): |
| session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) |
| |
| #subscribe: |
| session.message_subscribe(queue="q", destination="a") |
| a = session.incoming("a") |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") |
| session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") |
| |
| for i in range(1, 11): |
| msg = a.get(timeout = 1) |
| self.assertEquals("message-%d" % (i), msg.body) |
| if (i % 2): |
| #accept all odd messages |
| session.message_accept(RangedSet(msg.id)) |
| else: |
| #release all even messages |
| session.message_release(RangedSet(msg.id)) |
| |
| #browse: |
| session.message_subscribe(queue="q", destination="b", acquire_mode=1) |
| b = session.incoming("b") |
| b.start() |
| for i in [2, 4, 6, 8, 10]: |
| msg = b.get(timeout = 1) |
| self.assertEquals("message-%d" % (i), msg.body) |
| |
| |
| def test_empty_body(self): |
| session = self.session |
| session.queue_declare(queue="xyz", exclusive=True, auto_delete=True) |
| props = session.delivery_properties(routing_key="xyz") |
| session.message_transfer(message=Message(props, "")) |
| |
| consumer_tag = "tag1" |
| session.message_subscribe(queue="xyz", destination=consumer_tag) |
| session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag) |
| session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) |
| queue = session.incoming(consumer_tag) |
| msg = queue.get(timeout=1) |
| self.assertEquals("", msg.body) |
| session.message_accept(RangedSet(msg.id)) |
| |
| def test_incoming_start(self): |
| q = "test_incoming_start" |
| session = self.session |
| |
| session.queue_declare(queue=q, exclusive=True, auto_delete=True) |
| session.message_subscribe(queue=q, destination="msgs") |
| messages = session.incoming("msgs") |
| assert messages.destination == "msgs" |
| |
| dp = session.delivery_properties(routing_key=q) |
| session.message_transfer(message=Message(dp, "test")) |
| |
| messages.start() |
| msg = messages.get() |
| assert msg.body == "test" |
| |
| def test_ttl(self): |
| q = "test_ttl" |
| session = self.session |
| |
| session.queue_declare(queue=q, exclusive=True, auto_delete=True) |
| |
| dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second |
| session.message_transfer(message=Message(dp, "first")) |
| |
| dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in fives minutes |
| session.message_transfer(message=Message(dp, "second")) |
| |
| d = "msgs" |
| session.message_subscribe(queue=q, destination=d) |
| messages = session.incoming(d) |
| sleep(1) |
| session.message_flow(unit = session.credit_unit.message, value=2, destination=d) |
| session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFFL, destination=d) |
| assert messages.get(timeout=1).body == "second" |
| self.assertEmpty(messages) |
| |
| def assertDataEquals(self, session, msg, expected): |
| self.assertEquals(expected, msg.body) |
| |
| def assertEmpty(self, queue): |
| try: |
| extra = queue.get(timeout=1) |
| self.fail("Queue not empty, contains: " + extra.body) |
| except Empty: None |
| |
| class SizelessContent(Content): |
| |
| def size(self): |
| return None |