blob: c9a6b603c7cc402a959a0b205ac49470b6590831 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import decimal
from datetime import datetime, timedelta
from uuid import UUID, uuid4
import pytest
from pyignite import GenericObjectMeta, AioClient
from pyignite.api import (
cache_get_node_partitions, cache_get_node_partitions_async, cache_local_peek, cache_local_peek_async
)
from pyignite.constants import MAX_INT
from pyignite.datatypes import (
BinaryObject, ByteArray, ByteObject, IntObject, ShortObject, LongObject, FloatObject, DoubleObject, BoolObject,
CharObject, String, UUIDObject, DecimalObject, TimestampObject, TimeObject
)
from pyignite.datatypes.cache_config import CacheMode
from pyignite.datatypes.prop_codes import PROP_NAME, PROP_CACHE_MODE, PROP_CACHE_KEY_CONFIGURATION
from tests.util import wait_for_condition, wait_for_condition_async
def test_get_node_partitions(client, caches):
cache_ids = [cache.cache_id for cache in caches]
mappings = __get_mappings(client, cache_ids)
__check_mappings(mappings, cache_ids)
@pytest.mark.asyncio
async def test_get_node_partitions_async(async_client, async_caches):
cache_ids = [cache.cache_id for cache in async_caches]
mappings = await __get_mappings(async_client, cache_ids)
__check_mappings(mappings, cache_ids)
def __wait_for_ready_affinity(client, cache_ids):
def inner():
def condition():
result = __get_mappings(client, cache_ids)
return len(result.value['partition_mapping']) == len(cache_ids)
wait_for_condition(condition)
async def inner_async():
async def condition():
result = await __get_mappings(client, cache_ids)
return len(result.value['partition_mapping']) == len(cache_ids)
await wait_for_condition_async(condition)
return inner_async() if isinstance(client, AioClient) else inner()
def __get_mappings(client, cache_ids):
def inner():
conn = client.random_node
result = cache_get_node_partitions(conn, cache_ids)
assert result.status == 0, result.message
return result
async def inner_async():
conn = await client.random_node()
result = await cache_get_node_partitions_async(conn, cache_ids)
assert result.status == 0, result.message
return result
return inner_async() if isinstance(client, AioClient) else inner()
def __check_mappings(result, cache_ids):
partition_mapping = result.value['partition_mapping']
for i, cache_id in enumerate(cache_ids):
cache_mapping = partition_mapping[cache_id]
assert 'is_applicable' in cache_mapping
# Check replicated cache
if i == 3:
assert not cache_mapping['is_applicable']
assert 'node_mapping' not in cache_mapping
assert cache_mapping['number_of_partitions'] == 0
else:
# Check cache config
if i == 2:
assert cache_mapping['cache_config']
assert cache_mapping['is_applicable']
assert cache_mapping['node_mapping']
assert cache_mapping['number_of_partitions'] == 1024
@pytest.fixture
def caches(client):
yield from __create_caches_fixture(client)
@pytest.fixture
async def async_caches(async_client):
async for caches in __create_caches_fixture(async_client):
yield caches
def __create_caches_fixture(client):
caches_to_create = []
for i in range(0, 5):
cache_name = f'test_cache_{i}'
if i == 2:
caches_to_create.append((
cache_name,
{
PROP_NAME: cache_name,
PROP_CACHE_KEY_CONFIGURATION: [
{
'type_name': ByteArray.type_name,
'affinity_key_field_name': 'byte_affinity',
}
]
}))
elif i == 3:
caches_to_create.append((
cache_name,
{
PROP_NAME: cache_name,
PROP_CACHE_MODE: CacheMode.REPLICATED
}
))
else:
caches_to_create.append((cache_name, None))
def generate_caches():
caches = []
for name, config in caches_to_create:
if config:
cache = client.get_or_create_cache(config)
else:
cache = client.get_or_create_cache(name)
caches.append(cache)
return asyncio.gather(*caches) if isinstance(client, AioClient) else caches
def inner():
caches = []
try:
caches = generate_caches()
__wait_for_ready_affinity(client, [cache.cache_id for cache in caches])
yield caches
finally:
for cache in caches:
cache.destroy()
async def inner_async():
caches = []
try:
caches = await generate_caches()
await __wait_for_ready_affinity(client, [cache.cache_id for cache in caches])
yield caches
finally:
await asyncio.gather(*[cache.destroy() for cache in caches])
return inner_async() if isinstance(client, AioClient) else inner()
@pytest.fixture
def cache(client):
cache = client.get_or_create_cache({
PROP_NAME: 'test_cache_1',
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
try:
__wait_for_ready_affinity(client, [cache.cache_id])
yield cache
finally:
cache.destroy()
@pytest.fixture
async def async_cache(async_client):
cache = await async_client.get_or_create_cache({
PROP_NAME: 'test_cache_1',
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
try:
await __wait_for_ready_affinity(async_client, [cache.cache_id])
yield cache
finally:
await cache.destroy()
affinity_primitives_params = [
# integers
(42, None),
(43, ByteObject),
(-44, ByteObject),
(45, IntObject),
(-46, IntObject),
(47, ShortObject),
(-48, ShortObject),
(49, LongObject),
(MAX_INT - 50, LongObject),
(MAX_INT + 51, LongObject),
# floating point
(5.2, None),
(5.354, FloatObject),
(-5.556, FloatObject),
(-57.58, DoubleObject),
# boolean
(True, None),
(True, BoolObject),
(False, BoolObject),
# char
('A', CharObject),
('Z', CharObject),
('⅓', CharObject),
('á', CharObject),
('ы', CharObject),
('カ', CharObject),
('Ø', CharObject),
('ß', CharObject),
# string
('This is a test string', None),
('Кириллица', None),
('Little Mary had a lamb', String),
# UUID
(UUID('12345678123456789876543298765432'), None),
(UUID('74274274274274274274274274274274'), UUIDObject),
(uuid4(), None),
# decimal (long internal representation in Java)
(decimal.Decimal('-234.567'), None),
(decimal.Decimal('200.0'), None),
(decimal.Decimal('123.456'), DecimalObject),
(decimal.Decimal('1.0'), None),
(decimal.Decimal('0.02'), None),
# decimal (BigInteger internal representation in Java)
(decimal.Decimal('12345671234567123.45671234567'), None),
(decimal.Decimal('-845678456.7845678456784567845'), None),
# date and time
(datetime(1980, 1, 1), None),
((datetime(1980, 1, 1), 999), TimestampObject),
(timedelta(days=99), TimeObject)
]
@pytest.mark.parametrize('key, key_hint', affinity_primitives_params)
def test_affinity(client, cache, key, key_hint):
__check_best_node_calculation(client, cache, key, 42, key_hint=key_hint)
@pytest.mark.parametrize('key, key_hint', affinity_primitives_params)
@pytest.mark.asyncio
async def test_affinity_async(async_client, async_cache, key, key_hint):
await __check_best_node_calculation(async_client, async_cache, key, 42, key_hint=key_hint)
@pytest.fixture
def key_generic_object():
class KeyClass(
metaclass=GenericObjectMeta,
schema={
'NO': IntObject,
'NAME': String,
},
):
pass
key = KeyClass()
key.NO = 1
key.NAME = 'test_string'
yield key
@pytest.mark.parametrize('with_type_hint', [True, False])
def test_affinity_for_generic_object(client, cache, key_generic_object, with_type_hint):
key_hint = BinaryObject if with_type_hint else None
__check_best_node_calculation(client, cache, key_generic_object, 42, key_hint=key_hint)
@pytest.mark.parametrize('with_type_hint', [True, False])
@pytest.mark.asyncio
async def test_affinity_for_generic_object_async(async_client, async_cache, key_generic_object, with_type_hint):
key_hint = BinaryObject if with_type_hint else None
await __check_best_node_calculation(async_client, async_cache, key_generic_object, 42, key_hint=key_hint)
def __check_best_node_calculation(client, cache, key, value, key_hint=None):
def check_peek_value(node, best_node, result):
if node is best_node:
assert result.value == value, f'Affinity calculation error for {key}'
else:
assert result.value is None, f'Affinity calculation error for {key}'
def inner():
cache.put(key, value, key_hint=key_hint)
best_node = client.get_best_node(cache, key, key_hint=key_hint)
for node in filter(lambda n: n.alive, client._nodes):
result = cache_local_peek(node, cache.cache_info, key, key_hint=key_hint)
check_peek_value(node, best_node, result)
async def inner_async():
await cache.put(key, value, key_hint=key_hint)
best_node = await client.get_best_node(cache, key, key_hint=key_hint)
for node in filter(lambda n: n.alive, client._nodes):
result = await cache_local_peek_async(node, cache.cache_info, key, key_hint=key_hint)
check_peek_value(node, best_node, result)
return inner_async() if isinstance(client, AioClient) else inner()