blob: dadfe61fafb8178b0935cafe92bcb54da2cdbe5b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import array
import datetime
import gc
import io
import pickle
import weakref
from enum import Enum
from typing import Any, List, Dict
import numpy as np
import pandas as pd
from dataclasses import dataclass
import pytest
import pyfury
from pyfury.buffer import Buffer
from pyfury import Fury, Language, _serialization, EnumSerializer
from pyfury.serializer import (
TimestampSerializer,
DateSerializer,
PyArraySerializer,
Numpy1DArraySerializer,
)
from pyfury.tests.core import require_pyarrow
from pyfury.type import FuryType
from pyfury.util import lazy_import
# Handle to pyarrow obtained through `lazy_import`. The arrow tests in this
# module are decorated with `require_pyarrow`, and `test_pickle` checks
# `if pa:` before touching it.
# NOTE(review): presumably this keeps the module importable when pyarrow is
# not installed -- confirm against `pyfury.util.lazy_import`.
pa = lazy_import("pyarrow")
def test_float():
    """Round-trip floats and check that `float` resolves to `DoubleSerializer`."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)
    for value in (-1.0, 1 / 3):
        assert ser_de(fury, value) == value
    float_serializer = fury.class_resolver.get_serializer(float)
    assert type(float_serializer) is pyfury.DoubleSerializer
def test_tuple():
    """Round-trip a small tuple holding a float and an int."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)
    sample = (-1.0, 2)
    # Print the encoded size so it can be inspected when output is shown.
    encoded_size = len(fury.serialize(sample))
    print(encoded_size)
    assert ser_de(fury, sample) == sample
