| #!/usr/bin/env python |
| |
| """ |
| JMS sender shim for qpid-interop-test |
| """ |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from json import loads |
| from subprocess import check_output |
| from struct import pack, unpack |
| import sys |
| from traceback import format_exc |
| |
| from qpid_interop_test.jms_types import create_annotation |
| from proton import byte, char, float32, int32, Message, short, symbol |
| from proton.handlers import MessagingHandler |
| from proton.reactor import Container |
| from qpid_interop_test.interop_test_errors import InteropTestError |
| |
class JmsMessagesTestSender(MessagingHandler):
    """
    Sender shim which sends one AMQP message per test value, each annotated as a particular JMS message type.

    The test parameters are consumed as a single test value map: the keys are test value type names (for example
    'boolean', 'int' or 'string') and each value is a list of string-encoded test values of that type. Types are
    processed in sorted key order, and the values of each type in list order.

    NOTE: Earlier design notes describe the test parameters as a list of three maps (test values, JMS headers and
    JMS properties). This implementation only uses a test value map; it sets no JMS headers or properties.

    The connection is closed once the broker has accepted as many messages as there are test values.
    """
    def __init__(self, broker_ip_addr, queue_name, jms_msg_type, test_parameters_list):
        """
        broker_ip_addr: broker address, used as the first part of the sender address
        queue_name: queue name, used as the second part of the sender address
        jms_msg_type: JMS message type name, eg 'JMS_BYTESMESSAGE_TYPE' (see _create_message())
        test_parameters_list: test value map {type: [val, val, ...], ...}
        """
        super(JmsMessagesTestSender, self).__init__()
        self.broker_ip_addr = broker_ip_addr
        self.queue_name = queue_name
        self.jms_msg_type = jms_msg_type
        self.test_value_map = test_parameters_list
        self.sent = 0
        self.confirmed = 0
        self.total = self._get_total_num_msgs()
        # Flattened, ordered list of test cases; self.sent doubles as the index of the next case to send
        self._test_cases = self._get_test_cases()

    def on_start(self, event):
        """Event callback for when the client starts"""
        event.container.create_sender('%s/%s' % (self.broker_ip_addr, self.queue_name))

    def on_sendable(self, event):
        """
        Event callback for when send credit is received, allowing the sending of messages.

        Sends as many of the outstanding test cases as the available credit allows. Test cases which cannot be
        sent now are kept for the next on_sendable callback rather than being dropped. If a message cannot be
        created because the JMS message type is unsupported, the connection is closed.
        """
        if self.total == 0:
            # Nothing to send, so no acceptance will ever arrive to trigger the close in on_accepted()
            event.connection.close()
            return
        while event.sender.credit and self.sent < self.total:
            test_value_type, test_value, value_num = self._test_cases[self.sent]
            message = self._create_message(test_value_type, test_value, value_num)
            # TODO: set message to address
            if message is None:
                event.connection.close()
                return
            event.sender.send(message)
            self.sent += 1

    def on_connection_error(self, event):
        """Event callback for a connection error; only reports that the callback was invoked"""
        print('JmsMessagesTestSender.on_connection_error')

    def on_session_error(self, event):
        """Event callback for a session error; only reports that the callback was invoked"""
        print('JmsMessagesTestSender.on_session_error')

    def on_link_error(self, event):
        """Event callback for a link error; only reports that the callback was invoked"""
        print('JmsMessagesTestSender.on_link_error')

    def on_accepted(self, event):
        """Event callback for when a sent message is accepted by the broker"""
        self.confirmed += 1
        if self.confirmed == self.total:
            event.connection.close()

    def on_disconnected(self, event):
        """
        Event callback for when the broker disconnects with the client.

        Rewinds the sent count to the confirmed count so that the next on_sendable callback continues from the
        first test case not yet counted as accepted (this assumes acceptances arrive in send order).
        """
        self.sent = self.confirmed

    def _get_total_num_msgs(self):
        """
        Calculates the total number of messages to be sent based on the message parameters received on the
        command-line: one message per test value, summed over all test value types
        """
        return sum(len(test_values) for test_values in self.test_value_map.values())

    def _get_test_cases(self):
        """
        Flattens the test value map into an ordered list of (test_value_type, test_value, value_num) tuples, where
        value_num is the index of the value within the list for its type. Types are taken in sorted order.
        """
        test_cases = []
        for test_value_type in sorted(self.test_value_map.keys()):
            for value_num, test_value in enumerate(self.test_value_map[test_value_type]):
                test_cases.append((test_value_type, test_value, value_num))
        return test_cases

    # TODO: Change this to return a list of messages. That way each test can return more than one message
    def _create_message(self, test_value_type, test_value, value_num):
        """
        Create a single message of the appropriate JMS message type.

        value_num is only used to build the map key name for JMS map messages. Returns None (after printing a
        diagnostic) if self.jms_msg_type is not one of the supported JMS message type names.
        """
        if self.jms_msg_type == 'JMS_MESSAGE_TYPE':
            return self._create_jms_message(test_value_type, test_value)
        elif self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE':
            return self._create_jms_bytesmessage(test_value_type, test_value)
        elif self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE':
            return self._create_jms_mapmessage(test_value_type, test_value, "%s%03d" % (test_value_type, value_num))
        elif self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE':
            return self._create_jms_objectmessage('%s:%s' % (test_value_type, test_value))
        elif self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE':
            return self._create_jms_streammessage(test_value_type, test_value)
        elif self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE':
            return self._create_jms_textmessage(test_value)
        else:
            print('JmsMessagesTestSender: Unsupported JMS message type "%s"' % self.jms_msg_type)
            return None

    def _create_jms_message(self, test_value_type, test_value):
        """
        Create a JMS message type (without message body).

        Raises InteropTestError unless test_value_type is 'none' and test_value is None.
        """
        if test_value_type != 'none':
            raise InteropTestError('JmsMessagesTestSender._create_jms_message: Unknown or unsupported subtype "%s"' %
                                   test_value_type)
        if test_value is not None:
            raise InteropTestError('JmsMessagesTestSender._create_jms_message: Invalid value "%s" for subtype "%s"' %
                                   (test_value, test_value_type))
        return Message(id=(self.sent+1),
                       content_type='application/octet-stream',
                       annotations=create_annotation('JMS_MESSAGE_TYPE'))

    def _create_jms_bytesmessage(self, test_value_type, test_value):
        """
        Create a JMS bytes message whose body is the binary encoding of the test value.

        Integer types are parsed as base-16 strings and packed in network byte order; double and float values are
        hex strings whose first two characters are skipped. Raises InteropTestError for an unknown subtype.
        """
        # NOTE: test_value contains all unicode strings u'...' as returned by json
        body_bytes = None
        if test_value_type == 'boolean':
            body_bytes = b'\x01' if test_value == 'True' else b'\x00'
        elif test_value_type == 'byte':
            body_bytes = pack('b', int(test_value, 16))
        elif test_value_type == 'bytes':
            body_bytes = str(test_value) # remove unicode
        elif test_value_type == 'char':
            # JMS expects two-byte chars, ASCII chars can be prefixed with '\x00'
            body_bytes = '\x00' + str(test_value) # remove unicode
        elif test_value_type == 'double' or test_value_type == 'float':
            body_bytes = test_value[2:].decode('hex')
        elif test_value_type == 'int':
            body_bytes = pack('!i', int(test_value, 16))
        elif test_value_type == 'long':
            body_bytes = pack('!q', long(test_value, 16))
        elif test_value_type == 'short':
            # Parsed with int() like the other integer types; only the packed bytes are used here
            body_bytes = pack('!h', int(test_value, 16))
        elif test_value_type == 'string':
            # NOTE: First two bytes must be string length
            test_value_str = str(test_value) # remove unicode
            body_bytes = pack('!H', len(test_value_str)) + test_value_str
        else:
            raise InteropTestError('JmsMessagesTestSender._create_jms_bytesmessage: Unknown or unsupported subtype "%s"' %
                                   test_value_type)
        return Message(id=(self.sent+1),
                       body=body_bytes,
                       inferred=True,
                       content_type='application/octet-stream',
                       annotations=create_annotation('JMS_BYTESMESSAGE_TYPE'))

    @staticmethod
    def _s_convert_test_value(test_value_type, test_value, method_name):
        """
        Convert a string-encoded test value into the typed value used in map and stream message bodies.

        method_name is the name of the calling method and is only used in the error message. Raises
        InteropTestError if test_value_type is not one of the supported subtypes.
        """
        if test_value_type == 'boolean':
            return test_value == 'True'
        if test_value_type == 'byte':
            return byte(int(test_value, 16))
        if test_value_type == 'bytes':
            return str(test_value) # remove unicode
        if test_value_type == 'char':
            return char(test_value)
        if test_value_type == 'double':
            return unpack('!d', test_value[2:].decode('hex'))[0]
        if test_value_type == 'float':
            return float32(unpack('!f', test_value[2:].decode('hex'))[0])
        if test_value_type == 'int':
            return int32(int(test_value, 16))
        if test_value_type == 'long':
            return long(test_value, 16)
        if test_value_type == 'short':
            return short(int(test_value, 16))
        if test_value_type == 'string':
            return test_value
        raise InteropTestError('JmsMessagesTestSender.%s: Unknown or unsupported subtype "%s"' %
                               (method_name, test_value_type))

    def _create_jms_mapmessage(self, test_value_type, test_value, name):
        """
        Create a JMS map message whose body is a single-entry map {name: typed test value}.

        Raises InteropTestError for an unknown subtype.
        """
        value = self._s_convert_test_value(test_value_type, test_value, '_create_jms_mapmessage')
        return Message(id=(self.sent+1),
                       body={name: value},
                       inferred=False,
                       annotations=create_annotation('JMS_MAPMESSAGE_TYPE'))

    def _create_jms_objectmessage(self, test_value):
        """
        Create a JMS object message. test_value is the string '<type>:<value>' which is handed to the external
        Java utility; the message body is the binary returned by that utility.
        """
        java_binary = self._s_get_java_obj_binary(test_value)
        return Message(id=(self.sent+1),
                       body=java_binary,
                       inferred=True,
                       content_type='application/x-java-serialized-object',
                       annotations=create_annotation('JMS_OBJECTMESSAGE_TYPE'))

    @staticmethod
    def _s_get_java_obj_binary(java_class_str):
        """
        Call the external JavaObjToBytes utility for java_class_str and return its result as binary.

        The utility output is expected to hold java_class_str on its first line and a hex string on its second
        line; the hex string is decoded and returned. Raises InteropTestError if the output has fewer than two
        lines or if the first line does not match java_class_str.
        """
        out_str = check_output(['java',
                                '-cp',
                                'target/JavaObjUtils.jar',
                                'org.apache.qpid.interop_test.obj_util.JavaObjToBytes',
                                java_class_str])
        out_str_list = out_str.split('\n')[:-1] # remove trailing \n
        # Check the line count first so that short or empty output is reported with the captured text
        if len(out_str_list) < 2 or out_str_list[0] != java_class_str:
            raise InteropTestError('JmsMessagesTestSender._s_get_java_obj_binary(): Call to JavaObjToBytes failed\n%s' %
                                   out_str)
        return out_str_list[1].decode('hex')

    def _create_jms_streammessage(self, test_value_type, test_value):
        """
        Create a JMS stream message whose body is a single-element list holding the typed test value.

        Raises InteropTestError for an unknown subtype.
        """
        body_list = [self._s_convert_test_value(test_value_type, test_value, '_create_jms_streammessage')]
        return Message(id=(self.sent+1),
                       body=body_list,
                       inferred=True,
                       annotations=create_annotation('JMS_STREAMMESSAGE_TYPE'))

    def _create_jms_textmessage(self, test_value_text):
        """Create a JMS text message whose body is the test value as a unicode string"""
        return Message(id=(self.sent+1),
                       body=unicode(test_value_text),
                       annotations=create_annotation('JMS_TEXTMESSAGE_TYPE'))
| |
| |
| |
| |
# --- main ---
# Command-line arguments:
#   1: Broker address (ip-addr:port)
#   2: Queue name
#   3: JMS message type
#   4: JSON-encoded test parameters; decoded with json.loads() and handed to the sender as-is
#
# Any exception other than a keyboard interrupt is reported on stdout together with its traceback.
try:
    SENDER = JmsMessagesTestSender(sys.argv[1],
                                   sys.argv[2],
                                   sys.argv[3],
                                   loads(sys.argv[4]))
    Container(SENDER).run()
except KeyboardInterrupt:
    pass
except Exception as exc:
    # Single-argument print calls produce the same text as the print statement form
    print('jms-sender-shim EXCEPTION: %s' % (exc,))
    print(format_exc())