blob: 29dd82130cea6e7e727ac48023a634258a413e02 [file] [log] [blame]
#!/usr/bin/env python3
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import datetime
import decimal
import io
import itertools
import json
import unittest
import warnings
from typing import BinaryIO, Collection, Dict, List, Optional, Tuple, Union, cast
import avro.io
import avro.schema
import avro.timezones
from avro.utils import TypedDict
class DefaultValueTestCaseType(TypedDict):
    """Typed shape of the one-field record compared in ``DefaultValueTestCase``."""

    # Value of the record's only field, "H"; its concrete type differs per test case.
    H: object
# (schema JSON string, datum) pairs. Each schema is written as a Python literal
# and serialized with json.dumps so the generated test cases can hand it to
# avro.schema.parse. load_tests feeds every pair to both IoValidateTestCase
# (the datum must validate against the schema) and RoundTripTestCase (the datum
# must survive a binary encode/decode cycle).
SCHEMAS_TO_VALIDATE = tuple(
    (json.dumps(schema), datum)
    for schema, datum in (
        # Primitive types.
        ("null", None),
        ("boolean", True),
        ("string", "adsfasdf09809dsf-=adsf"),
        ("bytes", b"12345abcd"),
        ("int", 1234),
        ("long", 1234),
        ("float", 1234.0),
        ("double", 1234.0),
        ({"type": "fixed", "name": "Test", "size": 1}, b"B"),
        # Decimal logical type over fixed: positive, negative, and no explicit scale.
        (
            {
                "type": "fixed",
                "logicalType": "decimal",
                "name": "Test",
                "size": 8,
                "precision": 5,
                "scale": 4,
            },
            decimal.Decimal("3.1415"),
        ),
        (
            {
                "type": "fixed",
                "logicalType": "decimal",
                "name": "Test",
                "size": 8,
                "precision": 5,
                "scale": 4,
            },
            decimal.Decimal("-3.1415"),
        ),
        (
            {
                "type": "fixed",
                "logicalType": "decimal",
                "name": "Test",
                "size": 8,
                "precision": 1,
            },
            decimal.Decimal("3"),
        ),
        # Decimal logical type over bytes: positive, negative, and no explicit scale.
        (
            {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 4},
            decimal.Decimal("3.1415"),
        ),
        (
            {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 4},
            decimal.Decimal("-3.1415"),
        ),
        (
            {"type": "bytes", "logicalType": "decimal", "precision": 1},
            decimal.Decimal("3"),
        ),
        # Enum, array, map and union.
        ({"type": "enum", "name": "Test", "symbols": ["A", "B"]}, "B"),
        ({"type": "array", "items": "long"}, [1, 3, 2]),
        ({"type": "map", "values": "long"}, {"a": 1, "b": 3, "c": 2}),
        (["string", "null", "long"], None),
        # Date and time logical types, including the extremes of each range.
        ({"type": "int", "logicalType": "date"}, datetime.date(2000, 1, 1)),
        (
            {"type": "int", "logicalType": "time-millis"},
            datetime.time(23, 59, 59, 999000),
        ),
        ({"type": "int", "logicalType": "time-millis"}, datetime.time(0, 0, 0, 000000)),
        (
            {"type": "long", "logicalType": "time-micros"},
            datetime.time(23, 59, 59, 999999),
        ),
        (
            {"type": "long", "logicalType": "time-micros"},
            datetime.time(0, 0, 0, 000000),
        ),
        (
            {"type": "long", "logicalType": "timestamp-millis"},
            datetime.datetime(1000, 1, 1, 0, 0, 0, 000000, tzinfo=avro.timezones.utc),
        ),
        (
            {"type": "long", "logicalType": "timestamp-millis"},
            datetime.datetime(9999, 12, 31, 23, 59, 59, 999000, tzinfo=avro.timezones.utc),
        ),
        (
            {"type": "long", "logicalType": "timestamp-millis"},
            datetime.datetime(2000, 1, 18, 2, 2, 1, 100000, tzinfo=avro.timezones.tst),
        ),
        (
            {"type": "long", "logicalType": "timestamp-micros"},
            datetime.datetime(1000, 1, 1, 0, 0, 0, 000000, tzinfo=avro.timezones.utc),
        ),
        (
            {"type": "long", "logicalType": "timestamp-micros"},
            datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=avro.timezones.utc),
        ),
        (
            {"type": "long", "logicalType": "timestamp-micros"},
            datetime.datetime(2000, 1, 18, 2, 2, 1, 123499, tzinfo=avro.timezones.tst),
        ),
        (
            {"type": "string", "logicalType": "uuid"},
            "a4818e1c-8e59-11eb-8dcd-0242ac130003",
        ),  # UUID1
        (
            {"type": "string", "logicalType": "uuid"},
            "570feebe-2bbc-4937-98df-285944e1dbbd",
        ),  # UUID4
        # Logical types that are unknown, or not applicable to the underlying
        # type, paired with a plain string datum.
        ({"type": "string", "logicalType": "unknown-logical-type"}, "12345abcd"),
        ({"type": "string", "logicalType": "timestamp-millis"}, "12345abcd"),
        # Records, including self-referencing ones.
        (
            {
                "type": "record",
                "name": "Test",
                "fields": [{"name": "f", "type": "long"}],
            },
            {"f": 5},
        ),
        (
            {
                "type": "record",
                "name": "Lisp",
                "fields": [
                    {
                        "name": "value",
                        "type": [
                            "null",
                            "string",
                            {
                                "type": "record",
                                "name": "Cons",
                                "fields": [
                                    {"name": "car", "type": "Lisp"},
                                    {"name": "cdr", "type": "Lisp"},
                                ],
                            },
                        ],
                    }
                ],
            },
            {"value": {"car": {"value": "head"}, "cdr": {"value": None}}},
        ),
        # Record names that collide with type keywords ("record", "long").
        (
            {"type": "record", "name": "record", "fields": [{"name": "value", "type": "int"}, {"name": "next", "type": ["null", "record"]}]},
            {"value": 0, "next": {"value": 1, "next": None}},
        ),
        (
            {"type": "record", "name": "ns.long", "fields": [{"name": "value", "type": "int"}, {"name": "next", "type": ["null", "ns.long"]}]},
            {"value": 0, "next": {"value": 1, "next": None}},
        ),
    )
)
# (integer value, expected encoding) pairs. The expected encoding is written in
# the format produced by avro_hexlify: two lowercase hex digits per byte,
# bytes separated by single spaces. load_tests runs every pair against both
# the "int" and the "long" schema.
BINARY_ENCODINGS = (
    (0, b"00"),
    (-1, b"01"),
    (1, b"02"),
    (-2, b"03"),
    (2, b"04"),
    (-64, b"7f"),
    (64, b"80 01"),
    (8192, b"80 80 01"),
    (-8193, b"81 80 01"),
)
# (field type, default value) pairs consumed by DefaultValueTestCase, which
# embeds both into a reader schema via json.dumps. Because the default travels
# through JSON, the "bytes" and "fixed" defaults are given as text strings
# rather than bytes objects.
DEFAULT_VALUE_EXAMPLES = (
    ("null", None),
    ("boolean", True),
    ("string", "foo"),
    ("bytes", "\xff\xff"),
    ("int", 5),
    ("long", 5),
    ("float", 1.1),
    ("double", 1.1),
    ({"type": "fixed", "name": "F", "size": 2}, "\xff\xff"),
    ({"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}, "FOO"),
    ({"type": "array", "items": "int"}, [1, 2, 3]),
    ({"type": "map", "values": "int"}, {"a": 1, "b": 2}),
    (["int", "null"], 5),
    (
        {"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]},
        {"A": 5},
    ),
)
# Writer schema with seven int fields (A..G), shared by the default-value,
# missing-default, projection and field-order tests in this module.
LONG_RECORD_SCHEMA = avro.schema.parse(
    json.dumps(
        {
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "A", "type": "int"},
                {"name": "B", "type": "int"},
                {"name": "C", "type": "int"},
                {"name": "D", "type": "int"},
                {"name": "E", "type": "int"},
                {"name": "F", "type": "int"},
                {"name": "G", "type": "int"},
            ],
        }
    )
)
# Datum with one value for each field of LONG_RECORD_SCHEMA.
LONG_RECORD_DATUM = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7}
def avro_hexlify(reader: BinaryIO) -> bytes:
    """Return the hex representation, as bytes, of one binary-encoded int or long.

    Bytes are consumed from ``reader`` one at a time until a byte whose high
    bit (0x80, the continuation flag) is clear has been read. Nothing beyond
    that final byte is consumed.

    Args:
        reader: Binary stream positioned at the first byte of the encoded value.

    Returns:
        The consumed bytes as two-digit lowercase hex pairs joined by single
        spaces, e.g. ``b"80 01"``.

    Raises:
        EOFError: If the stream ends before the last byte of the value.
    """
    hex_pairs = []
    while True:
        current_byte = reader.read(1)
        if not current_byte:
            # An exhausted stream yields b""; report the truncation explicitly
            # instead of letting ord(b"") fail with an opaque TypeError.
            raise EOFError("Stream ended before the end of a binary-encoded int or long.")
        hex_pairs.append(binascii.hexlify(current_byte))
        if not current_byte[0] & 0x80:
            # High bit clear: this was the final byte of the value.
            return b" ".join(hex_pairs)
