blob: 0da8beaaa635b8c5277594f3d77cd781ae52a934 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# distutils: language = c++
# cython: embedsignature = True
# cython: language_level = 3
# cython: annotate = True
import array
import dataclasses
import datetime
import enum
import logging
import os
import sys
import warnings
from typing import TypeVar, Union, Iterable, get_type_hints
from pyfury._util import get_bit, set_bit, clear_bit
from pyfury._fury import Language, OpaqueObject
from pyfury._fury import _PicklerStub, _UnpicklerStub, Pickler, Unpickler
from pyfury._fury import _ENABLE_CLASS_REGISTRATION_FORCIBLY
from pyfury.error import ClassNotCompatibleError
from pyfury.lib import mmh3
from pyfury.type import is_primitive_type, FuryType, Int8Type, Int16Type, Int32Type, \
Int64Type, Float32Type, Float64Type, Int16ArrayType, Int32ArrayType, \
Int64ArrayType, Float32ArrayType, Float64ArrayType, infer_field, load_class
from pyfury.util import is_little_endian
from libc.stdint cimport *
from libcpp.vector cimport vector
from cpython cimport PyObject
from cpython.ref cimport *
from libcpp cimport bool as c_bool
from cython.operator cimport dereference as deref
from pyfury._util cimport Buffer
from pyfury.includes.libabsl cimport flat_hash_map
try:
import numpy as np
except ImportError:
np = None
if sys.version_info[:2] < (3, 8): # pragma: no cover
import pickle5 as pickle # nosec # pylint: disable=import_pickle
else:
import pickle # nosec # pylint: disable=import_pickle
cimport cython
logger = logging.getLogger(__name__)
# Read once at import time: true unless the environment variable is set to
# something other than "true"/"1" (case-insensitive).
ENABLE_FURY_CYTHON_SERIALIZATION = os.environ.get(
    "ENABLE_FURY_CYTHON_SERIALIZATION", "True").lower() in ("true", "1")
# Head flag written for a `None` value.
cdef int8_t NULL_FLAG = -3
# Head flag written when the object has already been written earlier in the
# stream; it is followed by the varint reference id instead of the payload.
# No separate byte is spent to mark a reference, which saves one byte.
cdef int8_t REF_FLAG = -2
# Head flag for a non-null value that is written without reference tracking.
cdef int8_t NOT_NULL_VALUE_FLAG = -1
# Head flag for a non-null, reference-tracked value written for the first time.
cdef int8_t REF_VALUE_FLAG = 0
@cython.final
cdef class MapRefResolver:
    """
    Track object identity during serialization and deserialization.

    On write, the first occurrence of an object is assigned a reference id and
    later occurrences are encoded as `REF_FLAG` plus that id. On read, slots
    are reserved for tracked objects and filled once the object exists, so
    that later `REF_FLAG` entries can be resolved.

    When `ref_tracking` is False only a null/not-null head flag is written
    and all read-side bookkeeping is skipped.
    """
    cdef flat_hash_map[uint64_t, int32_t] written_objects_id  # id(obj) -> ref_id
    # Hold object to avoid tmp object gc when serialize nested fields/objects.
    cdef vector[PyObject*] written_objects
    # Slot per preserved reference id; NULL until the object is registered.
    # Every non-NULL entry owns one strong reference.
    cdef vector[PyObject*] read_objects
    # Stack of reserved reference ids that still wait for `reference(obj)`.
    cdef vector[int32_t] read_ref_ids
    # Object resolved by the most recent `REF_FLAG` read, else None.
    cdef object read_object
    cdef c_bool ref_tracking

    def __cinit__(self, c_bool ref_tracking):
        self.read_object = None
        self.ref_tracking = ref_tracking

    # Special methods of extension types must be declared with def, not cdef.
    def __dealloc__(self):
        # Release the strong references held through the raw pointer vectors.
        self.reset()

    cpdef inline c_bool write_ref_or_null(self, Buffer buffer, obj):
        """
        Write the head flag for `obj`.

        Returns True when nothing else has to be written for `obj` (it is
        None, or it was written before and only its reference id was
        emitted); returns False when the caller must write the payload.
        """
        cdef uint64_t object_id
        cdef int32_t next_id
        cdef flat_hash_map[uint64_t, int32_t].iterator it
        if obj is None:
            buffer.write_int8(NULL_FLAG)
            return True
        if not self.ref_tracking:
            buffer.write_int8(NOT_NULL_VALUE_FLAG)
            return False
        object_id = <uintptr_t><PyObject*>obj
        it = self.written_objects_id.find(object_id)
        if it == self.written_objects_id.end():
            next_id = self.written_objects_id.size()
            self.written_objects_id[object_id] = next_id
            # Keep the object alive so its address cannot be reused by another
            # object while this stream is being written.
            self.written_objects.push_back(<PyObject*>obj)
            Py_INCREF(obj)
            buffer.write_int8(REF_VALUE_FLAG)
            return False
        # The obj has been written previously.
        buffer.write_int8(REF_FLAG)
        buffer.write_varint32(deref(it).second)
        return True

    cpdef inline int8_t read_ref_or_null(self, Buffer buffer):
        """
        Read a head flag; for `REF_FLAG` also resolve the referenced object
        into `read_object`. Returns the head flag.
        """
        cdef int8_t head_flag = buffer.read_int8()
        cdef int32_t ref_id
        if not self.ref_tracking:
            return head_flag
        if head_flag == REF_FLAG:
            # read reference id and get object from reference resolver
            # NOTE(review): ref_id comes from the buffer and is not
            # range-checked here; this function returns a C integer, so it
            # cannot report the error by raising without a signature change.
            ref_id = buffer.read_varint32()
            self.read_object = <object>(self.read_objects[ref_id])
            return REF_FLAG
        self.read_object = None
        return head_flag

    cpdef inline int32_t preserve_ref_id(self):
        """Reserve the next read slot and return its id (-1 if not tracking)."""
        cdef int32_t next_read_ref_id
        if not self.ref_tracking:
            return -1
        next_read_ref_id = self.read_objects.size()
        self.read_objects.push_back(NULL)
        self.read_ref_ids.push_back(next_read_ref_id)
        return next_read_ref_id

    cpdef inline int32_t try_preserve_ref_id(self, Buffer buffer):
        """
        Read a head flag and, for `REF_VALUE_FLAG`, reserve a slot.

        Returns the reserved id for `REF_VALUE_FLAG`, otherwise the head flag
        itself.
        """
        cdef int8_t head_flag
        cdef int32_t ref_id
        if not self.ref_tracking:
            # `NOT_NULL_VALUE_FLAG` can be used as stub reference id because we use
            # `refId >= NOT_NULL_VALUE_FLAG` to read data.
            return buffer.read_int8()
        head_flag = buffer.read_int8()
        if head_flag == REF_FLAG:
            # read reference id and get object from reference resolver
            # NOTE(review): ref_id is not range-checked, see read_ref_or_null.
            ref_id = buffer.read_varint32()
            self.read_object = <object>(self.read_objects[ref_id])
            # `head_flag` except `REF_FLAG` can be used as stub reference id because
            # we use `refId >= NOT_NULL_VALUE_FLAG` to read data.
            return head_flag
        self.read_object = None
        if head_flag == REF_VALUE_FLAG:
            return self.preserve_ref_id()
        return head_flag

    cdef inline _store_read_object(self, int32_t ref_id, obj):
        """
        Store `obj` in slot `ref_id`, keeping exactly one strong reference per
        filled slot: the new object is retained and a different previous
        occupant is released. Storing the same object again is a no-op.
        """
        cdef PyObject* previous = self.read_objects[ref_id]
        cdef PyObject* current = <PyObject*>obj
        if previous == current:
            return
        Py_INCREF(obj)
        self.read_objects[ref_id] = current
        # Release after the slot is updated so the slot never dangles.
        Py_XDECREF(previous)

    cpdef inline reference(self, obj):
        """Bind `obj` to the most recently reserved reference id."""
        cdef int32_t ref_id
        if not self.ref_tracking:
            return
        ref_id = self.read_ref_ids.back()
        self.read_ref_ids.pop_back()
        self._store_read_object(ref_id, obj)

    cpdef inline get_read_object(self, id_=None):
        """
        Return the object resolved by the last `REF_FLAG` read, or the object
        stored under `id_` when given. Returns None if tracking is disabled.

        Raises ValueError when `id_` is out of range or its slot is unset.
        """
        cdef int32_t ref_id
        cdef PyObject* stored
        if not self.ref_tracking:
            return None
        if id_ is None:
            return self.read_object
        ref_id = id_
        if ref_id < 0 or <size_t>ref_id >= self.read_objects.size():
            raise ValueError(f"Unknown reference id {ref_id}")
        stored = self.read_objects[ref_id]
        if stored == NULL:
            raise ValueError(f"Reference id {ref_id} has no object yet")
        return <object>stored

    cpdef inline set_read_object(self, int32_t ref_id, obj):
        """Store `obj` under `ref_id`; negative ids (stub ids) are ignored."""
        if not self.ref_tracking:
            return
        if ref_id >= 0:
            self._store_read_object(ref_id, obj)

    cpdef inline reset(self):
        """Drop all write-side and read-side state."""
        self.reset_write()
        self.reset_read()

    cpdef inline reset_write(self):
        """Forget written objects and release the references held on them."""
        cdef PyObject* item
        self.written_objects_id.clear()
        for item in self.written_objects:
            Py_XDECREF(item)
        self.written_objects.clear()

    cpdef inline reset_read(self):
        """Forget read objects and release the references held on them."""
        cdef PyObject* item
        if not self.ref_tracking:
            return
        for item in self.read_objects:
            Py_XDECREF(item)
        self.read_objects.clear()
        self.read_ref_ids.clear()
        self.read_object = None
# Cross-language type id of serializers that have no cross-language form.
cdef int8_t NOT_SUPPORT_CROSS_LANGUAGE = 0
cdef int8_t USE_CLASSNAME = 0
cdef int8_t USE_CLASS_ID = 1
# Preserve 0 as the flag for "class id not set" in `ClassInfo`.
cdef int8_t NO_CLASS_ID = 0
# Marks a `MetaStringBytes` that has not been written in the current stream.
cdef int8_t DEFAULT_DYNAMIC_WRITE_STRING_ID = -1
# Class ids reserved for built-in types; see `ClassResolver.initialize`.
cdef int8_t PYINT_CLASS_ID = 1
cdef int8_t PYFLOAT_CLASS_ID = 2
cdef int8_t PYBOOL_CLASS_ID = 3
cdef int8_t STRING_CLASS_ID = 4
cdef int8_t PICKLE_CLASS_ID = 5
cdef int8_t PICKLE_STRONG_CACHE_CLASS_ID = 6
cdef int8_t PICKLE_CACHE_CLASS_ID = 7
# Pre-computed 16-bit headers used by the fast paths of `Fury.serialize_ref`:
# `NOT_NULL_VALUE_FLAG` + `CLASS_ID<<1` in little-endian order, i.e. the low
# byte is the head flag and the high byte is `CLASS_ID << 1` (hence `<< 9`).
cdef int32_t NOT_NULL_PYINT_FLAG = NOT_NULL_VALUE_FLAG & 0b11111111 | \
    (PYINT_CLASS_ID << 9)
cdef int32_t NOT_NULL_PYFLOAT_FLAG = NOT_NULL_VALUE_FLAG & 0b11111111 | \
    (PYFLOAT_CLASS_ID << 9)
cdef int32_t NOT_NULL_PYBOOL_FLAG = NOT_NULL_VALUE_FLAG & 0b11111111 | \
    (PYBOOL_CLASS_ID << 9)
cdef int32_t NOT_NULL_STRING_FLAG = NOT_NULL_VALUE_FLAG & 0b11111111 | \
    (STRING_CLASS_ID << 9)
cdef class BufferObject:
    """
    Abstract binary view of one serialized object.

    Intended solely for zero-copy out-of-band serialization; it should not be
    used for anything else. Every method must be overridden by subclasses.
    """

    cpdef int32_t total_bytes(self):
        """Return the number of bytes the serialized object occupies."""
        raise NotImplementedError()

    cpdef write_to(self, Buffer buffer):
        """Append the serialized object to `buffer`."""
        raise NotImplementedError()

    cpdef Buffer to_buffer(self):
        """Return the serialized data as a `Buffer`."""
        raise NotImplementedError()
@cython.final
cdef class BytesBufferObject(BufferObject):
    """`BufferObject` backed by an in-memory `bytes` value."""
    cdef public bytes binary

    def __init__(self, bytes binary):
        self.binary = binary

    cpdef inline int32_t total_bytes(self):
        """Return the length of the wrapped bytes."""
        payload = self.binary
        return len(payload)

    cpdef inline write_to(self, Buffer buffer):
        """Copy the wrapped bytes into `buffer`."""
        payload = self.binary
        buffer.write_bytes(payload)

    cpdef inline Buffer to_buffer(self):
        """Wrap the bytes in a new `Buffer`."""
        payload = self.binary
        return Buffer(payload)
# Marker types without behavior. `ClassResolver.initialize` registers them
# under the reserved pickle class ids so those ids resolve to a `ClassInfo`.
@cython.final
cdef class _PickleStub:
    # Registered under PICKLE_CLASS_ID.
    pass


@cython.final
cdef class PickleStrongCacheStub:
    # Registered under PICKLE_STRONG_CACHE_CLASS_ID.
    pass


@cython.final
cdef class PickleCacheStub:
    # Registered under PICKLE_CACHE_CLASS_ID.
    pass