def test_dict():
    """Round-trip dicts with int, float and str keys, flat and nested."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)
    samples = [
        {1: 2},
        {1 / 3: 2.0},
        {1 / 3: 2},
        {"1": 2},
        {"1": 1 / 3},
        {"1": {}},
        {"1": {1: 2}},
        {"k1": {"a": 2.0}, "k2": {-1.0: -1.0}},
    ]
    for sample in samples:
        assert ser_de(fury, sample) == sample

    # make multiple references point to same `-1.0`.
    shared_float_dict = {
        1: {5: -1.0},
        2: {-1.0: -1.0, 10: -1.0},
    }
    assert ser_de(fury, shared_float_dict) == shared_float_dict
@pytest.mark.parametrize("language", [Language.XLANG, Language.PYTHON])
def test_basic_serializer(language):
    """Check date/time serializer lookup and round-trip basic values and containers."""
    fury = Fury(language=language, ref_tracking=True)

    # Accept either the class from `pyfury.serializer` or its `_serialization`
    # counterpart.
    timestamp_serializer = fury.class_resolver.get_serializer(datetime.datetime)
    timestamp_types = (TimestampSerializer, _serialization.TimestampSerializer)
    assert isinstance(timestamp_serializer, timestamp_types)
    assert timestamp_serializer.get_xtype_id() == FuryType.TIMESTAMP.value

    date_serializer = fury.class_resolver.get_serializer(datetime.date)
    date_types = (DateSerializer, _serialization.DateSerializer)
    assert isinstance(date_serializer, date_types)
    assert date_serializer.get_xtype_id() == FuryType.DATE32.value

    # Booleans must come back as the very same singletons.
    for flag in (True, False):
        assert ser_de(fury, flag) is flag

    # Integers at several width boundaries, floats, str and empty bytes.
    plain_values = [
        -1,
        2**7 - 1,
        2**15 - 1,
        -(2**15),
        2**31 - 1,
        2**63 - 1,
        -(2**63),
        1.0,
        -1.0,
        "str",
        b"",
    ]
    for value in plain_values:
        assert ser_de(fury, value) == value

    now = datetime.datetime.now()
    assert ser_de(fury, now) == now
    day = datetime.date(2021, 11, 23)
    assert ser_de(fury, day) == day

    # Containers mixing all of the value kinds above.
    list_ = ["a", 1, -1.0, True, now, day]
    dict1_ = {"k1": "a", "k2": 1, "k3": -1.0, "k4": True, "k5": now, "k6": day}
    dict2_ = {"a": "a", 1: 1, -1.0: -1.0, True: True, now: now, day: day}
    set_ = {"a", 1, -1.0, True, now, day}
    for container in (list_, dict1_, dict2_, set_):
        assert ser_de(fury, container) == container
@pytest.mark.parametrize("language", [Language.XLANG, Language.PYTHON])
def test_ref_tracking(language):
    """Check that shared and circular references survive a round trip.

    With `ref_tracking=True`, objects referenced several times in the input
    must deserialize to a single shared object, and self-references must
    point back at the enclosing container.
    """
    fury = Fury(language=language, ref_tracking=True)

    # A list whose only element is the list itself.
    simple_list = []
    simple_list.append(simple_list)
    new_simple_list = ser_de(fury, simple_list)
    assert new_simple_list[0] is new_simple_list

    now = datetime.datetime.now()
    day = datetime.date(2021, 11, 23)
    list_ = ["a", 1, -1.0, True, now, day]
    dict1 = {f"k{i}": v for i, v in enumerate(list_)}
    dict2 = {v: v for v in list_}
    # Each container is referenced twice so that identity can be compared
    # after deserialization.
    dict3 = {
        "list1_0": list_,
        "list1_1": list_,
        "dict1_0": dict1,
        "dict1_1": dict1,
        "dict2_0": dict2,
        "dict2_1": dict2,
    }
    # Two self-references make the outer dict circular.
    dict3["dict3_0"] = dict3
    dict3["dict3_1"] = dict3

    new_dict3 = ser_de(fury, dict3)
    assert new_dict3["list1_0"] == list_
    assert new_dict3["list1_0"] is new_dict3["list1_1"]
    assert new_dict3["dict1_0"] == dict1
    assert new_dict3["dict1_0"] is new_dict3["dict1_1"]
    assert new_dict3["dict2_0"] == dict2
    assert new_dict3["dict2_0"] is new_dict3["dict2_1"]
    # Both self-references must resolve to the deserialized outer dict. The
    # second check previously compared "dict3_0" with itself, which is
    # always true and left "dict3_1" unverified.
    assert new_dict3["dict3_0"] is new_dict3
    assert new_dict3["dict3_1"] is new_dict3
@pytest.mark.parametrize("language", [Language.PYTHON, Language.XLANG])
def test_tmp_ref(language):
    """Serialize three temporary lists into one buffer and read back distinct objects."""
    # FIXME this can't simulate the case where new objects are allocated on memory
    # address of released tmp object.
    fury = Fury(language=language, ref_tracking=True)
    buffer = Buffer.allocate(128)
    start_index = buffer.writer_index
    x = 1
    rounds = 3
    for _ in range(rounds):
        # A fresh temporary list is built for every call.
        fury.serialize([x], buffer)
    # The three payloads together occupy more than 15 bytes.
    assert buffer.writer_index > start_index + 15

    l1, l2, l3 = [fury.deserialize(buffer) for _ in range(rounds)]
    for restored in (l1, l2, l3):
        assert restored == [x]
    # Equal contents, but never the same object.
    assert l1 is not l2
    assert l1 is not l3
    assert l2 is not l3
@pytest.mark.parametrize("language", [Language.PYTHON, Language.XLANG])
def test_multiple_ref(language):
    """Write 1000 empty lists into one buffer and expect 1000 distinct objects back."""
    # FIXME this can't simulate the case where new objects are allocated on memory
    # address of released tmp object.
    fury = Fury(language=language, ref_tracking=True)
    buffer = Buffer.allocate(128)
    count = 1000
    for _ in range(count):
        fury.serialize([], buffer)
    # All results stay alive in `objs`, so distinct objects have distinct ids.
    objs = [fury.deserialize(buffer) for _ in range(count)]
    distinct_ids = {id(o) for o in objs}
    assert len(distinct_ids) == count
class RefTestClass1:
    """Plain object with one optional attribute, used by `test_ref_cleanup`."""

    def __init__(self, f1=None):
        # Stored as given; no validation or copying.
        self.f1 = f1
class RefTestClass2:
    """Plain object with one required attribute, used by `test_ref_cleanup`."""

    def __init__(self, f1):
        # Stored as given; no validation or copying.
        self.f1 = f1
@pytest.mark.parametrize("language", [Language.PYTHON])
def test_ref_cleanup(language):
    """Check that serializing an object graph does not keep the objects alive."""
    # FIXME this can't simulate the case where new objects are allocated on memory
    # address of released tmp object.
    fury = Fury(language=language, ref_tracking=True, require_class_registration=False)
    # TODO support Language.XLANG, current unpickler will error for xlang,
    inner = RefTestClass1()
    outer = RefTestClass2(f1=inner)
    # NOTE(review): the pickle round-trip result is discarded; presumably a
    # sanity check that both classes are picklable -- confirm.
    pickle.loads(pickle.dumps(outer))
    inner_ref = weakref.ref(inner)
    outer_ref = weakref.ref(outer)
    data = fury.serialize(outer)
    # Drop the only local strong references, then force a collection.
    del inner, outer
    gc.collect()
    assert inner_ref() is None
    assert outer_ref() is None
    # The serialized bytes must still deserialize after the originals are gone.
    fury.deserialize(data)
@pytest.mark.parametrize("language", [Language.XLANG, Language.PYTHON])
def test_array_serializer(language):
    """Round-trip `array.array` and 1-D numpy arrays for every registered type."""
    fury = Fury(language=language, ref_tracking=True, require_class_registration=False)
    values = list(range(10))

    # One array per typecode listed in `PyArraySerializer.typecode_dict`.
    for typecode in PyArraySerializer.typecode_dict.keys():
        py_array = array.array(typecode, values)
        assert ser_de(fury, py_array) == py_array

    # One array per dtype listed in `Numpy1DArraySerializer.dtypes_dict`.
    for dtype in Numpy1DArraySerializer.dtypes_dict.keys():
        np_array = np.array(values, dtype=dtype)
        restored = ser_de(fury, np_array)
        assert np.array_equal(restored, np_array)
        np.testing.assert_array_equal(restored, np_array)
def ser_de(fury, obj):
    """Serialize `obj` with `fury` and return the result of deserializing it again."""
    return fury.deserialize(fury.serialize(obj))
def test_pickle():
    """Check that `Buffer` works as a pickle write target and read source.

    Two pickled values are written into one buffer with a raw int32 between
    them. They are then read back through `io.BytesIO`, through
    `pa.BufferReader` when `pa` is truthy, and finally from the buffer itself.
    """
    buf = Buffer.allocate(32)
    pickler = pickle.Pickler(buf)
    pickler.dump(b"abc")
    # Raw 4-byte value placed between the two pickle payloads.
    buf.write_int32(-1)
    pickler.dump("abcd")
    # Apart from the 4-byte int32, the buffer holds exactly the two pickles.
    assert buf.writer_index - 4 == len(pickle.dumps(b"abc")) + len(pickle.dumps("abcd"))
    print(f"writer_index {buf.writer_index}")
    # Read back through a BytesIO built from the buffer.
    bytes_io_ = io.BytesIO(buf)
    unpickler = pickle.Unpickler(bytes_io_)
    assert unpickler.load() == b"abc"
    # Skip the 4-byte int32 that separates the two payloads.
    bytes_io_.seek(bytes_io_.tell() + 4)
    assert unpickler.load() == "abcd"
    print(f"reader_index {buf.reader_index} {bytes_io_.tell()}")
    if pa:
        # Same read-back through a pyarrow BufferReader.
        pa_buf = pa.BufferReader(buf)
        unpickler = pickle.Unpickler(pa_buf)
        assert unpickler.load() == b"abc"
        pa_buf.seek(pa_buf.tell() + 4)
        assert unpickler.load() == "abcd"
        print(f"reader_index {buf.reader_index} {pa_buf.tell()} {buf.reader_index}")
    # Finally unpickle straight from the buffer, advancing `reader_index`
    # manually past the int32.
    unpickler = pickle.Unpickler(buf)
    assert unpickler.load() == b"abc"
    buf.reader_index = buf.reader_index + 4
    assert unpickler.load() == "abcd"
    print(f"reader_index {buf.reader_index}")
@require_pyarrow
def test_serialize_arrow():
    """Round-trip an arrow record batch and a table through one shared buffer."""
    batch = create_record_batch(10000)
    table = pa.Table.from_batches([batch, batch])
    fury = Fury(language=Language.XLANG, ref_tracking=True)
    out = Buffer.allocate(32)

    originals = (batch, table)
    for value in originals:
        fury.serialize(value, buffer=out)
    # Values are read back in the order they were written.
    restored = [fury.deserialize(out) for _ in originals]
    for new_value, original in zip(restored, originals):
        assert new_value == original
@require_pyarrow
def test_serialize_arrow_zero_copy():
    """Round-trip arrow data with payload buffers collected via `buffer_callback`."""
    batch = create_record_batch(10000)
    table = pa.Table.from_batches([batch, batch])
    fury = Fury(language=Language.XLANG, ref_tracking=True)
    out = Buffer.allocate(32)

    # The callback collects one buffer object per serialized value.
    collected = []
    for value in (batch, table):
        fury.serialize(value, buffer=out, buffer_callback=collected.append)
    payloads = [item.to_buffer() for item in collected]

    # The first payload belongs to the batch, the remainder to the table.
    new_batch = fury.deserialize(out, buffers=payloads[:1])
    new_table = fury.deserialize(out, buffers=payloads[1:])
    collected.clear()
    assert new_batch == batch
    assert new_table == table
def create_record_batch(size):
    """Build a two-column arrow record batch with `size` rows.

    Column "boolean" alternates False/True starting with False; column
    "varchar" holds the strings "test0", "test1", ...
    """
    row_ids = range(size)
    boolean_column = pa.array([bool(i % 2) for i in row_ids])
    varchar_column = pa.array([f"test{i}" for i in row_ids])
    columns = [boolean_column, varchar_column]
    return pa.RecordBatch.from_arrays(columns, ["boolean", "varchar"])
@dataclass
class Foo:
    """Dataclass with a single int field; base class of `Bar`."""

    f1: int
@dataclass
class Bar(Foo):
    """Dataclass extending `Foo` with a second int field."""

    f2: int
class BarSerializer(pyfury.Serializer):
    """Serializer that encodes a `Bar` as two int32 values, `f1` then `f2`."""

    def xwrite(self, buffer, value: Bar):
        """Write `value.f1` followed by `value.f2` as int32."""
        for field_value in (value.f1, value.f2):
            buffer.write_int32(field_value)

    def xread(self, buffer):
        """Read two int32 values and build a `Bar` from them."""
        f1 = buffer.read_int32()
        f2 = buffer.read_int32()
        return Bar(f1, f2)

    def get_xtype_id(self):
        """Return the value of `FuryType.FURY_TYPE_TAG`."""
        return pyfury.FuryType.FURY_TYPE_TAG.value

    def get_xtype_tag(self):
        """Return the tag string for `Bar`."""
        return "test.Bar"
class RegisterClass:
    """Plain object with one optional attribute, used by `test_register_py_serializer`."""

    def __init__(self, f1=None):
        # Stored as given; no validation or copying.
        self.f1 = f1
def test_register_py_serializer():
    """Register a custom Python serializer for `RegisterClass` and round-trip it.

    The serializer writes the `f1` attribute as an int32 and rebuilds a
    `RegisterClass` from it on read.
    """
    fury = Fury(
        language=Language.PYTHON, ref_tracking=True, require_class_registration=False
    )

    class Serializer(pyfury.Serializer):
        def write(self, buffer, value):
            buffer.write_int32(value.f1)

        def read(self, buffer):
            # Rebuild the class this serializer is registered for. The
            # previous version returned an instance of the unrelated class `A`.
            return RegisterClass(f1=buffer.read_int32())

        def xwrite(self, buffer, value):
            raise NotImplementedError

        def xread(self, buffer):
            raise NotImplementedError

    # Register under `RegisterClass` itself. The serializer used to be
    # registered under `A`, so the custom serializer was not the one
    # registered for `RegisterClass`.
    fury.register_serializer(RegisterClass, Serializer(fury, RegisterClass))
    restored = fury.deserialize(fury.serialize(RegisterClass(100)))
    assert isinstance(restored, RegisterClass)
    assert restored.f1 == 100
class A:
    """Outer class of a three-level nesting used by `test_register_class`."""

    class B:
        """Middle level, reachable as `A.B`."""

        class C:
            """Innermost level, reachable as `A.B.C`."""
def test_register_class():
    """Register a serializer for `A`, `A.B` and `A.B.C` and round-trip each one."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)

    class Serializer(pyfury.Serializer):
        def write(self, buffer, value):
            # The classes carry no state, so nothing is written.
            pass

        def read(self, buffer):
            # Build a fresh instance of the type this serializer was given.
            return self.type_()

        def xwrite(self, buffer, value):
            raise NotImplementedError

        def xread(self, buffer):
            raise NotImplementedError

    nested_types = (A, A.B, A.B.C)
    for cls in nested_types:
        fury.register_serializer(cls, Serializer(fury, cls))
    for cls in nested_types:
        restored = fury.deserialize(fury.serialize(cls()))
        assert isinstance(restored, cls)
