blob: cb55e6a7d35615b9a88d68fb6a45f97d0e905287 [file] [log] [blame]
#!/usr/bin/env python
"""
Module to test AMQP messages with large content (bodies and headers/properties) across different clients
"""
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import signal
import sys
import unittest
from itertools import product
from json import dumps
import qpid_interop_test.qit_common
from qpid_interop_test.qit_errors import InteropTestError, InteropTestTimeout
DEFAULT_TEST_TIMEOUT = 180 # seconds
class AmqpVariableSizeTypes(qpid_interop_test.qit_common.QitTestTypeMap):
"""
Class which contains all the described AMQP variable-size types and the test values to be used in testing.
"""
type_map = {
# List of sizes in Mb (1024*1024 bytes)
# TODO: Until the issue of SLOW Proton performance for large messages is solved, the 100MB tests are
# disabled.
'binary': [1, 10, #100],
],
'string': [1, 10, #100],
],
'symbol': [1, 10, #100],
],
# Tuple of two elements: (tot size of list/map in MB, List of no elements in list)
# The num elements lists are powers of 2 so that they divide evenly into the size in MB (1024 * 1024 bytes)
'list': [[1, [1, 16, 256, 4096]],
[10, [1, 16, 256, 4096]],
#[100, [1, 16, 256, 4096]]
],
'map': [[1, [1, 16, 256, 4096]],
[10, [1, 16, 256, 4096]],
#[100, [1, 16, 256, 4096]]],
],
#'array': [[1, [1, 16, 256, 4096]], [10, [1, 16, 256, 4096]], [100, [1, 16, 256, 4096]]]
}
# This section contains tests that should be skipped because of known broker issues that would cause the
# test to fail. As the issues are resolved, these should be removed.
broker_skip = {}
client_skip = {}
class AmqpLargeContentTestCase(qpid_interop_test.qit_common.QitTestCase):
"""Abstract base class for AMQP large content tests"""
def run_test(self, sender_addr, receiver_addr, amqp_type, test_value_list, send_shim, receive_shim, timeout):
"""
Run this test by invoking the shim send method to send the test values, followed by the shim receive method
to receive the values. Finally, compare the sent values with the received values.
"""
if test_value_list: # len > 0
queue_name = 'qit.amqp_large_content_test.%s.%s.%s' % \
(amqp_type, send_shim.NAME, receive_shim.NAME)
# Start the receive shim first (for queueless brokers/dispatch)
receiver = receive_shim.create_receiver(receiver_addr, queue_name, amqp_type,
str(self.get_num_messages(amqp_type, test_value_list)))
# Start the send shim
sender = send_shim.create_sender(sender_addr, queue_name, amqp_type, dumps(test_value_list))
# Wait for sender, process return string
try:
send_obj = sender.wait_for_completion(timeout)
except (KeyboardInterrupt, InteropTestTimeout):
receiver.send_signal(signal.SIGINT)
raise
if send_obj is not None:
if isinstance(send_obj, str):
if send_obj: # len > 0
receiver.send_signal(signal.SIGINT)
raise InteropTestError('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj))
else:
receiver.send_signal(signal.SIGINT)
raise InteropTestError('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj))
# Wait for receiver, process return string
receive_obj = receiver.wait_for_completion(timeout)
if isinstance(receive_obj, tuple):
if len(receive_obj) == 2:
return_amqp_type, return_test_value_list = receive_obj
self.assertEqual(return_amqp_type, amqp_type,
msg='AMQP type error:\n\n sent:%s\n\n received:%s' % \
(amqp_type, return_amqp_type))
self.assertEqual(return_test_value_list, test_value_list, msg='\n sent:%s\nreceived:%s' % \
(test_value_list, return_test_value_list))
else:
raise InteropTestError('Receive shim \'%s\':\n%s' % (receive_shim.NAME, receive_obj))
else:
raise InteropTestError('Receive shim \'%s\':\n%s' % (receive_shim.NAME, receive_obj))
@staticmethod
def get_num_messages(amqp_type, test_value_list):
"""Find the total number of messages to be sent for this test"""
if amqp_type == 'binary' or amqp_type == 'string' or amqp_type == 'symbol':
return len(test_value_list)
if amqp_type == 'list' or amqp_type == 'map':
tot_len = 0
for test_item in test_value_list:
tot_len += len(test_item[1])
return tot_len
return None
class TestOptions(qpid_interop_test.qit_common.QitCommonTestOptions):
"""Command-line arguments used to control the test"""
def __init__(self, shim_map, default_timeout=DEFAULT_TEST_TIMEOUT,
default_xunit_dir=qpid_interop_test.qit_xunit_log.DEFUALT_XUNIT_LOG_DIR):
super(TestOptions, self).__init__('Qpid-interop AMQP client interoparability test suite for AMQP' +
' messages with large content', shim_map, default_timeout, default_xunit_dir)
type_group = self._parser.add_mutually_exclusive_group()
type_group.add_argument('--include-type', action='append', metavar='AMQP-TYPE',
help='Name of AMQP type to include. Supported types:\n%s' %
sorted(AmqpVariableSizeTypes.type_map.keys()))
type_group.add_argument('--exclude-type', action='append', metavar='AMQP-TYPE',
help='Name of AMQP type to exclude. Supported types: see "include-type" above')
class AmqpLargeContentTest(qpid_interop_test.qit_common.QitTest):
"""Top-level test for AMQP large content (variable-size types)"""
TEST_NAME = 'amqp_large_content_test'
def __init__(self):
super(AmqpLargeContentTest, self).__init__(TestOptions, AmqpVariableSizeTypes)
def _generate_tests(self):
"""Generate tests dynamically"""
self.test_suite = unittest.TestSuite()
# Create test classes dynamically
for amqp_type in sorted(self.types.get_type_list()):
if self.args.exclude_type is None or amqp_type not in self.args.exclude_type:
test_case_class = self.create_testcase_class(amqp_type, product(self.shim_map.values(), repeat=2),
int(self.args.timeout))
self.test_suite.addTest(unittest.makeSuite(test_case_class))
def create_testcase_class(self, amqp_type, shim_product, timeout):
"""
Class factory function which creates new subclasses to AmqpTypeTestCase.
"""
def __repr__(self):
"""Print the class name"""
return self.__class__.__name__
def add_test_method(cls, send_shim, receive_shim, timeout):
"""Function which creates a new test method in class cls"""
@unittest.skipIf(self.types.skip_test(amqp_type, self.broker),
self.types.skip_test_message(amqp_type, self.broker))
@unittest.skipIf(self.types.skip_client_test(amqp_type, send_shim.NAME),
self.types.skip_client_test_message(amqp_type, send_shim.NAME, 'SENDER'))
@unittest.skipIf(self.types.skip_client_test(amqp_type, receive_shim.NAME),
self.types.skip_client_test_message(amqp_type, receive_shim.NAME, 'RECEIVER'))
def inner_test_method(self):
self.run_test(self.sender_addr,
self.receiver_addr,
self.amqp_type,
self.test_value_list,
send_shim,
receive_shim,
timeout)
inner_test_method.__name__ = 'test_%s_%s->%s' % (amqp_type, send_shim.NAME, receive_shim.NAME)
setattr(cls, inner_test_method.__name__, inner_test_method)
class_name = amqp_type.title() + 'TestCase'
class_dict = {'__name__': class_name,
'__repr__': __repr__,
'__doc__': 'Test case for AMQP 1.0 simple type \'%s\'' % amqp_type,
'amqp_type': amqp_type,
'sender_addr': self.args.sender,
'receiver_addr': self.args.receiver,
'test_value_list': self.types.get_test_values(amqp_type)}
new_class = type(class_name, (AmqpLargeContentTestCase,), class_dict)
for send_shim, receive_shim in shim_product:
add_test_method(new_class, send_shim, receive_shim, timeout)
return new_class
#--- Main program start ---
if __name__ == '__main__':
try:
AMQP_LARGE_CONTENT_TEST = AmqpLargeContentTest()
AMQP_LARGE_CONTENT_TEST.run_test()
AMQP_LARGE_CONTENT_TEST.write_logs()
if not AMQP_LARGE_CONTENT_TEST.get_result():
sys.exit(1) # Errors or failures present
except InteropTestError as err:
print(err)
sys.exit(1)