@cython.final
cdef class ClassResolver:
    """
    Registry that maps classes to class ids, serializers and class-name
    bytes, and that writes/reads class information to/from a `Buffer`.

    The `_c_*` containers store borrowed `PyObject*` pointers for fast lookup;
    the Python-level dicts/sets/lists hold the strong references.
    """
    cdef:
        readonly Fury fury
        dict _type_id_to_class  # Dict[int, type]
        dict _type_id_to_serializer  # Dict[int, Serializer]
        dict _type_id_and_cls_to_serializer  # Dict[Tuple[int, type], Serializer]
        dict _type_tag_to_class_x_lang_map  # Dict[str, type]
        # Last class id handed out automatically.
        int16_t _class_id_counter
        set _used_classes_id
        # class id -> ClassInfo (None for unused ids)
        public list _registered_id2_class_info
        # Same mapping as above with borrowed pointers (NULL for unused ids).
        vector[PyObject*] _c_registered_id2_class_info
        # cls -> ClassInfo
        flat_hash_map[uint64_t, PyObject*] _c_classes_info
        # hash -> ClassInfo
        flat_hash_map[int64_t, PyObject*] _c_hash_to_classinfo
        # hash -> MetaStringBytes
        flat_hash_map[int64_t, PyObject*] _c_hash_to_enum_string_bytes
        # classname MetaStringBytes address -> class
        flat_hash_map[uint64_t, PyObject*] _c_str_bytes_to_class
        # classname MetaStringBytes address -> str
        flat_hash_map[uint64_t, PyObject*] _c_enum_str_to_str
        # Next per-stream id to assign to a written MetaStringBytes.
        int16_t dynamic_write_string_id
        # MetaStringBytes written in the current stream (reset by reset_write).
        vector[PyObject*] _c_dynamic_written_enum_string
        # Per-stream read tables indexed by dynamic id (reset by reset_read).
        vector[PyObject*] _c_dynamic_id_to_enum_string_vec
        vector[PyObject*] _c_dynamic_id_to_classinfo_vec
        # Serializer returned by the most recent `get_serializer` call.
        Serializer _serializer
        # hold objects to avoid gc, since flat_hash_map/vector doesn't
        # hold python reference.
        public dict _classes_info  # Dict[type, "ClassInfo"]
        set _class_set
        set _classname_set
        set _enum_str_set
def __init__(self, fury):
    """Create an empty resolver bound to `fury`; call `initialize` next."""
    self.fury = fury
    self._serializer = None
    # Lookup tables for cross-language type ids and tags.
    self._type_id_to_class = {}
    self._type_id_to_serializer = {}
    self._type_id_and_cls_to_serializer = {}
    self._type_tag_to_class_x_lang_map = {}
    # Class id bookkeeping; automatic ids start after the reserved ones.
    self._class_id_counter = PICKLE_CACHE_CLASS_ID + 1
    self._used_classes_id = set()
    self._registered_id2_class_info = []
    self.dynamic_write_string_id = 0
    # Containers that keep Python objects alive for the pointer maps.
    self._classes_info = {}
    self._class_set = set()
    self._classname_set = set()
    self._enum_str_set = set()
def initialize(self):
    """Register the built-in classes under their reserved ids, then install
    the default serializers."""
    reserved = (
        (int, PYINT_CLASS_ID),
        (float, PYFLOAT_CLASS_ID),
        (bool, PYBOOL_CLASS_ID),
        (str, STRING_CLASS_ID),
        (_PickleStub, PICKLE_CLASS_ID),
        (PickleStrongCacheStub, PICKLE_STRONG_CACHE_CLASS_ID),
        (PickleCacheStub, PICKLE_CACHE_CLASS_ID),
    )
    for builtin_cls, reserved_id in reserved:
        self.register_class(builtin_cls, class_id=reserved_id)
    self._add_default_serializers()
def register_serializer(self, cls: Union[type, TypeVar], serializer):
    """
    Associate `serializer` with `cls`.

    Serializers that report a cross-language type id are additionally
    indexed by that id; others are only attached to the class info.
    """
    assert isinstance(cls, (type, TypeVar)), cls
    if serializer.get_xtype_id() != NOT_SUPPORT_CROSS_LANGUAGE:
        self._add_x_lang_serializer(cls, serializer=serializer)
        return
    self.register_class(cls)
    self._classes_info[cls].serializer = serializer
def register_class(self, cls: Union[type, TypeVar], *, class_id: int = None, type_tag: str = None):
    """Register class with given type id or tag, if tag is not None, it will be used for
    cross-language serialization.

    :param cls: class (or TypeVar) to register.
    :param class_id: explicit non-negative class id; when omitted the next
        free id is assigned.
    :param type_tag: cross-language type tag; mutually exclusive with
        `class_id`.
    :raises ValueError: if `class_id` is negative, or if `cls` already has
        a different class id.
    :raises AssertionError: if `class_id` is already used by another class.
    """
    if type_tag is not None:
        assert class_id is None, (f"Type tag {type_tag} has been set already, "
                                  f"set class id at the same time is not allowed.")
        self.register_serializer(
            cls, ComplexObjectSerializer(self.fury, cls, type_tag)
        )
        return
    if class_id is not None and class_id < 0:
        # A negative id would index the registries from the end / resize the
        # C++ vector with a bogus size.
        raise ValueError(f"Class id must be non-negative, got {class_id}")
    classinfo = self._classes_info.get(cls)
    if classinfo is None:
        if isinstance(cls, TypeVar):
            class_name_bytes = (cls.__module__ + "#" + cls.__name__).encode("utf-8")
        else:
            class_name_bytes = (cls.__module__ + "#" + cls.__qualname__) \
                .encode("utf-8")
        class_id = self._pick_class_id(class_id)
        classinfo = ClassInfo(
            cls=cls, class_name_bytes=class_name_bytes, class_id=class_id
        )
        self._classes_info[cls] = classinfo
        self._c_classes_info[<uintptr_t><PyObject*>cls] = <PyObject*>classinfo
        self._index_classinfo(class_id, classinfo)
    elif classinfo.class_id == NO_CLASS_ID:
        # Class info was created lazily without an id; assign one now.
        class_id = self._pick_class_id(class_id)
        classinfo.class_id = class_id
        self._index_classinfo(class_id, classinfo)
    elif class_id is not None and classinfo.class_id != class_id:
        raise ValueError(
            f"Inconsistent class id {class_id} vs {classinfo.class_id} "
            f"for class {cls}"
        )

def _pick_class_id(self, class_id):
    """Return `class_id`, or the next free id when it is None, after checking
    that the id is not taken by another class."""
    if class_id is None:
        class_id = self._next_class_id()
    assert class_id not in self._used_classes_id, (
        self._used_classes_id,
        self._classes_info,
    )
    return class_id

def _index_classinfo(self, class_id, classinfo):
    """Record `classinfo` under `class_id` in both id registries and mark the
    id as used."""
    cdef size_t slot = class_id
    registry = self._registered_id2_class_info
    if len(registry) <= class_id:
        registry.extend([None] * (class_id - len(registry) + 1))
    registry[class_id] = classinfo
    # Only ever grow: `resize` to a smaller size would drop entries that were
    # registered earlier under larger ids.
    if self._c_registered_id2_class_info.size() <= slot:
        self._c_registered_id2_class_info.resize(slot + 1)
    self._c_registered_id2_class_info[slot] = <PyObject*>classinfo
    # Without this the duplicate-id assertion and `_next_class_id` skipping
    # would never see any used id.
    self._used_classes_id.add(class_id)
def _next_class_id(self):
    """Advance the id counter to the next id that is not in
    `_used_classes_id` and return it."""
    while True:
        candidate = self._class_id_counter + 1
        self._class_id_counter = candidate
        if candidate not in self._used_classes_id:
            return candidate
def _add_serializer(
        self,
        cls: Union[type, TypeVar],
        serializer=None,
        serializer_cls=None):
    """Register a serializer for `cls`, instantiating `serializer_cls` with
    `(fury, cls)` when one is given (it then takes precedence over
    `serializer`)."""
    chosen = serializer_cls(self.fury, cls) if serializer_cls else serializer
    self.register_serializer(cls, chosen)
def _add_x_lang_serializer(self,
                           cls: Union[type, TypeVar],
                           serializer=None,
                           serializer_cls=None):
    """
    Register a serializer that has a cross-language type id.

    The serializer is indexed by `(type_id, cls)` and attached to the class
    info. Tag-based serializers are additionally indexed by their type tag;
    all others by type id (and by class for positive type ids).
    """
    if serializer_cls:
        serializer = serializer_cls(self.fury, cls)
    xtype_id = serializer.get_xtype_id()
    assert xtype_id != NOT_SUPPORT_CROSS_LANGUAGE
    self._type_id_and_cls_to_serializer[(xtype_id, cls)] = serializer
    self.register_class(cls)
    info = self._classes_info[cls]
    info.serializer = serializer
    if xtype_id != FuryType.FURY_TYPE_TAG.value:
        self._type_id_to_serializer[xtype_id] = serializer
        if xtype_id > NOT_SUPPORT_CROSS_LANGUAGE:
            self._type_id_to_class[xtype_id] = cls
        return
    tag = serializer.get_xtype_tag()
    assert type(tag) is str
    assert tag not in self._type_tag_to_class_x_lang_map
    info.type_tag_bytes = MetaStringBytes(tag.encode("utf-8"))
    self._type_tag_to_class_x_lang_map[tag] = cls
def _add_default_serializers(self):
    """
    Install the built-in serializers.

    Registration order is significant: classes without an explicit id get
    their class id in the order they are first registered here.
    """
    # Cross-language numeric serializers; the last one registered for a
    # class is the one left on its class info.
    x_lang_defaults = (
        (int, ByteSerializer),
        (int, Int16Serializer),
        (int, Int32Serializer),
        (int, Int64Serializer),
        (float, FloatSerializer),
        (float, DoubleSerializer),
    )
    for py_type, serializer_type in x_lang_defaults:
        self._add_x_lang_serializer(py_type, serializer_cls=serializer_type)

    defaults = (
        (type(None), NoneSerializer),
        (bool, BooleanSerializer),
        (Int8Type, ByteSerializer),
        (Int16Type, Int16Serializer),
        (Int32Type, Int32Serializer),
        (Int64Type, Int64Serializer),
        (Float32Type, FloatSerializer),
        (Float64Type, DoubleSerializer),
        (str, StringSerializer),
        (datetime.date, DateSerializer),
        (datetime.datetime, TimestampSerializer),
        (bytes, BytesSerializer),
        (list, ListSerializer),
        (tuple, TupleSerializer),
        (dict, MapSerializer),
        (set, SetSerializer),
        (enum.Enum, EnumSerializer),
        (slice, SliceSerializer),
    )
    for py_type, serializer_type in defaults:
        self._add_serializer(py_type, serializer_cls=serializer_type)

    from pyfury import PickleCacheSerializer, PickleStrongCacheSerializer
    strong_cache_serializer = PickleStrongCacheSerializer(self.fury)
    self._add_serializer(PickleStrongCacheStub, serializer=strong_cache_serializer)
    cache_serializer = PickleCacheSerializer(self.fury)
    self._add_serializer(PickleCacheStub, serializer=cache_serializer)

    # Arrow support is optional: any failure here is ignored on purpose.
    try:
        import pyarrow as pa
        from pyfury.format.serializer import (
            ArrowRecordBatchSerializer,
            ArrowTableSerializer,
        )
        self._add_serializer(
            pa.RecordBatch, serializer_cls=ArrowRecordBatchSerializer
        )
        self._add_serializer(pa.Table, serializer_cls=ArrowTableSerializer)
    except Exception:
        pass

    for typecode in PyArraySerializer.typecode_dict.keys():
        generic_array_serializer = PyArraySerializer(
            self.fury, array.array, typecode)
        self._add_serializer(array.array, serializer=generic_array_serializer)
        typed_array_serializer = PyArraySerializer(
            self.fury, array.array, typecode)
        self._add_serializer(
            PyArraySerializer.typecodearray_type[typecode],
            serializer=typed_array_serializer,
        )
    if np:
        for dtype in Numpy1DArraySerializer.dtypes_dict.keys():
            # NOTE(review): `array.array` is passed as the class argument of
            # the numpy serializer; kept as in the original - confirm intent.
            ndarray_serializer = Numpy1DArraySerializer(
                self.fury, array.array, dtype)
            self._add_serializer(np.ndarray, serializer=ndarray_serializer)
cpdef inline Serializer get_serializer(self, cls=None, type_id=None, obj=None):
    """
    Return the serializer for a class, a cross-language type id or an object.

    At least one of `cls`, `type_id` and `obj` must be given. When `obj` is
    given its type replaces `cls`, and a type id is derived for ints in the
    signed 64-bit range, floats, `array.array` with a known typecode and
    1-D numpy arrays with a known dtype.

    Returns
    -------
    The serializer found by `(type_id, cls)` or by `type_id` when a type id
    is known; otherwise the serializer attached to the class info of `cls`,
    created and registered on first use. The result is also stored in
    `self._serializer`.
    """
    assert cls is not None or type_id is not None or obj is not None
    cdef Serializer serializer_
    if obj is not None:
        cls = type(obj)
        if cls is int and 2**63 - 1 >= obj >= -(2**63):
            type_id = FuryType.INT64.value
        elif cls is float:
            type_id = FuryType.DOUBLE.value
        elif cls is array.array:
            info = PyArraySerializer.typecode_dict.get(obj.typecode)
            if info is not None:
                type_id = info[1]
        elif np and cls is np.ndarray and obj.ndim == 1:
            info = Numpy1DArraySerializer.dtypes_dict.get(obj.dtype)
            if info:
                type_id = info[2]
    if type_id is not None:
        # Raises KeyError when no serializer is registered for the key.
        if cls is not None:
            serializer_ = self._type_id_and_cls_to_serializer[(type_id, cls)]
        else:
            serializer_ = self._type_id_to_serializer[type_id]
    else:
        class_info = self._classes_info.get(cls)
        if class_info is not None:
            serializer_ = class_info.serializer
        else:
            self._add_serializer(cls, serializer=self.get_or_create_serializer(cls))
            serializer_ = self._classes_info.get(cls).serializer
    self._serializer = serializer_
    return serializer_
