blob: ca9725d483ce595c59b99928045feadaa5b0be7c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes
import decimal
from functools import wraps
from threading import Event, Thread
from typing import Any, Callable, Optional, Type, Tuple, Union
from pyignite.datatypes.base import IgniteDataType
from .constants import *
LONG_MASK = 0xffffffff
DIGITS_PER_INT = 9
def is_pow2(value: int) -> bool:
""" Check if value is power of two. """
return value > 0 and ((value & (value - 1)) == 0)
def is_iterable(value: Any) -> bool:
""" Check if value is iterable. """
try:
iter(value)
return True
except TypeError:
return False
def is_binary(value):
"""
Check if a value is a pythonic representation of a Complex object.
"""
return all([
hasattr(value, 'type_name'),
hasattr(value, 'type_id'),
hasattr(value, 'schema'),
hasattr(value, 'schema_id'),
])
def is_hinted(value):
"""
Check if a value is a tuple of data item and its type hint.
"""
return (
isinstance(value, tuple)
and len(value) == 2
and issubclass(value[1], IgniteDataType)
)
def is_wrapped(value: Any) -> bool:
"""
Check if a value is of WrappedDataObject type.
"""
return (
type(value) is tuple
and len(value) == 2
and type(value[0]) is bytes
and type(value[1]) is int
)
def int_overflow(value: int) -> int:
"""
Simulates 32bit integer overflow.
"""
return ((value ^ 0x80000000) & 0xffffffff) - 0x80000000
def unwrap_binary(client: 'Client', wrapped: tuple) -> object:
"""
Unwrap wrapped BinaryObject and convert it to Python data.
:param client: connection to Ignite cluster,
:param wrapped: `WrappedDataObject` value,
:return: dict representing wrapped BinaryObject.
"""
from pyignite.datatypes.complex import BinaryObject
blob, offset = wrapped
conn_clone = client.random_node.clone(prefetch=blob)
conn_clone.pos = offset
data_class, data_bytes = BinaryObject.parse(conn_clone)
result = BinaryObject.to_python(
data_class.from_buffer_copy(data_bytes),
client,
)
conn_clone.close()
return result
def hashcode(string: Union[str, bytes]) -> int:
"""
Calculate hash code used for identifying objects in Ignite binary API.
:param string: UTF-8-encoded string identifier of binary buffer,
:return: hash code.
"""
result = 0
for char in string:
try:
char = ord(char)
except TypeError:
pass
result = int_overflow(31 * result + char)
return result
def cache_id(cache: Union[str, int]) -> int:
"""
Create a cache ID from cache name.
:param cache: cache name or ID,
:return: cache ID.
"""
return cache if type(cache) is int else hashcode(cache)
def entity_id(cache: Union[str, int]) -> Optional[int]:
"""
Create a type ID from type name or field ID from field name.
:param cache: entity name or ID,
:return: entity ID.
"""
if cache is None:
return None
return cache if type(cache) is int else hashcode(cache.lower())
def schema_id(schema: Union[int, dict]) -> int:
"""
Calculate Complex Object schema ID.
:param schema: a dict of field names: field types,
:return: schema ID.
"""
if type(schema) is int:
return schema
if schema is None:
return 0
s_id = FNV1_OFFSET_BASIS if schema else 0
for field_name in schema.keys():
field_id = entity_id(field_name)
s_id ^= (field_id & 0xff)
s_id = int_overflow(s_id * FNV1_PRIME)
s_id ^= ((field_id >> 8) & 0xff)
s_id = int_overflow(s_id * FNV1_PRIME)
s_id ^= ((field_id >> 16) & 0xff)
s_id = int_overflow(s_id * FNV1_PRIME)
s_id ^= ((field_id >> 24) & 0xff)
s_id = int_overflow(s_id * FNV1_PRIME)
return s_id
def decimal_hashcode(value: decimal.Decimal) -> int:
"""
This is a translation of `java.math.BigDecimal` class `hashCode()` method
to Python.
:param value: pythonic decimal value,
:return: hashcode.
"""
sign, digits, scale = value.normalize().as_tuple()
sign = -1 if sign else 1
value = int(''.join([str(d) for d in digits]))
if value < MAX_LONG:
# this is the case when Java BigDecimal digits are stored
# compactly, in the internal 64-bit integer field
int_hash = (
(unsigned(value, ctypes.c_ulonglong) >> 32) * 31
+ (value & LONG_MASK)
) & LONG_MASK
else:
# digits are not fit in the 64-bit long, so they get split internally
# to an array of values within 32-bit integer range each (it is really
# a part of `java.math.BigInteger` class internals)
magnitude = []
order = 0
while True:
elem = value >> order
if elem > 1:
magnitude.insert(0, ctypes.c_int(elem).value)
order += 32
else:
break
int_hash = 0
for v in magnitude:
int_hash = (31 * int_hash + (v & LONG_MASK)) & LONG_MASK
return ctypes.c_int(31 * int_hash * sign - scale).value
def datetime_hashcode(value: int) -> int:
"""
Calculates hashcode from UNIX epoch.
:param value: UNIX time,
:return: Java hashcode.
"""
return (value & LONG_MASK) ^ (unsigned(value, ctypes.c_ulonglong) >> 32)
def status_to_exception(exc: Type[Exception]):
"""
Converts erroneous status code with error message to an exception
of the given class.
:param exc: the class of exception to raise,
:return: decorator.
"""
def ste_decorator(fn):
@wraps(fn)
def ste_wrapper(*args, **kwargs):
result = fn(*args, **kwargs)
if result.status != 0:
raise exc(result.message)
return result.value
return ste_wrapper
return ste_decorator
def get_field_by_id(
obj: 'GenericObjectMeta', field_id: int
) -> Tuple[Any, IgniteDataType]:
"""
Returns a complex object's field value, given the field's entity ID.
:param obj: complex object,
:param field_id: field ID,
:return: complex object field's value and type.
"""
for fname, ftype in obj._schema.items():
if entity_id(fname) == field_id:
return getattr(obj, fname, getattr(ftype, 'default')), ftype
def unsigned(value: int, c_type: ctypes._SimpleCData = ctypes.c_uint) -> int:
""" Convert signed integer value to unsigned. """
return c_type(value).value
class DaemonicTimer(Thread):
"""
Same as normal `threading.Timer`, but do not delay the program exit.
"""
def __init__(self, interval, function, args=None, kwargs=None):
Thread.__init__(self, daemon=True)
self.interval = interval
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.finished = Event()
def cancel(self):
"""Stop the timer if it hasn't finished yet."""
self.finished.set()
def run(self):
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()
def capitalize(string: str) -> str:
"""
Capitalizing the string, assuming the first character is a letter.
Does not touch any other character, unlike the `string.capitalize()`.
"""
return string[:1].upper() + string[1:]
def process_delimiter(name: str, delimiter: str) -> str:
"""
Splits the name by delimiter, capitalize each part, merge.
"""
return ''.join([capitalize(x) for x in name.split(delimiter)])