blob: 04413ee7afede1a18a9a8a3470328f94e1cb279d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
from uuid import uuid4
from proton import *
from . import common
class Test(common.Test):
    """Shared fixture: exposes a ``Message`` instance as ``self.msg``."""

    def setUp(self):
        # Create the message under test; subclasses read and mutate self.msg.
        self.msg = Message()

    def tearDown(self):
        # Drop the reference to the message created in setUp.
        self.msg = None
class AccessorsTest(Test):
    """Checks the default value and set/get round-trip of ``Message`` attributes."""

    def _test(self, name, default, values):
        """Assert attribute ``name`` starts at ``default`` and echoes each value.

        name    -- attribute name looked up on ``self.msg``
        default -- value the attribute must hold before anything is written
        values  -- iterable of values that are written and read back in turn
        """
        initial = getattr(self.msg, name)
        assert initial == default, (initial, default)
        for candidate in values:
            setattr(self.msg, name, candidate)
            observed = getattr(self.msg, name)
            assert observed == candidate, observed

    def _test_symbol(self, name):
        """Run ``_test`` with symbol samples; the expected default is ``symbol(None)``."""
        samples = (symbol(u"abc.123.#$%"), symbol(u"hello.world"))
        self._test(name, symbol(None), samples)

    def _test_str(self, name):
        """Run ``_test`` with unicode string samples; the expected default is ``None``."""
        samples = (u"asdf", u"fdsa", u"")
        self._test(name, None, samples)

    def _test_time(self, name):
        """Run ``_test`` with integer samples; the expected default is ``0``."""
        samples = (0, 123456789, 987654321)
        self._test(name, 0, samples)

    def _test_id(self, name):
        """Run ``_test`` with the mixed-type samples shared by the id attributes."""
        # A fresh uuid4() is generated on every call, as in the inline original.
        samples = ("bytes", None, 123, u"string", uuid4())
        self._test(name, None, samples)

    def testId(self):
        self._test_id("id")

    def testCorrelationId(self):
        self._test_id("correlation_id")

    def testDurable(self):
        self._test("durable", False, (True, False))

    def testPriority(self):
        self._test("priority", Message.DEFAULT_PRIORITY, range(255))

    def testTtl(self):
        self._test("ttl", 0, range(12345, 54321))

    def testFirstAcquirer(self):
        self._test("first_acquirer", False, (True, False))

    def testDeliveryCount(self):
        self._test("delivery_count", 0, range(1024))

    def testUserId(self):
        # Includes an embedded NUL byte and the empty value.
        samples = (b"asdf", b"fdsa", b"asd\x00fdsa", b"")
        self._test("user_id", b"", samples)

    def testAddress(self):
        self._test_str("address")

    def testSubject(self):
        self._test_str("subject")

    def testReplyTo(self):
        self._test_str("reply_to")

    def testContentType(self):
        self._test_symbol("content_type")

    def testContentEncoding(self):
        self._test_symbol("content_encoding")

    def testExpiryTime(self):
        self._test_time("expiry_time")

    def testCreationTime(self):
        self._test_time("creation_time")

    def testGroupId(self):
        self._test_str("group_id")

    def testGroupSequence(self):
        # The last two samples are 2**32 - 2 and 2**32 - 1.
        samples = (0, 1, 10, 20, 4294967294, 4294967295)
        self._test("group_sequence", 0, samples)

    def testReplyToGroupId(self):
        self._test_str("reply_to_group_id")
