| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import enum |
| import logging |
| import os |
| from abc import ABC, abstractmethod |
| from typing import Union, Iterable, TypeVar |
| |
| from pyfory.buffer import Buffer |
| from pyfory.resolver import ( |
| MapRefResolver, |
| NoRefResolver, |
| NULL_FLAG, |
| NOT_NULL_VALUE_FLAG, |
| ) |
| from pyfory.util import is_little_endian, set_bit, get_bit, clear_bit |
| from pyfory.type import TypeId |
| from pyfory.policy import DeserializationPolicy, DEFAULT_POLICY |
| |
| try: |
| import numpy as np |
| except ImportError: |
| np = None |
| |
| |
| logger = logging.getLogger(__name__) |
| |
| |
# Magic number written as an int16 at the start of xlang payloads.
MAGIC_NUMBER = 0x62D4
DEFAULT_DYNAMIC_WRITE_META_STR_ID = -1
DYNAMIC_TYPE_ID = -1
USE_TYPE_NAME = 0
USE_TYPE_ID = 1
# 0 is reserved to signal "type id not set" in TypeInfo.
NO_TYPE_ID = 0
INT64_TYPE_ID = TypeId.INT64
FLOAT64_TYPE_ID = TypeId.FLOAT64
BOOL_TYPE_ID = TypeId.BOOL
STRING_TYPE_ID = TypeId.STRING
# Combined 16-bit headers: the low byte holds `NOT_NULL_VALUE_FLAG` and the
# high byte holds the type id (`TYPE_ID << 8`), i.e. flag first, then type id,
# in little-endian order. They are emitted with a single int16 write.
NOT_NULL_INT64_FLAG = (NOT_NULL_VALUE_FLAG & 0xFF) | (INT64_TYPE_ID << 8)
NOT_NULL_FLOAT64_FLAG = (NOT_NULL_VALUE_FLAG & 0xFF) | (FLOAT64_TYPE_ID << 8)
NOT_NULL_BOOL_FLAG = (NOT_NULL_VALUE_FLAG & 0xFF) | (BOOL_TYPE_ID << 8)
NOT_NULL_STRING_FLAG = (NOT_NULL_VALUE_FLAG & 0xFF) | (STRING_TYPE_ID << 8)
SMALL_STRING_THRESHOLD = 16
| |
| |
class Language(enum.Enum):
    """
    Language identifiers used in the Fory stream header.

    The serializer writes ``Language.PYTHON.value`` as a single byte in xlang
    payloads, and the deserializer turns the peer's byte back into a member
    of this enum.
    """

    XLANG = 0
    JAVA = 1
    PYTHON = 2
    CPP = 3
    GO = 4
    JAVA_SCRIPT = 5
    RUST = 6
    DART = 7
| |
| |
class BufferObject(ABC):
    """
    Abstract handle to the Fory binary form of a single object.

    Note: intended solely for zero-copy out-of-band serialization; it should
    not be used for any other purpose.
    """

    @abstractmethod
    def total_bytes(self) -> int:
        """Return the number of bytes the serialized object occupies."""

    @abstractmethod
    def write_to(self, stream):
        """
        Emit the serialized object into ``stream``.

        Args:
            stream: Any writable object exposing a ``write()`` method
                (Buffer, file, BytesIO, etc.)
        """

    @abstractmethod
    def getbuffer(self) -> memoryview:
        """
        Expose the serialized data as a memoryview for zero-copy access.

        Returns:
            memoryview: A view over the serialized data. Objects backed by
                contiguous memory (bytes, bytearray, numpy arrays) are exposed
                without copying; non-contiguous data may be copied to obtain
                a contiguous view.
        """

    def raw(self):
        """Return a memoryview wrapping the result of ``getbuffer()``."""
        view = self.getbuffer()
        return memoryview(view)
