# blob: dfbf22e54cccca854f6c1ce2f274d76b201fc4dd [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import timeit
import pickle
import pyfory
from pyfory.format import (
schema,
field,
int8,
int16,
int32,
int64,
utf8,
binary,
boolean,
date32,
timestamp,
list_,
map_,
struct,
)
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class Bar:
    """Two-field record; used as the nested ``f7`` member of ``Foo``."""

    f1: int
    f2: str
@dataclass
class Foo:
    """Sample record mixing scalar, list, map and nested-struct fields.

    The field names (``f1`` .. ``f7``) line up with the entries declared in
    ``foo_schema()``.
    """

    f1: int
    f2: str
    f3: List[str]
    f4: Dict[str, int]
    f5: List[int]
    f6: int
    f7: Bar
def create_foo():
    """Return a ``Foo`` instance populated with small, fixed sample values."""
    return Foo(
        f1=1,
        f2="hello",
        f3=["a", "b", "c"],
        f4={"x": 1, "y": 2},
        f5=[10, 20, 30],
        f6=42,
        f7=Bar(f1=100, f2="world"),
    )
def foo_schema():
    """Return the row schema whose fields mirror ``Foo``.

    Integers are declared as ``int64`` and strings as ``utf8``; ``f7`` is a
    struct with the same two fields as ``Bar``.
    """
    return schema(
        [
            field("f1", int64()),
            field("f2", utf8()),
            field("f3", list_(utf8())),
            field("f4", map_(utf8(), int64())),
            field("f5", list_(int64())),
            field("f6", int64()),
            field("f7", struct([field("f1", int64()), field("f2", utf8())])),
        ]
    )
def test_encode():
    """Round-trip a ``Foo`` through a schema-built row encoder.

    Also prints the schema, the row and its bytes, which exercises their
    string conversions.
    """
    # Build the schema once and reuse it for both printing and encoding.
    foo_row_schema = foo_schema()
    print(foo_row_schema)
    encoder = pyfory.create_row_encoder(foo_row_schema)
    foo = create_foo()
    print("foo", foo)
    row = encoder.to_row(foo)
    # Serialize once; the same bytes are used for both diagnostics below.
    row_bytes = row.to_bytes()
    print("row bytes length", len(row_bytes))
    print("row bytes", row_bytes)
    print("row", row)  # test __str__
    new_foo = encoder.from_row(row)
    print("new_foo", new_foo)
    # NOTE(review): f7 (the nested struct) is not compared here -- confirm
    # whether that omission is intentional.
    assert foo.f1 == new_foo.f1
    assert foo.f2 == new_foo.f2
    assert foo.f3 == new_foo.f3
    assert foo.f4 == new_foo.f4
    assert foo.f5 == new_foo.f5
    assert foo.f6 == new_foo.f6
def test_encoder():
    """Encode and decode a ``Foo`` with an encoder created from the class."""
    foo = create_foo()
    encoder = pyfory.encoder(Foo)
    new_foo = encoder.decode(encoder.encode(foo))
    # Only the two leading scalar fields are checked after the round trip.
    assert foo.f1 == new_foo.f1
    assert foo.f2 == new_foo.f2
def test_encoder_with_schema():
    """Encode and decode a ``Foo`` with an encoder created from a schema."""
    foo = create_foo()
    encoder = pyfory.encoder(schema=foo_schema())
    new_foo = encoder.decode(encoder.encode(foo))
    # Only the two leading scalar fields are checked after the round trip.
    assert foo.f1 == new_foo.f1
    assert foo.f2 == new_foo.f2
def test_dict():
    """Encode a plain ``dict`` to a row and read it back by attribute."""
    dict_ = {"f1": 1, "f2": "str"}
    s = schema([field("f1", int64()), field("f2", utf8())])
    encoder = pyfory.create_row_encoder(s)
    row = encoder.to_row(dict_)
    new_obj = encoder.from_row(row)
    # The input is keyed by name; the decoded object is read via attributes.
    assert new_obj.f1 == dict_["f1"]
    assert new_obj.f2 == dict_["f2"]