def test_pickle_fallback():
    """Round-trip a numpy dtype inside a list and a pandas DataFrame.

    Class registration is not required in this configuration.
    """
    fury = Fury(
        language=Language.PYTHON, ref_tracking=True, require_class_registration=False
    )

    mixed = [1, True, np.dtype(np.int32)]
    restored_mixed = fury.deserialize(fury.serialize(mixed))
    assert mixed == restored_mixed

    frame = pd.DataFrame({"a": list(range(10))})
    restored_frame = fury.deserialize(fury.serialize(frame))
    assert restored_frame.equals(frame)
def test_unsupported_callback():
    """Check that `unsupported_callback` receives objects that cannot be serialized.

    Without the callback, serializing the list raises. With it, the two
    functions are handed to the callback and supplied again on deserialize.
    """
    fury = Fury(language=Language.PYTHON, ref_tracking=True)

    def f1(x):
        return x

    def f2(x):
        return x + x

    obj1 = [1, True, f1, f2, {1: 2}]
    with pytest.raises(Exception):
        fury.serialize(obj1)

    collected = []
    binary1 = fury.serialize(obj1, unsupported_callback=collected.append)
    # Exactly the two functions, in the order they appear in the list.
    assert len(collected) == 2
    assert collected == [f1, f2]

    new_obj1 = fury.deserialize(binary1, unsupported_objects=collected)
    assert new_obj1 == obj1
