| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import array |
| import dataclasses |
| import datetime |
| import enum |
| import functools |
| import logging |
| import pickle |
| import types |
| import typing |
| from typing import TypeVar, Union |
| from enum import Enum |
| |
| from pyfory import ENABLE_FORY_CYTHON_SERIALIZATION |
| from pyfory import Language |
| from pyfory.error import TypeUnregisteredError |
| |
| from pyfory.serializer import ( |
| Serializer, |
| Numpy1DArraySerializer, |
| NDArraySerializer, |
| PyArraySerializer, |
| DynamicPyArraySerializer, |
| NoneSerializer, |
| BooleanSerializer, |
| ByteSerializer, |
| Int16Serializer, |
| Int32Serializer, |
| Int64Serializer, |
| Float32Serializer, |
| Float64Serializer, |
| StringSerializer, |
| DateSerializer, |
| TimestampSerializer, |
| BytesSerializer, |
| ListSerializer, |
| TupleSerializer, |
| MapSerializer, |
| SetSerializer, |
| NonExistEnum, |
| EnumSerializer, |
| SliceSerializer, |
| StatefulSerializer, |
| ReduceSerializer, |
| FunctionSerializer, |
| ObjectSerializer, |
| TypeSerializer, |
| ModuleSerializer, |
| MappingProxySerializer, |
| MethodSerializer, |
| UnsupportedSerializer, |
| NativeFuncMethodSerializer, |
| PickleBufferSerializer, |
| UnionSerializer, |
| ) |
| from pyfory.meta.metastring import MetaStringEncoder, MetaStringDecoder |
| from pyfory.meta.meta_compressor import DeflaterMetaCompressor |
| from pyfory.type import ( |
| TypeId, |
| int8, |
| int16, |
| int32, |
| int64, |
| float32, |
| float64, |
| load_class, |
| is_struct_type, |
| record_class_factory, |
| ) |
| from pyfory._fory import ( |
| DYNAMIC_TYPE_ID, |
| # 0 is reserved as the "type id not set" flag in `TypeInfo` |
| NO_TYPE_ID, |
| ) |
| from pyfory.meta.typedef import TypeDef |
| from pyfory.meta.typedef_decoder import decode_typedef, skip_typedef |
| from pyfory.meta.typedef_encoder import encode_typedef |
| |
| try: |
| import numpy as np |
| except ImportError: |
| np = None |
| |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Module-level meta-string decoders. The pure-Python `TypeInfo` fallback in this
# module passes them to `namespace_bytes.decode(...)` / `typename_bytes.decode(...)`.
namespace_decoder = MetaStringDecoder(".", "_")
typename_decoder = MetaStringDecoder("$", "_")
| |
if ENABLE_FORY_CYTHON_SERIALIZATION:
    from pyfory.serialization import TypeInfo
else:

    class TypeInfo:
        """Pure-Python type record, defined only when the Cython build is disabled.

        Groups a class with its type id, serializer, encoded namespace and
        type-name bytes, a dynamic-type flag and an optional ``TypeDef``.
        """

        __slots__ = (
            "cls",
            "type_id",
            "serializer",
            "namespace_bytes",
            "typename_bytes",
            "dynamic_type",
            "type_def",
        )

        def __init__(
            self,
            cls: type = None,
            type_id: int = NO_TYPE_ID,
            serializer: Serializer = None,
            namespace_bytes=None,
            typename_bytes=None,
            dynamic_type: bool = False,
            type_def: TypeDef = None,
        ):
            # Plain field-by-field storage; no validation is performed here.
            self.cls, self.type_id, self.serializer = cls, type_id, serializer
            self.namespace_bytes, self.typename_bytes = namespace_bytes, typename_bytes
            self.dynamic_type, self.type_def = dynamic_type, type_def

        def __repr__(self):
            fields = f"cls={self.cls}, type_id={self.type_id}, serializer={self.serializer}"
            return f"TypeInfo({fields})"

        def decode_namespace(self) -> str:
            """Return the decoded namespace, or "" when no namespace bytes are set."""
            encoded = self.namespace_bytes
            return "" if encoded is None else encoded.decode(namespace_decoder)

        def decode_typename(self) -> str:
            """Return the decoded type name, or "" when no type-name bytes are set."""
            encoded = self.typename_bytes
            return "" if encoded is None else encoded.decode(typename_decoder)