| |
| |
| class Fory: |
| """ |
| High-performance cross-language serialization framework. |
| |
| Fory provides blazingly-fast serialization for Python objects with support for |
| both Python-native mode and cross-language mode. It handles complex object graphs, |
| reference tracking, and circular references automatically. |
| |
| In Python-native mode (xlang=False), Fory can serialize all Python objects |
| including dataclasses, classes with custom serialization methods, and local |
| functions/classes, making it a drop-in replacement for pickle. |
| |
| In cross-language mode (xlang=True), Fory serializes objects in a format that |
| can be deserialized by other Fory-supported languages (Java, Go, Rust, C++, etc). |
| |
| Examples: |
| >>> import pyfory |
| >>> from dataclasses import dataclass |
| >>> |
| >>> @dataclass |
| ... class Person: |
| ... name: str |
| ... age: pyfory.int32 |
| >>> |
| >>> # Python-native mode |
| >>> fory = pyfory.Fory() |
| >>> fory.register(Person) |
| >>> data = fory.serialize(Person("Alice", 30)) |
| >>> person = fory.deserialize(data) |
| >>> |
| >>> # Cross-language mode |
| >>> fory_xlang = pyfory.Fory(xlang=True) |
| >>> fory_xlang.register(Person) |
| >>> data = fory_xlang.serialize(Person("Bob", 25)) |
| |
| See Also: |
| ThreadSafeFory: Thread-safe wrapper for concurrent usage |
| """ |
| |
# Fixed attribute layout: instances have no per-instance __dict__, so every
# attribute assigned in __init__ must be listed here.
__slots__ = (
    "language",
    "is_py",
    "compatible",
    "ref_tracking",
    "ref_resolver",
    "type_resolver",
    "serialization_context",
    "strict",
    "buffer",
    "_buffer_callback",
    "_buffers",
    "metastring_resolver",
    "_unsupported_callback",
    "_unsupported_objects",
    "_peer_language",
    "max_depth",
    "depth",
    "field_nullable",
    "policy",
)
| |
def __init__(
    self,
    xlang: bool = False,
    ref: bool = False,
    strict: bool = True,
    compatible: bool = False,
    max_depth: int = 50,
    policy: DeserializationPolicy = None,
    field_nullable: bool = False,
    meta_compressor=None,
    **kwargs,
):
    """
    Build a Fory serialization instance.

    Args:
        xlang: Select cross-language mode. With False (default) Python-native
            mode is used, which supports all Python objects (dataclasses,
            __reduce__, local functions/classes); combined with ref=True and
            strict=False it can stand in for pickle. With True the output is
            readable by other Fory languages (Java, Go, Rust, etc), but
            Python-only features such as functions and __reduce__ are not
            supported.

        ref: Track references so shared objects are stored once and circular
            references work. Off by default for better performance.

        strict: Require types to be registered before use (default: True).
            Turning this off allows unknown types to be deserialized, which
            may be insecure if malicious code exists in
            __new__/__init__/__eq__/__hash__ methods.
            **WARNING**: Only disable in trusted environments, and supply a
            custom `policy` to control which types are allowed. We are not
            responsible for security risks when this option is disabled
            without proper policy controls.

        compatible: Enable schema evolution (forward/backward compatibility
            for dataclass field additions and removals). Also turns on meta
            sharing in the type resolver and serialization context.

        max_depth: Maximum nesting depth during deserialization (default: 50).
            Exceeding it raises an exception to guard against maliciously
            deep data.

        policy: Deserialization policy used for security checks; falls back
            to the default policy when not given. **Strongly recommended**
            when strict=False.

        field_nullable: Treat every dataclass field as nullable in
            Python-native mode regardless of Optional annotations. Forced to
            False when the instance is not in Python-native mode.

        meta_compressor: Forwarded unchanged to the type resolver.

        **kwargs: Legacy aliases. ``language``, ``ref_tracking`` and
            ``require_type_registration`` override ``xlang``, ``ref`` and
            ``strict`` respectively when present and not None. Other keys
            are ignored.

    Example:
        >>> # Python-native mode with reference tracking
        >>> fory = Fory(ref=True)
        >>>
        >>> # Cross-language mode with schema evolution
        >>> fory = Fory(xlang=True, compatible=True)
    """
    # Legacy keyword aliases take precedence over the modern parameters.
    legacy_language = kwargs.get("language")
    if legacy_language is None:
        self.language = Language.XLANG if xlang else Language.PYTHON
    else:
        self.language = legacy_language
    self.is_py = self.language == Language.PYTHON

    legacy_ref = kwargs.get("ref_tracking")
    self.ref_tracking = ref if legacy_ref is None else legacy_ref
    self.ref_resolver = MapRefResolver() if self.ref_tracking else NoRefResolver()

    legacy_strict = kwargs.get("require_type_registration")
    if legacy_strict is not None:
        strict = legacy_strict
    # The environment switch can force registration on, never off.
    self.strict = _ENABLE_TYPE_REGISTRATION_FORCIBLY or strict
    self.policy = policy or DEFAULT_POLICY
    self.compatible = compatible
    self.field_nullable = field_nullable if self.is_py else False

    # Imported inside the constructor rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from pyfory.serialization import MetaStringResolver, SerializationContext
    from pyfory._registry import TypeResolver

    self.metastring_resolver = MetaStringResolver()
    self.type_resolver = TypeResolver(self, meta_share=compatible, meta_compressor=meta_compressor)
    self.serialization_context = SerializationContext(fory=self, scoped_meta_share_enabled=compatible)
    self.type_resolver.initialize()

    # Per-call state starts out empty.
    self.buffer = Buffer.allocate(32)
    self.max_depth = max_depth
    self.depth = 0
    self._buffer_callback = None
    self._buffers = None
    self._unsupported_callback = None
    self._unsupported_objects = None
    self._peer_language = None
| |
def register(
    self,
    cls: Union[type, TypeVar],
    *,
    type_id: int = None,
    namespace: str = None,
    typename: str = None,
    serializer=None,
):
    """
    Register a type for serialization.

    This is an alias for `register_type()`. Type registration enables Fory to
    efficiently serialize and deserialize objects by pre-computing serialization
    metadata.

    For cross-language serialization, types can be matched between languages using:
    1. **type_id** (recommended): Numeric ID matching - faster and more compact
    2. **namespace + typename**: String-based matching - more flexible but larger overhead

    Args:
        cls: The Python type to register
        type_id: Optional unique numeric ID for cross-language type matching.
            Using type_id provides better performance and smaller serialized size
            compared to namespace/typename matching.
        namespace: Optional namespace for cross-language type matching by name.
            Used when type_id is not specified.
        typename: Optional type name for cross-language type matching by name.
            Defaults to class name if not specified. Used with namespace.
        serializer: Optional custom serializer instance for this type

    Returns:
        Whatever `register_type()` returns for the same arguments, so the
        alias and the original behave identically.

    Example:
        >>> # Register with type_id (recommended for performance)
        >>> fory = Fory(xlang=True)
        >>> fory.register(Person, type_id=100)
        >>>
        >>> # Register with namespace and typename (more flexible)
        >>> fory.register(Person, namespace="com.example", typename="Person")
        >>>
        >>> # Python-native mode (no cross-language matching needed)
        >>> fory = Fory()
        >>> fory.register(Person)
    """
    # Propagate the result: an alias must not silently drop what
    # register_type() hands back.
    return self.register_type(
        cls,
        type_id=type_id,
        namespace=namespace,
        typename=typename,
        serializer=serializer,
    )