cpdef inline Serializer get_or_create_serializer(self, cls):
    """Return the serializer of `cls`, creating its class info if needed."""
    return self.get_or_create_classinfo(cls).serializer
cpdef inline ClassInfo get_or_create_classinfo(self, cls):
    """
    Return the `ClassInfo` of `cls` with a serializer attached.

    Unknown classes get a new class info: classes that fall back to the
    pickle serializer share `PICKLE_CLASS_ID`, all others start with
    `NO_CLASS_ID`.
    """
    cdef uint64_t cls_key = <uintptr_t><PyObject*>cls
    cdef PyObject* known_ptr = self._c_classes_info[cls_key]
    cdef ClassInfo info
    if known_ptr != NULL:
        info = <object>known_ptr
        if info.serializer is None:
            info.serializer = self._create_serializer(cls)
        return info
    new_serializer = self._create_serializer(cls)
    if type(new_serializer) is PickleSerializer:
        assigned_id = PICKLE_CLASS_ID
    else:
        assigned_id = NO_CLASS_ID
    qualified_name = cls.__module__ + "#" + cls.__qualname__
    info = ClassInfo(
        cls=cls, class_name_bytes=qualified_name.encode("utf-8"),
        serializer=new_serializer, class_id=assigned_id
    )
    # The dict keeps the ClassInfo alive; the map only borrows the pointer.
    self._classes_info[cls] = info
    self._c_classes_info[cls_key] = <PyObject*>info
    return info
cdef _create_serializer(self, cls):
    """
    Build a serializer for `cls`.

    The first class in the MRO whose serializer supports subclasses decides
    the serializer type; otherwise dataclasses get `DataClassSerializer`
    and everything else falls back to `PickleSerializer`.
    """
    own_info = self._classes_info.get(cls)
    not_registered = own_info is None or own_info.class_id == NO_CLASS_ID
    for base in cls.__mro__:
        base_info = self._classes_info.get(base)
        if (
            base_info
            and base_info.serializer
            and base_info.serializer.support_subclass()
        ):
            if not_registered:
                logger.info("Class %s not registered", cls)
            return type(base_info.serializer)(self.fury, cls)
    if not dataclasses.is_dataclass(cls):
        return PickleSerializer(self.fury, cls)
    if not_registered:
        logger.info("Class %s not registered", cls)
    from pyfury import DataClassSerializer
    return DataClassSerializer(self.fury, cls)
cpdef inline write_classinfo(self, Buffer buffer, ClassInfo classinfo):
    """
    Write the class header: `class_id << 1` (low bit 0) for classes with an
    id, otherwise the value 1 (low bit 1) followed by the class name bytes.
    """
    cdef int32_t registered_id = classinfo.class_id
    if registered_id == NO_CLASS_ID:
        buffer.write_varint32(1)
        self._write_enum_string_bytes(buffer, classinfo.class_name_bytes)
    else:
        buffer.write_varint32((registered_id << 1))
cpdef inline ClassInfo read_classinfo(self, Buffer buffer):
    """
    Read a class header written by `write_classinfo` and return its
    `ClassInfo`.

    An even header carries a registered class id. An odd header is followed
    by class-name bytes (first occurrence in the stream) or by the dynamic
    id of a name that was already read in this stream.

    :raises ValueError: if the class id or the dynamic id does not refer to
        a known entry.
    """
    cdef int32_t h1 = buffer.read_varint32()
    cdef int32_t class_id = h1 >> 1
    cdef ClassInfo classinfo
    cdef PyObject* classinfo_ptr
    cdef int32_t header
    cdef int32_t length
    cdef int64_t class_name_bytes_hash
    cdef int32_t reader_index
    cdef bytes classname_bytes
    cdef str full_class_name
    # registered class id are greater than `NO_CLASS_ID`.
    if h1 & 0b1 == 0:
        assert class_id >= 0, class_id
        # The id comes from the stream: never index the vector unchecked.
        if <size_t>class_id >= self._c_registered_id2_class_info.size():
            raise ValueError(f"Unexpected class_id {class_id} "
                             f"{self._registered_id2_class_info}")
        classinfo_ptr = self._c_registered_id2_class_info[class_id]
        if classinfo_ptr == NULL:
            raise ValueError(f"Unexpected class_id {class_id} "
                             f"{self._registered_id2_class_info}")
        classinfo = <ClassInfo>classinfo_ptr
        if classinfo.serializer is None:
            classinfo.serializer = self._create_serializer(classinfo.cls)
        return classinfo
    header = buffer.read_varint32()
    length = header >> 1
    if header & 0b1 != 0:
        # `length` is `dynamic id + 1` of a class name read earlier.
        if length <= 0 or \
                <size_t>length > self._c_dynamic_id_to_classinfo_vec.size():
            raise ValueError(f"Unexpected dynamic class id {length - 1}")
        return <ClassInfo>self._c_dynamic_id_to_classinfo_vec[length - 1]
    class_name_bytes_hash = buffer.read_int64()
    reader_index = buffer.reader_index
    buffer.check_bound(reader_index, length)
    buffer.reader_index = reader_index + length
    classinfo_ptr = self._c_hash_to_classinfo[class_name_bytes_hash]
    if classinfo_ptr != NULL:
        self._c_dynamic_id_to_classinfo_vec.push_back(classinfo_ptr)
        return <ClassInfo>classinfo_ptr
    classname_bytes = buffer.get_bytes(reader_index, length)
    full_class_name = classname_bytes.decode(encoding="utf-8")
    # SECURITY NOTE(review): the class name comes from the stream and is
    # resolved via `load_class` without consulting
    # `fury.require_class_registration` - confirm this is intended for
    # untrusted input.
    cls = load_class(full_class_name)
    classinfo = self.get_or_create_classinfo(cls)
    classinfo_ptr = <PyObject*>classinfo
    self._c_hash_to_classinfo[class_name_bytes_hash] = classinfo_ptr
    self._c_dynamic_id_to_classinfo_vec.push_back(classinfo_ptr)
    return classinfo
cdef inline _write_enum_string_bytes(
        self, Buffer buffer, MetaStringBytes enum_string_bytes):
    """
    Write `enum_string_bytes`: in full (length, hash, data) the first time
    it appears in the stream, afterwards only its per-stream dynamic id.
    """
    cdef int16_t stream_id = enum_string_bytes.dynamic_write_string_id
    if stream_id != DEFAULT_DYNAMIC_WRITE_STRING_ID:
        # Already written in this stream: odd header carrying `id + 1`.
        buffer.write_varint32(((stream_id + 1) << 1) | 1)
        return
    stream_id = self.dynamic_write_string_id
    enum_string_bytes.dynamic_write_string_id = stream_id
    self.dynamic_write_string_id += 1
    # Remembered so that reset_write can clear the assigned id again.
    self._c_dynamic_written_enum_string.push_back(<PyObject*>enum_string_bytes)
    buffer.write_varint32(enum_string_bytes.length << 1)
    buffer.write_int64(enum_string_bytes.hashcode)
    buffer.write_bytes(enum_string_bytes.data)
cdef inline MetaStringBytes _read_enum_string_bytes(self, Buffer buffer):
    """
    Read a string written by `_write_enum_string_bytes`.

    An odd header refers to a string read earlier in this stream by its
    dynamic id; an even header is followed by the hash and the raw bytes.

    :raises ValueError: if the dynamic id does not refer to a string that
        was read earlier in this stream.
    """
    cdef int32_t header = buffer.read_varint32()
    cdef int32_t length = header >> 1
    cdef int64_t hashcode
    cdef int32_t reader_index
    cdef PyObject* enum_str_ptr
    cdef bytes str_bytes
    cdef MetaStringBytes enum_str
    if header & 0b1 != 0:
        # `length` is `dynamic id + 1`; it comes from the stream, so check
        # it before indexing the vector.
        if length <= 0 or \
                <size_t>length > self._c_dynamic_id_to_enum_string_vec.size():
            raise ValueError(f"Unexpected dynamic string id {length - 1}")
        return <MetaStringBytes>self._c_dynamic_id_to_enum_string_vec[length - 1]
    hashcode = buffer.read_int64()
    reader_index = buffer.reader_index
    buffer.check_bound(reader_index, length)
    buffer.reader_index = reader_index + length
    enum_str_ptr = self._c_hash_to_enum_string_bytes[hashcode]
    if enum_str_ptr != NULL:
        self._c_dynamic_id_to_enum_string_vec.push_back(enum_str_ptr)
        return <MetaStringBytes>enum_str_ptr
    str_bytes = buffer.get_bytes(reader_index, length)
    enum_str = MetaStringBytes(str_bytes, hashcode=hashcode)
    # The set keeps the object alive; the maps only borrow the pointer.
    self._enum_str_set.add(enum_str)
    enum_str_ptr = <PyObject*>enum_str
    self._c_hash_to_enum_string_bytes[hashcode] = enum_str_ptr
    self._c_dynamic_id_to_enum_string_vec.push_back(enum_str_ptr)
    return enum_str
cpdef inline xwrite_class(self, Buffer buffer, cls):
    """Write the class-name bytes of `cls`, which must have a class info."""
    cdef uint64_t cls_key = <uintptr_t><PyObject*>cls
    cdef PyObject* info_ptr = self._c_classes_info[cls_key]
    assert info_ptr != NULL
    cdef MetaStringBytes name_bytes = (<object>info_ptr).class_name_bytes
    self._write_enum_string_bytes(buffer, name_bytes)
cpdef inline xwrite_type_tag(self, Buffer buffer, cls):
    """Write the type-tag bytes of `cls`, which must have a class info."""
    cdef uint64_t cls_key = <uintptr_t><PyObject*>cls
    cdef PyObject* info_ptr = self._c_classes_info[cls_key]
    assert info_ptr != NULL
    cdef MetaStringBytes tag_bytes = (<object>info_ptr).type_tag_bytes
    self._write_enum_string_bytes(buffer, tag_bytes)
cpdef inline read_class_by_type_tag(self, Buffer buffer):
    """Read a type tag and return the class registered for it.

    Raises KeyError when no class is registered under the tag.
    """
    tag = self.xread_classname(buffer)
    return self._type_tag_to_class_x_lang_map[tag]
cpdef inline xread_class(self, Buffer buffer):
    """Read class-name bytes and return the class, loading it on first use
    and caching it per `MetaStringBytes` object."""
    cdef MetaStringBytes name_bytes = self._read_enum_string_bytes(buffer)
    cdef uint64_t cache_key = <uintptr_t><PyObject*>name_bytes
    cdef PyObject* cached_cls = self._c_str_bytes_to_class[cache_key]
    cdef str qualified_name
    if cached_cls == NULL:
        qualified_name = name_bytes.data.decode(encoding="utf-8")
        loaded = load_class(qualified_name)
        self._c_str_bytes_to_class[cache_key] = <PyObject*>loaded
        # Keep the class alive for the borrowed pointer stored above.
        self._class_set.add(loaded)
        return loaded
    return <object>cached_cls
cpdef inline str xread_classname(self, Buffer buffer):
    """Read class-name bytes and return them decoded as UTF-8, cached per
    `MetaStringBytes` object."""
    cdef MetaStringBytes name_bytes = self._read_enum_string_bytes(buffer)
    cdef uint64_t cache_key = <uintptr_t><PyObject*>name_bytes
    cdef PyObject* cached_name = self._c_enum_str_to_str[cache_key]
    cdef str decoded
    if cached_name == NULL:
        decoded = name_bytes.data.decode(encoding="utf-8")
        self._c_enum_str_to_str[cache_key] = <PyObject*>decoded
        # Keep the string alive for the borrowed pointer stored above.
        self._classname_set.add(decoded)
        return decoded
    return <object>cached_name
cpdef inline get_class_by_type_id(self, int32_t type_id):
    """Return the class registered for a cross-language type id.

    Raises KeyError when the type id is unknown.
    """
    return self._type_id_to_class[type_id]
cpdef inline reset(self):
    """Clear the per-stream write state and the per-stream read state."""
    self.reset_write()
    self.reset_read()
cpdef inline reset_read(self):
    """Forget the dynamic ids assigned while reading the current stream."""
    self._c_dynamic_id_to_enum_string_vec.clear()
    self._c_dynamic_id_to_classinfo_vec.clear()
cpdef inline reset_write(self):
    """Forget the dynamic ids assigned while writing the current stream, so
    the next stream writes each string in full again."""
    cdef PyObject* written_ptr
    if self.dynamic_write_string_id == 0:
        # Nothing was written with a dynamic id.
        return
    self.dynamic_write_string_id = 0
    for written_ptr in self._c_dynamic_written_enum_string:
        (<MetaStringBytes>written_ptr).dynamic_write_string_id = \
            DEFAULT_DYNAMIC_WRITE_STRING_ID
    self._c_dynamic_written_enum_string.clear()