| |
| |
| class TypeResolver: |
# Fixed attribute set for TypeResolver instances.
# Most slots are assigned in `__init__`; `serialization_context` is only
# assigned in `initialize()`. `_dynamic_id_to_typeinfo_list` and
# `_dynamic_id_to_metastr_list` are declared here but not assigned anywhere
# in the visible part of this module.
__slots__ = (
    "fory",
    "_metastr_to_str",
    "_type_id_counter",
    "_types_info",
    "_hash_to_metastring",
    "_metastr_to_type",
    "_hash_to_typeinfo",
    "_dynamic_id_to_typeinfo_list",
    "_dynamic_id_to_metastr_list",
    "_dynamic_write_string_id",
    "_dynamic_written_metastr",
    "_ns_type_to_typeinfo",
    "_named_type_to_typeinfo",
    "namespace_encoder",
    "namespace_decoder",
    "typename_encoder",
    "typename_decoder",
    "meta_compressor",
    "require_registration",
    "metastring_resolver",
    "language",
    "_type_id_to_typeinfo",
    "_meta_shared_typeinfo",
    "meta_share",
    "serialization_context",
    "_internal_py_serializer_map",
)
| |
def __init__(self, fory, meta_share=False, meta_compressor=None):
    """Create an empty resolver bound to ``fory``.

    ``meta_share`` is stored as-is; ``meta_compressor`` defaults to a new
    ``DeflaterMetaCompressor`` when not supplied. No types are registered
    here; that happens in ``initialize()``.
    """
    # Collaborators and settings taken from the owning Fory instance.
    self.fory = fory
    self.metastring_resolver = fory.metastring_resolver
    self.language = fory.language
    self.require_registration = fory.strict
    self.meta_share = meta_share
    if meta_compressor is None:
        meta_compressor = DeflaterMetaCompressor()
    self.meta_compressor = meta_compressor

    # Meta-string codecs for namespaces and type names.
    self.namespace_encoder = MetaStringEncoder(".", "_")
    self.namespace_decoder = MetaStringDecoder(".", "_")
    self.typename_encoder = MetaStringEncoder("$", "_")
    self.typename_decoder = MetaStringDecoder("$", "_")

    # Lookup tables, all initially empty.
    self._metastr_to_str = {}
    self._metastr_to_type = {}
    self._hash_to_metastring = {}
    self._hash_to_typeinfo = {}
    self._type_id_to_typeinfo = {}
    # hold objects to avoid gc, since `flat_hash_map/vector` doesn't
    # hold python reference.
    self._types_info = {}
    self._ns_type_to_typeinfo = {}
    self._named_type_to_typeinfo = {}
    # Cache for TypeDef and TypeInfo tuples (similar to Java's classIdToDef)
    self._meta_shared_typeinfo = {}
    self._internal_py_serializer_map = {}

    # Counters and dynamic-write bookkeeping.
    self._dynamic_written_metastr = []
    self._dynamic_write_string_id = 0
    # `_next_type_id` increments before returning, so the first id handed out is 65.
    self._type_id_counter = 64
| |
def initialize(self):
    """Register built-in types and capture the Fory serialization context.

    Common types are registered first, followed by either the Python-only
    or the cross-language set depending on ``self.fory.language``.
    """
    self._initialize_common()
    language_specific_init = self._initialize_py if self.fory.language == Language.PYTHON else self._initialize_xlang
    language_specific_init()
    self.serialization_context = self.fory.serialization_context
| |
def _initialize_py(self):
    """Register the types that only exist in Python-native mode.

    Besides a handful of builtin types, this allocates a stub class and a
    fresh type id for each internal serializer (reduce/type/method/function/
    native function) and records them in ``_internal_py_serializer_map``.
    """
    register = functools.partial(self._register_type, internal=True)

    # Registration order matters: each call without an explicit type id
    # consumes the next auto-generated id.
    builtin_serializers = [
        (type(None), NoneSerializer),
        (tuple, TupleSerializer),
        (slice, SliceSerializer),
    ]
    if np is not None:
        builtin_serializers.append((np.ndarray, NDArraySerializer))
    builtin_serializers.extend(
        [
            (array.array, DynamicPyArraySerializer),
            (types.MappingProxyType, MappingProxySerializer),
            (pickle.PickleBuffer, PickleBufferSerializer),
        ]
    )
    if not self.require_registration:
        builtin_serializers.append((types.ModuleType, ModuleSerializer))
    for py_type, serializer_cls in builtin_serializers:
        register(py_type, serializer=serializer_cls)

    # Allocate every stub class and id first, then register them.
    stub_names = (
        (ReduceSerializer, "__Reduce__"),
        (TypeSerializer, "__Type__"),
        (MethodSerializer, "__Method__"),
        (FunctionSerializer, "__Function__"),
        (NativeFuncMethodSerializer, "__NativeFunction__"),
    )
    serializer_map = {}
    for serializer_cls, stub_name in stub_names:
        stub = self._stub_cls(stub_name)
        serializer_map[serializer_cls] = (stub, self._next_type_id())
    self._internal_py_serializer_map = serializer_map

    for serializer_cls, (stub, stub_type_id) in serializer_map.items():
        register(stub, serializer=serializer_cls, type_id=stub_type_id)
