# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ctypes
import decimal

from functools import wraps
from threading import Event, Thread
from typing import Any, Callable, Optional, Type, Tuple, Union

from pyignite.datatypes.base import IgniteDataType
from .constants import *


# Bit mask that keeps the low 32 bits of an integer; the hashcode helpers
# in this module use it to emulate Java's fixed-width arithmetic.
LONG_MASK = 0xffffffff
# NOTE(review): not referenced by the code visible in this module;
# presumably the number of decimal digits stored per 32-bit word, as in
# `java.math.BigInteger` internals -- confirm before relying on it.
DIGITS_PER_INT = 9


def is_pow2(value: int) -> bool:
    """
    Tell whether the given integer is an exact power of two.

    :param value: integer to test,
    :return: True for 1, 2, 4, 8, ...; False for zero, negative numbers
     and anything with more than one bit set.
    """
    if not value > 0:
        return False
    # a power of two has a single bit set, so clearing the lowest set bit
    # must leave nothing behind
    without_lowest_bit = value & (value - 1)
    return without_lowest_bit == 0


def is_iterable(value: Any) -> bool:
    """
    Tell whether an iterator can be obtained from the value.

    :param value: any object,
    :return: True if `iter(value)` succeeds, False if it raises `TypeError`.
    """
    try:
        iter(value)
    except TypeError:
        return False
    return True


def is_binary(value):
    """
    Check if a value is a pythonic representation of a Complex object.

    The value qualifies when it exposes all of the `type_name`, `type_id`,
    `schema` and `schema_id` attributes.

    :param value: any object,
    :return: True if every one of the four attributes is present.
    """
    required_attrs = ('type_name', 'type_id', 'schema', 'schema_id')
    # every attribute is probed, even after a miss
    presence = [hasattr(value, attr) for attr in required_attrs]
    return all(presence)


def is_hinted(value):
    """
    Check if a value is a tuple of data item and its type hint.

    :param value: any object,
    :return: True only for a 2-tuple whose second element is a class
     derived from `IgniteDataType`; False for everything else, including
     2-tuples whose second element is not a class at all.
    """
    return (
        isinstance(value, tuple)
        and len(value) == 2
        # `issubclass()` raises TypeError when its first argument is not
        # a class, so an ordinary pair like `(1, 2)` must be filtered out
        # before the subclass check
        and isinstance(value[1], type)
        and issubclass(value[1], IgniteDataType)
    )


def is_wrapped(value: Any) -> bool:
    """
    Check if a value is of WrappedDataObject type.

    A wrapped object is exactly a `tuple` of two items: a `bytes` blob
    followed by an `int`. Subclasses of these types do not qualify.

    :param value: any object,
    :return: True if the value has the wrapped object layout.
    """
    if type(value) is not tuple or len(value) != 2:
        return False
    blob, offset = value
    return type(blob) is bytes and type(offset) is int


def int_overflow(value: int) -> int:
    """
    Wrap an arbitrary Python integer into the signed 32-bit range, the way
    a fixed-width 32-bit integer would overflow.

    :param value: integer of any magnitude,
    :return: the value reduced to the range [-2**31, 2**31 - 1].
    """
    sign_bit = 0x80000000
    word_mask = 0xffffffff
    # flipping the sign bit before masking and subtracting it afterwards
    # maps the unsigned 32-bit residue onto its signed counterpart
    biased = (value ^ sign_bit) & word_mask
    return biased - sign_bit


def unwrap_binary(client: 'Client', wrapped: tuple) -> object:
    """
    Unwrap wrapped BinaryObject and convert it to Python data.

    A clone of one of the client's node connections is pre-loaded with
    the wrapped blob and positioned at the given offset, so that the
    regular `BinaryObject` parser can read the object from it. The clone
    is always closed, even if parsing or conversion fails.

    :param client: connection to Ignite cluster,
    :param wrapped: `WrappedDataObject` value, a (blob, offset) pair,
    :return: dict representing wrapped BinaryObject.
    """
    # imported at call time rather than at module level
    # NOTE(review): presumably to avoid a circular import -- confirm
    from pyignite.datatypes.complex import BinaryObject

    blob, offset = wrapped
    conn_clone = client.random_node.clone(prefetch=blob)
    try:
        conn_clone.pos = offset
        data_class, data_bytes = BinaryObject.parse(conn_clone)
        return BinaryObject.to_python(
            data_class.from_buffer_copy(data_bytes),
            client,
        )
    finally:
        # release the cloned connection on both success and failure
        conn_clone.close()


