| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # distutils: language = c++ |
| # cython: embedsignature = True |
| # cython: language_level = 3 |
| # cython: annotate = True |
| import datetime |
| import logging |
| import os |
| import platform |
| import time |
| import warnings |
| from typing import TypeVar, Union, Iterable |
| |
| from pyfory._util import get_bit, set_bit, clear_bit |
| from pyfory import _fory as fmod |
| from pyfory._fory import Language |
| from pyfory._fory import _ENABLE_TYPE_REGISTRATION_FORCIBLY |
| from pyfory.lib import mmh3 |
| from pyfory.meta.metastring import Encoding |
| from pyfory.type import is_primitive_type |
| from pyfory.policy import DeserializationPolicy, DEFAULT_POLICY |
| from pyfory.util import is_little_endian |
| from pyfory.includes.libserialization cimport \ |
| (TypeId, IsNamespacedType, IsTypeShareMeta, Fory_PyBooleanSequenceWriteToBuffer, Fory_PyFloatSequenceWriteToBuffer) |
| |
| from libc.stdint cimport int8_t, int16_t, int32_t, int64_t, uint64_t |
| from libc.stdint cimport * |
| from libcpp.vector cimport vector |
| from cpython cimport PyObject |
| from cpython.dict cimport PyDict_Next |
| from cpython.ref cimport * |
| from cpython.list cimport PyList_New, PyList_SET_ITEM |
| from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM |
| from libcpp cimport bool as c_bool |
| from libcpp.utility cimport pair |
| from cython.operator cimport dereference as deref |
| from pyfory._util cimport Buffer |
| from pyfory.includes.libabsl cimport flat_hash_map |
| from pyfory.meta.metastring import MetaStringDecoder |
| |
| try: |
| import numpy as np |
| except ImportError: |
| np = None |
| |
| cimport cython |
| |
| logger = logging.getLogger(__name__) |
| ENABLE_FORY_CYTHON_SERIALIZATION = os.environ.get( |
| "ENABLE_FORY_CYTHON_SERIALIZATION", "True").lower() in ("true", "1") |
| |
| cdef extern from *: |
| """ |
| #define int2obj(obj_addr) ((PyObject *)(obj_addr)) |
| #define obj2int(obj_ref) (Py_INCREF(obj_ref), ((int64_t)(obj_ref))) |
| """ |
| object int2obj(int64_t obj_addr) |
| int64_t obj2int(object obj_ref) |
| dict _PyDict_NewPresized(Py_ssize_t minused) |
| Py_ssize_t Py_SIZE(object obj) |
| |
| |
| # This flag indicates that the object is a null value. |
| cdef int8_t NULL_FLAG = -3 |
| # This flag indicates that the object has been written previously; a reference id |
| # follows instead of the object data. |
| cdef int8_t REF_FLAG = -2 |
| # This flag indicates that the object is a non-null value written without a ref id. |
| # We don't use another byte to indicate REF, so that we can save one byte. |
| cdef int8_t NOT_NULL_VALUE_FLAG = -1 |
| # This flag indicates that the object is referencable and is written/read for the first time. |
| cdef int8_t REF_VALUE_FLAG = 0 |
| # Global MetaString decoder for namespace bytes to str |
| namespace_decoder = MetaStringDecoder(".", "_") |
| # Global MetaString decoder for typename bytes to str |
| typename_decoder = MetaStringDecoder("$", "_") |
| |
| |
| @cython.final |
| cdef class MapRefResolver: |
| """ |
| Manages object reference tracking during serialization and deserialization. |
| |
| Handles shared and circular references by assigning unique IDs to objects |
| during serialization and resolving them during deserialization. This enables |
| efficient serialization of object graphs with duplicate references and prevents |
| infinite recursion with circular references. |
| |
| When ref_tracking is enabled, duplicate object references are serialized only once, |
| with subsequent references storing only the reference ID. During deserialization, |
| the resolver maintains a mapping to reconstruct the exact same object graph structure. |
| |
| Note: |
| This is an internal class used by the Fory serializer. Users typically don't |
| interact with this class directly. |
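| Example: |
| A minimal sketch of the effect through the public API (assuming ref tracking |
| is enabled via Fory(ref=True)): |
| >>> import pyfory |
| >>> fory = pyfory.Fory(ref=True) |
| >>> shared = ["payload"] |
| >>> a, b = fory.deserialize(fory.serialize([shared, shared])) |
| >>> a is b  # the duplicate reference is restored as the same object |
| True |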
| """ |
| cdef flat_hash_map[uint64_t, int32_t] written_objects_id # id(obj) -> ref_id |
| # Hold objects to prevent temporary objects from being garbage-collected while serializing nested fields/objects. |
| cdef vector[PyObject *] written_objects |
| cdef vector[PyObject *] read_objects |
| cdef vector[int32_t] read_ref_ids |
| cdef object read_object |
| cdef c_bool ref_tracking |
| |
| def __cinit__(self, c_bool ref): |
| self.read_object = None |
| self.ref_tracking = ref |
| |
| # Special methods of extension types must be declared with def, not cdef. |
| def __dealloc__(self): |
| self.reset() |
| |
| cpdef inline c_bool write_ref_or_null(self, Buffer buffer, obj): |
| if not self.ref_tracking: |
| if obj is None: |
| buffer.write_int8(NULL_FLAG) |
| return True |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| return False |
| if obj is None: |
| buffer.write_int8(NULL_FLAG) |
| return True |
| cdef uint64_t object_id = <uintptr_t> <PyObject *> obj |
| cdef int32_t next_id |
| cdef flat_hash_map[uint64_t, int32_t].iterator it = \ |
| self.written_objects_id.find(object_id) |
| if it == self.written_objects_id.end(): |
| next_id = self.written_objects_id.size() |
| self.written_objects_id[object_id] = next_id |
| self.written_objects.push_back(<PyObject *> obj) |
| Py_INCREF(obj) |
| buffer.write_int8(REF_VALUE_FLAG) |
| return False |
| else: |
| # The obj has been written previously. |
| buffer.write_int8(REF_FLAG) |
| buffer.write_varuint32(<uint64_t> deref(it).second) |
| return True |
| |
| cpdef inline int8_t read_ref_or_null(self, Buffer buffer): |
| cdef int8_t head_flag = buffer.read_int8() |
| if not self.ref_tracking: |
| return head_flag |
| cdef int32_t ref_id |
| cdef PyObject * obj |
| if head_flag == REF_FLAG: |
| # read reference id and get object from reference resolver |
| ref_id = buffer.read_varuint32() |
| assert 0 <= ref_id < self.read_objects.size(), f"Invalid ref id {ref_id}, current size {self.read_objects.size()}" |
| obj = self.read_objects[ref_id] |
| assert obj != NULL, f"Invalid ref id {ref_id}, current size {self.read_objects.size()}" |
| self.read_object = <object> obj |
| return REF_FLAG |
| else: |
| self.read_object = None |
| return head_flag |
| |
| cpdef inline int32_t preserve_ref_id(self): |
| if not self.ref_tracking: |
| return -1 |
| next_read_ref_id = self.read_objects.size() |
| self.read_objects.push_back(NULL) |
| self.read_ref_ids.push_back(next_read_ref_id) |
| return next_read_ref_id |
| |
| cpdef inline int32_t try_preserve_ref_id(self, Buffer buffer): |
| if not self.ref_tracking: |
| # `NOT_NULL_VALUE_FLAG` can be used as stub reference id because we use |
| # `refId >= NOT_NULL_VALUE_FLAG` to read data. |
| return buffer.read_int8() |
| head_flag = buffer.read_int8() |
| cdef int32_t ref_id |
| cdef PyObject *obj |
| if head_flag == REF_FLAG: |
| # read reference id and get object from reference resolver |
| ref_id = buffer.read_varuint32() |
| # avoid a wrong id causing a crash |
| assert 0 <= ref_id < self.read_objects.size(), f"Invalid ref id {ref_id}, current size {self.read_objects.size()}" |
| obj = self.read_objects[ref_id] |
| assert obj != NULL, f"Invalid ref id {ref_id}, current size {self.read_objects.size()}" |
| self.read_object = <object> obj |
| # `head_flag` except `REF_FLAG` can be used as stub reference id because |
| # we use `refId >= NOT_NULL_VALUE_FLAG` to read data. |
| return head_flag |
| else: |
| self.read_object = None |
| if head_flag == REF_VALUE_FLAG: |
| return self.preserve_ref_id() |
| # For NOT_NULL_VALUE_FLAG, push -1 to read_ref_ids so reference() knows |
| # this object is not referenceable (it's a value type, not a reference type) |
| self.read_ref_ids.push_back(-1) |
| return head_flag |
| |
| cpdef inline int32_t last_preserved_ref_id(self): |
| cdef int32_t length = self.read_ref_ids.size() |
| assert length > 0 |
| return self.read_ref_ids[length - 1] |
| |
| cpdef inline reference(self, obj): |
| if not self.ref_tracking: |
| return |
| cdef int32_t ref_id = self.read_ref_ids.back() |
| self.read_ref_ids.pop_back() |
| # When NOT_NULL_VALUE_FLAG was read instead of REF_VALUE_FLAG, |
| # -1 is pushed to read_ref_ids. This means the object is a value type |
| # (not a reference type), so we skip reference tracking. |
| if ref_id < 0: |
| return |
| cdef c_bool need_inc = self.read_objects[ref_id] == NULL |
| if need_inc: |
| Py_INCREF(obj) |
| self.read_objects[ref_id] = <PyObject *> obj |
| |
| cpdef inline get_read_object(self, id_=None): |
| if not self.ref_tracking: |
| return None |
| if id_ is None: |
| return self.read_object |
| cdef int32_t ref_id = id_ |
| cdef PyObject * obj = self.read_objects[ref_id] |
| if obj == NULL: |
| return None |
| return <object> obj |
| |
| cpdef inline set_read_object(self, int32_t ref_id, obj): |
| if not self.ref_tracking: |
| return |
| if ref_id >= 0: |
| need_inc = self.read_objects[ref_id] == NULL |
| if need_inc: |
| Py_INCREF(obj) |
| self.read_objects[ref_id] = <PyObject *> obj |
| |
| cpdef inline reset(self): |
| self.reset_write() |
| self.reset_read() |
| |
| cpdef inline reset_write(self): |
| self.written_objects_id.clear() |
| for item in self.written_objects: |
| Py_XDECREF(item) |
| self.written_objects.clear() |
| |
| cpdef inline reset_read(self): |
| if not self.ref_tracking: |
| return |
| for item in self.read_objects: |
| Py_XDECREF(item) |
| self.read_objects.clear() |
| self.read_ref_ids.clear() |
| self.read_object = None |
| |
| |
| cdef int8_t USE_TYPE_NAME = 0 |
| cdef int8_t USE_TYPE_ID = 1 |
| # Preserve 0 as the flag for "type id not set" in TypeInfo |
| cdef int8_t NO_TYPE_ID = 0 |
| cdef int8_t DEFAULT_DYNAMIC_WRITE_META_STR_ID = fmod.DEFAULT_DYNAMIC_WRITE_META_STR_ID |
| cdef int8_t INT64_TYPE_ID = fmod.INT64_TYPE_ID |
| cdef int8_t FLOAT64_TYPE_ID = fmod.FLOAT64_TYPE_ID |
| cdef int8_t BOOL_TYPE_ID = fmod.BOOL_TYPE_ID |
| cdef int8_t STRING_TYPE_ID = fmod.STRING_TYPE_ID |
| |
| cdef int16_t MAGIC_NUMBER = fmod.MAGIC_NUMBER |
| cdef int32_t NOT_NULL_INT64_FLAG = fmod.NOT_NULL_INT64_FLAG |
| cdef int32_t NOT_NULL_FLOAT64_FLAG = fmod.NOT_NULL_FLOAT64_FLAG |
| cdef int32_t NOT_NULL_BOOL_FLAG = fmod.NOT_NULL_BOOL_FLAG |
| cdef int32_t NOT_NULL_STRING_FLAG = fmod.NOT_NULL_STRING_FLAG |
| cdef int32_t SMALL_STRING_THRESHOLD = fmod.SMALL_STRING_THRESHOLD |
| |
| |
| @cython.final |
| cdef class MetaStringBytes: |
| cdef public bytes data |
| cdef int16_t length |
| cdef public int8_t encoding |
| cdef public int64_t hashcode |
| cdef public int16_t dynamic_write_string_id |
| |
| def __init__(self, data, hashcode): |
| self.data = data |
| self.length = len(data) |
| self.hashcode = hashcode |
| self.encoding = hashcode & 0xff |
| self.dynamic_write_string_id = DEFAULT_DYNAMIC_WRITE_META_STR_ID |
| |
| def __eq__(self, other): |
| return type(other) is MetaStringBytes and other.hashcode == self.hashcode |
| |
| def __hash__(self): |
| return self.hashcode |
| |
| def decode(self, decoder): |
| return decoder.decode(self.data, Encoding(self.encoding)) |
| |
| def __repr__(self): |
| return f"MetaStringBytes(data={self.data}, hashcode={self.hashcode})" |
| |
| |
| @cython.final |
| cdef class MetaStringResolver: |
| cdef: |
| int16_t dynamic_write_string_id |
| vector[PyObject *] _c_dynamic_written_enum_string |
| vector[PyObject *] _c_dynamic_id_to_enum_string_vec |
| # hash -> MetaStringBytes |
| flat_hash_map[int64_t, PyObject *] _c_hash_to_metastr_bytes |
| flat_hash_map[pair[int64_t, int64_t], PyObject *] _c_hash_to_small_metastring_bytes |
| set _enum_str_set |
| dict _metastr_to_metastr_bytes |
| |
| def __init__(self): |
| self._enum_str_set = set() |
| self._metastr_to_metastr_bytes = dict() |
| |
| cpdef inline write_meta_string_bytes( |
| self, Buffer buffer, MetaStringBytes metastr_bytes): |
| cdef int16_t dynamic_type_id = metastr_bytes.dynamic_write_string_id |
| cdef int32_t length = metastr_bytes.length |
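| # Header low bit: 0 -> the metastring is written inline (length << 1); |
| # 1 -> a back-reference to a previously written metastring by its dynamic id. |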
| if dynamic_type_id == DEFAULT_DYNAMIC_WRITE_META_STR_ID: |
| dynamic_type_id = self.dynamic_write_string_id |
| metastr_bytes.dynamic_write_string_id = dynamic_type_id |
| self.dynamic_write_string_id += 1 |
| self._c_dynamic_written_enum_string.push_back(<PyObject *> metastr_bytes) |
| buffer.write_varuint32(length << 1) |
| if length <= SMALL_STRING_THRESHOLD: |
| buffer.write_int8(metastr_bytes.encoding) |
| else: |
| buffer.write_int64(metastr_bytes.hashcode) |
| buffer.write_bytes(metastr_bytes.data) |
| else: |
| buffer.write_varuint32(((dynamic_type_id + 1) << 1) | 1) |
| |
| cpdef inline MetaStringBytes read_meta_string_bytes(self, Buffer buffer): |
| cdef int32_t header = buffer.read_varuint32() |
| cdef int32_t length = header >> 1 |
| if header & 0b1 != 0: |
| return <MetaStringBytes> self._c_dynamic_id_to_enum_string_vec[length - 1] |
| cdef int64_t v1 = 0, v2 = 0, hashcode |
| cdef PyObject * enum_str_ptr |
| cdef int32_t reader_index |
| cdef encoding = 0 |
| if length <= SMALL_STRING_THRESHOLD: |
| encoding = buffer.read_int8() |
| if length <= 8: |
| v1 = buffer.read_bytes_as_int64(length) |
| else: |
| v1 = buffer.read_int64() |
| v2 = buffer.read_bytes_as_int64(length - 8) |
| hashcode = ((v1 * 31 + v2) >> 8 << 8) | encoding |
| enum_str_ptr = self._c_hash_to_small_metastring_bytes[pair[int64_t, int64_t](v1, v2)] |
| if enum_str_ptr == NULL: |
| reader_index = buffer.reader_index |
| str_bytes = buffer.get_bytes(reader_index - length, length) |
| enum_str = MetaStringBytes(str_bytes, hashcode=hashcode) |
| self._enum_str_set.add(enum_str) |
| enum_str_ptr = <PyObject *> enum_str |
| self._c_hash_to_small_metastring_bytes[pair[int64_t, int64_t](v1, v2)] = enum_str_ptr |
| else: |
| hashcode = buffer.read_int64() |
| reader_index = buffer.reader_index |
| buffer.check_bound(reader_index, length) |
| buffer.reader_index = reader_index + length |
| enum_str_ptr = self._c_hash_to_metastr_bytes[hashcode] |
| if enum_str_ptr == NULL: |
| str_bytes = buffer.get_bytes(reader_index, length) |
| enum_str = MetaStringBytes(str_bytes, hashcode=hashcode) |
| self._enum_str_set.add(enum_str) |
| enum_str_ptr = <PyObject *> enum_str |
| self._c_hash_to_metastr_bytes[hashcode] = enum_str_ptr |
| self._c_dynamic_id_to_enum_string_vec.push_back(enum_str_ptr) |
| return <MetaStringBytes> enum_str_ptr |
| |
| cpdef inline get_metastr_bytes(self, metastr): |
| metastr_bytes = self._metastr_to_metastr_bytes.get(metastr) |
| if metastr_bytes is not None: |
| return metastr_bytes |
| cdef int64_t v1 = 0, v2 = 0, hashcode |
| length = len(metastr.encoded_data) |
| if length <= SMALL_STRING_THRESHOLD: |
| data_buf = Buffer(metastr.encoded_data) |
| if length <= 8: |
| v1 = data_buf.read_bytes_as_int64(length) |
| else: |
| v1 = data_buf.read_int64() |
| v2 = data_buf.read_bytes_as_int64(length - 8) |
| value_hash = ((v1 * 31 + v2) >> 8 << 8) | metastr.encoding.value |
| else: |
| value_hash = mmh3.hash_buffer(metastr.encoded_data, seed=47)[0] |
| value_hash = value_hash >> 8 << 8 |
| value_hash |= metastr.encoding.value & 0xFF |
| self._metastr_to_metastr_bytes[metastr] = metastr_bytes = MetaStringBytes(metastr.encoded_data, value_hash) |
| return metastr_bytes |
| |
| cpdef inline reset_read(self): |
| self._c_dynamic_id_to_enum_string_vec.clear() |
| |
| cpdef inline reset_write(self): |
| if self.dynamic_write_string_id != 0: |
| self.dynamic_write_string_id = 0 |
| for ptr in self._c_dynamic_written_enum_string: |
| (<MetaStringBytes> ptr).dynamic_write_string_id = \ |
| DEFAULT_DYNAMIC_WRITE_META_STR_ID |
| self._c_dynamic_written_enum_string.clear() |
| |
| |
| @cython.final |
| cdef class TypeInfo: |
| """ |
| If dynamic_type is true, the serializer will be a dynamic typed serializer |
| and it will write type info when writing the data. |
| In such cases, the `write_typeinfo` should not write typeinfo. |
| In general, if we have 4 type for one class, we will have 5 serializers. |
| For example, we have int8/16/32/64/128 for python `int` type, then we have 6 serializers |
| for python `int`: `Int8/1632/64/128Serializer` for `int8/16/32/64/128` each, and another |
| `IntSerializer` for `int` which will dispatch to different `int8/16/32/64/128` type |
| according the actual value. |
| We do not get the actual type here, because it will introduce extra computing. |
| For example, we have want to get actual `Int8/16/32/64Serializer`, we must check and |
| extract the actual here which will introduce cost, and we will do same thing again |
| when serializing the actual data. |
| """ |
| cdef public object cls |
| cdef public int32_t type_id |
| cdef public Serializer serializer |
| cdef public MetaStringBytes namespace_bytes |
| cdef public MetaStringBytes typename_bytes |
| cdef public c_bool dynamic_type |
| cdef public object type_def |
| |
| def __init__( |
| self, |
| cls: Union[type, TypeVar] = None, |
| type_id: int = NO_TYPE_ID, |
| serializer: Serializer = None, |
| namespace_bytes: MetaStringBytes = None, |
| typename_bytes: MetaStringBytes = None, |
| dynamic_type: bool = False, |
| type_def: object = None |
| ): |
| self.cls = cls |
| self.type_id = type_id |
| self.serializer = serializer |
| self.namespace_bytes = namespace_bytes |
| self.typename_bytes = typename_bytes |
| self.dynamic_type = dynamic_type |
| self.type_def = type_def |
| |
| def __repr__(self): |
| return f"TypeInfo(cls={self.cls}, type_id={self.type_id}, " \ |
| f"serializer={self.serializer})" |
| |
| cpdef str decode_namespace(self): |
| if self.namespace_bytes is None: |
| return "" |
| return self.namespace_bytes.decode(namespace_decoder) |
| |
| cpdef str decode_typename(self): |
| if self.typename_bytes is None: |
| return "" |
| return self.typename_bytes.decode(typename_decoder) |
| |
| |
| @cython.final |
| cdef class TypeResolver: |
| """ |
| Manages type registration, resolution, and serializer dispatch. |
| |
| TypeResolver maintains mappings between Python types and their corresponding |
| serialization metadata (TypeInfo), including serializers, type IDs, and cross- |
| language type names. It handles both registered types (with explicit type IDs) |
| and dynamic types (resolved at runtime). |
| |
| For cross-language serialization, TypeResolver coordinates namespace and typename |
| encoding using MetaString compression, and manages type definition sharing when |
| compatible mode is enabled. |
| |
| The resolver uses high-performance C++ hash maps for fast type lookups during |
| serialization and deserialization. |
| |
| Note: |
| This is an internal class used by the Fory serializer. Users typically don't |
| interact with this class directly, but instead use Fory.register() methods. |
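| Example: |
| A minimal sketch of serializer lookup through Fory.type_resolver (built-in |
| types such as dict are assumed to be pre-registered): |
| >>> import pyfory |
| >>> fory = pyfory.Fory() |
| >>> fory.type_resolver.get_typeinfo(dict).serializer is not None |
| True |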
| """ |
| cdef: |
| readonly Fory fory |
| readonly MetaStringResolver metastring_resolver |
| object _resolver |
| vector[PyObject *] _c_registered_id_to_type_info |
| # cls -> TypeInfo |
| flat_hash_map[uint64_t, PyObject *] _c_types_info |
| # hash -> TypeInfo |
| flat_hash_map[pair[int64_t, int64_t], PyObject *] _c_meta_hash_to_typeinfo |
| MetaStringResolver meta_string_resolver |
| c_bool meta_share |
| readonly SerializationContext serialization_context |
| |
| def __init__(self, fory, meta_share=False, meta_compressor=None): |
| self.fory = fory |
| self.metastring_resolver = fory.metastring_resolver |
| self.meta_share = meta_share |
| from pyfory._registry import TypeResolver |
| self._resolver = TypeResolver(fory, meta_share=meta_share, meta_compressor=meta_compressor) |
| |
| def initialize(self): |
| self._resolver.initialize() |
| for typeinfo in self._resolver._types_info.values(): |
| self._populate_typeinfo(typeinfo) |
| self.serialization_context = self.fory.serialization_context |
| |
| def register( |
| self, |
| cls: Union[type, TypeVar], |
| *, |
| type_id: int = None, |
| namespace: str = None, |
| typename: str = None, |
| serializer=None, |
| ): |
| self.register_type(cls, type_id=type_id, namespace=namespace, typename=typename, serializer=serializer) |
| |
| def register_type( |
| self, |
| cls: Union[type, TypeVar], |
| *, |
| type_id: int = None, |
| namespace: str = None, |
| typename: str = None, |
| serializer=None, |
| ): |
| typeinfo = self._resolver.register_type( |
| cls, |
| type_id=type_id, |
| namespace=namespace, |
| typename=typename, |
| serializer=serializer, |
| ) |
| self._populate_typeinfo(typeinfo) |
| |
| cdef _populate_typeinfo(self, typeinfo): |
| type_id = typeinfo.type_id |
| if type_id >= self._c_registered_id_to_type_info.size(): |
| self._c_registered_id_to_type_info.resize(type_id * 2, NULL) |
| if type_id > 0 and (self.fory.language == Language.PYTHON or not IsNamespacedType(type_id)): |
| self._c_registered_id_to_type_info[type_id] = <PyObject *> typeinfo |
| self._c_types_info[<uintptr_t> <PyObject *> typeinfo.cls] = <PyObject *> typeinfo |
| # Rehash when the load factor reaches 0.5 (integer arithmetic: size * 10 >= bucket_count * 5) |
| if self._c_types_info.size() * 10 >= self._c_types_info.bucket_count() * 5: |
| self._c_types_info.rehash(self._c_types_info.size() * 2) |
| if typeinfo.typename_bytes is not None: |
| self._load_bytes_to_typeinfo(type_id, typeinfo.namespace_bytes, typeinfo.typename_bytes) |
| |
| def register_serializer(self, cls: Union[type, TypeVar], serializer): |
| typeinfo1 = self._resolver.get_typeinfo(cls) |
| self._resolver.register_serializer(cls, serializer) |
| typeinfo2 = self._resolver.get_typeinfo(cls) |
| if typeinfo1.type_id != typeinfo2.type_id: |
| self._c_registered_id_to_type_info[typeinfo1.type_id] = NULL |
| self._populate_typeinfo(typeinfo2) |
| |
| cpdef inline Serializer get_serializer(self, cls): |
| """ |
| Returns |
| ------- |
| Serializer |
| The serializer for the provided type, creating one if it does not exist yet. |
| """ |
| return self.get_typeinfo(cls).serializer |
| |
| cpdef inline TypeInfo get_typeinfo(self, cls, create=True): |
| cdef PyObject * typeinfo_ptr = self._c_types_info[<uintptr_t> <PyObject *> cls] |
| cdef TypeInfo type_info |
| if typeinfo_ptr != NULL: |
| type_info = <object> typeinfo_ptr |
| if type_info.serializer is not None: |
| return type_info |
| else: |
| type_info.serializer = self._resolver.get_typeinfo(cls).serializer |
| return type_info |
| elif not create: |
| return None |
| else: |
| type_info = self._resolver.get_typeinfo(cls, create=create) |
| self._c_types_info[<uintptr_t> <PyObject *> cls] = <PyObject *> type_info |
| self._populate_typeinfo(type_info) |
| return type_info |
| |
| cpdef inline is_registered_by_name(self, cls): |
| return self._resolver.is_registered_by_name(cls) |
| |
| cpdef inline is_registered_by_id(self, cls): |
| return self._resolver.is_registered_by_id(cls) |
| |
| cpdef inline get_registered_name(self, cls): |
| return self._resolver.get_registered_name(cls) |
| |
| cpdef inline get_registered_id(self, cls): |
| return self._resolver.get_registered_id(cls) |
| |
| cdef inline TypeInfo _load_bytes_to_typeinfo( |
| self, int32_t type_id, MetaStringBytes ns_metabytes, MetaStringBytes type_metabytes): |
| cdef PyObject * typeinfo_ptr = self._c_meta_hash_to_typeinfo[ |
| pair[int64_t, int64_t](ns_metabytes.hashcode, type_metabytes.hashcode)] |
| if typeinfo_ptr != NULL: |
| return <TypeInfo> typeinfo_ptr |
| typeinfo = self._resolver._load_metabytes_to_typeinfo(ns_metabytes, type_metabytes) |
| typeinfo_ptr = <PyObject *> typeinfo |
| self._c_meta_hash_to_typeinfo[pair[int64_t, int64_t]( |
| ns_metabytes.hashcode, type_metabytes.hashcode)] = typeinfo_ptr |
| return typeinfo |
| |
| cpdef inline write_typeinfo(self, Buffer buffer, TypeInfo typeinfo): |
| if typeinfo.dynamic_type: |
| return |
| cdef: |
| int32_t type_id = typeinfo.type_id |
| int32_t internal_type_id = type_id & 0xFF |
| |
| if self.meta_share: |
| self.write_shared_type_meta(buffer, typeinfo) |
| return |
| |
| buffer.write_varuint32(type_id) |
| if IsNamespacedType(internal_type_id): |
| self.metastring_resolver.write_meta_string_bytes(buffer, typeinfo.namespace_bytes) |
| self.metastring_resolver.write_meta_string_bytes(buffer, typeinfo.typename_bytes) |
| |
| cpdef inline TypeInfo read_typeinfo(self, Buffer buffer): |
| if self.meta_share: |
| return self.read_shared_type_meta(buffer) |
| |
| cdef: |
| int32_t type_id = buffer.read_varuint32() |
| if type_id < 0: |
| type_id = -type_id |
| if type_id >= self._c_registered_id_to_type_info.size(): |
| raise ValueError(f"Unexpected type_id {type_id}") |
| cdef: |
| int32_t internal_type_id = type_id & 0xFF |
| MetaStringBytes namespace_bytes, typename_bytes |
| if IsNamespacedType(internal_type_id): |
| namespace_bytes = self.metastring_resolver.read_meta_string_bytes(buffer) |
| typename_bytes = self.metastring_resolver.read_meta_string_bytes(buffer) |
| return self._load_bytes_to_typeinfo(type_id, namespace_bytes, typename_bytes) |
| typeinfo_ptr = self._c_registered_id_to_type_info[type_id] |
| if typeinfo_ptr == NULL: |
| raise ValueError(f"Unexpected type_id {type_id}") |
| typeinfo = <TypeInfo> typeinfo_ptr |
| return typeinfo |
| |
| cpdef inline TypeInfo get_typeinfo_by_id(self, int32_t type_id): |
| if type_id >= self._c_registered_id_to_type_info.size() or type_id < 0 or IsNamespacedType(type_id & 0xFF): |
| raise ValueError(f"Unexpected type_id {type_id}") |
| typeinfo_ptr = self._c_registered_id_to_type_info[type_id] |
| if typeinfo_ptr == NULL: |
| raise ValueError(f"Unexpected type_id {type_id}") |
| typeinfo = <TypeInfo> typeinfo_ptr |
| return typeinfo |
| |
| cpdef inline get_typeinfo_by_name(self, namespace, typename): |
| return self._resolver.get_typeinfo_by_name(namespace=namespace, typename=typename) |
| |
| cpdef inline _set_typeinfo(self, typeinfo): |
| self._resolver._set_typeinfo(typeinfo) |
| |
| cpdef inline get_meta_compressor(self): |
| return self._resolver.get_meta_compressor() |
| |
| cpdef inline write_shared_type_meta(self, Buffer buffer, TypeInfo typeinfo): |
| """Write shared type meta information.""" |
| meta_context = self.serialization_context.meta_context |
| meta_context.write_shared_typeinfo(buffer, typeinfo) |
| |
| cpdef inline TypeInfo read_shared_type_meta(self, Buffer buffer): |
| """Read shared type meta information.""" |
| meta_context = self.serialization_context.meta_context |
| typeinfo = meta_context.read_shared_typeinfo(buffer) |
| return typeinfo |
| |
| cpdef inline write_type_defs(self, Buffer buffer): |
| """Write all type definitions that need to be sent.""" |
| self._resolver.write_type_defs(buffer) |
| |
| cpdef inline read_type_defs(self, Buffer buffer): |
| """Read all type definitions from the buffer.""" |
| self._resolver.read_type_defs(buffer) |
| |
| cpdef inline reset(self): |
| pass |
| |
| cpdef inline reset_read(self): |
| pass |
| |
| cpdef inline reset_write(self): |
| pass |
| |
| |
| @cython.final |
| cdef class MetaContext: |
| """ |
| Manages type metadata sharing across serializations in compatible mode. |
| |
| When compatible mode is enabled, MetaContext tracks type definitions (type names, |
| field names, field types) to enable efficient schema evolution. Instead of sending |
| full type metadata with every serialized object, the context sends type definitions |
| once and references them by ID in subsequent serializations. |
| |
| This enables forward/backward compatibility when struct fields are added or removed |
| between different versions of an application. |
| |
| Note: |
| This is an internal class used by SerializationContext. It is not thread-safe |
| and should only be used with a single Fory instance. |
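| Example: |
| A minimal sketch showing that compatible mode creates a MetaContext on the |
| serialization context: |
| >>> import pyfory |
| >>> fory = pyfory.Fory(xlang=True, compatible=True) |
| >>> fory.serialization_context.meta_context is not None |
| True |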
| """ |
| cdef: |
| # Types which have sent definitions to peer |
| # Maps type objects to their assigned IDs |
| flat_hash_map[uint64_t, int32_t] _c_type_map |
| |
| # Counter for assigning new IDs |
| list _writing_type_defs |
| list _read_type_infos |
| object fory |
| object type_resolver |
| |
| def __cinit__(self, object fory): |
| self.fory = fory |
| self.type_resolver = fory.type_resolver |
| self._writing_type_defs = [] |
| self._read_type_infos = [] |
| |
| cpdef inline void write_shared_typeinfo(self, Buffer buffer, typeinfo): |
| """Add a type definition to the writing queue.""" |
| type_cls = typeinfo.cls |
| cdef int32_t type_id = typeinfo.type_id |
| cdef int32_t internal_type_id = type_id & 0xFF |
| buffer.write_varuint32(type_id) |
| if not IsTypeShareMeta(internal_type_id): |
| return |
| |
| cdef uint64_t type_addr = <uint64_t> <PyObject *> type_cls |
| cdef flat_hash_map[uint64_t, int32_t].iterator it = self._c_type_map.find(type_addr) |
| if it != self._c_type_map.end(): |
| buffer.write_varuint32(deref(it).second) |
| return |
| |
| cdef index = self._c_type_map.size() |
| buffer.write_varuint32(index) |
| self._c_type_map[type_addr] = index |
| type_def = typeinfo.type_def |
| if type_def is None: |
| self.type_resolver._set_typeinfo(typeinfo) |
| type_def = typeinfo.type_def |
| self._writing_type_defs.append(type_def) |
| |
| cpdef inline list get_writing_type_defs(self): |
| """Get all type definitions that need to be written.""" |
| return self._writing_type_defs |
| |
| cpdef inline reset_write(self): |
| """Reset write state.""" |
| self._writing_type_defs.clear() |
| self._c_type_map.clear() |
| |
| cpdef inline add_read_typeinfo(self, type_info): |
| """Add a type info read from peer.""" |
| self._read_type_infos.append(type_info) |
| |
| cpdef inline read_shared_typeinfo(self, Buffer buffer): |
| """Read a type info from buffer.""" |
| cdef type_id = buffer.read_varuint32() |
| if IsTypeShareMeta(type_id & 0xFF): |
| return self._read_type_infos[buffer.read_varuint32()] |
| return self.type_resolver.get_typeinfo_by_id(type_id) |
| |
| cpdef inline reset_read(self): |
| """Reset read state.""" |
| self._read_type_infos.clear() |
| |
| cpdef inline reset(self): |
| """Reset both read and write state.""" |
| self.reset_write() |
| self.reset_read() |
| |
| def __str__(self): |
| return self.__repr__() |
| |
| def __repr__(self): |
| return (f"MetaContext(" |
| f"read_infos={self._read_type_infos}, " |
| f"writing_defs={self._writing_type_defs})") |
| |
| |
| @cython.final |
| cdef class SerializationContext: |
| """ |
| Manages serialization state and metadata sharing across operations. |
| |
| SerializationContext provides a scoped storage for sharing data during serialization |
| and deserialization operations. When compatible mode is enabled, it maintains a |
| MetaContext for efficient type metadata sharing to support schema evolution. |
| |
| The context stores temporary objects needed during serialization (e.g., class |
| definitions, custom serialization state) and coordinates type definition exchange |
| between serializer and deserializer. |
| |
| Note: |
| This is an internal class used by the Fory serializer. It is not thread-safe |
| and should only be used with a single Fory instance. |
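| Example: |
| A minimal sketch of the scoped object store (keys are matched by identity, |
| not equality): |
| >>> import pyfory |
| >>> fory = pyfory.Fory() |
| >>> key, state = object(), {"cached": True} |
| >>> fory.serialization_context.add(key, state) |
| >>> fory.serialization_context.get(key) |
| {'cached': True} |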
| """ |
| cdef dict objects |
| cdef readonly bint scoped_meta_share_enabled |
| cdef public MetaContext meta_context |
| cdef public object fory |
| |
| def __init__(self, object fory, scoped_meta_share_enabled: bool = False): |
| self.objects = dict() |
| self.scoped_meta_share_enabled = scoped_meta_share_enabled |
| if scoped_meta_share_enabled: |
| self.meta_context = MetaContext(fory) |
| else: |
| self.meta_context = None |
| self.fory = fory |
| |
| cpdef inline add(self, key, obj): |
| self.objects[id(key)] = obj |
| |
| def __contains__(self, key): |
| return id(key) in self.objects |
| |
| def __getitem__(self, key): |
| return self.objects[id(key)] |
| |
| def get(self, key): |
| return self.objects.get(id(key)) |
| |
| cpdef inline reset(self): |
| if len(self.objects) > 0: |
| self.objects.clear() |
| |
| cpdef inline reset_write(self): |
| if len(self.objects) > 0: |
| self.objects.clear() |
| if self.scoped_meta_share_enabled and self.meta_context is not None: |
| self.meta_context.reset_write() |
| |
| cpdef inline reset_read(self): |
| if len(self.objects) > 0: |
| self.objects.clear() |
| if self.scoped_meta_share_enabled and self.meta_context is not None: |
| self.meta_context.reset_read() |
| |
| |
| @cython.final |
| cdef class Fory: |
| """ |
| High-performance cross-language serialization framework. |
| |
| Fory provides blazingly-fast serialization for Python objects with support for |
| both Python-native mode and cross-language mode. It handles complex object graphs, |
| reference tracking, and circular references automatically. |
| |
| In Python-native mode (xlang=False), Fory can serialize all Python objects |
| including dataclasses, classes with custom serialization methods, and local |
| functions/classes, making it a drop-in replacement for pickle. |
| |
| In cross-language mode (xlang=True), Fory serializes objects in a format that |
| can be deserialized by other Fory-supported languages (Java, Go, Rust, C++, etc). |
| |
| Examples: |
| >>> import pyfory |
| >>> from dataclasses import dataclass |
| >>> |
| >>> @dataclass |
| ... class Person: |
| ... name: str |
| ... age: pyfory.int32 |
| >>> |
| >>> # Python-native mode |
| >>> fory = pyfory.Fory() |
| >>> fory.register(Person) |
| >>> data = fory.serialize(Person("Alice", 30)) |
| >>> person = fory.deserialize(data) |
| >>> |
| >>> # Cross-language mode |
| >>> fory_xlang = pyfory.Fory(xlang=True) |
| >>> fory_xlang.register(Person) |
| >>> data = fory_xlang.serialize(Person("Bob", 25)) |
| |
| See Also: |
| ThreadSafeFory: Thread-safe wrapper for concurrent usage |
| """ |
| cdef readonly object language |
| cdef readonly c_bool ref_tracking |
| cdef readonly c_bool strict |
| cdef readonly c_bool is_py |
| cdef readonly c_bool compatible |
| cdef readonly c_bool field_nullable |
| cdef readonly object policy |
| cdef readonly MapRefResolver ref_resolver |
| cdef readonly TypeResolver type_resolver |
| cdef readonly MetaStringResolver metastring_resolver |
| cdef readonly SerializationContext serialization_context |
| cdef Buffer buffer |
| cdef public object buffer_callback |
| cdef object _buffers # iterator |
| cdef object _unsupported_callback |
| cdef object _unsupported_objects # iterator |
| cdef object _peer_language |
| cdef int32_t max_depth |
| cdef int32_t depth |
| |
| def __init__( |
| self, |
| xlang: bool = False, |
| ref: bool = False, |
| strict: bool = True, |
| policy: DeserializationPolicy = None, |
| compatible: bool = False, |
| max_depth: int = 50, |
| field_nullable: bool = False, |
| meta_compressor=None, |
| **kwargs, |
| ): |
| """ |
| Initialize a Fory serialization instance. |
| |
| Args: |
| xlang: Enable cross-language serialization mode. When False (default), uses |
| Python-native mode supporting all Python objects (dataclasses, __reduce__, |
| local functions/classes). With ref=True and strict=False, serves as a |
| drop-in replacement for pickle. When True, uses cross-language format |
| compatible with other Fory languages (Java, Go, Rust, etc), but Python- |
| specific features like functions and __reduce__ methods are not supported. |
| |
| ref: Enable reference tracking for shared and circular references. When enabled, |
| duplicate objects are stored once and circular references are supported. |
| Disabled by default for better performance. |
| |
| strict: Require type registration before serialization (default: True). When |
| disabled, unknown types can be deserialized, which may be insecure if |
| malicious code exists in __new__/__init__/__eq__/__hash__ methods. |
| **WARNING**: Only disable in trusted environments. When disabling strict |
| mode, you should provide a custom `policy` parameter to control which types |
| are allowed. We are not responsible for security risks when this option |
| is disabled without proper policy controls. |
| |
| compatible: Enable schema evolution for cross-language serialization. When |
| enabled, supports forward/backward compatibility for struct field |
| additions and removals. |
| |
| max_depth: Maximum nesting depth for deserialization (default: 50). Raises |
| an exception if exceeded to prevent malicious deeply-nested data attacks. |
| |
| policy: Custom deserialization policy for security checks. When provided, |
| it controls which types can be deserialized, overriding the default policy. |
| **Strongly recommended** when strict=False to maintain security controls. |
| |
| field_nullable: Treat all dataclass fields as nullable in Python-native mode |
| (xlang=False), regardless of Optional annotation. Ignored in cross-language |
| mode. |
| |
| Example: |
| >>> # Python-native mode with reference tracking |
| >>> fory = Fory(ref=True) |
| >>> |
| >>> # Cross-language mode with schema evolution |
| >>> fory = Fory(xlang=True, compatible=True) |
| """ |
| self.language = Language.XLANG if xlang else Language.PYTHON |
| if kwargs.get("language") is not None: |
| self.language = kwargs.get("language") |
| if kwargs.get("ref_tracking") is not None: |
| ref = kwargs.get("ref_tracking") |
| if kwargs.get("require_type_registration") is not None: |
| strict = kwargs.get("require_type_registration") |
| if _ENABLE_TYPE_REGISTRATION_FORCIBLY or strict: |
| self.strict = True |
| else: |
| self.strict = False |
| self.policy = policy or DEFAULT_POLICY |
| self.compatible = compatible |
| self.ref_tracking = ref |
| self.ref_resolver = MapRefResolver(ref) |
| self.is_py = self.language == Language.PYTHON |
| self.field_nullable = field_nullable if self.is_py else False |
| self.metastring_resolver = MetaStringResolver() |
| self.type_resolver = TypeResolver(self, meta_share=compatible, meta_compressor=meta_compressor) |
| self.serialization_context = SerializationContext(fory=self, scoped_meta_share_enabled=compatible) |
| self.type_resolver.initialize() |
| self.buffer = Buffer.allocate(32) |
| self.buffer_callback = None |
| self._buffers = None |
| self._unsupported_callback = None |
| self._unsupported_objects = None |
| self._peer_language = None |
| self.depth = 0 |
| self.max_depth = max_depth |
| |
| def register_serializer(self, cls: Union[type, TypeVar], Serializer serializer): |
| """ |
| Register a custom serializer for a type. |
| |
| Allows you to provide a custom serializer implementation for a specific type, |
| overriding Fory's default serialization behavior. |
| |
| Args: |
| cls: The Python type to associate with the serializer |
| serializer: Custom serializer instance implementing the Serializer protocol |
| |
| Example: |
| >>> fory = Fory() |
| >>> fory.register_serializer(MyClass, MyCustomSerializer()) |
| """ |
| self.type_resolver.register_serializer(cls, serializer) |
| |
| def register( |
| self, |
| cls: Union[type, TypeVar], |
| *, |
| type_id: int = None, |
| namespace: str = None, |
| typename: str = None, |
| serializer=None, |
| ): |
| """ |
| Register a type for serialization. |
| |
| This is an alias for `register_type()`. Type registration enables Fory to |
| efficiently serialize and deserialize objects by pre-computing serialization |
| metadata. |
| |
| For cross-language serialization, types can be matched between languages using: |
| 1. **type_id** (recommended): Numeric ID matching - faster and more compact |
| 2. **namespace + typename**: String-based matching - more flexible but larger overhead |
| |
| Args: |
| cls: The Python type to register |
| type_id: Optional unique numeric ID for cross-language type matching. |
| Using type_id provides better performance and smaller serialized size |
| compared to namespace/typename matching. |
| namespace: Optional namespace for cross-language type matching by name. |
| Used when type_id is not specified. |
| typename: Optional type name for cross-language type matching by name. |
| Defaults to class name if not specified. Used with namespace. |
| serializer: Optional custom serializer instance for this type |
| |
| Example: |
| >>> # Register with type_id (recommended for performance) |
| >>> fory = Fory(xlang=True) |
| >>> fory.register(Person, type_id=100) |
| >>> |
| >>> # Register with namespace and typename (more flexible) |
| >>> fory.register(Person, namespace="com.example", typename="Person") |
| >>> |
| >>> # Python-native mode (no cross-language matching needed) |
| >>> fory = Fory() |
| >>> fory.register(Person) |
| """ |
| self.type_resolver.register_type( |
| cls, type_id=type_id, namespace=namespace, typename=typename, serializer=serializer) |
| |
| def register_type( |
| self, |
| cls: Union[type, TypeVar], |
| *, |
| type_id: int = None, |
| namespace: str = None, |
| typename: str = None, |
| serializer=None, |
| ): |
| """ |
| Register a type for serialization. |
| |
| Type registration enables Fory to efficiently serialize and deserialize objects |
| by pre-computing serialization metadata. |
| |
| For cross-language serialization, types can be matched between languages using: |
| 1. **type_id** (recommended): Numeric ID matching - faster and more compact |
| 2. **namespace + typename**: String-based matching - more flexible but larger overhead |
| |
| Args: |
| cls: The Python type to register |
| type_id: Optional unique numeric ID for cross-language type matching. |
| Using type_id provides better performance and smaller serialized size |
| compared to namespace/typename matching. |
| namespace: Optional namespace for cross-language type matching by name. |
| Used when type_id is not specified. |
| typename: Optional type name for cross-language type matching by name. |
| Defaults to class name if not specified. Used with namespace. |
| serializer: Optional custom serializer instance for this type |
| |
| Example: |
| >>> # Register with type_id (recommended for performance) |
| >>> fory = Fory(xlang=True) |
| >>> fory.register_type(Person, type_id=100) |
| >>> |
| >>> # Register with namespace and typename (more flexible) |
| >>> fory.register_type(Person, namespace="com.example", typename="Person") |
| >>> |
| >>> # Python-native mode (no cross-language matching needed) |
| >>> fory = Fory() |
| >>> fory.register_type(Person) |
| """ |
| self.type_resolver.register_type( |
| cls, type_id=type_id, namespace=namespace, typename=typename, serializer=serializer) |
| |
| def dumps( |
| self, |
| obj, |
| buffer: Buffer = None, |
| buffer_callback=None, |
| unsupported_callback=None, |
| ) -> Union[Buffer, bytes]: |
| """ |
| Serialize an object to bytes, alias for `serialize` method. |
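| Example: |
| A minimal round-trip sketch using the dumps/loads aliases: |
| >>> fory = Fory() |
| >>> fory.loads(fory.dumps([1, 2, 3])) |
| [1, 2, 3] |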
| """ |
| return self.serialize(obj, buffer, buffer_callback, unsupported_callback) |
| |
| def loads( |
| self, |
| buffer: Union[Buffer, bytes], |
| buffers: Iterable = None, |
| unsupported_objects: Iterable = None, |
| ): |
| """ |
| Deserialize bytes to an object, alias for `deserialize` method. |
| """ |
| return self.deserialize(buffer, buffers, unsupported_objects) |
| |
| def serialize( |
| self, obj, |
| Buffer buffer=None, |
| buffer_callback=None, |
| unsupported_callback=None |
| ) -> Union[Buffer, bytes]: |
| """ |
| Serialize a Python object to bytes. |
| |
| Converts the object into Fory's binary format. The serialization process |
| automatically handles reference tracking (if enabled), type information, |
| and nested objects. |
| |
| Args: |
| obj: The object to serialize |
| buffer: Optional pre-allocated buffer to write to. If None, uses internal buffer |
| buffer_callback: Optional callback for out-of-band buffer serialization |
| unsupported_callback: Optional callback for handling unsupported types |
| |
| Returns: |
| Serialized bytes if buffer is None, otherwise returns the provided buffer |
| |
| Example: |
| >>> fory = Fory() |
| >>> data = fory.serialize({"key": "value", "num": 42}) |
| >>> print(type(data)) |
| <class 'bytes'> |
| """ |
| try: |
| return self._serialize( |
| obj, |
| buffer, |
| buffer_callback=buffer_callback, |
| unsupported_callback=unsupported_callback) |
| finally: |
| self.reset_write() |
| |
| cpdef inline _serialize( |
| self, obj, Buffer buffer, buffer_callback=None, unsupported_callback=None): |
| assert self.depth == 0, "Nested serialization should use write_ref/write_no_ref/xwrite_ref/xwrite_no_ref." |
| self.depth += 1 |
| self.buffer_callback = buffer_callback |
| self._unsupported_callback = unsupported_callback |
| if buffer is None: |
| self.buffer.writer_index = 0 |
| buffer = self.buffer |
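| # Header layout written below: in xlang mode a 2-byte magic number, then one |
| # bitmap byte (bit 0: root is null, bit 1: little endian, bit 2: peer reads as |
| # xlang, bit 3: out-of-band buffers enabled), then one byte for the writer |
| # language when xlang is enabled. |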
| if self.language == Language.XLANG: |
| buffer.write_int16(MAGIC_NUMBER) |
| cdef int32_t mask_index = buffer.writer_index |
| # 1 byte is used for the bit mask |
| buffer.grow(1) |
| buffer.writer_index = mask_index + 1 |
| if obj is None: |
| set_bit(buffer, mask_index, 0) |
| else: |
| clear_bit(buffer, mask_index, 0) |
| # set endian |
| if is_little_endian: |
| set_bit(buffer, mask_index, 1) |
| else: |
| clear_bit(buffer, mask_index, 1) |
| |
| if self.language == Language.XLANG: |
| # set reader as x_lang. |
| set_bit(buffer, mask_index, 2) |
| # set writer language. |
| buffer.write_int8(Language.PYTHON.value) |
| else: |
| # set reader as native. |
| clear_bit(buffer, mask_index, 2) |
| if self.buffer_callback is not None: |
| set_bit(buffer, mask_index, 3) |
| else: |
| clear_bit(buffer, mask_index, 3) |
| # Reserve space for type definitions offset, similar to Java implementation |
| cdef int32_t type_defs_offset_pos = -1 |
| if self.serialization_context.scoped_meta_share_enabled: |
| type_defs_offset_pos = buffer.writer_index |
| buffer.write_int32(-1) # Reserve 4 bytes for type definitions offset |
| |
| cdef int32_t start_offset |
| if self.language == Language.PYTHON: |
| self.write_ref(buffer, obj) |
| else: |
| self.xwrite_ref(buffer, obj) |
| |
| # Write type definitions at the end, similar to Java implementation |
| if self.serialization_context.scoped_meta_share_enabled: |
| meta_context = self.serialization_context.meta_context |
| if meta_context is not None and len(meta_context.get_writing_type_defs()) > 0: |
| # Update the offset to point to current position |
| current_pos = buffer.writer_index |
| buffer.put_int32(type_defs_offset_pos, current_pos - type_defs_offset_pos - 4) |
| self.type_resolver.write_type_defs(buffer) |
| |
| if buffer is not self.buffer: |
| return buffer |
| else: |
| return buffer.to_bytes(0, buffer.writer_index) |
| |
| cpdef inline write_ref( |
| self, Buffer buffer, obj, TypeInfo typeinfo=None): |
| cls = type(obj) |
| if cls is str: |
| buffer.write_int16(NOT_NULL_STRING_FLAG) |
| buffer.write_string(obj) |
| return |
| elif cls is int: |
| buffer.write_int16(NOT_NULL_INT64_FLAG) |
| buffer.write_varint64(obj) |
| return |
| elif cls is bool: |
| buffer.write_int16(NOT_NULL_BOOL_FLAG) |
| buffer.write_bool(obj) |
| return |
| elif cls is float: |
| buffer.write_int16(NOT_NULL_FLOAT64_FLAG) |
| buffer.write_double(obj) |
| return |
| if self.ref_resolver.write_ref_or_null(buffer, obj): |
| return |
| if typeinfo is None: |
| typeinfo = self.type_resolver.get_typeinfo(cls) |
| self.type_resolver.write_typeinfo(buffer, typeinfo) |
| typeinfo.serializer.write(buffer, obj) |
| |
| cpdef inline write_no_ref(self, Buffer buffer, obj): |
| cls = type(obj) |
| if cls is str: |
| buffer.write_varuint32(STRING_TYPE_ID) |
| buffer.write_string(obj) |
| return |
| elif cls is int: |
| buffer.write_varuint32(INT64_TYPE_ID) |
| buffer.write_varint64(obj) |
| return |
| elif cls is bool: |
| buffer.write_varuint32(BOOL_TYPE_ID) |
| buffer.write_bool(obj) |
| return |
| elif cls is float: |
| buffer.write_varuint32(FLOAT64_TYPE_ID) |
| buffer.write_double(obj) |
| return |
| cdef TypeInfo typeinfo = self.type_resolver.get_typeinfo(cls) |
| self.type_resolver.write_typeinfo(buffer, typeinfo) |
| typeinfo.serializer.write(buffer, obj) |
| |
| cpdef inline xwrite_ref( |
| self, Buffer buffer, obj, Serializer serializer=None): |
| if serializer is None or serializer.need_to_write_ref: |
| if not self.ref_resolver.write_ref_or_null(buffer, obj): |
| self.xwrite_no_ref( |
| buffer, obj, serializer=serializer |
| ) |
| else: |
| if obj is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| self.xwrite_no_ref( |
| buffer, obj, serializer=serializer |
| ) |
| |
| cpdef inline xwrite_no_ref( |
| self, Buffer buffer, obj, Serializer serializer=None): |
| if serializer is None: |
| typeinfo = self.type_resolver.get_typeinfo(type(obj)) |
| self.type_resolver.write_typeinfo(buffer, typeinfo) |
| serializer = typeinfo.serializer |
| serializer.xwrite(buffer, obj) |
| |
| def deserialize( |
| self, |
| buffer: Union[Buffer, bytes], |
| buffers: Iterable = None, |
| unsupported_objects: Iterable = None, |
| ): |
| """ |
| Deserialize bytes back to a Python object. |
| |
| Reconstructs an object from Fory's binary format. The deserialization process |
| automatically handles reference resolution (if enabled), type instantiation, |
| and nested objects. |
| |
| Args: |
| buffer: Serialized bytes or Buffer to deserialize from |
| buffers: Optional iterable of buffers for out-of-band deserialization |
| unsupported_objects: Optional iterable of objects for unsupported type handling |
| |
| Returns: |
| The deserialized Python object |
| |
| Example: |
| >>> fory = Fory() |
| >>> data = fory.serialize({"key": "value"}) |
| >>> obj = fory.deserialize(data) |
| >>> print(obj) |
| {'key': 'value'} |
| """ |
| try: |
| if type(buffer) == bytes: |
| buffer = Buffer(buffer) |
| return self._deserialize(buffer, buffers, unsupported_objects) |
| finally: |
| self.reset_read() |
| |
| cpdef inline _deserialize( |
| self, Buffer buffer, buffers=None, unsupported_objects=None): |
| assert self.depth == 0, "Nested deserialization should use read_ref/read_no_ref/xread_ref/xread_no_ref." |
| self.depth += 1 |
| if unsupported_objects is not None: |
| self._unsupported_objects = iter(unsupported_objects) |
| if self.language == Language.XLANG: |
| magic_number = buffer.read_int16() |
| assert magic_number == MAGIC_NUMBER, ( |
| f"The fory xlang serialization must start with magic number {hex(MAGIC_NUMBER)}. " |
| "Please check whether the serialization is based on the xlang protocol and the " |
| "data is not corrupted." |
| ) |
| cdef int32_t reader_index = buffer.reader_index |
| buffer.reader_index = reader_index + 1 |
| if get_bit(buffer, reader_index, 0): |
| return None |
| cdef c_bool is_little_endian_ = get_bit(buffer, reader_index, 1) |
| assert is_little_endian_, ( |
| "Big endian is not supported for now, " |
| "please ensure peer machine is little endian." |
| ) |
| cdef c_bool is_target_x_lang = get_bit(buffer, reader_index, 2) |
| if is_target_x_lang: |
| self._peer_language = Language(buffer.read_int8()) |
| else: |
| self._peer_language = Language.PYTHON |
| cdef c_bool is_out_of_band_serialization_enabled = \ |
| get_bit(buffer, reader_index, 3) |
| if is_out_of_band_serialization_enabled: |
| assert buffers is not None, ( |
| "buffers shouldn't be null when the serialized stream is " |
| "produced with buffer_callback not null." |
| ) |
| self._buffers = iter(buffers) |
| else: |
| assert buffers is None, ( |
| "buffers should be null when the serialized stream is " |
| "produced with buffer_callback null." |
| ) |
| |
| # Read type definitions at the start, similar to Java implementation |
| cdef int32_t end_reader_index = -1 |
| if self.serialization_context.scoped_meta_share_enabled: |
| relative_type_defs_offset = buffer.read_int32() |
| if relative_type_defs_offset != -1: |
| # Save current reader position |
| current_reader_index = buffer.reader_index |
| # Jump to type definitions |
| buffer.reader_index = current_reader_index + relative_type_defs_offset |
| # Read type definitions |
| self.type_resolver.read_type_defs(buffer) |
| # Save the end position (after type defs) - this is the true end of serialized data |
| end_reader_index = buffer.reader_index |
| # Jump back to continue with object deserialization |
| buffer.reader_index = current_reader_index |
| |
| if not is_target_x_lang: |
| obj = self.read_ref(buffer) |
| else: |
| obj = self.xread_ref(buffer) |
| |
| # After reading the object, position buffer at the end of serialized data |
| # (which is after the type definitions, not after the object data) |
| if end_reader_index != -1: |
| buffer.reader_index = end_reader_index |
| |
| return obj |
| |
| cpdef inline read_ref(self, Buffer buffer): |
| cdef MapRefResolver ref_resolver = self.ref_resolver |
| cdef int32_t ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| return ref_resolver.get_read_object() |
| # indicates that the object is first read. |
| cdef TypeInfo typeinfo = self.type_resolver.read_typeinfo(buffer) |
| cls = typeinfo.cls |
| if cls is str: |
| return buffer.read_string() |
| elif cls is int: |
| return buffer.read_varint64() |
| elif cls is bool: |
| return buffer.read_bool() |
| elif cls is float: |
| return buffer.read_double() |
| self.inc_depth() |
| o = typeinfo.serializer.read(buffer) |
| self.depth -= 1 |
| ref_resolver.set_read_object(ref_id, o) |
| return o |
| |
| cpdef inline read_no_ref(self, Buffer buffer): |
| """Deserialize not-null and non-reference object from buffer.""" |
| cdef TypeInfo typeinfo = self.type_resolver.read_typeinfo(buffer) |
| cls = typeinfo.cls |
| if cls is str: |
| return buffer.read_string() |
| elif cls is int: |
| return buffer.read_varint64() |
| elif cls is bool: |
| return buffer.read_bool() |
| elif cls is float: |
| return buffer.read_double() |
| self.inc_depth() |
| o = typeinfo.serializer.read(buffer) |
| self.depth -= 1 |
| return o |
| |
| cpdef inline xread_ref(self, Buffer buffer, Serializer serializer=None): |
| cdef MapRefResolver ref_resolver |
| cdef int32_t ref_id |
| if serializer is None or serializer.need_to_write_ref: |
| ref_resolver = self.ref_resolver |
| ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| # indicates that the object is first read. |
| if ref_id >= NOT_NULL_VALUE_FLAG: |
| # Don't push -1 here - try_preserve_ref_id already pushed ref_id |
| o = self._xread_no_ref_internal(buffer, serializer) |
| ref_resolver.set_read_object(ref_id, o) |
| return o |
| else: |
| return ref_resolver.get_read_object() |
| cdef int8_t head_flag = buffer.read_int8() |
| if head_flag == NULL_FLAG: |
| return None |
| return self.xread_no_ref( |
| buffer, serializer=serializer |
| ) |
| |
| cpdef inline xread_no_ref( |
| self, Buffer buffer, Serializer serializer=None): |
| if serializer is None: |
| serializer = self.type_resolver.read_typeinfo(buffer).serializer |
| # Push -1 to read_ref_ids so reference() can pop it and skip reference tracking |
| # This handles the case where xread_no_ref is called directly without xread_ref |
| if self.ref_resolver.ref_tracking: |
| self.ref_resolver.read_ref_ids.push_back(-1) |
| return self._xread_no_ref_internal(buffer, serializer) |
| |
| cdef inline _xread_no_ref_internal( |
| self, Buffer buffer, Serializer serializer): |
| """Internal method to read without pushing to read_ref_ids.""" |
| if serializer is None: |
| serializer = self.type_resolver.read_typeinfo(buffer).serializer |
| self.inc_depth() |
| o = serializer.xread(buffer) |
| self.depth -= 1 |
| return o |
| |
| cpdef inline inc_depth(self): |
| self.depth += 1 |
| if self.depth > self.max_depth: |
| self.throw_depth_limit_exceeded_exception() |
| |
| cpdef inline dec_depth(self): |
| self.depth -= 1 |
| |
| cpdef inline throw_depth_limit_exceeded_exception(self): |
| raise Exception( |
| f"Read depth exceed max depth: {self.depth}, the deserialization data may be malicious. If it's not malicious, " |
| "please increase max read depth by Fory(..., max_depth=...)" |
| ) |
| |
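| # Out-of-band buffer protocol: when buffer_callback is provided and returns a falsy |
| # value for a buffer object, only an "out-of-band" marker is written here and the |
| # caller must transport the buffer bytes separately, handing matching buffers back |
| # to deserialize(..., buffers=...) in the same order they were produced. |
| # A minimal sketch (hypothetical usage; obj is assumed to contain buffer-backed data): |
| #     collected = [] |
| #     data = fory.serialize(obj, buffer_callback=collected.append)  # append returns None -> written out-of-band |
| #     obj2 = fory.deserialize(data, buffers=...)  # pass the out-of-band buffers back in the same order |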
| cpdef inline write_buffer_object(self, Buffer buffer, buffer_object): |
| cdef int32_t size |
| cdef int32_t writer_index |
| cdef Buffer buf |
| if self.buffer_callback is None or self.buffer_callback(buffer_object): |
| buffer.write_bool(True) |
| size = buffer_object.total_bytes() |
| # write the length. |
| buffer.write_varuint32(size) |
| writer_index = buffer.writer_index |
| buffer.ensure(writer_index + size) |
| buf = buffer.slice(buffer.writer_index, size) |
| buffer_object.write_to(buf) |
| buffer.writer_index += size |
| else: |
| buffer.write_bool(False) |
| |
| cpdef inline object read_buffer_object(self, Buffer buffer): |
| cdef c_bool in_band = buffer.read_bool() |
| if not in_band: |
| assert self._buffers is not None |
| return next(self._buffers) |
| cdef int32_t size = buffer.read_varuint32() |
| cdef Buffer buf = buffer.slice(buffer.reader_index, size) |
| buffer.reader_index += size |
| return buf |
| |
| cpdef handle_unsupported_write(self, buffer, obj): |
| if self._unsupported_callback is None or self._unsupported_callback(obj): |
| raise NotImplementedError(f"{type(obj)} is not supported for write") |
| |
| cpdef handle_unsupported_read(self, buffer): |
| assert self._unsupported_objects is not None |
| return next(self._unsupported_objects) |
| |
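# Reference-tracked object layout: a ref/null flag byte, the type info, then the
# serializer payload. On read, a back reference short-circuits to the previously
# deserialized object via the ref resolver.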
| cpdef inline write_ref_pyobject( |
| self, Buffer buffer, value, TypeInfo typeinfo=None): |
| if self.ref_resolver.write_ref_or_null(buffer, value): |
| return |
| if typeinfo is None: |
| typeinfo = self.type_resolver.get_typeinfo(type(value)) |
| self.type_resolver.write_typeinfo(buffer, typeinfo) |
| typeinfo.serializer.write(buffer, value) |
| |
| cpdef inline read_ref_pyobject(self, Buffer buffer): |
| cdef MapRefResolver ref_resolver = self.ref_resolver |
| cdef int32_t ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| return ref_resolver.get_read_object() |
# the object is being read for the first time.
| cdef TypeInfo typeinfo = self.type_resolver.read_typeinfo(buffer) |
| self.inc_depth() |
| o = typeinfo.serializer.read(buffer) |
| self.depth -= 1 |
| ref_resolver.set_read_object(ref_id, o) |
| return o |
| |
| cpdef inline reset_write(self): |
| """ |
| Reset write state after serialization. |
| |
| Clears internal write buffers, reference tracking state, and type resolution |
| caches. This method is automatically called after each serialization. |
| """ |
| self.depth = 0 |
| self.ref_resolver.reset_write() |
| self.type_resolver.reset_write() |
| self.metastring_resolver.reset_write() |
| self.serialization_context.reset_write() |
| self._unsupported_callback = None |
| |
| cpdef inline reset_read(self): |
| """ |
| Reset read state after deserialization. |
| |
| Clears internal read buffers, reference tracking state, and type resolution |
| caches. This method is automatically called after each deserialization. |
| """ |
| self.depth = 0 |
| self.ref_resolver.reset_read() |
| self.type_resolver.reset_read() |
| self.metastring_resolver.reset_read() |
| self.serialization_context.reset_read() |
| self._buffers = None |
| self._unsupported_objects = None |
| |
| cpdef inline reset(self): |
| """ |
| Reset both write and read state. |
| |
| Clears all internal state including buffers, reference tracking, and type |
| resolution caches. Use this to ensure a clean state before reusing a Fory |
| instance. |
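
Example:
    Illustrative only; serialize() is assumed to be the usual entry point and
    constructor arguments are elided:

        fory = Fory()
        data = fory.serialize(obj)
        # reset() is normally called automatically after each operation, but it
        # can be invoked explicitly before handing the instance to other code.
        fory.reset()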
| """ |
| self.reset_write() |
| self.reset_read() |
| |
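
# Module-level helpers for nullable primitives: each value is encoded as a flag
# byte (NULL_FLAG for None, NOT_NULL_VALUE_FLAG otherwise) followed by the
# primitive payload written with the matching buffer method.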
| cpdef inline write_nullable_pybool(Buffer buffer, value): |
| if value is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| buffer.write_bool(value) |
| |
| cpdef inline write_nullable_pyint64(Buffer buffer, value): |
| if value is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| buffer.write_varint64(value) |
| |
| cpdef inline write_nullable_pyfloat64(Buffer buffer, value): |
| if value is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| buffer.write_double(value) |
| |
| cpdef inline write_nullable_pystr(Buffer buffer, value): |
| if value is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| buffer.write_string(value) |
| |
| cpdef inline read_nullable_pybool(Buffer buffer): |
| if buffer.read_int8() == NOT_NULL_VALUE_FLAG: |
| return buffer.read_bool() |
| else: |
| return None |
| |
| cpdef inline read_nullable_pyint64(Buffer buffer): |
| if buffer.read_int8() == NOT_NULL_VALUE_FLAG: |
| return buffer.read_varint64() |
| else: |
| return None |
| |
| cpdef inline read_nullable_pyfloat64(Buffer buffer): |
| if buffer.read_int8() == NOT_NULL_VALUE_FLAG: |
| return buffer.read_double() |
| else: |
| return None |
| |
| cpdef inline read_nullable_pystr(Buffer buffer): |
| if buffer.read_int8() == NOT_NULL_VALUE_FLAG: |
| return buffer.read_string() |
| else: |
| return None |
| |
| |
| cdef class Serializer: |
| """ |
| Base class for type-specific serializers. |
| |
| Serializer defines the interface for serializing and deserializing objects of a |
| specific type. Each serializer implements two modes: |
| |
| - Python-native mode (write/read): Optimized for Python-to-Python serialization, |
| supporting all Python-specific features like __reduce__, local functions, etc. |
| |
| - Cross-language mode (xwrite/xread): Serializes to a cross-language format |
compatible with other Fory implementations (Java, Go, Rust, C++, etc.).
| |
| Custom serializers can be registered for user-defined types using |
| Fory.register_serializer() to override default serialization behavior. |
| |
| Attributes: |
| fory: The Fory instance this serializer belongs to |
| type_: The Python type this serializer handles |
| need_to_write_ref: Whether reference tracking is needed for this type |
| |
| Note: |
| This is a base class for implementing custom serializers. Subclasses must |
| implement write(), read(), xwrite(), and xread() methods. |
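
Example:
    An illustrative sketch; Point and PointSerializer are hypothetical, and
    registration through Fory.register_serializer() is only mentioned in a
    comment since its exact signature is documented on that method:

        class Point:
            def __init__(self, x, y):
                self.x, self.y = x, y

        class PointSerializer(Serializer):
            def write(self, buffer, value):
                buffer.write_double(value.x)
                buffer.write_double(value.y)

            def read(self, buffer):
                return Point(buffer.read_double(), buffer.read_double())

            # The layout contains only primitives, so the cross-language
            # format can simply reuse the Python-native one.
            def xwrite(self, buffer, value):
                self.write(buffer, value)

            def xread(self, buffer):
                return self.read(buffer)

        # Register the serializer for Point via Fory.register_serializer().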
| """ |
| cdef readonly Fory fory |
| cdef readonly object type_ |
| cdef public c_bool need_to_write_ref |
| |
| def __init__(self, fory, type_: Union[type, TypeVar]): |
| self.fory = fory |
| self.type_ = type_ |
| self.need_to_write_ref = fory.ref_tracking and not is_primitive_type(type_) |
| |
| cpdef write(self, Buffer buffer, value): |
| raise NotImplementedError(f"write method not implemented in {type(self)}") |
| |
| cpdef read(self, Buffer buffer): |
| raise NotImplementedError(f"read method not implemented in {type(self)}") |
| |
| cpdef xwrite(self, Buffer buffer, value): |
| raise NotImplementedError(f"xwrite method not implemented in {type(self)}") |
| |
| cpdef xread(self, Buffer buffer): |
| raise NotImplementedError(f"xread method not implemented in {type(self)}") |
| |
| @classmethod |
| def support_subclass(cls) -> bool: |
| return False |
| |
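
# Base class for serializers whose Python-native and cross-language wire formats
# are identical: xwrite/xread simply delegate to write/read.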
| cdef class XlangCompatibleSerializer(Serializer): |
| cpdef xwrite(self, Buffer buffer, value): |
| self.write(buffer, value) |
| |
| cpdef xread(self, Buffer buffer): |
| return self.read(buffer) |
| |
| |
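# Enum members are written by name in Python-native mode and by their numeric
# value (as an unsigned varint) in cross-language mode.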
| @cython.final |
| cdef class EnumSerializer(Serializer): |
| @classmethod |
| def support_subclass(cls) -> bool: |
| return True |
| |
| cpdef inline write(self, Buffer buffer, value): |
| buffer.write_string(value.name) |
| |
| cpdef inline read(self, Buffer buffer): |
| name = buffer.read_string() |
| return getattr(self.type_, name) |
| |
| cpdef inline xwrite(self, Buffer buffer, value): |
| buffer.write_varuint32(value.value) |
| |
| cpdef inline xread(self, Buffer buffer): |
| ordinal = buffer.read_varuint32() |
| return self.type_(ordinal) |
| |
| |
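# Python-native slice serialization: start, stop and step are each written either
# as NOT_NULL_INT64_FLAG plus a varint64 when they are plain ints, or as a
# null/not-null flag followed by a generic non-ref value. Slices are
# Python-specific, so the cross-language methods are left unimplemented.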
| @cython.final |
| cdef class SliceSerializer(Serializer): |
| cpdef inline write(self, Buffer buffer, v): |
| cdef slice value = v |
| start, stop, step = value.start, value.stop, value.step |
| if type(start) is int: |
| # TODO support varint128 |
| buffer.write_int16(NOT_NULL_INT64_FLAG) |
| buffer.write_varint64(start) |
| else: |
| if start is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| self.fory.write_no_ref(buffer, start) |
| if type(stop) is int: |
| # TODO support varint128 |
| buffer.write_int16(NOT_NULL_INT64_FLAG) |
| buffer.write_varint64(stop) |
| else: |
| if stop is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| self.fory.write_no_ref(buffer, stop) |
| if type(step) is int: |
| # TODO support varint128 |
| buffer.write_int16(NOT_NULL_INT64_FLAG) |
| buffer.write_varint64(step) |
| else: |
| if step is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| self.fory.write_no_ref(buffer, step) |
| |
| cpdef inline read(self, Buffer buffer): |
| if buffer.read_int8() == NULL_FLAG: |
| start = None |
| else: |
| start = self.fory.read_no_ref(buffer) |
| if buffer.read_int8() == NULL_FLAG: |
| stop = None |
| else: |
| stop = self.fory.read_no_ref(buffer) |
| if buffer.read_int8() == NULL_FLAG: |
| step = None |
| else: |
| step = self.fory.read_no_ref(buffer) |
| return slice(start, stop, step) |
| |
| cpdef xwrite(self, Buffer buffer, value): |
| raise NotImplementedError |
| |
| cpdef xread(self, Buffer buffer): |
| raise NotImplementedError |
| |
| |
| include "primitive.pxi" |
| include "collection.pxi" |