| |
@staticmethod
def _stub_cls(name: str):
    """Return ``record_class_factory(name, [])``, used as a placeholder class."""
    no_fields = []
    return record_class_factory(name, no_fields)
| |
def _initialize_xlang(self):
    """Register the array types that use ``DYNAMIC_TYPE_ID`` in cross-language mode."""
    register_dynamic = functools.partial(self._register_type, internal=True, type_id=DYNAMIC_TYPE_ID)
    register_dynamic(array.array, serializer=DynamicPyArraySerializer)
    if np is not None:
        register_dynamic(np.ndarray, serializer=NDArraySerializer)
| |
def _initialize_common(self):
    """Register the types shared by both language modes.

    Covers scalars, strings, date/time, bytes, typed arrays (array.array and,
    when numpy is importable, 1-D numpy arrays) and the list/set/dict
    containers, each with a fixed ``TypeId``.
    """
    register = functools.partial(self._register_type, internal=True)

    scalar_table = (
        (None, TypeId.UNKNOWN, NoneSerializer),
        (bool, TypeId.BOOL, BooleanSerializer),
        (int8, TypeId.INT8, ByteSerializer),
        (int16, TypeId.INT16, Int16Serializer),
        (int32, TypeId.INT32, Int32Serializer),
        (int64, TypeId.INT64, Int64Serializer),
        (int, TypeId.INT64, Int64Serializer),
        (float32, TypeId.FLOAT32, Float32Serializer),
        (float64, TypeId.FLOAT64, Float64Serializer),
        (float, TypeId.FLOAT64, Float64Serializer),
        (str, TypeId.STRING, StringSerializer),
        # TODO(chaokunyang) DURATION DECIMAL
        (datetime.datetime, TypeId.TIMESTAMP, TimestampSerializer),
        (datetime.date, TypeId.LOCAL_DATE, DateSerializer),
        (bytes, TypeId.BINARY, BytesSerializer),
    )
    for py_type, scalar_type_id, serializer_cls in scalar_table:
        register(py_type, type_id=scalar_type_id, serializer=serializer_cls)

    for _, array_type, array_type_id in PyArraySerializer.typecode_dict.values():
        register(
            array_type,
            type_id=array_type_id,
            serializer=PyArraySerializer(self.fory, array_type, array_type_id),
        )
    if np:
        # overwrite pyarray with same type id.
        # if pyarray are needed, one must annotate that value with XXXArrayType
        # as a field of a struct.
        for dtype, (_, _, array_type, array_type_id) in Numpy1DArraySerializer.dtypes_dict.items():
            register(
                array_type,
                type_id=array_type_id,
                serializer=Numpy1DArraySerializer(self.fory, array_type, dtype),
            )

    container_table = (
        (list, TypeId.LIST, ListSerializer),
        (set, TypeId.SET, SetSerializer),
        (dict, TypeId.MAP, MapSerializer),
    )
    for py_type, container_type_id, serializer_cls in container_table:
        register(py_type, type_id=container_type_id, serializer=serializer_cls)
| |
def register_type(
    self,
    cls: Union[type, TypeVar],
    *,
    type_id: int = None,
    namespace: str = None,
    typename: str = None,
    serializer=None,
):
    """Public registration entry point; forwards everything to ``_register_type``.

    The ``internal`` flag of ``_register_type`` is left at its default.
    """
    registration = {
        "type_id": type_id,
        "namespace": namespace,
        "typename": typename,
        "serializer": serializer,
    }
    return self._register_type(cls, **registration)
