blob: 75dc39a1b648202da54306a6b160ec750252f42a [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from pyfory.context import EncodedMetaString, EMPTY_ENCODED_META_STRING
from pyfory.resolver import NULL_FLAG, REF_FLAG, NOT_NULL_VALUE_FLAG, REF_VALUE_FLAG
# Type ids of the builtin Python value types (int, float, bool, str). The
# context classes in this file write and read these four inline instead of
# dispatching to a serializer.
INT64_TYPE_ID = TypeId.VARINT64
FLOAT64_TYPE_ID = TypeId.FLOAT64
BOOL_TYPE_ID = TypeId.BOOL
STRING_TYPE_ID = TypeId.STRING
# Meta strings of at most this many bytes carry a one-byte encoding tag on the
# wire; longer ones carry a 64-bit hash code instead.
SMALL_STRING_THRESHOLD = 16
cdef inline uint64_t _mix64(uint64_t x):
    """
    Scramble the bits of ``x`` and return the result.

    Three xor-shift-by-33 steps are interleaved with two 64-bit
    multiplications; the shifts and multipliers are the ones used by the
    MurmurHash3 64-bit finalizer. Arithmetic wraps modulo 2**64.
    """
    x = x ^ (x >> 33)
    x = x * <uint64_t> 0xff51afd7ed558ccd
    x = x ^ (x >> 33)
    x = x * <uint64_t> 0xc4ceb9fe1a85ec53
    return x ^ (x >> 33)
cdef inline int64_t _hash_small_metastring(
    int64_t v1,
    int64_t v2,
    int32_t length,
    uint8_t encoding,
):
    """
    Derive the 64-bit lookup key of a small meta string.

    ``v1`` and ``v2`` are the payload bytes packed into two integers,
    ``length`` is the payload size and ``encoding`` the one-byte encoding
    tag. The payload words and the length are combined and mixed, then the
    low byte of the mixed value is replaced by ``encoding`` so the tag can be
    recovered from the key.
    """
    cdef uint64_t multiplier = <uint64_t> 0x9e3779b97f4a7c15
    cdef uint64_t combined = (<uint64_t> v1) ^ ((<uint64_t> v2) * multiplier)
    cdef uint64_t mixed
    # Fold the length into the top byte before mixing.
    combined = combined ^ ((<uint64_t> length) << 56)
    mixed = _mix64(combined)
    return <int64_t> ((mixed & <uint64_t> 0xffffffffffffff00) | encoding)
# Forward declarations: both context classes are defined in full further down
# in this file, but their names need to be known before that point.
cdef class WriteContext
cdef class ReadContext
@cython.final
cdef class RefWriter:
    """
    Write-side reference tracker.

    Every tracked object is keyed by its address and mapped to the id it was
    assigned on first sight. A strong reference to each tracked object is held
    in ``written_objects`` until ``reset()`` so the address cannot be reused
    while the map is alive.
    """
    cdef flat_hash_map[uint64_t, int32_t] written_objects_id
    cdef vector[PyObject *] written_objects
    cdef readonly bint track_ref

    def __cinit__(self, bint track_ref):
        self.track_ref = track_ref

    def __dealloc__(self):
        self.reset()

    cdef inline bint write_ref_or_null(self, CBuffer * c_buffer, obj):
        """
        Write the one-byte head flag for ``obj`` (plus the ref id if needed).

        Returns True when nothing more has to be written for ``obj`` (it is
        None or a back-reference), False when the caller must still write the
        value itself.
        """
        cdef uint64_t address
        cdef int32_t assigned_id
        cdef flat_hash_map[uint64_t, int32_t].iterator found
        # None is encoded identically with and without reference tracking.
        if obj is None:
            deref(c_buffer).write_int8(NULL_FLAG)
            return True
        if not self.track_ref:
            deref(c_buffer).write_int8(NOT_NULL_VALUE_FLAG)
            return False
        address = <uintptr_t> <PyObject *> obj
        found = self.written_objects_id.find(address)
        if found == self.written_objects_id.end():
            # First occurrence: ids are handed out in order of first sight.
            assigned_id = self.written_objects_id.size()
            self.written_objects_id[address] = assigned_id
            self.written_objects.push_back(<PyObject *> obj)
            Py_INCREF(obj)
            deref(c_buffer).write_int8(REF_VALUE_FLAG)
            return False
        deref(c_buffer).write_int8(REF_FLAG)
        deref(c_buffer).write_var_uint32(<uint64_t> deref(found).second)
        return True

    cdef inline bint write_ref_value_flag(self, CBuffer * c_buffer, obj):
        """
        Write the head flag for a non-None ``obj``.

        Note the return value is inverted compared to ``write_ref_or_null``:
        True means the caller must write the value, False means a
        back-reference was written and nothing else is needed.
        """
        cdef uint64_t address
        cdef int32_t assigned_id
        cdef flat_hash_map[uint64_t, int32_t].iterator found
        assert obj is not None
        if not self.track_ref:
            deref(c_buffer).write_int8(NOT_NULL_VALUE_FLAG)
            return True
        address = <uintptr_t> <PyObject *> obj
        found = self.written_objects_id.find(address)
        if found == self.written_objects_id.end():
            assigned_id = self.written_objects_id.size()
            self.written_objects_id[address] = assigned_id
            self.written_objects.push_back(<PyObject *> obj)
            Py_INCREF(obj)
            deref(c_buffer).write_int8(REF_VALUE_FLAG)
            return True
        deref(c_buffer).write_int8(REF_FLAG)
        deref(c_buffer).write_var_uint32(<uint64_t> deref(found).second)
        return False

    cdef inline bint write_null_flag(self, CBuffer * c_buffer, obj):
        """Write NULL_FLAG and return True if ``obj`` is None; otherwise write nothing."""
        if obj is not None:
            return False
        deref(c_buffer).write_int8(NULL_FLAG)
        return True

    cpdef inline reset(self):
        """Forget all tracked objects and release the references held on them."""
        cdef PyObject *held
        # Without tracking nothing was ever recorded, so there is nothing to undo.
        if self.track_ref:
            self.written_objects_id.clear()
            for held in self.written_objects:
                Py_XDECREF(held)
            self.written_objects.clear()
@cython.final
cdef class RefReader:
    """
    Read-side reference tracker.

    ``read_objects`` maps ref ids to the objects already materialized (each
    non-NULL entry owns one strong reference, released in ``reset()``).
    ``read_ref_ids`` is a stack of ids reserved for values that are currently
    being read; ``-1`` entries mark values that have no slot.
    """
    cdef vector[PyObject *] read_objects
    cdef vector[int32_t] read_ref_ids
    # Keep the last resolved reference in its own object slot so hot REF_FLAG
    # reads can return it directly instead of probing read_objects again.
    cdef object read_object
    cdef readonly bint track_ref

    def __cinit__(self, bint track_ref):
        self.track_ref = track_ref
        self.read_object = None

    def __dealloc__(self):
        self.reset()

    cdef inline PyObject *_checked_slot(self, int32_t ref_id) except NULL:
        """
        Return the borrowed pointer stored for ``ref_id``.

        Raises ValueError when ``ref_id`` is out of range or its slot has been
        reserved but not filled yet.
        """
        cdef int32_t size = self.read_objects.size()
        cdef PyObject *obj
        if ref_id < 0 or ref_id >= size:
            raise ValueError(f"Invalid ref id {ref_id}, current size {size}")
        obj = self.read_objects[ref_id]
        if obj == NULL:
            raise ValueError(f"Invalid ref id {ref_id}, current size {size}")
        return obj

    cdef inline int32_t read_ref_or_null(self, Buffer buffer):
        """
        Read the head flag and, for REF_FLAG, resolve the referenced object.

        Without tracking the raw flag is returned untouched. With tracking,
        REF_FLAG stores the resolved object in ``read_object``; any other flag
        clears ``read_object``. The flag is returned in both cases.
        """
        cdef int8_t head_flag = buffer.c_buffer.read_int8(buffer._error)
        cdef int32_t ref_id
        if not self.track_ref:
            return head_flag
        if head_flag == REF_FLAG:
            ref_id = buffer.c_buffer.read_var_uint32(buffer._error)
            self.read_object = <object> self._checked_slot(ref_id)
            return REF_FLAG
        self.read_object = None
        return head_flag

    cdef inline int32_t preserve_next_ref_id(self):
        """Reserve a new empty slot, push its id on the stack and return it (-1 without tracking)."""
        cdef int32_t ref_id
        if not self.track_ref:
            return -1
        ref_id = self.read_objects.size()
        self.read_objects.push_back(NULL)
        self.read_ref_ids.push_back(ref_id)
        return ref_id

    cdef inline int32_t preserve_ref_id(self, int32_t ref_id):
        """Push an already known ``ref_id`` on the stack and return it (-1 without tracking)."""
        if not self.track_ref:
            return -1
        self.read_ref_ids.push_back(ref_id)
        return ref_id

    cdef inline int32_t try_preserve_ref_id(self, Buffer buffer):
        """
        Read the head flag and prepare reference bookkeeping for the value.

        Returns the raw flag without tracking. With tracking: REF_FLAG
        resolves the object into ``read_object`` and returns the flag;
        REF_VALUE_FLAG reserves a slot and returns its id; every other flag
        pushes ``-1`` on the stack and is returned as is.
        """
        cdef int8_t head_flag = buffer.c_buffer.read_int8(buffer._error)
        cdef int32_t ref_id
        if not self.track_ref:
            return head_flag
        if head_flag == REF_FLAG:
            ref_id = buffer.c_buffer.read_var_uint32(buffer._error)
            self.read_object = <object> self._checked_slot(ref_id)
            return head_flag
        self.read_object = None
        if head_flag == REF_VALUE_FLAG:
            return self.preserve_next_ref_id()
        self.read_ref_ids.push_back(-1)
        return head_flag

    cdef inline int32_t last_preserved_ref_id(self):
        """Return the id on top of the stack without popping it (-1 without tracking)."""
        cdef int32_t length
        if not self.track_ref:
            return -1
        length = self.read_ref_ids.size()
        assert length > 0
        return self.read_ref_ids[length - 1]

    cdef inline bint has_preserved_ref_id(self):
        """Return True when tracking is on and the id stack is not empty."""
        if not self.track_ref:
            return False
        return self.read_ref_ids.size() != 0

    cdef inline reference(self, obj):
        """
        Pop the most recently preserved id and bind ``obj`` to its slot.

        A popped ``-1`` means the value has no slot and nothing is stored.
        The id stack is not checked for emptiness: callers must have
        preserved an id before calling this.
        """
        cdef int32_t ref_id
        cdef PyObject *previous
        if not self.track_ref:
            return
        ref_id = self.read_ref_ids.back()
        self.read_ref_ids.pop_back()
        if ref_id < 0:
            return
        previous = self.read_objects[ref_id]
        if previous == <PyObject *> obj:
            # Already bound to this very object; the slot owns its reference.
            return
        # Every non-NULL slot owns exactly one reference (reset() releases
        # one per slot), so take ownership of the new object before dropping
        # whatever the slot held before.
        Py_INCREF(obj)
        self.read_objects[ref_id] = <PyObject *> obj
        Py_XDECREF(previous)

    cdef inline get_read_ref(self, id_=None):
        """
        Return the last resolved reference, or the object stored for ``id_``.

        Returns None without tracking. Raises ValueError for an id that is
        out of range or whose slot is still empty.
        """
        cdef int32_t ref_id
        if not self.track_ref:
            return None
        if id_ is None:
            return self.read_object
        ref_id = id_
        return <object> self._checked_slot(ref_id)

    cdef inline set_read_ref(self, int32_t ref_id, obj):
        """
        Fill the slot of ``ref_id`` with ``obj`` if it is still empty.

        Raises ValueError when ``ref_id`` does not name an existing slot.
        """
        cdef int32_t size
        if not self.track_ref:
            return
        if ref_id >= 0:
            # ref_id < 0 is the NOT_NULL_VALUE_FLAG sentinel path and has no
            # slot in read_objects. Referenceable containers/structs populate
            # their slot eagerly through reference(), so the follow-up store here
            # should only fill slots that are still empty.
            size = self.read_objects.size()
            if ref_id >= size:
                # vector indexing is unchecked; refuse ids nobody reserved.
                raise ValueError(f"Invalid ref id {ref_id}, current size {size}")
            if self.read_objects[ref_id] == NULL:
                Py_INCREF(obj)
                self.read_objects[ref_id] = <PyObject *>obj

    cpdef inline reset(self):
        """Release every stored object and clear all per-operation state."""
        cdef PyObject *item
        if self.track_ref:
            for item in self.read_objects:
                Py_XDECREF(item)
            self.read_objects.clear()
            self.read_ref_ids.clear()
            self.read_object = None
@cython.final
cdef class MetaStringWriter:
    """
    Writes encoded meta strings, replacing repeats by a short dynamic id.

    Strings are identified by object address; a strong reference to each
    written string is held until ``reset()``.
    """
    cdef flat_hash_map[uint64_t, int32_t] _written_encoded_meta_strings
    cdef vector[PyObject *] _written_objects

    def __dealloc__(self):
        self.reset()

    cpdef inline write_encoded_meta_string(self, Buffer buffer, encoded_meta_string):
        """
        Write ``encoded_meta_string`` to ``buffer``.

        First occurrence: header ``length << 1``, then the encoding byte
        (small, non-empty strings) or the 64-bit hash code (strings longer
        than SMALL_STRING_THRESHOLD), then the raw bytes. Later occurrences:
        only the header ``((id + 1) << 1) | 1``.
        """
        cdef uint64_t address = <uintptr_t> <PyObject *> encoded_meta_string
        cdef int32_t length = encoded_meta_string.length
        cdef int32_t assigned_id
        cdef flat_hash_map[uint64_t, int32_t].iterator found = self._written_encoded_meta_strings.find(address)
        if found == self._written_encoded_meta_strings.end():
            # Ids are assigned in order of first appearance, starting at 0.
            assigned_id = self._written_encoded_meta_strings.size()
            self._written_encoded_meta_strings[address] = assigned_id
            self._written_objects.push_back(<PyObject *> encoded_meta_string)
            Py_INCREF(encoded_meta_string)
            buffer.write_var_uint32(length << 1)
            if length > SMALL_STRING_THRESHOLD:
                buffer.write_int64(encoded_meta_string.hashcode)
            elif length != 0:
                buffer.write_int8(encoded_meta_string.encoding)
            buffer.write_bytes(encoded_meta_string.data)
            return
        # Low bit set marks a back-reference; the id is stored one-based.
        buffer.write_var_uint32(((deref(found).second + 1) << 1) | 1)

    cpdef inline reset(self):
        """Forget all written strings and release the references held on them."""
        cdef PyObject *held
        self._written_encoded_meta_strings.clear()
        for held in self._written_objects:
            Py_XDECREF(held)
        self._written_objects.clear()
@cython.final
cdef class MetaStringReader:
    """
    Reads encoded meta strings and resolves dynamic-id back-references.

    ``_c_dynamic_id_to_encoded_meta_string_vec`` lists the strings read in the
    current operation in order of appearance. The two hash maps cache strings
    by hash code and are not cleared by ``reset()``. All stored pointers are
    borrowed; NOTE(review): this presumably relies on ``shared_registry``
    keeping the returned objects alive - confirm.
    """
    cdef object shared_registry
    cdef vector[PyObject *] _c_dynamic_id_to_encoded_meta_string_vec
    cdef flat_hash_map[int64_t, PyObject *] _c_hash_to_encoded_meta_string
    cdef flat_hash_map[int64_t, PyObject *] _c_hash_to_small_encoded_meta_string

    def __init__(self, shared_registry):
        self.shared_registry = shared_registry

    cpdef inline read_encoded_meta_string(self, Buffer buffer):
        """
        Read one encoded meta string from ``buffer`` and return it.

        A header with the low bit set is a one-based back-reference to a
        string read earlier in this operation. Otherwise the header carries
        the byte length; the string is looked up by hash code and created
        through ``shared_registry`` on a cache miss.

        Raises ValueError for a back-reference id that is zero or larger than
        the number of strings read so far, and for a negative length.
        """
        cdef int32_t header = buffer.read_var_uint32()
        cdef int32_t length = header >> 1
        cdef int32_t known_count
        cdef int64_t v1 = 0
        cdef int64_t v2 = 0
        cdef int64_t hashcode
        cdef PyObject *encoded_meta_string_ptr
        cdef int32_t reader_index
        cdef int8_t encoding = 0
        cdef bytes data
        cdef object encoded_meta_string
        if header & 0b1:
            if length <= 0:
                raise ValueError("Invalid dynamic metastring id 0")
            # vector indexing is unchecked, so ids from the wire must be
            # validated against what has actually been read.
            known_count = self._c_dynamic_id_to_encoded_meta_string_vec.size()
            if length > known_count:
                raise ValueError(
                    f"Invalid dynamic metastring id {length}, only {known_count} meta strings have been read"
                )
            return <object> self._c_dynamic_id_to_encoded_meta_string_vec[length - 1]
        if length < 0:
            # A header with the top bit set would otherwise fall into the
            # small-string path with a negative byte count.
            raise ValueError(f"Invalid metastring length {length}")
        if length <= SMALL_STRING_THRESHOLD:
            if length == 0:
                encoded_meta_string_ptr = <PyObject *> EMPTY_ENCODED_META_STRING
                self._c_dynamic_id_to_encoded_meta_string_vec.push_back(encoded_meta_string_ptr)
                return <object> encoded_meta_string_ptr
            encoding = buffer.read_int8()
            # Pack the payload (at most 16 bytes) into two 64-bit words.
            if length <= 8:
                v1 = buffer.read_bytes_as_int64(length)
            else:
                v1 = buffer.read_int64()
                v2 = buffer.read_bytes_as_int64(length - 8)
            hashcode = _hash_small_metastring(v1, v2, length, <uint8_t> encoding)
            encoded_meta_string_ptr = self._c_hash_to_small_encoded_meta_string[hashcode]
            if encoded_meta_string_ptr == NULL:
                # Cache miss: re-read the payload that was just consumed.
                reader_index = buffer.get_reader_index()
                data = buffer.get_bytes(reader_index - length, length)
                encoded_meta_string = self.shared_registry.get_or_create_encoded_meta_string(
                    data,
                    hashcode,
                )
                encoded_meta_string_ptr = <PyObject *> encoded_meta_string
                self._c_hash_to_small_encoded_meta_string[hashcode] = encoded_meta_string_ptr
        else:
            hashcode = buffer.read_int64()
            reader_index = buffer.get_reader_index()
            buffer.check_bound(reader_index, length)
            # Skip the payload first; it is only copied out on a cache miss.
            buffer.set_reader_index(reader_index + length)
            encoded_meta_string_ptr = self._c_hash_to_encoded_meta_string[hashcode]
            if encoded_meta_string_ptr == NULL:
                data = buffer.get_bytes(reader_index, length)
                encoded_meta_string = self.shared_registry.get_or_create_encoded_meta_string(
                    data,
                    hashcode,
                )
                encoded_meta_string_ptr = <PyObject *> encoded_meta_string
                self._c_hash_to_encoded_meta_string[hashcode] = encoded_meta_string_ptr
        self._c_dynamic_id_to_encoded_meta_string_vec.push_back(encoded_meta_string_ptr)
        return <object> encoded_meta_string_ptr

    cpdef inline reset(self):
        """Drop the per-operation dynamic-id table; the hash caches are kept."""
        self._c_dynamic_id_to_encoded_meta_string_vec.clear()
# Write-side state for scoped meta sharing: a C++ hash map with uint64 keys
# and int32 values that reset() empties.
# NOTE(review): nothing in this file populates class_map; presumably the type
# resolver keys it by type address and stores a per-stream index - confirm.
@cython.final
cdef class MetaShareWriteContext:
cdef flat_hash_map[uint64_t, int32_t] class_map
# Clear the map so the next operation starts with no shared entries.
cpdef inline reset(self):
self.class_map.clear()
# Read-side state for scoped meta sharing: a public Python list that starts
# empty and is emptied again by reset().
# NOTE(review): nothing in this file appends to read_type_infos; presumably
# the type resolver stores type infos in the order they are read - confirm.
@cython.final
cdef class MetaShareReadContext:
cdef public list read_type_infos
def __init__(self):
self.read_type_infos = []
# Empty the list in place (the list object itself is kept).
cpdef inline reset(self):
self.read_type_infos.clear()
@cython.final
cdef class WriteContext:
    """
    Per-operation serialization state for the active Cython runtime.

    Configuration values are copied from ``Config`` once at construction.
    ``prepare()`` binds the output buffer and callbacks for one operation and
    ``reset()`` clears everything that must not leak into the next one.
    """
    cdef readonly TypeResolver type_resolver
    cdef readonly bint xlang
    cdef readonly bint track_ref
    cdef readonly bint strict
    cdef readonly bint compatible
    cdef readonly bint field_nullable
    cdef readonly object policy
    cdef readonly int32_t max_collection_size
    cdef readonly int32_t max_binary_size
    cdef readonly RefWriter ref_writer
    cdef readonly MetaStringWriter meta_string_writer
    cdef readonly MetaShareWriteContext meta_share_context
    cdef public Buffer buffer
    cdef CBuffer * c_buffer
    cdef public object buffer_callback
    cdef public object unsupported_callback
    cdef dict context_objects
    cdef public int32_t depth

    def __init__(self, Config config, TypeResolver type_resolver):
        self.type_resolver = type_resolver
        self.xlang = config.xlang
        self.track_ref = config.track_ref
        self.strict = config.strict
        self.compatible = config.compatible
        self.field_nullable = config.field_nullable
        self.policy = config.policy
        self.max_collection_size = config.max_collection_size
        self.max_binary_size = config.max_binary_size
        self.ref_writer = RefWriter(self.track_ref)
        self.meta_string_writer = MetaStringWriter()
        # Meta sharing state only exists when the feature is enabled.
        self.meta_share_context = MetaShareWriteContext() if config.scoped_meta_share_enabled else None
        self.buffer = None
        self.c_buffer = NULL
        self.buffer_callback = None
        self.unsupported_callback = None
        self.context_objects = {}
        self.depth = 0

    cpdef inline prepare(self, Buffer buffer, buffer_callback=None, unsupported_callback=None):
        """Bind the output buffer and optional callbacks for one write operation."""
        self.buffer = buffer
        # Cache the raw pointer so primitive writes skip the Python wrapper.
        self.c_buffer = buffer.c_buffer
        self.buffer_callback = buffer_callback
        self.unsupported_callback = unsupported_callback
        self.depth = 0

    cpdef inline reset(self):
        """Clear all per-operation state and unbind the buffer and callbacks."""
        self.ref_writer.reset()
        self.meta_string_writer.reset()
        if self.meta_share_context is not None:
            self.meta_share_context.reset()
        if self.context_objects:
            self.context_objects.clear()
        self.buffer = None
        self.c_buffer = NULL
        self.buffer_callback = None
        self.unsupported_callback = None
        self.depth = 0

    # ---- context objects: keyed by the identity of ``key``, not its value ----

    cpdef inline add_context_object(self, key, obj):
        """Associate ``obj`` with ``id(key)`` until the next ``reset()``."""
        self.context_objects[id(key)] = obj

    cpdef inline bint has_context_object(self, key):
        """Return True if something is stored under ``id(key)``."""
        return id(key) in self.context_objects

    cpdef inline get_context_object(self, key, default=None):
        """Return the object stored under ``id(key)``, or ``default``."""
        return self.context_objects.get(id(key), default)

    cpdef inline increase_depth(self, int32_t diff=1):
        """Add ``diff`` to the nesting depth counter."""
        self.depth += diff

    cpdef inline decrease_depth(self, int32_t diff=1):
        """Subtract ``diff`` from the nesting depth counter."""
        self.depth -= diff

    # ---- reference flags: thin delegations to the RefWriter ----

    cpdef inline bint write_ref_or_null(self, obj):
        """Write the head flag for ``obj``; True means nothing more to write."""
        return self.ref_writer.write_ref_or_null(self.c_buffer, obj)

    cpdef inline bint write_ref_value_flag(self, obj):
        """Write the head flag for a non-None ``obj``; True means the value must follow."""
        return self.ref_writer.write_ref_value_flag(self.c_buffer, obj)

    cpdef inline bint write_null_flag(self, obj):
        """Write NULL_FLAG and return True if ``obj`` is None."""
        return self.ref_writer.write_null_flag(self.c_buffer, obj)

    cpdef inline write_ref(self, obj, TypeInfo typeinfo=None, Serializer serializer=None):
        """
        Write ``obj`` preceded by its head flag.

        The serializer defaults to ``typeinfo.serializer``. With no serializer,
        or one that asks for reference handling, the flag comes from the
        RefWriter; otherwise only a null / not-null flag is written.
        """
        cdef bint already_complete
        if serializer is None and typeinfo is not None:
            serializer = typeinfo.serializer
        if serializer is None or serializer.need_to_write_ref:
            already_complete = self.ref_writer.write_ref_or_null(self.c_buffer, obj)
        elif obj is None:
            self.write_int8(NULL_FLAG)
            already_complete = True
        else:
            self.write_int8(NOT_NULL_VALUE_FLAG)
            already_complete = False
        if not already_complete:
            self.write_non_ref(obj, serializer=serializer, typeinfo=typeinfo)

    cpdef inline write_non_ref(self, obj, Serializer serializer=None, TypeInfo typeinfo=None):
        """
        Write the value of ``obj`` without any head flag.

        With an explicit serializer only the payload is written. Otherwise a
        type tag precedes the payload: exact ``str``/``int``/``bool``/``float``
        instances take an inline fast path, everything else goes through the
        type resolver.
        """
        cdef object cls
        if serializer is not None:
            serializer.write(self, obj)
            return
        # Exact type checks: subclasses must take the resolver path below.
        cls = type(obj)
        if cls is str:
            self.write_uint8(STRING_TYPE_ID)
            self.buffer.write_string(obj)
        elif cls is int:
            self.write_uint8(INT64_TYPE_ID)
            self.write_varint64(obj)
        elif cls is bool:
            self.write_uint8(BOOL_TYPE_ID)
            self.write_bool(obj)
        elif cls is float:
            self.write_uint8(FLOAT64_TYPE_ID)
            self.write_double(obj)
        else:
            if typeinfo is None:
                typeinfo = self.type_resolver.get_type_info(cls)
            self.type_resolver.write_type_info(self, typeinfo)
            typeinfo.serializer.write(self, obj)

    cpdef inline write_no_ref(self, obj, Serializer serializer=None, TypeInfo typeinfo=None):
        """Alias of ``write_non_ref``."""
        self.write_non_ref(obj, serializer=serializer, typeinfo=typeinfo)

    cpdef write_buffer_object(self, buffer_object):
        """
        Write ``buffer_object`` in-band or hand it to ``buffer_callback``.

        Without a callback the payload is always in-band: var-uint32 size
        followed by the bytes. With a callback a bool is written first: True
        (callback returned a truthy value) followed by the in-band payload, or
        False with nothing else.
        """
        cdef int32_t size
        cdef int32_t start
        cdef Buffer window
        if self.buffer_callback is not None:
            if not self.buffer_callback(buffer_object):
                self.write_bool(False)
                return
            self.write_bool(True)
        size = buffer_object.total_bytes()
        self.write_var_uint32(size)
        start = self.buffer.get_writer_index()
        self.buffer.ensure(start + size)
        # Let the object fill a slice of the output buffer directly.
        window = self.buffer.slice(start, size)
        buffer_object.write_to(window)
        self.buffer.set_writer_index(start + size)

    cpdef handle_unsupported_write(self, obj):
        """Raise NotImplementedError unless ``unsupported_callback`` exists and returns a falsy value."""
        if self.unsupported_callback is None or self.unsupported_callback(obj):
            raise NotImplementedError(f"{type(obj)} is not supported for write")

    # ---- output stream control; all are no-ops without a buffer or stream ----

    cpdef enter_flush_barrier(self):
        """Forward ``enter_flush_barrier()`` to the buffer's output stream, if any."""
        cdef object output_stream = None
        if self.buffer is not None:
            output_stream = self.buffer.get_output_stream()
        if output_stream is not None:
            output_stream.enter_flush_barrier()

    cpdef exit_flush_barrier(self):
        """Forward ``exit_flush_barrier()`` to the buffer's output stream, if any."""
        cdef object output_stream = None
        if self.buffer is not None:
            output_stream = self.buffer.get_output_stream()
        if output_stream is not None:
            output_stream.exit_flush_barrier()

    cpdef try_flush(self):
        """Ask the output stream to flush once more than 4096 bytes have been written."""
        cdef object output_stream
        if self.buffer is None:
            return
        if self.buffer.get_writer_index() <= 4096:
            return
        output_stream = self.buffer.get_output_stream()
        if output_stream is not None:
            output_stream.try_flush()

    cpdef force_flush(self):
        """Forward ``force_flush()`` to the buffer's output stream, if any."""
        cdef object output_stream
        if self.buffer is None:
            return
        output_stream = self.buffer.get_output_stream()
        if output_stream is not None:
            output_stream.force_flush()

    # ---- primitive writers: direct calls on the cached C buffer ----

    cpdef inline void write_bool(self, bint value):
        """Write ``value`` as one unsigned byte."""
        self.c_buffer.write_uint8(<uint8_t> value)

    cpdef inline void write_int8(self, int8_t value):
        self.c_buffer.write_int8(value)

    cpdef inline void write_uint8(self, uint8_t value):
        self.c_buffer.write_uint8(value)

    cpdef inline void write_int16(self, int16_t value):
        self.c_buffer.write_int16(value)

    cpdef inline void write_uint16(self, uint16_t value):
        self.c_buffer.write_uint16(value)

    cpdef inline void write_int32(self, int32_t value):
        self.c_buffer.write_int32(value)

    cpdef inline void write_uint32(self, uint32_t value):
        self.c_buffer.write_uint32(value)

    cpdef inline void write_int64(self, int64_t value):
        self.c_buffer.write_int64(value)

    cpdef inline void write_uint64(self, uint64_t value):
        """Write ``value`` through the signed 64-bit writer after a cast."""
        self.c_buffer.write_int64(<int64_t> value)

    cpdef inline void write_varint32(self, int32_t value):
        self.c_buffer.write_var_int32(value)

    cpdef inline void write_var_uint32(self, uint32_t value):
        self.c_buffer.write_var_uint32(value)

    cpdef inline void write_varint64(self, int64_t value):
        self.c_buffer.write_var_int64(value)

    cpdef inline void write_var_uint64(self, uint64_t value):
        self.c_buffer.write_var_uint64(value)

    cpdef inline void write_tagged_int64(self, int64_t value):
        self.c_buffer.write_tagged_int64(value)

    cpdef inline void write_tagged_uint64(self, uint64_t value):
        self.c_buffer.write_tagged_uint64(value)

    cpdef inline void write_float(self, float value):
        self.c_buffer.write_float(value)

    cpdef inline void write_float32(self, float value):
        """Same as ``write_float``."""
        self.c_buffer.write_float(value)

    cpdef inline void write_double(self, double value):
        self.c_buffer.write_double(value)

    cpdef inline void write_float64(self, double value):
        """Same as ``write_double``."""
        self.c_buffer.write_double(value)

    # ---- writers that go through the Buffer wrapper ----

    cpdef write_string(self, str value):
        self.buffer.write_string(value)

    cpdef write_bytes(self, bytes value):
        self.buffer.write_bytes(value)

    cpdef inline void write_bytes_and_size(self, bytes value):
        self.buffer.write_bytes_and_size(value)

    cpdef write_buffer(self, value, src_index=0, length_=None):
        self.buffer.write_buffer(value, src_index=src_index, length_=length_)

    cpdef inline int32_t get_writer_index(self):
        """Return the buffer's current writer index."""
        return self.buffer.get_writer_index()

    cpdef inline put_uint8(self, uint32_t offset, uint8_t value):
        """Store ``value`` at ``offset`` via the buffer's ``put_uint8``."""
        self.buffer.put_uint8(offset, value)
@cython.final
cdef class ReadContext:
    """
    Per-operation deserialization state for the active Cython runtime.

    Configuration values are copied from ``Config`` once at construction.
    ``prepare()`` binds the input buffer and side channels for one operation
    and ``reset()`` clears everything that must not leak into the next one.
    """
    cdef readonly TypeResolver type_resolver
    cdef readonly bint xlang
    cdef readonly bint track_ref
    cdef readonly bint strict
    cdef readonly bint compatible
    cdef readonly bint field_nullable
    cdef readonly object policy
    cdef readonly int32_t max_depth
    cdef readonly int32_t max_collection_size
    cdef readonly int32_t max_binary_size
    cdef readonly RefReader ref_reader
    cdef readonly MetaStringReader meta_string_reader
    cdef readonly MetaShareReadContext meta_share_context
    cdef public Buffer buffer
    cdef CBuffer * c_buffer
    cdef public object buffers
    cdef public object unsupported_objects
    cdef public bint peer_out_of_band_enabled
    cdef dict context_objects
    cdef public int32_t depth

    def __init__(self, Config config, TypeResolver type_resolver):
        self.type_resolver = type_resolver
        self.xlang = config.xlang
        self.track_ref = config.track_ref
        self.strict = config.strict
        self.compatible = config.compatible
        self.field_nullable = config.field_nullable
        self.policy = config.policy
        self.max_depth = config.max_depth
        self.max_collection_size = config.max_collection_size
        self.max_binary_size = config.max_binary_size
        self.ref_reader = RefReader(self.track_ref)
        self.meta_string_reader = MetaStringReader(self.type_resolver.shared_registry)
        # Meta sharing state only exists when the feature is enabled.
        self.meta_share_context = MetaShareReadContext() if config.scoped_meta_share_enabled else None
        self.buffer = None
        self.c_buffer = NULL
        self.buffers = None
        self.unsupported_objects = None
        self.peer_out_of_band_enabled = False
        self.context_objects = {}
        self.depth = 0

    cpdef inline prepare(
        self,
        Buffer buffer,
        buffers=None,
        unsupported_objects=None,
        bint peer_out_of_band_enabled=False,
    ):
        """
        Bind the input buffer and side channels for one read operation.

        ``buffers`` and ``unsupported_objects`` are wrapped in iterators so
        they are consumed in order, one item per request.
        """
        self.buffer = buffer
        # Cache the raw pointer so primitive reads skip the Python wrapper.
        self.c_buffer = buffer.c_buffer
        self.buffers = iter(buffers) if buffers is not None else None
        self.unsupported_objects = iter(unsupported_objects) if unsupported_objects is not None else None
        self.peer_out_of_band_enabled = peer_out_of_band_enabled
        self.depth = 0

    cpdef inline reset(self):
        """Clear all per-operation state and unbind the buffer and side channels."""
        self.ref_reader.reset()
        self.meta_string_reader.reset()
        if self.meta_share_context is not None:
            self.meta_share_context.reset()
        if self.context_objects:
            self.context_objects.clear()
        self.buffer = None
        self.c_buffer = NULL
        self.buffers = None
        self.unsupported_objects = None
        self.peer_out_of_band_enabled = False
        self.depth = 0

    # ---- context objects: keyed by the identity of ``key``, not its value ----

    cpdef inline add_context_object(self, key, obj):
        """Associate ``obj`` with ``id(key)`` until the next ``reset()``."""
        self.context_objects[id(key)] = obj

    cpdef inline bint has_context_object(self, key):
        """Return True if something is stored under ``id(key)``."""
        return id(key) in self.context_objects

    cpdef inline get_context_object(self, key, default=None):
        """Return the object stored under ``id(key)``, or ``default``."""
        return self.context_objects.get(id(key), default)

    cpdef inline increase_depth(self, int32_t diff=1):
        """Add ``diff`` to the nesting depth; raise Exception once it exceeds ``max_depth``."""
        # Depth accounting is paired on the successful path only.
        # If a nested read raises, the top-level deserialize/reset path clears
        # `depth`, so nested readers must not add local try/finally wrappers
        # around increase/decrease pairs.
        self.depth += diff
        if self.depth > self.max_depth:
            raise Exception(
                f"Read depth exceed max depth: {self.depth}, the deserialization data may be malicious. "
                "If it's not malicious, please increase max read depth by Fory(..., max_depth=...)"
            )

    cpdef inline decrease_depth(self, int32_t diff=1):
        """Subtract ``diff`` from the nesting depth."""
        # Only call this after the matching nested read completed successfully.
        self.depth -= diff

    # ---- reference bookkeeping: thin delegations to the RefReader ----

    cpdef inline int32_t read_ref_or_null(self):
        return self.ref_reader.read_ref_or_null(self.buffer)

    cpdef inline int32_t preserve_ref_id(self, ref_id=None):
        """Reserve a fresh ref id when ``ref_id`` is None, otherwise re-push the given one."""
        if ref_id is None:
            return self.ref_reader.preserve_next_ref_id()
        return self.ref_reader.preserve_ref_id(ref_id)

    cpdef inline int32_t try_preserve_ref_id(self):
        return self.ref_reader.try_preserve_ref_id(self.buffer)

    cpdef inline int32_t last_preserved_ref_id(self):
        return self.ref_reader.last_preserved_ref_id()

    cpdef inline bint has_preserved_ref_id(self):
        return self.ref_reader.has_preserved_ref_id()

    cpdef inline reference(self, obj):
        self.ref_reader.reference(obj)

    cpdef inline get_read_ref(self, ref_id=None):
        return self.ref_reader.get_read_ref(ref_id)

    cpdef inline set_read_ref(self, int32_t ref_id, obj):
        self.ref_reader.set_read_ref(ref_id, obj)

    # ---- object readers ----

    cpdef inline read_ref(self, Serializer serializer=None):
        """
        Read one value together with its head flag and return it.

        With no serializer, or one that asks for reference handling, the flag
        is interpreted by the RefReader: null and back-reference flags return
        the resolved object (None for null); otherwise the value is read and,
        if a ref slot was reserved for it, stored there. For serializers
        without reference handling only a null flag is checked.
        """
        cdef int32_t ref_id
        cdef object obj
        if serializer is None or serializer.need_to_write_ref:
            ref_id = self.ref_reader.try_preserve_ref_id(self.buffer)
            if ref_id < NOT_NULL_VALUE_FLAG:
                return self.ref_reader.get_read_ref()
            obj = self._read_non_ref_internal(serializer)
            # Without tracking, ``ref_id`` is the raw flag byte from the wire
            # and must never be used as an index into read_objects, which is
            # empty in that mode. set_read_ref() ignores the store when
            # tracking is off and only fills slots that are still empty.
            self.ref_reader.set_read_ref(ref_id, obj)
            return obj
        if self.read_int8() == NULL_FLAG:
            return None
        return self._read_non_ref_internal(serializer)

    cpdef inline read_non_ref(self, Serializer serializer=None):
        """Read a value that has no head flag on the wire."""
        if self.track_ref:
            # Placeholder for a serializer that calls reference(): it pops
            # this -1 and stores nothing.
            self.ref_reader.read_ref_ids.push_back(-1)
        return self._read_non_ref_internal(serializer)

    cpdef inline read_no_ref(self, Serializer serializer=None):
        """Alias of ``read_non_ref``."""
        return self.read_non_ref(serializer=serializer)

    cpdef inline read_nullable(self, Serializer serializer=None):
        """Read a one-byte flag; return None for NULL_FLAG, otherwise read the value."""
        if self.read_int8() == NULL_FLAG:
            return None
        return self._read_non_ref_internal(serializer)

    cdef inline object _read_non_ref_internal(self, Serializer serializer=None):
        """
        Read a value payload.

        Without a serializer the type info is read first; str, int64, bool and
        float64 are decoded inline without touching the depth counter. Every
        serializer call is bracketed by ``increase_depth``/``decrease_depth``.
        """
        cdef TypeInfo typeinfo
        cdef uint8_t type_id
        cdef object obj
        if serializer is None:
            typeinfo = self.type_resolver.read_type_info(self)
            type_id = typeinfo.type_id
            if type_id == STRING_TYPE_ID:
                return self.buffer.read_string()
            if type_id == INT64_TYPE_ID:
                return self.read_varint64()
            if type_id == BOOL_TYPE_ID:
                return self.read_bool()
            if type_id == FLOAT64_TYPE_ID:
                return self.read_double()
            serializer = typeinfo.serializer
        self.increase_depth()
        obj = serializer.read(self)
        self.decrease_depth()
        return obj

    cpdef read_buffer_object(self):
        """
        Read a buffer object written in-band or delivered out-of-band.

        When the peer enabled out-of-band buffers a bool is read first; False
        means the next item of ``self.buffers`` is returned. In every other
        case the payload is in-band: a var-uint32 size followed by the bytes,
        returned as ``bytes`` for stream-backed buffers and as a slice of the
        input buffer otherwise.
        """
        cdef int32_t size
        cdef int32_t reader_index
        cdef Buffer buf
        if self.peer_out_of_band_enabled and not self.read_bool():
            assert self.buffers is not None
            return next(self.buffers)
        size = self.read_var_uint32()
        if self.buffer.has_input_stream():
            return self.buffer.read_bytes(size)
        reader_index = self.buffer.get_reader_index()
        buf = self.buffer.slice(reader_index, size)
        self.buffer.set_reader_index(reader_index + size)
        return buf

    cpdef handle_unsupported_read(self):
        """Return the next object from ``unsupported_objects``."""
        assert self.unsupported_objects is not None
        return next(self.unsupported_objects)

    # ---- primitive readers ----
    # Each one reads through the cached C buffer, which records failures in
    # ``buffer._error``; the error is turned into an exception right after.

    cpdef inline bint read_bool(self):
        """Read one unsigned byte and return whether it is non-zero."""
        cdef Buffer buffer = self.buffer
        cdef uint8_t value = self.c_buffer.read_uint8(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value != 0

    cpdef inline uint8_t read_uint8(self):
        cdef Buffer buffer = self.buffer
        cdef uint8_t value = self.c_buffer.read_uint8(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline int8_t read_int8(self):
        cdef Buffer buffer = self.buffer
        cdef int8_t value = self.c_buffer.read_int8(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline int16_t read_int16(self):
        cdef Buffer buffer = self.buffer
        cdef int16_t value = self.c_buffer.read_int16(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline uint16_t read_uint16(self):
        cdef Buffer buffer = self.buffer
        cdef uint16_t value = self.c_buffer.read_uint16(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline int32_t read_int32(self):
        cdef Buffer buffer = self.buffer
        cdef int32_t value = self.c_buffer.read_int32(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline uint32_t read_uint32(self):
        cdef Buffer buffer = self.buffer
        cdef uint32_t value = self.c_buffer.read_uint32(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline int64_t read_int64(self):
        cdef Buffer buffer = self.buffer
        cdef int64_t value = self.c_buffer.read_int64(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline uint64_t read_uint64(self):
        cdef Buffer buffer = self.buffer
        cdef uint64_t value = self.c_buffer.read_uint64(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline int32_t read_varint32(self):
        cdef Buffer buffer = self.buffer
        cdef int32_t value = self.c_buffer.read_var_int32(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline uint32_t read_var_uint32(self):
        cdef Buffer buffer = self.buffer
        cdef uint32_t value = self.c_buffer.read_var_uint32(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline int64_t read_varint64(self):
        cdef Buffer buffer = self.buffer
        cdef int64_t value = self.c_buffer.read_var_int64(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline uint64_t read_var_uint64(self):
        cdef Buffer buffer = self.buffer
        cdef uint64_t value = self.c_buffer.read_var_uint64(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline int64_t read_tagged_int64(self):
        cdef Buffer buffer = self.buffer
        cdef int64_t value = self.c_buffer.read_tagged_int64(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline uint64_t read_tagged_uint64(self):
        cdef Buffer buffer = self.buffer
        cdef uint64_t value = self.c_buffer.read_tagged_uint64(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline float read_float(self):
        cdef Buffer buffer = self.buffer
        cdef float value = self.c_buffer.read_float(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline float read_float32(self):
        """Same as ``read_float``."""
        return self.read_float()

    cpdef inline double read_double(self):
        cdef Buffer buffer = self.buffer
        cdef double value = self.c_buffer.read_double(buffer._error)
        if not buffer._error.ok():
            buffer._raise_if_error()
        return value

    cpdef inline double read_float64(self):
        """Same as ``read_double``."""
        return self.read_double()

    # ---- readers and cursor helpers that go through the Buffer wrapper ----

    cpdef read_string(self):
        return self.buffer.read_string()

    cpdef read_bytes(self, int32_t length):
        return self.buffer.read_bytes(length)

    cpdef read_bytes_and_size(self):
        return self.buffer.read_bytes_and_size()

    cpdef inline int32_t get_reader_index(self):
        """Return the buffer's current reader index."""
        return self.buffer.get_reader_index()

    cpdef inline set_reader_index(self, int32_t reader_index):
        """Move the buffer's reader index to ``reader_index``."""
        self.buffer.set_reader_index(reader_index)

    cpdef shrink_input_buffer(self):
        """Forward to the buffer's ``shrink_input_buffer()``."""
        self.buffer.shrink_input_buffer()