| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from typing import Union |
| |
| from pyignite.connection import Connection, AioConnection |
| from pyignite.constants import PROTOCOL_BYTE_ORDER |
| from pyignite.datatypes.binary import enum_struct, schema_struct, binary_fields_struct |
| from pyignite.datatypes import String, Int, Bool |
| from pyignite.queries import Query, query_perform |
| from pyignite.queries.op_codes import OP_GET_BINARY_TYPE, OP_PUT_BINARY_TYPE |
| from pyignite.utils import entity_id, schema_id |
| from .result import APIResult |
| from ..queries.response import BinaryTypeResponse |
| |
| |
| def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult: |
| """ |
| Gets the binary type information by type ID. |
| |
| :param conn: connection to Ignite server, |
| :param binary_type: binary type name or ID, |
| :param query_id: (optional) a value generated by client and returned as-is |
| in response.query_id. When the parameter is omitted, a random value |
| is generated, |
| :return: API result data object. |
| """ |
| return __get_binary_type(conn, binary_type, query_id) |
| |
| |
| async def get_binary_type_async(conn: 'AioConnection', binary_type: Union[str, int], query_id=None) -> APIResult: |
| """ |
| Async version of get_binary_type. |
| """ |
| return await __get_binary_type(conn, binary_type, query_id) |
| |
| |
| def __get_binary_type(conn, binary_type, query_id): |
| query_struct = Query( |
| OP_GET_BINARY_TYPE, |
| [ |
| ('type_id', Int), |
| ], |
| query_id=query_id, |
| response_type=BinaryTypeResponse |
| ) |
| |
| return query_perform(query_struct, conn, query_params={ |
| 'type_id': entity_id(binary_type), |
| }) |
| |
| |
| def put_binary_type(connection: 'Connection', type_name: str, affinity_key_field: str = None, |
| is_enum=False, schema: dict = None, query_id=None) -> APIResult: |
| """ |
| Registers binary type information in cluster. |
| |
| :param connection: connection to Ignite server, |
| :param type_name: name of the data type being registered, |
| :param affinity_key_field: (optional) name of the affinity key field, |
| :param is_enum: (optional) register enum if True, binary object otherwise. |
| Defaults to False, |
| :param schema: (optional) when register enum, pass a dict of enumerated |
| parameter names as keys and an integers as values. When register binary |
| type, pass a dict of field names: field types. Binary type with no fields |
| is OK, |
| :param query_id: (optional) a value generated by client and returned as-is |
| in response.query_id. When the parameter is omitted, a random value |
| is generated, |
| :return: API result data object. |
| """ |
| return __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id) |
| |
| |
| async def put_binary_type_async(connection: 'AioConnection', type_name: str, affinity_key_field: str = None, |
| is_enum=False, schema: dict = None, query_id=None) -> APIResult: |
| """ |
| Async version of put_binary_type. |
| """ |
| return await __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id) |
| |
| |
| def __post_process_put_binary(type_id): |
| def internal(result): |
| if result.status == 0: |
| result.value = { |
| 'type_id': type_id, |
| 'schema_id': schema_id, |
| } |
| return result |
| return internal |
| |
| |
| def __put_binary_type(connection, type_name, affinity_key_field, is_enum, schema, query_id): |
| # prepare data |
| if schema is None: |
| schema = {} |
| type_id = entity_id(type_name) |
| data = { |
| 'type_name': type_name, |
| 'type_id': type_id, |
| 'affinity_key_field': affinity_key_field, |
| 'binary_fields': [], |
| 'is_enum': is_enum, |
| 'schema': [], |
| } |
| s_id = None |
| if is_enum: |
| data['enums'] = [] |
| for literal, ordinal in schema.items(): |
| data['enums'].append({ |
| 'literal': literal, |
| 'type_id': ordinal, |
| }) |
| else: |
| # assemble schema and calculate schema ID in one go |
| s_id = schema_id(schema) |
| for field_name, data_type in schema.items(): |
| # TODO: check for allowed data types |
| field_id = entity_id(field_name) |
| data['binary_fields'].append({ |
| 'field_name': field_name, |
| 'type_id': int.from_bytes( |
| data_type.type_code, |
| byteorder=PROTOCOL_BYTE_ORDER |
| ), |
| 'field_id': field_id, |
| }) |
| |
| data['schema'].append({ |
| 'schema_id': s_id, |
| 'schema_fields': [ |
| {'schema_field_id': entity_id(x)} for x in schema |
| ], |
| }) |
| |
| # do query |
| if is_enum: |
| query_struct = Query( |
| OP_PUT_BINARY_TYPE, |
| [ |
| ('type_id', Int), |
| ('type_name', String), |
| ('affinity_key_field', String), |
| ('binary_fields', binary_fields_struct), |
| ('is_enum', Bool), |
| ('enums', enum_struct), |
| ('schema', schema_struct), |
| ], |
| query_id=query_id, |
| ) |
| else: |
| query_struct = Query( |
| OP_PUT_BINARY_TYPE, |
| [ |
| ('type_id', Int), |
| ('type_name', String), |
| ('affinity_key_field', String), |
| ('binary_fields', binary_fields_struct), |
| ('is_enum', Bool), |
| ('schema', schema_struct), |
| ], |
| query_id=query_id, |
| ) |
| return query_perform(query_struct, connection, query_params=data, |
| post_process_fun=__post_process_put_binary(type_id)) |