def test_slice():
    """Round-trip slices on their own and embedded in lists."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)

    def round_trip(value):
        return fury.deserialize(fury.serialize(value))

    # Bare slices, with None and with a str step.
    bare_slices = [
        slice(1, None, "10"),
        slice(1, 100, 10),
        slice(1, None, 10),
        slice(10, 10, None),
        slice(None, None, 10),
        slice(None, None, None),
    ]
    for item in bare_slices:
        assert round_trip(item) == item

    # Slices mixed with other values inside lists, including equal slices.
    lists_with_slices = [
        [1, 2, slice(1, 100, 10), slice(1, 100, 10)],
        [1, slice(1, None, 10), False, [], slice(1, 100, 10)],
        [1, slice(1, None, "10"), False, [], slice(1, 100, "10")],
    ]
    for items in lists_with_slices:
        assert round_trip(items) == items
class TestEnum(Enum):
    """Enum fixture with two int-valued and two str-valued members."""

    # int-valued members
    E1 = 1
    E2 = 2
    # str-valued members
    E3 = "E3"
    E4 = "E4"
def test_enum():
    """Round-trip every `TestEnum` member and check `EnumSerializer` is resolved."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)
    for member in (TestEnum.E1, TestEnum.E2, TestEnum.E3, TestEnum.E4):
        assert ser_de(fury, member) == member

    # Lookup by class.
    class_serializer = fury.class_resolver.get_serializer(TestEnum)
    assert isinstance(class_serializer, EnumSerializer)
    # Lookup by instance, for an int-valued and a str-valued member.
    for member in (TestEnum.E1, TestEnum.E4):
        member_serializer = fury.class_resolver.get_serializer(obj=member)
        assert isinstance(member_serializer, EnumSerializer)
