| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| Collection serialization format: |
| https://fory.apache.org/docs/specification/fory_xlang_serialization_spec/#list |
| Has the following changes: |
| * None has an independent type, so COLL_NOT_SAME_TYPE can also cover the concept of being nullable. |
| * No flag is needed to indicate that the element type is not the declared type. |
| """ |
| cdef int8_t COLL_DEFAULT_FLAG = 0b0 |
| cdef int8_t COLL_TRACKING_REF = 0b1 |
| cdef int8_t COLL_HAS_NULL = 0b10 |
| cdef int8_t COLL_IS_DECL_ELEMENT_TYPE = 0b100 |
| cdef int8_t COLL_IS_SAME_TYPE = 0b1000 |
| cdef int8_t COLL_DECL_SAME_TYPE_TRACKING_REF = COLL_IS_DECL_ELEMENT_TYPE | COLL_IS_SAME_TYPE | COLL_TRACKING_REF |
| cdef int8_t COLL_DECL_SAME_TYPE_NOT_TRACKING_REF = COLL_IS_DECL_ELEMENT_TYPE | COLL_IS_SAME_TYPE |
| cdef int8_t COLL_DECL_SAME_TYPE_HAS_NULL = COLL_IS_DECL_ELEMENT_TYPE | COLL_IS_SAME_TYPE | COLL_HAS_NULL |
| cdef int8_t COLL_DECL_SAME_TYPE_NOT_HAS_NULL = COLL_IS_DECL_ELEMENT_TYPE | COLL_IS_SAME_TYPE |
| |
| |
| cdef class CollectionSerializer(Serializer): |
| cdef TypeResolver type_resolver |
| cdef MapRefResolver ref_resolver |
| cdef Serializer elem_serializer |
| cdef c_bool is_py |
| cdef int8_t elem_tracking_ref |
| cdef elem_type |
| cdef TypeInfo elem_typeinfo |
| |
| def __init__(self, fory, type_, elem_serializer=None): |
| super().__init__(fory, type_) |
| self.type_resolver = fory.type_resolver |
| self.ref_resolver = fory.ref_resolver |
| self.elem_serializer = elem_serializer |
| if elem_serializer is None: |
| self.elem_type = None |
| self.elem_typeinfo = self.type_resolver.get_typeinfo(None) |
| self.elem_tracking_ref = -1 |
| else: |
| self.elem_type = elem_serializer.type_ |
| self.elem_typeinfo = fory.type_resolver.get_typeinfo(self.elem_type) |
| self.elem_tracking_ref = <int8_t> (elem_serializer.need_to_write_ref) |
| self.is_py = fory.is_py |
| |
| cdef inline pair[int8_t, int64_t] write_header(self, Buffer buffer, value): |
| cdef int8_t collect_flag = COLL_DEFAULT_FLAG |
| elem_type = self.elem_type |
| cdef TypeInfo elem_typeinfo = self.elem_typeinfo |
| cdef c_bool has_null = False |
| cdef c_bool has_same_type = True |
| if elem_type is None: |
| for s in value: |
| if not has_null and s is None: |
| has_null = True |
| continue |
| if elem_type is None: |
| elem_type = type(s) |
| elif has_same_type and type(s) is not elem_type: |
| has_same_type = False |
| if has_same_type: |
| collect_flag |= COLL_IS_SAME_TYPE |
| elem_typeinfo = self.type_resolver.get_typeinfo(elem_type) |
| else: |
| collect_flag |= COLL_IS_DECL_ELEMENT_TYPE | COLL_IS_SAME_TYPE |
| for s in value: |
| if s is None: |
| has_null = True |
| break |
| if has_null: |
| collect_flag |= COLL_HAS_NULL |
| if self.fory.ref_tracking: |
| if self.elem_tracking_ref == 1: |
| collect_flag |= COLL_TRACKING_REF |
| elif self.elem_tracking_ref == -1: |
| if not has_same_type or elem_typeinfo.serializer.need_to_write_ref: |
| collect_flag |= COLL_TRACKING_REF |
| buffer.write_varuint32(len(value)) |
| buffer.write_int8(collect_flag) |
| if (has_same_type and |
| collect_flag & COLL_IS_DECL_ELEMENT_TYPE == 0): |
| self.type_resolver.write_typeinfo(buffer, elem_typeinfo) |
| return pair[int8_t, int64_t](collect_flag, obj2int(elem_typeinfo)) |
| |
| cpdef write(self, Buffer buffer, value): |
| if len(value) == 0: |
| buffer.write_varuint64(0) |
| return |
| cdef pair[int8_t, int64_t] header_pair = self.write_header(buffer, value) |
| cdef int8_t collect_flag = header_pair.first |
| cdef int64_t elem_typeinfo_ptr = header_pair.second |
| cdef TypeInfo elem_typeinfo = <type> int2obj(elem_typeinfo_ptr) |
| cdef elem_type = elem_typeinfo.cls |
| cdef MapRefResolver ref_resolver = self.ref_resolver |
| cdef TypeResolver type_resolver = self.type_resolver |
| cdef c_bool is_py = self.is_py |
| cdef serializer = type(elem_typeinfo.serializer) |
| cdef c_bool tracking_ref |
| cdef c_bool has_null |
| if (collect_flag & COLL_IS_SAME_TYPE) != 0: |
| if (collect_flag & COLL_HAS_NULL) == 0: |
| if elem_type is str: |
| self._write_string(buffer, value) |
| elif serializer is Int64Serializer: |
| self._write_int(buffer, value) |
| elif elem_type is bool: |
| self._write_bool(buffer, value) |
| elif serializer is Float64Serializer: |
| self._write_float(buffer, value) |
| elif (collect_flag & COLL_TRACKING_REF) == 0: |
| self._write_same_type_no_ref(buffer, value, elem_typeinfo) |
| else: |
| self._write_same_type_ref(buffer, value, elem_typeinfo) |
| elif (collect_flag & COLL_TRACKING_REF) != 0: |
| self._write_same_type_ref(buffer, value, elem_typeinfo) |
| else: |
| self._write_same_type_has_null(buffer, value, elem_typeinfo) |
| else: |
| # Check tracking_ref and has_null flags for different types writing |
| tracking_ref = (collect_flag & COLL_TRACKING_REF) != 0 |
| has_null = (collect_flag & COLL_HAS_NULL) != 0 |
| if tracking_ref: |
| # When ref tracking is enabled, write with ref handling |
| for s in value: |
| cls = type(s) |
| if cls is str: |
| buffer.write_int16(NOT_NULL_STRING_FLAG) |
| buffer.write_string(s) |
| elif cls is int: |
| buffer.write_int16(NOT_NULL_INT64_FLAG) |
| buffer.write_varint64(s) |
| elif cls is bool: |
| buffer.write_int16(NOT_NULL_BOOL_FLAG) |
| buffer.write_bool(s) |
| elif cls is float: |
| buffer.write_int16(NOT_NULL_FLOAT64_FLAG) |
| buffer.write_double(s) |
| else: |
| if not ref_resolver.write_ref_or_null(buffer, s): |
| typeinfo = type_resolver.get_typeinfo(cls) |
| type_resolver.write_typeinfo(buffer, typeinfo) |
| if is_py: |
| typeinfo.serializer.write(buffer, s) |
| else: |
| typeinfo.serializer.xwrite(buffer, s) |
| elif not has_null: |
| # When ref tracking is disabled and no nulls, write type info directly |
| for s in value: |
| cls = type(s) |
| typeinfo = type_resolver.get_typeinfo(cls) |
| type_resolver.write_typeinfo(buffer, typeinfo) |
| if is_py: |
| typeinfo.serializer.write(buffer, s) |
| else: |
| typeinfo.serializer.xwrite(buffer, s) |
| else: |
| # When ref tracking is disabled but has nulls, write null flag first |
| for s in value: |
| if s is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| cls = type(s) |
| typeinfo = type_resolver.get_typeinfo(cls) |
| type_resolver.write_typeinfo(buffer, typeinfo) |
| if is_py: |
| typeinfo.serializer.write(buffer, s) |
| else: |
| typeinfo.serializer.xwrite(buffer, s) |
| |
| cdef inline _write_string(self, Buffer buffer, value): |
| for s in value: |
| buffer.write_string(s) |
| |
| cdef inline _read_string(self, Buffer buffer, int64_t len_, object collection_): |
| for i in range(len_): |
| self._add_element(collection_, i, buffer.read_string()) |
| |
| cdef inline _write_int(self, Buffer buffer, value): |
| for s in value: |
| buffer.write_varint64(s) |
| |
| cdef inline _read_int(self, Buffer buffer, int64_t len_, object collection_): |
| for i in range(len_): |
| self._add_element(collection_, i, buffer.read_varint64()) |
| |
| cdef inline _write_bool(self, Buffer buffer, value): |
| value_type = type(value) |
| if value_type is list or value_type is tuple: |
| size = sizeof(bool) * Py_SIZE(value) |
| buffer.grow(<int32_t>size) |
| Fory_PyBooleanSequenceWriteToBuffer(value, buffer.c_buffer.get(), buffer.writer_index) |
| buffer.writer_index += size |
| else: |
| for s in value: |
| buffer.write_bool(s) |
| |
| cdef inline _read_bool(self, Buffer buffer, int64_t len_, object collection_): |
| for i in range(len_): |
| self._add_element(collection_, i, buffer.read_bool()) |
| |
| cdef inline _write_float(self, Buffer buffer, value): |
| value_type = type(value) |
| if value_type is list or value_type is tuple: |
| size = sizeof(double) * Py_SIZE(value) |
| buffer.grow(<int32_t>size) |
| Fory_PyFloatSequenceWriteToBuffer(value, buffer.c_buffer.get(), buffer.writer_index) |
| buffer.writer_index += size |
| else: |
| for s in value: |
| buffer.write_double(s) |
| |
| cdef inline _read_float(self, Buffer buffer, int64_t len_, object collection_): |
| for i in range(len_): |
| self._add_element(collection_, i, buffer.read_double()) |
| |
| cpdef _write_same_type_no_ref(self, Buffer buffer, value, TypeInfo typeinfo): |
| cdef MapRefResolver ref_resolver = self.ref_resolver |
| cdef TypeResolver type_resolver = self.type_resolver |
| if self.is_py: |
| for s in value: |
| typeinfo.serializer.write(buffer, s) |
| else: |
| for s in value: |
| typeinfo.serializer.xwrite(buffer, s) |
| |
| cpdef _read_same_type_no_ref(self, Buffer buffer, int64_t len_, object collection_, TypeInfo typeinfo): |
| cdef MapRefResolver ref_resolver = self.ref_resolver |
| cdef TypeResolver type_resolver = self.type_resolver |
| self.fory.inc_depth() |
| if self.is_py: |
| for i in range(len_): |
| obj = typeinfo.serializer.read(buffer) |
| self._add_element(collection_, i, obj) |
| else: |
| for i in range(len_): |
| obj = typeinfo.serializer.xread(buffer) |
| self._add_element(collection_, i, obj) |
| self.fory.dec_depth() |
| |
| cpdef _write_same_type_has_null(self, Buffer buffer, value, TypeInfo typeinfo): |
| if self.is_py: |
| for s in value: |
| if s is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| typeinfo.serializer.write(buffer, s) |
| else: |
| for s in value: |
| if s is None: |
| buffer.write_int8(NULL_FLAG) |
| else: |
| buffer.write_int8(NOT_NULL_VALUE_FLAG) |
| typeinfo.serializer.xwrite(buffer, s) |
| |
| cpdef _read_same_type_has_null(self, Buffer buffer, int64_t len_, object collection_, TypeInfo typeinfo): |
| cdef int8_t flag |
| self.fory.inc_depth() |
| if self.is_py: |
| for i in range(len_): |
| flag = buffer.read_int8() |
| if flag == NULL_FLAG: |
| self._add_element(collection_, i, None) |
| else: |
| self._add_element(collection_, i, typeinfo.serializer.read(buffer)) |
| else: |
| for i in range(len_): |
| flag = buffer.read_int8() |
| if flag == NULL_FLAG: |
| self._add_element(collection_, i, None) |
| else: |
| self._add_element(collection_, i, typeinfo.serializer.xread(buffer)) |
| self.fory.dec_depth() |
| |
| cpdef _write_same_type_ref(self, Buffer buffer, value, TypeInfo typeinfo): |
| cdef MapRefResolver ref_resolver = self.ref_resolver |
| cdef TypeResolver type_resolver = self.type_resolver |
| if self.is_py: |
| for s in value: |
| if not ref_resolver.write_ref_or_null(buffer, s): |
| typeinfo.serializer.write(buffer, s) |
| else: |
| for s in value: |
| if not ref_resolver.write_ref_or_null(buffer, s): |
| typeinfo.serializer.xwrite(buffer, s) |
| |
| cpdef _read_same_type_ref(self, Buffer buffer, int64_t len_, object collection_, TypeInfo typeinfo): |
| cdef MapRefResolver ref_resolver = self.ref_resolver |
| cdef TypeResolver type_resolver = self.type_resolver |
| cdef c_bool is_py = self.is_py |
| self.fory.inc_depth() |
| for i in range(len_): |
| ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| obj = ref_resolver.get_read_object() |
| else: |
| if is_py: |
| obj = typeinfo.serializer.read(buffer) |
| else: |
| obj = typeinfo.serializer.xread(buffer) |
| ref_resolver.set_read_object(ref_id, obj) |
| self._add_element(collection_, i, obj) |
| self.fory.dec_depth() |
| |
| cpdef _add_element(self, object collection_, int64_t index, object element): |
| raise NotImplementedError |
| |
| cpdef xwrite(self, Buffer buffer, value): |
| self.write(buffer, value) |
| |
| cdef class ListSerializer(CollectionSerializer): |
| cpdef read(self, Buffer buffer): |
| cdef MapRefResolver ref_resolver = self.fory.ref_resolver |
| cdef TypeResolver type_resolver = self.fory.type_resolver |
| cdef int32_t len_ = buffer.read_varuint32() |
| cdef list list_ = PyList_New(len_) |
| if len_ == 0: |
| return list_ |
| cdef int8_t collect_flag = buffer.read_int8() |
| ref_resolver.reference(list_) |
| cdef c_bool is_py = self.is_py |
| cdef TypeInfo typeinfo |
| cdef int32_t type_id = -1 |
| cdef c_bool tracking_ref |
| cdef c_bool has_null |
| cdef int8_t head_flag |
| if (collect_flag & COLL_IS_SAME_TYPE) != 0: |
| if collect_flag & COLL_IS_DECL_ELEMENT_TYPE == 0: |
| typeinfo = self.type_resolver.read_typeinfo(buffer) |
| else: |
| typeinfo = self.elem_typeinfo |
| if (collect_flag & COLL_HAS_NULL) == 0: |
| type_id = typeinfo.type_id |
| if type_id == <int32_t>TypeId.STRING: |
| self._read_string(buffer, len_, list_) |
| return list_ |
| elif type_id == <int32_t>TypeId.VARINT64: |
| self._read_int(buffer, len_, list_) |
| return list_ |
| elif type_id == <int32_t>TypeId.BOOL: |
| self._read_bool(buffer, len_, list_) |
| return list_ |
| elif type_id == <int32_t>TypeId.FLOAT64: |
| self._read_float(buffer, len_, list_) |
| return list_ |
| elif (collect_flag & COLL_TRACKING_REF) == 0: |
| self._read_same_type_no_ref(buffer, len_, list_, typeinfo) |
| else: |
| self._read_same_type_ref(buffer, len_, list_, typeinfo) |
| elif (collect_flag & COLL_TRACKING_REF) != 0: |
| self._read_same_type_ref(buffer, len_, list_, typeinfo) |
| else: |
| self._read_same_type_has_null(buffer, len_, list_, typeinfo) |
| else: |
| self.fory.inc_depth() |
| # Check tracking_ref and has_null flags for different types handling |
| tracking_ref = (collect_flag & COLL_TRACKING_REF) != 0 |
| has_null = (collect_flag & COLL_HAS_NULL) != 0 |
| if tracking_ref: |
| # When ref tracking is enabled, read with ref handling |
| for i in range(len_): |
| elem = get_next_element(buffer, ref_resolver, type_resolver, is_py) |
| Py_INCREF(elem) |
| PyList_SET_ITEM(list_, i, elem) |
| elif not has_null: |
| # When ref tracking is disabled and no nulls, read type info directly |
| for i in range(len_): |
| typeinfo = type_resolver.read_typeinfo(buffer) |
| if is_py: |
| elem = typeinfo.serializer.read(buffer) |
| else: |
| elem = typeinfo.serializer.xread(buffer) |
| Py_INCREF(elem) |
| PyList_SET_ITEM(list_, i, elem) |
| else: |
| # When ref tracking is disabled but has nulls, read null flag first |
| for i in range(len_): |
| head_flag = buffer.read_int8() |
| if head_flag == NULL_FLAG: |
| elem = None |
| else: |
| typeinfo = type_resolver.read_typeinfo(buffer) |
| if is_py: |
| elem = typeinfo.serializer.read(buffer) |
| else: |
| elem = typeinfo.serializer.xread(buffer) |
| Py_INCREF(elem) |
| PyList_SET_ITEM(list_, i, elem) |
| self.fory.dec_depth() |
| return list_ |
| |
| cpdef _add_element(self, object collection_, int64_t index, object element): |
| Py_INCREF(element) |
| PyList_SET_ITEM(collection_, index, element) |
| |
| cpdef xread(self, Buffer buffer): |
| return self.read(buffer) |
| |
| cdef inline get_next_element( |
| Buffer buffer, |
| MapRefResolver ref_resolver, |
| TypeResolver type_resolver, |
| c_bool is_py, |
| ): |
| cdef int32_t ref_id |
| cdef TypeInfo typeinfo |
| ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| return ref_resolver.get_read_object() |
| # indicates that the object is first read. |
| typeinfo = type_resolver.read_typeinfo(buffer) |
| cdef int32_t type_id = typeinfo.type_id |
| # Note that all read operations in fast paths of list/tuple/set/dict/sub_dict |
| # must match corresponding writing operations. Otherwise, ref tracking will |
| # error. |
| if type_id == <int32_t>TypeId.STRING: |
| return buffer.read_string() |
| elif type_id == <int32_t>TypeId.VARINT32: |
| return buffer.read_varint64() |
| elif type_id == <int32_t>TypeId.BOOL: |
| return buffer.read_bool() |
| elif type_id == <int32_t>TypeId.FLOAT64: |
| return buffer.read_double() |
| else: |
| if is_py: |
| o = typeinfo.serializer.read(buffer) |
| else: |
| o = typeinfo.serializer.xread(buffer) |
| ref_resolver.set_read_object(ref_id, o) |
| return o |
| |
| |
| @cython.final |
| cdef class TupleSerializer(CollectionSerializer): |
| cpdef inline read(self, Buffer buffer): |
| cdef MapRefResolver ref_resolver = self.fory.ref_resolver |
| cdef TypeResolver type_resolver = self.fory.type_resolver |
| cdef int32_t len_ = buffer.read_varuint32() |
| cdef tuple tuple_ = PyTuple_New(len_) |
| if len_ == 0: |
| return tuple_ |
| cdef int8_t collect_flag = buffer.read_int8() |
| cdef c_bool is_py = self.is_py |
| cdef TypeInfo typeinfo |
| cdef int32_t type_id = -1 |
| cdef c_bool tracking_ref |
| cdef c_bool has_null |
| cdef int8_t head_flag |
| if (collect_flag & COLL_IS_SAME_TYPE) != 0: |
| if collect_flag & COLL_IS_DECL_ELEMENT_TYPE == 0: |
| typeinfo = self.type_resolver.read_typeinfo(buffer) |
| else: |
| typeinfo = self.elem_typeinfo |
| if (collect_flag & COLL_HAS_NULL) == 0: |
| type_id = typeinfo.type_id |
| if type_id == <int32_t>TypeId.STRING: |
| self._read_string(buffer, len_, tuple_) |
| return tuple_ |
| if type_id == <int32_t>TypeId.VARINT64: |
| self._read_int(buffer, len_, tuple_) |
| return tuple_ |
| if type_id == <int32_t>TypeId.BOOL: |
| self._read_bool(buffer, len_, tuple_) |
| return tuple_ |
| if type_id == <int32_t>TypeId.FLOAT64: |
| self._read_float(buffer, len_, tuple_) |
| return tuple_ |
| elif (collect_flag & COLL_TRACKING_REF) == 0: |
| self._read_same_type_no_ref(buffer, len_, tuple_, typeinfo) |
| else: |
| self._read_same_type_ref(buffer, len_, tuple_, typeinfo) |
| elif (collect_flag & COLL_TRACKING_REF) != 0: |
| self._read_same_type_ref(buffer, len_, tuple_, typeinfo) |
| else: |
| self._read_same_type_has_null(buffer, len_, tuple_, typeinfo) |
| else: |
| self.fory.inc_depth() |
| # Check tracking_ref and has_null flags for different types handling |
| tracking_ref = (collect_flag & COLL_TRACKING_REF) != 0 |
| has_null = (collect_flag & COLL_HAS_NULL) != 0 |
| if tracking_ref: |
| # When ref tracking is enabled, read with ref handling |
| for i in range(len_): |
| elem = get_next_element(buffer, ref_resolver, type_resolver, is_py) |
| Py_INCREF(elem) |
| PyTuple_SET_ITEM(tuple_, i, elem) |
| elif not has_null: |
| # When ref tracking is disabled and no nulls, read type info directly |
| for i in range(len_): |
| typeinfo = type_resolver.read_typeinfo(buffer) |
| if is_py: |
| elem = typeinfo.serializer.read(buffer) |
| else: |
| elem = typeinfo.serializer.xread(buffer) |
| Py_INCREF(elem) |
| PyTuple_SET_ITEM(tuple_, i, elem) |
| else: |
| # When ref tracking is disabled but has nulls, read null flag first |
| for i in range(len_): |
| head_flag = buffer.read_int8() |
| if head_flag == NULL_FLAG: |
| elem = None |
| else: |
| typeinfo = type_resolver.read_typeinfo(buffer) |
| if is_py: |
| elem = typeinfo.serializer.read(buffer) |
| else: |
| elem = typeinfo.serializer.xread(buffer) |
| Py_INCREF(elem) |
| PyTuple_SET_ITEM(tuple_, i, elem) |
| self.fory.dec_depth() |
| return tuple_ |
| |
| cpdef inline _add_element(self, object collection_, int64_t index, object element): |
| Py_INCREF(element) |
| PyTuple_SET_ITEM(collection_, index, element) |
| |
| cpdef inline xread(self, Buffer buffer): |
| return self.read(buffer) |
| |
| |
| @cython.final |
| cdef class StringArraySerializer(ListSerializer): |
| def __init__(self, fory, type_): |
| super().__init__(fory, type_, StringSerializer(fory, str)) |
| |
| |
| @cython.final |
| cdef class SetSerializer(CollectionSerializer): |
| cpdef inline read(self, Buffer buffer): |
| cdef MapRefResolver ref_resolver = self.fory.ref_resolver |
| cdef TypeResolver type_resolver = self.fory.type_resolver |
| cdef set instance = set() |
| ref_resolver.reference(instance) |
| cdef int32_t len_ = buffer.read_varuint32() |
| if len_ == 0: |
| return instance |
| cdef int8_t collect_flag = buffer.read_int8() |
| cdef int32_t ref_id |
| cdef TypeInfo typeinfo |
| cdef int32_t type_id = -1 |
| cdef c_bool is_py = self.is_py |
| cdef c_bool tracking_ref |
| cdef c_bool has_null |
| cdef int8_t head_flag |
| if (collect_flag & COLL_IS_SAME_TYPE) != 0: |
| if collect_flag & COLL_IS_DECL_ELEMENT_TYPE == 0: |
| typeinfo = self.type_resolver.read_typeinfo(buffer) |
| else: |
| typeinfo = self.elem_typeinfo |
| if (collect_flag & COLL_HAS_NULL) == 0: |
| type_id = typeinfo.type_id |
| if type_id == <int32_t>TypeId.STRING: |
| self._read_string(buffer, len_, instance) |
| return instance |
| if type_id == <int32_t>TypeId.VARINT64: |
| self._read_int(buffer, len_, instance) |
| return instance |
| if type_id == <int32_t>TypeId.BOOL: |
| self._read_bool(buffer, len_, instance) |
| return instance |
| if type_id == <int32_t>TypeId.FLOAT64: |
| self._read_float(buffer, len_, instance) |
| return instance |
| elif (collect_flag & COLL_TRACKING_REF) == 0: |
| self._read_same_type_no_ref(buffer, len_, instance, typeinfo) |
| else: |
| self._read_same_type_ref(buffer, len_, instance, typeinfo) |
| elif (collect_flag & COLL_TRACKING_REF) != 0: |
| self._read_same_type_ref(buffer, len_, instance, typeinfo) |
| else: |
| self._read_same_type_has_null(buffer, len_, instance, typeinfo) |
| else: |
| self.fory.inc_depth() |
| # Check tracking_ref and has_null flags for different types handling |
| tracking_ref = (collect_flag & COLL_TRACKING_REF) != 0 |
| has_null = (collect_flag & COLL_HAS_NULL) != 0 |
| if tracking_ref: |
| # When ref tracking is enabled, read with ref handling |
| for i in range(len_): |
| ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| instance.add(ref_resolver.get_read_object()) |
| continue |
| # indicates that the object is first read. |
| typeinfo = type_resolver.read_typeinfo(buffer) |
| type_id = typeinfo.type_id |
| if type_id == <int32_t>TypeId.STRING: |
| instance.add(buffer.read_string()) |
| elif type_id == <int32_t>TypeId.VARINT64: |
| instance.add(buffer.read_varint64()) |
| elif type_id == <int32_t>TypeId.BOOL: |
| instance.add(buffer.read_bool()) |
| elif type_id == <int32_t>TypeId.FLOAT64: |
| instance.add(buffer.read_double()) |
| else: |
| if is_py: |
| o = typeinfo.serializer.read(buffer) |
| else: |
| o = typeinfo.serializer.xread(buffer) |
| ref_resolver.set_read_object(ref_id, o) |
| instance.add(o) |
| elif not has_null: |
| # When ref tracking is disabled and no nulls, read type info directly |
| for i in range(len_): |
| typeinfo = type_resolver.read_typeinfo(buffer) |
| type_id = typeinfo.type_id |
| if type_id == <int32_t>TypeId.STRING: |
| instance.add(buffer.read_string()) |
| elif type_id == <int32_t>TypeId.VARINT64: |
| instance.add(buffer.read_varint64()) |
| elif type_id == <int32_t>TypeId.BOOL: |
| instance.add(buffer.read_bool()) |
| elif type_id == <int32_t>TypeId.FLOAT64: |
| instance.add(buffer.read_double()) |
| else: |
| if is_py: |
| instance.add(typeinfo.serializer.read(buffer)) |
| else: |
| instance.add(typeinfo.serializer.xread(buffer)) |
| else: |
| # When ref tracking is disabled but has nulls, read null flag first |
| for i in range(len_): |
| head_flag = buffer.read_int8() |
| if head_flag == NULL_FLAG: |
| instance.add(None) |
| else: |
| typeinfo = type_resolver.read_typeinfo(buffer) |
| type_id = typeinfo.type_id |
| if type_id == <int32_t>TypeId.STRING: |
| instance.add(buffer.read_string()) |
| elif type_id == <int32_t>TypeId.VARINT64: |
| instance.add(buffer.read_varint64()) |
| elif type_id == <int32_t>TypeId.BOOL: |
| instance.add(buffer.read_bool()) |
| elif type_id == <int32_t>TypeId.FLOAT64: |
| instance.add(buffer.read_double()) |
| else: |
| if is_py: |
| instance.add(typeinfo.serializer.read(buffer)) |
| else: |
| instance.add(typeinfo.serializer.xread(buffer)) |
| self.fory.dec_depth() |
| return instance |
| |
| cpdef inline _add_element(self, object collection_, int64_t index, object element): |
| collection_.add(element) |
| |
| cpdef inline xread(self, Buffer buffer): |
| return self.read(buffer) |
| |
| |
| cdef int32_t MAX_CHUNK_SIZE = 255 |
| # Whether track key ref. |
| cdef int32_t TRACKING_KEY_REF = 0b1 |
| # Whether key has null. |
| cdef int32_t KEY_HAS_NULL = 0b10 |
| # Whether key is not declare type. |
| cdef int32_t KEY_DECL_TYPE = 0b100 |
| # Whether track value ref. |
| cdef int32_t TRACKING_VALUE_REF = 0b1000 |
| # Whether value has null. |
| cdef int32_t VALUE_HAS_NULL = 0b10000 |
| # Whether value is not declare type. |
| cdef int32_t VALUE_DECL_TYPE = 0b100000 |
| # When key or value is null that entry will be serialized as a new chunk with size 1. |
| # In such cases, chunk size will be skipped writing. |
| # Both key and value are null. |
| cdef int32_t KV_NULL = KEY_HAS_NULL | VALUE_HAS_NULL |
| # Key is null, value type is declared type, and ref tracking for value is disabled. |
| cdef int32_t NULL_KEY_VALUE_DECL_TYPE = KEY_HAS_NULL | VALUE_DECL_TYPE |
| # Key is null, value type is declared type, and ref tracking for value is enabled. |
| cdef int32_t NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF =KEY_HAS_NULL | VALUE_DECL_TYPE | TRACKING_VALUE_REF |
| # Value is null, key type is declared type, and ref tracking for key is disabled. |
| cdef int32_t NULL_VALUE_KEY_DECL_TYPE = VALUE_HAS_NULL | KEY_DECL_TYPE |
| # Value is null, key type is declared type, and ref tracking for key is enabled. |
| cdef int32_t NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF = VALUE_HAS_NULL | KEY_DECL_TYPE | TRACKING_KEY_REF |
| |
| |
| @cython.final |
| cdef class MapSerializer(Serializer): |
| cdef TypeResolver type_resolver |
| cdef MapRefResolver ref_resolver |
| cdef Serializer key_serializer |
| cdef Serializer value_serializer |
| cdef c_bool is_py |
| |
| def __init__(self, fory, type_, key_serializer=None, value_serializer=None): |
| super().__init__(fory, type_) |
| self.type_resolver = fory.type_resolver |
| self.ref_resolver = fory.ref_resolver |
| self.key_serializer = key_serializer |
| self.value_serializer = value_serializer |
| self.is_py = fory.is_py |
| |
| cpdef inline write(self, Buffer buffer, o): |
| cdef dict obj = o |
| cdef int32_t length = len(obj) |
| buffer.write_varuint32(length) |
| if length == 0: |
| return |
| cdef int64_t key_addr, value_addr |
| cdef Py_ssize_t pos = 0 |
| cdef Fory fory = self.fory |
| cdef TypeResolver type_resolver = fory.type_resolver |
| cdef MapRefResolver ref_resolver = fory.ref_resolver |
| cdef Serializer key_serializer = self.key_serializer |
| cdef Serializer value_serializer = self.value_serializer |
| cdef type key_cls, value_cls, key_serializer_type, value_serializer_type |
| cdef TypeInfo key_typeinfo, value_typeinfo |
| cdef int32_t chunk_size_offset, chunk_header, chunk_size |
| cdef c_bool key_write_ref, value_write_ref |
| cdef int has_next = PyDict_Next(obj, &pos, <PyObject **>&key_addr, <PyObject **>&value_addr) |
| cdef c_bool is_py = self.is_py |
| while has_next != 0: |
| key = int2obj(key_addr) |
| Py_INCREF(key) |
| value = int2obj(value_addr) |
| Py_INCREF(value) |
| while has_next != 0: |
| if key is not None: |
| if value is not None: |
| break |
| if key_serializer is not None: |
| if key_serializer.need_to_write_ref: |
| buffer.write_int8(NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF) |
| if not self.ref_resolver.write_ref_or_null(buffer, key): |
| if is_py: |
| key_serializer.write(buffer, key) |
| else: |
| key_serializer.xwrite(buffer, key) |
| else: |
| buffer.write_int8(NULL_VALUE_KEY_DECL_TYPE) |
| if is_py: |
| key_serializer.write(buffer, key) |
| else: |
| key_serializer.xwrite(buffer, key) |
| else: |
| buffer.write_int8(VALUE_HAS_NULL | TRACKING_KEY_REF) |
| if is_py: |
| fory.write_ref(buffer, key) |
| else: |
| fory.xwrite_ref(buffer, key) |
| else: |
| if value is not None: |
| if value_serializer is not None: |
| if value_serializer.need_to_write_ref: |
| buffer.write_int8(NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF) |
| if not self.ref_resolver.write_ref_or_null(buffer, value): |
| if is_py: |
| value_serializer.write(buffer, value) |
| else: |
| value_serializer.xwrite(buffer, value) |
| if not self.ref_resolver.write_ref_or_null(buffer, value): |
| if is_py: |
| value_serializer.write(buffer, value) |
| else: |
| value_serializer.xwrite(buffer, value) |
| else: |
| buffer.write_int8(NULL_KEY_VALUE_DECL_TYPE) |
| if is_py: |
| value_serializer.write(buffer, value) |
| else: |
| value_serializer.xwrite(buffer, value) |
| else: |
| buffer.write_int8(KEY_HAS_NULL | TRACKING_VALUE_REF) |
| if is_py: |
| fory.write_ref(buffer, value) |
| else: |
| fory.xwrite_ref(buffer, value) |
| else: |
| buffer.write_int8(KV_NULL) |
| has_next = PyDict_Next(obj, &pos, <PyObject **>&key_addr, <PyObject **>&value_addr) |
| key = int2obj(key_addr) |
| Py_INCREF(key) |
| value = int2obj(value_addr) |
| Py_INCREF(value) |
| if has_next == 0: |
| break |
| key_cls = type(key) |
| value_cls = type(value) |
| buffer.write_int16(-1) |
| chunk_size_offset = buffer.writer_index - 1 |
| chunk_header = 0 |
| if key_serializer is not None: |
| chunk_header |= KEY_DECL_TYPE |
| else: |
| key_typeinfo = self.type_resolver.get_typeinfo(key_cls) |
| type_resolver.write_typeinfo(buffer, key_typeinfo) |
| key_serializer = key_typeinfo.serializer |
| if value_serializer is not None: |
| chunk_header |= VALUE_DECL_TYPE |
| else: |
| value_typeinfo = self.type_resolver.get_typeinfo(value_cls) |
| type_resolver.write_typeinfo(buffer, value_typeinfo) |
| value_serializer = value_typeinfo.serializer |
| key_write_ref = key_serializer.need_to_write_ref |
| value_write_ref = value_serializer.need_to_write_ref |
| if key_write_ref: |
| chunk_header |= TRACKING_KEY_REF |
| if value_write_ref: |
| chunk_header |= TRACKING_VALUE_REF |
| buffer.put_int8(chunk_size_offset - 1, chunk_header) |
| key_serializer_type = type(key_serializer) |
| value_serializer_type = type(value_serializer) |
| chunk_size = 0 |
| while True: |
| if (key is None or value is None or |
| type(key) is not key_cls or type(value) is not value_cls): |
| break |
| if not key_write_ref or not ref_resolver.write_ref_or_null(buffer, key): |
| if key_cls is str: |
| buffer.write_string(key) |
| elif key_serializer_type is Int64Serializer: |
| buffer.write_varint64(key) |
| elif key_serializer_type is Float64Serializer: |
| buffer.write_double(key) |
| elif key_serializer_type is Int32Serializer: |
| buffer.write_varint32(key) |
| elif key_serializer_type is Float32Serializer: |
| buffer.write_float(key) |
| else: |
| if is_py: |
| key_serializer.write(buffer, key) |
| else: |
| key_serializer.xwrite(buffer, key) |
| if not value_write_ref or not ref_resolver.write_ref_or_null(buffer, value): |
| if value_cls is str: |
| buffer.write_string(value) |
| elif value_serializer_type is Int64Serializer: |
| buffer.write_varint64(value) |
| elif value_serializer_type is Float64Serializer: |
| buffer.write_double(value) |
| elif value_serializer_type is Int32Serializer: |
| buffer.write_varint32(value) |
| elif value_serializer_type is Float32Serializer: |
| buffer.write_float(value) |
| elif value_serializer_type is BooleanSerializer: |
| buffer.write_bool(value) |
| else: |
| if is_py: |
| value_serializer.write(buffer, value) |
| else: |
| value_serializer.xwrite(buffer, value) |
| chunk_size += 1 |
| has_next = PyDict_Next(obj, &pos, <PyObject **>&key_addr, <PyObject **>&value_addr) |
| if has_next == 0: |
| break |
| if chunk_size == MAX_CHUNK_SIZE: |
| break |
| key = int2obj(key_addr) |
| Py_INCREF(key) |
| value = int2obj(value_addr) |
| Py_INCREF(value) |
| key_serializer = self.key_serializer |
| value_serializer = self.value_serializer |
| buffer.put_int8(chunk_size_offset, chunk_size) |
| |
| cpdef inline read(self, Buffer buffer): |
| cdef Fory fory = self.fory |
| cdef MapRefResolver ref_resolver = self.ref_resolver |
| cdef TypeResolver type_resolver = self.type_resolver |
| cdef int32_t size = buffer.read_varuint32() |
| cdef dict map_ = _PyDict_NewPresized(size) |
| ref_resolver.reference(map_) |
| cdef int32_t ref_id |
| cdef TypeInfo key_typeinfo, value_typeinfo |
| cdef int32_t chunk_header = 0 |
| if size != 0: |
| chunk_header = buffer.read_uint8() |
| cdef Serializer key_serializer = self.key_serializer |
| cdef Serializer value_serializer = self.value_serializer |
| cdef c_bool key_has_null, value_has_null, track_key_ref, track_value_ref |
| cdef c_bool key_is_declared_type, value_is_declared_type |
| cdef type key_serializer_type, value_serializer_type |
| cdef int32_t chunk_size |
| cdef c_bool is_py = self.is_py |
| self.fory.inc_depth() |
| while size > 0: |
| while True: |
| key_has_null = (chunk_header & KEY_HAS_NULL) != 0 |
| value_has_null = (chunk_header & VALUE_HAS_NULL) != 0 |
| if not key_has_null: |
| if not value_has_null: |
| break |
| else: |
| track_key_ref = (chunk_header & TRACKING_KEY_REF) != 0 |
| if (chunk_header & KEY_DECL_TYPE) != 0: |
| if track_key_ref: |
| ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| key = ref_resolver.get_read_object() |
| else: |
| if is_py: |
| key = key_serializer.read(buffer) |
| else: |
| key = key_serializer.xread(buffer) |
| ref_resolver.set_read_object(ref_id, key) |
| else: |
| if is_py: |
| key = key_serializer.read(buffer) |
| else: |
| key = key_serializer.xread(buffer) |
| else: |
| if is_py: |
| key = fory.read_ref(buffer) |
| else: |
| key = fory.xread_ref(buffer) |
| map_[key] = None |
| else: |
| if not value_has_null: |
| track_value_ref = (chunk_header & TRACKING_VALUE_REF) != 0 |
| if (chunk_header & VALUE_DECL_TYPE) != 0: |
| if track_value_ref: |
| ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| value = ref_resolver.get_read_object() |
| else: |
| if is_py: |
| value = value_serializer.read(buffer) |
| else: |
| value = value_serializer.xread(buffer) |
| ref_resolver.set_read_object(ref_id, value) |
| else: |
| if is_py: |
| value = fory.read_ref(buffer) |
| else: |
| value = fory.xread_ref(buffer) |
| map_[None] = value |
| else: |
| map_[None] = None |
| size -= 1 |
| if size == 0: |
| self.fory.dec_depth() |
| return map_ |
| else: |
| chunk_header = buffer.read_uint8() |
| track_key_ref = (chunk_header & TRACKING_KEY_REF) != 0 |
| track_value_ref = (chunk_header & TRACKING_VALUE_REF) != 0 |
| key_is_declared_type = (chunk_header & KEY_DECL_TYPE) != 0 |
| value_is_declared_type = (chunk_header & VALUE_DECL_TYPE) != 0 |
| chunk_size = buffer.read_uint8() |
| if not key_is_declared_type: |
| key_serializer = type_resolver.read_typeinfo(buffer).serializer |
| if not value_is_declared_type: |
| value_serializer = type_resolver.read_typeinfo(buffer).serializer |
| key_serializer_type = type(key_serializer) |
| value_serializer_type = type(value_serializer) |
| for i in range(chunk_size): |
| if track_key_ref: |
| ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| key = ref_resolver.get_read_object() |
| else: |
| if is_py: |
| key = key_serializer.read(buffer) |
| else: |
| key = key_serializer.xread(buffer) |
| ref_resolver.set_read_object(ref_id, key) |
| else: |
| if key_serializer_type is StringSerializer: |
| key = buffer.read_string() |
| elif key_serializer_type is Int64Serializer: |
| key = buffer.read_varint64() |
| elif key_serializer_type is Float64Serializer: |
| key = buffer.read_double() |
| elif key_serializer_type is Int32Serializer: |
| key = buffer.read_varint32() |
| elif key_serializer_type is Float32Serializer: |
| key = buffer.read_float() |
| else: |
| if is_py: |
| key = key_serializer.read(buffer) |
| else: |
| key = key_serializer.xread(buffer) |
| if track_value_ref: |
| ref_id = ref_resolver.try_preserve_ref_id(buffer) |
| if ref_id < NOT_NULL_VALUE_FLAG: |
| value = ref_resolver.get_read_object() |
| else: |
| if is_py: |
| value = value_serializer.read(buffer) |
| else: |
| value = value_serializer.xread(buffer) |
| ref_resolver.set_read_object(ref_id, value) |
| else: |
| if value_serializer_type is StringSerializer: |
| value = buffer.read_string() |
| elif value_serializer_type is Int64Serializer: |
| value = buffer.read_varint64() |
| elif value_serializer_type is Float64Serializer: |
| value = buffer.read_double() |
| elif value_serializer_type is Int32Serializer: |
| value = buffer.read_varint32() |
| elif value_serializer_type is Float32Serializer: |
| value = buffer.read_float() |
| elif value_serializer_type is BooleanSerializer: |
| value = buffer.read_bool() |
| else: |
| if is_py: |
| value = value_serializer.read(buffer) |
| else: |
| value = value_serializer.xread(buffer) |
| map_[key] = value |
| size -= 1 |
| if size != 0: |
| chunk_header = buffer.read_uint8() |
| self.fory.dec_depth() |
| return map_ |
| |
| cpdef inline xwrite(self, Buffer buffer, o): |
| self.write(buffer, o) |
| |
| cpdef inline xread(self, Buffer buffer): |
| return self.read(buffer) |