# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timedelta
import decimal
from uuid import UUID, uuid4

import pytest

from pyignite import GenericObjectMeta
from pyignite.api import *
from pyignite.constants import *
from pyignite.datatypes import *
from pyignite.datatypes.cache_config import CacheMode
from pyignite.datatypes.prop_codes import *


def test_get_node_partitions(client_partition_aware):

    conn = client_partition_aware.random_node

    cache_1 = client_partition_aware.get_or_create_cache('test_cache_1')
    cache_2 = client_partition_aware.get_or_create_cache({
        PROP_NAME: 'test_cache_2',
        PROP_CACHE_KEY_CONFIGURATION: [
            {
                'type_name': ByteArray.type_name,
                'affinity_key_field_name': 'byte_affinity',
            }
        ],
    })
    cache_3 = client_partition_aware.get_or_create_cache('test_cache_3')
    cache_4 = client_partition_aware.get_or_create_cache('test_cache_4')
    cache_5 = client_partition_aware.get_or_create_cache('test_cache_5')

    result = cache_get_node_partitions(
        conn,
        [cache_1.cache_id, cache_2.cache_id]
    )
    assert result.status == 0, result.message


@pytest.mark.parametrize(
    'key, key_hint', [
        # integers
        (42, None),
        (43, ByteObject),
        (-44, ByteObject),
        (45, IntObject),
        (-46, IntObject),
        (47, ShortObject),
        (-48, ShortObject),
        (49, LongObject),
        (MAX_INT-50, LongObject),
        (MAX_INT+51, LongObject),

        # floating point
        (5.2, None),
        (5.354, FloatObject),
        (-5.556, FloatObject),
        (-57.58, DoubleObject),

        # boolean
        (True, None),
        (True, BoolObject),
        (False, BoolObject),

        # char
        ('A', CharObject),
        ('Z', CharObject),
        ('⅓', CharObject),
        ('á', CharObject),
        ('ы', CharObject),
        ('カ', CharObject),
        ('Ø', CharObject),
        ('ß', CharObject),

        # string
        ('This is a test string', None),
        ('Кириллица', None),
        ('Little Mary had a lamb', String),

        # UUID
        (UUID('12345678123456789876543298765432'), None),
        (UUID('74274274274274274274274274274274'), UUIDObject),
        (uuid4(), None),

        # decimal (long internal representation in Java)
        (decimal.Decimal('-234.567'), None),
        (decimal.Decimal('200.0'), None),
        (decimal.Decimal('123.456'), DecimalObject),
        (decimal.Decimal('1.0'), None),
        (decimal.Decimal('0.02'), None),

        # decimal (BigInteger internal representation in Java)
        (decimal.Decimal('12345671234567123.45671234567'), None),
        (decimal.Decimal('-845678456.7845678456784567845'), None),

        # date and time
        (datetime(1980, 1, 1), None),
        ((datetime(1980, 1, 1), 999), TimestampObject),
        (timedelta(days=99), TimeObject),

    ],
)
def test_affinity(client_partition_aware, key, key_hint):

    cache_1 = client_partition_aware.get_or_create_cache({
        PROP_NAME: 'test_cache_1',
        PROP_CACHE_MODE: CacheMode.PARTITIONED,
    })
    value = 42
    cache_1.put(key, value, key_hint=key_hint)

    best_node = cache_1.get_best_node(key, key_hint=key_hint)

    for node in filter(lambda n: n.alive, client_partition_aware._nodes):
        result = cache_local_peek(
            node, cache_1.cache_id, key, key_hint=key_hint,
        )
        if node is best_node:
            assert result.value == value, (
                'Affinity calculation error for {}'.format(key)
            )
        else:
            assert result.value is None, (
                'Affinity calculation error for {}'.format(key)
            )

    cache_1.destroy()


def test_affinity_for_generic_object(client_partition_aware):

    cache_1 = client_partition_aware.get_or_create_cache({
        PROP_NAME: 'test_cache_1',
        PROP_CACHE_MODE: CacheMode.PARTITIONED,
    })

    class KeyClass(
        metaclass=GenericObjectMeta,
        schema={
            'NO': IntObject,
            'NAME': String,
        },
    ):
        pass

    key = KeyClass()
    key.NO = 1
    key.NAME = 'test_string'

    cache_1.put(key, 42, key_hint=BinaryObject)

    best_node = cache_1.get_best_node(key, key_hint=BinaryObject)

    for node in filter(lambda n: n.alive, client_partition_aware._nodes):
        result = cache_local_peek(
            node, cache_1.cache_id, key, key_hint=BinaryObject,
        )
        if node is best_node:
            assert result.value == 42, (
                'Affinity calculation error for {}'.format(key)
            )
        else:
            assert result.value is None, (
                'Affinity calculation error for {}'.format(key)
            )

    cache_1.destroy()


def test_affinity_for_generic_object_without_type_hints(client_partition_aware):

    if not client_partition_aware.partition_awareness_supported_by_protocol:
        pytest.skip(
            'Best effort affinity is not supported by the protocol {}.'.format(
                client_partition_aware.protocol_version
            )
        )

    cache_1 = client_partition_aware.get_or_create_cache({
        PROP_NAME: 'test_cache_1',
        PROP_CACHE_MODE: CacheMode.PARTITIONED,
    })

    class KeyClass(
        metaclass=GenericObjectMeta,
        schema={
            'NO': IntObject,
            'NAME': String,
        },
    ):
        pass

    key = KeyClass()
    key.NO = 2
    key.NAME = 'another_test_string'

    cache_1.put(key, 42)

    best_node = cache_1.get_best_node(key)

    for node in filter(lambda n: n.alive, client_partition_aware._nodes):
        result = cache_local_peek(
            node, cache_1.cache_id, key
        )
        if node is best_node:
            assert result.value == 42, (
                'Affinity calculation error for {}'.format(key)
            )
        else:
            assert result.value is None, (
                'Affinity calculation error for {}'.format(key)
            )

    cache_1.destroy()