| |
| # `Union[type, TypeVar]` is not supported in py3.6 |
def register_type(
    self,
    cls: Union[type, TypeVar],
    *,
    type_id: int = None,
    namespace: str = None,
    typename: str = None,
    serializer=None,
):
    """
    Register a type with this instance's type resolver.

    Registering a type lets Fory pre-compute serialization metadata so that
    objects of that type are serialized and deserialized efficiently.

    In cross-language mode, peers identify a type in one of two ways:
    1. **type_id** (recommended): numeric matching - faster and more compact
    2. **namespace + typename**: name matching - more flexible, more overhead

    Args:
        cls: The Python type to register
        type_id: Optional unique numeric ID used for cross-language matching.
            Gives better performance and a smaller payload than matching by
            namespace/typename.
        namespace: Optional namespace for matching by name; used when
            type_id is not specified.
        typename: Optional type name for matching by name, used together with
            namespace. Defaults to the class name if not specified.
        serializer: Optional custom serializer instance for this type

    Returns:
        The value returned by the type resolver's ``register_type``.

    Example:
        >>> # Register with type_id (recommended for performance)
        >>> fory = Fory(xlang=True)
        >>> fory.register_type(Person, type_id=100)
        >>>
        >>> # Register with namespace and typename (more flexible)
        >>> fory.register_type(Person, namespace="com.example", typename="Person")
        >>>
        >>> # Python-native mode (no cross-language matching needed)
        >>> fory = Fory()
        >>> fory.register_type(Person)
    """
    options = {
        "type_id": type_id,
        "namespace": namespace,
        "typename": typename,
        "serializer": serializer,
    }
    return self.type_resolver.register_type(cls, **options)
| |
def register_serializer(self, cls: type, serializer):
    """
    Attach a custom serializer to a type.

    The serializer is handed to the type resolver and replaces Fory's default
    serialization behavior for ``cls``.

    Args:
        cls: The Python type the serializer is responsible for
        serializer: Custom serializer instance implementing the Serializer protocol

    Example:
        >>> fory = Fory()
        >>> fory.register_serializer(MyClass, MyCustomSerializer())
    """
    resolver = self.type_resolver
    resolver.register_serializer(cls, serializer)
| |
def dumps(
    self,
    obj,
    buffer: Buffer = None,
    buffer_callback=None,
    unsupported_callback=None,
) -> Union[Buffer, bytes]:
    """
    Pickle-style alias for `serialize`; arguments are forwarded unchanged.
    """
    serialized = self.serialize(obj, buffer, buffer_callback, unsupported_callback)
    return serialized
| |
def loads(
    self,
    buffer: Union[Buffer, bytes],
    buffers: Iterable = None,
    unsupported_objects: Iterable = None,
):
    """
    Pickle-style alias for `deserialize`; arguments are forwarded unchanged.
    """
    restored = self.deserialize(buffer, buffers, unsupported_objects)
    return restored
| |
def serialize(
    self,
    obj,
    buffer: Buffer = None,
    buffer_callback=None,
    unsupported_callback=None,
) -> Union[Buffer, bytes]:
    """
    Turn a Python object into Fory's binary format.

    Reference tracking (if enabled), type information and nested objects are
    handled as part of the write. The instance's write state is reset once
    the call finishes, whether it succeeds or raises.

    Args:
        obj: The object to serialize
        buffer: Optional pre-allocated buffer to write into. If None, the
            instance's internal buffer is used
        buffer_callback: Optional callback for out-of-band buffer serialization
        unsupported_callback: Optional callback for handling unsupported types

    Returns:
        Serialized bytes if buffer is None, otherwise the provided buffer

    Example:
        >>> fory = Fory()
        >>> data = fory.serialize({"key": "value", "num": 42})
        >>> print(type(data))
        <class 'bytes'>
    """
    try:
        payload = self._serialize(
            obj,
            buffer,
            buffer_callback=buffer_callback,
            unsupported_callback=unsupported_callback,
        )
    finally:
        # Always leave the instance clean for the next top-level call.
        self.reset_write()
    return payload