| |
| def _register_type( |
| self, |
| cls: Union[type, TypeVar], |
| *, |
| type_id: int = None, |
| namespace: str = None, |
| typename: str = None, |
| serializer=None, |
| internal=False, |
| ): |
| """Register type with given type id or typename. If typename is not None, it will be used for |
| cross-language serialization.""" |
| if serializer is not None and not isinstance(serializer, Serializer): |
| try: |
| serializer = serializer(self.fory, cls) |
| except BaseException: |
| try: |
| serializer = serializer(self.fory) |
| except BaseException: |
| serializer = serializer() |
| n_params = len({typename, type_id, None}) - 1 |
| if n_params == 0 and typename is None: |
| type_id = self._next_type_id() |
| if n_params == 2: |
| raise TypeError(f"type name {typename} and id {type_id} should not be set at the same time") |
| if type_id not in {0, None}: |
| # multiple type can have same tpe id |
| if type_id in self._type_id_to_typeinfo and cls in self._types_info: |
| raise TypeError(f"{cls} registered already") |
| elif cls in self._types_info: |
| raise TypeError(f"{cls} registered already") |
| register_type = self._register_xtype if self.fory.language == Language.XLANG else self._register_pytype |
| return register_type( |
| cls, |
| type_id=type_id, |
| namespace=namespace, |
| typename=typename, |
| serializer=serializer, |
| internal=internal, |
| ) |
| |
def _register_xtype(
    self,
    cls: Union[type, TypeVar],
    *,
    type_id: int = None,
    namespace: str = None,
    typename: str = None,
    serializer=None,
    internal=False,
):
    """Compute the cross-language type id for ``cls`` and register it.

    Without an explicit id the ``NAMED_*`` variant of the kind is used;
    otherwise the user id is shifted left by 8 bits and the kind is added
    in the low byte. The kind is ENUM for enum classes, (COMPATIBLE_)STRUCT
    for other classes without a serializer, and EXT for non-internal types
    with a serializer. Internal types with a serializer keep ``type_id``.
    """

    def tagged(named_kind, id_kind):
        # Same expression the original applies in each branch.
        return named_kind if type_id is None else ((type_id << 8) + id_kind)

    if serializer is not None:
        if not internal:
            type_id = tagged(TypeId.NAMED_EXT, TypeId.EXT)
    elif issubclass(cls, enum.Enum):
        serializer = EnumSerializer(self.fory, cls)
        type_id = tagged(TypeId.NAMED_ENUM, TypeId.ENUM)
    elif self.meta_share:
        type_id = tagged(TypeId.NAMED_COMPATIBLE_STRUCT, TypeId.COMPATIBLE_STRUCT)
    else:
        type_id = tagged(TypeId.NAMED_STRUCT, TypeId.STRUCT)

    return self.__register_type(
        cls,
        type_id=type_id,
        serializer=serializer,
        namespace=namespace,
        typename=typename,
        internal=internal,
    )
| |
| def _register_pytype( |
| self, |
| cls: Union[type, TypeVar], |
| *, |
| type_id: int = None, |
| namespace: str = None, |
| typename: str = None, |
| serializer: Serializer = None, |
| internal: bool = False, |
| ): |
| # Set default type_id when None, similar to _register_xtype |
| if type_id is None and typename is not None: |
| type_id = self._next_type_id() |
| return self.__register_type( |
| cls, |
| type_id=type_id, |
| namespace=namespace, |
| typename=typename, |
| serializer=serializer, |
| internal=internal, |
| ) |
| |
def __register_type(
    self,
    cls: Union[type, TypeVar],
    *,
    type_id: int = None,
    namespace: str = None,
    typename: str = None,
    serializer: Serializer = None,
    internal: bool = False,
):
    """Build the ``TypeInfo`` for ``cls`` and store it in the lookup tables.

    Always records the info under ``cls``; additionally under
    ``(namespace, typename)`` and the encoded name bytes when a type name is
    given, and under ``type_id`` when the id is non-zero and either the
    resolver is in Python mode or the id is not a namespaced one.
    """
    dynamic_type = type_id is not None and type_id < 0

    if not internal and serializer is None:
        # In metashare mode, for struct types, we want to keep serializer=None
        # so that _set_typeinfo will be called to create the TypeDef-based serializer
        # This applies to both types registered by name and by ID
        keep_lazy = self.meta_share and type_id is not None and is_struct_type(type_id & 0xFF)
        if not keep_lazy:
            serializer = self._create_serializer(cls)

    if typename is None:
        typeinfo = TypeInfo(cls, type_id, serializer, None, None, dynamic_type)
    else:
        if namespace is None:
            # Split "pkg.mod.Name" at the last dot; a bare name has no namespace.
            head, dot, tail = typename.rpartition(".")
            if dot:
                namespace, typename = head, tail
            else:
                namespace = ""  # Use empty string for consistency with lookup
        get_meta_bytes = self.metastring_resolver.get_metastr_bytes
        ns_meta_bytes = get_meta_bytes(self.namespace_encoder.encode(namespace or ""))
        type_meta_bytes = get_meta_bytes(self.typename_encoder.encode(typename))
        typeinfo = TypeInfo(cls, type_id, serializer, ns_meta_bytes, type_meta_bytes, dynamic_type)
        self._named_type_to_typeinfo[(namespace, typename)] = typeinfo
        self._ns_type_to_typeinfo[(ns_meta_bytes, type_meta_bytes)] = typeinfo

    self._types_info[cls] = typeinfo

    if type_id is not None and type_id != 0:
        if self.language == Language.PYTHON or not TypeId.is_namespaced_type(type_id):
            # An internal registration never replaces an existing id mapping.
            if not internal or type_id not in self._type_id_to_typeinfo:
                self._type_id_to_typeinfo[type_id] = typeinfo

    # Create TypeDef for NAMED_ENUM and NAMED_EXT when meta_share is enabled
    if self.meta_share and type_id is not None:
        if (type_id & 0xFF) in (TypeId.NAMED_ENUM, TypeId.NAMED_EXT):
            type_def = encode_typedef(self, cls)
            if type_def is not None:
                typeinfo.type_def = type_def
    return typeinfo