def test_duplicate_serialize():
    """Round-trip the same enum members repeatedly with one `Fury` instance."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)
    # Each member appears twice, in a different order the second time.
    sequence = [
        TestEnum.E1,
        TestEnum.E2,
        TestEnum.E4,
        TestEnum.E2,
        TestEnum.E1,
        TestEnum.E4,
    ]
    for member in sequence:
        assert ser_de(fury, member) == member
@dataclass(unsafe_hash=True)
class TestCacheClass1:
    """Hashable dataclass with one int field, used by `test_cache_serializer`."""

    f1: int
def test_cache_serializer():
    """Exercise both pickle cache serializers, registered and via explicit classinfo."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)
    cache_serializers = (
        pyfury.PickleStrongCacheSerializer,
        pyfury.PickleCacheSerializer,
    )

    # Registered as the serializer for the class.
    for serializer_cls in cache_serializers:
        fury.register_serializer(TestCacheClass1, serializer_cls(fury))
        assert ser_de(fury, TestCacheClass1(1)) == TestCacheClass1(1)

    # Passed explicitly through a classinfo to `serialize_ref`.
    buffer = Buffer.allocate(32)
    for serializer_cls in cache_serializers:
        classinfo = serializer_cls.new_classinfo(fury)
        fury.serialize_ref(buffer, TestCacheClass1(1), classinfo)
        assert fury.deserialize_ref(buffer) == TestCacheClass1(1)
