blob: 608e6ae2d5d24067132555633994b61389b18237 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import itertools
import struct
from io import SEEK_SET
from types import TracebackType
from typing import Callable, Optional, Type
from unittest.mock import MagicMock, patch
import pytest
from pyiceberg.avro.decoder import BinaryDecoder, StreamingBinaryDecoder, new_decoder
from pyiceberg.avro.decoder_fast import CythonBinaryDecoder
from pyiceberg.avro.resolver import resolve_reader
from pyiceberg.io import InputStream
from pyiceberg.types import DoubleType, FloatType
# Decoder implementations that the parametrized tests below run against.
AVAILABLE_DECODERS = [
    StreamingBinaryDecoder,
    CythonBinaryDecoder,
]
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_boolean_true(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A single 0x01 byte decodes to the boolean ``True``."""
    value = decoder_class(b"\x01").read_boolean()
    assert value is True
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_boolean_false(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A single 0x00 byte decodes to the boolean ``False``."""
    value = decoder_class(b"\x00").read_boolean()
    assert value is False
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_boolean(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Skipping a boolean moves the read position from 0 to 1."""
    dec = decoder_class(b"\x00")
    before = dec.tell()
    assert before == 0
    dec.skip_boolean()
    after = dec.tell()
    assert after == 1
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_int(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A single-byte varint is zigzag-decoded: 0x18 (24) becomes 12."""
    result = decoder_class(b"\x18").read_int()
    assert result == 12
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_int_longer(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A four-byte varint (continuation bit set on the first three bytes) decodes to 1111111."""
    result = decoder_class(b"\x8e\xd1\x87\x01").read_int()
    assert result == 1111111
def zigzag_encode(datum: int) -> bytes:
    """Encode ``datum`` as a zigzag varint, the inverse of ``read_int``.

    Zigzag maps signed integers onto non-negative ones (0 -> 0, -1 -> 1,
    1 -> 2, -2 -> 3, ...). The result is then written base-128, least
    significant group first, with the high bit set on every byte except the last.

    The classic ``(n << 1) ^ (n >> 63)`` formulation is only correct for
    values that fit in a signed 64-bit long. The mapping below gives the same
    bytes inside that range and stays correct for any Python int.
    """
    # For negatives, ~(d << 1) == -2*d - 1, which is the zigzag image without a fixed bit width.
    zigzagged = datum << 1 if datum >= 0 else ~(datum << 1)
    result = []
    # Emit 7 bits at a time, flagging "more bytes follow" with 0x80.
    while zigzagged & ~0x7F:
        result.append(struct.pack("B", (zigzagged & 0x7F) | 0x80))
        zigzagged >>= 7
    result.append(struct.pack("B", zigzagged))
    return b"".join(result)
@pytest.mark.parametrize(
    "decoder_class, expected_value",
    list(itertools.product(AVAILABLE_DECODERS, [0, -1, 2**32, -(2**32), (2**63 - 1), -(2**63)])),
)
def test_read_int_custom_encode(decoder_class: Callable[[bytes], BinaryDecoder], expected_value: int) -> None:
    """Values encoded with ``zigzag_encode`` round-trip through ``read_int``, including the signed 64-bit extremes."""
    payload = zigzag_encode(expected_value)
    decoded = decoder_class(payload).read_int()
    assert decoded == expected_value, f"Decoded value does not match decoded={decoded} expected={expected_value}"
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_int(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Skipping a single-byte varint moves the read position from 0 to 1."""
    dec = decoder_class(b"\x18")
    before = dec.tell()
    assert before == 0
    dec.skip_int()
    after = dec.tell()
    assert after == 1
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_negative_bytes(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Requesting a negative number of bytes is rejected with a descriptive ValueError."""
    dec = decoder_class(b"")
    with pytest.raises(ValueError) as excinfo:
        dec.read(-1)
    # Inspect the message only after the ``with`` block, once the exception has been captured.
    message = str(excinfo.value)
    assert "Requested -1 bytes to read, expected positive integer." in message
class OneByteAtATimeInputStream(InputStream):
    """
    Fake input stream that returns a single byte per ``read`` call, whatever size is requested.

    Each read advances the position by one and returns the new position as a
    byte, so successive reads from position 0 yield 0x01, 0x02, ... (wrapping
    after 0xFF). Decoders reading from it must loop until they have collected
    the number of bytes they asked for.
    """

    # Class-level default; ``self.pos += 1`` rebinds it on the instance on first use.
    pos = 0

    def read(self, size: int = 0) -> bytes:
        # ``size`` is deliberately ignored: the point of this fake is to under-deliver.
        self.pos += 1
        # Mask to one byte so reads past position 255 don't raise OverflowError from int.to_bytes.
        return (self.pos & 0xFF).to_bytes(1, byteorder="little")

    def seek(self, offset: int, whence: int = SEEK_SET) -> int:
        # ``whence`` is ignored; ``offset`` is always treated as an absolute position.
        self.pos = offset
        return self.pos

    def tell(self) -> int:
        return self.pos

    def close(self) -> None:
        pass

    def __enter__(self) -> OneByteAtATimeInputStream:
        return self

    def __exit__(
        self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
    ) -> None:
        self.close()
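# Only StreamingBinaryDecoder reads from an InputStream; CythonBinaryDecoder is built from an in-memory bytes buffer, so byte-at-a-time reading is tested on the streaming decoder alone.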
@pytest.mark.parametrize("decoder_class", [StreamingBinaryDecoder])
def test_read_single_byte_at_the_time(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A two-byte read is assembled from two one-byte reads of the underlying stream."""
    stream = OneByteAtATimeInputStream()
    dec = decoder_class(stream)  # type: ignore
    chunk = dec.read(2)
    assert chunk == b"\x01\x02"
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Four little-endian bytes of the IEEE-754 single 0x419A0000 decode to 19.25."""
    result = decoder_class(b"\x00\x00\x9a\x41").read_float()
    assert result == 19.25
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Skipping a float moves the read position over all four of its bytes."""
    dec = decoder_class(b"\x00\x00\x9a\x41")
    before = dec.tell()
    assert before == 0
    dec.skip_float()
    after = dec.tell()
    assert after == 4
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_double(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Eight little-endian bytes of the IEEE-754 double 0x4033400000000000 decode to 19.25."""
    result = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40").read_double()
    assert result == 19.25
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_double(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Skipping a double moves the read position over all eight of its bytes."""
    dec = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40")
    before = dec.tell()
    assert before == 0
    dec.skip_double()
    after = dec.tell()
    assert after == 8
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_bytes(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A length prefix of 0x08 (zigzag 4) is followed by exactly four payload bytes."""
    payload = decoder_class(b"\x08\x01\x02\x03\x04").read_bytes()
    assert payload == b"\x01\x02\x03\x04"
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_utf8(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A length prefix of 0x04 (zigzag 2) followed by b"vo" decodes to the string "vo"."""
    text = decoder_class(b"\x04\x76\x6f").read_utf8()
    assert text == "vo"
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_utf8(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Skipping a string moves past both its one-byte length prefix and its two payload bytes."""
    dec = decoder_class(b"\x04\x76\x6f")
    before = dec.tell()
    assert before == 0
    dec.skip_utf8()
    after = dec.tell()
    assert after == 3
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_int_as_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A value written as a float is readable through a float -> double promoting reader.

    Despite the test name, the input is a 4-byte float (19.25). ``resolve_reader``
    is asked to resolve FloatType (file) against DoubleType (read schema).
    """
    promoting_reader = resolve_reader(FloatType(), DoubleType())
    dec = decoder_class(b"\x00\x00\x9a\x41")
    assert promoting_reader.read(dec) == 19.25
@patch("pyiceberg.avro.decoder_fast.CythonBinaryDecoder")
def test_fallback_to_pure_python_decoder(cython_decoder: MagicMock) -> None:
    """When constructing the Cython decoder fails, ``new_decoder`` warns and returns the streaming decoder."""
    # Make the patched Cython decoder raise as if the compiled extension were unavailable.
    cython_decoder.side_effect = ModuleNotFoundError
    expected_warning = "Falling back to pure Python Avro decoder, missing Cython implementation"
    with pytest.warns(UserWarning, match=expected_warning):
        decoder = new_decoder(b"")
    assert isinstance(decoder, StreamingBinaryDecoder)