| |
def _serialize(
    self,
    obj,
    buffer: Buffer = None,
    buffer_callback=None,
    unsupported_callback=None,
) -> Union[Buffer, bytes]:
    """
    Write the stream header and the object graph for a top-level call.

    Header as written here: an int16 magic number (xlang only), one bitmask
    byte (bit 0: root object is None, bit 1: little-endian writer, bit 2:
    xlang payload, bit 3: a buffer callback is set), the writer language byte
    (xlang only) and, when scoped meta share is enabled, a 4-byte slot for
    the relative offset of the trailing type definitions (left at -1 when
    none are written).

    Args:
        obj: Root object to serialize.
        buffer: Target buffer; the instance's own buffer is rewound and used
            when None.
        buffer_callback: Stored for out-of-band buffer handling during the write.
        unsupported_callback: Stored for unsupported-type handling during the write.

    Returns:
        The caller's buffer when one other than the internal buffer was used,
        otherwise ``to_bytes(0, writer_index)`` of the internal buffer.
    """
    assert self.depth == 0, "Nested serialization should use write_ref/write_no_ref/xwrite_ref/xwrite_no_ref."
    self.depth += 1
    self._buffer_callback = buffer_callback
    self._unsupported_callback = unsupported_callback
    if buffer is None:
        # Reuse the instance-owned buffer from its start.
        buffer = self.buffer
        buffer.writer_index = 0

    xlang = self.language == Language.XLANG
    if xlang:
        buffer.write_int16(MAGIC_NUMBER)

    # Reserve one byte for the header bitmask.
    mask_index = buffer.writer_index
    buffer.grow(1)
    buffer.writer_index = mask_index + 1

    def put_flag(bit, enabled):
        # Every bit is written explicitly because the byte may hold stale data.
        if enabled:
            set_bit(buffer, mask_index, bit)
        else:
            clear_bit(buffer, mask_index, bit)

    put_flag(0, obj is None)
    put_flag(1, is_little_endian)
    put_flag(2, xlang)
    if xlang:
        # Record which language produced the payload.
        buffer.write_int8(Language.PYTHON.value)
    put_flag(3, self._buffer_callback is not None)

    # Reserve the type-definitions offset slot, similar to the Java implementation.
    offset_slot = None
    if self.serialization_context.scoped_meta_share_enabled:
        offset_slot = buffer.writer_index
        buffer.write_int32(-1)

    if self.language == Language.PYTHON:
        self.write_ref(buffer, obj)
    else:
        self.xwrite_ref(buffer, obj)

    # Type definitions go at the end; patch the reserved slot to point at them.
    if self.serialization_context.scoped_meta_share_enabled:
        meta_context = self.serialization_context.meta_context
        if meta_context is not None and len(meta_context.get_writing_type_defs()) > 0:
            type_defs_start = buffer.writer_index
            # Offset is relative to the first byte after the 4-byte slot.
            buffer.put_int32(offset_slot, type_defs_start - offset_slot - 4)
            self.type_resolver.write_type_defs(buffer)

    if buffer is self.buffer:
        return buffer.to_bytes(0, buffer.writer_index)
    return buffer
| |
def write_ref(self, buffer, obj, typeinfo=None):
    """
    Write ``obj`` in Python-native format, with reference handling.

    Exact ``str``, ``int`` and ``bool`` instances take a fast path: a
    pre-combined not-null flag/type-id header is written as one int16,
    followed by the value, and the reference resolver is bypassed. Any other
    object goes through the reference resolver; if that does not already
    account for it, its type info and body are written.

    Args:
        buffer: Buffer to write into.
        obj: Object to write.
        typeinfo: Optional pre-resolved type info; looked up from the type
            resolver when None.
    """
    obj_type = type(obj)
    if obj_type is str:
        buffer.write_int16(NOT_NULL_STRING_FLAG)
        buffer.write_string(obj)
    elif obj_type is int:
        buffer.write_int16(NOT_NULL_INT64_FLAG)
        buffer.write_varint64(obj)
    elif obj_type is bool:
        buffer.write_int16(NOT_NULL_BOOL_FLAG)
        buffer.write_bool(obj)
    elif not self.ref_resolver.write_ref_or_null(buffer, obj):
        # The resolver reported nothing written for us, so emit type info
        # followed by the object body.
        if typeinfo is None:
            typeinfo = self.type_resolver.get_typeinfo(obj_type)
        self.type_resolver.write_typeinfo(buffer, typeinfo)
        typeinfo.serializer.write(buffer, obj)
| |
def write_no_ref(self, buffer, obj):
    """
    Write ``obj`` in Python-native format without any reference flag.

    Exact ``str``, ``int`` and ``bool`` instances are written as a varuint32
    type id followed by the value. Every other object gets its type info
    from the type resolver and is written by that type's serializer.

    Args:
        buffer: Buffer to write into.
        obj: Object to write.
    """
    obj_type = type(obj)
    if obj_type is str:
        buffer.write_varuint32(STRING_TYPE_ID)
        buffer.write_string(obj)
        return
    if obj_type is int:
        buffer.write_varuint32(INT64_TYPE_ID)
        buffer.write_varint64(obj)
        return
    if obj_type is bool:
        buffer.write_varuint32(BOOL_TYPE_ID)
        buffer.write_bool(obj)
        return
    resolver = self.type_resolver
    typeinfo = resolver.get_typeinfo(obj_type)
    resolver.write_typeinfo(buffer, typeinfo)
    typeinfo.serializer.write(buffer, obj)
| |
def xwrite_ref(self, buffer, obj, serializer=None):
    """
    Write ``obj`` in cross-language format, preceded by a reference/null flag.

    When no serializer is supplied, or the serializer asks for reference
    tracking, the reference resolver writes the flag and the body follows
    only if the resolver did not already cover the object. Otherwise a plain
    null / not-null byte is written and the body follows for non-None values.

    Args:
        buffer: Buffer to write into.
        obj: Object to write (may be None).
        serializer: Optional serializer to use for the body.
    """
    uses_ref_resolver = serializer is None or serializer.need_to_write_ref
    if uses_ref_resolver:
        if self.ref_resolver.write_ref_or_null(buffer, obj):
            return
        self.xwrite_no_ref(buffer, obj, serializer=serializer)
        return
    if obj is None:
        buffer.write_int8(NULL_FLAG)
        return
    buffer.write_int8(NOT_NULL_VALUE_FLAG)
    self.xwrite_no_ref(buffer, obj, serializer=serializer)