def write_datum(
    datum: object, writers_schema: avro.schema.Schema
) -> Tuple[io.BytesIO, avro.io.BinaryEncoder, avro.io.DatumWriter]:
    """Binary-encode ``datum`` with ``writers_schema`` into a fresh in-memory buffer.

    Returns:
        A ``(buffer, encoder, datum_writer)`` triple: the buffer holding the
        encoded bytes, plus the encoder and datum writer that produced them so
        callers can append further values to the same buffer.
    """
    buffer = io.BytesIO()
    binary_encoder = avro.io.BinaryEncoder(buffer)
    schema_writer = avro.io.DatumWriter(writers_schema)
    schema_writer.write(datum, binary_encoder)
    return buffer, binary_encoder, schema_writer
def read_datum(
    buffer: io.BytesIO,
    writers_schema: avro.schema.Schema,
    readers_schema: Optional[avro.schema.Schema] = None,
) -> object:
    """Decode a single datum from the bytes held in ``buffer``.

    Args:
        buffer: Buffer whose complete contents hold the encoded datum.
        writers_schema: Schema the datum was written with.
        readers_schema: Optional schema to read with; passed through to
            ``avro.io.DatumReader`` unchanged.

    Returns:
        Whatever ``DatumReader.read`` produces for the encoded bytes.
    """
    # Decode from a copy of the contents so the position of ``buffer`` itself is not moved.
    decoder = avro.io.BinaryDecoder(io.BytesIO(buffer.getvalue()))
    return avro.io.DatumReader(writers_schema, readers_schema).read(decoder)