def hashcode(string: Union[str, bytes]) -> int:
    """
    Calculate hash code used for identifying objects in Ignite binary API.

    The hash is accumulated item by item as `31 * hash + code`, wrapped to
    a signed 32-bit integer after every step.

    :param string: UTF-8-encoded string identifier of binary buffer,
    :return: hash code.
    """
    accumulated = 0
    for item in string:
        try:
            code = ord(item)
        except TypeError:
            # iterating over `bytes` already yields integers
            code = item
        accumulated = int_overflow(31 * accumulated + code)
    return accumulated


def cache_id(cache: Union[str, int]) -> int:
    """
    Create a cache ID from cache name.

    :param cache: cache name or ID,
    :return: the value itself if it is exactly an `int`, otherwise
     the hash code of the name.
    """
    if type(cache) is int:
        return cache
    return hashcode(cache)


def entity_id(cache: Union[str, int]) -> Optional[int]:
    """
    Create a type ID from type name or field ID from field name.

    :param cache: entity name or ID,
    :return: None for None, the value itself if it is exactly an `int`,
     otherwise the hash code of the lower-cased name.
    """
    if cache is None:
        return None
    if type(cache) is int:
        return cache
    return hashcode(cache.lower())


def schema_id(schema: Union[int, dict]) -> int:
    """
    Calculate Complex Object schema ID.

    An integer is treated as a ready schema ID and returned as is; None
    and an empty schema give 0. Otherwise the four bytes of every field
    ID are folded, lowest byte first, into a hash seeded with
    `FNV1_OFFSET_BASIS`, multiplying by `FNV1_PRIME` with 32-bit overflow
    after each byte.

    :param schema: a dict of field names: field types,
    :return: schema ID.
    """
    if type(schema) is int:
        return schema
    if schema is None:
        return 0

    running = FNV1_OFFSET_BASIS if schema else 0
    for name in schema.keys():
        fid = entity_id(name)
        # one XOR-then-multiply round per byte of the 32-bit field ID
        for shift in (0, 8, 16, 24):
            running ^= ((fid >> shift) & 0xff)
            running = int_overflow(running * FNV1_PRIME)
    return running


def decimal_hashcode(value: decimal.Decimal) -> int:
    """
    This is a translation of `java.math.BigDecimal` class `hashCode()` method
    to Python.

    The decimal is normalized first, then its unscaled digits are hashed
    either as a single 64-bit value or as a sequence of 32-bit words, and
    finally combined with the sign and the exponent.

    :param value: pythonic decimal value,
    :return: hashcode, as a signed 32-bit integer.
    """
    sign, digits, scale = value.normalize().as_tuple()
    sign = -1 if sign else 1
    # absolute unscaled value; the sign is applied at the very end
    unscaled = int(''.join([str(d) for d in digits]))

    if unscaled < MAX_LONG:
        # this is the case when Java BigDecimal digits are stored
        # compactly, in the internal 64-bit integer field
        int_hash = (
            (unsigned(unscaled, ctypes.c_ulonglong) >> 32) * 31
            + (unscaled & LONG_MASK)
        ) & LONG_MASK
    else:
        # digits are not fit in the 64-bit long, so they get split internally
        # to an array of values within 32-bit integer range each (it is really
        # a part of `java.math.BigInteger` class internals)
        words = []
        remainder = unscaled
        # keep going until nothing is left: the most significant word must
        # be included even when it equals 1 (stopping at `> 1` would drop
        # it, e.g. for an unscaled value of 2**64)
        while remainder > 0:
            words.append(remainder & LONG_MASK)
            remainder >>= 32

        int_hash = 0
        # words were collected least significant first; hash them starting
        # from the most significant one
        for word in reversed(words):
            int_hash = (31 * int_hash + word) & LONG_MASK

    # Python's exponent is the negated Java scale, hence the subtraction
    return ctypes.c_int(31 * int_hash * sign - scale).value