| |
def xwrite_no_ref(self, buffer, obj, serializer=None):
    """
    Write ``obj`` in cross-language format without a reference flag.

    With an explicit serializer only the body is written. Without one, the
    object's type info is resolved and written first, and the resolved
    type's serializer writes the body.

    Args:
        buffer: Buffer to write into.
        obj: Object to write.
        serializer: Optional serializer; skips writing type info when given.
    """
    if serializer is None:
        typeinfo = self.type_resolver.get_typeinfo(type(obj))
        self.type_resolver.write_typeinfo(buffer, typeinfo)
        serializer = typeinfo.serializer
    serializer.xwrite(buffer, obj)
| |
def deserialize(
    self,
    buffer: Union[Buffer, bytes],
    buffers: Iterable = None,
    unsupported_objects: Iterable = None,
):
    """
    Rebuild a Python object from Fory's binary format.

    Reference resolution (if enabled), type instantiation and nested objects
    are handled as part of the read. The instance's read state is reset once
    the call finishes, whether it succeeds or raises.

    Args:
        buffer: Serialized bytes or Buffer to read from
        buffers: Optional iterable of buffers for out-of-band deserialization
        unsupported_objects: Optional iterable of objects for unsupported type handling

    Returns:
        The deserialized Python object

    Example:
        >>> fory = Fory()
        >>> data = fory.serialize({"key": "value"})
        >>> obj = fory.deserialize(data)
        >>> print(obj)
        {'key': 'value'}
    """
    try:
        restored = self._deserialize(buffer, buffers, unsupported_objects)
    finally:
        # Always leave the instance clean for the next top-level call.
        self.reset_read()
    return restored
| |
def _deserialize(
    self,
    buffer: Union[Buffer, bytes],
    buffers: Iterable = None,
    unsupported_objects: Iterable = None,
):
    """
    Parse the stream header and read the root object for a top-level call.

    Reads, in order: the int16 magic number (only when this instance is in
    xlang mode), the bitmask byte (bit 0: root is None, bit 1: little-endian
    peer, bit 2: xlang payload, bit 3: out-of-band buffers), the peer
    language byte (xlang payloads only) and, when scoped meta share is
    enabled, the 4-byte relative offset of the trailing type definitions.

    NOTE: header validation relies on ``assert`` statements, so it is
    skipped when Python runs with ``-O``.

    Args:
        buffer: Serialized bytes (wrapped in a Buffer) or a Buffer.
        buffers: Out-of-band buffers; must be given exactly when the stream
            was produced with a buffer callback.
        unsupported_objects: Objects handed back for unsupported types.

    Returns:
        The deserialized root object, or None when the header marks it null.
    """
    assert self.depth == 0, "Nested deserialization should use read_ref/read_no_ref/xread_ref/xread_no_ref."
    self.depth += 1
    if isinstance(buffer, bytes):
        buffer = Buffer(buffer)
    if unsupported_objects is not None:
        self._unsupported_objects = iter(unsupported_objects)
    if self.language == Language.XLANG:
        magic_number = buffer.read_int16()
        assert magic_number == MAGIC_NUMBER, (
            f"The fory xlang serialization must start with magic number {hex(MAGIC_NUMBER)}. "
            "Please check whether the serialization is based on the xlang protocol and the data didn't corrupt."
        )

    # Step over the bitmask byte, then inspect its bits by index.
    mask_index = buffer.reader_index
    buffer.reader_index = mask_index + 1
    if get_bit(buffer, mask_index, 0):
        return None
    peer_is_little_endian = get_bit(buffer, mask_index, 1)
    assert peer_is_little_endian, "Big endian is not supported for now, please ensure peer machine is little endian."
    xlang_payload = get_bit(buffer, mask_index, 2)
    self._peer_language = Language(buffer.read_int8()) if xlang_payload else Language.PYTHON
    out_of_band = get_bit(buffer, mask_index, 3)
    if out_of_band:
        assert buffers is not None, "buffers shouldn't be null when the serialized stream is produced with buffer_callback not null."
        self._buffers = iter(buffers)
    else:
        assert buffers is None, "buffers should be null when the serialized stream is produced with buffer_callback null."

    # Type definitions sit after the object data but must be loaded first,
    # similar to the Java implementation.
    payload_end = None
    if self.serialization_context.scoped_meta_share_enabled:
        type_defs_offset = buffer.read_int32()
        if type_defs_offset != -1:
            object_start = buffer.reader_index
            buffer.reader_index = object_start + type_defs_offset
            self.type_resolver.read_type_defs(buffer)
            # The true end of the serialized data is after the type definitions.
            payload_end = buffer.reader_index
            buffer.reader_index = object_start

    obj = self.xread_ref(buffer) if xlang_payload else self.read_ref(buffer)

    # Leave the reader after the type definitions, not after the object data.
    if payload_end is not None:
        buffer.reader_index = payload_end
    return obj
| |
def read_ref(self, buffer):
    """
    Read an object in Python-native format, honoring reference flags.

    Returns the object supplied by the reference resolver when the flag read
    from ``buffer`` is below ``NOT_NULL_VALUE_FLAG``; otherwise reads the
    type info and body, registers the new object under the preserved
    reference id and returns it.
    """
    resolver = self.ref_resolver
    ref_id = resolver.try_preserve_ref_id(buffer)
    if ref_id < NOT_NULL_VALUE_FLAG:
        # Not a first occurrence: the resolver already holds the result.
        return resolver.get_read_object()
    typeinfo = self.type_resolver.read_typeinfo(buffer)
    self.inc_depth()
    value = typeinfo.serializer.read(buffer)
    self.dec_depth()
    resolver.set_read_object(ref_id, value)
    return value
