# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
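"""
Tests for binary (complex) object support in pyignite: reading and writing
SQL table data as binary objects, nested and versioned binary types,
non-Pythonic type names, null fields, hash codes, and collections of binary
objects, for both the sync and async clients.
"""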
import re
from collections import OrderedDict
from decimal import Decimal
import pytest
from pyignite import GenericObjectMeta
from pyignite.aio_cache import AioCache
from pyignite.datatypes import (
BinaryObject, BoolObject, IntObject, DecimalObject, LongObject, String, ByteObject, ShortObject, FloatObject,
DoubleObject, CharObject, UUIDObject, DateObject, TimestampObject, TimeObject, EnumObject, BinaryEnumObject,
ByteArrayObject, ShortArrayObject, IntArrayObject, LongArrayObject, FloatArrayObject, DoubleArrayObject,
CharArrayObject, BoolArrayObject, UUIDArrayObject, DateArrayObject, TimestampArrayObject, TimeArrayObject,
EnumArrayObject, StringArrayObject, DecimalArrayObject, ObjectArrayObject, CollectionObject, MapObject)
from pyignite.datatypes.prop_codes import PROP_NAME, PROP_SQL_SCHEMA, PROP_QUERY_ENTITIES
insert_data = [
[1, True, 'asdf', 42, Decimal('2.4')],
[2, False, 'zxcvb', 43, Decimal('2.5')],
[3, True, 'qwerty', 44, Decimal('2.6')],
]
page_size = 100
scheme_name = 'PUBLIC'
table_sql_name = 'AllDataType'
table_cache_name = 'SQL_{}_{}'.format(
scheme_name,
table_sql_name.upper(),
)
create_query = '''
CREATE TABLE {} (
test_pk INTEGER(11) PRIMARY KEY,
test_bool BOOLEAN,
test_str VARCHAR(24),
test_int INTEGER(11),
test_decimal DECIMAL(11, 5),
)
'''.format(table_sql_name)
insert_query = '''
INSERT INTO {} (
test_pk, test_bool, test_str, test_int, test_decimal,
) VALUES (?, ?, ?, ?, ?)'''.format(table_sql_name)
select_query = '''SELECT * FROM {}'''.format(table_sql_name)
drop_query = 'DROP TABLE {} IF EXISTS'.format(table_sql_name)
@pytest.fixture
def table_cache_read(client):
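    """
    Create the test SQL table, fill it with insert_data and yield the
    underlying table cache; the table is dropped on teardown.
    """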
client.sql(drop_query)
client.sql(create_query)
for line in insert_data:
client.sql(insert_query, query_args=line)
cache = client.get_cache(table_cache_name)
yield cache
client.sql(drop_query)
@pytest.fixture
async def table_cache_read_async(async_client):
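    """
    Async counterpart of table_cache_read: create and populate the SQL table
    through the async client and yield its cache; the table is dropped on teardown.
    """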
await async_client.sql(drop_query)
await async_client.sql(create_query)
for line in insert_data:
await async_client.sql(insert_query, query_args=line)
cache = await async_client.get_cache(table_cache_name)
yield cache
await async_client.sql(drop_query)
def test_sql_read_as_binary(table_cache_read):
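    """
    Rows inserted through SQL should be readable as binary objects via a
    key-value scan.
    """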
with table_cache_read.scan() as cursor:
# convert Binary object fields' values to a tuple
# to compare it with the initial data
for key, value in cursor:
assert key in {x[0] for x in insert_data}
assert (value.TEST_BOOL, value.TEST_STR, value.TEST_INT, value.TEST_DECIMAL) \
in {tuple(x[1:]) for x in insert_data}
@pytest.mark.asyncio
async def test_sql_read_as_binary_async(table_cache_read_async):
async with table_cache_read_async.scan() as cursor:
# convert Binary object fields' values to a tuple
# to compare it with the initial data
async for key, value in cursor:
assert key in {x[0] for x in insert_data}
assert (value.TEST_BOOL, value.TEST_STR, value.TEST_INT, value.TEST_DECIMAL) \
in {tuple(x[1:]) for x in insert_data}
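# Value type used for key-value writes into the SQL table cache; its field
# names and types mirror the columns of the test table.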
class AllDataType(
metaclass=GenericObjectMeta,
type_name=table_cache_name,
schema=OrderedDict([
('TEST_BOOL', BoolObject),
('TEST_STR', String),
('TEST_INT', IntObject),
('TEST_DECIMAL', DecimalObject),
]),
):
pass
@pytest.fixture
def table_cache_write_settings():
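    """
    Cache configuration that exposes the cache as an SQL table through query
    entities, so that key-value writes become visible to SQL queries.
    """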
return {
PROP_NAME: table_cache_name,
PROP_SQL_SCHEMA: scheme_name,
PROP_QUERY_ENTITIES: [
{
'table_name': table_sql_name.upper(),
'key_field_name': 'TEST_PK',
'key_type_name': 'java.lang.Integer',
'field_name_aliases': [],
'query_fields': [
{
'name': 'TEST_PK',
'type_name': 'java.lang.Integer',
'is_notnull_constraint_field': True,
},
{
'name': 'TEST_BOOL',
'type_name': 'java.lang.Boolean',
},
{
'name': 'TEST_STR',
'type_name': 'java.lang.String',
},
{
'name': 'TEST_INT',
'type_name': 'java.lang.Integer',
},
{
'name': 'TEST_DECIMAL',
'type_name': 'java.math.BigDecimal',
'default_value': Decimal('0.00'),
'precision': 11,
'scale': 2,
},
],
'query_indexes': [],
'value_type_name': table_cache_name,
'value_field_name': None,
},
],
}
@pytest.fixture
def table_cache_write(client, table_cache_write_settings):
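    """
    Create the SQL-enabled cache, populate it through the key-value API and
    verify with a scan that every row was written; the cache is destroyed on teardown.
    """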
cache = client.get_or_create_cache(table_cache_write_settings)
assert cache.settings, 'SQL table cache settings are empty'
for row in insert_data:
value = AllDataType()
(
value.TEST_BOOL,
value.TEST_STR,
value.TEST_INT,
value.TEST_DECIMAL,
) = row[1:]
cache.put(row[0], value, key_hint=IntObject)
data = cache.scan()
assert len(list(data)) == len(insert_data), 'Not all data was read as key-value'
yield cache
cache.destroy()
@pytest.fixture
async def async_table_cache_write(async_client, table_cache_write_settings):
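    """
    Async counterpart of table_cache_write: populate the SQL-enabled cache
    through the async key-value API; the cache is destroyed on teardown.
    """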
cache = await async_client.get_or_create_cache(table_cache_write_settings)
assert await cache.settings(), 'SQL table cache settings are empty'
for row in insert_data:
value = AllDataType()
(
value.TEST_BOOL,
value.TEST_STR,
value.TEST_INT,
value.TEST_DECIMAL,
) = row[1:]
await cache.put(row[0], value, key_hint=IntObject)
async with cache.scan() as cursor:
data = [a async for a in cursor]
assert len(data) == len(insert_data), 'Not all data was read as key-value'
yield cache
await cache.destroy()
def test_sql_write_as_binary(client, table_cache_write):
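    """
    Rows written through the key-value API should be readable as SQL rows
    with the expected column names.
    """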
# read rows as SQL
data = client.sql(select_query, include_field_names=True)
header_row = next(data)
for field_name in AllDataType.schema.keys():
assert field_name in header_row, 'Not all field names in header row'
data = list(data)
assert len(data) == len(insert_data), 'Not all data was read as SQL rows'
@pytest.mark.asyncio
async def test_sql_write_as_binary_async(async_client, async_table_cache_write):
# read rows as SQL
async with async_client.sql(select_query, include_field_names=True) as cursor:
header_row = await cursor.__anext__()
for field_name in AllDataType.schema.keys():
assert field_name in header_row, 'Not all field names in header row'
data = [v async for v in cursor]
assert len(data) == len(insert_data), 'Not all data was read as SQL rows'
def test_nested_binary_objects(cache):
__check_nested_binary_objects(cache)
@pytest.mark.asyncio
async def test_nested_binary_objects_async(async_cache):
await __check_nested_binary_objects(async_cache)
def __check_nested_binary_objects(cache):
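    """
    Store an object with a nested binary object field and verify both the
    outer and the inner fields after a round trip. Works with either a sync
    or an async cache (see the dispatch at the bottom).
    """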
class InnerType(
metaclass=GenericObjectMeta,
schema=OrderedDict([
('inner_int', LongObject),
('inner_str', String),
]),
):
pass
class OuterType(
metaclass=GenericObjectMeta,
schema=OrderedDict([
('outer_int', LongObject),
('nested_binary', BinaryObject),
('outer_str', String),
]),
):
pass
def prepare_obj():
inner = InnerType(inner_int=42, inner_str='This is a test string')
return OuterType(
outer_int=43,
nested_binary=inner,
outer_str='This is another test string'
)
def check_obj(result):
assert result.outer_int == 43
assert result.outer_str == 'This is another test string'
assert result.nested_binary.inner_int == 42
assert result.nested_binary.inner_str == 'This is a test string'
async def inner_async():
await cache.put(1, prepare_obj())
check_obj(await cache.get(1))
def inner():
cache.put(1, prepare_obj())
check_obj(cache.get(1))
return inner_async() if isinstance(cache, AioCache) else inner()
def test_add_schema_to_binary_object(cache):
__check_add_schema_to_binary_object(cache)
@pytest.mark.asyncio
async def test_add_schema_to_binary_object_async(async_cache):
await __check_add_schema_to_binary_object(async_cache)
def __check_add_schema_to_binary_object(cache):
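    """
    Register a second schema (same type id, different schema id) for an
    existing binary type and verify that objects of both versions round-trip.
    """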
class MyBinaryType(
metaclass=GenericObjectMeta,
schema=OrderedDict([
('test_str', String),
('test_int', LongObject),
('test_bool', BoolObject),
]),
):
pass
def prepare_bo_v1():
return MyBinaryType(test_str='Test string', test_int=42, test_bool=True)
def check_bo_v1(result):
assert result.test_str == 'Test string'
assert result.test_int == 42
assert result.test_bool is True
def prepare_bo_v2():
modified_schema = MyBinaryType.schema.copy()
modified_schema['test_decimal'] = DecimalObject
del modified_schema['test_bool']
class MyBinaryTypeV2(
metaclass=GenericObjectMeta,
type_name='MyBinaryType',
schema=modified_schema,
):
pass
assert MyBinaryType.type_id == MyBinaryTypeV2.type_id
assert MyBinaryType.schema_id != MyBinaryTypeV2.schema_id
return MyBinaryTypeV2(test_str='Another test', test_int=43, test_decimal=Decimal('2.34'))
def check_bo_v2(result):
assert result.test_str == 'Another test'
assert result.test_int == 43
assert result.test_decimal == Decimal('2.34')
assert not hasattr(result, 'test_bool')
async def inner_async():
await cache.put(1, prepare_bo_v1())
check_bo_v1(await cache.get(1))
await cache.put(2, prepare_bo_v2())
check_bo_v2(await cache.get(2))
def inner():
cache.put(1, prepare_bo_v1())
check_bo_v1(cache.get(1))
cache.put(2, prepare_bo_v2())
check_bo_v2(cache.get(2))
return inner_async() if isinstance(cache, AioCache) else inner()
def test_complex_object_names(cache):
"""
    Test the ability to work with Complex types whose names contain symbols
    that are not suitable for use in Python identifiers.
"""
__check_complex_object_names(cache)
@pytest.mark.asyncio
async def test_complex_object_names_async(async_cache):
await __check_complex_object_names(async_cache)
def __check_complex_object_names(cache):
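    """
    Round-trip an object whose type name is not a valid Python identifier.
    """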
type_name = 'Non.Pythonic#type-name$'
key = 'key'
data = 'test'
class NonPythonicallyNamedType(
metaclass=GenericObjectMeta,
type_name=type_name,
schema=OrderedDict([
('field', String),
])
):
pass
def check(obj):
assert obj.type_name == type_name, 'Complex type name mismatch'
assert obj.field == data, 'Complex object data failure'
async def inner_async():
await cache.put(key, NonPythonicallyNamedType(field=data))
check(await cache.get(key))
def inner():
cache.put(key, NonPythonicallyNamedType(field=data))
check(cache.get(key))
return inner_async() if isinstance(cache, AioCache) else inner()
class Internal(
metaclass=GenericObjectMeta, type_name='Internal',
schema=OrderedDict([
('id', IntObject),
('str', String)
])
):
pass
class NestedObject(
metaclass=GenericObjectMeta, type_name='NestedObject',
schema=OrderedDict([
('id', IntObject),
('str', String),
('internal', BinaryObject)
])
):
pass
@pytest.fixture
def complex_objects():
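    """
    Pairs of nested binary objects and their pre-computed expected hash codes:
    one ASCII-only payload and one containing non-ASCII strings.
    """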
fixtures = []
obj_ascii = NestedObject()
obj_ascii.id = 1
obj_ascii.str = 'test_string'
obj_ascii.internal = Internal()
obj_ascii.internal.id = 2
obj_ascii.internal.str = 'lorem ipsum'
fixtures.append((obj_ascii, -1314567146))
obj_utf8 = NestedObject()
obj_utf8.id = 1
obj_utf8.str = 'юникод'
obj_utf8.internal = Internal()
obj_utf8.internal.id = 2
obj_utf8.internal.str = 'ユニコード'
fixtures.append((obj_utf8, -1945378474))
yield fixtures
def test_complex_object_hash(client, complex_objects):
for obj, hash in complex_objects:
assert hash == BinaryObject.hashcode(obj, client=client)
@pytest.mark.asyncio
async def test_complex_object_hash_async(async_client, complex_objects):
for obj, hash in complex_objects:
assert hash == await BinaryObject.hashcode_async(obj, client=async_client)
def camel_to_snake(name):
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
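# Map every supported pyignite data type to a snake_case field name, producing
# a schema that exercises all of them at once.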
fields = {camel_to_snake(type_.__name__): type_ for type_ in [
ByteObject, ShortObject, IntObject, LongObject, FloatObject, DoubleObject, CharObject, BoolObject, UUIDObject,
DateObject, TimestampObject, TimeObject, EnumObject, BinaryEnumObject, ByteArrayObject, ShortArrayObject,
IntArrayObject, LongArrayObject, FloatArrayObject, DoubleArrayObject, CharArrayObject, BoolArrayObject,
UUIDArrayObject, DateArrayObject, TimestampArrayObject, TimeArrayObject, EnumArrayObject, String,
StringArrayObject, DecimalObject, DecimalArrayObject, ObjectArrayObject, CollectionObject, MapObject,
BinaryObject]}
class AllTypesObject(metaclass=GenericObjectMeta, type_name='AllTypesObject', schema=fields):
pass
@pytest.fixture
def null_fields_object():
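    """
    An AllTypesObject instance with every field explicitly set to None.
    """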
res = AllTypesObject()
for field in fields.keys():
setattr(res, field, None)
yield res
def test_complex_object_null_fields(cache, null_fields_object):
"""
    Test that the Python client can correctly write and read a binary object
    that contains null fields.
"""
cache.put(1, null_fields_object)
assert cache.get(1) == null_fields_object, 'Objects mismatch'
@pytest.mark.asyncio
async def test_complex_object_null_fields_async(async_cache, null_fields_object):
"""
    Test that the Python client can correctly write and read a binary object
    that contains null fields.
"""
await async_cache.put(1, null_fields_object)
assert await async_cache.get(1) == null_fields_object, 'Objects mismatch'
def test_object_with_collections_of_binary_objects(cache):
__check_object_with_collections_of_binary_objects(cache)
@pytest.mark.asyncio
async def test_object_with_collections_of_binary_objects_async(async_cache):
await __check_object_with_collections_of_binary_objects(async_cache)
def __check_object_with_collections_of_binary_objects(cache):
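    """
    Round-trip an object whose fields hold a map, a collection and an object
    array of binary objects, plus one object with those fields left unset.
    """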
class Container(
metaclass=GenericObjectMeta,
schema={
'id': IntObject,
'collection': CollectionObject,
'array': ObjectArrayObject,
'map': MapObject
}
):
pass
class Value(
metaclass=GenericObjectMeta,
schema={
'id': IntObject,
'name': String
}
):
pass
def fixtures():
map_obj = (MapObject.HASH_MAP, {i: Value(i, f'val_{i}') for i in range(10)})
col_obj = (CollectionObject.ARR_LIST, [Value(i, f'val_{i}') for i in range(10)])
arr_obj = (ObjectArrayObject.OBJECT, [Value(i, f'val_{i}') for i in range(10)])
return [
Container(1, map=map_obj, collection=col_obj, array=arr_obj),
Container(2), # Check if collections are not set
]
async def inner_async():
for i, val in enumerate(fixtures()):
await cache.put(i, val)
assert await cache.get(i) == val
def inner():
for i, val in enumerate(fixtures()):
cache.put(i, val)
assert cache.get(i) == val
return inner_async() if isinstance(cache, AioCache) else inner()