def datetime_hashcode(value: int) -> int:
    """
    Calculates hashcode from UNIX epoch.

    The value is viewed as an unsigned 64-bit integer whose low and high
    32-bit halves are XOR-ed together.

    :param value: UNIX time,
    :return: Java hashcode.
    """
    low_half = value & LONG_MASK
    high_half = unsigned(value, ctypes.c_ulonglong) >> 32
    return low_half ^ high_half


def status_to_exception(exc: Type[Exception]):
    """
    Build a decorator that unpacks the result of the decorated call:
    a non-zero status is turned into an exception of the given class
    carrying the result's message, a zero status yields the result's
    value.

    :param exc: the class of exception to raise,
    :return: decorator.
    """
    def decorate(api_call):
        @wraps(api_call)
        def checked_call(*args, **kwargs):
            response = api_call(*args, **kwargs)
            failed = response.status != 0
            if failed:
                raise exc(response.message)
            return response.value
        return checked_call
    return decorate


def get_field_by_id(
    obj: 'GenericObjectMeta', field_id: int
) -> Tuple[Any, IgniteDataType]:
    """
    Look up a complex object's field by the field's entity ID.

    The object's `_schema` is scanned in order; the first field name whose
    entity ID matches wins. If the object has no attribute of that name,
    the field type's `default` is used as the value.

    :param obj: complex object,
    :param field_id: field ID,
    :return: complex object field's value and type, or None when no field
     of the schema matches the ID.
    """
    for name, data_type in obj._schema.items():
        if entity_id(name) != field_id:
            continue
        # the fallback is looked up unconditionally, before the attribute
        fallback = data_type.default
        return getattr(obj, name, fallback), data_type
    return None


def unsigned(value: int, c_type: ctypes._SimpleCData = ctypes.c_uint) -> int:
    """
    Reinterpret a signed integer as unsigned by passing it through
    a `ctypes` integer type.

    :param value: integer to convert,
    :param c_type: `ctypes` type that defines the width of the result,
     `ctypes.c_uint` by default,
    :return: the value held by the `ctypes` object built from the input.
    """
    holder = c_type(value)
    return holder.value


class DaemonicTimer(Thread):
    """
    A daemon-thread counterpart of `threading.Timer`: calls a function
    after the given interval unless cancelled, and does not keep
    the program from exiting.
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """
        :param interval: how long to wait before the call; passed to
         `Event.wait()` as the timeout,
        :param function: callable to invoke,
        :param args: (optional) positional arguments for the callable,
        :param kwargs: (optional) keyword arguments for the callable.
        """
        super().__init__(daemon=True)
        self.interval = interval
        self.function = function
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        # set either by `cancel()` or at the end of `run()`
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        """
        Wait for the interval to elapse (or for cancellation), call
        the function if the timer was not cancelled, then mark it finished.
        """
        self.finished.wait(self.interval)
        cancelled = self.finished.is_set()
        if not cancelled:
            self.function(*self.args, **self.kwargs)
        self.finished.set()


def capitalize(string: str) -> str:
    """
    Upper-case the first character of the string and leave the rest
    exactly as it is (unlike `str.capitalize()`, which lower-cases
    the remainder). An empty string is returned unchanged.

    :param string: text to capitalize,
    :return: the text with its first character upper-cased.
    """
    head, tail = string[:1], string[1:]
    return head.upper() + tail


def process_delimiter(name: str, delimiter: str) -> str:
    """
    Split the name on the delimiter, capitalize every part with
    `capitalize()` and glue the parts back together without a separator.

    :param name: delimited name,
    :param delimiter: separator to split on,
    :return: the merged, capitalized name.
    """
    parts = name.split(delimiter)
    return ''.join(map(capitalize, parts))
