| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import asyncio |
| from collections import OrderedDict |
| import ctypes |
| from io import SEEK_CUR |
| from typing import Optional |
| |
| from pyignite.constants import * |
| from pyignite.exceptions import ParseError |
| from .base import IgniteDataType |
| from .internal import AnyDataObject, Struct, infer_from_python, infer_from_python_async |
| from .type_codes import * |
| from .type_ids import * |
| from .type_names import * |
| from .null_object import Null, Nullable |
| from ..stream import AioBinaryStream, BinaryStream |
| |
| __all__ = ['Map', 'ObjectArrayObject', 'CollectionObject', 'MapObject', 'WrappedDataObject', 'BinaryObject'] |
| |
| |
| class ObjectArrayObject(Nullable): |
| """ |
| Array of Ignite objects of any consistent type. Its Python representation |
| is tuple(type_id, iterable of any type). The only type ID that makes sense |
| in Python client is :py:attr:`~OBJECT`, that corresponds directly to |
| the root object type in Java type hierarchy (`java.lang.Object`). |
| """ |
| OBJECT = -1 |
| |
| _type_name = NAME_OBJ_ARR |
| _type_id = TYPE_OBJ_ARR |
| _fields = [ |
| ('type_code', ctypes.c_byte), |
| ('type_id', ctypes.c_int), |
| ('length', ctypes.c_int) |
| ] |
| type_code = TC_OBJECT_ARRAY |
| |
| @classmethod |
| def parse_not_null(cls, stream): |
| length, fields = cls.__get_length(stream), [] |
| |
| for i in range(length): |
| c_type = AnyDataObject.parse(stream) |
| fields.append((f'element_{i}', c_type)) |
| |
| return cls.__build_final_class(fields) |
| |
| @classmethod |
| async def parse_not_null_async(cls, stream): |
| length, fields = cls.__get_length(stream), [] |
| for i in range(length): |
| c_type = await AnyDataObject.parse_async(stream) |
| fields.append((f'element_{i}', c_type)) |
| |
| return cls.__build_final_class(fields) |
| |
| @classmethod |
| def __get_length(cls, stream): |
| int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) |
| length = int.from_bytes( |
| stream.slice(stream.tell() + b_sz + int_sz, int_sz), |
| byteorder=PROTOCOL_BYTE_ORDER |
| ) |
| stream.seek(2 * int_sz + b_sz, SEEK_CUR) |
| return length |
| |
| @classmethod |
| def __build_final_class(cls, fields): |
| return type( |
| cls.__name__, |
| (ctypes.LittleEndianStructure,), |
| { |
| '_pack_': 1, |
| '_fields_': cls._fields + fields, |
| } |
| ) |
| |
| @classmethod |
| def to_python_not_null(cls, ctypes_object, **kwargs): |
| result = [] |
| for i in range(ctypes_object.length): |
| result.append( |
| AnyDataObject.to_python( |
| getattr(ctypes_object, f'element_{i}'), **kwargs |
| ) |
| ) |
| return ctypes_object.type_id, result |
| |
| @classmethod |
| async def to_python_not_null_async(cls, ctypes_object, **kwargs): |
| result = [ |
| await AnyDataObject.to_python_async( |
| getattr(ctypes_object, f'element_{i}'), **kwargs |
| ) |
| for i in range(ctypes_object.length)] |
| return ctypes_object.type_id, result |
| |
| @classmethod |
| def from_python_not_null(cls, stream, value, *args, **kwargs): |
| value = cls.__write_header(stream, value) |
| for x in value: |
| infer_from_python(stream, x) |
| |
| @classmethod |
| async def from_python_not_null_async(cls, stream, value, *args, **kwargs): |
| value = cls.__write_header(stream, value) |
| for x in value: |
| await infer_from_python_async(stream, x) |
| |
| @classmethod |
| def __write_header(cls, stream, value): |
| type_id, value = value |
| try: |
| length = len(value) |
| except TypeError: |
| value = [value] |
| length = 1 |
| |
| stream.write(cls.type_code) |
| stream.write(type_id.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER, signed=True)) |
| stream.write(length.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER)) |
| |
| return value |
| |
| |
| class WrappedDataObject(Nullable): |
| """ |
| One or more binary objects can be wrapped in an array. This allows reading, |
| storing, passing and writing objects efficiently without understanding |
| their contents, performing simple byte copy. |
| |
| Python representation: tuple(payload: bytes, offset: integer). Offset |
| points to the root object of the array. |
| """ |
| type_code = TC_ARRAY_WRAPPED_OBJECTS |
| |
| @classmethod |
| def parse_not_null(cls, stream): |
| int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) |
| length = int.from_bytes( |
| stream.slice(stream.tell() + b_sz, int_sz), |
| byteorder=PROTOCOL_BYTE_ORDER |
| ) |
| |
| final_class = type( |
| cls.__name__, |
| (ctypes.LittleEndianStructure,), |
| { |
| '_pack_': 1, |
| '_fields_': [ |
| ('type_code', ctypes.c_byte), |
| ('length', ctypes.c_int), |
| ('payload', ctypes.c_byte * length), |
| ('offset', ctypes.c_int), |
| ], |
| } |
| ) |
| |
| stream.seek(ctypes.sizeof(final_class), SEEK_CUR) |
| return final_class |
| |
| @classmethod |
| def to_python_not_null(cls, ctypes_object, *args, **kwargs): |
| return bytes(ctypes_object.payload), ctypes_object.offset |
| |
| @classmethod |
| def from_python_not_null(cls, stream, value, *args, **kwargs): |
| raise ParseError('Send unwrapped data.') |
| |
| |
| class CollectionObject(Nullable): |
| """ |
| Similar to object array, but contains platform-agnostic deserialization |
| type hint instead of type ID. |
| |
| Represented as tuple(hint, iterable of any type) in Python. Hints are: |
| |
| * :py:attr:`~pyignite.datatypes.complex.CollectionObject.USER_SET` − |
| a set of unique Ignite thin data objects. The exact Java type of a set |
| is undefined, |
| * :py:attr:`~pyignite.datatypes.complex.CollectionObject.USER_COL` − |
| a collection of Ignite thin data objects. The exact Java type |
| of a collection is undefined, |
| * :py:attr:`~pyignite.datatypes.complex.CollectionObject.ARR_LIST` − |
| represents the `java.util.ArrayList` type, |
| * :py:attr:`~pyignite.datatypes.complex.CollectionObject.LINKED_LIST` − |
| represents the `java.util.LinkedList` type, |
| * :py:attr:`~pyignite.datatypes.complex.CollectionObject.HASH_SET`− |
| represents the `java.util.HashSet` type, |
| * :py:attr:`~pyignite.datatypes.complex.CollectionObject.LINKED_HASH_SET` − |
| represents the `java.util.LinkedHashSet` type, |
| * :py:attr:`~pyignite.datatypes.complex.CollectionObject.SINGLETON_LIST` − |
| represents the return type of the `java.util.Collection.singletonList` |
| method. |
| |
| It is safe to say that `USER_SET` (`set` in Python) and `USER_COL` (`list`) |
| can cover all the imaginable use cases from Python perspective. |
| """ |
| USER_SET = -1 |
| USER_COL = 0 |
| ARR_LIST = 1 |
| LINKED_LIST = 2 |
| HASH_SET = 3 |
| LINKED_HASH_SET = 4 |
| SINGLETON_LIST = 5 |
| |
| _type_name = NAME_COL |
| _type_id = TYPE_COL |
| _header_class = None |
| type_code = TC_COLLECTION |
| |
| @classmethod |
| def parse_not_null(cls, stream): |
| fields, length = cls.__parse_header(stream) |
| |
| for i in range(length): |
| c_type = AnyDataObject.parse(stream) |
| fields.append((f'element_{i}', c_type)) |
| |
| return cls.__build_final_class(fields) |
| |
| @classmethod |
| async def parse_not_null_async(cls, stream): |
| fields, length = cls.__parse_header(stream) |
| |
| for i in range(length): |
| c_type = await AnyDataObject.parse_async(stream) |
| fields.append((f'element_{i}', c_type)) |
| |
| return cls.__build_final_class(fields) |
| |
| @classmethod |
| def __parse_header(cls, stream): |
| int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) |
| header_fields = [('type_code', ctypes.c_byte), ('length', ctypes.c_int), ('type', ctypes.c_byte)] |
| length = int.from_bytes( |
| stream.slice(stream.tell() + b_sz, int_sz), |
| byteorder=PROTOCOL_BYTE_ORDER |
| ) |
| stream.seek(int_sz + 2 * b_sz, SEEK_CUR) |
| return header_fields, length |
| |
| @classmethod |
| def __build_final_class(cls, fields): |
| return type( |
| cls.__name__, |
| (ctypes.LittleEndianStructure,), |
| { |
| '_pack_': 1, |
| '_fields_': fields, |
| } |
| ) |
| |
| @classmethod |
| def to_python_not_null(cls, ctypes_object, *args, **kwargs): |
| result = [ |
| AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), **kwargs) |
| for i in range(ctypes_object.length) |
| ] |
| return ctypes_object.type, result |
| |
| @classmethod |
| async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs): |
| result_coro = [ |
| AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), **kwargs) |
| for i in range(ctypes_object.length) |
| ] |
| |
| return ctypes_object.type, await asyncio.gather(*result_coro) |
| |
| @classmethod |
| def from_python_not_null(cls, stream, value, *args, **kwargs): |
| type_id, value = value |
| try: |
| length = len(value) |
| except TypeError: |
| value = [value] |
| length = 1 |
| |
| cls.__write_header(stream, type_id, length) |
| for x in value: |
| infer_from_python(stream, x) |
| |
| @classmethod |
| async def from_python_not_null_async(cls, stream, value, *args, **kwargs): |
| type_id, value = value |
| try: |
| length = len(value) |
| except TypeError: |
| value = [value] |
| length = 1 |
| |
| cls.__write_header(stream, type_id, length) |
| for x in value: |
| await infer_from_python_async(stream, x) |
| |
| @classmethod |
| def __write_header(cls, stream, type_id, length): |
| stream.write(cls.type_code) |
| stream.write(length.to_bytes( |
| ctypes.sizeof(ctypes.c_int), byteorder=PROTOCOL_BYTE_ORDER |
| )) |
| stream.write(type_id.to_bytes( |
| length=ctypes.sizeof(ctypes.c_byte), |
| byteorder=PROTOCOL_BYTE_ORDER, |
| signed=True) |
| ) |
| |
| |
| class _MapBase: |
| HASH_MAP = 1 |
| LINKED_HASH_MAP = 2 |
| |
| @classmethod |
| def _parse_header(cls, stream): |
| raise NotImplementedError |
| |
| @classmethod |
| def _parse(cls, stream): |
| fields, length = cls._parse_header(stream) |
| for i in range(length << 1): |
| c_type = AnyDataObject.parse(stream) |
| fields.append((f'element_{i}', c_type)) |
| return cls.__build_final_class(fields) |
| |
| @classmethod |
| async def _parse_async(cls, stream): |
| fields, length = cls._parse_header(stream) |
| for i in range(length << 1): |
| c_type = await AnyDataObject.parse_async(stream) |
| fields.append((f'element_{i}', c_type)) |
| |
| return cls.__build_final_class(fields) |
| |
| @classmethod |
| def __build_final_class(cls, fields): |
| return type( |
| cls.__name__, |
| (ctypes.LittleEndianStructure,), |
| { |
| '_pack_': 1, |
| '_fields_': fields, |
| } |
| ) |
| |
| @classmethod |
| def _to_python(cls, ctypes_object, **kwargs): |
| map_cls = cls.__get_map_class(ctypes_object) |
| |
| result = map_cls() |
| for i in range(0, ctypes_object.length << 1, 2): |
| k = AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), **kwargs) |
| v = AnyDataObject.to_python(getattr(ctypes_object, f'element_{i + 1}'), **kwargs) |
| result[k] = v |
| return result |
| |
| @classmethod |
| async def _to_python_async(cls, ctypes_object, **kwargs): |
| map_cls = cls.__get_map_class(ctypes_object) |
| |
| kv_pairs_coro = [ |
| asyncio.gather( |
| AnyDataObject.to_python_async( |
| getattr(ctypes_object, f'element_{i}'), **kwargs |
| ), |
| AnyDataObject.to_python_async( |
| getattr(ctypes_object, f'element_{i + 1}'), **kwargs |
| ) |
| ) for i in range(0, ctypes_object.length << 1, 2) |
| ] |
| |
| return map_cls(await asyncio.gather(*kv_pairs_coro)) |
| |
| @classmethod |
| def __get_map_class(cls, ctypes_object): |
| map_type = getattr(ctypes_object, 'type', cls.HASH_MAP) |
| return OrderedDict if map_type == cls.LINKED_HASH_MAP else dict |
| |
| @classmethod |
| def _from_python(cls, stream, value, type_id=None): |
| cls._write_header(stream, type_id, len(value)) |
| for k, v in value.items(): |
| infer_from_python(stream, k) |
| infer_from_python(stream, v) |
| |
| @classmethod |
| async def _from_python_async(cls, stream, value, type_id): |
| cls._write_header(stream, type_id, len(value)) |
| for k, v in value.items(): |
| await infer_from_python_async(stream, k) |
| await infer_from_python_async(stream, v) |
| |
| @classmethod |
| def _write_header(cls, stream, type_id, length): |
| raise NotImplementedError |
| |
| |
| class Map(IgniteDataType, _MapBase): |
| """ |
| Dictionary type, payload-only. |
| |
| Ignite does not track the order of key-value pairs in its caches, hence |
| the ordinary Python dict type, not the collections.OrderedDict. |
| """ |
| _type_name = NAME_MAP |
| _type_id = TYPE_MAP |
| |
| @classmethod |
| def parse(cls, stream): |
| return cls._parse(stream) |
| |
| @classmethod |
| async def parse_async(cls, stream): |
| return await cls._parse_async(stream) |
| |
| @classmethod |
| def _parse_header(cls, stream): |
| int_sz = ctypes.sizeof(ctypes.c_int) |
| length = int.from_bytes( |
| stream.slice(stream.tell(), int_sz), |
| byteorder=PROTOCOL_BYTE_ORDER |
| ) |
| stream.seek(int_sz, SEEK_CUR) |
| return [('length', ctypes.c_int)], length |
| |
| @classmethod |
| def to_python(cls, ctypes_object, **kwargs): |
| return cls._to_python(ctypes_object, **kwargs) |
| |
| @classmethod |
| async def to_python_async(cls, ctypes_object, **kwargs): |
| return await cls._to_python_async(ctypes_object, **kwargs) |
| |
| @classmethod |
| def from_python(cls, stream, value, type_id=None): |
| return cls._from_python(stream, value, type_id) |
| |
| @classmethod |
| async def from_python_async(cls, stream, value, type_id=None): |
| return await cls._from_python_async(stream, value, type_id) |
| |
| @classmethod |
| def _write_header(cls, stream, type_id, length): |
| stream.write(length.to_bytes( |
| length=ctypes.sizeof(ctypes.c_int), |
| byteorder=PROTOCOL_BYTE_ORDER |
| )) |
| |
| |
| class MapObject(Nullable, _MapBase): |
| """ |
| This is a dictionary type. |
| |
| Represented as tuple(type_id, value). |
| |
| Type ID can be a :py:attr:`~HASH_MAP` (corresponds to an ordinary `dict` |
| in Python) or a :py:attr:`~LINKED_HASH_MAP` (`collections.OrderedDict`). |
| """ |
| _type_name = NAME_MAP |
| _type_id = TYPE_MAP |
| type_code = TC_MAP |
| |
| @classmethod |
| def parse_not_null(cls, stream): |
| return cls._parse(stream) |
| |
| @classmethod |
| async def parse_not_null_async(cls, stream): |
| return await cls._parse_async(stream) |
| |
| @classmethod |
| def _parse_header(cls, stream): |
| int_sz, b_sz = ctypes.sizeof(ctypes.c_int), ctypes.sizeof(ctypes.c_byte) |
| length = int.from_bytes( |
| stream.slice(stream.tell() + b_sz, int_sz), |
| byteorder=PROTOCOL_BYTE_ORDER |
| ) |
| stream.seek(int_sz + 2 * b_sz, SEEK_CUR) |
| fields = [('type_code', ctypes.c_byte), ('length', ctypes.c_int), ('type', ctypes.c_byte)] |
| return fields, length |
| |
| @classmethod |
| def to_python_not_null(cls, ctypes_object, **kwargs): |
| return ctypes_object.type, cls._to_python(ctypes_object, **kwargs) |
| |
| @classmethod |
| async def to_python_not_null_async(cls, ctypes_object, **kwargs): |
| return ctypes_object.type, await cls._to_python_async(ctypes_object, **kwargs) |
| |
| @classmethod |
| def from_python_not_null(cls, stream, value, **kwargs): |
| type_id, value = value |
| if value is None: |
| Null.from_python(stream) |
| else: |
| cls._from_python(stream, value, type_id) |
| |
| @classmethod |
| async def from_python_not_null_async(cls, stream, value, **kwargs): |
| type_id, value = value |
| if value is None: |
| Null.from_python(stream) |
| else: |
| await cls._from_python_async(stream, value, type_id) |
| |
| @classmethod |
| def _write_header(cls, stream, type_id, length): |
| stream.write(cls.type_code) |
| stream.write(length.to_bytes( |
| length=ctypes.sizeof(ctypes.c_int), |
| byteorder=PROTOCOL_BYTE_ORDER) |
| ) |
| stream.write(type_id.to_bytes( |
| length=ctypes.sizeof(ctypes.c_byte), |
| byteorder=PROTOCOL_BYTE_ORDER, |
| signed=True) |
| ) |
| |
| |
| class BinaryObject(Nullable): |
| _type_id = TYPE_BINARY_OBJ |
| _header_class = None |
| type_code = TC_COMPLEX_OBJECT |
| |
| USER_TYPE = 0x0001 |
| HAS_SCHEMA = 0x0002 |
| HAS_RAW_DATA = 0x0004 |
| OFFSET_ONE_BYTE = 0x0008 |
| OFFSET_TWO_BYTES = 0x0010 |
| COMPACT_FOOTER = 0x0020 |
| |
| @classmethod |
| def hashcode(cls, value: object, client: Optional['Client'] = None) -> int: |
| # binary objects's hashcode implementation is special in the sense |
| # that you need to fully serialize the object to calculate |
| # its hashcode |
| if not value._hashcode and client: |
| with BinaryStream(client) as stream: |
| value._from_python(stream, save_to_buf=True) |
| |
| return value._hashcode |
| |
| @classmethod |
| async def hashcode_async(cls, value: object, client: Optional['AioClient'] = None) -> int: |
| if not value._hashcode and client: |
| with AioBinaryStream(client) as stream: |
| await value._from_python_async(stream, save_to_buf=True) |
| |
| return value._hashcode |
| |
| @classmethod |
| def get_header_class(cls): |
| if not cls._header_class: |
| cls._header_class = type( |
| cls.__name__, |
| (ctypes.LittleEndianStructure,), |
| { |
| '_pack_': 1, |
| '_fields_': [ |
| ('type_code', ctypes.c_byte), |
| ('version', ctypes.c_byte), |
| ('flags', ctypes.c_short), |
| ('type_id', ctypes.c_int), |
| ('hash_code', ctypes.c_int), |
| ('length', ctypes.c_int), |
| ('schema_id', ctypes.c_int), |
| ('schema_offset', ctypes.c_int), |
| ], |
| } |
| ) |
| return cls._header_class |
| |
| @classmethod |
| def offset_c_type(cls, flags: int): |
| if flags & cls.OFFSET_ONE_BYTE: |
| return ctypes.c_ubyte |
| if flags & cls.OFFSET_TWO_BYTES: |
| return ctypes.c_uint16 |
| return ctypes.c_uint |
| |
| @classmethod |
| def schema_type(cls, flags: int): |
| if flags & cls.COMPACT_FOOTER: |
| return cls.offset_c_type(flags) |
| return type( |
| 'SchemaElement', |
| (ctypes.LittleEndianStructure,), |
| { |
| '_pack_': 1, |
| '_fields_': [ |
| ('field_id', ctypes.c_int), |
| ('offset', cls.offset_c_type(flags)), |
| ], |
| }, |
| ) |
| |
| @classmethod |
| def parse_not_null(cls, stream): |
| header, header_class = cls.__parse_header(stream) |
| |
| # ignore full schema, always retrieve fields' types and order |
| # from complex types registry |
| data_class = stream.get_dataclass(header) |
| object_fields_struct = cls.__build_object_fields_struct(data_class) |
| object_fields = object_fields_struct.parse(stream) |
| |
| return cls.__build_final_class(stream, header, header_class, object_fields, |
| len(object_fields_struct.fields)) |
| |
| @classmethod |
| async def parse_not_null_async(cls, stream): |
| header, header_class = cls.__parse_header(stream) |
| |
| # ignore full schema, always retrieve fields' types and order |
| # from complex types registry |
| data_class = await stream.get_dataclass(header) |
| object_fields_struct = cls.__build_object_fields_struct(data_class) |
| object_fields = await object_fields_struct.parse_async(stream) |
| |
| return cls.__build_final_class(stream, header, header_class, object_fields, |
| len(object_fields_struct.fields)) |
| |
| @classmethod |
| def __parse_header(cls, stream): |
| header_class = cls.get_header_class() |
| header = stream.read_ctype(header_class) |
| stream.seek(ctypes.sizeof(header_class), SEEK_CUR) |
| return header, header_class |
| |
| @staticmethod |
| def __build_object_fields_struct(data_class): |
| fields = data_class.schema.items() |
| return Struct(fields) |
| |
| @classmethod |
| def __build_final_class(cls, stream, header, header_class, object_fields, fields_len): |
| final_class_fields = [('object_fields', object_fields)] |
| if header.flags & cls.HAS_SCHEMA: |
| schema = cls.schema_type(header.flags) * fields_len |
| stream.seek(ctypes.sizeof(schema), SEEK_CUR) |
| final_class_fields.append(('schema', schema)) |
| |
| final_class = type( |
| cls.__name__, |
| (header_class,), |
| { |
| '_pack_': 1, |
| '_fields_': final_class_fields, |
| } |
| ) |
| # register schema encoding approach |
| stream.compact_footer = bool(header.flags & cls.COMPACT_FOOTER) |
| return final_class |
| |
| @classmethod |
| def to_python_not_null(cls, ctypes_object, client: 'Client' = None, **kwargs): |
| type_id = ctypes_object.type_id |
| if not client: |
| raise ParseError(f'Can not query binary type {type_id}') |
| |
| data_class = client.query_binary_type(type_id, ctypes_object.schema_id) |
| result = data_class() |
| result.version = ctypes_object.version |
| |
| for field_name, field_type in data_class.schema.items(): |
| setattr( |
| result, field_name, field_type.to_python( |
| getattr(ctypes_object.object_fields, field_name), client=client, **kwargs |
| ) |
| ) |
| return result |
| |
| @classmethod |
| async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, **kwargs): |
| type_id = ctypes_object.type_id |
| if not client: |
| raise ParseError(f'Can not query binary type {type_id}') |
| |
| data_class = await client.query_binary_type(type_id, ctypes_object.schema_id) |
| result = data_class() |
| result.version = ctypes_object.version |
| |
| field_values = await asyncio.gather( |
| *[ |
| field_type.to_python_async( |
| getattr(ctypes_object.object_fields, field_name), client=client, **kwargs |
| ) |
| for field_name, field_type in data_class.schema.items() |
| ] |
| ) |
| |
| for i, field_name in enumerate(data_class.schema.keys()): |
| setattr(result, field_name, field_values[i]) |
| |
| return result |
| |
| @classmethod |
| def __get_type_id(cls, ctypes_object, client): |
| type_id = getattr(ctypes_object, "type_id", None) |
| if type_id: |
| if not client: |
| raise ParseError(f'Can not query binary type {type_id}') |
| return type_id |
| return None |
| |
| @classmethod |
| def from_python_not_null(cls, stream, value, **kwargs): |
| if cls.__write_fast_path(stream, value): |
| stream.register_binary_type(value.__class__) |
| value._from_python(stream) |
| |
| @classmethod |
| async def from_python_not_null_async(cls, stream, value, **kwargs): |
| if cls.__write_fast_path(stream, value): |
| await stream.register_binary_type(value.__class__) |
| await value._from_python_async(stream) |
| |
| @classmethod |
| def __write_fast_path(cls, stream, value): |
| if getattr(value, '_buffer', None): |
| stream.write(value._buffer) |
| return False |
| return True |