| |
| def _next_type_id(self): |
| type_id = self._type_id_counter = self._type_id_counter + 1 |
| while type_id in self._type_id_to_typeinfo: |
| type_id = self._type_id_counter = self._type_id_counter + 1 |
| return type_id |
| |
def register_serializer(self, cls: Union[type, TypeVar], serializer):
    """Attach ``serializer`` to an already registered ``cls``.

    In Python mode only the serializer is replaced. In cross-language mode a
    new serializer also turns the type into an extension type: the low byte
    of the type id becomes ``NAMED_EXT`` (name-registered) or ``EXT``
    (id-registered), and the id map is re-keyed accordingly.

    Raises:
        TypeUnregisteredError: if ``cls`` has not been registered.
    """
    assert isinstance(cls, (type, TypeVar)), cls
    if cls not in self._types_info:
        raise TypeUnregisteredError(f"{cls} not registered")
    typeinfo = self._types_info[cls]
    if self.fory.language == Language.PYTHON:
        typeinfo.serializer = serializer
        return
    prev_type_id = typeinfo.type_id
    type_id = prev_type_id
    # Name-registered types are never inserted into the id map in
    # cross-language mode, so the old key may legitimately be absent.
    self._type_id_to_typeinfo.pop(prev_type_id, None)
    if typeinfo.serializer is not serializer:
        ext_kind = TypeId.NAMED_EXT if typeinfo.typename_bytes is not None else TypeId.EXT
        type_id = (prev_type_id & 0xFFFFFF00) | ext_kind
        # Store the serializer and the new id on the typeinfo itself; without
        # this the registration would have no effect on later writes.
        typeinfo.serializer = serializer
        typeinfo.type_id = type_id
    self._type_id_to_typeinfo[type_id] = typeinfo
| |
def get_serializer(self, cls: type):
    """
    Returns
    -------
    The serializer stored on the type info that ``get_typeinfo(cls)`` returns.
    """
    typeinfo = self.get_typeinfo(cls)
    return typeinfo.serializer
| |
def get_typeinfo(self, cls, create=True):
    """Return the ``TypeInfo`` for ``cls``, registering it on demand.

    A registered type whose serializer is still ``None`` gets one via
    ``_set_typeinfo``. For an unregistered type, ``None`` is returned when
    ``create`` is false; otherwise a serializer is created and the type is
    registered under its module and qualified name.

    Raises:
        TypeUnregisteredError: if registration is required and ``cls`` is not
            an enum, or if no type id can be derived for ``cls``.
    """
    known = self._types_info.get(cls)
    if known is not None:
        if known.serializer is None:
            self._set_typeinfo(known)
        return known
    if not create:
        return None
    if cls is NonExistEnum:
        return self._get_nonexist_enum_typeinfo()
    if self.require_registration and not issubclass(cls, Enum):
        raise TypeUnregisteredError(f"{cls} not registered")
    logger.info("Type %s not registered", cls)
    serializer = self._create_serializer(cls)

    type_id = None
    if self.language == Language.PYTHON:
        if isinstance(serializer, EnumSerializer):
            type_id = TypeId.NAMED_ENUM
        elif isinstance(serializer, (ObjectSerializer, StatefulSerializer)):
            type_id = TypeId.NAMED_EXT
        else:
            internal_entry = self._internal_py_serializer_map.get(type(serializer))
            if internal_entry is not None:
                # Entries are (stub class, type id) pairs.
                type_id = internal_entry[1]
    if not self.require_registration:
        from pyfory.struct import DataClassSerializer

        if isinstance(serializer, DataClassSerializer):
            type_id = TypeId.NAMED_STRUCT
    if type_id is None:
        raise TypeUnregisteredError(f"{cls} must be registered using `fory.register_type` API")
    return self.__register_type(
        cls,
        type_id=type_id,
        namespace=cls.__module__,
        typename=cls.__qualname__,
        serializer=serializer,
    )
