blob: a300dc59fd4ab2a0ed0ee56ed639601570f8760c [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Utility code to translate between python objects and AMQP encoded data.

The unit test for this module is located in tests/
"""
from __future__ import absolute_import
import re, qpid, os
from . import spec08
# Prefer the C implementation where it exists; fall back to the io module
# on interpreters that do not ship cStringIO.
try:
    from cStringIO import StringIO
except ImportError:
    from io import StringIO
from struct import *
from .reference import ReferenceId
from logging import getLogger
log = getLogger("qpid.codec")
class EOF(Exception):
    """Raised by Codec.read when a non-empty read returns no data."""
# This code appears to be dead
# Maps field-table type names taken from spec constants onto the suffix of
# the matching Codec.encode_*/decode_* method.
TYPE_ALIASES = {
    "long_string": "longstr",
    "unsigned_int": "long",
}
class Codec:
    """
    Class that handles encoding/decoding of AMQP primitives.

    Values are written to / read from a file-like ``stream``. Single bits
    are buffered in ``outgoing_bits`` / ``incoming_bits`` and packed into
    octets, least-significant bit first; any non-bit write flushes pending
    outgoing bits and any non-bit read discards unread incoming bits.

    NOTE(review): a few names on collaborating objects are not visible in
    this module and should be confirmed against spec08 / reference:
    ``constant.name`` / ``constant.id``, field ``f.name``, ``ReferenceId.id``
    and the struct-type code ``type.type``; likewise the reverse lookup
    attribute ``self.codes``.
    """

    def __init__(self, stream, spec):
        """
        Initialize the stream and the bookkeeping fields used by the codec.

        stream -- file-like object providing read/write/flush
        spec   -- protocol spec; ``constants``, ``major``, ``minor`` and
                  ``structs`` are read from it
        """
        self.stream = stream
        self.spec = spec
        self.nwrote = 0
        self.nread = 0
        self.incoming_bits = []
        self.outgoing_bits = []

        # Before 0-91, the AMQP's set of types did not include the boolean
        # type. However, the 0-8 and 0-9 Java client uses this type so we
        # encode/decode it too. This can be turned off by setting the
        # following environment value.
        self.understand_boolean = (
            "QPID_CODEC_DISABLE_0_91_BOOLEAN" not in os.environ)

        log.debug("AMQP 0-91 boolean supported : %r", self.understand_boolean)

        # types: field-table type code -> codec type name
        # codes: codec type name -> field-table type code
        self.types = {}
        self.codes = {}
        self.integertypes = [int, long]
        self.encodings = {
            float: "double",  # python uses 64bit floats, send them as doubles
            basestring: "longstr",
            list: "sequence",
            tuple: "sequence",
            dict: "table",
        }

        if self.understand_boolean:
            self.encodings[bool] = "boolean"

        for constant in self.spec.constants:
            # This code appears to be dead
            if constant.klass == "field-table-type":
                type = constant.name.replace("field_table_", "")
                self.typecode(constant.id, TYPE_ALIASES.get(type, type))

        if not self.types:
            # The spec declared no field-table types: use the built-in set.
            # long-string 'S'
            self.typecode(ord('S'), "longstr")
            # void 'V'
            self.typecode(ord('V'), "void")
            # long-int 'I' (32bit signed)
            self.typecode(ord('I'), "signed_int")
            # long-long-int 'l' (64bit signed)
            # This is a long standing pre-0-91-spec type used by the Java
            # client, 0-9-1 says it should be unsigned or use 'L')
            self.typecode(ord('l'), "signed_long")
            # double 'd'
            self.typecode(ord('d'), "double")
            # float 'f'
            self.typecode(ord('f'), "float")

            if self.understand_boolean:
                self.typecode(ord('t'), "boolean")

            ## The following are supported for decoding only ##

            # short-short-uint 'b' (8bit signed)
            self.types[ord('b')] = "signed_octet"
            # short-int 's' (16bit signed)
            # This is a long standing pre-0-91-spec type code used by the
            # Java client to send shorts, it should really be a
            # short-string, or for 0-9-1 use 'U'
            self.types[ord('s')] = "signed_short"

    def typecode(self, code, type):
        """
        registers 'code' <-> 'type' in both lookup directions
        """
        self.types[code] = type
        self.codes[type] = code

    def resolve(self, klass, value):
        """
        returns the codec type name used to encode 'value' of class 'klass',
        searching the base classes of 'klass' when it has no direct entry.
        Returns None when nothing matches; raises ValueError for integers
        outside the signed 64bit range.
        """
        if klass in self.integertypes:
            if -2147483648 <= value <= 2147483647:
                return "signed_int"
            elif -9223372036854775808 <= value <= 9223372036854775807:
                return "signed_long"
            else:
                raise ValueError('Integer value is outwith the supported 64bit signed range')
        if klass in self.encodings:
            return self.encodings[klass]
        for base in klass.__bases__:
            result = self.resolve(base, value)
            if result is not None:
                return result
        return None

    def read(self, n):
        """
        reads in 'n' bytes from the stream. Can raise EOF exception
        """
        # a non-bit read ends the current run of packed bits
        self.clearbits()
        data = self.stream.read(n)
        if n > 0 and len(data) == 0:
            raise EOF()
        self.nread += len(data)
        return data

    def write(self, s):
        """
        writes data 's' to the stream
        """
        # pending bits must reach the stream before the data that follows
        self.flushbits()
        self.stream.write(s)
        self.nwrote += len(s)

    def flush(self):
        """
        flushes the bits and data present in the stream
        """
        self.flushbits()
        self.stream.flush()

    def flushbits(self):
        """
        flushes the bits(compressed into octets) onto the stream
        """
        if len(self.outgoing_bits) > 0:
            octets = []
            index = 0
            for b in self.outgoing_bits:
                if index == 0:
                    octets.append(0)
                if b:
                    octets[-1] |= 1 << index
                index = (index + 1) % 8
            # Empty the buffer before writing: encode_octet ends up in
            # write(), which calls flushbits() again.
            del self.outgoing_bits[:]
            for octet in octets:
                self.encode_octet(octet)

    def clearbits(self):
        """
        discards any buffered incoming bits
        """
        if self.incoming_bits:
            self.incoming_bits = []

    def pack(self, fmt, *args):
        """
        packs the data 'args' as per the format 'fmt' and writes it to the stream
        """
        self.write(pack(fmt, *args))

    def unpack(self, fmt):
        """
        reads data from the stream and unpacks it as per the format 'fmt'.
        A single unpacked value is returned bare, several as a tuple.
        """
        size = calcsize(fmt)
        data = self.read(size)
        values = unpack(fmt, data)
        if len(values) == 1:
            return values[0]
        else:
            return values

    def encode(self, type, value):
        """
        calls the appropriate encode function e.g. encode_octet, encode_short etc.
        """
        if isinstance(type, spec08.Struct):
            self.encode_struct(type, value)
        else:
            getattr(self, "encode_" + type)(value)

    def decode(self, type):
        """
        calls the appropriate decode function e.g. decode_octet, decode_short etc.
        """
        if isinstance(type, spec08.Struct):
            return self.decode_struct(type)
        else:
            log.debug("Decoding using method: decode_" + type)
            return getattr(self, "decode_" + type)()

    def encode_bit(self, o):
        """
        encodes a bit
        """
        # buffered only; written out by flushbits()
        if o:
            self.outgoing_bits.append(True)
        else:
            self.outgoing_bits.append(False)

    def decode_bit(self):
        """
        decodes a bit
        """
        if len(self.incoming_bits) == 0:
            bits = self.decode_octet()
            for i in range(8):
                self.incoming_bits.append(bits >> i & 1 != 0)
        return self.incoming_bits.pop(0)

    def encode_octet(self, o):
        """
        encodes an UNSIGNED octet (8 bits) data 'o' in network byte order
        """
        # octet's valid range is [0,255]
        if o < 0 or o > 255:
            raise ValueError('Valid range of octet is [0,255]')
        self.pack("!B", int(o))

    def decode_octet(self):
        """
        decodes an UNSIGNED octet (8 bits) encoded in network byte order
        """
        return self.unpack("!B")

    def decode_signed_octet(self):
        """
        decodes a signed octet (8 bits) encoded in network byte order
        """
        return self.unpack("!b")

    def encode_short(self, o):
        """
        encodes an UNSIGNED short (16 bits) data 'o' in network byte order
        AMQP 0-9-1 type: short-uint
        """
        # short int's valid range is [0,65535]
        if o < 0 or o > 65535:
            raise ValueError('Valid range of short int is [0,65535]: %s' % o)
        self.pack("!H", int(o))

    def decode_short(self):
        """
        decodes an UNSIGNED short (16 bits) in network byte order
        AMQP 0-9-1 type: short-uint
        """
        return self.unpack("!H")

    def decode_signed_short(self):
        """
        decodes a signed short (16 bits) in network byte order
        AMQP 0-9-1 type: short-int
        """
        return self.unpack("!h")

    def encode_long(self, o):
        """
        encodes an UNSIGNED long (32 bits) data 'o' in network byte order
        AMQP 0-9-1 type: long-uint
        """
        # we need to check both bounds because on 64 bit platforms
        # struct.pack won't raise an error if o is too large
        if o < 0 or o > 4294967295:
            raise ValueError('Valid range of long int is [0,4294967295]')
        self.pack("!L", int(o))

    def decode_long(self):
        """
        decodes an UNSIGNED long (32 bits) in network byte order
        AMQP 0-9-1 type: long-uint
        """
        return self.unpack("!L")

    def encode_signed_long(self, o):
        """
        encodes a signed long (64 bits) in network byte order
        AMQP 0-9-1 type: long-long-int
        """
        self.pack("!q", o)

    def decode_signed_long(self):
        """
        decodes a signed long (64 bits) in network byte order
        AMQP 0-9-1 type: long-long-int
        """
        return self.unpack("!q")

    def encode_signed_int(self, o):
        """
        encodes a signed int (32 bits) in network byte order
        AMQP 0-9-1 type: long-int
        """
        self.pack("!l", o)

    def decode_signed_int(self):
        """
        decodes a signed int (32 bits) in network byte order
        AMQP 0-9-1 type: long-int
        """
        return self.unpack("!l")

    def encode_longlong(self, o):
        """
        encodes an UNSIGNED long long (64 bits) data 'o' in network byte order
        AMQP 0-9-1 type: long-long-uint
        """
        self.pack("!Q", o)

    def decode_longlong(self):
        """
        decodes an UNSIGNED long long (64 bits) in network byte order
        AMQP 0-9-1 type: long-long-uint
        """
        return self.unpack("!Q")

    def encode_float(self, o):
        """
        encodes a 32 bit float in network byte order
        """
        self.pack("!f", o)

    def decode_float(self):
        """
        decodes a 32 bit float in network byte order
        """
        return self.unpack("!f")

    def encode_double(self, o):
        """
        encodes a 64 bit float in network byte order
        """
        self.pack("!d", o)

    def decode_double(self):
        """
        decodes a 64 bit float in network byte order
        """
        return self.unpack("!d")

    def encode_bin128(self, b):
        """
        encodes the first 16 characters of 'b', one octet each
        """
        for idx in range(0, 16):
            self.pack("!B", ord(b[idx]))

    def decode_bin128(self):
        """
        decodes 16 octets and returns them as a 16 character string
        """
        return "".join(chr(self.unpack("!B")) for idx in range(0, 16))

    def encode_raw(self, len, b):
        """
        encodes the first 'len' items of 'b', one octet each. Unlike
        encode_bin128 the items are packed as given (no ord() applied).
        """
        for idx in range(0, len):
            self.pack("!B", b[idx])

    def decode_raw(self, len):
        """
        decodes 'len' octets and returns them as a string
        """
        return "".join(chr(self.unpack("!B")) for idx in range(0, len))

    def enc_str(self, fmt, s):
        """
        encodes a string 's' in network byte order as per format 'fmt'
        ('fmt' is the struct format of the length prefix)
        """
        size = len(s)
        self.pack(fmt, size)
        self.write(s)

    def dec_str(self, fmt):
        """
        decodes a string in network byte order as per format 'fmt'
        ('fmt' is the struct format of the length prefix)
        """
        size = self.unpack(fmt)
        return self.read(size)

    def encode_shortstr(self, s):
        """
        encodes a short string 's' in network byte order
        """
        # short strings are limited to 255 octets
        if len(s) > 255:
            raise ValueError('Short strings are limited to 255 octets')
        self.enc_str("!B", s)

    def decode_shortstr(self):
        """
        decodes a short string in network byte order
        """
        return self.dec_str("!B")

    def encode_longstr(self, s):
        """
        encodes a long string 's' in network byte order; a dict is encoded
        as a table instead
        """
        if isinstance(s, dict):
            self.encode_table(s)
        else:
            self.enc_str("!L", s)

    def decode_longstr(self):
        """
        decodes a long string 's' in network byte order
        """
        return self.dec_str("!L")

    def encode_table(self, tbl):
        """
        encodes a table data structure in network byte order

        Layout (mirrors decode_table): a long byte count followed by
        shortstr key / type code octet / value entries.
        """
        # entries go to a scratch codec first so the byte count is known
        enc = StringIO()
        codec = Codec(enc, self.spec)
        if tbl:
            for key, value in tbl.items():
                if self.spec.major == 8 and self.spec.minor == 0 and len(key) > 128:
                    raise ValueError("field table key too long: '%s'" % key)
                type = self.resolve(value.__class__, value)
                if type is None:
                    raise ValueError("no encoding for: " + str(value.__class__))
                codec.encode_shortstr(key)
                codec.encode_octet(self.codes[type])
                codec.encode(type, value)
        s = enc.getvalue()
        self.encode_long(len(s))
        self.write(s)

    def _decode_untyped(self, code):
        """
        reads the raw bytes of a value whose type code is not in self.types
        """
        w = width(code)
        if fixed(code):
            return self.read(w)
        # NOTE(review): for non-fixed codes 'w' is taken to be the width of
        # the length prefix that precedes the data - confirm.
        return self.read(self.dec_num(w))

    def decode_table(self):
        """
        decodes a table data structure in network byte order
        """
        size = self.decode_long()
        start = self.nread
        result = {}
        while self.nread - start < size:
            key = self.decode_shortstr()
            log.debug("Field table entry key: %r", key)
            code = self.decode_octet()
            log.debug("Field table entry type code: %r", code)
            if code in self.types:
                value = self.decode(self.types[code])
            else:
                value = self._decode_untyped(code)
            result[key] = value
            log.debug("Field table entry value: %r", value)
        return result

    def encode_timestamp(self, t):
        """
        encodes a timestamp data structure in network byte order
        """
        self.encode_longlong(t)

    def decode_timestamp(self):
        """
        decodes a timestamp data structure in network byte order
        """
        return self.decode_longlong()

    def encode_content(self, s):
        """
        encodes a content data structure in network byte order

        content can be passed as a string in which case it is assumed to
        be inline data, or as an instance of ReferenceId indicating it is
        a reference id
        """
        # leading octet: 0 = inline data, anything else = reference
        # (see decode_content)
        if isinstance(s, ReferenceId):
            self.encode_octet(1)
            self.encode_longstr(s.id)
        else:
            self.encode_octet(0)
            self.encode_longstr(s)

    def decode_content(self):
        """
        decodes a content data structure in network byte order

        return a string for inline data and a ReferenceId instance for
        references
        """
        type = self.decode_octet()
        if type == 0:
            return self.decode_longstr()
        else:
            return ReferenceId(self.decode_longstr())

    # new domains for 0-10:

    def encode_rfc1982_long(self, s):
        """
        encodes an rfc1982 serial number as an unsigned long
        """
        self.encode_long(s)

    def decode_rfc1982_long(self):
        """
        decodes an rfc1982 serial number (unsigned long)
        """
        return self.decode_long()

    def encode_rfc1982_long_set(self, s):
        """
        encodes a sequence of longs, prefixed by its size in bytes as a short
        """
        self.encode_short(len(s) * 4)
        for i in s:
            self.encode_long(i)

    def decode_rfc1982_long_set(self):
        """
        decodes a short byte count followed by that many bytes of longs and
        returns the longs as a list
        """
        # floor division: the count feeds range() and must stay an int
        count = self.decode_short() // 4
        return [self.decode_long() for i in range(0, count)]

    def encode_uuid(self, s):
        """
        encodes a 16 byte uuid
        """
        self.pack("16s", s)

    def decode_uuid(self):
        """
        decodes a 16 byte uuid
        """
        return self.unpack("16s")

    def encode_void(self, o):
        """
        encodes nothing
        """
        # NO-OP, value is implicit in the type.
        return

    def decode_void(self):
        """
        decodes nothing and returns None
        """
        return None

    def enc_num(self, width, n):
        """
        encodes 'n' as an unsigned number occupying 'width' bytes
        """
        if width == 1:
            self.encode_octet(n)
        elif width == 2:
            self.encode_short(n)
        elif width == 3 or width == 4:
            # dec_num reads 32 bit numbers for width 4, so 4 must be
            # accepted here as well. NOTE(review): the original branch
            # tested 3, which looks like a typo; kept for compatibility.
            self.encode_long(n)
        else:
            raise ValueError("invalid width: %s" % width)

    def dec_num(self, width):
        """
        decodes an unsigned number occupying 'width' bytes
        """
        if width == 1:
            return self.decode_octet()
        elif width == 2:
            return self.decode_short()
        elif width == 4:
            return self.decode_long()
        else:
            raise ValueError("invalid width: %s" % width)

    def encode_struct(self, type, s):
        """
        encodes struct 's' of struct type 'type'; when the type declares a
        size width the body is prefixed with its length in bytes
        """
        if type.size:
            # body goes to a scratch codec first so its length is known
            enc = StringIO()
            codec = Codec(enc, self.spec)
            codec.encode_struct_body(type, s)
            body = enc.getvalue()
            self.enc_num(type.size, len(body))
            self.write(body)
        else:
            self.encode_struct_body(type, s)

    def decode_struct(self, type):
        """
        decodes a struct of struct type 'type'; returns None when the type
        declares a size width and the decoded size is zero
        """
        if type.size:
            size = self.dec_num(type.size)
            if size == 0:
                return None
        return self.decode_struct_body(type)

    def encode_struct_body(self, type, s):
        """
        encodes the presence/bit flags of 's' followed by the values of the
        non-bit fields that are present
        """
        reserved = 8 * type.pack - len(type.fields)
        assert reserved >= 0

        # one flag per field: the value itself for bit fields, a presence
        # marker for every other field (mirrors decode_struct_body)
        for f in type.fields:
            if s is None:
                self.encode_bit(False)
            elif f.type == "bit":
                self.encode_bit(s.get(f.name))
            else:
                self.encode_bit(s.has(f.name))

        for i in range(reserved):
            self.encode_bit(False)

        for f in type.fields:
            if f.type != "bit" and s is not None and s.has(f.name):
                self.encode(f.type, s.get(f.name))

        # if no value followed the flags they are still buffered; emit them
        # so callers reading the stream contents see the whole body
        self.flushbits()

    def decode_struct_body(self, type):
        """
        decodes the presence/bit flags and then the values of the fields
        flagged as present; returns the resulting qpid.Struct
        """
        reserved = 8 * type.pack - len(type.fields)
        assert reserved >= 0

        s = qpid.Struct(type)

        for f in type.fields:
            if f.type == "bit":
                s.set(f.name, self.decode_bit())
            elif self.decode_bit():
                # mark as present; the value is filled in below
                s.set(f.name, None)

        for i in range(reserved):
            if self.decode_bit():
                raise ValueError("expecting reserved flag")

        for f in type.fields:
            if f.type != "bit" and s.has(f.name):
                s.set(f.name, self.decode(f.type))

        return s

    def encode_long_struct(self, s):
        """
        encodes struct 's' as a long string holding a short struct type code
        followed by the struct body
        """
        enc = StringIO()
        codec = Codec(enc, self.spec)
        type = s.type
        codec.encode_short(type.type)
        codec.encode_struct_body(type, s)
        self.encode_longstr(enc.getvalue())

    def decode_long_struct(self):
        """
        decodes a long string holding a short struct type code followed by
        the struct body
        """
        codec = Codec(StringIO(self.decode_longstr()), self.spec)
        type = self.spec.structs[codec.decode_short()]
        return codec.decode_struct_body(type)

    def decode_array(self):
        """
        decodes an array: long size, element type code octet, long element
        count, then the elements. Returns the elements as a list.
        """
        size = self.decode_long()  # read to advance the stream; not used
        code = self.decode_octet()
        count = self.decode_long()
        result = []
        for i in range(0, count):
            if code in self.types:
                value = self.decode(self.types[code])
            else:
                value = self._decode_untyped(code)
            result.append(value)
        return result

    def encode_boolean(self, s):
        """
        encodes the truth value of 's' as a single octet (1 or 0)
        """
        # bytes literals: the struct 'c' format takes a one-byte string
        if s:
            self.pack("!c", b"\x01")
        else:
            self.pack("!c", b"\x00")

    def decode_boolean(self):
        """
        decodes a single octet as a boolean
        """
        b = self.unpack("!c")
        if b == b"\x00":
            return False
        else:
            # AMQP spec says anything else is True
            return True
def fixed(code):
    """Return True unless the top two bits of the 8-bit type code are ``10``.

    Codes in the range 128-191 are the ones treated as variable width
    (see width()).
    """
    return (code >> 6) != 2
def width(code):
    """Return the width associated with a field type code.

    The selector is bits 4-5 of the code (bits 4-6 for codes below 128):

    * code >= 192 ("decimal"): selector 0 -> 5, 1 -> 9, 3 -> 0
    * 128 <= code < 192 ("variable width"): 2 ** selector, i.e. 1, 2 or 4
    * code < 128 ("fixed width"): the selector itself, 0..7

    Raises ValueError(code) for selector 2 in the decimal range and for
    selector 3 in the variable-width range.
    """
    # decimal
    if code >= 192:
        decsel = (code >> 4) & 3
        if decsel == 0:
            return 5
        elif decsel == 1:
            return 9
        elif decsel == 3:
            return 0
        else:
            raise ValueError(code)
    # variable width
    elif code >= 128:
        lenlen = (code >> 4) & 3
        if lenlen == 3:
            raise ValueError(code)
        return 2 ** lenlen
    # fixed width
    else:
        return (code >> 4) & 7