@cython.final
cdef class MetaStringBytes:
    """
    Bytes of a class name / type tag together with their hash and the id
    assigned to them in the stream that is currently being written.

    Equality and hashing are based on `hashcode` only.
    """
    cdef bytes data
    cdef int16_t length
    cdef int64_t hashcode
    # DEFAULT_DYNAMIC_WRITE_STRING_ID until written in the current stream.
    cdef int16_t dynamic_write_string_id

    def __init__(self, data, hashcode=None):
        """
        :param data: the raw bytes.
        :param hashcode: precomputed hash; when omitted it is computed with
            mmh3 (seed 47) and its lowest 8 bits are cleared.
        """
        self.data = data
        self.length = len(data)
        if hashcode is None:
            hashcodes = mmh3.hash_buffer(data, 47)
            hashcode = hashcodes[0]
            assert isinstance(hashcode, int)
            # FIXME: why using & 0xffffffffffffff00 overflow in cython
            hashcode = (hashcode >> 8) << 8
        self.hashcode = hashcode
        self.dynamic_write_string_id = DEFAULT_DYNAMIC_WRITE_STRING_ID

    def __eq__(self, other):
        # `hashcode` is a private cdef attribute, so it is not reachable via
        # Python attribute lookup on an untyped `other`; cast after the exact
        # type check to read the C field directly.
        if type(other) is not MetaStringBytes:
            return False
        return (<MetaStringBytes>other).hashcode == self.hashcode

    def __hash__(self):
        return self.hashcode
@cython.final
cdef class ClassInfo:
    """Per-class record: the class, its class id, its serializer and the
    encoded class name / type tag."""
    cdef public object cls
    cdef public int16_t class_id
    cdef public Serializer serializer
    cdef public MetaStringBytes class_name_bytes
    cdef public MetaStringBytes type_tag_bytes

    def __init__(
            self,
            cls: Union[type, TypeVar] = None,
            class_id: int = NO_CLASS_ID,
            serializer: Serializer = None,
            class_name_bytes: bytes = None,
            type_tag_bytes: bytes = None,
    ):
        self.cls = cls
        self.class_id = class_id
        self.serializer = serializer
        self.class_name_bytes = MetaStringBytes(class_name_bytes)
        # The type tag is optional; it is wrapped only when provided.
        self.type_tag_bytes = (
            None if type_tag_bytes is None else MetaStringBytes(type_tag_bytes)
        )

    def __repr__(self):
        description = (
            f"ClassInfo(cls={self.cls}, class_id={self.class_id}, "
            f"serializer={self.serializer})"
        )
        return description
@cython.final
cdef class Fury:
    # Serialization mode; compared against Language.XLANG / Language.PYTHON.
    cdef readonly object language
    cdef readonly c_bool ref_tracking
    # Effective setting: True when requested or when forced globally.
    cdef readonly c_bool require_class_registration
    cdef readonly MapRefResolver ref_resolver
    cdef readonly ClassResolver class_resolver
    cdef readonly SerializationContext serialization_context
    # Internal buffer used when `serialize` is called without a buffer.
    cdef Buffer buffer
    cdef public object pickler  # pickle.Pickler
    cdef public object unpickler  # Optional[pickle.Unpickler]
    cdef object _buffer_callback
    cdef object _buffers  # iterator
    cdef object _unsupported_callback
    cdef object _unsupported_objects  # iterator
    # Language of the peer that produced the stream being deserialized.
    cdef object _peer_language
    # Objects without cross-language support, stashed during x-lang
    # serialization and written after the main payload.
    cdef list _native_objects
def __init__(
        self,
        language=Language.XLANG,
        ref_tracking: bool = False,
        require_class_registration: bool = True,
):
    """
    :param language:
      Serialization mode, `Language.XLANG` (default) or `Language.PYTHON`.
    :param ref_tracking:
      Whether shared/circular references are tracked.
    :param require_class_registration:
     Whether to require registering classes for serialization, enabled by default.
      If disabled, unknown insecure classes can be deserialized, which can be
      insecure and cause remote code execution attack if the classes
      `__new__`/`__init__`/`__eq__`/`__hash__` method contain malicious code.
      Do not disable class registration if you can't ensure your environment are
      *indeed secure*. We are not responsible for security risks if
      you disable this option.
      The option is ignored (registration stays required) when
      `_ENABLE_CLASS_REGISTRATION_FORCIBLY` is set.
    """
    self.language = language
    if _ENABLE_CLASS_REGISTRATION_FORCIBLY or require_class_registration:
        self.require_class_registration = True
    else:
        self.require_class_registration = False
    self.ref_tracking = ref_tracking
    self.ref_resolver = MapRefResolver(ref_tracking)
    self.class_resolver = ClassResolver(self)
    self.class_resolver.initialize()
    self.serialization_context = SerializationContext()
    self.buffer = Buffer.allocate(32)
    # Decide on the *effective* setting: using the raw argument here would
    # install a real pickler even when registration is enforced globally.
    if not self.require_class_registration:
        warnings.warn(
            "Class registration is disabled, unknown classes can be deserialized "
            "which may be insecure.",
            RuntimeWarning,
            stacklevel=2,
        )
        self.pickler = Pickler(self.buffer)
    else:
        self.pickler = _PicklerStub(self.buffer)
    self.unpickler = None
    self._buffer_callback = None
    self._buffers = None
    self._unsupported_callback = None
    self._unsupported_objects = None
    self._peer_language = None
    self._native_objects = []
def register_serializer(self, cls: Union[type, TypeVar], Serializer serializer):
    """Register `serializer` for `cls`; delegates to the class resolver."""
    self.class_resolver.register_serializer(cls, serializer)
def register_class(self, cls: Union[type, TypeVar], *,
                   class_id: int = None, type_tag: str = None):
    """Register `cls` under a class id or a cross-language type tag;
    delegates to the class resolver."""
    self.class_resolver.register_class(cls, class_id=class_id, type_tag=type_tag)
def serialize(
        self, obj,
        Buffer buffer=None,
        buffer_callback=None,
        unsupported_callback=None
) -> Union[Buffer, bytes]:
    """
    Serialize `obj`.

    Returns `buffer` when one is passed in, otherwise the serialized bytes.
    The write state is reset afterwards, also when serialization fails.
    """
    try:
        serialized = self._serialize(
            obj,
            buffer,
            buffer_callback=buffer_callback,
            unsupported_callback=unsupported_callback)
    finally:
        self.reset_write()
    return serialized
cpdef inline _serialize(
        self, obj, Buffer buffer, buffer_callback=None, unsupported_callback=None):
    """
    Write the stream header and `obj` into `buffer` (or into the internal
    buffer when `buffer` is None).

    Header byte bits: 0 = object is None, 1 = little endian, 2 = target is
    cross-language, 3 = out-of-band buffers are used.

    Returns `buffer` when a caller-supplied buffer was used, otherwise the
    written bytes of the internal buffer.
    """
    cdef int32_t mask_index
    cdef int32_t start_offset
    # True when the pickler was temporarily bound to the caller's buffer.
    cdef c_bool pickler_rebound = False
    self._buffer_callback = buffer_callback
    self._unsupported_callback = unsupported_callback
    if buffer is None:
        self.buffer.writer_index = 0
        buffer = self.buffer
    elif not self.require_class_registration:
        # Pickled data must land in the buffer that is being written. When
        # class registration is required the stub pickler stays in place so
        # that passing a buffer cannot enable pickling.
        self.pickler = Pickler(buffer)
        pickler_rebound = True
    try:
        mask_index = buffer.writer_index
        # 1byte used for bit mask
        buffer.grow(1)
        buffer.writer_index = mask_index + 1
        if obj is None:
            set_bit(buffer, mask_index, 0)
        else:
            clear_bit(buffer, mask_index, 0)
        # set endian
        if is_little_endian:
            set_bit(buffer, mask_index, 1)
        else:
            clear_bit(buffer, mask_index, 1)
        if self.language == Language.XLANG:
            # set reader as x_lang.
            set_bit(buffer, mask_index, 2)
            # set writer language.
            buffer.write_int8(Language.PYTHON.value)
        else:
            # set reader as native.
            clear_bit(buffer, mask_index, 2)
        if self._buffer_callback is not None:
            set_bit(buffer, mask_index, 3)
        else:
            clear_bit(buffer, mask_index, 3)
        if self.language == Language.PYTHON:
            self.serialize_ref(buffer, obj)
        else:
            start_offset = buffer.writer_index
            # preserve 4-byte for nativeObjects start offsets.
            buffer.write_int32(<int32_t>-1)
            # preserve 4-byte for nativeObjects size
            buffer.write_int32(<int32_t>-1)
            self.xserialize_ref(buffer, obj)
            buffer.put_int32(start_offset, buffer.writer_index)
            buffer.put_int32(start_offset + 4, len(self._native_objects))
            self.ref_resolver.reset_write()
            # fury write opaque object classname which cause later write of classname
            # only write an id.
            self.class_resolver.reset_write()
            for native_object in self._native_objects:
                self.serialize_ref(buffer, native_object)
        self.reset_write()
        if buffer is not self.buffer:
            return buffer
        else:
            return buffer.to_bytes(0, buffer.writer_index)
    finally:
        if pickler_rebound:
            # Later calls without a buffer write into the internal buffer.
            self.pickler = Pickler(self.buffer)
cpdef inline serialize_ref(
        self, Buffer buffer, obj, ClassInfo classinfo=None):
    """
    Write `obj` with its head flag and class information.

    Exact `str`, `int`, `bool` and `float` values take a fast path that
    writes a pre-computed 16-bit header followed by the value.
    """
    obj_type = type(obj)
    if obj_type is str:
        buffer.write_int16(NOT_NULL_STRING_FLAG)
        buffer.write_string(obj)
        return
    if obj_type is int:
        buffer.write_int16(NOT_NULL_PYINT_FLAG)
        buffer.write_varint64(obj)
        return
    if obj_type is bool:
        buffer.write_int16(NOT_NULL_PYBOOL_FLAG)
        buffer.write_bool(obj)
        return
    if obj_type is float:
        buffer.write_int16(NOT_NULL_PYFLOAT_FLAG)
        buffer.write_double(obj)
        return
    # None or an already-written object: the head flag is all that is needed.
    if self.ref_resolver.write_ref_or_null(buffer, obj):
        return
    if classinfo is None:
        classinfo = self.class_resolver.get_or_create_classinfo(obj_type)
    self.class_resolver.write_classinfo(buffer, classinfo)
    classinfo.serializer.write(buffer, obj)
cpdef inline serialize_nonref(self, Buffer buffer, obj):
    """
    Write the class information and payload of `obj` without a head flag.

    Exact `str`, `int`, `bool` and `float` values take a fast path that
    writes their reserved class id followed by the value.
    """
    obj_type = type(obj)
    if obj_type is str:
        buffer.write_varint32(STRING_CLASS_ID << 1)
        buffer.write_string(obj)
        return
    if obj_type is int:
        buffer.write_varint32(PYINT_CLASS_ID << 1)
        buffer.write_varint64(obj)
        return
    if obj_type is bool:
        buffer.write_varint32(PYBOOL_CLASS_ID << 1)
        buffer.write_bool(obj)
        return
    if obj_type is float:
        buffer.write_varint32(PYFLOAT_CLASS_ID << 1)
        buffer.write_double(obj)
        return
    cdef ClassInfo info = self.class_resolver.get_or_create_classinfo(obj_type)
    self.class_resolver.write_classinfo(buffer, info)
    info.serializer.write(buffer, obj)
cpdef inline xserialize_ref(
        self, Buffer buffer, obj, Serializer serializer=None):
    """
    Cross-language write of `obj` including its head flag.

    Reference tracking is applied unless a serializer is given whose
    `need_to_write_ref` is false; in that case only a null/not-null flag is
    written.
    """
    if serializer is not None and not serializer.need_to_write_ref:
        if obj is None:
            buffer.write_int8(NULL_FLAG)
            return
        buffer.write_int8(NOT_NULL_VALUE_FLAG)
        self.xserialize_nonref(
            buffer, obj, serializer=serializer
        )
        return
    if self.ref_resolver.write_ref_or_null(buffer, obj):
        # None or a back-reference: nothing else to write.
        return
    self.xserialize_nonref(
        buffer, obj, serializer=serializer
    )
cpdef inline xserialize_nonref(
        self, Buffer buffer, obj, Serializer serializer=None):
    """
    Cross-language write of the type id and payload of `obj`.

    Objects whose serializer has no cross-language type id are not written
    inline: their class name and their index in `_native_objects` are
    written, and the object itself is stashed for later.
    """
    obj_type = type(obj)
    serializer = serializer or self.class_resolver.get_serializer(obj=obj)
    cdef int16_t xtype_id = serializer.get_xtype_id()
    buffer.write_int16(xtype_id)
    if xtype_id == NOT_SUPPORT_CROSS_LANGUAGE:
        # Write classname so it can be used for debugging which object doesn't
        # support cross-language.
        # TODO add a config to disable this to reduce space cost.
        self.class_resolver.xwrite_class(buffer, obj_type)
        # serializer may increase reference id multi times internally, thus peer
        # cross-language later fields/objects deserialization will use wrong
        # reference id since we skip opaque objects deserialization.
        # So we stash native objects and serialize all those object at the last.
        buffer.write_varint32(len(self._native_objects))
        self._native_objects.append(obj)
        return
    if xtype_id == FuryType.FURY_TYPE_TAG.value:
        self.class_resolver.xwrite_type_tag(buffer, obj_type)
    if xtype_id < NOT_SUPPORT_CROSS_LANGUAGE:
        self.class_resolver.xwrite_class(buffer, obj_type)
    serializer.xwrite(buffer, obj)
def deserialize(
    self,
    buffer: Union[Buffer, bytes],
    buffers: Iterable = None,
    unsupported_objects: Iterable = None,
):
    """Deserialize one object from ``buffer``.

    A ``bytes`` argument is wrapped in a ``Buffer`` first. The read-side
    state is reset via ``reset_read()`` whether decoding succeeds or raises.
    """
    try:
        source = Buffer(buffer) if type(buffer) == bytes else buffer
        return self._deserialize(source, buffers, unsupported_objects)
    finally:
        self.reset_read()