| |
def _set_typeinfo(self, typeinfo):
    """Fill in ``typeinfo.serializer`` (and ``type_def`` when available) and return it.

    Struct types get a ``TypeDef``-based serializer when meta sharing is on
    and a TypeDef can be encoded, otherwise a ``DataClassSerializer``. All
    other types go through ``_create_serializer``.
    """
    if not is_struct_type(typeinfo.type_id & 0xFF):
        typeinfo.serializer = self._create_serializer(typeinfo.cls)
        return typeinfo

    from pyfory.struct import DataClassSerializer, DataClassStubSerializer

    # Set a stub serializer FIRST to break recursion for self-referencing types.
    # get_typeinfo() only calls _set_typeinfo when serializer is None,
    # so setting stub first prevents re-entry for circular type references.
    typeinfo.serializer = DataClassStubSerializer(self.fory, typeinfo.cls, xlang=not self.fory.is_py)

    type_def = encode_typedef(self, typeinfo.cls) if self.meta_share else None
    if type_def is None:
        typeinfo.serializer = DataClassSerializer(self.fory, typeinfo.cls, xlang=not self.fory.is_py)
    else:
        typeinfo.serializer = type_def.create_serializer(self)
        typeinfo.type_def = type_def
    return typeinfo
| |
def _create_serializer(self, cls):
    """Pick and instantiate a serializer for ``cls``.

    Resolution order: ``typing.Union`` handling, then the first registered
    base class in the MRO whose serializer supports subclasses, then a fixed
    chain of type checks ending in ``UnsupportedSerializer``.
    """
    # Check if it's a Union type first
    if hasattr(typing, "get_origin"):
        origin = typing.get_origin(cls)
    else:
        origin = getattr(cls, "__origin__", None)
    if origin is typing.Union:
        # Extract alternative types from Union
        if hasattr(typing, "get_args"):
            members = typing.get_args(cls)
        else:
            members = getattr(cls, "__args__", ())
        # Filter out NoneType as it's handled separately via ref tracking
        alternatives = [member for member in members if member is not type(None)]
        if len(alternatives) == 0:
            # Union with only None is equivalent to NoneType
            return NoneSerializer(self.fory)
        if len(alternatives) == 1:
            # Optional[T] should use the serializer for T
            return self.get_serializer(alternatives[0])
        # Real union with multiple alternatives
        return UnionSerializer(self.fory, cls, alternatives)

    # Reuse the serializer class of the nearest registered ancestor, if it
    # declares support for subclasses.
    for ancestor in cls.__mro__:
        registered = self._types_info.get(ancestor)
        if registered and registered.serializer and registered.serializer.support_subclass():
            return type(registered.serializer)(self.fory, cls)

    if cls is types.FunctionType:
        # Use FunctionSerializer for function types (including lambdas)
        return FunctionSerializer(self.fory, cls)
    if dataclasses.is_dataclass(cls):
        # lazy create serializer to handle nested struct fields.
        from pyfory.struct import DataClassStubSerializer

        return DataClassStubSerializer(self.fory, cls, xlang=not self.fory.is_py)
    if issubclass(cls, enum.Enum):
        return EnumSerializer(self.fory, cls)
    native_markers = ("builtin_function_or_method", "cython_function_or_method")
    if any(marker in str(cls) for marker in native_markers) and "<locals>" not in str(cls):
        return NativeFuncMethodSerializer(self.fory, cls)
    if cls is type(self.initialize):
        # Handle bound method objects
        return MethodSerializer(self.fory, cls)
    if issubclass(cls, type):
        # Handle Python type objects and metaclass such as numpy._DTypeMeta(i.e. np.dtype)
        return TypeSerializer(self.fory, cls)
    if cls is array.array:
        # Handle array.array objects with DynamicPyArraySerializer
        # Note: This will use DynamicPyArraySerializer for all array.array objects
        return DynamicPyArraySerializer(self.fory, cls)
    if (hasattr(cls, "__reduce__") and cls.__reduce__ is not object.__reduce__) or (
        hasattr(cls, "__reduce_ex__") and cls.__reduce_ex__ is not object.__reduce_ex__
    ):
        # Use ReduceSerializer for objects that have custom __reduce__ or __reduce_ex__ methods
        # This has higher precedence than StatefulSerializer and ObjectSerializer
        # Only use it for objects with custom reduce methods, not default ones from the object
        return ReduceSerializer(self.fory, cls)
    if hasattr(cls, "__getstate__") and hasattr(cls, "__setstate__"):
        # Use StatefulSerializer for objects that support __getstate__ and __setstate__
        return StatefulSerializer(self.fory, cls)
    if hasattr(cls, "__dict__") or hasattr(cls, "__slots__"):
        return ObjectSerializer(self.fory, cls)
    # c-extension types will go to here
    return UnsupportedSerializer(self.fory, cls)
| |
def is_registered_by_name(self, cls):
    """Return whether ``cls`` is registered with a namespaced (name-based) type id.

    Unregistered classes yield ``False``.
    """
    typeinfo = self._types_info.get(cls)
    return typeinfo is not None and TypeId.is_namespaced_type(typeinfo.type_id & 0xFF)