class IoValidateTestCase(unittest.TestCase):
    """Generated test case: one datum must validate against one schema."""

    def __init__(self, test_schema: str, test_datum: object) -> None:
        """Bind the case to a single (schema JSON, datum) pair.

        The standard unittest.TestCase constructor signature is deliberately
        not followed, because many cases are generated from this one class.
        That is safe only while the autoloader skips the class, which it does
        as long as no method name starts with `test_`.
        """
        super().__init__("io_valid")
        self.test_schema = avro.schema.parse(test_schema)
        self.test_datum = test_datum
        # Never hide repeated warnings when running this test case.
        warnings.simplefilter("always")

    def io_valid(self) -> None:
        """Assert that the stored datum is valid under the stored schema."""
        with warnings.catch_warnings(record=True):
            is_valid = avro.io.validate(self.test_schema, self.test_datum)
            failure_message = f"{self.test_datum} did not validate in the schema {self.test_schema}"
            self.assertTrue(is_valid, failure_message)
class RoundTripTestCase(unittest.TestCase):
    """Generated test case: a datum must be unchanged by an encode/decode cycle."""

    def __init__(self, test_schema: str, test_datum: object) -> None:
        """Bind the case to a single (schema JSON, datum) pair.

        The standard unittest.TestCase constructor signature is deliberately
        not followed, because many cases are generated from this one class.
        That is safe only while the autoloader skips the class, which it does
        as long as no method name starts with `test_`.
        """
        super().__init__("io_round_trip")
        self.test_schema = avro.schema.parse(test_schema)
        self.test_datum = test_datum
        # Never hide repeated warnings when running this test case.
        warnings.simplefilter("always")

    def io_round_trip(self) -> None:
        """Encode the datum, decode it again, and compare with the original.

        How the comparison is made depends on the type of the decoded value:
        decimals are compared through their string forms, datetimes against
        the original converted to UTC, and everything else directly.
        """
        with warnings.catch_warnings(record=True):
            buffer, _, _ = write_datum(self.test_datum, self.test_schema)
            decoded = read_datum(buffer, self.test_schema)

        if isinstance(decoded, decimal.Decimal):
            self.assertEqual(
                str(self.test_datum),
                decoded.to_eng_string(),
                "Decimal datum changed value after encode and decode",
            )
            return

        if isinstance(decoded, datetime.datetime):
            expected_utc = cast(datetime.datetime, self.test_datum).astimezone(tz=avro.timezones.utc)
            self.assertEqual(
                expected_utc,
                decoded,
                "DateTime datum changed value after encode and decode",
            )
            return

        self.assertEqual(
            self.test_datum,
            decoded,
            "Datum changed value after encode and decode",
        )