| |
def read_no_ref(self, buffer):
    """
    Read a not-null, non-reference object in Python-native format.

    The type info is read first and the matching serializer reads the body;
    the nesting depth is raised for the duration of the body read.
    """
    typeinfo = self.type_resolver.read_typeinfo(buffer)
    self.inc_depth()
    value = typeinfo.serializer.read(buffer)
    self.dec_depth()
    return value
| |
def xread_ref(self, buffer, serializer=None):
    """
    Read an object in cross-language format, honoring reference/null flags.

    When no serializer is supplied, or the serializer asks for reference
    tracking, the reference resolver interprets the flag: a first occurrence
    is read and registered under its reference id, anything else is returned
    from the resolver. Otherwise a single flag byte is read and None is
    returned for the null flag, else the body is read without reference
    handling.

    Args:
        buffer: Buffer to read from.
        serializer: Optional serializer to use for the body.
    """
    if serializer is not None and not serializer.need_to_write_ref:
        flag = buffer.read_int8()
        if flag == NULL_FLAG:
            return None
        return self.xread_no_ref(buffer, serializer=serializer)

    resolver = self.ref_resolver
    ref_id = resolver.try_preserve_ref_id(buffer)
    if ref_id < NOT_NULL_VALUE_FLAG:
        return resolver.get_read_object()
    # First occurrence. No -1 is pushed here because try_preserve_ref_id
    # already pushed ref_id.
    value = self._xread_no_ref_internal(buffer, serializer)
    resolver.set_read_object(ref_id, value)
    return value
| |
def xread_no_ref(self, buffer, serializer=None):
    """
    Read an object in cross-language format without a reference flag.

    Args:
        buffer: Buffer to read from.
        serializer: Optional serializer; when None it is taken from the type
            info read off ``buffer``.

    Returns:
        The object read by the serializer.
    """
    active = serializer
    if active is None:
        active = self.type_resolver.read_typeinfo(buffer).serializer
    if self.ref_tracking:
        # This entry point may be called directly, without xread_ref having
        # pushed a ref id. Push -1 so reference() can pop it and skip
        # reference tracking.
        self.ref_resolver.read_ref_ids.append(-1)
    return self._xread_no_ref_internal(buffer, active)
| |
def _xread_no_ref_internal(self, buffer, serializer):
    """
    Read an object body in cross-language format without touching read_ref_ids.

    The serializer is taken from the type info on ``buffer`` when None is
    passed; the nesting depth is raised for the duration of the body read.
    """
    active = serializer
    if active is None:
        active = self.type_resolver.read_typeinfo(buffer).serializer
    self.inc_depth()
    value = active.xread(buffer)
    self.dec_depth()
    return value
| |
def write_buffer_object(self, buffer, buffer_object: BufferObject):
    """
    Write a buffer object either in-band or as an out-of-band marker.

    The object is written in-band when no buffer callback is set or the
    callback returns a truthy value: a True flag, the varuint32 byte length,
    then the bytes themselves. Otherwise only a False flag is written.

    Args:
        buffer: Buffer to write into.
        buffer_object: Object providing ``total_bytes()`` and ``write_to()``.
    """
    callback = self._buffer_callback
    in_band = callback is None or callback(buffer_object)
    if not in_band:
        buffer.write_bool(False)
        return
    buffer.write_bool(True)
    size = buffer_object.total_bytes()
    # Length prefix, then room for the payload.
    buffer.write_varuint32(size)
    payload_start = buffer.writer_index
    buffer.ensure(payload_start + size)
    # Let the object write straight into a slice of the target buffer.
    target = buffer.slice(buffer.writer_index, size)
    buffer_object.write_to(target)
    buffer.writer_index += size
| |
def read_buffer_object(self, buffer) -> Buffer:
    """
    Read a buffer object written by ``write_buffer_object``.

    Returns:
        For in-band data, a slice of ``buffer`` covering the payload (the
        reader index is advanced past it). Otherwise the next item from the
        out-of-band buffers supplied to the current deserialization.
    """
    if not buffer.read_bool():
        # Out-of-band: the caller must have supplied the buffers.
        assert self._buffers is not None
        return next(self._buffers)
    size = buffer.read_varuint32()
    view = buffer.slice(buffer.reader_index, size)
    buffer.reader_index += size
    return view
| |
def handle_unsupported_write(self, buffer, obj):
    """
    Handle an object that has no supported serialization.

    Raises:
        NotImplementedError: When no unsupported-callback is set, or the
            callback returns a truthy value for ``obj``. Returns silently
            only when the callback returns a falsy value.
    """
    callback = self._unsupported_callback
    if callback is not None and not callback(obj):
        return
    raise NotImplementedError(f"{type(obj)} is not supported for write")
| |
def handle_unsupported_read(self, buffer):
    """
    Return the next caller-supplied object for an unsupported type.

    Objects come, in order, from the ``unsupported_objects`` iterable given
    to the current deserialization; ``buffer`` is not read.
    """
    pending = self._unsupported_objects
    assert pending is not None
    return next(pending)