def test_ints():
    """Round-trip the extreme values of each signed integer width.

    For every width (64, 32, 16 and 8 bits) one field holds the maximum and
    the next holds the minimum representable signed value.
    """
    field_names = [f"f{i}" for i in range(1, 9)]
    cls = pyfory.record_class_factory("TestNumeric", field_names)
    s = schema(
        [
            field("f1", int64()),
            field("f2", int64()),
            field("f3", int32()),
            field("f4", int32()),
            field("f5", int16()),
            field("f6", int16()),
            field("f7", int8()),
            field("f8", int8()),
        ]
    )
    print("pyfory.cls", pyfory.get_qualified_classname(cls))
    obj = cls(
        f1=2**63 - 1,
        f2=-(2**63),
        f3=2**31 - 1,
        f4=-(2**31),
        f5=2**15 - 1,
        f6=-(2**15),
        f7=2**7 - 1,
        f8=-(2**7),
    )
    print("obj", obj)
    encoder = pyfory.create_row_encoder(s)
    row = encoder.to_row(obj)
    print("row", row)
    new_obj = encoder.from_row(row)
    print("new_obj", new_obj)
    # Compare every field; the assertion message names the one that differs.
    for name in field_names:
        assert getattr(new_obj, name) == getattr(obj, name), name
def test_basic():
    """Round-trip string, binary, boolean, date and timestamp fields."""
    from datetime import date, datetime

    cls = pyfory.record_class_factory("TestBasic", ["f" + str(i) for i in range(1, 6)])
    s = schema(
        [
            field("f1", utf8()),
            field("f2", binary()),
            field("f3", boolean()),
            field("f4", date32()),
            field("f5", timestamp()),
        ]
    )
    obj = cls(f1="str", f2=b"123456", f3=True, f4=date(1970, 1, 1), f5=datetime.now())
    print("obj", obj)
    encoder = pyfory.create_row_encoder(s)
    row = encoder.to_row(obj)
    print("row", row)
    new_obj = encoder.from_row(row)
    print("new_obj", new_obj)
    print("new_obj", type(new_obj))
    assert new_obj.f1 == obj.f1
    assert new_obj.f2 == obj.f2
    assert new_obj.f3 == obj.f3
    assert new_obj.f4 == obj.f4
    # Timestamp precision may differ, so allow up to one second of drift
    # instead of requiring exact equality.
    assert abs((new_obj.f5 - obj.f5).total_seconds()) < 1
@dataclass
class BarNested:
    """String-plus-int-list record; element type of ``FooNested.f4``."""

    f1: str
    f2: List[int]
@dataclass
class FooNested:
    """Record with a scalar, a list, a map and a list of ``BarNested``.

    Used by the row-access test and benchmark in this module.
    """

    f1: int
    f2: List[int]
    f3: Dict[str, int]
    f4: List[BarNested]
def test_binary_row_access():
    """Read individual values out of serialized row bytes via ``RowData``.

    The object is encoded to bytes, wrapped in ``pyfory.RowData`` together
    with the encoder's schema, and then a list element, a nested struct field
    and an element of a nested list are accessed and printed.
    """
    encoder = pyfory.encoder(FooNested)
    foo = FooNested(
        f1=10,
        f2=list(range(1000)),
        f3={f"k{i}": i for i in range(1000)},
        f4=[BarNested(f1=f"s{i}", f2=list(range(10))) for i in range(10)],
    )
    binary_data = encoder.to_row(foo).to_bytes()
    foo_row = pyfory.RowData(encoder.schema, binary_data)
    # NOTE(review): the values are only printed, not asserted -- this checks
    # that the accesses do not raise, not that they return the right data.
    print(foo_row.f2[2], foo_row.f4[2].f1, foo_row.f4[2].f2[5])
def benchmark_row_access():
    """Time field access via ``RowData`` against a full ``pickle.loads``.

    A large ``FooNested`` is serialized once per format. Each timed callable
    then obtains an accessor (``RowData`` wrapper or unpickled object), reads
    the same three values and prints them; the total time for 10 runs of each
    is printed.

    The name lacks the ``test_`` prefix, presumably so that it is not
    collected as a regular test -- confirm against the project's pytest
    configuration.
    """
    size = 1000_000
    encoder = pyfory.encoder(FooNested)
    foo = FooNested(
        f1=10,
        f2=list(range(size)),
        f3={f"k{i}": i for i in range(size)},
        f4=[BarNested(f1=f"s{i}", f2=list(range(10))) for i in range(size)],
    )

    # Each payload gets its own name so that neither closure below can end up
    # reading the other format's bytes through a rebound variable.
    row_bytes = encoder.to_row(foo).to_bytes()

    def benchmark_fory():
        foo_row = pyfory.RowData(encoder.schema, row_bytes)
        print(foo_row.f2[100000], foo_row.f4[100000].f1, foo_row.f4[200000].f2[5])

    print(timeit.timeit(benchmark_fory, number=10))

    pickled_bytes = pickle.dumps(foo)

    def benchmark_pickle():
        # pickle.loads is only applied to bytes produced just above, never to
        # external input.
        new_foo = pickle.loads(pickled_bytes)
        print(new_foo.f2[100000], new_foo.f4[100000].f1, new_foo.f4[200000].f2[5])

    print(timeit.timeit(benchmark_pickle, number=10))