| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import unittest |
| from proton import Message, PENDING, ACCEPTED, REJECTED |
| from system_test import TestCase, Qdrouterd, main_module |
| from proton.handlers import MessagingHandler |
| from proton.reactor import Container, AtMostOnce, AtLeastOnce |
| |
| # PROTON-828: |
| try: |
| from proton import MODIFIED |
| except ImportError: |
| from proton import PN_STATUS_MODIFIED as MODIFIED |
| |
| |
| class RouterTest(TestCase): |
| """System tests involving a single router""" |
| |
| @classmethod |
| def setUpClass(cls): |
| """Start a router and a messenger""" |
| super(RouterTest, cls).setUpClass() |
| name = "test-router" |
| config = Qdrouterd.Config([ |
| ('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.A'}), |
| |
| ('router', {'mode': 'standalone', 'routerId': 'QDR'}), |
| |
| # Setting the stripAnnotations to 'no' so that the existing tests will work. |
| # Setting stripAnnotations to no will not strip the annotations and any tests that were already in this file |
| # that were expecting the annotations to not be stripped will continue working. |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), |
| |
| # The following listeners were exclusively added to test the stripAnnotations attribute in qdrouterd.conf file |
| # Different listeners will be used to test all allowed values of stripAnnotations ('no', 'both', 'out', 'in') |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'both'}), |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}), |
| ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}), |
| |
| ('address', {'prefix': 'closest', 'distribution': 'closest'}), |
| ('address', {'prefix': 'spread', 'distribution': 'balanced'}), |
| ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), |
| ]) |
| cls.router = cls.tester.qdrouterd(name, config) |
| cls.router.wait_ready() |
| cls.address = cls.router.addresses[0] |
| |
| def test_01_pre_settled(self): |
| addr = self.address+"/pre_settled/1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| for i in range(100): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| for i in range(100): |
| M2.recv(1) |
| M2.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| |
| M1.stop() |
| M2.stop() |
| |
| def test_02a_multicast_unsettled(self): |
| addr = self.address+"/multicast.unsettled.1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| M3 = self.messenger() |
| M4 = self.messenger() |
| |
| |
| M1.outgoing_window = 5 |
| M2.incoming_window = 5 |
| M3.incoming_window = 5 |
| M4.incoming_window = 5 |
| |
| M1.start() |
| M2.start() |
| M3.start() |
| M4.start() |
| |
| M2.subscribe(addr) |
| M3.subscribe(addr) |
| M4.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| for i in range(2): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send(0) |
| |
| for i in range(2): |
| M2.recv(1) |
| trk = M2.get(rm) |
| M2.accept(trk) |
| M2.settle(trk) |
| self.assertEqual(i, rm.body['number']) |
| |
| M3.recv(1) |
| trk = M3.get(rm) |
| M3.accept(trk) |
| M3.settle(trk) |
| self.assertEqual(i, rm.body['number']) |
| |
| M4.recv(1) |
| trk = M4.get(rm) |
| M4.accept(trk) |
| M4.settle(trk) |
| self.assertEqual(i, rm.body['number']) |
| |
| M1.stop() |
| M2.stop() |
| M3.stop() |
| M4.stop() |
| |
| |
| def test_02b_disp_to_closed_connection(self): |
| addr = self.address+"/pre_settled/2" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| |
| M1.outgoing_window = 5 |
| M2.incoming_window = 5 |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| for i in range(2): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send(0) |
| M1.stop() |
| |
| for i in range(2): |
| M2.recv(1) |
| trk = M2.get(rm) |
| M2.accept(trk) |
| M2.settle(trk) |
| self.assertEqual(i, rm.body['number']) |
| |
| M2.stop() |
| |
| |
| def test_02c_sender_settles_first(self): |
| addr = self.address+"/settled/senderfirst/1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| |
| M1.outgoing_window = 5 |
| M2.incoming_window = 5 |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| tm.body = {'number': 0} |
| ttrk = M1.put(tm) |
| M1.send(0) |
| |
| M1.settle(ttrk) |
| M1.flush() |
| M2.flush() |
| |
| M2.recv(1) |
| rtrk = M2.get(rm) |
| M2.accept(rtrk) |
| M2.settle(rtrk) |
| self.assertEqual(0, rm.body['number']) |
| |
| M1.flush() |
| M2.flush() |
| |
| M1.stop() |
| M2.stop() |
| |
| |
| def test_03_propagated_disposition(self): |
| addr = self.address+"/unsettled/1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.outgoing_window = 5 |
| M2.incoming_window = 5 |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| tm.body = {'number': 0} |
| |
| ## |
| ## Test ACCEPT |
| ## |
| tx_tracker = M1.put(tm) |
| M1.send(0) |
| M2.recv(1) |
| rx_tracker = M2.get(rm) |
| self.assertEqual(0, rm.body['number']) |
| self.assertEqual(PENDING, M1.status(tx_tracker)) |
| |
| M2.accept(rx_tracker) |
| M2.settle(rx_tracker) |
| |
| M2.flush() |
| M1.flush() |
| |
| self.assertEqual(ACCEPTED, M1.status(tx_tracker)) |
| |
| ## |
| ## Test REJECT |
| ## |
| tx_tracker = M1.put(tm) |
| M1.send(0) |
| M2.recv(1) |
| rx_tracker = M2.get(rm) |
| self.assertEqual(0, rm.body['number']) |
| self.assertEqual(PENDING, M1.status(tx_tracker)) |
| |
| M2.reject(rx_tracker) |
| M2.settle(rx_tracker) |
| |
| M2.flush() |
| M1.flush() |
| |
| self.assertEqual(REJECTED, M1.status(tx_tracker)) |
| |
| M1.stop() |
| M2.stop() |
| |
| |
| def test_04_unsettled_undeliverable(self): |
| addr = self.address+"/unsettled_undeliverable/1" |
| M1 = self.messenger() |
| |
| M1.outgoing_window = 5 |
| |
| M1.start() |
| M1.timeout = 1 |
| tm = Message() |
| tm.address = addr |
| tm.body = {'number': 200} |
| |
| exception = False |
| try: |
| M1.put(tm) |
| M1.send(0) |
| M1.flush() |
| except Exception: |
| exception = True |
| |
| self.assertEqual(exception, True) |
| |
| M1.stop() |
| |
| |
| def test_05_three_ack(self): |
| addr = self.address+"/three_ack/1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.outgoing_window = 5 |
| M2.incoming_window = 5 |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| tm.body = {'number': 200} |
| |
| tx_tracker = M1.put(tm) |
| M1.send(0) |
| M2.recv(1) |
| rx_tracker = M2.get(rm) |
| self.assertEqual(200, rm.body['number']) |
| self.assertEqual(PENDING, M1.status(tx_tracker)) |
| |
| M2.accept(rx_tracker) |
| |
| M2.flush() |
| M1.flush() |
| |
| self.assertEqual(ACCEPTED, M1.status(tx_tracker)) |
| |
| M1.settle(tx_tracker) |
| |
| M1.flush() |
| M2.flush() |
| |
| ## |
| ## We need a way to verify on M2 (receiver) that the tracker has been |
| ## settled on the M1 (sender). [ See PROTON-395 ] |
| ## |
| |
| M2.settle(rx_tracker) |
| |
| M2.flush() |
| M1.flush() |
| |
| M1.stop() |
| M2.stop() |
| |
| |
| # def test_06_link_route_sender(self): |
| # pass |
| |
| # def test_07_link_route_receiver(self): |
| # pass |
| |
| |
| def test_08_message_annotations(self): |
| addr = self.address+"/ma/1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| |
| |
| ## |
| ## No inbound delivery annotations |
| ## |
| for i in range(10): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| for i in range(10): |
| M2.recv(1) |
| M2.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| ma = rm.annotations |
| self.assertEqual(ma.__class__, dict) |
| self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR') |
| self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR']) |
| |
| ## |
| ## Pre-existing ingress |
| ## |
| tm.annotations = {'x-opt-qd.ingress': 'ingress-router'} |
| for i in range(10): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| for i in range(10): |
| M2.recv(1) |
| M2.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| ma = rm.annotations |
| self.assertEqual(ma.__class__, dict) |
| self.assertEqual(ma['x-opt-qd.ingress'], 'ingress-router') |
| self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR']) |
| |
| ## |
| ## Invalid trace type |
| ## |
| tm.annotations = {'x-opt-qd.trace' : 45} |
| for i in range(10): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| for i in range(10): |
| M2.recv(1) |
| M2.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| ma = rm.annotations |
| self.assertEqual(ma.__class__, dict) |
| self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR') |
| self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR']) |
| |
| ## |
| ## Empty trace |
| ## |
| tm.annotations = {'x-opt-qd.trace' : []} |
| for i in range(10): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| for i in range(10): |
| M2.recv(1) |
| M2.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| ma = rm.annotations |
| self.assertEqual(ma.__class__, dict) |
| self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR') |
| self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR']) |
| |
| ## |
| ## Non-empty trace |
| ## |
| tm.annotations = {'x-opt-qd.trace' : ['0/first.hop']} |
| for i in range(10): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| for i in range(10): |
| M2.recv(1) |
| M2.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| ma = rm.annotations |
| self.assertEqual(ma.__class__, dict) |
| self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR') |
| self.assertEqual(ma['x-opt-qd.trace'], ['0/first.hop', '0/QDR']) |
| |
| M1.stop() |
| M2.stop() |
| |
| # Tests stripping of ingress and egress annotations. |
| # There is a property in qdrouter.json called stripAnnotations with possible values of ["in", "out", "both", "no"] |
| # The default for stripAnnotations is "both" (which means strip annotations on both ingress and egress) |
| # This test will test the stripAnnotations = no option - meaning no annotations must be stripped. |
| # We will send in a custom annotation and make that we get back 3 annotations on the received message |
| # Skipping this test temporarily |
| def notest_08a_test_strip_message_annotations_no_custom_not_implemented(self): |
| addr = self.router.addresses[1]+"/strip_message_annotations_no_custom/1" |
| |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| ingress_message = Message() |
| ingress_message.address = addr |
| ingress_message.body = {'message': 'Hello World!'} |
| ingress_message_annotations = {} |
| ingress_message_annotations['custom-annotation'] = '1/Custom_Annotation' |
| |
| |
| ingress_message.annotations = ingress_message_annotations |
| |
| M1.put(ingress_message) |
| M1.send() |
| |
| # Receive the message |
| M2.recv(1) |
| egress_message = Message() |
| M2.get(egress_message) |
| |
| #Make sure 'Hello World!' is in the message body dict |
| self.assertEqual('Hello World!', egress_message.body['message']) |
| |
| |
| egress_message_annotations = egress_message.annotations |
| |
| self.assertEqual(egress_message_annotations.__class__, dict) |
| self.assertEqual(egress_message_annotations['custom-annotation'], '1/Custom_Annotation') |
| self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR') |
| self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR']) |
| |
| M1.stop() |
| M2.stop() |
| |
| #stripAnnotations property is set to "no" |
| def test_08a_test_strip_message_annotations_no(self): |
| addr = self.router.addresses[1]+"/strip_message_annotations_no/1" |
| |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| ingress_message = Message() |
| ingress_message.address = addr |
| ingress_message.body = {'message': 'Hello World!'} |
| ingress_message_annotations = {} |
| |
| ingress_message.annotations = ingress_message_annotations |
| |
| M1.put(ingress_message) |
| M1.send() |
| |
| # Receive the message |
| M2.recv(1) |
| egress_message = Message() |
| M2.get(egress_message) |
| |
| #Make sure 'Hello World!' is in the message body dict |
| self.assertEqual('Hello World!', egress_message.body['message']) |
| |
| |
| egress_message_annotations = egress_message.annotations |
| |
| self.assertEqual(egress_message_annotations.__class__, dict) |
| self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR') |
| self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR']) |
| |
| M1.stop() |
| M2.stop() |
| |
| #stripAnnotations property is set to "no" |
| def test_08a_test_strip_message_annotations_no_add_trace(self): |
| addr = self.router.addresses[1]+"/strip_message_annotations_no_add_trace/1" |
| |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| ingress_message = Message() |
| ingress_message.address = addr |
| ingress_message.body = {'message': 'Hello World!'} |
| |
| ## |
| ## Pre-existing ingress and trace |
| ## |
| ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.1']} |
| ingress_message.annotations = ingress_message_annotations |
| |
| ingress_message.annotations = ingress_message_annotations |
| |
| M1.put(ingress_message) |
| M1.send() |
| |
| # Receive the message |
| M2.recv(1) |
| egress_message = Message() |
| M2.get(egress_message) |
| |
| #Make sure 'Hello World!' is in the message body dict |
| self.assertEqual('Hello World!', egress_message.body['message']) |
| |
| |
| egress_message_annotations = egress_message.annotations |
| |
| self.assertEqual(egress_message_annotations.__class__, dict) |
| self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], 'ingress-router') |
| self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.1', '0/QDR']) |
| |
| M1.stop() |
| M2.stop() |
| |
| |
| #Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations |
| #stripAnnotations property is set to "both" |
| def test_08a_test_strip_message_annotations_both(self): |
| addr = self.router.addresses[2]+"/strip_message_annotations_both/1" |
| |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| ingress_message = Message() |
| ingress_message.address = addr |
| ingress_message.body = {'message': 'Hello World!'} |
| |
| #Put and send the message |
| M1.put(ingress_message) |
| M1.send() |
| |
| # Receive the message |
| M2.recv(1) |
| egress_message = Message() |
| M2.get(egress_message) |
| |
| self.assertEqual(egress_message.annotations, None) |
| |
| M1.stop() |
| M2.stop() |
| |
| #Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations |
| #stripAnnotations property is set to "out" |
| def test_08a_test_strip_message_annotations_out(self): |
| addr = self.router.addresses[3]+"/strip_message_annotations_out/1" |
| |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| ingress_message = Message() |
| ingress_message.address = addr |
| ingress_message.body = {'message': 'Hello World!'} |
| |
| #Put and send the message |
| M1.put(ingress_message) |
| M1.send() |
| |
| # Receive the message |
| M2.recv(1) |
| egress_message = Message() |
| M2.get(egress_message) |
| |
| self.assertEqual(egress_message.annotations, None) |
| |
| M1.stop() |
| M2.stop() |
| |
| #Send in pre-existing trace and ingress and annotations and make sure that they are not in the outgoing annotations. |
| #stripAnnotations property is set to "in" |
| def test_08a_test_strip_message_annotations_in(self): |
| addr = self.router.addresses[4]+"/strip_message_annotations_in/1" |
| |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| ingress_message = Message() |
| ingress_message.address = addr |
| ingress_message.body = {'message': 'Hello World!'} |
| |
| ## |
| ## Pre-existing ingress and trace |
| ## |
| ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.1']} |
| ingress_message.annotations = ingress_message_annotations |
| |
| #Put and send the message |
| M1.put(ingress_message) |
| M1.send() |
| |
| # Receive the message |
| M2.recv(1) |
| egress_message = Message() |
| M2.get(egress_message) |
| |
| #Make sure 'Hello World!' is in the message body dict |
| self.assertEqual('Hello World!', egress_message.body['message']) |
| |
| egress_message_annotations = egress_message.annotations |
| |
| self.assertEqual(egress_message_annotations.__class__, dict) |
| self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR') |
| self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR']) |
| |
| M1.stop() |
| M2.stop() |
| |
| |
| def test_09_management(self): |
| addr = "amqp:/$management" |
| |
| M = self.messenger() |
| M.start() |
| M.route("amqp:/*", self.address+"/$1") |
| sub = M.subscribe("amqp:/#") |
| reply = sub.address |
| |
| request = Message() |
| response = Message() |
| |
| request.address = addr |
| request.reply_to = reply |
| request.correlation_id = "C1" |
| request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'} |
| |
| M.put(request) |
| M.send() |
| M.recv() |
| M.get(response) |
| |
| assert response.properties['statusCode'] == 200, response.properties['statusCode'] |
| self.assertEqual(response.correlation_id, "C1") |
| self.assertEqual(response.body, []) |
| |
| request.address = addr |
| request.reply_to = reply |
| request.correlation_id = 135 |
| request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'} |
| |
| M.put(request) |
| M.send() |
| M.recv() |
| M.get(response) |
| |
| self.assertEqual(response.properties['statusCode'], 200) |
| self.assertEqual(response.correlation_id, 135) |
| self.assertEqual(response.body, []) |
| |
| request.address = addr |
| request.reply_to = reply |
| request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'} |
| |
| M.put(request) |
| M.send() |
| M.recv() |
| M.get(response) |
| |
| self.assertEqual(response.properties['statusCode'], 200) |
| self.assertEqual(response.body, []) |
| |
| M.stop() |
| |
| |
| def test_09a_management_no_reply(self): |
| addr = "amqp:/$management" |
| |
| M = self.messenger() |
| M.start() |
| M.route("amqp:/*", self.address+"/$1") |
| |
| request = Message() |
| |
| request.address = addr |
| request.correlation_id = "C1" |
| request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'} |
| |
| M.put(request) |
| M.send() |
| |
| M.put(request) |
| M.send() |
| |
| M.stop() |
| |
| |
| def test_09c_management_get_operations(self): |
| addr = "amqp:/_local/$management" |
| |
| M = self.messenger() |
| M.start() |
| M.route("amqp:/*", self.address+"/$1") |
| sub = M.subscribe("amqp:/#") |
| reply = sub.address |
| |
| request = Message() |
| response = Message() |
| |
| ## |
| ## Unrestricted request |
| ## |
| request.address = addr |
| request.reply_to = reply |
| request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-OPERATIONS'} |
| |
| M.put(request) |
| M.send() |
| M.recv() |
| M.get(response) |
| |
| self.assertEqual(response.properties['statusCode'], 200) |
| self.assertEqual(response.body.__class__, dict) |
| self.assertTrue('org.apache.qpid.dispatch.router' in response.body.keys()) |
| self.assertTrue(len(response.body.keys()) > 2) |
| self.assertTrue(response.body['org.apache.qpid.dispatch.router'].__class__, list) |
| |
| M.stop() |
| |
| |
| def test_09d_management_not_implemented(self): |
| addr = "amqp:/$management" |
| |
| M = self.messenger() |
| M.start() |
| M.route("amqp:/*", self.address+"/$1") |
| sub = M.subscribe("amqp:/#") |
| reply = sub.address |
| |
| request = Message() |
| response = Message() |
| |
| ## |
| ## Request with an invalid operation |
| ## |
| request.address = addr |
| request.reply_to = reply |
| request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'NOT-IMPL'} |
| |
| M.put(request) |
| M.send() |
| M.recv() |
| M.get(response) |
| |
| self.assertEqual(response.properties['statusCode'], 501) |
| |
| M.stop() |
| |
| |
| def test_10_semantics_multicast(self): |
| addr = self.address+"/multicast.10" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| M3 = self.messenger() |
| M4 = self.messenger() |
| |
| |
| M1.start() |
| M2.start() |
| M3.start() |
| M4.start() |
| |
| M2.subscribe(addr) |
| M3.subscribe(addr) |
| M4.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| for i in range(100): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| for i in range(100): |
| M2.recv(1) |
| M2.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| |
| M3.recv(1) |
| M3.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| |
| M4.recv(1) |
| M4.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| |
| M1.stop() |
| M2.stop() |
| M3.stop() |
| M4.stop() |
| |
| def test_11_semantics_closest(self): |
| addr = self.address+"/closest.1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| M3 = self.messenger() |
| M4 = self.messenger() |
| |
| |
| M1.start() |
| M2.start() |
| M3.start() |
| M4.start() |
| |
| M2.subscribe(addr) |
| M3.subscribe(addr) |
| M4.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| for i in range(30): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| i = 0 |
| rx_set = [] |
| for i in range(10): |
| M2.recv(1) |
| M2.get(rm) |
| rx_set.append(rm.body['number']) |
| |
| M3.recv(1) |
| M3.get(rm) |
| rx_set.append(rm.body['number']) |
| |
| M4.recv(1) |
| M4.get(rm) |
| rx_set.append(rm.body['number']) |
| |
| self.assertEqual(30, len(rx_set)) |
| rx_set.sort() |
| for i in range(30): |
| self.assertEqual(i, rx_set[i]) |
| |
| M1.stop() |
| M2.stop() |
| M3.stop() |
| M4.stop() |
| |
| def test_12_semantics_spread(self): |
| addr = self.address+"/spread.1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| M3 = self.messenger() |
| M4 = self.messenger() |
| |
| M2.timeout = 0.1 |
| M3.timeout = 0.1 |
| M4.timeout = 0.1 |
| |
| M1.start() |
| M2.start() |
| M3.start() |
| M4.start() |
| |
| M2.subscribe(addr) |
| M3.subscribe(addr) |
| M4.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| for i in range(30): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| i = 0 |
| rx_set = [] |
| ca = 0 |
| cb = 0 |
| cc = 0 |
| |
| while len(rx_set) < 30: |
| try: |
| M2.recv(1) |
| M2.get(rm) |
| rx_set.append(rm.body['number']) |
| ca += 1 |
| except: |
| pass |
| |
| try: |
| M3.recv(1) |
| M3.get(rm) |
| rx_set.append(rm.body['number']) |
| cb += 1 |
| except: |
| pass |
| |
| try: |
| M4.recv(1) |
| M4.get(rm) |
| rx_set.append(rm.body['number']) |
| cc += 1 |
| except: |
| pass |
| |
| self.assertEqual(30, len(rx_set)) |
| self.assertTrue(ca > 0) |
| self.assertTrue(cb > 0) |
| self.assertTrue(cc > 0) |
| |
| rx_set.sort() |
| for i in range(30): |
| self.assertEqual(i, rx_set[i]) |
| |
| M1.stop() |
| M2.stop() |
| M3.stop() |
| M4.stop() |
| |
| |
| def test_13_to_override(self): |
| addr = self.address+"/toov/1" |
| M1 = self.messenger() |
| M2 = self.messenger() |
| |
| M1.start() |
| M2.start() |
| M2.subscribe(addr) |
| |
| tm = Message() |
| rm = Message() |
| |
| tm.address = addr |
| |
| ## |
| ## Pre-existing TO |
| ## |
| tm.annotations = {'x-opt-qd.to': 'toov/1'} |
| for i in range(10): |
| tm.body = {'number': i} |
| M1.put(tm) |
| M1.send() |
| |
| for i in range(10): |
| M2.recv(1) |
| M2.get(rm) |
| self.assertEqual(i, rm.body['number']) |
| ma = rm.annotations |
| self.assertEqual(ma.__class__, dict) |
| self.assertEqual(ma['x-opt-qd.to'], 'toov/1') |
| |
| M1.stop() |
| M2.stop() |
| |
| def test_14_send_settle_mode_settled(self): |
| """ |
| The receiver sets a snd-settle-mode of settle thus indicating that it wants to receive settled messages from |
| the sender. This tests make sure that the delivery that comes to the receiver comes as already settled. |
| """ |
| send_settle_mode_test = SndSettleModeTest(self.address) |
| send_settle_mode_test.run() |
| self.assertTrue(send_settle_mode_test.message_received) |
| self.assertTrue(send_settle_mode_test.delivery_already_settled) |
| |
| def test_15_excess_deliveries_released(self): |
| """ |
| Message-route a series of deliveries where the receiver provides credit for a subset and |
| once received, closes the link. The remaining deliveries should be released back to the sender. |
| """ |
| test = ExcessDeliveriesReleasedTest(self.address) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| def test_16_multicast_unsettled(self): |
| test = MulticastUnsettledTest(self.address) |
| test.run() |
| self.assertEqual(None, test.error) |
| |
| |
| class Timeout(object): |
| def __init__(self, parent): |
| self.parent = parent |
| |
| def on_timer_task(self, event): |
| self.parent.timeout() |
| |
| |
| HELLO_WORLD = "Hello World!" |
| |
| class SndSettleModeTest(MessagingHandler): |
| def __init__(self, address): |
| super(SndSettleModeTest, self).__init__() |
| self.address = address |
| self.sender = None |
| self.receiver = None |
| self.message_received = False |
| self.delivery_already_settled = False |
| |
| def on_start(self, event): |
| conn = event.container.connect(self.address) |
| # The receiver sets link.snd_settle_mode = Link.SND_SETTLED. It wants to receive settled messages |
| self.receiver = event.container.create_receiver(conn, "org/apache/dev", options=AtMostOnce()) |
| |
| # With AtLeastOnce, the sender will not settle. |
| self.sender = event.container.create_sender(conn, "org/apache/dev", options=AtLeastOnce()) |
| |
| def on_sendable(self, event): |
| msg = Message(body=HELLO_WORLD) |
| event.sender.send(msg) |
| event.sender.close() |
| |
| def on_message(self, event): |
| self.delivery_already_settled = event.delivery.settled |
| if HELLO_WORLD == event.message.body: |
| self.message_received = True |
| else: |
| self.message_received = False |
| event.connection.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class ExcessDeliveriesReleasedTest(MessagingHandler): |
| def __init__(self, address): |
| super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0) |
| self.address = address |
| self.dest = "closest.EDRtest" |
| self.error = None |
| self.sender = None |
| self.receiver = None |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.n_accepted = 0 |
| self.n_released = 0 |
| |
| def on_start(self, event): |
| conn = event.container.connect(self.address) |
| self.sender = event.container.create_sender(conn, self.dest) |
| self.receiver = event.container.create_receiver(conn, self.dest) |
| self.receiver.flow(6) |
| |
| def on_sendable(self, event): |
| for i in range(10 - self.n_sent): |
| msg = Message(body=i) |
| event.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_accepted(self, event): |
| self.n_accepted += 1 |
| |
| def on_released(self, event): |
| self.n_released += 1 |
| if self.n_released == 4: |
| if self.n_accepted != 6: |
| self.error = "Expected 6 accepted, got %d" % self.n_accepted |
| if self.n_received != 6: |
| self.error = "Expected 6 received, got %d" % self.n_received |
| event.connection.close() |
| |
| def on_message(self, event): |
| self.n_received += 1 |
| if self.n_received == 6: |
| self.receiver.close() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| class MulticastUnsettledTest(MessagingHandler): |
| def __init__(self, address): |
| super(MulticastUnsettledTest, self).__init__(prefetch=0) |
| self.address = address |
| self.dest = "multicast.MUtest" |
| self.error = None |
| self.count = 10 |
| self.n_sent = 0 |
| self.n_received = 0 |
| self.n_accepted = 0 |
| |
| def check_if_done(self): |
| if self.n_received == self.count * 2 and self.n_accepted == self.count: |
| self.timer.cancel() |
| self.conn.close() |
| |
| def timeout(self): |
| self.error = "Timeout Expired: sent=%d, received=%d, accepted=%d" % (self.n_sent, self.n_received, self.n_accepted) |
| self.conn.close() |
| |
| def on_start(self, event): |
| self.timer = event.reactor.schedule(5, Timeout(self)) |
| self.conn = event.container.connect(self.address) |
| self.sender = event.container.create_sender(self.conn, self.dest) |
| self.receiver1 = event.container.create_receiver(self.conn, self.dest, name="A") |
| self.receiver2 = event.container.create_receiver(self.conn, self.dest, name="B") |
| self.receiver1.flow(self.count) |
| self.receiver2.flow(self.count) |
| |
| def on_sendable(self, event): |
| for i in range(self.count - self.n_sent): |
| msg = Message(body=i) |
| event.sender.send(msg) |
| self.n_sent += 1 |
| |
| def on_accepted(self, event): |
| self.n_accepted += 1 |
| self.check_if_done() |
| |
| def on_message(self, event): |
| if not event.delivery.settled: |
| self.error = "Received unsettled delivery" |
| self.n_received += 1 |
| self.check_if_done() |
| |
| def run(self): |
| Container(self).run() |
| |
| |
| if __name__ == '__main__': |
| unittest.main(main_module()) |