| |
def is_registered_by_id(self, cls=None, type_id=None):
    """Check id-based registration for a class or for a raw type id.

    With ``cls`` given, return whether it is registered under a
    non-namespaced type id. Otherwise return whether ``type_id`` is a key of
    the id map.
    """
    if cls is None:
        return type_id in self._type_id_to_typeinfo
    typeinfo = self._types_info.get(cls)
    if typeinfo is None:
        return False
    return not TypeId.is_namespaced_type(typeinfo.type_id & 0xFF)
| |
def get_registered_name(self, cls):
    """Return ``(namespace, typename)`` decoded from the type info of a registered ``cls``.

    Asserts that ``cls`` is registered.
    """
    typeinfo = self._types_info.get(cls)
    assert typeinfo is not None, f"{cls} not registered"
    namespace = typeinfo.decode_namespace()
    typename = typeinfo.decode_typename()
    return namespace, typename
| |
def get_registered_id(self, cls):
    """Return the type id stored for a registered ``cls``.

    Asserts that ``cls`` is registered.
    """
    registered = self._types_info.get(cls)
    assert registered is not None, f"{cls} not registered"
    return registered.type_id
| |
| def _load_metabytes_to_typeinfo(self, ns_metabytes, type_metabytes): |
| typeinfo = self._ns_type_to_typeinfo.get((ns_metabytes, type_metabytes)) |
| if typeinfo is not None: |
| return typeinfo |
| ns = ns_metabytes.decode(self.namespace_decoder) |
| typename = type_metabytes.decode(self.typename_decoder) |
| # the hash computed between languages may be different. |
| typeinfo = self._named_type_to_typeinfo.get((ns, typename)) |
| if typeinfo is not None: |
| self._ns_type_to_typeinfo[(ns_metabytes, type_metabytes)] = typeinfo |
| return typeinfo |
| cls = load_class(ns + "#" + typename) |
| typeinfo = self.get_typeinfo(cls) |
| self._ns_type_to_typeinfo[(ns_metabytes, type_metabytes)] = typeinfo |
| return typeinfo |
| |
def write_typeinfo(self, buffer, typeinfo):
    """Write the type header for ``typeinfo`` into ``buffer``.

    Nothing is written for dynamic types. With meta sharing on, the write is
    delegated to ``write_shared_type_meta``. Otherwise the type id is written
    as a varuint32, followed by namespace and type-name meta strings for
    namespaced ids.
    """
    if typeinfo.dynamic_type:
        return
    # Check if meta share is enabled first
    if self.meta_share:
        self.write_shared_type_meta(buffer, typeinfo)
        return
    type_id = typeinfo.type_id
    buffer.write_varuint32(type_id)
    if not TypeId.is_namespaced_type(type_id & 0xFF):
        return
    write_meta_string = self.metastring_resolver.write_meta_string_bytes
    write_meta_string(buffer, typeinfo.namespace_bytes)
    write_meta_string(buffer, typeinfo.typename_bytes)
| |
def read_typeinfo(self, buffer):
    """Read a type header from ``buffer`` and return the matching ``TypeInfo``.

    With meta sharing on, delegates to ``read_shared_type_meta``. For a
    non-namespaced id the id map is consulted, which yields ``None`` for an
    unknown id. For a namespaced id the two meta strings are read and
    resolved by bytes, then by decoded name.

    Raises:
        TypeUnregisteredError: if a namespaced type cannot be resolved.
    """
    # Check if meta share is enabled first
    if self.meta_share:
        return self.read_shared_type_meta(buffer)

    type_id = buffer.read_varuint32()
    if not TypeId.is_namespaced_type(type_id & 0xFF):
        return self._type_id_to_typeinfo.get(type_id)

    read_meta_string = self.metastring_resolver.read_meta_string_bytes
    ns_metabytes = read_meta_string(buffer)
    type_metabytes = read_meta_string(buffer)
    cache_key = (ns_metabytes, type_metabytes)
    typeinfo = self._ns_type_to_typeinfo.get(cache_key)
    if typeinfo is not None:
        return typeinfo

    ns = ns_metabytes.decode(self.namespace_decoder)
    typename = type_metabytes.decode(self.typename_decoder)
    typeinfo = self._named_type_to_typeinfo.get((ns, typename))
    if typeinfo is None:
        name = ns + "." + typename if ns else typename
        raise TypeUnregisteredError(f"{name} not registered")
    # Remember the bytes-keyed mapping so the decode is skipped next time.
    self._ns_type_to_typeinfo[cache_key] = typeinfo
    return typeinfo
