blob: e7d11722ecec5e91a712419e2ecb3c26fe525cd8 [file] [log] [blame]
#!/usr/bin/env python
"""
JMS receiver shim for qpid-interop-test
"""
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from json import dumps, loads
from struct import pack, unpack
from subprocess import check_output
import sys
from traceback import format_exc
from proton import byte
from proton.handlers import MessagingHandler
from proton.reactor import Container
from qpid_interop_test.interop_test_errors import InteropTestError
from qpid_interop_test.jms_types import QPID_JMS_TYPE_ANNOTATION_NAME
class JmsMessagesTestReceiver(MessagingHandler):
"""
Receiver shim: This shim receives JMS messages sent by the Sender shim and prints the contents of the received
messages onto the terminal in JSON format for retrieval by the test harness. The JMS messages type and, where
applicable, body values, as well as the combinations of JMS headers and properties which may be attached to
the message are received on the command-line in JSON format when this program is launched.
"""
def __init__(self, broker_url, queue_name, jms_msg_type, test_parameters_list):
super(JmsMessagesTestReceiver, self).__init__()
self.broker_url = broker_url
self.queue_name = queue_name
self.jms_msg_type = jms_msg_type
self.expteced_msg_map = test_parameters_list
self.subtype_itr = iter(sorted(self.expteced_msg_map.keys()))
self.expected = self._get_tot_num_messages()
self.received = 0
self.received_value_map = {}
self.current_subtype = None
self.current_subtype_msg_list = None
def get_received_value_map(self):
""""Return the collected message values received"""
return self.received_value_map
def on_start(self, event):
"""Event callback for when the client starts"""
connection = event.container.connect(url=self.broker_url, sasl_enabled=False)
event.container.create_receiver(connection, source=self.queue_name)
def on_message(self, event):
"""Event callback when a message is received by the client"""
if event.message.id and event.message.id < self.received:
return # ignore duplicate message
if self.received < self.expected:
if self.current_subtype is None:
self.current_subtype = self.subtype_itr.next()
self.current_subtype_msg_list = []
self.current_subtype_msg_list.append(self._handle_message(event.message))
if len(self.current_subtype_msg_list) >= self.expteced_msg_map[self.current_subtype]:
self.received_value_map[self.current_subtype] = self.current_subtype_msg_list
self.current_subtype = None
self.current_subtype_msg_list = []
self.received += 1
if self.received >= self.expected:
event.receiver.close()
event.connection.close()
def on_connection_error(self, event):
print('JmsMessagesTestReceiver.on_connection_error')
def on_session_error(self, event):
print('JmsMessagesTestReceiver.on_session_error')
def on_link_error(self, event):
print('JmsMessagesTestReceiver.on_link_error')
def _handle_message(self, message):
"""Handles the analysis of a received message"""
if self.jms_msg_type == 'JMS_MESSAGE_TYPE':
return self._receive_jms_message(message)
if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE':
return self._receive_jms_bytesmessage(message)
if self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE':
return self._recieve_jms_mapmessage(message)
if self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE':
return self._recieve_jms_objectmessage(message)
if self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE':
return self._receive_jms_streammessage(message)
if self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE':
return self._receive_jms_textmessage(message)
print('jms-receive: Unsupported JMS message type "%s"' % self.jms_msg_type)
return None
def _get_tot_num_messages(self):
""""Counts up the total number of messages which should be received from the expected message map"""
total = 0
for key in self.expteced_msg_map:
total += int(self.expteced_msg_map[key])
return total
def _receive_jms_message(self, message):
""""Receives a JMS message (without a body)"""
assert self.jms_msg_type == 'JMS_MESSAGE_TYPE'
assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(0)
if message.body is not None:
raise InteropTestError('_receive_jms_message: Invalid body for type JMS_MESSAGE_TYPE: %s' %
str(message.body))
return None
def _receive_jms_bytesmessage(self, message):
""""Receives a JMS bytes message"""
assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE'
assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(3)
if self.current_subtype == 'boolean':
if message.body == b'\x00':
return 'False'
if message.body == b'\x01':
return 'True'
raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' %
str(message.body))
if self.current_subtype == 'byte':
return hex(unpack('b', message.body)[0])
if self.current_subtype == 'bytes':
return str(message.body)
if self.current_subtype == 'char':
if len(message.body) == 2: # format 'a' or '\xNN'
return str(message.body[1]) # strip leading '\x00' char
raise InteropTestError('Unexpected strring length for type char: %d' % len(message.body))
if self.current_subtype == 'double':
return '0x%016x' % unpack('!Q', message.body)[0]
if self.current_subtype == 'float':
return '0x%08x' % unpack('!L', message.body)[0]
if self.current_subtype == 'int':
return hex(unpack('!i', message.body)[0])
if self.current_subtype == 'long':
return hex(unpack('!q', message.body)[0])
if self.current_subtype == 'short':
return hex(unpack('!h', message.body)[0])
if self.current_subtype == 'string':
# NOTE: first 2 bytes are string length, must be present
if len(message.body) >= 2:
str_len = unpack('!H', message.body[:2])[0]
str_body = str(message.body[2:])
if len(str_body) != str_len:
raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' %
(str_len, str_body, len(str_body)))
return str_body
else:
raise InteropTestError('Malformed string binary: len(\'%s\')=%d' %
(repr(message.body), len(message.body)))
raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
(self.jms_msg_type, self.current_subtype))
def _recieve_jms_mapmessage(self, message):
""""Receives a JMS map message"""
assert self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE'
assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(2)
key, value = message.body.items()[0]
assert key[:-3] == self.current_subtype
if self.current_subtype == 'boolean':
return str(value)
if self.current_subtype == 'byte':
return hex(value)
if self.current_subtype == 'bytes':
return str(value)
if self.current_subtype == 'char':
return str(value)
if self.current_subtype == 'double':
return '0x%016x' % unpack('!Q', pack('!d', value))[0]
if self.current_subtype == 'float':
return '0x%08x' % unpack('!L', pack('!f', value))[0]
if self.current_subtype == 'int':
return hex(value)
if self.current_subtype == 'long':
return hex(int(value))
if self.current_subtype == 'short':
return hex(value)
if self.current_subtype == 'string':
return str(value)
raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
(self.jms_msg_type, self.current_subtype))
def _recieve_jms_objectmessage(self, message):
""""Receives a JMS Object message"""
assert self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE'
assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(1)
return self._get_java_obj(message.body)
def _get_java_obj(self, java_obj_bytes):
"""
Take bytes from serialized Java object and construct a Java object, then return its toString() value. The
work of 'translating' the bytes to a Java object and obtaining its class and value is done in a Java
utility org.apache.qpid.interop_test.obj_util.BytesToJavaObj located in jar JavaObjUtils.jar.
java_obj_bytes: hex string representation of bytes from Java object (eg 'aced00057372...')
returns: string containing Java class value as returned by the toString() method
"""
java_obj_bytes_str = ''.join(["%02x" % ord(x) for x in java_obj_bytes]).strip()
out_str = check_output(['java',
'-cp',
'target/JavaObjUtils.jar',
'org.apache.qpid.interop_test.obj_util.BytesToJavaObj',
java_obj_bytes_str])
out_str_list = out_str.split('\n')[:-1] # remove trailing \n
if len(out_str_list) > 1:
raise InteropTestError('Unexpected return from JavaObjUtils: %s' % out_str)
colon_index = out_str_list[0].index(':')
if colon_index < 0:
raise InteropTestError('Unexpected format from JavaObjUtils: %s' % out_str)
java_class_name = out_str_list[0][:colon_index]
java_class_value_str = out_str_list[0][colon_index+1:]
if java_class_name != self.current_subtype:
raise InteropTestError('Unexpected class name from JavaObjUtils: expected %s, recieved %s' %
(self.current_subtype, java_class_name))
return java_class_value_str
def _receive_jms_streammessage(self, message):
"""Receives a JMS stream message"""
assert self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE'
assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(4)
# Every message is a list with one item [value]
assert len(message.body) == 1
value = message.body[0]
if self.current_subtype == 'boolean':
return str(value)
if self.current_subtype == 'byte':
return hex(value)
if self.current_subtype == 'bytes':
return str(value)
if self.current_subtype == 'char':
return str(value)
if self.current_subtype == 'double':
return '0x%016x' % unpack('!Q', pack('!d', value))[0]
if self.current_subtype == 'float':
return '0x%08x' % unpack('!L', pack('!f', value))[0]
if self.current_subtype == 'int':
return hex(value)
if self.current_subtype == 'long':
return hex(int(value))
if self.current_subtype == 'short':
return hex(value)
if self.current_subtype == 'string':
return str(value)
raise InteropTestError('JmsRecieverShim._receive_jms_streammessage(): ' +
'JMS message type %s: Unknown or unsupported subtype \'%s\'' %
(self.jms_msg_type, self.current_subtype))
def _receive_jms_textmessage(self, message):
""""Receives a JMS text message"""
assert self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE'
assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(5)
return message.body
# --- main ---
# Args: 1: Broker address (ip-addr:port)
# 2: Queue name
# 3: JMS message type
# 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap]
#print('#### sys.argv=%s' % sys.argv)
try:
RECEIVER = JmsMessagesTestReceiver(sys.argv[1], sys.argv[2], sys.argv[3], loads(sys.argv[4]))
Container(RECEIVER).run()
print(sys.argv[3])
print(dumps(RECEIVER.get_received_value_map()))
except KeyboardInterrupt:
pass
except Exception as exc:
print('jms-receiver-shim EXCEPTION:', exc)
print(format_exc())