blob: 87a5232cf3d659ed6bbc8638fbd2f2c846c94974 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
from pyignite.constants import *
from pyignite.datatypes.binary import (
body_struct, enum_struct, schema_struct, binary_fields_struct,
)
from pyignite.datatypes import String, Int, Bool
from pyignite.queries import Query
from pyignite.queries.op_codes import *
from pyignite.utils import entity_id, schema_id
from .result import APIResult
from ..stream import BinaryStream, READ_BACKWARD
from ..queries.response import Response
def get_binary_type(conn: 'Connection', binary_type: Union[str, int], query_id=None) -> APIResult:
    """
    Gets the binary type information by type name or ID.

    Sends an ``OP_GET_BINARY_TYPE`` query over `conn` and decodes the
    variable-layout response: a header carrying a ``type_exists`` flag,
    optionally followed by the type body, the enum values (only when the
    body is flagged as an enum) and the schema list.

    :param conn: connection to Ignite server,
    :param binary_type: binary type name or ID; it is passed through
     ``entity_id()`` to obtain the ``type_id`` sent to the server,
    :param query_id: (optional) a value generated by client and returned as-is
     in response.query_id. When the parameter is omitted, a random value
     is generated,
    :return: API result data object. On a non-zero status the result is
     returned without a value. Otherwise the value is a dict holding
     ``type_exists``, the fields decoded from the type body (if present),
     ``enums`` (if present) and ``schema``: a dict that maps each schema ID
     to the list of its field IDs.
    """
    query_struct = Query(
        OP_GET_BINARY_TYPE,
        [
            ('type_id', Int),
        ],
        query_id=query_id,
    )
    # Serialize the request into a scratch stream and send it as one buffer.
    with BinaryStream(conn) as stream:
        query_struct.from_python(stream, {
            'type_id': entity_id(binary_type),
        })
        conn.send(stream.getbuffer())
    # The response header is immediately followed by the `type_exists` flag.
    response_head_struct = Response(protocol_version=conn.get_protocol_version(),
                                    following=[('type_exists', Bool)])
    with BinaryStream(conn, conn.recv()) as stream:
        # Remember where the response starts: the complete structure is
        # re-read from this position once its full layout is known.
        init_pos = stream.tell()
        response_head_type = response_head_struct.parse(stream)
        # NOTE(review): presumably parse() leaves the stream positioned right
        # after the parsed structure, hence the backward read -- confirm
        # against BinaryStream.read_ctype.
        response_head = stream.read_ctype(response_head_type, direction=READ_BACKWARD)
        # Collect the optional trailing parts in wire order.
        response_parts = []
        if response_head.type_exists:
            resp_body_type = body_struct.parse(stream)
            response_parts.append(('body', resp_body_type))
            # The body has to be read now, because its `is_enum` flag decides
            # whether an enum section follows.
            resp_body = stream.read_ctype(resp_body_type, direction=READ_BACKWARD)
            if resp_body.is_enum:
                resp_enum = enum_struct.parse(stream)
                response_parts.append(('enums', resp_enum))
            resp_schema_type = schema_struct.parse(stream)
            response_parts.append(('schema', resp_schema_type))
        # Build a one-off structure class: the parsed header type extended
        # with whichever optional parts were found above.
        response_class = type(
            'GetBinaryTypeResponse',
            (response_head_type,),
            {
                '_pack_': 1,
                '_fields_': response_parts,
            }
        )
        # Re-read the whole response from its start using the final layout.
        response = stream.read_ctype(response_class, position=init_pos)
    result = APIResult(response)
    if result.status != 0:
        return result
    result.value = {
        'type_exists': Bool.to_python(response.type_exists)
    }
    # Optional parts exist as attributes only if they were added to
    # `response_parts` during parsing.
    if hasattr(response, 'body'):
        result.value.update(body_struct.to_python(response.body))
    if hasattr(response, 'enums'):
        result.value['enums'] = enum_struct.to_python(response.enums)
    if hasattr(response, 'schema'):
        # Flatten the schema list into {schema_id: [field_id, ...]}.
        result.value['schema'] = {
            x['schema_id']: [
                z['schema_field_id'] for z in x['schema_fields']
            ]
            for x in schema_struct.to_python(response.schema)
        }
    return result
def put_binary_type(
    connection: 'Connection', type_name: str, affinity_key_field: str=None,
    is_enum=False, schema: dict=None, query_id=None,
) -> APIResult:
    """
    Registers binary type information in cluster.

    :param connection: connection to Ignite server,
    :param type_name: name of the data type being registered,
    :param affinity_key_field: (optional) name of the affinity key field,
    :param is_enum: (optional) register enum if True, binary object otherwise.
     Defaults to False,
    :param schema: (optional) when registering an enum, pass a dict of
     enumerated parameter names as keys and integers as values. When
     registering a binary type, pass a dict of field names: field types.
     Binary type with no fields is OK,
    :param query_id: (optional) a value generated by client and returned as-is
     in response.query_id. When the parameter is omitted, a random value
     is generated,
    :return: API result data object. On success its value is a dict with
     ``type_id`` (the ID computed from `type_name`) and ``schema_id`` (the
     ID computed from `schema`, or None when an enum was registered, since
     no schema ID is calculated for enums).
    """
    # prepare data
    if schema is None:
        schema = {}
    type_id = entity_id(type_name)
    data = {
        'type_name': type_name,
        'type_id': type_id,
        'affinity_key_field': affinity_key_field,
        'binary_fields': [],
        'is_enum': is_enum,
        'schema': [],
    }
    s_id = None
    if is_enum:
        # For enums the schema dict maps literal names to ordinal values.
        data['enums'] = [
            {'literal': literal, 'type_id': ordinal}
            for literal, ordinal in schema.items()
        ]
    else:
        # assemble schema and calculate schema ID in one go
        s_id = schema_id(schema)
        for field_name, data_type in schema.items():
            # TODO: check for allowed data types
            data['binary_fields'].append({
                'field_name': field_name,
                'type_id': int.from_bytes(
                    data_type.type_code,
                    byteorder=PROTOCOL_BYTE_ORDER
                ),
                'field_id': entity_id(field_name),
            })
        data['schema'].append({
            'schema_id': s_id,
            'schema_fields': [
                {'schema_field_id': entity_id(x)} for x in schema
            ],
        })

    # do query: the enum section is present in the request only for enums
    # and sits between the `is_enum` flag and the schema list.
    query_fields = [
        ('type_id', Int),
        ('type_name', String),
        ('affinity_key_field', String),
        ('binary_fields', binary_fields_struct),
        ('is_enum', Bool),
    ]
    if is_enum:
        query_fields.append(('enums', enum_struct))
    query_fields.append(('schema', schema_struct))
    query_struct = Query(
        OP_PUT_BINARY_TYPE,
        query_fields,
        query_id=query_id,
    )
    result = query_struct.perform(connection, query_params=data)
    if result.status == 0:
        result.value = {
            'type_id': type_id,
            # Report the computed schema ID, not the `schema_id` helper
            # function imported at module level.
            'schema_id': s_id,
        }
    return result