| |
def write_ref_pyobject(self, buffer, value, typeinfo=None):
    """
    Write ``value`` in Python-native format through the reference resolver.

    Unlike ``write_ref`` there is no fast path for str/int/bool: every value
    goes to the reference resolver first, and type info plus body are
    written only when the resolver does not already account for the value.

    Args:
        buffer: Buffer to write into.
        value: Object to write.
        typeinfo: Optional pre-resolved type info; looked up when None.
    """
    already_handled = self.ref_resolver.write_ref_or_null(buffer, value)
    if not already_handled:
        if typeinfo is None:
            typeinfo = self.type_resolver.get_typeinfo(type(value))
        self.type_resolver.write_typeinfo(buffer, typeinfo)
        typeinfo.serializer.write(buffer, value)
| |
def read_ref_pyobject(self, buffer):
    """Read counterpart of ``write_ref_pyobject``; delegates to ``read_ref``."""
    value = self.read_ref(buffer)
    return value
| |
def reset_write(self):
    """
    Clear per-call write state.

    Zeroes the depth counter, resets the write side of the reference
    resolver, type resolver, serialization context and meta-string resolver
    (in that order), and drops the stored callbacks. Called automatically
    after each serialization.
    """
    self.depth = 0
    components = (
        self.ref_resolver,
        self.type_resolver,
        self.serialization_context,
        self.metastring_resolver,
    )
    for component in components:
        component.reset_write()
    self._buffer_callback = None
    self._unsupported_callback = None
| |
def reset_read(self):
    """
    Reset read state after deserialization.

    Zeroes the depth counter, resets the read side of the reference
    resolver, type resolver, serialization context and meta-string resolver,
    and drops the out-of-band buffers and unsupported-object iterators. This
    method is automatically called after each deserialization.
    """
    self.depth = 0
    self.ref_resolver.reset_read()
    self.type_resolver.reset_read()
    self.serialization_context.reset_read()
    # Reset the resolver's *read* state here. Calling reset_write() at this
    # point would leave the read side untouched after a deserialization.
    self.metastring_resolver.reset_read()
    self._buffers = None
    self._unsupported_objects = None
| |
def reset(self):
    """
    Clear both write and read state.

    Runs ``reset_write()`` followed by ``reset_read()``. Use it to guarantee
    a clean state before reusing a Fory instance.
    """
    for clear_state in (self.reset_write, self.reset_read):
        clear_state()
| |
def inc_depth(self):
    """Raise the nesting depth by one and fail once it exceeds ``max_depth``."""
    new_depth = self.depth + 1
    self.depth = new_depth
    if new_depth > self.max_depth:
        self.throw_depth_limit_exceeded_exception()
| |
def dec_depth(self):
    """Lower the nesting depth by one after a nested read finishes."""
    self.depth = self.depth - 1
| |
def throw_depth_limit_exceeded_exception(self):
    """
    Raise the error reported when the read depth passes ``max_depth``.

    Raises:
        Exception: Always; the message includes the current depth.
    """
    message = (
        f"Read depth exceed max depth: {self.depth}, the deserialization data may be malicious. If it's not malicious, "
        "please increase max read depth by Fory(..., max_depth=...)"
    )
    raise Exception(message)
| |
| |
# Process-wide safety switch read once at import time: when the environment
# variable ENABLE_TYPE_REGISTRATION_FORCIBLY is "1" or "true", every Fory
# instance runs in strict mode regardless of its constructor arguments.
# The value is matched case-insensitively and surrounding whitespace is
# ignored, so spellings such as "True" or "TRUE" are not silently treated as off.
_ENABLE_TYPE_REGISTRATION_FORCIBLY = os.getenv("ENABLE_TYPE_REGISTRATION_FORCIBLY", "0").strip().lower() in {
    "1",
    "true",
}
| |
| |
| class ThreadSafeFory: |
| """ |
| Thread-safe wrapper for Fory using instance pooling. |
| |
| ThreadSafeFory maintains a pool of Fory instances protected by a lock to enable |
| safe concurrent serialization/deserialization across multiple threads. When a thread |
| needs to serialize or deserialize data, it acquires an instance from the pool, uses it, |
| and returns it for reuse by other threads. |
| |
| All type registrations must be performed before any serialization operations to ensure |
| consistency across all pooled instances. Attempting to register types after the first |
| serialization will raise a RuntimeError. |
| |
| Args: |
| xlang (bool): Whether to enable cross-language serialization. Defaults to False. |
| ref (bool): Whether to enable reference tracking. Defaults to False. |
| strict (bool): Whether to require type registration. Defaults to True. |
| compatible (bool): Whether to enable compatible mode. Defaults to False. |
| max_depth (int): Maximum depth for deserialization. Defaults to 50. |
| |
| Example: |
| >>> import pyfory |
| >>> import threading |
| >>> from dataclasses import dataclass |
| >>> |
| >>> @dataclass |
| ... class Person: |
| ... name: str |
| ... age: int |
| >>> |
| >>> # Create thread-safe instance |
| >>> fory = pyfory.ThreadSafeFory() |
| >>> fory.register(Person) |
| >>> |
| >>> # Use safely from multiple threads |
| >>> def worker(thread_id): |
| ... person = Person(f"User{thread_id}", 25) |
| ... data = fory.serialize(person) |
| ... result = fory.deserialize(data) |
| ... print(f"Thread {thread_id}: {result}") |
| >>> |
| >>> threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)] |
| >>> for t in threads: t.start() |
| >>> for t in threads: t.join() |
| |
| Note: |
| - Register all types before calling serialize/deserialize |
| - The pool grows dynamically as needed based on thread contention |
| - Instances are automatically returned to the pool after use |
| - Both Python and Cython modes are supported automatically |
| """ |
| |
def __init__(self, **kwargs):
    """
    Set up an empty instance pool.

    Args:
        **kwargs: Configuration passed unchanged to every pooled Fory
            instance when it is created.
    """
    from threading import Lock

    self._config = kwargs
    # Registration callbacks are replayed on every instance added to the pool.
    self._callbacks = []
    self._lock = Lock()
    self._pool = []
    self._fory_class = self._get_fory_class()
    # Becomes True once the first instance is built; registration is rejected afterwards.
    self._instances_created = False
