blob: f25fedddcc78feb9c00ad85c802612bbceee7b30 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from pyignite.api import (
sql_fields, sql_fields_cursor_get_page,
sql, sql_cursor_get_page,
cache_get_configuration,
)
from pyignite.datatypes.cache_config import CacheMode
from pyignite.datatypes.prop_codes import *
from pyignite.exceptions import SQLError
from pyignite.utils import entity_id
from pyignite.binary import unwrap_binary
# Rows inserted into the Student table: (first_name, last_name, grade).
initial_data = [
('John', 'Doe', 5),
('Jane', 'Roe', 4),
('Joe', 'Bloggs', 4),
('Richard', 'Public', 3),
('Negidius', 'Numerius', 3),
]
# DDL/DML statements shared by the tests below. The generated cache for this
# table is named SQL_PUBLIC_STUDENT (see cache_get_configuration usage).
create_query = '''CREATE TABLE Student (
id INT(11) PRIMARY KEY,
first_name CHAR(24),
last_name CHAR(32),
grade INT(11))'''
insert_query = '''INSERT INTO Student(id, first_name, last_name, grade)
VALUES (?, ?, ?, ?)'''
select_query = 'SELECT id, first_name, last_name, grade FROM Student'
drop_query = 'DROP TABLE Student IF EXISTS'
# Rows per result page; chosen smaller than len(initial_data) so the
# tests exercise cursor pagination ('more' is True after the first page).
page_size = 4
def test_sql(client):
    """Create and populate the Student table, then scan it through the
    low-level ``sql``/``sql_cursor_get_page`` API, checking that every
    returned value unwraps to the table's generated binary type."""
    conn = client.random_node

    # Drop leftovers from a previous run, then (re)create the table.
    client.sql(drop_query)
    res = sql_fields(
        conn, 0, create_query, page_size,
        schema='PUBLIC', include_field_names=True,
    )
    assert res.status == 0, res.message

    # Populate the table with sequential primary keys.
    for row_id, (fname, lname, grade) in enumerate(initial_data, start=1):
        res = sql_fields(
            conn, 0, insert_query, page_size,
            schema='PUBLIC',
            query_args=[row_id, fname, lname, grade],
            include_field_names=True,
        )
        assert res.status == 0, res.message

    # The SQL table is backed by a cache whose value type is a generated
    # binary type; look its name up from the cache configuration.
    res = cache_get_configuration(conn, 'SQL_PUBLIC_STUDENT')
    assert res.status == 0, res.message
    binary_type_name = res.value[PROP_QUERY_ENTITIES][0]['value_type_name']

    # First page of the scan query.
    res = sql(conn, 'SQL_PUBLIC_STUDENT', binary_type_name, 'TRUE', page_size)
    assert res.status == 0, res.message
    assert len(res.value['data']) == page_size
    assert res.value['more'] is True
    for wrapped in res.value['data'].values():
        unwrapped = unwrap_binary(client, wrapped)
        assert unwrapped.type_id == entity_id(binary_type_name)

    # Drain the remaining pages through the cursor.
    cursor = res.value['cursor']
    while res.value['more']:
        res = sql_cursor_get_page(conn, cursor)
        assert res.status == 0, res.message
        for wrapped in res.value['data'].values():
            unwrapped = unwrap_binary(client, wrapped)
            assert unwrapped.type_id == entity_id(binary_type_name)

    # Drop the table again so later tests start from a clean slate.
    res = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
    assert res.status == 0
def test_sql_fields(client):
    """Create and populate the Student table, then read it back through the
    ``sql_fields``/``sql_fields_cursor_get_page`` API, verifying page sizes
    and the 'more' pagination flag."""
    conn = client.random_node

    # Drop leftovers from a previous run, then (re)create the table.
    client.sql(drop_query)
    res = sql_fields(
        conn, 0, create_query, page_size,
        schema='PUBLIC', include_field_names=True,
    )
    assert res.status == 0, res.message

    # Populate the table with sequential primary keys.
    for row_id, (fname, lname, grade) in enumerate(initial_data, start=1):
        res = sql_fields(
            conn, 0, insert_query, page_size,
            schema='PUBLIC',
            query_args=[row_id, fname, lname, grade],
            include_field_names=True,
        )
        assert res.status == 0, res.message

    # First page: exactly page_size rows, with more rows still pending.
    res = sql_fields(
        conn, 0, select_query, page_size,
        schema='PUBLIC', include_field_names=True,
    )
    assert res.status == 0
    assert len(res.value['data']) == page_size
    assert res.value['more'] is True

    # Second page: the remainder of the data, and the cursor is exhausted.
    cursor = res.value['cursor']
    res = sql_fields_cursor_get_page(conn, cursor, field_count=4)
    assert res.status == 0
    assert len(res.value['data']) == len(initial_data) - page_size
    assert res.value['more'] is False

    # Drop the table again so later tests start from a clean slate.
    res = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
    assert res.status == 0