class BinaryEncodingTestCase(unittest.TestCase):
    """Generated test case for the binary encoding of a single int or long.

    Depending on ``skip``, the case either compares the encoded bytes with an
    expected hex string or checks that the encoded value can be skipped over.
    """

    def __init__(self, skip: bool, test_type: str, test_datum: object, test_hex: bytes) -> None:
        """Bind the case to one value, its schema type and its expected hex.

        The standard unittest.TestCase constructor signature is deliberately
        not followed, because many cases are generated from this one class.
        That is safe only while the autoloader skips the class, which it does
        as long as no method name starts with `test_`.
        """
        method_name = f"check_{'skip' if skip else 'binary'}_encoding"
        super().__init__(method_name)
        self.writers_schema = avro.schema.parse(f'"{test_type}"')
        self.test_datum = test_datum
        self.test_hex = test_hex
        # Never hide repeated warnings when running this test case.
        warnings.simplefilter("always")

    def check_binary_encoding(self) -> None:
        """Assert that the datum encodes to exactly the expected hex bytes."""
        with warnings.catch_warnings(record=True):
            buffer, _, _ = write_datum(self.test_datum, self.writers_schema)
            buffer.seek(0)
            actual_hex = avro_hexlify(buffer)
        self.assertEqual(
            self.test_hex,
            actual_hex,
            "Binary encoding did not match expected hex representation.",
        )

    def check_skip_encoding(self) -> None:
        """Assert that skipping the encoded datum lands on the value written after it."""
        sentinel = 6253
        with warnings.catch_warnings(record=True):
            # Write the value to be skipped, followed by a known sentinel.
            buffer, encoder, datum_writer = write_datum(self.test_datum, self.writers_schema)
            datum_writer.write(sentinel, encoder)

            # Skip over the first value.
            decoder = avro.io.BinaryDecoder(io.BytesIO(buffer.getvalue()))
            decoder.skip_long()

            # Whatever is read next should be the sentinel.
            value_after_skip = avro.io.DatumReader(self.writers_schema).read(decoder)
        self.assertEqual(
            value_after_skip,
            sentinel,
            "Unexpected value after skipping a binary encoded value.",
        )