cpdef inline _deserialize(
        self, Buffer buffer, buffers=None, unsupported_objects=None):
    """Decode the stream header from ``buffer`` and deserialize the root object.

    The first byte at the current reader index is treated as a bitmap:
    bit 0 set means the result is ``None``, bit 1 is the little-endian
    marker (asserted), bit 2 marks a cross-language stream and bit 3 marks
    a stream that was produced with out-of-band buffers.

    ``buffers`` must be provided exactly when bit 3 is set (asserted).
    ``unsupported_objects``, when given, is wrapped in an iterator and
    stored on ``self._unsupported_objects``.
    """
    # A stub unpickler is installed when class registration is required;
    # otherwise a real Unpickler reading from the same buffer is used.
    if self.require_class_registration:
        self.unpickler = _UnpicklerStub(buffer)
    else:
        self.unpickler = Unpickler(buffer)
    if unsupported_objects is not None:
        self._unsupported_objects = iter(unsupported_objects)
    # Consume the one-byte bitmap header; its bits are inspected below via
    # the saved index.
    cdef int32_t reader_index = buffer.reader_index
    buffer.reader_index = reader_index + 1
    if get_bit(buffer, reader_index, 0):
        return None
    cdef c_bool is_little_endian_ = get_bit(buffer, reader_index, 1)
    assert is_little_endian_, (
        "Big endian is not supported for now, "
        "please ensure peer machine is little endian."
    )
    cdef c_bool is_target_x_lang = get_bit(buffer, reader_index, 2)
    if is_target_x_lang:
        # Cross-language streams carry the peer language as one int8.
        self._peer_language = Language(buffer.read_int8())
    else:
        self._peer_language = Language.PYTHON
    cdef c_bool is_out_of_band_serialization_enabled = \
        get_bit(buffer, reader_index, 3)
    if is_out_of_band_serialization_enabled:
        assert buffers is not None, (
            "buffers shouldn't be null when the serialized stream is "
            "produced with buffer_callback not null."
        )
        self._buffers = iter(buffers)
    else:
        assert buffers is None, (
            "buffers should be null when the serialized stream is "
            "produced with buffer_callback null."
        )
    if not is_target_x_lang:
        return self.deserialize_ref(buffer)
    # Cross-language layout: an int32 offset and an int32 count describing
    # the stashed native objects precede the root object.
    cdef int32_t native_objects_start_offset = buffer.read_int32()
    cdef int32_t native_objects_size = buffer.read_int32()
    if self._peer_language == Language.PYTHON:
        if native_objects_size > 0:
            # Native objects are decoded first from a slice starting at the
            # recorded offset, so they can be looked up by ordinal later.
            native_objects_buffer = buffer.slice(native_objects_start_offset)
            for i in range(native_objects_size):
                self._native_objects.append(
                    self.deserialize_ref(native_objects_buffer)
                )
            # Resolver read state is reset before decoding the root object.
            self.ref_resolver.reset_read()
            self.class_resolver.reset_read()
    return self.xdeserialize_ref(buffer)
cpdef inline deserialize_ref(self, Buffer buffer):
    """Read a ref header and return the referenced or newly decoded object.

    When the preserved ref id is below ``NOT_NULL_VALUE_FLAG`` the object
    comes from the ref resolver. Otherwise the class info is read; ``str``,
    ``int``, ``bool`` and ``float`` are decoded directly from the buffer and
    returned without being registered with the resolver, while every other
    class goes through its serializer and is registered under the ref id.
    """
    cdef MapRefResolver resolver = self.ref_resolver
    cdef int32_t ref_id = resolver.try_preserve_ref_id(buffer)
    if ref_id < NOT_NULL_VALUE_FLAG:
        return resolver.get_read_object()
    # From here on the object is being read for the first time.
    cdef ClassInfo classinfo = self.class_resolver.read_classinfo(buffer)
    cls = classinfo.cls
    if cls is str:
        return buffer.read_string()
    if cls is int:
        return buffer.read_varint64()
    if cls is bool:
        return buffer.read_bool()
    if cls is float:
        return buffer.read_double()
    value = classinfo.serializer.read(buffer)
    resolver.set_read_object(ref_id, value)
    return value
cpdef inline deserialize_nonref(self, Buffer buffer):
    """Deserialize a not-null, non-reference object from ``buffer``.

    Reads the class info, then decodes ``str``, ``int``, ``bool`` and
    ``float`` directly from the buffer; any other class is delegated to
    its serializer's ``read``.
    """
    cdef ClassInfo classinfo = self.class_resolver.read_classinfo(buffer)
    cls = classinfo.cls
    if cls is str:
        return buffer.read_string()
    if cls is int:
        return buffer.read_varint64()
    if cls is bool:
        return buffer.read_bool()
    if cls is float:
        return buffer.read_double()
    return classinfo.serializer.read(buffer)
cpdef inline xdeserialize_ref(
        self, Buffer buffer, Serializer serializer=None):
    """Read a cross-language ref/null header and the object behind it.

    With an explicit serializer whose ``need_to_write_ref`` is false, a
    single flag byte is read: ``NULL_FLAG`` yields ``None``, anything else
    is followed by the payload. In all other cases the ref resolver decides
    whether the object is a back reference or must be decoded and
    registered under the preserved ref id.
    """
    cdef MapRefResolver ref_resolver
    cdef int32_t ref_id
    cdef int8_t head_flag
    if serializer is not None and not serializer.need_to_write_ref:
        head_flag = buffer.read_int8()
        if head_flag == NULL_FLAG:
            return None
        return self.xdeserialize_nonref(buffer, serializer=serializer)
    ref_resolver = self.ref_resolver
    ref_id = ref_resolver.try_preserve_ref_id(buffer)
    if ref_id < NOT_NULL_VALUE_FLAG:
        return ref_resolver.get_read_object()
    # The object is being read for the first time.
    o = self.xdeserialize_nonref(buffer, serializer=serializer)
    ref_resolver.set_read_object(ref_id, o)
    return o
cpdef inline xdeserialize_nonref(
        self, Buffer buffer, Serializer serializer=None):
    """Read an int16 type id and decode the cross-language payload after it.

    For a supported type id, the class and (unless one was passed in) the
    serializer are resolved from the type id, and the object is produced by
    ``serializer.xread``. For ``NOT_SUPPORT_CROSS_LANGUAGE`` a class name
    and an ordinal are read instead: a non-Python peer yields an
    ``OpaqueObject``, a Python peer yields the stashed native object at
    that ordinal.
    """
    cdef int16_t type_id = buffer.read_int16()
    cls = None
    if type_id != NOT_SUPPORT_CROSS_LANGUAGE:
        if type_id == FuryType.FURY_TYPE_TAG.value:
            cls = self.class_resolver.read_class_by_type_tag(buffer)
        if type_id < NOT_SUPPORT_CROSS_LANGUAGE:
            if self._peer_language is not Language.PYTHON:
                # The class name is consumed from the stream but its value is
                # not used; the class is looked up by the negated type id.
                self.class_resolver.xread_classname(buffer)
                cls = self.class_resolver.get_class_by_type_id(-type_id)
                serializer = serializer or self.class_resolver.get_serializer(
                    type_id=-type_id
                )
            else:
                cls = self.class_resolver.xread_class(buffer)
                serializer = serializer or self.class_resolver.get_serializer(
                    cls=cls, type_id=type_id
                )
        else:
            # For FURY_TYPE_TAG the class was already resolved from the tag.
            if type_id != FuryType.FURY_TYPE_TAG.value:
                cls = self.class_resolver.get_class_by_type_id(type_id)
            serializer = serializer or self.class_resolver.get_serializer(
                cls=cls, type_id=type_id
            )
        assert cls is not None
        return serializer.xread(buffer)
    # Unsupported type: only a class name and an ordinal are in the stream.
    cdef str class_name = self.class_resolver.xread_classname(buffer)
    cdef int32_t ordinal = buffer.read_varint32()
    if self._peer_language != Language.PYTHON:
        return OpaqueObject(self._peer_language, class_name, ordinal)
    else:
        return self._native_objects[ordinal]
cpdef inline write_buffer_object(
        self, Buffer buffer, BufferObject buffer_object):
    """Write ``buffer_object`` in band unless the buffer callback takes it.

    If a buffer callback is set and returns a true value for
    ``buffer_object``, only ``False`` is written. Otherwise ``True`` is
    written, then the byte length as a varint32, then the object's bytes
    are written into a slice of ``buffer`` at the current writer index.
    """
    cdef int32_t nbytes
    cdef int32_t start
    cdef Buffer target
    if self._buffer_callback is not None and self._buffer_callback(buffer_object):
        buffer.write_bool(False)
    else:
        buffer.write_bool(True)
        nbytes = buffer_object.total_bytes()
        # Length prefix for the in-band payload.
        buffer.write_varint32(nbytes)
        start = buffer.writer_index
        buffer.ensure(start + nbytes)
        target = buffer.slice(buffer.writer_index, nbytes)
        buffer_object.write_to(target)
        buffer.writer_index += nbytes
cpdef inline Buffer read_buffer_object(self, Buffer buffer):
    """Return the next buffer object, read in band or taken out of band.

    A leading bool tells whether the payload is in band. In-band payloads
    are a varint32 length followed by that many bytes, returned as a slice
    of ``buffer``. Out-of-band payloads come from ``self._buffers``.
    """
    cdef int32_t nbytes
    cdef Buffer view
    cdef c_bool in_band = buffer.read_bool()
    if in_band:
        nbytes = buffer.read_varint32()
        view = buffer.slice(buffer.reader_index, nbytes)
        buffer.reader_index += nbytes
        return view
    assert self._buffers is not None
    return next(self._buffers)
cpdef inline handle_unsupported_write(self, Buffer buffer, obj):
    """Write ``obj`` via the pickler unless the unsupported callback declines.

    ``True`` plus a pickled payload is written when no callback is set or
    the callback returns a true value for ``obj``; otherwise only ``False``
    is written.
    """
    if self._unsupported_callback is not None \
            and not self._unsupported_callback(obj):
        buffer.write_bool(False)
        return
    buffer.write_bool(True)
    self.pickler.dump(obj)
cpdef inline handle_unsupported_read(self, Buffer buffer):
    """Read an object written by ``handle_unsupported_write``.

    A leading ``True`` means the object is loaded with the unpickler; a
    leading ``False`` means it is taken from ``self._unsupported_objects``.
    """
    cdef c_bool in_band = buffer.read_bool()
    if not in_band:
        assert self._unsupported_objects is not None
        return next(self._unsupported_objects)
    return self.unpickler.load()
cpdef inline write_ref_pyobject(
        self, Buffer buffer, value, ClassInfo classinfo=None):
    """Write ``value`` with a ref header, class info and serializer payload.

    Nothing more is written when the ref resolver reports that its header
    already covers ``value``. ``classinfo`` is looked up from
    ``type(value)`` when not supplied.
    """
    if not self.ref_resolver.write_ref_or_null(buffer, value):
        if classinfo is None:
            classinfo = self.class_resolver.get_or_create_classinfo(type(value))
        self.class_resolver.write_classinfo(buffer, classinfo)
        classinfo.serializer.write(buffer, value)
cpdef inline read_ref_pyobject(self, Buffer buffer):
    """Read an object written by ``write_ref_pyobject``.

    Returns the resolver's object when the preserved ref id is below
    ``NOT_NULL_VALUE_FLAG``; otherwise reads the class info, decodes the
    object with its serializer and registers it under the ref id.
    """
    cdef MapRefResolver resolver = self.ref_resolver
    cdef int32_t ref_id = resolver.try_preserve_ref_id(buffer)
    if ref_id < NOT_NULL_VALUE_FLAG:
        return resolver.get_read_object()
    # The object is being read for the first time.
    cdef ClassInfo classinfo = self.class_resolver.read_classinfo(buffer)
    decoded = classinfo.serializer.read(buffer)
    resolver.set_read_object(ref_id, decoded)
    return decoded
cpdef inline reset_write(self):
    """Clear the state accumulated while writing.

    Resets both resolvers' write state and the serialization context,
    empties the stashed native objects, clears the pickler memo and drops
    the unsupported-object callback.
    """
    self.ref_resolver.reset_write()
    self.class_resolver.reset_write()
    self.serialization_context.reset()
    self._native_objects.clear()
    self.pickler.clear_memo()
    self._unsupported_callback = None
cpdef inline reset_read(self):
    """Clear the state accumulated while reading.

    Resets both resolvers' read state and the serialization context,
    empties the native-object list and drops the out-of-band buffer
    iterator, the unpickler and the unsupported-object iterator.
    """
    self.ref_resolver.reset_read()
    self.class_resolver.reset_read()
    self.serialization_context.reset()
    self._native_objects.clear()
    self._buffers = None
    self.unpickler = None
    self._unsupported_objects = None
cpdef inline reset(self):
    """Clear write-side state, then read-side state."""
    self.reset_write()
    self.reset_read()
cpdef inline write_nullable_pybool(Buffer buffer, value):
    """Write an optional bool as an int8 null flag plus, if present, the bool."""
    if value is not None:
        buffer.write_int8(NOT_NULL_VALUE_FLAG)
        buffer.write_bool(value)
        return
    buffer.write_int8(NULL_FLAG)
cpdef inline write_nullable_pyint64(Buffer buffer, value):
    """Write an optional int as an int8 null flag plus, if present, a varint64."""
    if value is not None:
        buffer.write_int8(NOT_NULL_VALUE_FLAG)
        buffer.write_varint64(value)
        return
    buffer.write_int8(NULL_FLAG)
