blob: 5dee9eb1a92ab1abc5870541c1091a5c9f8c8976 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest
from proton import Message, PENDING, ACCEPTED, REJECTED
from system_test import TestCase, Qdrouterd, main_module
from proton.handlers import MessagingHandler
from proton.reactor import Container, AtMostOnce, AtLeastOnce
# PROTON-828:
try:
from proton import MODIFIED
except ImportError:
from proton import PN_STATUS_MODIFIED as MODIFIED
class RouterTest(TestCase):
"""System tests involving a single router"""
@classmethod
def setUpClass(cls):
"""Start a router and a messenger"""
super(RouterTest, cls).setUpClass()
name = "test-router"
config = Qdrouterd.Config([
('container', {'workerThreads': 4, 'containerName': 'Qpid.Dispatch.Router.A'}),
('router', {'mode': 'standalone', 'routerId': 'QDR'}),
# Setting the stripAnnotations to 'no' so that the existing tests will work.
# Setting stripAnnotations to no will not strip the annotations and any tests that were already in this file
# that were expecting the annotations to not be stripped will continue working.
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
# The following listeners were exclusively added to test the stripAnnotations attribute in qdrouterd.conf file
# Different listeners will be used to test all allowed values of stripAnnotations ('no', 'both', 'out', 'in')
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'both'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}),
('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'spread', 'distribution': 'balanced'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
])
cls.router = cls.tester.qdrouterd(name, config)
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
def test_01_pre_settled(self):
addr = self.address+"/pre_settled/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
for i in range(100):
tm.body = {'number': i}
M1.put(tm)
M1.send()
for i in range(100):
M2.recv(1)
M2.get(rm)
self.assertEqual(i, rm.body['number'])
M1.stop()
M2.stop()
def test_02a_multicast_unsettled(self):
addr = self.address+"/multicast.unsettled.1"
M1 = self.messenger()
M2 = self.messenger()
M3 = self.messenger()
M4 = self.messenger()
M1.outgoing_window = 5
M2.incoming_window = 5
M3.incoming_window = 5
M4.incoming_window = 5
M1.start()
M2.start()
M3.start()
M4.start()
M2.subscribe(addr)
M3.subscribe(addr)
M4.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
for i in range(2):
tm.body = {'number': i}
M1.put(tm)
M1.send(0)
for i in range(2):
M2.recv(1)
trk = M2.get(rm)
M2.accept(trk)
M2.settle(trk)
self.assertEqual(i, rm.body['number'])
M3.recv(1)
trk = M3.get(rm)
M3.accept(trk)
M3.settle(trk)
self.assertEqual(i, rm.body['number'])
M4.recv(1)
trk = M4.get(rm)
M4.accept(trk)
M4.settle(trk)
self.assertEqual(i, rm.body['number'])
M1.stop()
M2.stop()
M3.stop()
M4.stop()
def test_02b_disp_to_closed_connection(self):
addr = self.address+"/pre_settled/2"
M1 = self.messenger()
M2 = self.messenger()
M1.outgoing_window = 5
M2.incoming_window = 5
M1.start()
M2.start()
M2.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
for i in range(2):
tm.body = {'number': i}
M1.put(tm)
M1.send(0)
M1.stop()
for i in range(2):
M2.recv(1)
trk = M2.get(rm)
M2.accept(trk)
M2.settle(trk)
self.assertEqual(i, rm.body['number'])
M2.stop()
def test_02c_sender_settles_first(self):
addr = self.address+"/settled/senderfirst/1"
M1 = self.messenger()
M2 = self.messenger()
M1.outgoing_window = 5
M2.incoming_window = 5
M1.start()
M2.start()
M2.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
tm.body = {'number': 0}
ttrk = M1.put(tm)
M1.send(0)
M1.settle(ttrk)
M1.flush()
M2.flush()
M2.recv(1)
rtrk = M2.get(rm)
M2.accept(rtrk)
M2.settle(rtrk)
self.assertEqual(0, rm.body['number'])
M1.flush()
M2.flush()
M1.stop()
M2.stop()
def test_03_propagated_disposition(self):
addr = self.address+"/unsettled/1"
M1 = self.messenger()
M2 = self.messenger()
M1.outgoing_window = 5
M2.incoming_window = 5
M1.start()
M2.start()
M2.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
tm.body = {'number': 0}
##
## Test ACCEPT
##
tx_tracker = M1.put(tm)
M1.send(0)
M2.recv(1)
rx_tracker = M2.get(rm)
self.assertEqual(0, rm.body['number'])
self.assertEqual(PENDING, M1.status(tx_tracker))
M2.accept(rx_tracker)
M2.settle(rx_tracker)
M2.flush()
M1.flush()
self.assertEqual(ACCEPTED, M1.status(tx_tracker))
##
## Test REJECT
##
tx_tracker = M1.put(tm)
M1.send(0)
M2.recv(1)
rx_tracker = M2.get(rm)
self.assertEqual(0, rm.body['number'])
self.assertEqual(PENDING, M1.status(tx_tracker))
M2.reject(rx_tracker)
M2.settle(rx_tracker)
M2.flush()
M1.flush()
self.assertEqual(REJECTED, M1.status(tx_tracker))
M1.stop()
M2.stop()
def test_04_unsettled_undeliverable(self):
addr = self.address+"/unsettled_undeliverable/1"
M1 = self.messenger()
M1.outgoing_window = 5
M1.start()
M1.timeout = 1
tm = Message()
tm.address = addr
tm.body = {'number': 200}
exception = False
try:
M1.put(tm)
M1.send(0)
M1.flush()
except Exception:
exception = True
self.assertEqual(exception, True)
M1.stop()
def test_05_three_ack(self):
addr = self.address+"/three_ack/1"
M1 = self.messenger()
M2 = self.messenger()
M1.outgoing_window = 5
M2.incoming_window = 5
M1.start()
M2.start()
M2.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
tm.body = {'number': 200}
tx_tracker = M1.put(tm)
M1.send(0)
M2.recv(1)
rx_tracker = M2.get(rm)
self.assertEqual(200, rm.body['number'])
self.assertEqual(PENDING, M1.status(tx_tracker))
M2.accept(rx_tracker)
M2.flush()
M1.flush()
self.assertEqual(ACCEPTED, M1.status(tx_tracker))
M1.settle(tx_tracker)
M1.flush()
M2.flush()
##
## We need a way to verify on M2 (receiver) that the tracker has been
## settled on the M1 (sender). [ See PROTON-395 ]
##
M2.settle(rx_tracker)
M2.flush()
M1.flush()
M1.stop()
M2.stop()
# def test_06_link_route_sender(self):
# pass
# def test_07_link_route_receiver(self):
# pass
def test_08_message_annotations(self):
addr = self.address+"/ma/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
##
## No inbound delivery annotations
##
for i in range(10):
tm.body = {'number': i}
M1.put(tm)
M1.send()
for i in range(10):
M2.recv(1)
M2.get(rm)
self.assertEqual(i, rm.body['number'])
ma = rm.annotations
self.assertEqual(ma.__class__, dict)
self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
##
## Pre-existing ingress
##
tm.annotations = {'x-opt-qd.ingress': 'ingress-router'}
for i in range(10):
tm.body = {'number': i}
M1.put(tm)
M1.send()
for i in range(10):
M2.recv(1)
M2.get(rm)
self.assertEqual(i, rm.body['number'])
ma = rm.annotations
self.assertEqual(ma.__class__, dict)
self.assertEqual(ma['x-opt-qd.ingress'], 'ingress-router')
self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
##
## Invalid trace type
##
tm.annotations = {'x-opt-qd.trace' : 45}
for i in range(10):
tm.body = {'number': i}
M1.put(tm)
M1.send()
for i in range(10):
M2.recv(1)
M2.get(rm)
self.assertEqual(i, rm.body['number'])
ma = rm.annotations
self.assertEqual(ma.__class__, dict)
self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
##
## Empty trace
##
tm.annotations = {'x-opt-qd.trace' : []}
for i in range(10):
tm.body = {'number': i}
M1.put(tm)
M1.send()
for i in range(10):
M2.recv(1)
M2.get(rm)
self.assertEqual(i, rm.body['number'])
ma = rm.annotations
self.assertEqual(ma.__class__, dict)
self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
##
## Non-empty trace
##
tm.annotations = {'x-opt-qd.trace' : ['0/first.hop']}
for i in range(10):
tm.body = {'number': i}
M1.put(tm)
M1.send()
for i in range(10):
M2.recv(1)
M2.get(rm)
self.assertEqual(i, rm.body['number'])
ma = rm.annotations
self.assertEqual(ma.__class__, dict)
self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(ma['x-opt-qd.trace'], ['0/first.hop', '0/QDR'])
M1.stop()
M2.stop()
# Tests stripping of ingress and egress annotations.
# There is a property in qdrouter.json called stripAnnotations with possible values of ["in", "out", "both", "no"]
# The default for stripAnnotations is "both" (which means strip annotations on both ingress and egress)
# This test will test the stripAnnotations = no option - meaning no annotations must be stripped.
# We will send in a custom annotation and make that we get back 3 annotations on the received message
# Skipping this test temporarily
def notest_08a_test_strip_message_annotations_no_custom_not_implemented(self):
addr = self.router.addresses[1]+"/strip_message_annotations_no_custom/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
ingress_message = Message()
ingress_message.address = addr
ingress_message.body = {'message': 'Hello World!'}
ingress_message_annotations = {}
ingress_message_annotations['custom-annotation'] = '1/Custom_Annotation'
ingress_message.annotations = ingress_message_annotations
M1.put(ingress_message)
M1.send()
# Receive the message
M2.recv(1)
egress_message = Message()
M2.get(egress_message)
#Make sure 'Hello World!' is in the message body dict
self.assertEqual('Hello World!', egress_message.body['message'])
egress_message_annotations = egress_message.annotations
self.assertEqual(egress_message_annotations.__class__, dict)
self.assertEqual(egress_message_annotations['custom-annotation'], '1/Custom_Annotation')
self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR'])
M1.stop()
M2.stop()
#stripAnnotations property is set to "no"
def test_08a_test_strip_message_annotations_no(self):
addr = self.router.addresses[1]+"/strip_message_annotations_no/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
ingress_message = Message()
ingress_message.address = addr
ingress_message.body = {'message': 'Hello World!'}
ingress_message_annotations = {}
ingress_message.annotations = ingress_message_annotations
M1.put(ingress_message)
M1.send()
# Receive the message
M2.recv(1)
egress_message = Message()
M2.get(egress_message)
#Make sure 'Hello World!' is in the message body dict
self.assertEqual('Hello World!', egress_message.body['message'])
egress_message_annotations = egress_message.annotations
self.assertEqual(egress_message_annotations.__class__, dict)
self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR'])
M1.stop()
M2.stop()
#stripAnnotations property is set to "no"
def test_08a_test_strip_message_annotations_no_add_trace(self):
addr = self.router.addresses[1]+"/strip_message_annotations_no_add_trace/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
ingress_message = Message()
ingress_message.address = addr
ingress_message.body = {'message': 'Hello World!'}
##
## Pre-existing ingress and trace
##
ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.1']}
ingress_message.annotations = ingress_message_annotations
ingress_message.annotations = ingress_message_annotations
M1.put(ingress_message)
M1.send()
# Receive the message
M2.recv(1)
egress_message = Message()
M2.get(egress_message)
#Make sure 'Hello World!' is in the message body dict
self.assertEqual('Hello World!', egress_message.body['message'])
egress_message_annotations = egress_message.annotations
self.assertEqual(egress_message_annotations.__class__, dict)
self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], 'ingress-router')
self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.1', '0/QDR'])
M1.stop()
M2.stop()
#Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations
#stripAnnotations property is set to "both"
def test_08a_test_strip_message_annotations_both(self):
addr = self.router.addresses[2]+"/strip_message_annotations_both/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
ingress_message = Message()
ingress_message.address = addr
ingress_message.body = {'message': 'Hello World!'}
#Put and send the message
M1.put(ingress_message)
M1.send()
# Receive the message
M2.recv(1)
egress_message = Message()
M2.get(egress_message)
self.assertEqual(egress_message.annotations, None)
M1.stop()
M2.stop()
#Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations
#stripAnnotations property is set to "out"
def test_08a_test_strip_message_annotations_out(self):
addr = self.router.addresses[3]+"/strip_message_annotations_out/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
ingress_message = Message()
ingress_message.address = addr
ingress_message.body = {'message': 'Hello World!'}
#Put and send the message
M1.put(ingress_message)
M1.send()
# Receive the message
M2.recv(1)
egress_message = Message()
M2.get(egress_message)
self.assertEqual(egress_message.annotations, None)
M1.stop()
M2.stop()
#Send in pre-existing trace and ingress and annotations and make sure that they are not in the outgoing annotations.
#stripAnnotations property is set to "in"
def test_08a_test_strip_message_annotations_in(self):
addr = self.router.addresses[4]+"/strip_message_annotations_in/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
ingress_message = Message()
ingress_message.address = addr
ingress_message.body = {'message': 'Hello World!'}
##
## Pre-existing ingress and trace
##
ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.1']}
ingress_message.annotations = ingress_message_annotations
#Put and send the message
M1.put(ingress_message)
M1.send()
# Receive the message
M2.recv(1)
egress_message = Message()
M2.get(egress_message)
#Make sure 'Hello World!' is in the message body dict
self.assertEqual('Hello World!', egress_message.body['message'])
egress_message_annotations = egress_message.annotations
self.assertEqual(egress_message_annotations.__class__, dict)
self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR'])
M1.stop()
M2.stop()
def test_09_management(self):
addr = "amqp:/$management"
M = self.messenger()
M.start()
M.route("amqp:/*", self.address+"/$1")
sub = M.subscribe("amqp:/#")
reply = sub.address
request = Message()
response = Message()
request.address = addr
request.reply_to = reply
request.correlation_id = "C1"
request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'}
M.put(request)
M.send()
M.recv()
M.get(response)
assert response.properties['statusCode'] == 200, response.properties['statusCode']
self.assertEqual(response.correlation_id, "C1")
self.assertEqual(response.body, [])
request.address = addr
request.reply_to = reply
request.correlation_id = 135
request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'}
M.put(request)
M.send()
M.recv()
M.get(response)
self.assertEqual(response.properties['statusCode'], 200)
self.assertEqual(response.correlation_id, 135)
self.assertEqual(response.body, [])
request.address = addr
request.reply_to = reply
request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'}
M.put(request)
M.send()
M.recv()
M.get(response)
self.assertEqual(response.properties['statusCode'], 200)
self.assertEqual(response.body, [])
M.stop()
def test_09a_management_no_reply(self):
addr = "amqp:/$management"
M = self.messenger()
M.start()
M.route("amqp:/*", self.address+"/$1")
request = Message()
request.address = addr
request.correlation_id = "C1"
request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'}
M.put(request)
M.send()
M.put(request)
M.send()
M.stop()
def test_09c_management_get_operations(self):
addr = "amqp:/_local/$management"
M = self.messenger()
M.start()
M.route("amqp:/*", self.address+"/$1")
sub = M.subscribe("amqp:/#")
reply = sub.address
request = Message()
response = Message()
##
## Unrestricted request
##
request.address = addr
request.reply_to = reply
request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-OPERATIONS'}
M.put(request)
M.send()
M.recv()
M.get(response)
self.assertEqual(response.properties['statusCode'], 200)
self.assertEqual(response.body.__class__, dict)
self.assertTrue('org.apache.qpid.dispatch.router' in response.body.keys())
self.assertTrue(len(response.body.keys()) > 2)
self.assertTrue(response.body['org.apache.qpid.dispatch.router'].__class__, list)
M.stop()
def test_09d_management_not_implemented(self):
addr = "amqp:/$management"
M = self.messenger()
M.start()
M.route("amqp:/*", self.address+"/$1")
sub = M.subscribe("amqp:/#")
reply = sub.address
request = Message()
response = Message()
##
## Request with an invalid operation
##
request.address = addr
request.reply_to = reply
request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'NOT-IMPL'}
M.put(request)
M.send()
M.recv()
M.get(response)
self.assertEqual(response.properties['statusCode'], 501)
M.stop()
def test_10_semantics_multicast(self):
addr = self.address+"/multicast.10"
M1 = self.messenger()
M2 = self.messenger()
M3 = self.messenger()
M4 = self.messenger()
M1.start()
M2.start()
M3.start()
M4.start()
M2.subscribe(addr)
M3.subscribe(addr)
M4.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
for i in range(100):
tm.body = {'number': i}
M1.put(tm)
M1.send()
for i in range(100):
M2.recv(1)
M2.get(rm)
self.assertEqual(i, rm.body['number'])
M3.recv(1)
M3.get(rm)
self.assertEqual(i, rm.body['number'])
M4.recv(1)
M4.get(rm)
self.assertEqual(i, rm.body['number'])
M1.stop()
M2.stop()
M3.stop()
M4.stop()
def test_11_semantics_closest(self):
addr = self.address+"/closest.1"
M1 = self.messenger()
M2 = self.messenger()
M3 = self.messenger()
M4 = self.messenger()
M1.start()
M2.start()
M3.start()
M4.start()
M2.subscribe(addr)
M3.subscribe(addr)
M4.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
for i in range(30):
tm.body = {'number': i}
M1.put(tm)
M1.send()
i = 0
rx_set = []
for i in range(10):
M2.recv(1)
M2.get(rm)
rx_set.append(rm.body['number'])
M3.recv(1)
M3.get(rm)
rx_set.append(rm.body['number'])
M4.recv(1)
M4.get(rm)
rx_set.append(rm.body['number'])
self.assertEqual(30, len(rx_set))
rx_set.sort()
for i in range(30):
self.assertEqual(i, rx_set[i])
M1.stop()
M2.stop()
M3.stop()
M4.stop()
def test_12_semantics_spread(self):
addr = self.address+"/spread.1"
M1 = self.messenger()
M2 = self.messenger()
M3 = self.messenger()
M4 = self.messenger()
M2.timeout = 0.1
M3.timeout = 0.1
M4.timeout = 0.1
M1.start()
M2.start()
M3.start()
M4.start()
M2.subscribe(addr)
M3.subscribe(addr)
M4.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
for i in range(30):
tm.body = {'number': i}
M1.put(tm)
M1.send()
i = 0
rx_set = []
ca = 0
cb = 0
cc = 0
while len(rx_set) < 30:
try:
M2.recv(1)
M2.get(rm)
rx_set.append(rm.body['number'])
ca += 1
except:
pass
try:
M3.recv(1)
M3.get(rm)
rx_set.append(rm.body['number'])
cb += 1
except:
pass
try:
M4.recv(1)
M4.get(rm)
rx_set.append(rm.body['number'])
cc += 1
except:
pass
self.assertEqual(30, len(rx_set))
self.assertTrue(ca > 0)
self.assertTrue(cb > 0)
self.assertTrue(cc > 0)
rx_set.sort()
for i in range(30):
self.assertEqual(i, rx_set[i])
M1.stop()
M2.stop()
M3.stop()
M4.stop()
def test_13_to_override(self):
addr = self.address+"/toov/1"
M1 = self.messenger()
M2 = self.messenger()
M1.start()
M2.start()
M2.subscribe(addr)
tm = Message()
rm = Message()
tm.address = addr
##
## Pre-existing TO
##
tm.annotations = {'x-opt-qd.to': 'toov/1'}
for i in range(10):
tm.body = {'number': i}
M1.put(tm)
M1.send()
for i in range(10):
M2.recv(1)
M2.get(rm)
self.assertEqual(i, rm.body['number'])
ma = rm.annotations
self.assertEqual(ma.__class__, dict)
self.assertEqual(ma['x-opt-qd.to'], 'toov/1')
M1.stop()
M2.stop()
def test_14_send_settle_mode_settled(self):
"""
The receiver sets a snd-settle-mode of settle thus indicating that it wants to receive settled messages from
the sender. This tests make sure that the delivery that comes to the receiver comes as already settled.
"""
send_settle_mode_test = SndSettleModeTest(self.address)
send_settle_mode_test.run()
self.assertTrue(send_settle_mode_test.message_received)
self.assertTrue(send_settle_mode_test.delivery_already_settled)
def test_15_excess_deliveries_released(self):
"""
Message-route a series of deliveries where the receiver provides credit for a subset and
once received, closes the link. The remaining deliveries should be released back to the sender.
"""
test = ExcessDeliveriesReleasedTest(self.address)
test.run()
self.assertEqual(None, test.error)
def test_16_multicast_unsettled(self):
test = MulticastUnsettledTest(self.address)
test.run()
self.assertEqual(None, test.error)
class Timeout(object):
def __init__(self, parent):
self.parent = parent
def on_timer_task(self, event):
self.parent.timeout()
HELLO_WORLD = "Hello World!"
class SndSettleModeTest(MessagingHandler):
def __init__(self, address):
super(SndSettleModeTest, self).__init__()
self.address = address
self.sender = None
self.receiver = None
self.message_received = False
self.delivery_already_settled = False
def on_start(self, event):
conn = event.container.connect(self.address)
# The receiver sets link.snd_settle_mode = Link.SND_SETTLED. It wants to receive settled messages
self.receiver = event.container.create_receiver(conn, "org/apache/dev", options=AtMostOnce())
# With AtLeastOnce, the sender will not settle.
self.sender = event.container.create_sender(conn, "org/apache/dev", options=AtLeastOnce())
def on_sendable(self, event):
msg = Message(body=HELLO_WORLD)
event.sender.send(msg)
event.sender.close()
def on_message(self, event):
self.delivery_already_settled = event.delivery.settled
if HELLO_WORLD == event.message.body:
self.message_received = True
else:
self.message_received = False
event.connection.close()
def run(self):
Container(self).run()
class ExcessDeliveriesReleasedTest(MessagingHandler):
def __init__(self, address):
super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
self.address = address
self.dest = "closest.EDRtest"
self.error = None
self.sender = None
self.receiver = None
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
self.n_released = 0
def on_start(self, event):
conn = event.container.connect(self.address)
self.sender = event.container.create_sender(conn, self.dest)
self.receiver = event.container.create_receiver(conn, self.dest)
self.receiver.flow(6)
def on_sendable(self, event):
for i in range(10 - self.n_sent):
msg = Message(body=i)
event.sender.send(msg)
self.n_sent += 1
def on_accepted(self, event):
self.n_accepted += 1
def on_released(self, event):
self.n_released += 1
if self.n_released == 4:
if self.n_accepted != 6:
self.error = "Expected 6 accepted, got %d" % self.n_accepted
if self.n_received != 6:
self.error = "Expected 6 received, got %d" % self.n_received
event.connection.close()
def on_message(self, event):
self.n_received += 1
if self.n_received == 6:
self.receiver.close()
def run(self):
Container(self).run()
class MulticastUnsettledTest(MessagingHandler):
def __init__(self, address):
super(MulticastUnsettledTest, self).__init__(prefetch=0)
self.address = address
self.dest = "multicast.MUtest"
self.error = None
self.count = 10
self.n_sent = 0
self.n_received = 0
self.n_accepted = 0
def check_if_done(self):
if self.n_received == self.count * 2 and self.n_accepted == self.count:
self.timer.cancel()
self.conn.close()
def timeout(self):
self.error = "Timeout Expired: sent=%d, received=%d, accepted=%d" % (self.n_sent, self.n_received, self.n_accepted)
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(5, Timeout(self))
self.conn = event.container.connect(self.address)
self.sender = event.container.create_sender(self.conn, self.dest)
self.receiver1 = event.container.create_receiver(self.conn, self.dest, name="A")
self.receiver2 = event.container.create_receiver(self.conn, self.dest, name="B")
self.receiver1.flow(self.count)
self.receiver2.flow(self.count)
def on_sendable(self, event):
for i in range(self.count - self.n_sent):
msg = Message(body=i)
event.sender.send(msg)
self.n_sent += 1
def on_accepted(self, event):
self.n_accepted += 1
self.check_if_done()
def on_message(self, event):
if not event.delivery.settled:
self.error = "Received unsettled delivery"
self.n_received += 1
self.check_if_done()
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())