blob: 381c792fc819086ea8882d23658cad90d8a86d2a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import array
import dataclasses
import datetime
import enum
import functools
import logging
import pickle
import types
import typing
from typing import TypeVar, Union
from enum import Enum
from pyfory import ENABLE_FORY_CYTHON_SERIALIZATION
from pyfory import Language
from pyfory.error import TypeUnregisteredError
from pyfory.serializer import (
Serializer,
Numpy1DArraySerializer,
NDArraySerializer,
PyArraySerializer,
DynamicPyArraySerializer,
NoneSerializer,
BooleanSerializer,
ByteSerializer,
Int16Serializer,
Int32Serializer,
Int64Serializer,
Float32Serializer,
Float64Serializer,
StringSerializer,
DateSerializer,
TimestampSerializer,
BytesSerializer,
ListSerializer,
TupleSerializer,
MapSerializer,
SetSerializer,
NonExistEnum,
EnumSerializer,
SliceSerializer,
StatefulSerializer,
ReduceSerializer,
FunctionSerializer,
ObjectSerializer,
TypeSerializer,
ModuleSerializer,
MappingProxySerializer,
MethodSerializer,
UnsupportedSerializer,
NativeFuncMethodSerializer,
PickleBufferSerializer,
UnionSerializer,
)
from pyfory.meta.metastring import MetaStringEncoder, MetaStringDecoder
from pyfory.meta.meta_compressor import DeflaterMetaCompressor
from pyfory.type import (
TypeId,
int8,
int16,
int32,
int64,
float32,
float64,
load_class,
is_struct_type,
record_class_factory,
)
from pyfory._fory import (
DYNAMIC_TYPE_ID,
# preserve 0 as flag for type id not set in TypeInfo`
NO_TYPE_ID,
)
from pyfory.meta.typedef import TypeDef
from pyfory.meta.typedef_decoder import decode_typedef, skip_typedef
from pyfory.meta.typedef_encoder import encode_typedef
try:
import numpy as np
except ImportError:
np = None
logger = logging.getLogger(__name__)
namespace_decoder = MetaStringDecoder(".", "_")
typename_decoder = MetaStringDecoder("$", "_")
if ENABLE_FORY_CYTHON_SERIALIZATION:
from pyfory.serialization import TypeInfo
else:
class TypeInfo:
__slots__ = (
"cls",
"type_id",
"serializer",
"namespace_bytes",
"typename_bytes",
"dynamic_type",
"type_def",
)
def __init__(
self,
cls: type = None,
type_id: int = NO_TYPE_ID,
serializer: Serializer = None,
namespace_bytes=None,
typename_bytes=None,
dynamic_type: bool = False,
type_def: TypeDef = None,
):
self.cls = cls
self.type_id = type_id
self.serializer = serializer
self.namespace_bytes = namespace_bytes
self.typename_bytes = typename_bytes
self.dynamic_type = dynamic_type
self.type_def = type_def
def __repr__(self):
return f"TypeInfo(cls={self.cls}, type_id={self.type_id}, serializer={self.serializer})"
def decode_namespace(self) -> str:
if self.namespace_bytes is None:
return ""
return self.namespace_bytes.decode(namespace_decoder)
def decode_typename(self) -> str:
if self.typename_bytes is None:
return ""
return self.typename_bytes.decode(typename_decoder)
class TypeResolver:
__slots__ = (
"fory",
"_metastr_to_str",
"_type_id_counter",
"_types_info",
"_hash_to_metastring",
"_metastr_to_type",
"_hash_to_typeinfo",
"_dynamic_id_to_typeinfo_list",
"_dynamic_id_to_metastr_list",
"_dynamic_write_string_id",
"_dynamic_written_metastr",
"_ns_type_to_typeinfo",
"_named_type_to_typeinfo",
"namespace_encoder",
"namespace_decoder",
"typename_encoder",
"typename_decoder",
"meta_compressor",
"require_registration",
"metastring_resolver",
"language",
"_type_id_to_typeinfo",
"_meta_shared_typeinfo",
"meta_share",
"serialization_context",
"_internal_py_serializer_map",
)
def __init__(self, fory, meta_share=False, meta_compressor=None):
self.fory = fory
self.metastring_resolver = fory.metastring_resolver
self.language = fory.language
self.require_registration = fory.strict
self._metastr_to_str = dict()
self._metastr_to_type = dict()
self._hash_to_metastring = dict()
self._hash_to_typeinfo = dict()
self._dynamic_written_metastr = []
self._type_id_to_typeinfo = dict()
self._type_id_counter = 64
self._dynamic_write_string_id = 0
# hold objects to avoid gc, since `flat_hash_map/vector` doesn't
# hold python reference.
self._types_info = dict()
self._ns_type_to_typeinfo = dict()
self._named_type_to_typeinfo = dict()
self.namespace_encoder = MetaStringEncoder(".", "_")
self.namespace_decoder = MetaStringDecoder(".", "_")
# Cache for TypeDef and TypeInfo tuples (similar to Java's classIdToDef)
self._meta_shared_typeinfo = {}
self.typename_encoder = MetaStringEncoder("$", "_")
self.typename_decoder = MetaStringDecoder("$", "_")
self.meta_compressor = meta_compressor if meta_compressor is not None else DeflaterMetaCompressor()
self.meta_share = meta_share
self._internal_py_serializer_map = {}
def initialize(self):
self._initialize_common()
if self.fory.language == Language.PYTHON:
self._initialize_py()
else:
self._initialize_xlang()
self.serialization_context = self.fory.serialization_context
def _initialize_py(self):
register = functools.partial(self._register_type, internal=True)
register(type(None), serializer=NoneSerializer)
register(tuple, serializer=TupleSerializer)
register(slice, serializer=SliceSerializer)
if np is not None:
register(np.ndarray, serializer=NDArraySerializer)
register(array.array, serializer=DynamicPyArraySerializer)
register(types.MappingProxyType, serializer=MappingProxySerializer)
register(pickle.PickleBuffer, serializer=PickleBufferSerializer)
if not self.require_registration:
register(types.ModuleType, serializer=ModuleSerializer)
self._internal_py_serializer_map = {
ReduceSerializer: (self._stub_cls("__Reduce__"), self._next_type_id()),
TypeSerializer: (self._stub_cls("__Type__"), self._next_type_id()),
MethodSerializer: (self._stub_cls("__Method__"), self._next_type_id()),
FunctionSerializer: (
self._stub_cls("__Function__"),
self._next_type_id(),
),
NativeFuncMethodSerializer: (
self._stub_cls("__NativeFunction__"),
self._next_type_id(),
),
}
for serializer, (
stub_cls,
type_id,
) in self._internal_py_serializer_map.items():
register(stub_cls, serializer=serializer, type_id=type_id)
@staticmethod
def _stub_cls(name: str):
return record_class_factory(name, [])
def _initialize_xlang(self):
register = functools.partial(self._register_type, internal=True)
register(array.array, type_id=DYNAMIC_TYPE_ID, serializer=DynamicPyArraySerializer)
if np is not None:
register(np.ndarray, type_id=DYNAMIC_TYPE_ID, serializer=NDArraySerializer)
def _initialize_common(self):
register = functools.partial(self._register_type, internal=True)
register(None, type_id=TypeId.UNKNOWN, serializer=NoneSerializer)
register(bool, type_id=TypeId.BOOL, serializer=BooleanSerializer)
register(int8, type_id=TypeId.INT8, serializer=ByteSerializer)
register(int16, type_id=TypeId.INT16, serializer=Int16Serializer)
register(int32, type_id=TypeId.INT32, serializer=Int32Serializer)
register(int64, type_id=TypeId.INT64, serializer=Int64Serializer)
register(int, type_id=TypeId.INT64, serializer=Int64Serializer)
register(
float32,
type_id=TypeId.FLOAT32,
serializer=Float32Serializer,
)
register(
float64,
type_id=TypeId.FLOAT64,
serializer=Float64Serializer,
)
register(float, type_id=TypeId.FLOAT64, serializer=Float64Serializer)
register(str, type_id=TypeId.STRING, serializer=StringSerializer)
# TODO(chaokunyang) DURATION DECIMAL
register(datetime.datetime, type_id=TypeId.TIMESTAMP, serializer=TimestampSerializer)
register(datetime.date, type_id=TypeId.LOCAL_DATE, serializer=DateSerializer)
register(bytes, type_id=TypeId.BINARY, serializer=BytesSerializer)
for itemsize, ftype, typeid in PyArraySerializer.typecode_dict.values():
register(
ftype,
type_id=typeid,
serializer=PyArraySerializer(self.fory, ftype, typeid),
)
if np:
# overwrite pyarray with same type id.
# if pyarray are needed, one must annotate that value with XXXArrayType
# as a field of a struct.
for dtype, (
itemsize,
format,
ftype,
typeid,
) in Numpy1DArraySerializer.dtypes_dict.items():
register(
ftype,
type_id=typeid,
serializer=Numpy1DArraySerializer(self.fory, ftype, dtype),
)
register(list, type_id=TypeId.LIST, serializer=ListSerializer)
register(set, type_id=TypeId.SET, serializer=SetSerializer)
register(dict, type_id=TypeId.MAP, serializer=MapSerializer)
def register_type(
self,
cls: Union[type, TypeVar],
*,
type_id: int = None,
namespace: str = None,
typename: str = None,
serializer=None,
):
return self._register_type(
cls,
type_id=type_id,
namespace=namespace,
typename=typename,
serializer=serializer,
)
def _register_type(
self,
cls: Union[type, TypeVar],
*,
type_id: int = None,
namespace: str = None,
typename: str = None,
serializer=None,
internal=False,
):
"""Register type with given type id or typename. If typename is not None, it will be used for
cross-language serialization."""
if serializer is not None and not isinstance(serializer, Serializer):
try:
serializer = serializer(self.fory, cls)
except BaseException:
try:
serializer = serializer(self.fory)
except BaseException:
serializer = serializer()
n_params = len({typename, type_id, None}) - 1
if n_params == 0 and typename is None:
type_id = self._next_type_id()
if n_params == 2:
raise TypeError(f"type name {typename} and id {type_id} should not be set at the same time")
if type_id not in {0, None}:
# multiple type can have same tpe id
if type_id in self._type_id_to_typeinfo and cls in self._types_info:
raise TypeError(f"{cls} registered already")
elif cls in self._types_info:
raise TypeError(f"{cls} registered already")
register_type = self._register_xtype if self.fory.language == Language.XLANG else self._register_pytype
return register_type(
cls,
type_id=type_id,
namespace=namespace,
typename=typename,
serializer=serializer,
internal=internal,
)
def _register_xtype(
self,
cls: Union[type, TypeVar],
*,
type_id: int = None,
namespace: str = None,
typename: str = None,
serializer=None,
internal=False,
):
if serializer is None:
if issubclass(cls, enum.Enum):
serializer = EnumSerializer(self.fory, cls)
type_id = TypeId.NAMED_ENUM if type_id is None else ((type_id << 8) + TypeId.ENUM)
else:
serializer = None
if self.meta_share:
type_id = TypeId.NAMED_COMPATIBLE_STRUCT if type_id is None else ((type_id << 8) + TypeId.COMPATIBLE_STRUCT)
else:
type_id = TypeId.NAMED_STRUCT if type_id is None else ((type_id << 8) + TypeId.STRUCT)
elif not internal:
type_id = TypeId.NAMED_EXT if type_id is None else ((type_id << 8) + TypeId.EXT)
return self.__register_type(
cls,
type_id=type_id,
serializer=serializer,
namespace=namespace,
typename=typename,
internal=internal,
)
def _register_pytype(
self,
cls: Union[type, TypeVar],
*,
type_id: int = None,
namespace: str = None,
typename: str = None,
serializer: Serializer = None,
internal: bool = False,
):
# Set default type_id when None, similar to _register_xtype
if type_id is None and typename is not None:
type_id = self._next_type_id()
return self.__register_type(
cls,
type_id=type_id,
namespace=namespace,
typename=typename,
serializer=serializer,
internal=internal,
)
def __register_type(
self,
cls: Union[type, TypeVar],
*,
type_id: int = None,
namespace: str = None,
typename: str = None,
serializer: Serializer = None,
internal: bool = False,
):
dynamic_type = type_id is not None and type_id < 0
# In metashare mode, for struct types, we want to keep serializer=None
# so that _set_typeinfo will be called to create the TypeDef-based serializer
# This applies to both types registered by name and by ID
should_create_serializer = (
not internal and serializer is None and not (self.meta_share and type_id is not None and is_struct_type(type_id & 0xFF))
)
if should_create_serializer:
serializer = self._create_serializer(cls)
if typename is None:
typeinfo = TypeInfo(cls, type_id, serializer, None, None, dynamic_type)
else:
if namespace is None:
splits = typename.rsplit(".", 1)
if len(splits) == 2:
namespace, typename = splits
else:
namespace = "" # Use empty string for consistency with lookup
ns_metastr = self.namespace_encoder.encode(namespace or "")
ns_meta_bytes = self.metastring_resolver.get_metastr_bytes(ns_metastr)
type_metastr = self.typename_encoder.encode(typename)
type_meta_bytes = self.metastring_resolver.get_metastr_bytes(type_metastr)
typeinfo = TypeInfo(cls, type_id, serializer, ns_meta_bytes, type_meta_bytes, dynamic_type)
self._named_type_to_typeinfo[(namespace, typename)] = typeinfo
self._ns_type_to_typeinfo[(ns_meta_bytes, type_meta_bytes)] = typeinfo
self._types_info[cls] = typeinfo
if type_id is not None and type_id != 0 and (self.language == Language.PYTHON or not TypeId.is_namespaced_type(type_id)):
if type_id not in self._type_id_to_typeinfo or not internal:
self._type_id_to_typeinfo[type_id] = typeinfo
self._types_info[cls] = typeinfo
# Create TypeDef for NAMED_ENUM and NAMED_EXT when meta_share is enabled
if self.meta_share and type_id is not None:
base_type_id = type_id & 0xFF
if base_type_id in (TypeId.NAMED_ENUM, TypeId.NAMED_EXT):
type_def = encode_typedef(self, cls)
if type_def is not None:
typeinfo.type_def = type_def
return typeinfo
def _next_type_id(self):
type_id = self._type_id_counter = self._type_id_counter + 1
while type_id in self._type_id_to_typeinfo:
type_id = self._type_id_counter = self._type_id_counter + 1
return type_id
def register_serializer(self, cls: Union[type, TypeVar], serializer):
assert isinstance(cls, (type, TypeVar)), cls
if cls not in self._types_info:
raise TypeUnregisteredError(f"{cls} not registered")
typeinfo = self._types_info[cls]
if self.fory.language == Language.PYTHON:
typeinfo.serializer = serializer
return
type_id = prev_type_id = typeinfo.type_id
self._type_id_to_typeinfo.pop(prev_type_id)
if typeinfo.serializer is not serializer:
if typeinfo.typename_bytes is not None:
type_id = typeinfo.type_id & 0xFFFFFF00 | TypeId.NAMED_EXT
else:
type_id = typeinfo.type_id & 0xFFFFFF00 | TypeId.EXT
self._type_id_to_typeinfo[type_id] = typeinfo
def get_serializer(self, cls: type):
"""
Returns
-------
Returns or create serializer for the provided type
"""
return self.get_typeinfo(cls).serializer
def get_typeinfo(self, cls, create=True):
type_info = self._types_info.get(cls)
if type_info is not None:
if type_info.serializer is None:
self._set_typeinfo(type_info)
return type_info
elif not create:
return None
if cls is NonExistEnum:
return self._get_nonexist_enum_typeinfo()
if self.require_registration and not issubclass(cls, Enum):
raise TypeUnregisteredError(f"{cls} not registered")
logger.info("Type %s not registered", cls)
serializer = self._create_serializer(cls)
type_id = None
if self.language == Language.PYTHON:
if isinstance(serializer, EnumSerializer):
type_id = TypeId.NAMED_ENUM
elif isinstance(serializer, (ObjectSerializer, StatefulSerializer)):
type_id = TypeId.NAMED_EXT
elif self._internal_py_serializer_map.get(type(serializer)) is not None:
type_id = self._internal_py_serializer_map.get(type(serializer))[1]
if not self.require_registration:
from pyfory.struct import DataClassSerializer
if isinstance(serializer, DataClassSerializer):
type_id = TypeId.NAMED_STRUCT
if type_id is None:
raise TypeUnregisteredError(f"{cls} must be registered using `fory.register_type` API")
return self.__register_type(
cls,
type_id=type_id,
namespace=cls.__module__,
typename=cls.__qualname__,
serializer=serializer,
)
def _set_typeinfo(self, typeinfo):
type_id = typeinfo.type_id & 0xFF
if is_struct_type(type_id):
from pyfory.struct import DataClassSerializer, DataClassStubSerializer
# Set a stub serializer FIRST to break recursion for self-referencing types.
# get_typeinfo() only calls _set_typeinfo when serializer is None,
# so setting stub first prevents re-entry for circular type references.
typeinfo.serializer = DataClassStubSerializer(self.fory, typeinfo.cls, xlang=not self.fory.is_py)
if self.meta_share:
type_def = encode_typedef(self, typeinfo.cls)
if type_def is not None:
typeinfo.serializer = type_def.create_serializer(self)
typeinfo.type_def = type_def
else:
typeinfo.serializer = DataClassSerializer(self.fory, typeinfo.cls, xlang=not self.fory.is_py)
else:
typeinfo.serializer = DataClassSerializer(self.fory, typeinfo.cls, xlang=not self.fory.is_py)
else:
typeinfo.serializer = self._create_serializer(typeinfo.cls)
return typeinfo
def _create_serializer(self, cls):
# Check if it's a Union type first
origin = typing.get_origin(cls) if hasattr(typing, "get_origin") else getattr(cls, "__origin__", None)
if origin is typing.Union:
# Extract alternative types from Union
args = typing.get_args(cls) if hasattr(typing, "get_args") else getattr(cls, "__args__", ())
# Filter out NoneType as it's handled separately via ref tracking
alternative_types = [arg for arg in args if arg is not type(None)]
if len(alternative_types) == 0:
# Union with only None is equivalent to NoneType
return NoneSerializer(self.fory)
elif len(alternative_types) == 1:
# Optional[T] should use the serializer for T
return self.get_serializer(alternative_types[0])
else:
# Real union with multiple alternatives
return UnionSerializer(self.fory, cls, alternative_types)
for clz in cls.__mro__:
type_info = self._types_info.get(clz)
if type_info and type_info.serializer and type_info.serializer.support_subclass():
serializer = type(type_info.serializer)(self.fory, cls)
break
else:
if cls is types.FunctionType:
# Use FunctionSerializer for function types (including lambdas)
serializer = FunctionSerializer(self.fory, cls)
elif dataclasses.is_dataclass(cls):
# lazy create serializer to handle nested struct fields.
from pyfory.struct import DataClassStubSerializer
serializer = DataClassStubSerializer(self.fory, cls, xlang=not self.fory.is_py)
elif issubclass(cls, enum.Enum):
serializer = EnumSerializer(self.fory, cls)
elif ("builtin_function_or_method" in str(cls) or "cython_function_or_method" in str(cls)) and "<locals>" not in str(cls):
serializer = NativeFuncMethodSerializer(self.fory, cls)
elif cls is type(self.initialize):
# Handle bound method objects
serializer = MethodSerializer(self.fory, cls)
elif issubclass(cls, type):
# Handle Python type objects and metaclass such as numpy._DTypeMeta(i.e. np.dtype)
serializer = TypeSerializer(self.fory, cls)
elif cls is array.array:
# Handle array.array objects with DynamicPyArraySerializer
# Note: This will use DynamicPyArraySerializer for all array.array objects
serializer = DynamicPyArraySerializer(self.fory, cls)
elif (hasattr(cls, "__reduce__") and cls.__reduce__ is not object.__reduce__) or (
hasattr(cls, "__reduce_ex__") and cls.__reduce_ex__ is not object.__reduce_ex__
):
# Use ReduceSerializer for objects that have custom __reduce__ or __reduce_ex__ methods
# This has higher precedence than StatefulSerializer and ObjectSerializer
# Only use it for objects with custom reduce methods, not default ones from the object
serializer = ReduceSerializer(self.fory, cls)
elif hasattr(cls, "__getstate__") and hasattr(cls, "__setstate__"):
# Use StatefulSerializer for objects that support __getstate__ and __setstate__
serializer = StatefulSerializer(self.fory, cls)
elif hasattr(cls, "__dict__") or hasattr(cls, "__slots__"):
serializer = ObjectSerializer(self.fory, cls)
else:
# c-extension types will go to here
serializer = UnsupportedSerializer(self.fory, cls)
return serializer
def is_registered_by_name(self, cls):
typeinfo = self._types_info.get(cls)
if typeinfo is None:
return False
return TypeId.is_namespaced_type(typeinfo.type_id & 0xFF)
def is_registered_by_id(self, cls=None, type_id=None):
if cls is not None:
typeinfo = self._types_info.get(cls)
if typeinfo is None:
return False
return not TypeId.is_namespaced_type(typeinfo.type_id & 0xFF)
else:
return type_id in self._type_id_to_typeinfo
def get_registered_name(self, cls):
typeinfo = self._types_info.get(cls)
assert typeinfo is not None, f"{cls} not registered"
return typeinfo.decode_namespace(), typeinfo.decode_typename()
def get_registered_id(self, cls):
typeinfo = self._types_info.get(cls)
assert typeinfo is not None, f"{cls} not registered"
return typeinfo.type_id
def _load_metabytes_to_typeinfo(self, ns_metabytes, type_metabytes):
typeinfo = self._ns_type_to_typeinfo.get((ns_metabytes, type_metabytes))
if typeinfo is not None:
return typeinfo
ns = ns_metabytes.decode(self.namespace_decoder)
typename = type_metabytes.decode(self.typename_decoder)
# the hash computed between languages may be different.
typeinfo = self._named_type_to_typeinfo.get((ns, typename))
if typeinfo is not None:
self._ns_type_to_typeinfo[(ns_metabytes, type_metabytes)] = typeinfo
return typeinfo
cls = load_class(ns + "#" + typename)
typeinfo = self.get_typeinfo(cls)
self._ns_type_to_typeinfo[(ns_metabytes, type_metabytes)] = typeinfo
return typeinfo
def write_typeinfo(self, buffer, typeinfo):
if typeinfo.dynamic_type:
return
# Check if meta share is enabled first
if self.meta_share:
self.write_shared_type_meta(buffer, typeinfo)
return
type_id = typeinfo.type_id
internal_type_id = type_id & 0xFF
buffer.write_varuint32(type_id)
if TypeId.is_namespaced_type(internal_type_id):
self.metastring_resolver.write_meta_string_bytes(buffer, typeinfo.namespace_bytes)
self.metastring_resolver.write_meta_string_bytes(buffer, typeinfo.typename_bytes)
def read_typeinfo(self, buffer):
# Check if meta share is enabled first
if self.meta_share:
return self.read_shared_type_meta(buffer)
type_id = buffer.read_varuint32()
internal_type_id = type_id & 0xFF
if TypeId.is_namespaced_type(internal_type_id):
ns_metabytes = self.metastring_resolver.read_meta_string_bytes(buffer)
type_metabytes = self.metastring_resolver.read_meta_string_bytes(buffer)
typeinfo = self._ns_type_to_typeinfo.get((ns_metabytes, type_metabytes))
if typeinfo is None:
ns = ns_metabytes.decode(self.namespace_decoder)
typename = type_metabytes.decode(self.typename_decoder)
typeinfo = self._named_type_to_typeinfo.get((ns, typename))
if typeinfo is not None:
self._ns_type_to_typeinfo[(ns_metabytes, type_metabytes)] = typeinfo
return typeinfo
name = ns + "." + typename if ns else typename
raise TypeUnregisteredError(f"{name} not registered")
return typeinfo
else:
return self._type_id_to_typeinfo.get(type_id)
def get_typeinfo_by_id(self, type_id):
"""Get typeinfo by type_id. Never returns None.
For unknown ENUM types, returns NonExistEnum typeinfo.
For other unknown types, raises TypeUnregisteredError.
"""
typeinfo = self._type_id_to_typeinfo.get(type_id)
if typeinfo is not None:
return typeinfo
base_type_id = type_id & 0xFF
if base_type_id == TypeId.ENUM:
return self._get_nonexist_enum_typeinfo()
raise TypeUnregisteredError(f"type id {type_id} (base {base_type_id}) not registered")
def _get_nonexist_enum_typeinfo(self):
"""Get or create TypeInfo for NonExistEnum to handle unknown enum types."""
from pyfory.serializer import NonExistEnum, NonExistEnumSerializer
typeinfo = self._types_info.get(NonExistEnum)
if typeinfo is None:
serializer = NonExistEnumSerializer(self.fory)
typeinfo = TypeInfo(NonExistEnum, TypeId.ENUM, serializer, None, None, False)
self._types_info[NonExistEnum] = typeinfo
return typeinfo
def get_typeinfo_by_name(self, namespace, typename):
"""Get typeinfo by namespace and typename."""
return self._named_type_to_typeinfo.get((namespace, typename))
def get_meta_compressor(self):
return self.meta_compressor
def write_shared_type_meta(self, buffer, typeinfo):
"""Write shared type meta information."""
meta_context = self.fory.serialization_context.meta_context
meta_context.write_shared_typeinfo(buffer, typeinfo)
def read_shared_type_meta(self, buffer):
"""Read shared type meta information."""
meta_context = self.serialization_context.meta_context
assert meta_context is not None, "Meta context must be set when meta share is enabled"
return meta_context.read_shared_typeinfo(buffer)
def write_type_defs(self, buffer):
"""Write all type definitions that need to be sent."""
meta_context = self.fory.serialization_context.meta_context
if meta_context is None:
return
writing_type_defs = meta_context.get_writing_type_defs()
buffer.write_varuint32(len(writing_type_defs))
for type_def in writing_type_defs:
# Just copy the encoded bytes directly
buffer.write_bytes(type_def.encoded)
def read_type_defs(self, buffer):
"""Read all type definitions from the buffer."""
meta_context = self.fory.serialization_context.meta_context
if meta_context is None:
return
num_type_defs = buffer.read_varuint32()
for _ in range(num_type_defs):
# Read the header (first 8 bytes) to get the type ID
header = buffer.read_int64()
# Check if we already have this TypeDef cached
type_info = self._meta_shared_typeinfo.get(header)
if type_info is not None:
# Skip the rest of the TypeDef binary for faster performance
skip_typedef(buffer, header)
else:
# Read the TypeDef and create TypeInfo
type_def = decode_typedef(buffer, self, header=header)
type_info = self._build_type_info_from_typedef(type_def)
# Cache the tuple for future use
self._meta_shared_typeinfo[header] = type_info
meta_context.add_read_typeinfo(type_info)
def _build_type_info_from_typedef(self, type_def):
"""Build TypeInfo from TypeDef using TypeDef's create_serializer method."""
# Create serializer using TypeDef's create_serializer method
serializer = type_def.create_serializer(self)
ns_metastr = self.namespace_encoder.encode(type_def.namespace or "")
ns_meta_bytes = self.metastring_resolver.get_metastr_bytes(ns_metastr)
type_metastr = self.typename_encoder.encode(type_def.typename)
type_meta_bytes = self.metastring_resolver.get_metastr_bytes(type_metastr)
typeinfo = TypeInfo(
type_def.cls,
type_def.type_id,
serializer,
ns_meta_bytes,
type_meta_bytes,
False,
type_def,
)
return typeinfo
def reset(self):
pass
def reset_read(self):
pass
def reset_write(self):
pass