| |
def get_typeinfo_by_id(self, type_id):
    """Get typeinfo by type_id. Never returns None.

    For unknown ENUM types, returns NonExistEnum typeinfo.
    For other unknown types, raises TypeUnregisteredError.
    """
    found = self._type_id_to_typeinfo.get(type_id)
    if found is not None:
        return found
    base_type_id = type_id & 0xFF
    if base_type_id != TypeId.ENUM:
        raise TypeUnregisteredError(f"type id {type_id} (base {base_type_id}) not registered")
    return self._get_nonexist_enum_typeinfo()
| |
def _get_nonexist_enum_typeinfo(self):
    """Get or create TypeInfo for NonExistEnum to handle unknown enum types."""
    from pyfory.serializer import NonExistEnum, NonExistEnumSerializer

    cached = self._types_info.get(NonExistEnum)
    if cached is not None:
        return cached
    # First use: build the info once and keep it in the class-keyed table.
    serializer = NonExistEnumSerializer(self.fory)
    created = TypeInfo(NonExistEnum, TypeId.ENUM, serializer, None, None, False)
    self._types_info[NonExistEnum] = created
    return created
| |
def get_typeinfo_by_name(self, namespace, typename):
    """Get typeinfo by namespace and typename; returns None when the pair is unknown."""
    lookup_key = (namespace, typename)
    return self._named_type_to_typeinfo.get(lookup_key)
| |
def get_meta_compressor(self):
    """Return the meta compressor stored on this resolver."""
    compressor = self.meta_compressor
    return compressor
| |
def write_shared_type_meta(self, buffer, typeinfo):
    """Write shared type meta information via the Fory meta context."""
    self.fory.serialization_context.meta_context.write_shared_typeinfo(buffer, typeinfo)
| |
def read_shared_type_meta(self, buffer):
    """Read shared type meta information via the resolver's serialization context.

    Asserts that a meta context is present.
    """
    context = self.serialization_context.meta_context
    assert context is not None, "Meta context must be set when meta share is enabled"
    shared_typeinfo = context.read_shared_typeinfo(buffer)
    return shared_typeinfo
| |
def write_type_defs(self, buffer):
    """Write all type definitions that need to be sent.

    Does nothing when there is no meta context. Otherwise writes the count
    as a varuint32 followed by each definition's already encoded bytes.
    """
    context = self.fory.serialization_context.meta_context
    if context is None:
        return
    pending_defs = context.get_writing_type_defs()
    buffer.write_varuint32(len(pending_defs))
    for pending in pending_defs:
        # Just copy the encoded bytes directly
        buffer.write_bytes(pending.encoded)
| |
def read_type_defs(self, buffer):
    """Read all type definitions from the buffer.

    Does nothing when there is no meta context. Each definition starts with
    an int64 header that is used as the cache key: a cached header causes the
    rest of the definition to be skipped, an unknown one is decoded and
    cached. Every resulting type info is handed to the meta context.
    """
    context = self.fory.serialization_context.meta_context
    if context is None:
        return

    cache = self._meta_shared_typeinfo
    num_type_defs = buffer.read_varuint32()
    for _ in range(num_type_defs):
        # Read the header (first 8 bytes) to get the type ID
        header = buffer.read_int64()
        type_info = cache.get(header)
        if type_info is None:
            # Read the TypeDef and create TypeInfo
            decoded_def = decode_typedef(buffer, self, header=header)
            type_info = self._build_type_info_from_typedef(decoded_def)
            # Cache the tuple for future use
            cache[header] = type_info
        else:
            # Skip the rest of the TypeDef binary for faster performance
            skip_typedef(buffer, header)
        context.add_read_typeinfo(type_info)
| |
def _build_type_info_from_typedef(self, type_def):
    """Build TypeInfo from TypeDef using TypeDef's create_serializer method.

    The namespace (empty string when unset) and type name of the TypeDef are
    re-encoded to meta-string bytes for the new ``TypeInfo``.
    """
    # Create serializer using TypeDef's create_serializer method
    serializer = type_def.create_serializer(self)
    to_meta_bytes = self.metastring_resolver.get_metastr_bytes
    ns_meta_bytes = to_meta_bytes(self.namespace_encoder.encode(type_def.namespace or ""))
    type_meta_bytes = to_meta_bytes(self.typename_encoder.encode(type_def.typename))
    dynamic_type = False
    return TypeInfo(
        type_def.cls,
        type_def.type_id,
        serializer,
        ns_meta_bytes,
        type_meta_bytes,
        dynamic_type,
        type_def,
    )
| |
def reset(self):
    """No-op; this resolver keeps no state that is cleared here."""
    return None
| |
def reset_read(self):
    """No-op; nothing is reset after a read."""
    return None
| |
def reset_write(self):
    """No-op; nothing is reset after a write."""
    return None