blob: 45331248ce8ec9e0562c48c327c3345e9ee9bbe0 [file] [log] [blame]
#!/usr/bin/env python
"""
AMQP type test receiver shim for qpid-interop-test
"""
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Issues:
# * Capturing errors from client or broker
from json import dumps
import os.path
from string import digits, letters, punctuation
from struct import pack, unpack
import sys
from traceback import format_exc
from proton.handlers import MessagingHandler
from proton.reactor import Container
class AmqpTypesTestReceiver(MessagingHandler):
    """
    Receiver shim for AMQP types test

    This shim receives the number of messages supplied on the command-line and converts each message body into
    the representation selected by the AMQP type name supplied on the command-line. The converted values are
    aggregated in the order they are received and returned by get_received_value_list().
    """
    def __init__(self, broker_url, queue_name, amqp_type, num_expected_messages_str):
        """
        Store the test parameters.

        broker_url: broker address; combined with queue_name as '<broker_url>/<queue_name>' when the
                    receiver is created
        queue_name: name of the queue to receive from
        amqp_type: AMQP type name (e.g. 'int', 'string') selecting how each message body is converted
        num_expected_messages_str: number of messages to receive, as a string; it is converted with int(),
                                   which raises ValueError for a non-integer string
        """
        super(AmqpTypesTestReceiver, self).__init__()
        self.broker_url = broker_url
        self.queue_name = queue_name
        self.received_value_list = []
        self.amqp_type = amqp_type
        self.expected = int(num_expected_messages_str)
        self.received = 0

    def get_received_value_list(self):
        """Return the received list of AMQP values"""
        return self.received_value_list

    def on_start(self, event):
        """Event callback for when the client starts"""
        event.container.create_receiver('%s/%s' % (self.broker_url, self.queue_name))

    def on_message(self, event):
        """
        Event callback when a message is received by the client

        Converts the message body according to self.amqp_type, appends the result to the received value
        list and counts the message. Once the expected number of messages has been counted, the receiver
        and the connection are closed.
        """
        message = event.message
        if message.id and message.id < self.received:
            return # ignore duplicate message
        if self.received < self.expected:
            body = message.body
            if self.amqp_type in ('null', 'boolean', 'uuid'):
                self.received_value_list.append(str(body))
            elif self.amqp_type in ('ubyte', 'ushort', 'byte', 'short', 'int'):
                self.received_value_list.append(hex(body))
            elif self.amqp_type in ('uint', 'ulong', 'long', 'timestamp'):
                # Python 2 hex() marks a long with a trailing 'L'. Strip it whatever the number of digits,
                # so the result does not depend on where the platform's int/long boundary lies.
                self.received_value_list.append(hex(int(body)).rstrip('L'))
            elif self.amqp_type == 'float':
                # Reinterpret the 4-byte big-endian float encoding as an unsigned integer
                self.received_value_list.append('0x%08x' % unpack('!L', pack('!f', body))[0])
            elif self.amqp_type == 'double':
                # Reinterpret the 8-byte big-endian double encoding as an unsigned integer
                self.received_value_list.append('0x%016x' % unpack('!Q', pack('!d', body))[0])
            elif self.amqp_type == 'decimal32':
                self.received_value_list.append('0x%08x' % body)
            elif self.amqp_type == 'decimal64':
                self.received_value_list.append('0x%016x' % body)
            elif self.amqp_type == 'decimal128':
                # Two hex digits per element of the body
                self.received_value_list.append('0x' + ''.join('%02x' % ord(c) for c in body))
            elif self.amqp_type == 'char':
                # Characters from the listed sets are passed through; anything else is sent as hex
                if ord(body) < 0x80 and body in digits + letters + punctuation + " ":
                    self.received_value_list.append(body)
                else:
                    self.received_value_list.append(hex(ord(body)))
            elif self.amqp_type in ('binary', 'string', 'symbol', 'list', 'map'):
                self.received_value_list.append(body)
            else:
                print('receive: Unsupported AMQP type "%s"' % self.amqp_type)
                return # the message is neither stored nor counted
            self.received += 1
        if self.received >= self.expected:
            event.receiver.close()
            event.connection.close()
# --- main ---
# Args: 1: Broker address (ip-addr:port)
# 2: Queue name
# 3: AMQP type
# 4: Expected number of test values to receive
try:
    # Build the receiver from the four positional arguments and run it until the handler closes the connection
    RECEIVER = AmqpTypesTestReceiver(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])
    Container(RECEIVER).run()
    # Output: the AMQP type name on one line, then the received values as JSON on the next
    print(sys.argv[3])
    print(dumps(RECEIVER.get_received_value_list()))
except KeyboardInterrupt:
    pass
except Exception as exc:
    # Output: '<script name> EXCEPTION <message>' followed by the traceback
    print(' '.join([os.path.basename(sys.argv[0]), 'EXCEPTION', str(exc)]))
    print(format_exc())