cpdef inline write_nullable_pyfloat64(Buffer buffer, value):
    """Write an optional float as an int8 null flag plus, if present, a double."""
    if value is not None:
        buffer.write_int8(NOT_NULL_VALUE_FLAG)
        buffer.write_double(value)
        return
    buffer.write_int8(NULL_FLAG)
cpdef inline write_nullable_pystr(Buffer buffer, value):
    """Write an optional str as an int8 null flag plus, if present, the string."""
    if value is not None:
        buffer.write_int8(NOT_NULL_VALUE_FLAG)
        buffer.write_string(value)
        return
    buffer.write_int8(NULL_FLAG)
cpdef inline read_nullable_pybool(Buffer buffer):
    """Read an int8 flag; return the following bool only for NOT_NULL_VALUE_FLAG."""
    if buffer.read_int8() != NOT_NULL_VALUE_FLAG:
        return None
    return buffer.read_bool()
cpdef inline read_nullable_pyint64(Buffer buffer):
    """Read an int8 flag; return the following varint64 only for NOT_NULL_VALUE_FLAG."""
    if buffer.read_int8() != NOT_NULL_VALUE_FLAG:
        return None
    return buffer.read_varint64()
cpdef inline read_nullable_pyfloat64(Buffer buffer):
    """Read an int8 flag; return the following double only for NOT_NULL_VALUE_FLAG."""
    if buffer.read_int8() != NOT_NULL_VALUE_FLAG:
        return None
    return buffer.read_double()
cpdef inline read_nullable_pystr(Buffer buffer):
    """Read an int8 flag; return the following string only for NOT_NULL_VALUE_FLAG."""
    if buffer.read_int8() != NOT_NULL_VALUE_FLAG:
        return None
    return buffer.read_string()
@cython.final
cdef class SerializationContext:
    """Mapping from the identity (``id``) of a key object to a stored value."""
    cdef dict objects

    def __init__(self):
        self.objects = {}

    def add(self, key, obj):
        """Store ``obj`` under the identity of ``key``."""
        self.objects[id(key)] = obj

    def __contains__(self, key):
        return id(key) in self.objects

    def __getitem__(self, key):
        return self.objects[id(key)]

    def get(self, key):
        """Return the value stored for ``key``'s identity, or ``None``."""
        return self.objects.get(id(key))

    def reset(self):
        """Drop every stored entry."""
        if self.objects:
            self.objects.clear()
cdef class Serializer:
    """Base class for serializers.

    ``write``/``read`` handle the native format and ``xwrite``/``xread``
    the cross-language format. All four raise ``NotImplementedError`` here
    and are meant to be overridden.
    """
    cdef readonly Fury fury
    cdef readonly object type_
    cdef public c_bool need_to_write_ref

    def __init__(self, fury, type_: Union[type, TypeVar]):
        self.fury = fury
        self.type_ = type_
        # Ref headers are written for every type that is_primitive_type()
        # does not report as primitive.
        self.need_to_write_ref = not is_primitive_type(type_)

    cpdef int16_t get_xtype_id(self):
        """
        Returns
        -------
            Returns NOT_SUPPORT_CROSS_LANGUAGE if the serializer doesn't
            support cross-language serialization.
            Return a number in range (0, 32767) if the serializer support
            cross-language serialization and native serialization data is the
            same with cross-language serialization.
            Return a negative short in range [-32768, 0) if the serializer
            support cross-language serialization and native serialization data
            is not the same with cross-language serialization.
        """
        return NOT_SUPPORT_CROSS_LANGUAGE

    cpdef str get_xtype_tag(self):
        """
        Returns
        -------
            a type tag used for setup type mapping between languages.
            This base implementation returns ``None``.
        """

    cpdef write(self, Buffer buffer, value):
        """Write ``value`` to ``buffer`` in the native format."""
        raise NotImplementedError

    cpdef read(self, Buffer buffer):
        """Read a value from ``buffer`` in the native format."""
        raise NotImplementedError

    cpdef xwrite(self, Buffer buffer, value):
        """Write ``value`` to ``buffer`` in the cross-language format."""
        # ``NotImplemented`` is a comparison sentinel, not an exception;
        # raising it fails with an unrelated TypeError.
        raise NotImplementedError

    cpdef xread(self, Buffer buffer):
        """Read a value from ``buffer`` in the cross-language format."""
        raise NotImplementedError

    @classmethod
    def support_subclass(cls) -> bool:
        """Return ``False``; subclasses may override."""
        return False
cdef class CrossLanguageCompatibleSerializer(Serializer):
    """Serializer whose cross-language format reuses the native one.

    ``xwrite``/``xread`` forward to ``write``/``read``, which subclasses
    are expected to override.
    """

    def __init__(self, fury, type_):
        super().__init__(fury, type_)

    cpdef write(self, Buffer buffer, value):
        raise NotImplementedError

    cpdef read(self, Buffer buffer):
        raise NotImplementedError

    cpdef xwrite(self, Buffer buffer, value):
        self.write(buffer, value)

    cpdef xread(self, Buffer buffer):
        return self.read(buffer)
@cython.final
cdef class BooleanSerializer(CrossLanguageCompatibleSerializer):
    """Serializes a value with the buffer's bool accessors."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.BOOL.value

    cpdef inline write(self, Buffer buffer, value):
        buffer.write_bool(value)

    cpdef inline read(self, Buffer buffer):
        return buffer.read_bool()
@cython.final
cdef class NoneSerializer(Serializer):
    """Serializer that writes nothing and always reads back ``None``.

    The cross-language methods are not implemented.
    """

    cpdef inline write(self, Buffer buffer, value):
        pass

    cpdef inline read(self, Buffer buffer):
        return None

    cpdef inline xwrite(self, Buffer buffer, value):
        raise NotImplementedError

    cpdef inline xread(self, Buffer buffer):
        raise NotImplementedError
@cython.final
cdef class ByteSerializer(CrossLanguageCompatibleSerializer):
    """Serializes a value with the buffer's int8 accessors."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.INT8.value

    cpdef inline write(self, Buffer buffer, value):
        buffer.write_int8(value)

    cpdef inline read(self, Buffer buffer):
        return buffer.read_int8()
@cython.final
cdef class Int16Serializer(CrossLanguageCompatibleSerializer):
    """Serializes a value with the buffer's int16 accessors."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.INT16.value

    cpdef inline write(self, Buffer buffer, value):
        buffer.write_int16(value)

    cpdef inline read(self, Buffer buffer):
        return buffer.read_int16()
@cython.final
cdef class Int32Serializer(CrossLanguageCompatibleSerializer):
    """Serializes a value with the buffer's int32 accessors."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.INT32.value

    cpdef inline write(self, Buffer buffer, value):
        buffer.write_int32(value)

    cpdef inline read(self, Buffer buffer):
        return buffer.read_int32()
@cython.final
cdef class Int64Serializer(CrossLanguageCompatibleSerializer):
    """Serializes 64-bit integers.

    The native format uses the varint64 accessors; the cross-language
    format uses the int64 accessors.
    """

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.INT64.value

    cpdef inline write(self, Buffer buffer, value):
        buffer.write_varint64(value)

    cpdef inline read(self, Buffer buffer):
        return buffer.read_varint64()

    cpdef inline xwrite(self, Buffer buffer, value):
        buffer.write_int64(value)

    cpdef inline xread(self, Buffer buffer):
        return buffer.read_int64()
@cython.final
cdef class FloatSerializer(CrossLanguageCompatibleSerializer):
    """Serializes a value with the buffer's float accessors."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.FLOAT.value

    cpdef inline write(self, Buffer buffer, value):
        buffer.write_float(value)

    cpdef inline read(self, Buffer buffer):
        return buffer.read_float()
@cython.final
cdef class DoubleSerializer(CrossLanguageCompatibleSerializer):
    """Serializes a value with the buffer's double accessors."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.DOUBLE.value

    cpdef inline write(self, Buffer buffer, value):
        buffer.write_double(value)

    cpdef inline read(self, Buffer buffer):
        return buffer.read_double()
@cython.final
cdef class StringSerializer(CrossLanguageCompatibleSerializer):
    """Serializes a value with the buffer's string accessors."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.STRING.value

    cpdef inline write(self, Buffer buffer, value):
        buffer.write_string(value)

    cpdef inline read(self, Buffer buffer):
        return buffer.read_string()
# Dates are encoded as a day count relative to this date.
cdef _base_date = datetime.date(1970, 1, 1)


@cython.final
cdef class DateSerializer(CrossLanguageCompatibleSerializer):
    """Serializes ``datetime.date`` as an int32 number of days since 1970-01-01."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.DATE32.value

    cpdef inline write(self, Buffer buffer, value):
        # Only exact datetime.date instances are accepted; subclasses such
        # as datetime.datetime are rejected.
        if type(value) is not datetime.date:
            raise TypeError(
                "{} should be {} instead of {}".format(
                    value, datetime.date, type(value)
                )
            )
        buffer.write_int32((value - _base_date).days)

    cpdef inline read(self, Buffer buffer):
        return _base_date + datetime.timedelta(days=buffer.read_int32())
@cython.final
cdef class TimestampSerializer(CrossLanguageCompatibleSerializer):
    """Serializes ``datetime.datetime`` as an int64 count of microseconds.

    The count is derived from ``datetime.timestamp()`` on write and turned
    back into a datetime with ``datetime.fromtimestamp()`` on read.
    """

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.TIMESTAMP.value

    cpdef inline write(self, Buffer buffer, value):
        """Write ``value`` as microseconds.

        Raises
        ------
        TypeError
            If ``value`` is not exactly a ``datetime.datetime`` instance.
        """
        if type(value) is not datetime.datetime:
            # Name the expected class in the message, not the datetime module.
            raise TypeError(
                "{} should be {} instead of {}".format(
                    value, datetime.datetime, type(value)
                )
            )
        # TimestampType represent micro seconds
        # NOTE(review): the float product is truncated by int(); sub-microsecond
        # rounding error could drop a microsecond - confirm whether that matters.
        timestamp = int(value.timestamp() * 1000000)
        buffer.write_int64(timestamp)

    cpdef inline read(self, Buffer buffer):
        """Read microseconds and return the corresponding datetime."""
        ts = buffer.read_int64() / 1000000
        # TODO support timezone
        return datetime.datetime.fromtimestamp(ts)
