#!/usr/bin/env python3
# -*- mode: python -*-
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
"""Input/output utilities.
- i/o-specific constants
- i/o-specific exceptions
- schema validation
- leaf value encoding and decoding
- datum reader/writer stuff (?)
Also includes a generic representation for data, which uses the
following mapping:
- Schema records are implemented as dict.
- Schema arrays are implemented as list.
- Schema maps are implemented as dict.
- Schema strings are implemented as unicode.
- Schema bytes are implemented as str.
- Schema ints are implemented as int.
- Schema longs are implemented as long.
- Schema floats are implemented as float.
- Schema doubles are implemented as float.
- Schema booleans are implemented as bool.
import binascii
import json
import logging
import struct
import sys
from avro import schema
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------------------
# Constants
# Inclusive bounds accepted for 'int' (32-bit) and 'long' (64-bit) values.
INT_MIN_VALUE = -2 ** 31
INT_MAX_VALUE = 2 ** 31 - 1
LONG_MIN_VALUE = -2 ** 63
LONG_MAX_VALUE = 2 ** 63 - 1

# Pre-compiled struct codecs shared by the binary encoder and decoder.
STRUCT_INT = struct.Struct('!I')     # unsigned int, network (big-endian) order
STRUCT_FLOAT = struct.Struct('<f')   # 4-byte float, little-endian
STRUCT_DOUBLE = struct.Struct('<d')  # 8-byte double, little-endian
STRUCT_CRC32 = struct.Struct('>I')   # unsigned int, big-endian
# ------------------------------------------------------------------------------
# Exceptions
class AvroTypeException(schema.AvroException):
    """Raised when datum is not an example of schema."""

    def __init__(self, expected_schema, datum):
        """Builds the error message from the schema and the offending datum.

        Args:
          expected_schema: Schema the datum was checked against. Its str()
            form is parsed as JSON so that it can be pretty-printed.
          datum: The value that is not an example of the schema.
        """
        pretty_expected = json.dumps(json.loads(str(expected_schema)), indent=2)
        fail_msg = ("The datum %s is not an example of the schema %s"
                    % (datum, pretty_expected))
        schema.AvroException.__init__(self, fail_msg)
class SchemaResolutionException(schema.AvroException):
    """Raised when a writer's schema and a reader's schema cannot be resolved."""

    def __init__(self, fail_msg, writer_schema=None, reader_schema=None):
        """Builds the error message, appending whichever schemas were given.

        Args:
          fail_msg: Description of the resolution failure.
          writer_schema: Optional writer's schema to append to the message.
          reader_schema: Optional reader's schema to append to the message.
        """
        # Pretty-print only the schemas that were actually supplied:
        # str(None) is not valid JSON, so formatting an omitted schema would
        # raise a JSON decoding error instead of this exception.
        labelled = (("Writer's", writer_schema), ("Reader's", reader_schema))
        for label, given in labelled:
            if given:
                pretty = json.dumps(json.loads(str(given)), indent=2)
                fail_msg += "\n%s Schema: %s" % (label, pretty)
        schema.AvroException.__init__(self, fail_msg)
# ------------------------------------------------------------------------------
# Validate
# Maps a schema type name to a predicate (schema, datum) -> bool telling
# whether the datum is an instance of that schema.
#
# NOTE: bool is a subclass of int in Python, so True/False also satisfy the
# 'int', 'long', 'float' and 'double' predicates.
# NOTE(review): the record predicate looks fields up by `f.name`; the attribute
# name was reconstructed (it is not visible in this file) -- confirm against
# the schema module.
_valid = {
    'null': lambda s, d: d is None,
    'boolean': lambda s, d: isinstance(d, bool),
    'string': lambda s, d: isinstance(d, str),
    'bytes': lambda s, d: isinstance(d, bytes),
    'int': lambda s, d: (isinstance(d, int)
                         and (INT_MIN_VALUE <= d <= INT_MAX_VALUE)),
    'long': lambda s, d: (isinstance(d, int)
                          and (LONG_MIN_VALUE <= d <= LONG_MAX_VALUE)),
    'float': lambda s, d: isinstance(d, (int, float)),
    'fixed': lambda s, d: isinstance(d, bytes) and len(d) == s.size,
    'enum': lambda s, d: d in s.symbols,
    'array': lambda s, d: (isinstance(d, list)
                           and all(Validate(s.items, item) for item in d)),
    'map': lambda s, d: (isinstance(d, dict)
                         and all(isinstance(key, str) for key in d)
                         and all(Validate(s.values, value)
                                 for value in d.values())),
    'union': lambda s, d: any(Validate(branch, d) for branch in s.schemas),
    # Every declared field must validate (a missing key is checked as None)
    # and the datum may not carry keys that the schema does not declare.
    'record': lambda s, d: (isinstance(d, dict)
                            and all(Validate(f.type, d.get(f.name))
                                    for f in s.fields)
                            and {f.name for f in s.fields}.issuperset(d.keys())),
}

# Types that share a predicate with another type.
_valid['double'] = _valid['float']
_valid['error_union'] = _valid['union']
_valid['error'] = _valid['request'] = _valid['record']
def Validate(expected_schema, datum):
    """Determines if a python datum is an instance of a schema.

    Args:
      expected_schema: Schema to validate against.
      datum: Datum to validate.
    Returns:
      True if the datum is an instance of the schema.
    Raises:
      AvroTypeException: if the schema's type has no entry in the
        validation table.
    """
    # Keep the try body down to the table lookup so that only an unknown
    # schema type is reported as a type error.
    try:
        validator = _valid[expected_schema.type]
    except KeyError:
        raise AvroTypeException(expected_schema, datum)
    return validator(expected_schema, datum)
# ------------------------------------------------------------------------------
# Decoder/Encoder
class BinaryDecoder(object):
    """Read leaf values."""

    def __init__(self, reader):
        """
        reader is a Python object on which we can call read, seek, and tell.
        """
        self._reader = reader

    # NOTE(review): exposed as a property (reconstructed; the decorator is not
    # visible in this file) -- confirm that callers use `decoder.reader`.
    @property
    def reader(self):
        """Reports the reader used by this decoder."""
        return self._reader

    def read(self, n):
        """Read n bytes.

        Args:
          n: Number of bytes to read.
        Returns:
          The next n bytes from the input.
        Raises:
          AssertionError: if n is negative or fewer than n bytes are available.
        """
        assert (n >= 0), n
        input_bytes = self.reader.read(n)
        assert (len(input_bytes) == n), input_bytes
        return input_bytes

    def read_null(self):
        """
        null is written as zero bytes
        """
        return None

    def read_boolean(self):
        """
        a boolean is written as a single byte
        whose value is either 0 (false) or 1 (true).
        """
        return ord(self.read(1)) == 1

    def read_int(self):
        """
        int and long values are written using variable-length, zig-zag coding.
        """
        return self.read_long()

    def read_long(self):
        """
        int and long values are written using variable-length, zig-zag coding.
        """
        b = ord(self.read(1))
        n = b & 0x7F
        shift = 7
        # The high bit of each byte flags that another 7-bit group follows.
        while (b & 0x80) != 0:
            b = ord(self.read(1))
            n |= (b & 0x7F) << shift
            shift += 7
        # Undo the zig-zag mapping: the low bit carries the sign.
        datum = (n >> 1) ^ -(n & 1)
        return datum

    def read_float(self):
        """
        A float is written as 4 bytes.
        The float is converted into a 32-bit integer using a method equivalent to
        Java's floatToIntBits and then encoded in little-endian format.
        """
        return STRUCT_FLOAT.unpack(self.read(4))[0]

    def read_double(self):
        """
        A double is written as 8 bytes.
        The double is converted into a 64-bit integer using a method equivalent to
        Java's doubleToLongBits and then encoded in little-endian format.
        """
        return STRUCT_DOUBLE.unpack(self.read(8))[0]

    def read_bytes(self):
        """
        Bytes are encoded as a long followed by that many bytes of data.
        """
        nbytes = self.read_long()
        assert (nbytes >= 0), nbytes
        return self.read(nbytes)

    def read_utf8(self):
        """
        A string is encoded as a long followed by
        that many bytes of UTF-8 encoded character data.
        """
        input_bytes = self.read_bytes()
        try:
            return input_bytes.decode('utf-8')
        except UnicodeDecodeError:
            logger.error('Invalid UTF-8 input bytes: %r', input_bytes)
            raise

    def check_crc32(self, bytes):
        """Reads a 4-byte big-endian CRC32 and compares it with that of bytes.

        Raises:
          schema.AvroException: if the checksums differ.
        """
        checksum = STRUCT_CRC32.unpack(self.read(4))[0]
        if binascii.crc32(bytes) & 0xffffffff != checksum:
            raise schema.AvroException("Checksum failure")

    def skip_null(self):
        """Skips a null, which occupies zero bytes."""
        pass

    def skip_boolean(self):
        """Skips a boolean (a single byte)."""
        self.skip(1)

    def skip_int(self):
        """Skips a variable-length encoded int."""
        self.skip_long()

    def skip_long(self):
        """Skips a variable-length encoded long by consuming its bytes."""
        b = ord(self.read(1))
        while (b & 0x80) != 0:
            b = ord(self.read(1))

    def skip_float(self):
        """Skips a float (4 bytes)."""
        self.skip(4)

    def skip_double(self):
        """Skips a double (8 bytes)."""
        self.skip(8)

    def skip_bytes(self):
        """Skips a bytes value: a long length followed by that many bytes."""
        self.skip(self.read_long())

    def skip_utf8(self):
        """Skips a string, which is laid out like a bytes value."""
        self.skip_bytes()

    def skip(self, n):
        """Moves the reader n bytes forward from its current position."""
        self.reader.seek(self.reader.tell() + n)
# ------------------------------------------------------------------------------
class BinaryEncoder(object):
    """Write leaf values."""

    def __init__(self, writer):
        """
        writer is a Python object on which we can call write.
        """
        self._writer = writer

    # NOTE(review): exposed as a property (reconstructed; the decorator is not
    # visible in this file) -- confirm that callers use `encoder.writer`.
    @property
    def writer(self):
        """Reports the writer used by this encoder."""
        return self._writer

    def write(self, datum):
        """Write a sequence of bytes.

        Args:
          datum: Byte array, as a Python bytes.
        Raises:
          AssertionError: if datum is not a bytes object.
        """
        # Wrap datum in a 1-tuple so that a tuple argument is reported rather
        # than being unpacked as several %-format arguments.
        assert isinstance(datum, bytes), ('Expecting bytes, got %r' % (datum,))
        self.writer.write(datum)

    def WriteByte(self, byte):
        """Writes a single byte, given as an integer in range(256)."""
        self.writer.write(bytes((byte,)))

    def write_null(self, datum):
        """
        null is written as zero bytes
        """
        pass

    def write_boolean(self, datum):
        """
        a boolean is written as a single byte
        whose value is either 0 (false) or 1 (true).
        """
        # Python maps True to 1 and False to 0.
        self.WriteByte(int(bool(datum)))

    def write_int(self, datum):
        """
        int and long values are written using variable-length, zig-zag coding.
        """
        self.write_long(datum)

    def write_long(self, datum):
        """
        int and long values are written using variable-length, zig-zag coding.
        """
        # Zig-zag: fold the sign into the low bit so small magnitudes stay short.
        datum = (datum << 1) ^ (datum >> 63)
        # Emit 7 bits at a time, flagging every byte but the last with 0x80.
        while (datum & ~0x7F) != 0:
            self.WriteByte((datum & 0x7f) | 0x80)
            datum >>= 7
        self.WriteByte(datum)

    def write_float(self, datum):
        """
        A float is written as 4 bytes.
        The float is converted into a 32-bit integer using a method equivalent to
        Java's floatToIntBits and then encoded in little-endian format.
        """
        self.write(STRUCT_FLOAT.pack(datum))

    def write_double(self, datum):
        """
        A double is written as 8 bytes.
        The double is converted into a 64-bit integer using a method equivalent to
        Java's doubleToLongBits and then encoded in little-endian format.
        """
        self.write(STRUCT_DOUBLE.pack(datum))

    def write_bytes(self, datum):
        """
        Bytes are encoded as a long followed by that many bytes of data.
        """
        self.write_long(len(datum))
        self.write(datum)

    def write_utf8(self, datum):
        """
        A string is encoded as a long followed by
        that many bytes of UTF-8 encoded character data.
        """
        datum = datum.encode("utf-8")
        self.write_bytes(datum)

    def write_crc32(self, bytes):
        """
        A 4-byte, big-endian CRC32 checksum
        """
        self.write(STRUCT_CRC32.pack(binascii.crc32(bytes) & 0xffffffff))
# ------------------------------------------------------------------------------
# DatumReader/Writer
class DatumReader(object):
    """Deserialize Avro-encoded data into a Python data structure."""

    # Decoder method used to read / skip each leaf (primitive) writer type.
    _LEAF_READERS = {
        'null': 'read_null',
        'boolean': 'read_boolean',
        'string': 'read_utf8',
        'int': 'read_int',
        'long': 'read_long',
        'float': 'read_float',
        'double': 'read_double',
        'bytes': 'read_bytes',
    }
    _LEAF_SKIPPERS = {
        'null': 'skip_null',
        'boolean': 'skip_boolean',
        'string': 'skip_utf8',
        'int': 'skip_int',
        'long': 'skip_long',
        'float': 'skip_float',
        'double': 'skip_double',
        'bytes': 'skip_bytes',
    }

    @staticmethod
    def check_props(schema_one, schema_two, prop_list):
        """Reports whether two schemas agree on every attribute in prop_list."""
        for prop in prop_list:
            if getattr(schema_one, prop) != getattr(schema_two, prop):
                return False
        return True

    @staticmethod
    def match_schemas(writer_schema, reader_schema):
        """Reports whether data written with one schema can be read with another.

        Unions always match here; their branches are resolved while reading.
        Identical primitive types match, complex types match when their
        identifying properties agree, and the numeric promotions
        int -> long/float/double, long -> float/double and float -> double
        are accepted.

        NOTE(review): the property lists for 'record', 'error' and 'enum' were
        reconstructed as ['fullname'] by analogy with the 'fixed' branch --
        confirm against upstream.
        """
        w_type = writer_schema.type
        r_type = reader_schema.type
        if 'union' in [w_type, r_type] or 'error_union' in [w_type, r_type]:
            return True
        elif (w_type in schema.PRIMITIVE_TYPES
              and r_type in schema.PRIMITIVE_TYPES
              and w_type == r_type):
            return True
        elif (w_type == r_type == 'record' and
              DatumReader.check_props(writer_schema, reader_schema,
                                      ['fullname'])):
            return True
        elif (w_type == r_type == 'error' and
              DatumReader.check_props(writer_schema, reader_schema,
                                      ['fullname'])):
            return True
        elif (w_type == r_type == 'request'):
            return True
        elif (w_type == r_type == 'fixed' and
              DatumReader.check_props(writer_schema, reader_schema,
                                      ['fullname', 'size'])):
            return True
        elif (w_type == r_type == 'enum' and
              DatumReader.check_props(writer_schema, reader_schema,
                                      ['fullname'])):
            return True
        elif (w_type == r_type == 'map' and
              DatumReader.check_props(writer_schema.values,
                                      reader_schema.values, ['type'])):
            return True
        elif (w_type == r_type == 'array' and
              DatumReader.check_props(writer_schema.items,
                                      reader_schema.items, ['type'])):
            return True

        # Handle schema promotion
        if w_type == 'int' and r_type in ['long', 'float', 'double']:
            return True
        elif w_type == 'long' and r_type in ['float', 'double']:
            return True
        elif w_type == 'float' and r_type == 'double':
            return True
        return False

    def __init__(self, writer_schema=None, reader_schema=None):
        """
        As defined in the Avro specification, we call the schema encoded
        in the data the "writer's schema", and the schema expected by the
        reader the "reader's schema".
        """
        self._writer_schema = writer_schema
        self._reader_schema = reader_schema

    # read/write properties
    def set_writer_schema(self, writer_schema):
        self._writer_schema = writer_schema
    writer_schema = property(lambda self: self._writer_schema,
                             set_writer_schema)

    def set_reader_schema(self, reader_schema):
        self._reader_schema = reader_schema
    reader_schema = property(lambda self: self._reader_schema,
                             set_reader_schema)

    def read(self, decoder):
        """Reads one datum from decoder.

        When no reader's schema was set, the writer's schema is stored as the
        reader's schema before reading.
        """
        if self.reader_schema is None:
            self.reader_schema = self.writer_schema
        return self.read_data(self.writer_schema, self.reader_schema, decoder)

    def read_data(self, writer_schema, reader_schema, decoder):
        """Reads one datum written with writer_schema, resolved to reader_schema.

        Raises:
          SchemaResolutionException: if the two schemas do not match.
          schema.AvroException: if the writer's schema type is unknown.
        """
        # schema matching
        if not DatumReader.match_schemas(writer_schema, reader_schema):
            fail_msg = 'Schemas do not match.'
            raise SchemaResolutionException(fail_msg, writer_schema,
                                            reader_schema)

        # schema resolution: reader's schema is a union, writer's schema is not
        if (writer_schema.type not in ['union', 'error_union']
                and reader_schema.type in ['union', 'error_union']):
            for s in reader_schema.schemas:
                if DatumReader.match_schemas(writer_schema, s):
                    return self.read_data(writer_schema, s, decoder)
            fail_msg = 'Schemas do not match.'
            raise SchemaResolutionException(fail_msg, writer_schema,
                                            reader_schema)

        # function dispatch for reading data based on type of writer's schema
        w_type = writer_schema.type
        leaf_reader = self._LEAF_READERS.get(w_type)
        if leaf_reader is not None:
            return getattr(decoder, leaf_reader)()
        if w_type == 'fixed':
            return self.read_fixed(writer_schema, reader_schema, decoder)
        if w_type == 'enum':
            return self.read_enum(writer_schema, reader_schema, decoder)
        if w_type == 'array':
            return self.read_array(writer_schema, reader_schema, decoder)
        if w_type == 'map':
            return self.read_map(writer_schema, reader_schema, decoder)
        if w_type in ['union', 'error_union']:
            return self.read_union(writer_schema, reader_schema, decoder)
        if w_type in ['record', 'error', 'request']:
            return self.read_record(writer_schema, reader_schema, decoder)
        fail_msg = "Cannot read unknown schema type: %s" % writer_schema.type
        raise schema.AvroException(fail_msg)

    def skip_data(self, writer_schema, decoder):
        """Skips over one datum written with writer_schema.

        Raises:
          schema.AvroException: if the writer's schema type is unknown.
        """
        w_type = writer_schema.type
        leaf_skipper = self._LEAF_SKIPPERS.get(w_type)
        if leaf_skipper is not None:
            return getattr(decoder, leaf_skipper)()
        if w_type == 'fixed':
            return self.skip_fixed(writer_schema, decoder)
        if w_type == 'enum':
            return self.skip_enum(writer_schema, decoder)
        if w_type == 'array':
            return self.skip_array(writer_schema, decoder)
        if w_type == 'map':
            return self.skip_map(writer_schema, decoder)
        if w_type in ['union', 'error_union']:
            return self.skip_union(writer_schema, decoder)
        if w_type in ['record', 'error', 'request']:
            return self.skip_record(writer_schema, decoder)
        fail_msg = "Unknown schema type: %s" % writer_schema.type
        raise schema.AvroException(fail_msg)

    def read_fixed(self, writer_schema, reader_schema, decoder):
        """
        Fixed instances are encoded using the number of bytes declared
        in the schema.
        """
        return decoder.read(writer_schema.size)

    def skip_fixed(self, writer_schema, decoder):
        return decoder.skip(writer_schema.size)

    def read_enum(self, writer_schema, reader_schema, decoder):
        """
        An enum is encoded by a int, representing the zero-based position
        of the symbol in the schema.
        """
        # read data
        index_of_symbol = decoder.read_int()
        # A negative index would otherwise silently wrap around the list.
        if not 0 <= index_of_symbol < len(writer_schema.symbols):
            fail_msg = "Can't access enum index %d for enum with %d symbols"\
                       % (index_of_symbol, len(writer_schema.symbols))
            raise SchemaResolutionException(fail_msg, writer_schema,
                                            reader_schema)
        read_symbol = writer_schema.symbols[index_of_symbol]

        # schema resolution
        if read_symbol not in reader_schema.symbols:
            fail_msg = "Symbol %s not present in Reader's Schema" % read_symbol
            raise SchemaResolutionException(fail_msg, writer_schema,
                                            reader_schema)

        return read_symbol

    def skip_enum(self, writer_schema, decoder):
        return decoder.skip_int()

    def read_array(self, writer_schema, reader_schema, decoder):
        """
        Arrays are encoded as a series of blocks.

        Each block consists of a long count value,
        followed by that many array items.
        A block with count zero indicates the end of the array.
        Each item is encoded per the array's item schema.

        If a block's count is negative,
        then the count is followed immediately by a long block size,
        indicating the number of bytes in the block.
        The actual count in this case
        is the absolute value of the count written.
        """
        read_items = []
        block_count = decoder.read_long()
        while block_count != 0:
            if block_count < 0:
                block_count = -block_count
                # Consume the block's byte size; it is not needed because
                # every item is decoded.
                decoder.read_long()
            for _ in range(block_count):
                read_items.append(self.read_data(writer_schema.items,
                                                 reader_schema.items, decoder))
            block_count = decoder.read_long()
        return read_items

    def skip_array(self, writer_schema, decoder):
        block_count = decoder.read_long()
        while block_count != 0:
            if block_count < 0:
                # A negative count is followed by the block's byte size, so
                # the whole block can be skipped in one step.
                block_size = decoder.read_long()
                decoder.skip(block_size)
            else:
                for _ in range(block_count):
                    self.skip_data(writer_schema.items, decoder)
            block_count = decoder.read_long()

    def read_map(self, writer_schema, reader_schema, decoder):
        """
        Maps are encoded as a series of blocks.

        Each block consists of a long count value,
        followed by that many key/value pairs.
        A block with count zero indicates the end of the map.
        Each item is encoded per the map's value schema.

        If a block's count is negative,
        then the count is followed immediately by a long block size,
        indicating the number of bytes in the block.
        The actual count in this case
        is the absolute value of the count written.
        """
        read_items = {}
        block_count = decoder.read_long()
        while block_count != 0:
            if block_count < 0:
                block_count = -block_count
                # Consume the block's byte size; it is not needed because
                # every pair is decoded.
                decoder.read_long()
            for _ in range(block_count):
                key = decoder.read_utf8()
                read_items[key] = self.read_data(writer_schema.values,
                                                 reader_schema.values, decoder)
            block_count = decoder.read_long()
        return read_items

    def skip_map(self, writer_schema, decoder):
        block_count = decoder.read_long()
        while block_count != 0:
            if block_count < 0:
                block_size = decoder.read_long()
                decoder.skip(block_size)
            else:
                for _ in range(block_count):
                    decoder.skip_utf8()  # the key
                    self.skip_data(writer_schema.values, decoder)
            block_count = decoder.read_long()

    def read_union(self, writer_schema, reader_schema, decoder):
        """
        A union is encoded by first writing an int value indicating
        the zero-based position within the union of the schema of its value.
        The value is then encoded per the indicated schema within the union.
        """
        # schema resolution
        index_of_schema = int(decoder.read_long())
        # A negative index would otherwise silently wrap around the list.
        if not 0 <= index_of_schema < len(writer_schema.schemas):
            fail_msg = "Can't access branch index %d for union with %d branches"\
                       % (index_of_schema, len(writer_schema.schemas))
            raise SchemaResolutionException(fail_msg, writer_schema,
                                            reader_schema)
        selected_writer_schema = writer_schema.schemas[index_of_schema]

        # read data
        return self.read_data(selected_writer_schema, reader_schema, decoder)

    def skip_union(self, writer_schema, decoder):
        index_of_schema = int(decoder.read_long())
        if not 0 <= index_of_schema < len(writer_schema.schemas):
            fail_msg = "Can't access branch index %d for union with %d branches"\
                       % (index_of_schema, len(writer_schema.schemas))
            raise SchemaResolutionException(fail_msg, writer_schema)
        return self.skip_data(writer_schema.schemas[index_of_schema], decoder)

    def read_record(self, writer_schema, reader_schema, decoder):
        """
        A record is encoded by encoding the values of its fields
        in the order that they are declared. In other words, a record
        is encoded as just the concatenation of the encodings of its fields.
        Field values are encoded per their schema.

        Schema Resolution:
         * the ordering of fields may be different: fields are matched by name.
         * schemas for fields with the same name in both records are resolved
         * if the writer's record contains a field with a name not present in
           the reader's record, the writer's value for that field is ignored.
         * if the reader's record schema has a field that contains a default
           value, and writer's schema does not have a field with the same
           name, then the reader should use the default value from its field.
         * if the reader's record schema has a field with no default value,
           and writer's schema does not have a field with the same name, a
           SchemaResolutionException is raised.

        NOTE(review): fields are keyed by `field.name`; the attribute name was
        reconstructed (it is not visible in this file) -- confirm against the
        schema module.
        """
        # schema resolution
        readers_fields_dict = reader_schema.field_map
        read_record = {}
        for field in writer_schema.fields:
            readers_field = readers_fields_dict.get(field.name)
            if readers_field is not None:
                field_val = self.read_data(field.type, readers_field.type,
                                           decoder)
                read_record[field.name] = field_val
            else:
                self.skip_data(field.type, decoder)

        # fill in default values
        if len(readers_fields_dict) > len(read_record):
            writers_fields_dict = writer_schema.field_map
            for field_name, field in readers_fields_dict.items():
                if field_name not in writers_fields_dict:
                    if field.has_default:
                        field_val = self._read_default_value(field.type,
                                                             field.default)
                        read_record[field.name] = field_val
                    else:
                        fail_msg = 'No default value for field %s' % field_name
                        raise SchemaResolutionException(fail_msg, writer_schema,
                                                        reader_schema)
        return read_record

    def skip_record(self, writer_schema, decoder):
        for field in writer_schema.fields:
            self.skip_data(field.type, decoder)

    def _read_default_value(self, field_schema, default_value):
        """
        Basically a JSON Decoder?

        Converts a JSON-decoded default value into the Python representation
        used for field_schema, recursing into arrays, maps and records. For a
        union, the first branch is used.

        Raises:
          schema.AvroException: if the schema type is unknown.
        """
        if field_schema.type == 'null':
            return None
        elif field_schema.type == 'boolean':
            return bool(default_value)
        elif field_schema.type == 'int':
            return int(default_value)
        elif field_schema.type == 'long':
            return int(default_value)
        elif field_schema.type in ['float', 'double']:
            return float(default_value)
        elif field_schema.type in ['enum', 'fixed', 'string', 'bytes']:
            return default_value
        elif field_schema.type == 'array':
            read_array = []
            for json_val in default_value:
                item_val = self._read_default_value(field_schema.items,
                                                    json_val)
                read_array.append(item_val)
            return read_array
        elif field_schema.type == 'map':
            read_map = {}
            for key, json_val in default_value.items():
                map_val = self._read_default_value(field_schema.values,
                                                   json_val)
                read_map[key] = map_val
            return read_map
        elif field_schema.type in ['union', 'error_union']:
            return self._read_default_value(field_schema.schemas[0],
                                            default_value)
        elif field_schema.type == 'record':
            read_record = {}
            for field in field_schema.fields:
                json_val = default_value.get(field.name)
                # Fall back to the field's own default when the record
                # default does not provide a value for it.
                if json_val is None:
                    json_val = field.default
                field_val = self._read_default_value(field.type, json_val)
                read_record[field.name] = field_val
            return read_record
        else:
            fail_msg = 'Unknown type: %s' % field_schema.type
            raise schema.AvroException(fail_msg)
# ------------------------------------------------------------------------------
class DatumWriter(object):
    """DatumWriter for generic python objects."""

    # Encoder method used to write each leaf (primitive) schema type.
    _LEAF_WRITERS = {
        'null': 'write_null',
        'boolean': 'write_boolean',
        'string': 'write_utf8',
        'int': 'write_int',
        'long': 'write_long',
        'float': 'write_float',
        'double': 'write_double',
        'bytes': 'write_bytes',
    }

    def __init__(self, writer_schema=None):
        self._writer_schema = writer_schema

    # read/write properties
    def set_writer_schema(self, writer_schema):
        self._writer_schema = writer_schema
    writer_schema = property(lambda self: self._writer_schema,
                             set_writer_schema)

    def write(self, datum, encoder):
        """Validates datum against the writer's schema, then encodes it.

        Raises:
          AvroTypeException: if datum is not an instance of the schema.
        """
        # validate datum
        if not Validate(self.writer_schema, datum):
            raise AvroTypeException(self.writer_schema, datum)

        self.write_data(self.writer_schema, datum, encoder)

    def write_data(self, writer_schema, datum, encoder):
        """Encodes datum per writer_schema, without validating it first.

        Raises:
          schema.AvroException: if the schema type is unknown.
        """
        # function dispatch to write datum
        w_type = writer_schema.type
        leaf_writer = self._LEAF_WRITERS.get(w_type)
        if leaf_writer is not None:
            getattr(encoder, leaf_writer)(datum)
        elif w_type == 'fixed':
            self.write_fixed(writer_schema, datum, encoder)
        elif w_type == 'enum':
            self.write_enum(writer_schema, datum, encoder)
        elif w_type == 'array':
            self.write_array(writer_schema, datum, encoder)
        elif w_type == 'map':
            self.write_map(writer_schema, datum, encoder)
        elif w_type in ['union', 'error_union']:
            self.write_union(writer_schema, datum, encoder)
        elif w_type in ['record', 'error', 'request']:
            self.write_record(writer_schema, datum, encoder)
        else:
            fail_msg = 'Unknown type: %s' % writer_schema.type
            raise schema.AvroException(fail_msg)

    def write_fixed(self, writer_schema, datum, encoder):
        """
        Fixed instances are encoded using the number of bytes declared
        in the schema.
        """
        encoder.write(datum)

    def write_enum(self, writer_schema, datum, encoder):
        """
        An enum is encoded by a int, representing the zero-based position
        of the symbol in the schema.
        """
        index_of_datum = writer_schema.symbols.index(datum)
        encoder.write_int(index_of_datum)

    def write_array(self, writer_schema, datum, encoder):
        """
        Arrays are encoded as a series of blocks.

        Each block consists of a long count value,
        followed by that many array items.
        A block with count zero indicates the end of the array.
        Each item is encoded per the array's item schema.

        If a block's count is negative,
        then the count is followed immediately by a long block size,
        indicating the number of bytes in the block.
        The actual count in this case
        is the absolute value of the count written.
        """
        # All items go into a single block with a positive count.
        if len(datum) > 0:
            encoder.write_long(len(datum))
            for item in datum:
                self.write_data(writer_schema.items, item, encoder)
        # A zero count terminates the array.
        encoder.write_long(0)

    def write_map(self, writer_schema, datum, encoder):
        """
        Maps are encoded as a series of blocks.

        Each block consists of a long count value,
        followed by that many key/value pairs.
        A block with count zero indicates the end of the map.
        Each item is encoded per the map's value schema.

        If a block's count is negative,
        then the count is followed immediately by a long block size,
        indicating the number of bytes in the block.
        The actual count in this case
        is the absolute value of the count written.
        """
        # All pairs go into a single block with a positive count.
        if len(datum) > 0:
            encoder.write_long(len(datum))
            for key, val in datum.items():
                encoder.write_utf8(key)
                self.write_data(writer_schema.values, val, encoder)
        # A zero count terminates the map.
        encoder.write_long(0)

    def write_union(self, writer_schema, datum, encoder):
        """
        A union is encoded by first writing an int value indicating
        the zero-based position within the union of the schema of its value.
        The value is then encoded per the indicated schema within the union.

        Raises:
          AvroTypeException: if datum matches none of the union's branches.
        """
        # resolve union
        # NOTE(review): the loop has no early exit, so when several branches
        # validate, the last one is used -- confirm this is intended.
        index_of_schema = -1
        for i, candidate_schema in enumerate(writer_schema.schemas):
            if Validate(candidate_schema, datum):
                index_of_schema = i
        if index_of_schema < 0:
            raise AvroTypeException(writer_schema, datum)

        # write data
        encoder.write_long(index_of_schema)
        self.write_data(writer_schema.schemas[index_of_schema], datum, encoder)

    def write_record(self, writer_schema, datum, encoder):
        """
        A record is encoded by encoding the values of its fields
        in the order that they are declared. In other words, a record
        is encoded as just the concatenation of the encodings of its fields.
        Field values are encoded per their schema.

        NOTE(review): values are looked up by `field.name`; the attribute name
        was reconstructed (it is not visible in this file) -- confirm against
        the schema module.
        """
        for field in writer_schema.fields:
            self.write_data(field.type, datum.get(field.name), encoder)
# This module is a library: running it directly is an error.
if __name__ == '__main__':
    raise Exception('Not a standalone module')