class SchemaPromotionTestCase(unittest.TestCase):
    """Generated test case: a value written with one numeric type reads back under another."""

    def __init__(self, write_type: str, read_type: str) -> None:
        """Bind the case to one (writer type, reader type) pair.

        The standard unittest.TestCase constructor signature is deliberately
        not followed, because many cases are generated from this one class.
        That is safe only while the autoloader skips the class, which it does
        as long as no method name starts with `test_`.
        """
        super().__init__("check_schema_promotion")
        self.writers_schema = avro.schema.parse(f'"{write_type}"')
        self.readers_schema = avro.schema.parse(f'"{read_type}"')
        # Never hide repeated warnings when running this test case.
        warnings.simplefilter("always")

    def check_schema_promotion(self) -> None:
        """Write a fixed value with the writer schema and read it with the reader schema."""
        # note that checking writers_schema.type in read_data
        # allows us to handle promotion correctly
        written_value = 219
        with warnings.catch_warnings(record=True):
            buffer, _, _ = write_datum(written_value, self.writers_schema)
            promoted_value = read_datum(buffer, self.writers_schema, self.readers_schema)
        self.assertEqual(
            promoted_value,
            written_value,
            f"Datum changed between schema that were supposed to promote: writer: {self.writers_schema} reader: {self.readers_schema}.",
        )
class DefaultValueTestCase(unittest.TestCase):
    """Generated test case: a reader field absent from the written data takes its default."""

    def __init__(self, field_type: Collection[str], default: Union[Dict[str, int], List[int], None, float, str]) -> None:
        """Bind the case to one (field type, default value) pair.

        The standard unittest.TestCase constructor signature is deliberately
        not followed, because many cases are generated from this one class.
        That is safe only while the autoloader skips the class, which it does
        as long as no method name starts with `test_`.
        """
        super().__init__("check_default_value")
        self.field_type = field_type
        self.default = default
        # Never hide repeated warnings when running this test case.
        warnings.simplefilter("always")

    def check_default_value(self) -> None:
        """Read LONG_RECORD_DATUM through a reader schema whose only field is "H".

        The reader schema declares "H" with the stored type and default, and
        the decoded record is expected to equal ``{"H": default}``. A bytes
        value decoded for "H" is converted to text before the comparison.
        """
        reader_field = {
            "name": "H",
            "type": self.field_type,
            "default": self.default,
        }
        reader_record = {
            "type": "record",
            "name": "Test",
            "fields": [reader_field],
        }
        expected = cast(DefaultValueTestCaseType, {"H": self.default})
        actual: DefaultValueTestCaseType
        with warnings.catch_warnings(record=True):
            readers_schema = avro.schema.parse(json.dumps(reader_record))
            buffer, _, _ = write_datum(LONG_RECORD_DATUM, LONG_RECORD_SCHEMA)
            decoded = cast(DefaultValueTestCaseType, read_datum(buffer, LONG_RECORD_SCHEMA, readers_schema))
            if isinstance(decoded["H"], bytes):
                actual = {"H": cast(bytes, decoded["H"]).decode()}
            else:
                actual = decoded
        self.assertEqual(expected, actual)
class TestIncompatibleSchemaReading(unittest.TestCase):
    """Reading data with a schema that does not match the one it was written with."""

    def test_deserialization_fails(self) -> None:
        """Records whose "name" was written as an int must not decode as a string.

        Two records are written with a schema declaring "name" as int (one
        positive value, one negative) and then read with a schema declaring
        "name" as string; both reads must raise InvalidAvroBinaryEncoding.
        """

        def user_schema(record_name: str, name_type: str) -> avro.schema.Schema:
            """Parse a three-field user record whose "name" field has the given type."""
            return avro.schema.parse(
                json.dumps(
                    {
                        "namespace": "example.avro",
                        "type": "record",
                        "name": record_name,
                        "fields": [
                            {"name": "name", "type": name_type},
                            {"name": "age", "type": "int"},
                            {"name": "location", "type": "string"},
                        ],
                    }
                )
            )

        reader_schema = user_schema("User", "string")
        writer_schema = user_schema("IncompatibleUser", "int")
        datum_writer = avro.io.DatumWriter(writer_schema)

        for incompatible_name in (100, -10):
            incompatible_record = {"name": incompatible_name, "age": 21, "location": "Woodford"}
            with io.BytesIO() as writer_bio:
                datum_writer.write(incompatible_record, avro.io.BinaryEncoder(writer_bio))
                enc_bytes = writer_bio.getvalue()

            datum_reader = avro.io.DatumReader(reader_schema)
            with io.BytesIO(enc_bytes) as reader_bio:
                decoder = avro.io.BinaryDecoder(reader_bio)
                with self.assertRaises(avro.errors.InvalidAvroBinaryEncoding):
                    datum_reader.read(decoder)