@cython.final
cdef class BytesSerializer(CrossLanguageCompatibleSerializer):
    """Serializes bytes through the Fury buffer-object write/read path."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.BINARY.value

    cpdef inline write(self, Buffer buffer, value):
        wrapped = BytesBufferObject(value)
        self.fury.write_buffer_object(buffer, wrapped)

    cpdef inline read(self, Buffer buffer):
        return self.fury.read_buffer_object(buffer).to_pybytes()
cdef class CollectionSerializer(Serializer):
    """Base serializer for collections of elements.

    ``write`` emits the native format: a varint32 length followed by each
    element, with dedicated fast paths for ``str``, ``int``, ``bool`` and
    ``float``. ``xwrite`` emits the cross-language format by delegating
    each element to ``fury.xserialize_ref`` with the optional element
    serializer.
    """
    cdef ClassResolver class_resolver
    cdef MapRefResolver ref_resolver
    cdef Serializer elem_serializer

    def __init__(self, fury, type_, elem_serializer=None):
        super().__init__(fury, type_)
        self.class_resolver = fury.class_resolver
        self.ref_resolver = fury.ref_resolver
        self.elem_serializer = elem_serializer

    cpdef int16_t get_xtype_id(self):
        return -FuryType.LIST.value

    cpdef write(self, Buffer buffer, value):
        """Write ``value`` in the native format."""
        buffer.write_varint32(len(value))
        cdef MapRefResolver ref_resolver = self.ref_resolver
        cdef ClassResolver class_resolver = self.class_resolver
        for s in value:
            cls = type(s)
            # Fast paths: a combined int16 flag is written in place of the
            # separate ref header and class info, then the raw value.
            if cls is str:
                buffer.write_int16(NOT_NULL_STRING_FLAG)
                buffer.write_string(s)
            elif cls is int:
                buffer.write_int16(NOT_NULL_PYINT_FLAG)
                buffer.write_varint64(s)
            elif cls is bool:
                buffer.write_int16(NOT_NULL_PYBOOL_FLAG)
                buffer.write_bool(s)
            elif cls is float:
                buffer.write_int16(NOT_NULL_PYFLOAT_FLAG)
                buffer.write_double(s)
            else:
                if not ref_resolver.write_ref_or_null(buffer, s):
                    classinfo = class_resolver.get_or_create_classinfo(cls)
                    class_resolver.write_classinfo(buffer, classinfo)
                    classinfo.serializer.write(buffer, s)

    cpdef xwrite(self, Buffer buffer, value):
        """Write ``value`` in the cross-language format.

        Iterables without a length are materialized into a list first so
        that the element count can be written ahead of the elements.
        """
        cdef int32_t len_ = 0
        try:
            len_ = len(value)
        except (TypeError, AttributeError):
            # len() raises TypeError for objects without __len__;
            # AttributeError is still caught for backward compatibility.
            value = list(value)
            len_ = len(value)
        buffer.write_varint32(len_)
        for s in value:
            self.fury.xserialize_ref(
                buffer, s, serializer=self.elem_serializer
            )
cdef class ListSerializer(CollectionSerializer):
    """Collection serializer that reads elements back into a ``list``."""

    cpdef int16_t get_xtype_id(self):
        return FuryType.LIST.value

    cpdef read(self, Buffer buffer):
        cdef MapRefResolver ref_resolver = self.fury.ref_resolver
        cdef ClassResolver class_resolver = self.fury.class_resolver
        cdef list result = []
        # The list is handed to the ref resolver before it is filled.
        ref_resolver.reference(result)
        populate_list(buffer, result, ref_resolver, class_resolver)
        return result

    cpdef xread(self, Buffer buffer):
        cdef int32_t len_ = buffer.read_varint32()
        cdef list result = []
        self.fury.ref_resolver.reference(result)
        for _ in range(len_):
            elem = self.fury.xdeserialize_ref(
                buffer, serializer=self.elem_serializer
            )
            result.append(elem)
        return result
cdef populate_list(
        Buffer buffer,
        list list_,
        MapRefResolver ref_resolver,
        ClassResolver class_resolver):
    """Read a varint32 length and append that many decoded elements to ``list_``."""
    cdef int32_t ref_id
    cdef ClassInfo classinfo
    cdef int32_t len_ = buffer.read_varint32()
    for _ in range(len_):
        ref_id = ref_resolver.try_preserve_ref_id(buffer)
        if ref_id < NOT_NULL_VALUE_FLAG:
            elem = ref_resolver.get_read_object()
        else:
            # The element is being read for the first time.
            classinfo = class_resolver.read_classinfo(buffer)
            cls = classinfo.cls
            # Note that all read operations in fast paths of
            # list/tuple/set/dict/sub_dict must match corresponding writing
            # operations. Otherwise, ref tracking will error.
            if cls is str:
                elem = buffer.read_string()
            elif cls is int:
                elem = buffer.read_varint64()
            elif cls is bool:
                elem = buffer.read_bool()
            elif cls is float:
                elem = buffer.read_double()
            else:
                elem = classinfo.serializer.read(buffer)
                ref_resolver.set_read_object(ref_id, elem)
        list_.append(elem)
@cython.final
cdef class TupleSerializer(CollectionSerializer):
    """Collection serializer that reads elements back into a ``tuple``."""

    cpdef inline read(self, Buffer buffer):
        cdef MapRefResolver ref_resolver = self.fury.ref_resolver
        cdef ClassResolver class_resolver = self.fury.class_resolver
        cdef list items = []
        populate_list(buffer, items, ref_resolver, class_resolver)
        return tuple(items)

    cpdef inline xread(self, Buffer buffer):
        cdef int32_t len_ = buffer.read_varint32()
        cdef list items = []
        for _ in range(len_):
            elem = self.fury.xdeserialize_ref(
                buffer, serializer=self.elem_serializer
            )
            items.append(elem)
        return tuple(items)
@cython.final
cdef class StringArraySerializer(ListSerializer):
    """List serializer whose element serializer is a ``StringSerializer``."""

    def __init__(self, fury, type_):
        elem_serializer = StringSerializer(fury, str)
        super().__init__(fury, type_, elem_serializer)

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.FURY_STRING_ARRAY.value
@cython.final
cdef class SetSerializer(CollectionSerializer):
    """Collection serializer that reads elements back into a ``set``."""

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.FURY_SET.value

    cpdef inline read(self, Buffer buffer):
        cdef MapRefResolver ref_resolver = self.fury.ref_resolver
        cdef ClassResolver class_resolver = self.fury.class_resolver
        cdef set instance = set()
        # The set is handed to the ref resolver before it is filled.
        ref_resolver.reference(instance)
        cdef int32_t len_ = buffer.read_varint32()
        cdef int32_t ref_id
        cdef ClassInfo classinfo
        for _ in range(len_):
            ref_id = ref_resolver.try_preserve_ref_id(buffer)
            if ref_id < NOT_NULL_VALUE_FLAG:
                elem = ref_resolver.get_read_object()
            else:
                # The element is being read for the first time.
                classinfo = class_resolver.read_classinfo(buffer)
                cls = classinfo.cls
                if cls is str:
                    elem = buffer.read_string()
                elif cls is int:
                    elem = buffer.read_varint64()
                elif cls is bool:
                    elem = buffer.read_bool()
                elif cls is float:
                    elem = buffer.read_double()
                else:
                    elem = classinfo.serializer.read(buffer)
                    ref_resolver.set_read_object(ref_id, elem)
            instance.add(elem)
        return instance

    cpdef inline xread(self, Buffer buffer):
        cdef int32_t len_ = buffer.read_varint32()
        cdef set instance = set()
        self.fury.ref_resolver.reference(instance)
        for _ in range(len_):
            elem = self.fury.xdeserialize_ref(
                buffer, serializer=self.elem_serializer
            )
            instance.add(elem)
        return instance
@cython.final
cdef class MapSerializer(Serializer):
    # Serializer for dict values. write/read implement the native format with
    # per-entry fast paths; xwrite/xread implement the cross-language format
    # by delegating every key and value to the Fury instance.
    cdef ClassResolver class_resolver
    cdef MapRefResolver ref_resolver
    # Optional serializers passed through to fury.xserialize_ref /
    # fury.xdeserialize_ref for keys and values in the cross-language path.
    cdef Serializer key_serializer
    cdef Serializer value_serializer

    def __init__(self, fury, type_, key_serializer=None, value_serializer=None):
        super().__init__(fury, type_)
        self.class_resolver = fury.class_resolver
        self.ref_resolver = fury.ref_resolver
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer

    cpdef inline int16_t get_xtype_id(self):
        return FuryType.MAP.value

    cpdef inline write(self, Buffer buffer, o):
        """Write ``o`` natively: a varint32 entry count, then key/value pairs."""
        cdef dict value = o
        buffer.write_varint32(len(value))
        cdef ClassInfo key_classinfo
        cdef ClassInfo value_classinfo
        for k, v in value.items():
            key_cls = type(k)
            # Keys have a fast path for str only; everything else goes
            # through the ref resolver and the key's own serializer.
            if key_cls is str:
                buffer.write_int16(NOT_NULL_STRING_FLAG)
                buffer.write_string(k)
            else:
                if not self.ref_resolver.write_ref_or_null(buffer, k):
                    key_classinfo = self.class_resolver.get_or_create_classinfo(key_cls)
                    self.class_resolver.write_classinfo(buffer, key_classinfo)
                    key_classinfo.serializer.write(buffer, k)
            value_cls = type(v)
            # Values have fast paths for str, int, bool and float.
            if value_cls is str:
                buffer.write_int16(NOT_NULL_STRING_FLAG)
                buffer.write_string(v)
            elif value_cls is int:
                buffer.write_int16(NOT_NULL_PYINT_FLAG)
                buffer.write_varint64(v)
            elif value_cls is bool:
                buffer.write_int16(NOT_NULL_PYBOOL_FLAG)
                buffer.write_bool(v)
            elif value_cls is float:
                buffer.write_int16(NOT_NULL_PYFLOAT_FLAG)
                buffer.write_double(v)
            else:
                if not self.ref_resolver.write_ref_or_null(buffer, v):
                    value_classinfo = self.class_resolver. \
                        get_or_create_classinfo(value_cls)
                    self.class_resolver.write_classinfo(buffer, value_classinfo)
                    value_classinfo.serializer.write(buffer, v)

    cpdef inline read(self, Buffer buffer):
        """Read a dict written by ``write``."""
        cdef MapRefResolver ref_resolver = self.ref_resolver
        cdef ClassResolver class_resolver = self.class_resolver
        cdef dict map_ = {}
        # The dict is handed to the ref resolver before its entries are read.
        ref_resolver.reference(map_)
        cdef int32_t len_ = buffer.read_varint32()
        cdef int32_t ref_id
        cdef ClassInfo key_classinfo
        cdef ClassInfo value_classinfo
        for i in range(len_):
            ref_id = ref_resolver.try_preserve_ref_id(buffer)
            if ref_id < NOT_NULL_VALUE_FLAG:
                key = ref_resolver.get_read_object()
            else:
                key_classinfo = class_resolver.read_classinfo(buffer)
                if key_classinfo.cls is str:
                    key = buffer.read_string()
                else:
                    key = key_classinfo.serializer.read(buffer)
                    ref_resolver.set_read_object(ref_id, key)
            ref_id = ref_resolver.try_preserve_ref_id(buffer)
            if ref_id < NOT_NULL_VALUE_FLAG:
                value = ref_resolver.get_read_object()
            else:
                value_classinfo = class_resolver.read_classinfo(buffer)
                cls = value_classinfo.cls
                # Fast-path values are not registered with the ref resolver.
                if cls is str:
                    value = buffer.read_string()
                elif cls is int:
                    value = buffer.read_varint64()
                elif cls is bool:
                    value = buffer.read_bool()
                elif cls is float:
                    value = buffer.read_double()
                else:
                    value = value_classinfo.serializer.read(buffer)
                    ref_resolver.set_read_object(ref_id, value)
            map_[key] = value
        return map_

    cpdef inline xwrite(self, Buffer buffer, o):
        """Write ``o`` in cross-language format: entry count, then each pair."""
        cdef dict value = o
        buffer.write_varint32(len(value))
        for k, v in value.items():
            self.fury.xserialize_ref(
                buffer, k, serializer=self.key_serializer
            )
            self.fury.xserialize_ref(
                buffer, v, serializer=self.value_serializer
            )

    cpdef inline xread(self, Buffer buffer):
        """Read a dict written by ``xwrite``."""
        cdef int32_t len_ = buffer.read_varint32()
        cdef dict map_ = {}
        self.fury.ref_resolver.reference(map_)
        for i in range(len_):
            k = self.fury.xdeserialize_ref(
                buffer, serializer=self.key_serializer
            )
            v = self.fury.xdeserialize_ref(
                buffer, serializer=self.value_serializer
            )
            map_[k] = v
        return map_
@cython.final
cdef class SubMapSerializer(Serializer):
    # Native-format serializer for mapping types that are rebuilt by calling
    # self.type_() with no arguments and assigning entries one by one.
    # No cross-language methods are overridden in this class.
    cdef ClassResolver class_resolver
    cdef MapRefResolver ref_resolver
    # Stored by __init__; not used by the write/read methods of this class.
    cdef Serializer key_serializer
    cdef Serializer value_serializer

    def __init__(self, fury, type_, key_serializer=None, value_serializer=None):
        super().__init__(fury, type_)
        self.class_resolver = fury.class_resolver
        self.ref_resolver = fury.ref_resolver
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer

    cpdef inline write(self, Buffer buffer, value):
        """Write ``value``: a varint32 entry count, then key/value pairs."""
        buffer.write_varint32(len(value))
        cdef ClassInfo key_classinfo
        cdef ClassInfo value_classinfo
        for k, v in value.items():
            key_cls = type(k)
            # Keys have a fast path for str only.
            if key_cls is str:
                buffer.write_int16(NOT_NULL_STRING_FLAG)
                buffer.write_string(k)
            else:
                if not self.ref_resolver.write_ref_or_null(buffer, k):
                    key_classinfo = self.class_resolver.get_or_create_classinfo(key_cls)
                    self.class_resolver.write_classinfo(buffer, key_classinfo)
                    key_classinfo.serializer.write(buffer, k)
            value_cls = type(v)
            # Values have fast paths for str, int, bool and float.
            if value_cls is str:
                buffer.write_int16(NOT_NULL_STRING_FLAG)
                buffer.write_string(v)
            elif value_cls is int:
                buffer.write_int16(NOT_NULL_PYINT_FLAG)
                buffer.write_varint64(v)
            elif value_cls is bool:
                buffer.write_int16(NOT_NULL_PYBOOL_FLAG)
                buffer.write_bool(v)
            elif value_cls is float:
                buffer.write_int16(NOT_NULL_PYFLOAT_FLAG)
                buffer.write_double(v)
            else:
                if not self.ref_resolver.write_ref_or_null(buffer, v):
                    value_classinfo = self.class_resolver. \
                        get_or_create_classinfo(value_cls)
                    self.class_resolver.write_classinfo(buffer, value_classinfo)
                    value_classinfo.serializer.write(buffer, v)

    cpdef inline read(self, Buffer buffer):
        """Read a mapping written by ``write`` into a new ``self.type_()``."""
        cdef MapRefResolver ref_resolver = self.fury.ref_resolver
        cdef ClassResolver class_resolver = self.fury.class_resolver
        map_ = self.type_()
        # The mapping is handed to the ref resolver before entries are read.
        ref_resolver.reference(map_)
        cdef int32_t len_ = buffer.read_varint32()
        cdef int32_t ref_id
        cdef ClassInfo key_classinfo
        cdef ClassInfo value_classinfo
        for i in range(len_):
            ref_id = ref_resolver.try_preserve_ref_id(buffer)
            if ref_id < NOT_NULL_VALUE_FLAG:
                key = ref_resolver.get_read_object()
            else:
                key_classinfo = class_resolver.read_classinfo(buffer)
                if key_classinfo.cls is str:
                    key = buffer.read_string()
                else:
                    key = key_classinfo.serializer.read(buffer)
                    ref_resolver.set_read_object(ref_id, key)
            ref_id = ref_resolver.try_preserve_ref_id(buffer)
            if ref_id < NOT_NULL_VALUE_FLAG:
                value = ref_resolver.get_read_object()
            else:
                value_classinfo = class_resolver.read_classinfo(buffer)
                cls = value_classinfo.cls
                # Fast-path values are not registered with the ref resolver.
                if cls is str:
                    value = buffer.read_string()
                elif cls is int:
                    value = buffer.read_varint64()
                elif cls is bool:
                    value = buffer.read_bool()
                elif cls is float:
                    value = buffer.read_double()
                else:
                    value = value_classinfo.serializer.read(buffer)
                    ref_resolver.set_read_object(ref_id, value)
            map_[key] = value
        return map_
# Use numpy array or python array module.
# Maps an array typecode to a pair (item size in bytes, Fury type id);
# PyArraySerializer unpacks these pairs in its constructor.
typecode_dict = {
    # use bytes serializer for byte array.
    "h": (2, FuryType.FURY_PRIMITIVE_SHORT_ARRAY.value),
    "i": (4, FuryType.FURY_PRIMITIVE_INT_ARRAY.value),
    "l": (8, FuryType.FURY_PRIMITIVE_LONG_ARRAY.value),
    "f": (4, FuryType.FURY_PRIMITIVE_FLOAT_ARRAY.value),
    "d": (8, FuryType.FURY_PRIMITIVE_DOUBLE_ARRAY.value),
}
# When `np` is truthy every type id is negated.
# NOTE(review): `np` is defined outside this chunk - presumably the numpy
# module or a falsy placeholder when numpy is unavailable; confirm.
if np:
    typecode_dict = {
        k: (itemsize, -type_id) for k, (itemsize, type_id) in typecode_dict.items()
    }
@cython.final
cdef class PyArraySerializer(CrossLanguageCompatibleSerializer):
    """Serializer for ``array.array`` values of one fixed typecode.

    The cross-language format is a varint32 byte count followed by the raw
    buffer; the native format additionally prefixes the typecode string.
    """
    typecode_dict = typecode_dict
    typecodearray_type = {
        "h": Int16ArrayType,
        "i": Int32ArrayType,
        "l": Int64ArrayType,
        "f": Float32ArrayType,
        "d": Float64ArrayType,
    }
    cdef str typecode
    cdef int8_t itemsize
    cdef int16_t type_id

    def __init__(self, fury, type_, str typecode):
        super().__init__(fury, type_)
        self.typecode = typecode
        self.itemsize, self.type_id = PyArraySerializer.typecode_dict[self.typecode]

    cpdef int16_t get_xtype_id(self):
        return self.type_id

    cpdef inline xwrite(self, Buffer buffer, value):
        # The array must match the typecode and item size this serializer
        # was created for.
        assert value.itemsize == self.itemsize
        mv = memoryview(value)
        assert mv.format == self.typecode
        assert mv.itemsize == self.itemsize
        assert mv.c_contiguous  # TODO handle contiguous
        cdef int32_t nbytes = len(value) * self.itemsize
        buffer.write_varint32(nbytes)
        buffer.write_buffer(value)

    cpdef inline xread(self, Buffer buffer):
        payload = buffer.read_bytes_and_size()
        result = array.array(self.typecode, [])
        result.frombytes(payload)
        return result

    cpdef inline write(self, Buffer buffer, value: array.array):
        cdef int32_t nbytes = len(value) * value.itemsize
        buffer.write_string(value.typecode)
        buffer.write_varint32(nbytes)
        buffer.write_buffer(value)

    cpdef inline read(self, Buffer buffer):
        typecode = buffer.read_string()
        payload = buffer.read_bytes_and_size()
        result = array.array(typecode, [])
        result.frombytes(payload)
        return result
# Maps a numpy dtype to a triple (item size in bytes, buffer format code,
# Fury type id); Numpy1DArraySerializer unpacks these triples in its
# constructor. The table is empty when `np` is falsy.
if np:
    _np_dtypes_dict = {
        # use bytes serializer for byte array.
        np.dtype(np.bool_): (1, "?", FuryType.FURY_PRIMITIVE_BOOL_ARRAY.value),
        np.dtype(np.int16): (2, "h", FuryType.FURY_PRIMITIVE_SHORT_ARRAY.value),
        np.dtype(np.int32): (4, "i", FuryType.FURY_PRIMITIVE_INT_ARRAY.value),
        np.dtype(np.int64): (8, "l", FuryType.FURY_PRIMITIVE_LONG_ARRAY.value),
        np.dtype(np.float32): (4, "f", FuryType.FURY_PRIMITIVE_FLOAT_ARRAY.value),
        np.dtype(np.float64): (8, "d", FuryType.FURY_PRIMITIVE_DOUBLE_ARRAY.value),
    }
else:
    _np_dtypes_dict = {}
@cython.final
cdef class Numpy1DArraySerializer(CrossLanguageCompatibleSerializer):
    """Serializer for numpy arrays of one fixed dtype.

    The cross-language format is a varint32 byte count followed by the raw
    data. The native format is delegated to the Fury unsupported-object
    handlers.
    """
    dtypes_dict = _np_dtypes_dict
    cdef object dtype
    cdef str typecode
    cdef int8_t itemsize
    cdef int16_t type_id

    def __init__(self, fury, type_, dtype):
        super().__init__(fury, type_)
        self.dtype = dtype
        self.itemsize, self.typecode, self.type_id = _np_dtypes_dict[self.dtype]

    cpdef int16_t get_xtype_id(self):
        return self.type_id

    cpdef inline xwrite(self, Buffer buffer, value):
        # The array must match the dtype this serializer was created for.
        assert value.itemsize == self.itemsize
        mv = memoryview(value)
        assert mv.format == self.typecode
        assert mv.itemsize == self.itemsize
        cdef int32_t nbytes = len(value) * self.itemsize
        buffer.write_varint32(nbytes)
        # Bool arrays and non-contiguous arrays are copied to bytes first;
        # everything else is written straight from the array's buffer.
        if self.dtype == np.dtype("bool") or not mv.c_contiguous:
            buffer.write_bytes(value.tobytes())
        else:
            buffer.write_buffer(value)

    cpdef inline xread(self, Buffer buffer):
        payload = buffer.read_bytes_and_size()
        return np.frombuffer(payload, dtype=self.dtype)

    cpdef inline write(self, Buffer buffer, value):
        self.fury.handle_unsupported_write(buffer, value)

    cpdef inline read(self, Buffer buffer):
        return self.fury.handle_unsupported_read(buffer)
cdef _get_hash(Fury fury, list field_names, dict type_hints):
    """Return the struct hash computed by visiting every field in order.

    Each field's type hint is fed through ``infer_field`` with a
    ``StructHashVisitor``; the visitor's resulting hash is asserted to be
    non-zero.
    """
    from pyfury._struct import StructHashVisitor

    visitor = StructHashVisitor(fury)
    for name in field_names:
        infer_field(name, type_hints[name], visitor, types_path=[])
    hash_ = visitor.get_hash()
    assert hash_ != 0
    return hash_
cdef class ComplexObjectSerializer(Serializer):
    """Field-by-field serializer for classes described by their type hints.

    Fields are the sorted names from ``get_type_hints`` of the class. The
    payload is an int32 struct hash followed by every field written with
    ``fury.xserialize_ref``. The native ``write``/``read`` reuse the
    cross-language implementation.
    """
    cdef str _type_tag
    cdef object _type_hints
    cdef list _field_names
    cdef list _serializers
    # Computed on first use; 0 means "not computed yet".
    cdef int32_t _hash

    def __init__(self, fury, clz: type, type_tag: str):
        super().__init__(fury, clz)
        self._type_tag = type_tag
        self._type_hints = get_type_hints(clz)
        self._field_names = sorted(self._type_hints.keys())
        from pyfury._struct import ComplexTypeVisitor

        visitor = ComplexTypeVisitor(fury)
        # One serializer per field, in the same order as _field_names.
        self._serializers = [
            infer_field(name, self._type_hints[name], visitor, types_path=[])
            for name in self._field_names
        ]
        if self.fury.language == Language.PYTHON:
            logger.warning(
                "Type of class %s shouldn't be serialized using cross-language "
                "serializer",
                clz,
            )
        self._hash = 0

    cpdef int16_t get_xtype_id(self):
        return FuryType.FURY_TYPE_TAG.value

    cpdef str get_xtype_tag(self):
        return self._type_tag

    cpdef write(self, Buffer buffer, value):
        return self.xwrite(buffer, value)

    cpdef read(self, Buffer buffer):
        return self.xread(buffer)

    cpdef xwrite(self, Buffer buffer, value):
        if self._hash == 0:
            self._hash = _get_hash(self.fury, self._field_names, self._type_hints)
        buffer.write_int32(self._hash)
        cdef Serializer serializer
        cdef int32_t index
        for index in range(len(self._field_names)):
            serializer = self._serializers[index]
            self.fury.xserialize_ref(
                buffer,
                getattr(value, self._field_names[index]),
                serializer=serializer,
            )

    cpdef xread(self, Buffer buffer):
        cdef int32_t hash_ = buffer.read_int32()
        if self._hash == 0:
            self._hash = _get_hash(self.fury, self._field_names, self._type_hints)
        # A differing hash means the peer's field layout does not match.
        if hash_ != self._hash:
            raise ClassNotCompatibleError(
                f"Hash {hash_} is not consistent with {self._hash} "
                f"for class {self.type_}",
            )
        # The instance is created without calling __init__ and handed to
        # the ref resolver before its fields are read.
        obj = self.type_.__new__(self.type_)
        self.fury.ref_resolver.reference(obj)
        cdef Serializer serializer
        cdef int32_t index
        for index in range(len(self._field_names)):
            serializer = self._serializers[index]
            field_value = self.fury.xdeserialize_ref(
                buffer, serializer=serializer
            )
            setattr(obj, self._field_names[index], field_value)
        return obj
@cython.final
cdef class EnumSerializer(Serializer):
    """Serializer that encodes an enum member as its name.

    Only the Python-native ``write``/``read`` pair is implemented; the
    cross-language ``xwrite``/``xread`` methods raise ``NotImplementedError``.
    """

    @classmethod
    def support_subclass(cls) -> bool:
        """Always return True.

        NOTE(review): presumably signals that this serializer also applies
        to subclasses of the registered type -- confirm against the caller.
        """
        return True

    cpdef inline write(self, Buffer buffer, value):
        """Write ``value.name`` to ``buffer`` as a string."""
        buffer.write_string(value.name)

    cpdef inline read(self, Buffer buffer):
        """Read a member name from ``buffer`` and return that enum member.

        Raises:
            AttributeError: if the decoded name is not a member of
                ``self.type_``.
        """
        name = buffer.read_string()
        # Resolve the name through the enum's member mapping rather than
        # getattr(): getattr() would also accept any other class attribute
        # (methods, dunder attributes, ...) named by the incoming payload and
        # hand back a non-member object.
        try:
            return self.type_[name]
        except KeyError:
            # Keep raising AttributeError, as the former getattr() lookup did.
            raise AttributeError(
                f"{self.type_!r} has no enum member named {name!r}"
            ) from None

    cpdef inline xwrite(self, Buffer buffer, value):
        """Cross-language writing is not implemented for enums."""
        raise NotImplementedError

    cpdef inline xread(self, Buffer buffer):
        """Cross-language reading is not implemented for enums."""
        raise NotImplementedError
@cython.final
cdef class SliceSerializer(Serializer):
    """Serializer for ``slice`` objects (Python-native format only).

    The three bounds are encoded one after another in the order
    start, stop, step. The cross-language methods are not implemented.
    """

    cpdef inline write(self, Buffer buffer, v):
        """Write the start, stop and step of slice ``v`` to ``buffer``.

        Each bound is written as one of:
          * ``NOT_NULL_PYINT_FLAG`` (int16) followed by a varint64, when the
            bound is exactly of type ``int``;
          * ``NULL_FLAG`` (int8), when the bound is ``None``;
          * ``NOT_NULL_VALUE_FLAG`` (int8) followed by
            ``fury.serialize_nonref`` of the bound, otherwise.
        """
        cdef slice slice_obj = v
        for bound in (slice_obj.start, slice_obj.stop, slice_obj.step):
            if type(bound) is int:
                # TODO support varint128
                # NOTE(review): read() consumes a single int8 flag and then
                # calls deserialize_nonref, so this int16 presumably packs
                # the not-null flag byte together with the int class id --
                # confirm against the definition of NOT_NULL_PYINT_FLAG.
                buffer.write_int16(NOT_NULL_PYINT_FLAG)
                buffer.write_varint64(bound)
            elif bound is None:
                buffer.write_int8(NULL_FLAG)
            else:
                buffer.write_int8(NOT_NULL_VALUE_FLAG)
                self.fury.serialize_nonref(buffer, bound)

    cpdef inline read(self, Buffer buffer):
        """Read start, stop and step from ``buffer`` and return the slice.

        For each bound an int8 flag is read; ``NULL_FLAG`` yields ``None``,
        any other flag value means the bound follows and is decoded with
        ``fury.deserialize_nonref``.
        """
        bounds = []
        for _ in range(3):
            if buffer.read_int8() == NULL_FLAG:
                bounds.append(None)
            else:
                bounds.append(self.fury.deserialize_nonref(buffer))
        return slice(*bounds)

    cpdef xwrite(self, Buffer buffer, value):
        """Cross-language writing is not implemented for slices."""
        raise NotImplementedError

    cpdef xread(self, Buffer buffer):
        """Cross-language reading is not implemented for slices."""
        raise NotImplementedError
@cython.final
cdef class PickleSerializer(Serializer):
    """Serializer that hands values to fury's unsupported-type handlers.

    ``write``/``read`` delegate to ``fury.handle_unsupported_write`` and
    ``fury.handle_unsupported_read``. NOTE(review): judging by the class
    name those handlers are pickle-based -- confirm in the fury class.
    The cross-language methods are not implemented.
    """

    cpdef inline write(self, Buffer buffer, value):
        """Pass ``value`` to ``fury.handle_unsupported_write``."""
        self.fury.handle_unsupported_write(buffer, value)

    cpdef inline read(self, Buffer buffer):
        """Return the object produced by ``fury.handle_unsupported_read``."""
        restored = self.fury.handle_unsupported_read(buffer)
        return restored

    cpdef inline xwrite(self, Buffer buffer, value):
        """Cross-language writing is not implemented for this serializer."""
        raise NotImplementedError

    cpdef inline xread(self, Buffer buffer):
        """Cross-language reading is not implemented for this serializer."""
        raise NotImplementedError