def test_pandas_range_index():
    """Round-trip a named `pd.RangeIndex` through `PandasRangeIndexSerializer`."""
    fury = Fury(
        language=Language.PYTHON, ref_tracking=True, require_class_registration=False
    )
    fury.register_serializer(pd.RangeIndex, pyfury.PandasRangeIndexSerializer(fury))
    index = pd.RangeIndex(1, 100, 2, name="a")
    new_index = ser_de(fury, index)
    # Compare against the original index. The previous version compared
    # `new_index` with itself, which can never fail.
    pd.testing.assert_index_equal(new_index, index)
@dataclass(unsafe_hash=True)
class TestPyDataClass1:
    """Dataclass fixture with scalar, `Any`, `List` and `Dict` annotated fields."""

    # scalar fields
    f1: int
    f2: float
    f3: str
    f4: bool
    # untyped and container fields
    f5: Any
    f6: List
    f7: Dict
def test_py_serialize_dataclass():
    """Round-trip `TestPyDataClass1` with all fields set and with several set to None."""
    fury = Fury(language=Language.PYTHON, ref_tracking=True)
    populated = TestPyDataClass1(
        f1=1, f2=-2.0, f3="abc", f4=True, f5="xyz", f6=[1, 2], f7={"k1": "v1"}
    )
    with_nones = TestPyDataClass1(
        f1=None, f2=-2.0, f3="abc", f4=None, f5="xyz", f6=None, f7=None
    )
    for instance in (populated, with_nones):
        assert ser_de(fury, instance) == instance
def test_function():
    """Round-trip a lambda, a local function and a bound DataFrame method."""
    fury = Fury(
        language=Language.PYTHON, ref_tracking=True, require_class_registration=False
    )

    # Anonymous function.
    restored_lambda = fury.deserialize(fury.serialize(lambda x: x * 2))
    assert restored_lambda(2) == 4

    # Function defined inside this test.
    def func(x):
        return x * 2

    restored_func = fury.deserialize(fury.serialize(func))
    assert restored_func(2) == 4

    # Bound method of a DataFrame.
    df = pd.DataFrame({"a": list(range(10))})
    restored_sum = fury.deserialize(fury.serialize(df.sum))
    assert restored_sum().equals(df.sum())