def test_long_multipage_query(client):
    """
    The test creates a table with 13 columns (id and 12 enumerated columns)
    and 20 records with id in range from 1 to 20. Values of enumerated columns
    are = column number * id.

    The goal is to ensure that all the values are selected in a right order
    when the result is paginated one row at a time (page_size=1).
    """
    fields = ["id", "abc", "ghi", "def", "jkl", "prs", "mno", "tuw", "zyz",
              "abc1", "def1", "jkl1", "prs1"]

    # Drop leftovers from a previous run.
    client.sql('DROP TABLE LongMultipageQuery IF EXISTS')

    client.sql("CREATE TABLE LongMultipageQuery (%s, %s)" % (
        fields[0] + " INT(11) PRIMARY KEY",
        ",".join(map(lambda f: f + " INT(11)", fields[1:]))))

    # Each non-key column holds column_number * record_id.
    # (Renamed the loop variable from `id`, which shadowed the builtin.)
    for record_id in range(1, 21):
        client.sql(
            "INSERT INTO LongMultipageQuery (%s) VALUES (%s)" % (
                ",".join(fields), ",".join("?" * len(fields))),
            query_args=[record_id] + list(
                i * record_id for i in range(1, len(fields))))

    result = client.sql('SELECT * FROM LongMultipageQuery', page_size=1)
    for page in result:
        assert len(page) == len(fields)
        for field_number, value in enumerate(page[1:], start=1):
            assert value == field_number * page[0]

    # BUG FIX: the original cleanup executed `drop_query`, which drops the
    # unrelated Student table and left LongMultipageQuery behind. Drop the
    # table this test actually created.
    client.sql('DROP TABLE LongMultipageQuery IF EXISTS')
def test_sql_not_create_cache_with_schema(client):
    """Querying a non-existent cache by name must fail — it must not be
    implicitly created."""
    expected_error = r".*Cache does not exist.*"
    with pytest.raises(SQLError, match=expected_error):
        client.sql(
            schema=None,
            cache='NOT_EXISTING',
            query_str='select * from NotExisting',
        )
def test_sql_not_create_cache_with_cache(client):
    """Querying against a non-existent schema must fail — it must not be
    implicitly created."""
    expected_error = r".*Failed to set schema.*"
    with pytest.raises(SQLError, match=expected_error):
        client.sql(
            schema='NOT_EXISTING',
            query_str='select * from NotExisting',
        )
def test_query_with_cache(client):
    """A SQL table backed by an explicitly created cache must be queryable
    four equivalent ways: by schema name, by cache object, by cache name,
    and by numeric cache id."""
    test_key = 42
    test_value = 'Lorem ipsum'

    cache_name = test_query_with_cache.__name__.upper()
    schema_name = f'{cache_name}_schema'.upper()
    table_name = f'{cache_name}_table'.upper()

    # Create a partitioned cache whose query entity exposes a KEY/VALUE
    # table in a dedicated SQL schema.
    cache = client.create_cache({
        PROP_NAME: cache_name,
        PROP_SQL_SCHEMA: schema_name,
        PROP_CACHE_MODE: CacheMode.PARTITIONED,
        PROP_QUERY_ENTITIES: [
            {
                'table_name': table_name,
                'key_field_name': 'KEY',
                'value_field_name': 'VALUE',
                'key_type_name': 'java.lang.Long',
                'value_type_name': 'java.lang.String',
                'query_indexes': [],
                'field_name_aliases': [],
                'query_fields': [
                    {
                        'name': 'KEY',
                        'type_name': 'java.lang.Long',
                        'is_key_field': True,
                        'is_notnull_constraint_field': True,
                    },
                    {
                        'name': 'VALUE',
                        'type_name': 'java.lang.String',
                    },
                ],
            },
        ],
    })
    cache.put(test_key, test_value)

    # Each (keyword, target) pair is an equivalent way to address the cache.
    for keyword, target in [
        ('schema', schema_name),
        ('cache', cache),
        ('cache', cache.name),
        ('cache', cache.cache_id),
    ]:
        pages = client.sql(f'select value from {table_name}', **{keyword: target})
        assert next(pages)[0] == test_value