| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ |
| :class:`~pyignite.binary.GenericObjectMeta` is a metaclass used to create |
| classes, which objects serve as a native Python values for Ignite Complex |
| object data type. You can use this metaclass with your existing classes |
| to save and restore their selected attributes and properties to/from |
| Ignite caches. It is also used internally by `pyignite` to create simple |
| data classes “on the fly” when retrieving arbitrary Complex objects. |
| |
| You can get the examples of using Complex objects in the |
| :ref:`complex_object_usage` section of `pyignite` documentation. |
| """ |
| |
| from collections import OrderedDict |
| import ctypes |
| from io import SEEK_CUR |
| from typing import Any |
| |
| import attr |
| |
| from .constants import PROTOCOL_BYTE_ORDER |
| from .datatypes import ( |
| Null, ByteObject, ShortObject, IntObject, LongObject, FloatObject, DoubleObject, CharObject, BoolObject, UUIDObject, |
| DateObject, TimestampObject, TimeObject, EnumObject, BinaryEnumObject, ByteArrayObject, ShortArrayObject, |
| IntArrayObject, LongArrayObject, FloatArrayObject, DoubleArrayObject, CharArrayObject, BoolArrayObject, |
| UUIDArrayObject, DateArrayObject, TimestampArrayObject, TimeArrayObject, EnumArrayObject, String, StringArrayObject, |
| DecimalObject, DecimalArrayObject, ObjectArrayObject, CollectionObject, MapObject, BinaryObject, WrappedDataObject |
| ) |
| from .datatypes.base import IgniteDataTypeProps |
| from .exceptions import ParseError |
| from .utils import entity_id, schema_id |
| |
| |
| ALLOWED_FIELD_TYPES = [ |
| Null, ByteObject, ShortObject, IntObject, LongObject, FloatObject, |
| DoubleObject, CharObject, BoolObject, UUIDObject, DateObject, |
| TimestampObject, TimeObject, EnumObject, BinaryEnumObject, |
| ByteArrayObject, ShortArrayObject, IntArrayObject, LongArrayObject, |
| FloatArrayObject, DoubleArrayObject, CharArrayObject, BoolArrayObject, |
| UUIDArrayObject, DateArrayObject, TimestampArrayObject, |
| TimeArrayObject, EnumArrayObject, String, StringArrayObject, |
| DecimalObject, DecimalArrayObject, ObjectArrayObject, CollectionObject, |
| MapObject, BinaryObject, WrappedDataObject, |
| ] |
| |
| |
| class GenericObjectProps(IgniteDataTypeProps): |
| """ |
| This class is mixed both to metaclass and to resulting class to make class |
| properties universally available. You should not subclass it directly. |
| """ |
| @property |
| def schema(self) -> OrderedDict: |
| """ Binary object schema. """ |
| return self._schema.copy() |
| |
| @property |
| def schema_id(self) -> int: |
| """ Binary object schema ID. """ |
| return schema_id(self._schema) |
| |
| def __new__(cls, *args, **kwargs) -> Any: |
| # allow all items in Binary Object schema to be populated as optional |
| # arguments to `__init__()` with sensible defaults. |
| if not attr.has(cls): |
| attributes = { |
| k: attr.ib(type=getattr(v, 'pythonic', type(None)), default=getattr(v, 'default', None)) |
| for k, v in cls.schema.items() |
| } |
| |
| attributes.update({'version': attr.ib(type=int, default=1)}) |
| cls = attr.s(cls, these=attributes) |
| # skip parameters |
| return super().__new__(cls) |
| |
| |
| class GenericObjectPropsMeta(type, GenericObjectProps): |
| pass |
| |
| |
| class GenericObjectMeta(GenericObjectPropsMeta): |
| """ |
| Complex (or Binary) Object metaclass. It is aimed to help user create |
| classes, which objects could serve as a pythonic representation of the |
| :class:`~pyignite.datatypes.complex.BinaryObject` Ignite data type. |
| """ |
| _schema = None |
| _type_name = None |
| version = None |
| |
| def __new__( |
| mcs: Any, name: str, base_classes: tuple, namespace: dict, **kwargs |
| ) -> Any: |
| """ Sort out class creation arguments. """ |
| |
| result = super().__new__( |
| mcs, name, (GenericObjectProps, ) + base_classes, namespace |
| ) |
| |
| def _from_python(self, stream, save_to_buf=False): |
| """ |
| Method for building binary representation of the Generic object |
| and calculating a hashcode from it. |
| |
| :param self: Generic object instance, |
| :param stream: BinaryStream |
| :param save_to_buf: Optional. If True, save serialized data to buffer. |
| """ |
| initial_pos = stream.tell() |
| header, header_class = write_header(self, stream) |
| |
| offsets = [ctypes.sizeof(header_class)] |
| schema_items = list(self.schema.items()) |
| for field_name, field_type in schema_items: |
| val = getattr(self, field_name, getattr(field_type, 'default', None)) |
| field_start_pos = stream.tell() |
| field_type.from_python(stream, val) |
| offsets.append(max(offsets) + stream.tell() - field_start_pos) |
| |
| write_footer(self, stream, header, header_class, schema_items, offsets, initial_pos, save_to_buf) |
| |
| async def _from_python_async(self, stream, save_to_buf=False): |
| """ |
| Async version of _from_python |
| """ |
| initial_pos = stream.tell() |
| header, header_class = write_header(self, stream) |
| |
| offsets = [ctypes.sizeof(header_class)] |
| schema_items = list(self.schema.items()) |
| for field_name, field_type in schema_items: |
| val = getattr(self, field_name, getattr(field_type, 'default', None)) |
| field_start_pos = stream.tell() |
| await field_type.from_python_async(stream, val) |
| offsets.append(max(offsets) + stream.tell() - field_start_pos) |
| |
| write_footer(self, stream, header, header_class, schema_items, offsets, initial_pos, save_to_buf) |
| |
| def write_header(obj, stream): |
| header_class = BinaryObject.get_header_class() |
| header = header_class() |
| header.type_code = int.from_bytes( |
| BinaryObject.type_code, |
| byteorder=PROTOCOL_BYTE_ORDER |
| ) |
| header.flags = BinaryObject.USER_TYPE | BinaryObject.HAS_SCHEMA |
| if stream.compact_footer: |
| header.flags |= BinaryObject.COMPACT_FOOTER |
| header.version = obj.version |
| header.type_id = obj.type_id |
| header.schema_id = obj.schema_id |
| |
| stream.seek(ctypes.sizeof(header_class), SEEK_CUR) |
| |
| return header, header_class |
| |
| def write_footer(obj, stream, header, header_class, schema_items, offsets, initial_pos, save_to_buf): |
| offsets = offsets[:-1] |
| header_len = ctypes.sizeof(header_class) |
| |
| # create footer |
| if max(offsets, default=0) < 255: |
| header.flags |= BinaryObject.OFFSET_ONE_BYTE |
| elif max(offsets) < 65535: |
| header.flags |= BinaryObject.OFFSET_TWO_BYTES |
| |
| schema_class = BinaryObject.schema_type(header.flags) * len(offsets) |
| schema = schema_class() |
| |
| if stream.compact_footer: |
| for i, offset in enumerate(offsets): |
| schema[i] = offset |
| else: |
| for i, offset in enumerate(offsets): |
| schema[i].field_id = entity_id(schema_items[i][0]) |
| schema[i].offset = offset |
| |
| # calculate size and hash code |
| fields_data_len = stream.tell() - initial_pos - header_len |
| header.schema_offset = fields_data_len + header_len |
| header.length = header.schema_offset + ctypes.sizeof(schema_class) |
| header.hash_code = stream.hashcode(initial_pos + header_len, fields_data_len) |
| |
| stream.seek(initial_pos) |
| stream.write(header) |
| stream.seek(initial_pos + header.schema_offset) |
| stream.write(schema) |
| |
| if save_to_buf: |
| obj._buffer = stream.slice(initial_pos, stream.tell() - initial_pos) |
| obj._hashcode = header.hash_code |
| |
| def _setattr(self, attr_name: str, attr_value: Any): |
| # reset binary representation, if any field is changed |
| if attr_name in self._schema.keys(): |
| self._buffer = None |
| self._hashcode = None |
| |
| # `super()` is really need these parameters |
| super(result, self).__setattr__(attr_name, attr_value) |
| |
| setattr(result, _from_python.__name__, _from_python) |
| setattr(result, _from_python_async.__name__, _from_python_async) |
| setattr(result, '__setattr__', _setattr) |
| setattr(result, '_buffer', None) |
| setattr(result, '_hashcode', None) |
| return result |
| |
| @staticmethod |
| def _validate_schema(schema: dict): |
| for field_type in schema.values(): |
| if field_type not in ALLOWED_FIELD_TYPES: |
| raise ParseError( |
| 'Wrong binary field type: {}'.format(field_type) |
| ) |
| |
| def __init__( |
| cls, name: str, base_classes: tuple, namespace: dict, |
| type_name: str = None, schema: OrderedDict = None, **kwargs |
| ): |
| """ |
| Initializes binary object class. |
| |
| :param type_name: (optional) binary object name. Defaults to class |
| name, |
| :param schema: (optional) a dict of field names: field types, |
| :raise: ParseError if one or more binary field types |
| did not recognized. |
| """ |
| cls._type_name = type_name or cls.__name__ |
| cls._type_id = entity_id(cls._type_name) |
| schema = schema or OrderedDict() |
| cls._validate_schema(schema) |
| cls._schema = schema |
| super().__init__(name, base_classes, namespace) |