| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| Tests for exchange behaviour. |
| |
| Test classes ending in 'RuleTests' are derived from rules in amqp.xml. |
| """ |
| |
| import Queue, logging, traceback |
| from qpid.testlib import TestBase010 |
| from qpid.datatypes import Message |
| from qpid.client import Closed |
| from qpid.session import SessionException |
| |
| |
| class TestHelper(TestBase010): |
| def setUp(self): |
| TestBase010.setUp(self) |
| self.queues = [] |
| self.exchanges = [] |
| self.subscriptions = [] |
| |
| def tearDown(self): |
| try: |
| for s in self.subscriptions: |
| self.session.message_cancel(destination=s) |
| for ssn, q in self.queues: |
| ssn.queue_delete(queue=q) |
| for ssn, ex in self.exchanges: |
| ssn.exchange_delete(exchange=ex) |
| except: |
| print "Error on tearDown:" |
| print traceback.print_exc() |
| TestBase010.tearDown(self) |
| |
| def createMessage(self, key="", body=""): |
| return Message(self.session.delivery_properties(routing_key=key), body) |
| |
| def getApplicationHeaders(self, msg): |
| for h in msg.headers: |
| if hasattr(h, 'application_headers'): return getattr(h, 'application_headers') |
| return None |
| |
| def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): |
| """ |
| Publish to exchange and assert queue.get() returns the same message. |
| """ |
| body = self.uniqueString() |
| dp=self.session.delivery_properties(routing_key=routing_key) |
| mp=self.session.message_properties(application_headers=properties) |
| self.session.message_transfer(destination=exchange, message=Message(dp, mp, body)) |
| msg = queue.get(timeout=1) |
| self.assertEqual(body, msg.body) |
| if (properties): |
| self.assertEqual(properties, self.getApplicationHeaders(msg)) |
| |
| def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): |
| """ |
| Publish a message and consume it, assert it comes back intact. |
| Return the Queue object used to consume. |
| """ |
| self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) |
| |
| def assertEmpty(self, queue): |
| """Assert that the queue is empty""" |
| try: |
| queue.get(timeout=1) |
| self.fail("Queue is not empty.") |
| except Queue.Empty: None # Ignore |
| |
| def queue_declare(self, session=None, *args, **keys): |
| session = session or self.session |
| reply = session.queue_declare(*args, **keys) |
| self.queues.append((session, keys["queue"])) |
| return reply |
| |
| def exchange_declare(self, session=None, ticket=0, exchange='', |
| type='', passive=False, durable=False, |
| auto_delete=False, |
| arguments={}): |
| session = session or self.session |
| reply = session.exchange_declare(exchange=exchange, type=type, passive=passive, durable=durable, auto_delete=auto_delete, arguments=arguments) |
| if exchange and not exchange.startswith("amq."): |
| self.exchanges.append((session,exchange)) |
| return reply |
| |
| def uniqueString(self): |
| """Generate a unique string, unique for this TestBase instance""" |
| if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; |
| return "Test Message " + str(self.uniqueCounter) |
| |
| def consume(self, queueName): |
| """Consume from named queue returns the Queue object.""" |
| if not "uniqueTag" in dir(self): self.uniqueTag = 1 |
| else: self.uniqueTag += 1 |
| consumer_tag = "tag" + str(self.uniqueTag) |
| self.session.message_subscribe(queue=queueName, destination=consumer_tag) |
| self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFFL) |
| self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) |
| self.subscriptions.append(consumer_tag) |
| return self.session.incoming(consumer_tag) |
| |
| |
| class StandardExchangeVerifier: |
| """Verifies standard exchange behavior. |
| |
| Used as base class for classes that test standard exchanges.""" |
| |
| def verifyDirectExchange(self, ex, unbind=False): |
| """Verify that ex behaves like a direct exchange.""" |
| self.queue_declare(queue="q") |
| self.session.exchange_bind(queue="q", exchange=ex, binding_key="k") |
| try: |
| self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") |
| try: |
| self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") |
| self.fail("Expected Empty exception") |
| except Queue.Empty: None # Expected |
| finally: |
| if unbind: |
| self.session.exchange_unbind(queue="q", exchange=ex, binding_key="k") |
| |
| def verifyFanOutExchange(self, ex, unbind=False): |
| """Verify that ex behaves like a fanout exchange.""" |
| self.queue_declare(queue="q") |
| self.session.exchange_bind(queue="q", exchange=ex) |
| self.queue_declare(queue="p") |
| self.session.exchange_bind(queue="p", exchange=ex) |
| try: |
| for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) |
| finally: |
| if unbind: |
| self.session.exchange_unbind(queue="q", exchange=ex, binding_key="") |
| self.session.exchange_unbind(queue="p", exchange=ex, binding_key="") |
| |
| |
| def verifyTopicExchange(self, ex, unbind=False): |
| """Verify that ex behaves like a topic exchange""" |
| self.queue_declare(queue="a") |
| self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*") |
| try: |
| q = self.consume("a") |
| self.assertPublishGet(q, ex, "a.b.x") |
| self.assertPublishGet(q, ex, "a.x.b.x") |
| self.assertPublishGet(q, ex, "a.x.x.b.x") |
| # Shouldn't match |
| self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) |
| self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y")) |
| self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x")) |
| self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) |
| self.assert_(q.empty()) |
| finally: |
| if unbind: |
| self.session.exchange_unbind(queue="a", exchange=ex, binding_key="a.#.b.*") |
| |
| def verifyHeadersExchange(self, ex, unbind=False): |
| """Verify that ex is a headers exchange""" |
| self.queue_declare(queue="q") |
| self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) |
| try: |
| q = self.consume("q") |
| headers = {"name":"fred", "age":3} |
| self.assertPublishGet(q, exchange=ex, properties=headers) |
| self.session.message_transfer(destination=ex) # No headers, won't deliver |
| self.assertEmpty(q); |
| finally: |
| if unbind: |
| self.session.exchange_unbind(queue="q", exchange=ex, binding_key="") |
| |
| |
| class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier): |
| """ |
| The server SHOULD implement these standard exchange types: topic, headers. |
| |
| Client attempts to declare an exchange with each of these standard types. |
| """ |
| |
| def testDirect(self): |
| """Declare and test a direct exchange""" |
| self.exchange_declare(0, exchange="d", type="direct") |
| self.verifyDirectExchange("d") |
| |
| def testFanout(self): |
| """Declare and test a fanout exchange""" |
| self.exchange_declare(0, exchange="f", type="fanout") |
| self.verifyFanOutExchange("f") |
| |
| def testTopic(self): |
| """Declare and test a topic exchange""" |
| self.exchange_declare(0, exchange="t", type="topic") |
| self.verifyTopicExchange("t") |
| |
| def testHeaders(self): |
| """Declare and test a headers exchange""" |
| self.exchange_declare(0, exchange="h", type="headers") |
| self.verifyHeadersExchange("h") |
| |
| |
| class RequiredInstancesRuleTests(TestHelper, StandardExchangeVerifier): |
| """ |
| The server MUST, in each virtual host, pre-declare an exchange instance |
| for each standard exchange type that it implements, where the name of the |
| exchange instance is amq. followed by the exchange type name. |
| |
| Client creates a temporary queue and attempts to bind to each required |
| exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if |
| those types are defined). |
| """ |
| def testAmqDirect(self): self.verifyDirectExchange("amq.direct") |
| |
| def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") |
| |
| def testAmqTopic(self): self.verifyTopicExchange("amq.topic") |
| |
| def testAmqMatch(self): self.verifyHeadersExchange("amq.match") |
| |
| class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier): |
| """ |
| The server MUST predeclare a direct exchange to act as the default exchange |
| for content Publish methods and for default queue bindings. |
| |
| Client checks that the default exchange is active by specifying a queue |
| binding with no exchange name, and publishing a message with a suitable |
| routing key but without specifying the exchange name, then ensuring that |
| the message arrives in the queue correctly. |
| """ |
| def testDefaultExchange(self): |
| # Test automatic binding by queue name. |
| self.queue_declare(queue="d") |
| self.assertPublishConsume(queue="d", routing_key="d") |
| |
| |
| # TODO aconway 2006-09-27: Fill in empty tests: |
| |
| class DefaultAccessRuleTests(TestHelper): |
| """ |
| The server MUST NOT allow clients to access the default exchange except |
| by specifying an empty exchange name in the Queue.Bind and content Publish |
| methods. |
| """ |
| |
| class ExtensionsRuleTests(TestHelper): |
| """ |
| The server MAY implement other exchange types as wanted. |
| """ |
| |
| |
| class DeclareMethodMinimumRuleTests(TestHelper): |
| """ |
| The server SHOULD support a minimum of 16 exchanges per virtual host and |
| ideally, impose no limit except as defined by available resources. |
| |
| The client creates as many exchanges as it can until the server reports |
| an error; the number of exchanges successfuly created must be at least |
| sixteen. |
| """ |
| |
| |
| class DeclareMethodTicketFieldValidityRuleTests(TestHelper): |
| """ |
| The client MUST provide a valid access ticket giving "active" access to |
| the realm in which the exchange exists or will be created, or "passive" |
| access if the if-exists flag is set. |
| |
| Client creates access ticket with wrong access rights and attempts to use |
| in this method. |
| """ |
| |
| |
| class DeclareMethodExchangeFieldReservedRuleTests(TestHelper): |
| """ |
| Exchange names starting with "amq." are reserved for predeclared and |
| standardised exchanges. The client MUST NOT attempt to create an exchange |
| starting with "amq.". |
| |
| Similarly, exchanges starting with "qpid." are reserved for Qpid |
| implementation-specific system exchanges (such as the management exchange). |
| The client must not attempt to create an exchange starting with the string |
| "qpid.". |
| """ |
| def template(self, reservedString, exchangeType): |
| try: |
| self.session.exchange_declare(exchange=reservedString, type=exchangeType) |
| self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".") |
| except SessionException, e: |
| self.assertEquals(e.args[0].error_code, 530) |
| # connection closed, reopen it |
| self.tearDown() |
| self.setUp() |
| try: |
| self.session.exchange_declare(exchange=reservedString + "abc123", type=exchangeType) |
| self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".") |
| except SessionException, e: |
| self.assertEquals(e.args[0].error_code, 530) |
| # connection closed, reopen it |
| self.tearDown() |
| self.setUp() |
| # The following should be legal: |
| self.session.exchange_declare(exchange=reservedString[:-1], type=exchangeType) |
| self.session.exchange_delete(exchange=reservedString[:-1]) |
| self.session.exchange_declare(exchange=reservedString[1:], type=exchangeType) |
| self.session.exchange_delete(exchange=reservedString[1:]) |
| self.session.exchange_declare(exchange="." + reservedString, type=exchangeType) |
| self.session.exchange_delete(exchange="." + reservedString) |
| self.session.exchange_declare(exchange="abc." + reservedString, type=exchangeType) |
| self.session.exchange_delete(exchange="abc." + reservedString) |
| self.session.exchange_declare(exchange="abc." + reservedString + "def", type=exchangeType) |
| self.session.exchange_delete(exchange="abc." + reservedString + "def") |
| |
| def test_amq(self): |
| self.template("amq.", "direct") |
| self.template("amq.", "topic") |
| self.template("amq.", "fanout") |
| |
| def test_qpid(self): |
| self.template("qpid.", "direct") |
| self.template("qpid.", "topic") |
| self.template("qpid.", "fanout") |
| |
| |
| class DeclareMethodTypeFieldTypedRuleTests(TestHelper): |
| """ |
| Exchanges cannot be redeclared with different types. The client MUST not |
| attempt to redeclare an existing exchange with a different type than used |
| in the original Exchange.Declare method. |
| |
| |
| """ |
| |
| |
| class DeclareMethodTypeFieldSupportRuleTests(TestHelper): |
| """ |
| The client MUST NOT attempt to create an exchange with a type that the |
| server does not support. |
| |
| |
| """ |
| |
| |
| class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper): |
| """ |
| If set, and the exchange does not already exist, the server MUST raise a |
| channel exception with reply code 404 (not found). |
| """ |
| def test(self): |
| try: |
| self.session.exchange_declare(exchange="humpty_dumpty", passive=True) |
| self.fail("Expected 404 for passive declaration of unknown exchange.") |
| except SessionException, e: |
| self.assertEquals(404, e.args[0].error_code) |
| |
| |
| class DeclareMethodDurableFieldSupportRuleTests(TestHelper): |
| """ |
| The server MUST support both durable and transient exchanges. |
| |
| |
| """ |
| |
| |
| class DeclareMethodDurableFieldStickyRuleTests(TestHelper): |
| """ |
| The server MUST ignore the durable field if the exchange already exists. |
| |
| |
| """ |
| |
| |
| class DeclareMethodAutoDeleteFieldStickyRuleTests(TestHelper): |
| """ |
| The server MUST ignore the auto-delete field if the exchange already |
| exists. |
| |
| |
| """ |
| |
| |
| class DeleteMethodTicketFieldValidityRuleTests(TestHelper): |
| """ |
| The client MUST provide a valid access ticket giving "active" access |
| rights to the exchange's access realm. |
| |
| Client creates access ticket with wrong access rights and attempts to use |
| in this method. |
| """ |
| |
| |
| class DeleteMethodExchangeFieldExistsRuleTests(TestHelper): |
| """ |
| The client MUST NOT attempt to delete an exchange that does not exist. |
| """ |
| |
| |
| class HeadersExchangeTests(TestHelper): |
| """ |
| Tests for headers exchange functionality. |
| """ |
| def setUp(self): |
| TestHelper.setUp(self) |
| self.queue_declare(queue="q") |
| self.q = self.consume("q") |
| |
| def myAssertPublishGet(self, headers): |
| self.assertPublishGet(self.q, exchange="amq.match", properties=headers) |
| |
| def myBasicPublish(self, headers): |
| mp=self.session.message_properties(application_headers=headers) |
| self.session.message_transfer(destination="amq.match", message=Message(mp, "foobar")) |
| |
| def testMatchAll(self): |
| self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) |
| self.myAssertPublishGet({"name":"fred", "age":3}) |
| self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) |
| |
| # None of these should match |
| self.myBasicPublish({}) |
| self.myBasicPublish({"name":"barney"}) |
| self.myBasicPublish({"name":10}) |
| self.myBasicPublish({"name":"fred", "age":2}) |
| self.assertEmpty(self.q) |
| |
| def testMatchAny(self): |
| self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) |
| self.myAssertPublishGet({"name":"fred"}) |
| self.myAssertPublishGet({"name":"fred", "ignoreme":10}) |
| self.myAssertPublishGet({"ignoreme":10, "age":3}) |
| |
| # Wont match |
| self.myBasicPublish({}) |
| self.myBasicPublish({"irrelevant":0}) |
| self.assertEmpty(self.q) |
| |
| def testMatchVoidValue(self): |
| self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":None}) |
| self.myAssertPublishGet({"name":"fred"}) |
| self.myAssertPublishGet({"name":"bob"}) |
| |
| # Wont match |
| self.myBasicPublish({}) |
| self.myBasicPublish({"irrelevant":0}) |
| self.assertEmpty(self.q) |
| |
| def testMultipleBindings(self): |
| self.session.exchange_bind(queue="q", exchange="amq.match", binding_key="SomeKey", arguments={ 'x-match':'any', "name":"fred"}) |
| self.session.exchange_bind(queue="q", exchange="amq.match", binding_key="AnotherKey", arguments={ 'x-match':'all', "age":3}) |
| self.myAssertPublishGet({"name":"fred", "age":3}) |
| self.assertEmpty(self.q) |
| |
| |
| class MiscellaneousErrorsTests(TestHelper): |
| """ |
| Test some miscellaneous error conditions |
| """ |
| def testTypeNotKnown(self): |
| try: |
| self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") |
| self.fail("Expected 404 for declaration of unknown exchange type.") |
| except SessionException, e: |
| self.assertEquals(404, e.args[0].error_code) |
| |
| def testDifferentDeclaredType(self): |
| self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") |
| try: |
| session = self.conn.session("alternate", 2) |
| session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") |
| self.fail("Expected 530 for redeclaration of exchange with different type.") |
| except SessionException, e: |
| self.assertEquals(530, e.args[0].error_code) |
| |
| def testReservedExchangeRedeclaredSameType(self): |
| self.exchange_declare(exchange="amq.direct", type="direct", passive=True) |
| |
| def testReservedExchangeRedeclaredDifferentType(self): |
| try: |
| self.exchange_declare(exchange="amq.direct", type="topic") |
| self.fail("Expected 530 for redeclaration of exchange with different type.") |
| except SessionException, e: |
| self.assertEquals(530, e.args[0].error_code) |
| |
| def testDefaultAccessBind(self): |
| try: |
| self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True) |
| self.session.exchange_bind(exchange="", queue="my-queue", binding_key="another-key") |
| self.fail("Expected 542 (invalid-argument) code for bind to default exchange.") |
| except SessionException, e: |
| self.assertEquals(542, e.args[0].error_code) |
| |
| def testDefaultAccessUnbind(self): |
| try: |
| self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True) |
| self.session.exchange_unbind(exchange="", queue="my-queue", binding_key="my-queue") |
| self.fail("Expected 542 (invalid-argument) code for unbind from default exchange.") |
| except SessionException, e: |
| self.assertEquals(542, e.args[0].error_code) |
| |
| def testDefaultAccessDelete(self): |
| try: |
| self.session.exchange_delete(exchange="") |
| self.fail("Expected 542 (invalid-argument) code for delete of default exchange.") |
| except SessionException, e: |
| self.assertEquals(542, e.args[0].error_code) |
| |
| class ExchangeTests(TestHelper): |
| def testHeadersBindNoMatchArg(self): |
| self.session.queue_declare(queue="q", exclusive=True, auto_delete=True) |
| try: |
| self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} ) |
| self.fail("Expected failure for missing x-match arg.") |
| except SessionException, e: |
| self.assertEquals(541, e.args[0].error_code) |
| |
| class AutodeleteTests(TestHelper, StandardExchangeVerifier): |
| def checkNotExists(self, e): |
| try: |
| s = self.conn.session("verifier") |
| s.exchange_declare(exchange=e, passive=True) |
| s.exchange_delete(exchange=e) |
| self.fail("Expected failure for passive declare of %s" % e) |
| except SessionException, e: |
| self.assertEquals(404, e.args[0].error_code) |
| |
| |
| def testAutodeleteFanout(self): |
| self.session.exchange_declare(exchange="e", type="fanout", auto_delete=True) |
| self.verifyFanOutExchange("e", unbind=True) |
| self.checkNotExists("e"); |
| |
| def testAutodeleteDirect(self): |
| self.session.exchange_declare(exchange="e", type="direct", auto_delete=True) |
| self.verifyDirectExchange("e", unbind=True) |
| self.checkNotExists("e"); |
| |
| def testAutodeleteTopic(self): |
| self.session.exchange_declare(exchange="e", type="topic", auto_delete=True) |
| self.verifyTopicExchange("e", unbind=True) |
| self.checkNotExists("e"); |
| |
| def testAutodeleteHeaders(self): |
| self.session.exchange_declare(exchange="e", type="headers", auto_delete=True) |
| self.verifyHeadersExchange("e", unbind=True) |
| self.checkNotExists("e"); |