class CodecTest(Test):
    """Encode/decode round-trip and wire-format checks for ``Message``."""

    # Descriptor codes the tests expect on the described HEADER and
    # PROPERTIES lists respectively.
    _HEADER_DESCRIPTOR = 0x70
    _PROPERTIES_DESCRIPTOR = 0x73

    @staticmethod
    def _decoded(data):
        """Return a new ``Message`` populated by decoding ``data``."""
        msg = Message()
        msg.decode(data)
        return msg

    def _decode_properties(self):
        """Encode ``self.msg`` and return the fields of its PROPERTIES list.

        Asserts that the section following the headers carries the
        PROPERTIES descriptor, then returns the described value so callers
        can index individual fields.
        """
        encoded = self.msg.encode()
        decoder = Data()
        # Skip past the headers: the first decode reports how many bytes it
        # consumed, and the remainder is decoded from a cleared decoder.
        consumed = decoder.decode(encoded)
        decoder.clear()
        decoder.decode(encoded[consumed:])
        described = decoder.get_py_described()
        # Check we've got the correct described list
        assert described.descriptor == self._PROPERTIES_DESCRIPTOR, (described.descriptor)
        return described.value

    def testProperties(self):
        self.msg.properties = {}
        self.msg.properties['key'] = 'value'
        msg2 = self._decoded(self.msg.encode())
        assert msg2.properties['key'] == 'value', msg2.properties['key']

    # NOTE(review): the default dict is created once and shared across calls.
    # This method only reads it; presumably Message.encode() does not mutate
    # it either -- confirm before relying on that. Signature kept unchanged.
    def testAnnotationsSymbolicAndUlongKey(self, a={symbol('one'): 1, 'two': 2, ulong(3): 'three'}):
        self.msg.annotations = a
        msg2 = self._decoded(self.msg.encode())
        # Every key (symbol, plain string and ulong) must survive the round
        # trip and still compare equal to the original mapping.
        assert msg2.annotations == a

    def testRoundTrip(self):
        self.msg.id = "asdf"
        self.msg.correlation_id = uuid4()
        self.msg.ttl = 3
        self.msg.priority = 100
        self.msg.address = "address"
        self.msg.subject = "subject"
        self.msg.body = 'Hello World!'
        msg2 = self._decoded(self.msg.encode())
        # Every attribute set above must read back unchanged after decoding.
        for name in ("id", "correlation_id", "ttl", "priority",
                     "address", "subject", "body"):
            sent = getattr(self.msg, name)
            received = getattr(msg2, name)
            assert sent == received, (sent, received)

    def testExpiryEncodeAsNull(self):
        self.msg.group_id = "A"  # Force creation and expiry fields to be present
        properties = self._decode_properties()
        # Field 8 is the one this test treats as the expiry time.
        assert properties[8] is None, properties[8]

    def testCreationEncodeAsNull(self):
        self.msg.group_id = "A"  # Force creation and expiry fields to be present
        properties = self._decode_properties()
        # Field 9 is the one this test treats as the creation time.
        assert properties[9] is None, properties[9]

    def testGroupSequenceEncodeAsNull(self):
        self.msg.reply_to_group_id = "R"  # Force group_id and group_sequence fields to be present
        properties = self._decode_properties()
        # With no group_id set, both group fields must be encoded as null.
        assert properties[10] is None, properties[10]
        assert properties[11] is None, properties[11]

    def testGroupSequenceEncodeAsNonNull(self):
        self.msg.group_id = "G"
        self.msg.reply_to_group_id = "R"  # Force group_id and group_sequence fields to be present
        properties = self._decode_properties()
        # With a group_id set, the sequence is encoded explicitly as 0.
        assert properties[10] == 'G', properties[10]
        assert properties[11] == 0, properties[11]

    def testDefaultCreationExpiryDecode(self):
        encodings = (
            # This is a message with everything filled explicitly as null or zero in LIST32 HEADER and PROPERTIES lists
            b'\x00\x53\x70\xd0\x00\x00\x00\x0a\x00\x00\x00\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xd0\x00\x00\x00\x12\x00\x00\x00\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x52\x00\x40',
            # The same message with LIST8s instead
            b'\x00\x53\x70\xc0\x07\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xc0\x0f\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x52\x00\x40',
            # Minified message with zero length HEADER and PROPERTIES lists
            b'\x00\x53\x70\x45' b'\x00\x53\x73\x45',
        )
        # Check the message decoded from each encoding. Previously only the
        # first decoded message was asserted on, three times over, so the
        # LIST8 and minified variants were never verified.
        for data in encodings:
            decoded = self._decoded(data)
            assert decoded.expiry_time == 0, (data, decoded.expiry_time)
            assert decoded.creation_time == 0, (data, decoded.creation_time)

    def testDefaultPriorityEncode(self):
        assert self.msg.priority == 4, (self.msg.priority)
        self.msg.ttl = 0.003  # field after priority, so forces priority to be present
        decoder = Data()
        decoder.decode(self.msg.encode())
        dheaders = decoder.get_py_described()
        # Check we've got the correct described list
        assert dheaders.descriptor == self._HEADER_DESCRIPTOR, (dheaders.descriptor)
        # Check that the priority field (second field) is encoded as null
        headers = dheaders.value
        assert headers[1] is None, (headers[1])

    def testDefaultPriorityDecode(self):
        encodings = (
            # This is a message with everything filled explicitly as null or zero in LIST32 HEADER and PROPERTIES lists
            b'\x00\x53\x70\xd0\x00\x00\x00\x0a\x00\x00\x00\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xd0\x00\x00\x00\x22\x00\x00\x00\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00\x40\x52\x00\x40',
            # The same message with LIST8s instead
            b'\x00\x53\x70\xc0\x07\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xc0\x1f\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00\x40\x52\x00\x40',
            # Minified message with zero length HEADER and PROPERTIES lists
            b'\x00\x53\x70\x45' b'\x00\x53\x73\x45',
        )
        # A null or absent priority must decode to the default of 4.
        for data in encodings:
            decoded = self._decoded(data)
            assert decoded.priority == 4, (decoded.priority)