class TestMisc(unittest.TestCase):
    """Hand-written tests for scale checks, schema resolution and type errors."""

    def test_decimal_bytes_small_scale(self) -> None:
        """Avro should raise an AvroOutOfScaleException when attempting to write a decimal with a larger exponent than the schema's scale."""
        datum = decimal.Decimal("3.1415")
        _, _, exp = datum.as_tuple()
        # One digit fewer than the datum needs, so the datum cannot fit the schema's scale.
        scale = -1 * exp - 1
        schema = avro.schema.parse(
            json.dumps(
                {
                    "type": "bytes",
                    "logicalType": "decimal",
                    "precision": 5,
                    "scale": scale,
                }
            )
        )
        self.assertRaises(avro.errors.AvroOutOfScaleException, write_datum, datum, schema)

    def test_decimal_fixed_small_scale(self) -> None:
        """Avro should raise an AvroOutOfScaleException when attempting to write a decimal with a larger exponent than the schema's scale."""
        datum = decimal.Decimal("3.1415")
        _, _, exp = datum.as_tuple()
        # One digit fewer than the datum needs, so the datum cannot fit the schema's scale.
        scale = -1 * exp - 1
        schema = avro.schema.parse(
            json.dumps(
                {
                    "type": "fixed",
                    "logicalType": "decimal",
                    "name": "Test",
                    "size": 8,
                    "precision": 5,
                    "scale": scale,
                }
            )
        )
        self.assertRaises(avro.errors.AvroOutOfScaleException, write_datum, datum, schema)

    def test_unknown_symbol(self) -> None:
        """Reading an enum symbol that the reader's schema does not list must fail resolution."""
        datum_to_write = "FOO"
        writers_schema = avro.schema.parse(json.dumps({"type": "enum", "name": "Test", "symbols": ["FOO", "BAR"]}))
        readers_schema = avro.schema.parse(json.dumps({"type": "enum", "name": "Test", "symbols": ["BAR", "BAZ"]}))
        writer, _, _ = write_datum(datum_to_write, writers_schema)
        reader = io.BytesIO(writer.getvalue())
        decoder = avro.io.BinaryDecoder(reader)
        datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
        self.assertRaises(avro.errors.SchemaResolutionException, datum_reader.read, decoder)

    def test_no_default_value(self) -> None:
        """A reader field missing from the written record and lacking a default must fail resolution."""
        writers_schema = LONG_RECORD_SCHEMA
        datum_to_write = LONG_RECORD_DATUM
        readers_schema = avro.schema.parse(
            json.dumps(
                {
                    "type": "record",
                    "name": "Test",
                    "fields": [{"name": "H", "type": "int"}],
                }
            )
        )
        writer, _, _ = write_datum(datum_to_write, writers_schema)
        reader = io.BytesIO(writer.getvalue())
        decoder = avro.io.BinaryDecoder(reader)
        datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
        self.assertRaises(avro.errors.SchemaResolutionException, datum_reader.read, decoder)

    def test_projection(self) -> None:
        """A reader schema listing a subset of the written fields yields only those fields."""
        writers_schema = LONG_RECORD_SCHEMA
        datum_to_write = LONG_RECORD_DATUM
        readers_schema = avro.schema.parse(
            json.dumps(
                {
                    "type": "record",
                    "name": "Test",
                    "fields": [
                        {"name": "E", "type": "int"},
                        {"name": "F", "type": "int"},
                    ],
                }
            )
        )
        datum_to_read = {"E": 5, "F": 6}
        writer, _, _ = write_datum(datum_to_write, writers_schema)
        datum_read = read_datum(writer, writers_schema, readers_schema)
        self.assertEqual(datum_to_read, datum_read)

    def test_field_order(self) -> None:
        """Fields are matched by name, so a reader listing them in another order reads the same values."""
        writers_schema = LONG_RECORD_SCHEMA
        datum_to_write = LONG_RECORD_DATUM
        readers_schema = avro.schema.parse(
            json.dumps(
                {
                    "type": "record",
                    "name": "Test",
                    "fields": [
                        {"name": "F", "type": "int"},
                        {"name": "E", "type": "int"},
                    ],
                }
            )
        )
        datum_to_read = {"E": 5, "F": 6}
        writer, _, _ = write_datum(datum_to_write, writers_schema)
        datum_read = read_datum(writer, writers_schema, readers_schema)
        self.assertEqual(datum_to_read, datum_read)

    def test_type_exception_int(self) -> None:
        """Writing a string into an int field raises AvroTypeException naming the field and schema."""
        writers_schema = avro.schema.parse(
            json.dumps(
                {
                    "type": "record",
                    "name": "Test",
                    "fields": [
                        {"name": "F", "type": "int"},
                        {"name": "E", "type": "int"},
                    ],
                }
            )
        )
        datum_to_write = {"E": 5, "F": "Bad"}
        with self.assertRaises(avro.errors.AvroTypeException) as exc:
            write_datum(datum_to_write, writers_schema)
        # assertEqual (unlike a bare assert) still runs under ``python -O`` and reports a diff on failure.
        self.assertEqual(str(exc.exception), 'The datum "Bad" provided for "F" is not an example of the schema "int"')

    def test_type_exception_long(self) -> None:
        """Writing a float into a long field raises AvroTypeException naming the field and schema."""
        writers_schema = avro.schema.parse(json.dumps({"type": "record", "name": "Test", "fields": [{"name": "foo", "type": "long"}]}))
        datum_to_write = {"foo": 5.0}
        with self.assertRaises(avro.errors.AvroTypeException) as exc:
            write_datum(datum_to_write, writers_schema)
        # assertEqual (unlike a bare assert) still runs under ``python -O`` and reports a diff on failure.
        self.assertEqual(str(exc.exception), 'The datum "5.0" provided for "foo" is not an example of the schema "long"')

    def test_type_exception_record(self) -> None:
        """Writing a tuple where a record is expected raises AvroTypeException."""
        writers_schema = avro.schema.parse(json.dumps({"type": "record", "name": "Test", "fields": [{"name": "foo", "type": "long"}]}))
        datum_to_write = ("foo", 5.0)
        with self.assertRaisesRegex(avro.errors.AvroTypeException, r"The datum \".*\" provided for \".*\" is not an example of the schema [\s\S]*"):
            write_datum(datum_to_write, writers_schema)