| |
def _get_fory_class(self):
    """
    Choose the Fory implementation used for pooled instances.

    Returns the Cython-backed class when ``pyfory.serialization`` can be
    imported and reports Cython serialization as enabled; otherwise the
    pure-Python ``Fory`` defined in this module.
    """
    try:
        from pyfory.serialization import ENABLE_FORY_CYTHON_SERIALIZATION

        if not ENABLE_FORY_CYTHON_SERIALIZATION:
            return Fory
        from pyfory.serialization import Fory as CythonFory
    except ImportError:
        # Extension module unavailable: fall back to the pure-Python class.
        return Fory
    return CythonFory
| |
def _get_fory(self):
    """
    Take an instance from the pool, creating one if the pool is empty.

    A newly created instance is built from the stored configuration and has
    every registration callback applied before it is returned. The whole
    operation runs under the pool lock.
    """
    with self._lock:
        try:
            return self._pool.pop()
        except IndexError:
            # Pool is empty: fall through and build a fresh instance.
            pass
        self._instances_created = True
        instance = self._fory_class(**self._config)
        for apply_registration in self._callbacks:
            apply_registration(instance)
        return instance
| |
def _return_fory(self, fory):
    """Put an instance back into the pool so later calls can reuse it."""
    pool_lock = self._lock
    with pool_lock:
        self._pool.append(fory)
| |
def _register_callback(self, callback):
    """
    Queue a registration step to be applied to every pooled instance.

    Args:
        callback: Callable taking a Fory instance.

    Raises:
        RuntimeError: If an instance has already been created, because
            instances built earlier would miss the registration.
    """
    with self._lock:
        if not self._instances_created:
            self._callbacks.append(callback)
            return
        raise RuntimeError(
            "Cannot register types after Fory instances have been created. Please register all types before calling serialize/deserialize."
        )
| |
def register(
    self,
    cls: Union[type, TypeVar],
    *,
    type_id: int = None,
    namespace: str = None,
    typename: str = None,
    serializer=None,
):
    """
    Schedule ``Fory.register`` with these arguments for every pooled instance.

    The registration is deferred: it is replayed on each instance when that
    instance is created. Must be called before the first instance exists.
    """

    def apply(fory):
        return fory.register(cls, type_id=type_id, namespace=namespace, typename=typename, serializer=serializer)

    self._register_callback(apply)
| |
def register_type(
    self,
    cls: Union[type, TypeVar],
    *,
    type_id: int = None,
    namespace: str = None,
    typename: str = None,
    serializer=None,
):
    """
    Schedule ``Fory.register_type`` with these arguments for every pooled instance.

    The registration is deferred: it is replayed on each instance when that
    instance is created. Must be called before the first instance exists.
    """

    def apply(fory):
        return fory.register_type(cls, type_id=type_id, namespace=namespace, typename=typename, serializer=serializer)

    self._register_callback(apply)
| |
def register_serializer(self, cls: type, serializer):
    """
    Schedule ``Fory.register_serializer(cls, serializer)`` for every pooled instance.

    The registration is deferred until each instance is created.
    """

    def apply(fory):
        return fory.register_serializer(cls, serializer)

    self._register_callback(apply)
| |
def serialize(
    self,
    obj,
    buffer: Buffer = None,
    buffer_callback=None,
    unsupported_callback=None,
) -> Union[Buffer, bytes]:
    """
    Serialize ``obj`` using an instance borrowed from the pool.

    The borrowed instance is returned to the pool afterwards, even if
    serialization raises. Arguments and result are those of ``Fory.serialize``.
    """
    borrowed = self._get_fory()
    try:
        payload = borrowed.serialize(obj, buffer, buffer_callback, unsupported_callback)
    finally:
        self._return_fory(borrowed)
    return payload
| |
def deserialize(
    self,
    buffer: Union[Buffer, bytes],
    buffers: Iterable = None,
    unsupported_objects: Iterable = None,
):
    """
    Deserialize ``buffer`` using an instance borrowed from the pool.

    The borrowed instance is returned to the pool afterwards, even if
    deserialization raises. Arguments and result are those of
    ``Fory.deserialize``.
    """
    borrowed = self._get_fory()
    try:
        restored = borrowed.deserialize(buffer, buffers, unsupported_objects)
    finally:
        self._return_fory(borrowed)
    return restored
| |
def dumps(
    self,
    obj,
    buffer: Buffer = None,
    buffer_callback=None,
    unsupported_callback=None,
) -> Union[Buffer, bytes]:
    """Pickle-style alias for `serialize`; arguments are forwarded unchanged."""
    serialized = self.serialize(obj, buffer, buffer_callback, unsupported_callback)
    return serialized
| |
def loads(
    self,
    buffer: Union[Buffer, bytes],
    buffers: Iterable = None,
    unsupported_objects: Iterable = None,
):
    """Pickle-style alias for `deserialize`; arguments are forwarded unchanged."""
    restored = self.deserialize(buffer, buffers, unsupported_objects)
    return restored