def load_tests(loader: unittest.TestLoader, default_tests: unittest.TestSuite, pattern: Optional[str]) -> unittest.TestSuite:
    """Generate test cases across many test schema.

    This is the unittest ``load_tests`` hook. The suite is assembled by hand
    because the generated test-case classes take custom constructor arguments
    and have no ``test_``-prefixed methods for the default loader to find.

    Args:
        loader: Loader used to collect the conventionally written test classes.
        default_tests: Tests the loader found by default in this module; unused.
        pattern: Discovery file pattern, if any; unused.

    Returns:
        A suite holding the hand-written tests plus one generated case for
        every entry of the module's data tables.
    """
    suite = unittest.TestSuite()
    suite.addTests(loader.loadTestsFromTestCase(TestMisc))
    suite.addTests(IoValidateTestCase(schema_str, datum) for schema_str, datum in SCHEMAS_TO_VALIDATE)
    suite.addTests(RoundTripTestCase(schema_str, datum) for schema_str, datum in SCHEMAS_TO_VALIDATE)
    # Every encoding example runs as both an encoding check and a skip check, for both integer types.
    for skip, type_ in itertools.product((False, True), ("int", "long")):
        suite.addTests(BinaryEncodingTestCase(skip, type_, datum, hex_) for datum, hex_ in BINARY_ENCODINGS)
    # combinations() keeps the listed order, so each pair is (earlier type, later type).
    suite.addTests(
        SchemaPromotionTestCase(write_type, read_type) for write_type, read_type in itertools.combinations(("int", "long", "float", "double"), 2)
    )
    suite.addTests(DefaultValueTestCase(field_type, default) for field_type, default in DEFAULT_VALUE_EXAMPLES)
    suite.addTests(loader.loadTestsFromTestCase(TestIncompatibleSchemaReading))
    return suite
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":  # pragma: no coverage
    unittest.main()